| /** |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.aurora.scheduler.updater; |
| |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.stream.Collectors; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Function; |
| import com.google.common.base.Functions; |
| import com.google.common.cache.CacheBuilder; |
| import com.google.common.cache.CacheLoader; |
| import com.google.common.cache.LoadingCache; |
| import com.google.common.collect.FluentIterable; |
| import com.google.common.collect.ImmutableMap; |
| import com.google.common.collect.ImmutableSet; |
| import com.google.common.collect.Iterables; |
| import com.google.common.collect.Maps; |
| import com.google.common.collect.Ordering; |
| import com.google.inject.Inject; |
| |
| import org.apache.aurora.common.application.Lifecycle; |
| import org.apache.aurora.common.collections.Pair; |
| import org.apache.aurora.common.quantity.Amount; |
| import org.apache.aurora.common.quantity.Time; |
| import org.apache.aurora.common.stats.StatsProvider; |
| import org.apache.aurora.common.util.Clock; |
| import org.apache.aurora.gen.JobInstanceUpdateEvent; |
| import org.apache.aurora.gen.JobUpdateAction; |
| import org.apache.aurora.gen.JobUpdateEvent; |
| import org.apache.aurora.gen.JobUpdatePulseStatus; |
| import org.apache.aurora.gen.JobUpdateQuery; |
| import org.apache.aurora.gen.JobUpdateStatus; |
| import org.apache.aurora.scheduler.BatchWorker; |
| import org.apache.aurora.scheduler.SchedulerModule.TaskEventBatchWorker; |
| import org.apache.aurora.scheduler.base.InstanceKeys; |
| import org.apache.aurora.scheduler.base.JobKeys; |
| import org.apache.aurora.scheduler.base.Query; |
| import org.apache.aurora.scheduler.state.StateManager; |
| import org.apache.aurora.scheduler.storage.JobUpdateStore; |
| import org.apache.aurora.scheduler.storage.Storage; |
| import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult; |
| import org.apache.aurora.scheduler.storage.TaskStore; |
| import org.apache.aurora.scheduler.storage.entities.IInstanceKey; |
| import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent; |
| import org.apache.aurora.scheduler.storage.entities.IJobKey; |
| import org.apache.aurora.scheduler.storage.entities.IJobUpdate; |
| import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails; |
| import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent; |
| import org.apache.aurora.scheduler.storage.entities.IJobUpdateInstructions; |
| import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey; |
| import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery; |
| import org.apache.aurora.scheduler.storage.entities.IJobUpdateStrategy; |
| import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary; |
| import org.apache.aurora.scheduler.storage.entities.IScheduledTask; |
| import org.apache.aurora.scheduler.updater.StateEvaluator.Failure; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import static java.util.Objects.requireNonNull; |
| |
| import static com.google.common.base.Preconditions.checkState; |
| |
| import static org.apache.aurora.gen.JobUpdateStatus.ABORTED; |
| import static org.apache.aurora.gen.JobUpdateStatus.ERROR; |
| import static org.apache.aurora.gen.JobUpdateStatus.ROLLING_BACK; |
| import static org.apache.aurora.gen.JobUpdateStatus.ROLLING_FORWARD; |
| import static org.apache.aurora.gen.JobUpdateStatus.ROLL_FORWARD_AWAITING_PULSE; |
| import static org.apache.aurora.scheduler.base.AsyncUtil.shutdownOnError; |
| import static org.apache.aurora.scheduler.base.Jobs.AWAITING_PULSE_STATES; |
| import static org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; |
| import static org.apache.aurora.scheduler.storage.Util.jobUpdateActionStatName; |
| import static org.apache.aurora.scheduler.storage.Util.jobUpdateStatusStatName; |
| import static org.apache.aurora.scheduler.updater.JobUpdateStateMachine.ACTIVE_QUERY; |
| import static org.apache.aurora.scheduler.updater.JobUpdateStateMachine.AUTO_RESUME_STATES; |
| import static org.apache.aurora.scheduler.updater.JobUpdateStateMachine.GET_ACTIVE_RESUME_STATE; |
| import static org.apache.aurora.scheduler.updater.JobUpdateStateMachine.GET_BLOCKED_RESUME_STATE; |
| import static org.apache.aurora.scheduler.updater.JobUpdateStateMachine.GET_PAUSE_STATE; |
| import static org.apache.aurora.scheduler.updater.JobUpdateStateMachine.GET_UNBLOCKED_STATE; |
| import static org.apache.aurora.scheduler.updater.JobUpdateStateMachine.MonitorAction; |
| import static org.apache.aurora.scheduler.updater.JobUpdateStateMachine.MonitorAction.ROLL_BACK; |
| import static org.apache.aurora.scheduler.updater.JobUpdateStateMachine.MonitorAction.ROLL_FORWARD; |
| import static org.apache.aurora.scheduler.updater.JobUpdateStateMachine.MonitorAction.STOP_WATCHING; |
| import static org.apache.aurora.scheduler.updater.JobUpdateStateMachine.assertTransitionAllowed; |
| import static org.apache.aurora.scheduler.updater.JobUpdateStateMachine.getBlockedState; |
| import static org.apache.aurora.scheduler.updater.JobUpdateStateMachine.getPausedState; |
| import static org.apache.aurora.scheduler.updater.OneWayJobUpdater.EvaluationResult; |
| import static org.apache.aurora.scheduler.updater.OneWayJobUpdater.OneWayStatus; |
| import static org.apache.aurora.scheduler.updater.OneWayJobUpdater.OneWayStatus.SUCCEEDED; |
| import static org.apache.aurora.scheduler.updater.SideEffect.InstanceUpdateStatus; |
| |
| /** |
| * Implementation of an updater that orchestrates the process of gradually updating the |
| * configuration of tasks in a job. |
| * <p> |
| * TODO(wfarner): Consider using AbstractIdleService here. |
| */ |
| class JobUpdateControllerImpl implements JobUpdateController { |
private static final Logger LOG = LoggerFactory.getLogger(JobUpdateControllerImpl.class);
// Description used when an asynchronous updater task fails unexpectedly; the single %s is filled
// with the affected update (and, for deferred evaluations, instance) identification.
private static final String FATAL_ERROR_FORMAT =
    "Unexpected problem running asynchronous updater for: %s. Triggering shutdown";
// Message attached to the job update event recorded when an update pauses itself after a batch.
private static final String UPDATE_AUTO_PAUSED = "Update auto paused";

// Collaborators. All injected ones are null-checked in the constructor; pulseHandler is created
// there from the clock.
private final UpdateFactory updateFactory;
private final Storage storage;
private final ScheduledExecutorService executor;
private final StateManager stateManager;
private final Clock clock;
private final PulseHandler pulseHandler;
private final Lifecycle lifecycle;
private final TaskEventBatchWorker taskEventBatchWorker;
private final UpdateAgentReserver updateAgentReserver;
private final SlaKillController slaKillController;

// Currently-active updaters, keyed by job. An active updater is one that is rolling forward or
// back. Paused and completed updates are represented only in storage, not here.
private final Map<IJobKey, UpdateFactory.Update> updates =
    Collections.synchronizedMap(Maps.newHashMap());

// Used only for updates that have auto pause enabled. Keeps track of what instances
// have already been seen by the updater in order to detect when a new batch is started.
private final Map<IJobUpdateKey, Set<Integer>> instancesSeen = new ConcurrentHashMap<>();

// Counter per job update status, incremented each time an event with that status is recorded.
private final LoadingCache<JobUpdateStatus, AtomicLong> jobUpdateEventStats;
// Counter per instance update action, incremented each time an instance event is saved.
private final LoadingCache<JobUpdateAction, AtomicLong> jobUpdateActionStats;
| |
/**
 * Creates the controller from its injected collaborators.
 *
 * <p>One stat counter per {@link JobUpdateStatus} and one per {@link JobUpdateAction} is created
 * through {@code statsProvider} here, up front, rather than lazily on first use.
 */
@Inject
JobUpdateControllerImpl(
    UpdateFactory updateFactory,
    Storage storage,
    ScheduledExecutorService executor,
    StateManager stateManager,
    UpdateAgentReserver updateAgentReserver,
    Clock clock,
    Lifecycle lifecycle,
    TaskEventBatchWorker taskEventBatchWorker,
    StatsProvider statsProvider,
    SlaKillController slaKillController) {

  this.updateFactory = requireNonNull(updateFactory);
  this.storage = requireNonNull(storage);
  this.executor = requireNonNull(executor);
  this.stateManager = requireNonNull(stateManager);
  this.clock = requireNonNull(clock);
  this.lifecycle = requireNonNull(lifecycle);
  this.taskEventBatchWorker = requireNonNull(taskEventBatchWorker);
  this.pulseHandler = new PulseHandler(clock);
  this.updateAgentReserver = requireNonNull(updateAgentReserver);
  this.slaKillController = requireNonNull(slaKillController);

  this.jobUpdateEventStats = newEagerCounterCache(
      statsProvider,
      status -> jobUpdateStatusStatName(status),
      JobUpdateStatus.values());
  this.jobUpdateActionStats = newEagerCounterCache(
      statsProvider,
      action -> jobUpdateActionStatName(action),
      JobUpdateAction.values());
}

/**
 * Builds a cache that creates one counter per key via {@code statsProvider}, and pre-loads the
 * counter for every key in {@code keys}.
 *
 * @param statsProvider Source of counters.
 * @param statName Maps a key to the name of its counter.
 * @param keys Keys whose counters are created immediately.
 * @return The populated cache.
 */
private static <K> LoadingCache<K, AtomicLong> newEagerCounterCache(
    StatsProvider statsProvider,
    Function<K, String> statName,
    K[] keys) {

  LoadingCache<K, AtomicLong> cache = CacheBuilder.newBuilder()
      .build(new CacheLoader<K, AtomicLong>() {
        @Override
        public AtomicLong load(K key) {
          return statsProvider.makeCounter(statName.apply(key));
        }
      });
  // Loading is all that is needed to create the counter; the value itself is not used here.
  for (K key : keys) {
    cache.getUnchecked(key);
  }
  return cache;
}
| |
| @Override |
| public void start(final IJobUpdate update, final AuditData auditData) |
| throws UpdateStateException { |
| |
| requireNonNull(update); |
| requireNonNull(auditData); |
| |
| storage.write((NoResult<UpdateStateException>) storeProvider -> { |
| IJobUpdateSummary summary = update.getSummary(); |
| IJobUpdateInstructions instructions = update.getInstructions(); |
| IJobKey job = summary.getKey().getJob(); |
| |
| // Validate the update configuration by making sure we can create an updater for it. |
| updateFactory.newUpdate(update.getInstructions(), true); |
| |
| if (instructions.getInitialState().isEmpty() && !instructions.isSetDesiredState()) { |
| throw new IllegalArgumentException("Update instruction is a no-op."); |
| } |
| |
| List<IJobUpdateDetails> activeJobUpdates = |
| storeProvider.getJobUpdateStore().fetchJobUpdates(queryActiveByJob(job)); |
| if (!activeJobUpdates.isEmpty()) { |
| if (activeJobUpdates.size() > 1) { |
| LOG.error("Multiple active updates exist for this job. {}", activeJobUpdates); |
| throw new UpdateStateException( |
| String.format("Multiple active updates exist for this job. %s", activeJobUpdates)); |
| } |
| |
| IJobUpdateDetails activeUpdate = activeJobUpdates.stream().findFirst().get(); |
| throw new UpdateInProgressException("An active update already exists for this job, " |
| + "please terminate it before starting another. " |
| + "Active updates are those in states " + Updates.ACTIVE_JOB_UPDATE_STATES, |
| activeUpdate.getUpdate().getSummary()); |
| } |
| |
| LOG.info("Starting update for job " + job); |
| |
| storeProvider.getJobUpdateStore().saveJobUpdate(update); |
| |
| JobUpdateStatus status = ROLLING_FORWARD; |
| if (isCoordinatedUpdate(instructions)) { |
| status = ROLL_FORWARD_AWAITING_PULSE; |
| pulseHandler.initializePulseState(update, status, 0L); |
| } |
| |
| recordAndChangeJobUpdateStatus( |
| storeProvider, |
| summary.getKey(), |
| addAuditData(newEvent(status), auditData)); |
| }); |
| } |
| |
| @Override |
| public void assertNotUpdating(IJobKey job) throws JobUpdatingException { |
| requireNonNull(job); |
| |
| if (storage.read(p -> !p.getJobUpdateStore() |
| .fetchJobUpdates(queryActiveByJob(job)).isEmpty())) { |
| |
| throw new JobUpdatingException("Job is currently updating"); |
| } |
| } |
| |
| @Override |
| public void pause(final IJobUpdateKey key, AuditData auditData) throws UpdateStateException { |
| requireNonNull(key); |
| LOG.info("Attempting to pause update " + key); |
| unscopedChangeUpdateStatus( |
| key, |
| Functions.compose(createAuditedEvent(auditData), GET_PAUSE_STATE)); |
| } |
| |
| @Override |
| public void resume(final IJobUpdateKey key, final AuditData auditData) |
| throws UpdateStateException { |
| |
| requireNonNull(key); |
| requireNonNull(auditData); |
| LOG.info("Attempting to resume update " + key); |
| storage.write((NoResult<UpdateStateException>) storeProvider -> { |
| Optional<IJobUpdateDetails> details = storeProvider.getJobUpdateStore().fetchJobUpdate(key); |
| |
| if (!details.isPresent()) { |
| throw new UpdateStateException("Update does not exist: " + key); |
| } |
| |
| IJobUpdate update = details.get().getUpdate(); |
| Function<JobUpdateStatus, JobUpdateStatus> stateChange = |
| isCoordinatedAndPulseExpired(key, update.getInstructions()) |
| ? GET_BLOCKED_RESUME_STATE |
| : GET_ACTIVE_RESUME_STATE; |
| |
| JobUpdateStatus newStatus = stateChange.apply(update.getSummary().getState().getStatus()); |
| changeUpdateStatus( |
| storeProvider, |
| update.getSummary(), |
| addAuditData(newEvent(newStatus), auditData)); |
| }); |
| } |
| |
| @Override |
| public void abort(IJobUpdateKey key, AuditData auditData) throws UpdateStateException { |
| unscopedChangeUpdateStatus( |
| key, |
| Functions.compose(createAuditedEvent(auditData), Functions.constant(ABORTED))); |
| } |
| |
| @Override |
| public void rollback(IJobUpdateKey key, AuditData auditData) throws UpdateStateException { |
| unscopedChangeUpdateStatus( |
| key, |
| Functions.compose(createAuditedEvent(auditData), Functions.constant(ROLLING_BACK))); |
| } |
| |
| private static Function<JobUpdateStatus, JobUpdateEvent> createAuditedEvent( |
| final AuditData auditData) { |
| |
| return status -> addAuditData(newEvent(status), auditData); |
| } |
| |
| private static final Ordering<IJobUpdateEvent> CHRON_ORDERING = |
| Ordering.from(Comparator.comparingLong(IJobUpdateEvent::getTimestampMs)); |
| |
| private long inferLastPulseTimestamp(IJobUpdateEvent mostRecent) { |
| // Pulse timestamps are not durably stored by design. However, on system recovery, |
| // setting the timestamp of the last pulse to 0L (aka no pulse) is not correct. |
| // By inspecting the job update events we can infer a reasonable time stamp to initialize to. |
| // In this case, if the upgrade was not waiting for a pulse previously, we can reuse the |
| // timestamp of the last event. This does reset the counter for pulses, but reflects the |
| // most likely behaviour of a healthy system. |
| |
| long ts = 0L; |
| |
| if (!AWAITING_PULSE_STATES.contains(mostRecent.getStatus())) { |
| ts = mostRecent.getTimestampMs(); |
| } |
| |
| return ts; |
| } |
| |
| public boolean isAutoPauseEnabled(IJobUpdateStrategy strategy) { |
| if (strategy.isSetBatchStrategy()) { |
| return strategy.getBatchStrategy().isAutopauseAfterBatch(); |
| } |
| |
| if (strategy.isSetVarBatchStrategy()) { |
| return strategy.getVarBatchStrategy().isAutopauseAfterBatch(); |
| } |
| |
| return false; |
| } |
| |
| @Override |
| public void systemResume() { |
| storage.write((NoResult.Quiet) storeProvider -> { |
| for (IJobUpdateDetails details |
| : storeProvider.getJobUpdateStore().fetchJobUpdates(ACTIVE_QUERY)) { |
| |
| IJobUpdateSummary summary = details.getUpdate().getSummary(); |
| IJobUpdateInstructions instructions = details.getUpdate().getInstructions(); |
| IJobUpdateKey key = summary.getKey(); |
| JobUpdateStatus status = summary.getState().getStatus(); |
| // This is safe because we always write at least one job update event on job update creation |
| IJobUpdateEvent latestEvent = CHRON_ORDERING.max(details.getUpdateEvents()); |
| |
| if (isCoordinatedUpdate(instructions)) { |
| LOG.info("Automatically restoring pulse state for " + key); |
| |
| long pulseMs = inferLastPulseTimestamp(latestEvent); |
| pulseHandler.initializePulseState(details.getUpdate(), status, pulseMs); |
| } |
| // Since the restart causes the update to lose state, we backfill instances seen if: |
| // a) The update has auto pause after batch enabled |
| // b) The update is not currently paused. |
| // This restores necessary state for any update that was ROLLING_FORWARD when |
| // the scheduler was restarted to avoid crashing the scheduler. |
| // We do not backfill when the update is in the ROLL_FORWARD_PAUSED status as the subsequent |
| // resume will correctly re-initialize the seen instances state for the update. |
| if (isAutoPauseEnabled(instructions.getSettings().getUpdateStrategy()) |
| && latestEvent.getStatus() == ROLLING_FORWARD) { |
| LOG.info("Re-populating previously seen instances for " + key); |
| |
| instancesSeen.put(key, |
| storeProvider.getJobUpdateStore() |
| .fetchJobUpdate(key) |
| .get() |
| .getInstanceEvents() |
| .stream() |
| .map(e -> e.getInstanceId()) |
| .collect(Collectors.toCollection(HashSet::new))); |
| } |
| |
| if (AUTO_RESUME_STATES.contains(status)) { |
| LOG.info("Automatically resuming update " + key); |
| |
| try { |
| changeJobUpdateStatus(storeProvider, key, newEvent(status), false); |
| } catch (UpdateStateException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| } |
| }); |
| } |
| |
| @Override |
| public JobUpdatePulseStatus pulse(final IJobUpdateKey key) throws UpdateStateException { |
| final PulseState state = pulseHandler.pulseAndGet(key); |
| if (state == null) { |
| LOG.info("Not pulsing inactive job update: " + key); |
| return JobUpdatePulseStatus.FINISHED; |
| } |
| |
| LOG.debug( |
| "Job update {} has been pulsed. Timeout of {} msec is reset.", |
| key, |
| state.getPulseTimeoutMs()); |
| |
| if (JobUpdateStateMachine.isAwaitingPulse(state.getStatus())) { |
| // Attempt to unblock a job update previously blocked on expired pulse. |
| executor.execute(shutdownOnError( |
| lifecycle, |
| LOG, |
| String.format(FATAL_ERROR_FORMAT, key), |
| () -> { |
| try { |
| unscopedChangeUpdateStatus( |
| key, |
| status -> new JobUpdateEvent().setStatus(GET_UNBLOCKED_STATE.apply(status))); |
| } catch (UpdateStateException e) { |
| LOG.error(String.format("Error processing job update pulse for %s: %s", key, e)); |
| } |
| })); |
| } |
| |
| return JobUpdatePulseStatus.OK; |
| } |
| |
| @Override |
| public void instanceChangedState(final IScheduledTask updatedTask) { |
| instanceChanged( |
| InstanceKeys.from( |
| updatedTask.getAssignedTask().getTask().getJob(), |
| updatedTask.getAssignedTask().getInstanceId()), |
| Optional.of(updatedTask)); |
| } |
| |
| @Override |
| public void instanceDeleted(IInstanceKey instance) { |
| // This is primarily used to detect when an instance was stuck in PENDING and killed, which |
| // results in deletion. |
| instanceChanged(instance, Optional.empty()); |
| } |
| |
| private void instanceChanged(final IInstanceKey instance, final Optional<IScheduledTask> state) { |
| taskEventBatchWorker.execute(storeProvider -> { |
| IJobKey job = instance.getJobKey(); |
| UpdateFactory.Update update = updates.get(job); |
| if (update != null) { |
| if (update.getUpdater().containsInstance(instance.getInstanceId())) { |
| // We check to see if the state change is specified, and if it is, ensure that the new |
| // state matches the current state. We do this because events are processed asynchronously |
| // and it is possible for an old event trigger an action that should not be triggered |
| // for the actual updated state. |
| if (!state.isPresent() || isLatestState(storeProvider, state.get())) { |
| LOG.info("Forwarding task change for " + InstanceKeys.toString(instance)); |
| try { |
| evaluateUpdater( |
| storeProvider, |
| update, |
| getOnlyMatch(storeProvider.getJobUpdateStore(), queryActiveByJob(job)), |
| ImmutableMap.of(instance.getInstanceId(), state)); |
| } catch (UpdateStateException e) { |
| throw new RuntimeException(e); |
| } |
| } else { |
| LOG.info("Ignoring out of date task change for " + instance); |
| } |
| } else { |
| LOG.info("Instance " + instance + " is not part of active update for " |
| + JobKeys.canonicalString(job)); |
| } |
| } |
| return BatchWorker.NO_RESULT; |
| }); |
| } |
| |
| /** |
| * Check to see that a given {@link IScheduledTask} still exists in storage and has the same |
| * status. |
| */ |
| private boolean isLatestState(MutableStoreProvider storeProvider, IScheduledTask reportedState) { |
| Optional<IScheduledTask> currentState = storeProvider |
| .getTaskStore() |
| .fetchTask(reportedState.getAssignedTask().getTaskId()); |
| |
| return currentState |
| .map(iScheduledTask -> iScheduledTask.getStatus() == reportedState.getStatus()) |
| .orElse(false); |
| } |
| |
| private IJobUpdateSummary getOnlyMatch(JobUpdateStore store, IJobUpdateQuery query) { |
| return Iterables.getOnlyElement(store.fetchJobUpdates(query)).getUpdate().getSummary(); |
| } |
| |
| @VisibleForTesting |
| static IJobUpdateQuery queryActiveByJob(IJobKey job) { |
| return IJobUpdateQuery.build(new JobUpdateQuery() |
| .setJobKey(job.newBuilder()) |
| .setUpdateStatuses(Updates.ACTIVE_JOB_UPDATE_STATES)); |
| } |
| |
| /** |
| * Changes the state of an update, without the 'scope' of an update ID. This should only be used |
| * when responding to outside inputs that are inherently un-scoped, such as a user action or task |
| * state change. |
| * |
| * @param key Update identifier. |
| * @param stateChange State change computation, based on the current state of the update. |
| * @throws UpdateStateException If no active update exists for the provided {@code job}, or |
| * if the proposed state transition is not allowed. |
| */ |
| private void unscopedChangeUpdateStatus( |
| final IJobUpdateKey key, |
| final Function<? super JobUpdateStatus, JobUpdateEvent> stateChange) |
| throws UpdateStateException { |
| |
| storage.write((NoResult<UpdateStateException>) storeProvider -> { |
| |
| Optional<IJobUpdateDetails> update = storeProvider.getJobUpdateStore().fetchJobUpdate(key); |
| if (!update.isPresent()) { |
| throw new UpdateStateException("Update does not exist " + key); |
| } |
| |
| IJobUpdateSummary summary = update.get().getUpdate().getSummary(); |
| changeUpdateStatus(storeProvider, summary, stateChange.apply(summary.getState().getStatus())); |
| }); |
| } |
| |
| private void changeUpdateStatus( |
| MutableStoreProvider storeProvider, |
| IJobUpdateSummary updateSummary, |
| JobUpdateEvent event) throws UpdateStateException { |
| |
| if (updateSummary.getState().getStatus() == event.getStatus()) { |
| return; |
| } |
| |
| assertTransitionAllowed(updateSummary.getState().getStatus(), event.getStatus()); |
| recordAndChangeJobUpdateStatus(storeProvider, updateSummary.getKey(), event); |
| } |
| |
| private void recordAndChangeJobUpdateStatus( |
| MutableStoreProvider storeProvider, |
| IJobUpdateKey key, |
| JobUpdateEvent event) throws UpdateStateException { |
| |
| changeJobUpdateStatus(storeProvider, key, event, true); |
| } |
| |
/**
 * Moves the update identified by {@code key} into the status of {@code proposedEvent} and
 * adjusts in-memory state to match. The transition itself is not validated here.
 *
 * <p>When {@code record} is true, the event is persisted and the per-status counter is
 * incremented. Persisting sets the event's timestamp from the clock, which mutates
 * {@code proposedEvent}. Pulse state is removed for terminal statuses and updated otherwise.
 * Depending on the monitor action for the new status, the job's in-memory updater is removed, or
 * a new one is created and evaluated immediately. If creating the updater throws a
 * RuntimeException, the update is moved to {@code ERROR} instead.
 *
 * @param storeProvider Store provider for the enclosing storage operation.
 * @param key Update identifier.
 * @param proposedEvent Event carrying the new status (and optional message/user).
 * @param record Whether to persist {@code proposedEvent} as a job update event.
 * @throws UpdateStateException Propagated from evaluating a newly created updater.
 */
private void changeJobUpdateStatus(
    MutableStoreProvider storeProvider,
    IJobUpdateKey key,
    JobUpdateEvent proposedEvent,
    boolean record) throws UpdateStateException {

  JobUpdateStore.Mutable updateStore = storeProvider.getJobUpdateStore();
  JobUpdateStatus status = proposedEvent.getStatus();

  LOG.info("Update {} is now in state {}", key, status);
  if (record) {
    updateStore.saveJobUpdateEvent(
        key,
        IJobUpdateEvent.build(proposedEvent.setTimestampMs(clock.nowMillis()).setStatus(status)));
    jobUpdateEventStats.getUnchecked(status).incrementAndGet();
  }

  // Terminal updates no longer need pulse tracking.
  if (JobUpdateStore.TERMINAL_STATES.contains(status)) {
    pulseHandler.remove(key);
  } else {
    pulseHandler.updatePulseStatus(key, status);
  }

  MonitorAction action = JobUpdateStateMachine.getActionForStatus(status);
  IJobKey job = key.getJob();
  if (action == STOP_WATCHING) {
    updates.remove(job);
  } else if (action == ROLL_FORWARD || action == ROLL_BACK) {
    if (action == ROLL_BACK) {
      // A rollback replaces whatever updater the job currently has.
      updates.remove(job);
    } else {
      checkState(!updates.containsKey(job), "Updater already exists for %s", job);
    }

    IJobUpdate jobUpdate = updateStore.fetchJobUpdate(key).get().getUpdate();
    UpdateFactory.Update update;
    try {
      // The second argument selects the rolling-forward direction.
      update = updateFactory.newUpdate(jobUpdate.getInstructions(), action == ROLL_FORWARD);
    } catch (RuntimeException e) {
      // Move the update to ERROR (recorded) rather than propagating the failure.
      LOG.warn("Uncaught exception: " + e, e);
      changeJobUpdateStatus(
          storeProvider,
          key,
          newEvent(ERROR).setMessage("Internal scheduler error: " + e.getMessage()),
          true);
      return;
    }
    updates.put(job, update);
    // Kick off the first evaluation immediately, with no specific instance change.
    evaluateUpdater(
        storeProvider,
        update,
        jobUpdate.getSummary(),
        ImmutableMap.of());
  }
}
| |
| private static Optional<IScheduledTask> getActiveInstance( |
| TaskStore taskStore, |
| IJobKey job, |
| int instanceId) { |
| |
| return Optional.ofNullable(Iterables.getOnlyElement( |
| taskStore.fetchTasks(Query.instanceScoped(job, instanceId).active()), null)); |
| } |
| |
| private static final Set<InstanceUpdateStatus> NOOP_INSTANCE_UPDATE = |
| ImmutableSet.of(InstanceUpdateStatus.WORKING, InstanceUpdateStatus.SUCCEEDED); |
| |
| private static boolean isCoordinatedUpdate(IJobUpdateInstructions instructions) { |
| return instructions.getSettings().getBlockIfNoPulsesAfterMs() > 0; |
| } |
| |
| private boolean isCoordinatedAndPulseExpired( |
| IJobUpdateKey key, |
| IJobUpdateInstructions instructions) { |
| |
| if (isCoordinatedUpdate(instructions)) { |
| PulseState pulseState = pulseHandler.get(key); |
| boolean result = pulseState == null || pulseState.isBlocked(clock); |
| LOG.info("Coordinated update {} pulse expired: {}", key, result); |
| return result; |
| } else { |
| return false; |
| } |
| } |
| |
// Message attached to the event recorded when a coordinated update is moved into its blocked
// status because its pulse has expired.
@VisibleForTesting
static final String PULSE_TIMEOUT_MESSAGE = "Pulses from external service have timed out.";
| |
| private void evaluateUpdater( |
| final MutableStoreProvider storeProvider, |
| final UpdateFactory.Update update, |
| IJobUpdateSummary summary, |
| Map<Integer, Optional<IScheduledTask>> changedInstance) throws UpdateStateException { |
| |
| JobUpdateStatus updaterStatus = summary.getState().getStatus(); |
| final IJobUpdateKey key = summary.getKey(); |
| |
| JobUpdateStore.Mutable updateStore = storeProvider.getJobUpdateStore(); |
| |
| IJobUpdateInstructions instructions = updateStore.fetchJobUpdate(key).get() |
| .getUpdate().getInstructions(); |
| if (isCoordinatedAndPulseExpired(key, instructions)) { |
| // Move coordinated update into awaiting pulse state. |
| JobUpdateStatus blockedStatus = getBlockedState(summary.getState().getStatus()); |
| changeUpdateStatus( |
| storeProvider, |
| summary, |
| newEvent(blockedStatus).setMessage(PULSE_TIMEOUT_MESSAGE)); |
| return; |
| } |
| |
| InstanceStateProvider<Integer, Optional<IScheduledTask>> stateProvider = |
| instanceId -> getActiveInstance(storeProvider.getTaskStore(), key.getJob(), instanceId); |
| |
| EvaluationResult<Integer> result = update.getUpdater().evaluate(changedInstance, stateProvider); |
| |
| LOG.info(key + " evaluation result: " + result); |
| |
| final boolean autoPauseAfterBatch = |
| isAutoPauseEnabled(instructions.getSettings().getUpdateStrategy()); |
| if (autoPauseAfterBatch && maybeAutoPause(summary, result)) { |
| changeUpdateStatus(storeProvider, |
| summary, |
| newEvent(getPausedState(summary.getState().getStatus())).setMessage(UPDATE_AUTO_PAUSED)); |
| return; |
| } |
| |
| for (Map.Entry<Integer, SideEffect> entry : result.getSideEffects().entrySet()) { |
| Iterable<InstanceUpdateStatus> statusChanges; |
| |
| int instanceId = entry.getKey(); |
| List<IJobInstanceUpdateEvent> savedEvents = updateStore.fetchJobUpdate(key).get() |
| .getInstanceEvents() |
| .stream() |
| .filter(e -> e.getInstanceId() == instanceId) |
| .collect(Collectors.toList()); |
| |
| Set<JobUpdateAction> savedActions = |
| FluentIterable.from(savedEvents).transform(EVENT_TO_ACTION).toSet(); |
| |
| // Don't bother persisting a sequence of status changes that represents an instance that |
| // was immediately recognized as being healthy and in the desired state. |
| if (entry.getValue().getStatusChanges().equals(NOOP_INSTANCE_UPDATE) |
| && savedEvents.isEmpty()) { |
| |
| LOG.info("Suppressing no-op update for instance " + instanceId); |
| statusChanges = ImmutableSet.of(); |
| } else { |
| statusChanges = entry.getValue().getStatusChanges(); |
| } |
| |
| for (InstanceUpdateStatus statusChange : statusChanges) { |
| JobUpdateAction action = STATE_MAP.get(Pair.of(statusChange, updaterStatus)); |
| requireNonNull(action); |
| |
| // A given instance update action may only be issued once during the update lifecycle. |
| // Suppress duplicate events due to pause/resume operations. |
| if (savedActions.contains(action)) { |
| LOG.info("Suppressing duplicate update {} for instance {}.", action, instanceId); |
| } else { |
| IJobInstanceUpdateEvent event = IJobInstanceUpdateEvent.build( |
| new JobInstanceUpdateEvent() |
| .setInstanceId(instanceId) |
| .setTimestampMs(clock.nowMillis()) |
| .setAction(action)); |
| updateStore.saveJobInstanceUpdateEvent(summary.getKey(), event); |
| jobUpdateActionStats.getUnchecked(action).incrementAndGet(); |
| } |
| } |
| } |
| |
| OneWayStatus status = result.getStatus(); |
| if (status == SUCCEEDED || status == OneWayStatus.FAILED) { |
| if (SideEffect.hasActions(result.getSideEffects().values())) { |
| throw new IllegalArgumentException( |
| "A terminal state should not specify actions: " + result); |
| } |
| |
| JobUpdateEvent event = new JobUpdateEvent(); |
| if (status == SUCCEEDED) { |
| event.setStatus(update.getSuccessStatus()); |
| } else { |
| event.setStatus(update.getFailureStatus()); |
| // Generate a transition message based on one (arbitrary) instance in the group that pushed |
| // the update over the failure threshold (in all likelihood this group is of size 1). |
| // This is done as a rough cut to aid in diagnosing a failed update, as generating a |
| // complete summary would likely be of dubious value. |
| for (Map.Entry<Integer, SideEffect> entry : result.getSideEffects().entrySet()) { |
| Optional<Failure> failure = entry.getValue().getFailure(); |
| if (failure.isPresent()) { |
| event.setMessage(failureMessage(entry.getKey(), failure.get())); |
| break; |
| } |
| } |
| } |
| |
| if (autoPauseAfterBatch) { |
| instancesSeen.remove(key); |
| } |
| changeUpdateStatus(storeProvider, summary, event); |
| } else { |
| LOG.info("Executing side-effects for update of " + key + ": " + result.getSideEffects()); |
| for (Map.Entry<Integer, SideEffect> entry : result.getSideEffects().entrySet()) { |
| IInstanceKey instance = InstanceKeys.from(key.getJob(), entry.getKey()); |
| |
| Optional<InstanceAction> action = entry.getValue().getAction(); |
| if (action.isPresent()) { |
| Optional<InstanceActionHandler> handler = action.get().getHandler(); |
| if (handler.isPresent()) { |
| Optional<Amount<Long, Time>> reevaluateDelay = handler.get().getReevaluationDelay( |
| instance, |
| instructions, |
| storeProvider, |
| stateManager, |
| updateAgentReserver, |
| updaterStatus, |
| key, |
| slaKillController); |
| if (reevaluateDelay.isPresent()) { |
| executor.schedule( |
| getDeferredEvaluator(instance, key), |
| reevaluateDelay.get().getValue(), |
| reevaluateDelay.get().getUnit().getTimeUnit()); |
| } |
| } |
| } |
| } |
| } |
| } |
| |
| @VisibleForTesting |
| static final Function<IJobInstanceUpdateEvent, JobUpdateAction> EVENT_TO_ACTION = |
| IJobInstanceUpdateEvent::getAction; |
| |
| @VisibleForTesting |
| static String failureMessage(int instanceId, Failure failure) { |
| return String.format("Latest failure: instance %d %s", instanceId, failure.getReason()); |
| } |
| |
| /** |
| * Associates an instance updater state change and the job's update status to an action. |
| */ |
| private static final Map<Pair<InstanceUpdateStatus, JobUpdateStatus>, JobUpdateAction> STATE_MAP = |
| ImmutableMap.<Pair<InstanceUpdateStatus, JobUpdateStatus>, JobUpdateAction>builder() |
| .put( |
| Pair.of(InstanceUpdateStatus.WORKING, ROLLING_FORWARD), |
| JobUpdateAction.INSTANCE_UPDATING) |
| .put( |
| Pair.of(InstanceUpdateStatus.SUCCEEDED, ROLLING_FORWARD), |
| JobUpdateAction.INSTANCE_UPDATED) |
| .put( |
| Pair.of(InstanceUpdateStatus.FAILED, ROLLING_FORWARD), |
| JobUpdateAction.INSTANCE_UPDATE_FAILED) |
| .put( |
| Pair.of(InstanceUpdateStatus.WORKING, ROLLING_BACK), |
| JobUpdateAction.INSTANCE_ROLLING_BACK) |
| .put( |
| Pair.of(InstanceUpdateStatus.SUCCEEDED, ROLLING_BACK), |
| JobUpdateAction.INSTANCE_ROLLED_BACK) |
| .put( |
| Pair.of(InstanceUpdateStatus.FAILED, ROLLING_BACK), |
| JobUpdateAction.INSTANCE_ROLLBACK_FAILED) |
| .build(); |
| |
| private static JobUpdateEvent newEvent(JobUpdateStatus status) { |
| return new JobUpdateEvent().setStatus(status); |
| } |
| |
| private static JobUpdateEvent addAuditData(JobUpdateEvent event, AuditData auditData) { |
| return event.setMessage(auditData.getMessage().orElse(null)) |
| .setUser(auditData.getUser()); |
| } |
| |
| private Runnable getDeferredEvaluator(final IInstanceKey instance, final IJobUpdateKey key) { |
| return shutdownOnError( |
| lifecycle, |
| LOG, |
| String.format(FATAL_ERROR_FORMAT, "Key: " + key + " Instance key: " + instance), |
| () -> storage.write((NoResult.Quiet) storeProvider -> { |
| IJobUpdateSummary summary = |
| storeProvider.getJobUpdateStore().fetchJobUpdate(key).get().getUpdate().getSummary(); |
| JobUpdateStatus status = summary.getState().getStatus(); |
| // Suppress this evaluation if the updater is not currently active. |
| if (JobUpdateStateMachine.isActive(status)) { |
| UpdateFactory.Update update = updates.get(instance.getJobKey()); |
| try { |
| evaluateUpdater( |
| storeProvider, |
| update, |
| summary, |
| ImmutableMap.of( |
| instance.getInstanceId(), |
| getActiveInstance( |
| storeProvider.getTaskStore(), |
| instance.getJobKey(), |
| instance.getInstanceId()))); |
| } catch (UpdateStateException e) { |
| LOG.error(String.format("Error running deferred evaluation for %s: %s", instance, e)); |
| throw new RuntimeException(e); |
| } |
| } |
| })); |
| } |
| |
| @VisibleForTesting |
| private boolean maybeAutoPause(IJobUpdateSummary summary, EvaluationResult<Integer> result) { |
| JobUpdateStatus updaterStatus = summary.getState().getStatus(); |
| final IJobUpdateKey key = summary.getKey(); |
| |
| // Only apply auto-pause to an update rolling forward. |
| if (updaterStatus != ROLLING_FORWARD) { |
| return false; |
| } |
| |
| if (instancesSeen.containsKey(key)) { |
| Set<Integer> instancesCached = instancesSeen.get(key); |
| Set<Integer> instancesBeingUpdated = result.getSideEffects().keySet(); |
| |
| // On the final batch, pause for acknowledgement and remove the cache of instances. |
| // This will cause the else branch to get run on resume and the update will finish. |
| if (result.getStatus() == SUCCEEDED) { |
| instancesSeen.remove(key); |
| return true; |
| } |
| |
| // If the update evaluation is dealing with new instances, that signals we are at a barrier |
| // crossing. |
| if (!instancesCached.containsAll(instancesBeingUpdated)) { |
| instancesCached.addAll(instancesBeingUpdated); |
| return true; |
| } |
| } else { |
| instancesSeen.put(key, new HashSet<Integer>(result.getSideEffects().keySet())); |
| } |
| return false; |
| } |
| |
| private static class PulseHandler { |
| private final Clock clock; |
| |
| // TODO(maxim): expose this data via a debug endpoint AURORA-1103. |
| // Currently active coordinated update pulse states. A pulse state is added when a coordinated |
| // update is created and removed only when an update reaches terminal state. A PAUSED update |
| // pulse state is still retained in the map and accepts pulses. |
| private final Map<IJobUpdateKey, PulseState> pulseStates = Maps.newHashMap(); |
| |
| PulseHandler(Clock clock) { |
| this.clock = requireNonNull(clock); |
| } |
| |
| synchronized void initializePulseState(IJobUpdate update, JobUpdateStatus status, long ts) { |
| pulseStates.put(update.getSummary().getKey(), new PulseState( |
| status, |
| update.getInstructions().getSettings().getBlockIfNoPulsesAfterMs(), |
| ts)); |
| } |
| |
| synchronized PulseState pulseAndGet(IJobUpdateKey key) { |
| PulseState state = pulseStates.get(key); |
| if (state != null) { |
| state = pulseStates.put(key, new PulseState( |
| state.getStatus(), |
| state.getPulseTimeoutMs(), |
| clock.nowMillis())); |
| } |
| return state; |
| } |
| |
| synchronized void updatePulseStatus(IJobUpdateKey key, JobUpdateStatus status) { |
| PulseState state = pulseStates.get(key); |
| if (state != null) { |
| pulseStates.put(key, new PulseState( |
| status, |
| state.getPulseTimeoutMs(), |
| state.getLastPulseMs())); |
| } |
| } |
| |
| synchronized void remove(IJobUpdateKey key) { |
| pulseStates.remove(key); |
| } |
| |
| synchronized PulseState get(IJobUpdateKey key) { |
| return pulseStates.get(key); |
| } |
| } |
| |
| private static class PulseState { |
| private final JobUpdateStatus status; |
| private final long pulseTimeoutMs; |
| private final long lastPulseMs; |
| |
| PulseState(JobUpdateStatus status, long pulseTimeoutMs, long lastPulseMs) { |
| this.status = requireNonNull(status); |
| this.pulseTimeoutMs = pulseTimeoutMs; |
| this.lastPulseMs = lastPulseMs; |
| } |
| |
| JobUpdateStatus getStatus() { |
| return status; |
| } |
| |
| long getPulseTimeoutMs() { |
| return pulseTimeoutMs; |
| } |
| |
| long getLastPulseMs() { |
| return lastPulseMs; |
| } |
| |
| boolean isBlocked(Clock clock) { |
| return clock.nowMillis() - lastPulseMs >= pulseTimeoutMs; |
| } |
| } |
| } |