| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version |
| * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions |
| * and limitations under the License. |
| */ |
| |
| package org.apache.storm.daemon.supervisor; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import java.io.IOException; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| import java.util.concurrent.atomic.AtomicReference; |
| import org.apache.storm.Config; |
| import org.apache.storm.DaemonConfig; |
| import org.apache.storm.cluster.IStormClusterState; |
| import org.apache.storm.generated.AuthorizationException; |
| import org.apache.storm.generated.ExecutorInfo; |
| import org.apache.storm.generated.KeyNotFoundException; |
| import org.apache.storm.generated.LSWorkerHeartbeat; |
| import org.apache.storm.generated.LocalAssignment; |
| import org.apache.storm.generated.ProfileAction; |
| import org.apache.storm.generated.ProfileRequest; |
| import org.apache.storm.generated.WorkerResources; |
| import org.apache.storm.localizer.AsyncLocalizer; |
| import org.apache.storm.localizer.BlobChangingCallback; |
| import org.apache.storm.localizer.GoodToGo; |
| import org.apache.storm.localizer.LocallyCachedBlob; |
| import org.apache.storm.metricstore.WorkerMetricsProcessor; |
| import org.apache.storm.scheduler.ISupervisor; |
| import org.apache.storm.utils.EnumUtil; |
| import org.apache.storm.utils.LocalState; |
| import org.apache.storm.utils.ObjectReader; |
| import org.apache.storm.utils.Time; |
| import org.apache.storm.utils.Utils; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| public class Slot extends Thread implements AutoCloseable, BlobChangingCallback { |
| private static final Logger LOG = LoggerFactory.getLogger(Slot.class); |
| |
| enum KillReason { |
| ASSIGNMENT_CHANGED, BLOB_CHANGED, PROCESS_EXIT, MEMORY_VIOLATION, HB_TIMEOUT, HB_NULL; |
| |
| @Override |
| public String toString() { |
| return EnumUtil.toMetricName(this); |
| } |
| |
| } |
| |
| private static final long ONE_SEC_IN_NANO = TimeUnit.NANOSECONDS.convert(1, TimeUnit.SECONDS); |
| private final AtomicReference<LocalAssignment> newAssignment = new AtomicReference<>(); |
| private final AtomicReference<Set<TopoProfileAction>> profiling = new AtomicReference<>(new HashSet<>()); |
| |
| private final BlockingQueue<BlobChanging> changingBlobs = new LinkedBlockingQueue<>(); |
| private final StaticState staticState; |
| private final IStormClusterState clusterState; |
| private final AtomicReference<Map<Long, LocalAssignment>> cachedCurrentAssignments; |
| private final OnlyLatestExecutor<Integer> metricsExec; |
| private volatile boolean done = false; |
| private volatile DynamicState dynamicState; |
| |
| public Slot(AsyncLocalizer localizer, Map<String, Object> conf, |
| ContainerLauncher containerLauncher, String host, |
| int port, LocalState localState, |
| IStormClusterState clusterState, |
| ISupervisor supervisor, |
| AtomicReference<Map<Long, LocalAssignment>> cachedCurrentAssignments, |
| OnlyLatestExecutor<Integer> metricsExec, |
| WorkerMetricsProcessor metricsProcessor, |
| SlotMetrics slotMetrics) throws Exception { |
| super("SLOT_" + port); |
| this.metricsExec = metricsExec; |
| this.cachedCurrentAssignments = cachedCurrentAssignments; |
| this.clusterState = clusterState; |
| this.staticState = new StaticState(localizer, |
| ObjectReader.getInt(conf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS)) * 1000, |
| ObjectReader.getInt(conf.get(DaemonConfig.SUPERVISOR_WORKER_START_TIMEOUT_SECS)) * 1000, |
| ObjectReader.getInt(conf.get(Config.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS)) * 1000, |
| ObjectReader.getInt(conf.get(DaemonConfig.SUPERVISOR_MONITOR_FREQUENCY_SECS)) * 1000, |
| containerLauncher, |
| host, |
| port, |
| supervisor, |
| localState, |
| this, |
| metricsExec, metricsProcessor, slotMetrics); |
| |
| LocalAssignment currentAssignment = null; |
| Container container = null; |
| LocalAssignment newAssignment = null; |
| |
| Map<Integer, LocalAssignment> assignments = localState.getLocalAssignmentsMap(); |
| if (assignments != null) { |
| currentAssignment = assignments.get(port); |
| if (currentAssignment != null) { |
| try { |
| // For now we do not make a transaction when removing a topology assignment from local, an overdue |
| // assignment may be left on local disk. |
| // So we should check if the local disk assignment is valid when initializing: |
| // if topology files does not exist, the worker[possibly alive] will be reassigned if it is timed-out; |
| // if topology files exist but the topology id is invalid, just let Supervisor make a sync; |
| // if topology files exist and topology files is valid, recover the container. |
| if (ClientSupervisorUtils.doRequiredTopoFilesExist(conf, currentAssignment.get_topology_id())) { |
| container = containerLauncher.recoverContainer(port, currentAssignment, localState); |
| } else { |
| // Make the assignment null to let slot clean up the disk assignment. |
| currentAssignment = null; |
| } |
| } catch (ContainerRecoveryException e) { |
| //We could not recover container will be null. |
| currentAssignment = null; |
| } |
| |
| newAssignment = currentAssignment; |
| } |
| } |
| |
| setNewAssignment(newAssignment); |
| //if the current assignment is already running, new assignment will never be promoted to currAssignment, |
| // because Timer is not being compared in #equals or #equivalent, meaning newAssignment always equals to currAssignment. |
| // Therefore the timer in newAssignment won't be invoked |
| this.dynamicState = new DynamicState(currentAssignment, container, this.newAssignment.get(), slotMetrics); |
| if (MachineState.RUNNING == dynamicState.state) { |
| //We are running so we should recover the blobs. |
| staticState.localizer.recoverRunningTopology(currentAssignment, port, this); |
| saveNewAssignment(currentAssignment); |
| } |
| LOG.info("SLOT {}:{} Starting in state {} - assignment {}", staticState.host, staticState.port, dynamicState.state, |
| dynamicState.currentAssignment); |
| } |
| |
| //In some cases the new LocalAssignment may be equivalent to the old, but |
| // It is not equal. In those cases we want to update the current assignment to |
| // be the same as the new assignment |
| //PRECONDITION: The new and current assignments must be equivalent |
| private static DynamicState updateAssignmentIfNeeded(DynamicState dynamicState) { |
| assert equivalent(dynamicState.newAssignment, dynamicState.currentAssignment); |
| if (dynamicState.newAssignment != null |
| && !dynamicState.newAssignment.equals(dynamicState.currentAssignment)) { |
| dynamicState = |
| dynamicState.withCurrentAssignment(dynamicState.container, dynamicState.newAssignment); |
| } |
| return dynamicState; |
| } |
| |
| @VisibleForTesting |
| static boolean forSameTopology(LocalAssignment a, LocalAssignment b) { |
| if (a == null && b == null) { |
| return true; |
| } |
| if (a != null && b != null) { |
| if (a.get_topology_id().equals(b.get_topology_id())) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| /** |
| * This method compares WorkerResources while considering any resources are NULL to be 0.0 |
| * |
| * @param first WorkerResources A |
| * @param second WorkerResources B |
| * @return True if A and B are equivalent, treating the absent resources as 0.0 |
| */ |
| @VisibleForTesting |
| static boolean customWorkerResourcesEquality(WorkerResources first, WorkerResources second) { |
| if (first == null) { |
| return false; |
| } |
| if (first == second) { |
| return true; |
| } |
| if (first.equals(second)) { |
| return true; |
| } |
| |
| if (first.get_cpu() != second.get_cpu()) { |
| return false; |
| } |
| if (first.get_mem_on_heap() != second.get_mem_on_heap()) { |
| return false; |
| } |
| if (first.get_mem_off_heap() != second.get_mem_off_heap()) { |
| return false; |
| } |
| if (first.get_shared_mem_off_heap() != second.get_shared_mem_off_heap()) { |
| return false; |
| } |
| if (first.get_shared_mem_on_heap() != second.get_shared_mem_on_heap()) { |
| return false; |
| } |
| if (!customResourceMapEquality(first.get_resources(), second.get_resources())) { |
| return false; |
| } |
| if (!customResourceMapEquality(first.get_shared_resources(), second.get_shared_resources())) { |
| return false; |
| } |
| return true; |
| } |
| |
| /** |
| * This method compares Resource Maps while considering any resources are NULL to be 0.0 |
| * |
| * @param firstMap Resource Map A |
| * @param secondMap Resource Map B |
| * @return True if A and B are equivalent, treating the absent resources as 0.0 |
| */ |
| private static boolean customResourceMapEquality(Map<String, Double> firstMap, Map<String, Double> secondMap) { |
| if (firstMap == null && secondMap == null) { |
| return true; |
| } |
| if (firstMap == null) { |
| firstMap = new HashMap<>(); |
| } |
| if (secondMap == null) { |
| secondMap = new HashMap<>(); |
| } |
| |
| Set<String> keys = new HashSet<>(firstMap.keySet()); |
| keys.addAll(secondMap.keySet()); |
| for (String key : keys) { |
| if (firstMap.getOrDefault(key, 0.0).doubleValue() != secondMap.getOrDefault(key, 0.0).doubleValue()) { |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| /** |
| * Decide the equivalence of two local assignments, ignoring the order of executors This is different from #equal method. |
| * |
| * @param first Local assignment A |
| * @param second Local assignment B |
| * @return True if A and B are equivalent, ignoring the order of the executors |
| */ |
| @VisibleForTesting |
| static boolean equivalent(LocalAssignment first, LocalAssignment second) { |
| if (first == null && second == null) { |
| return true; |
| } |
| if (first != null && second != null) { |
| if (first.get_topology_id().equals(second.get_topology_id())) { |
| Set<ExecutorInfo> aexec = new HashSet<>(first.get_executors()); |
| Set<ExecutorInfo> bexec = new HashSet<>(second.get_executors()); |
| if (aexec.equals(bexec)) { |
| boolean firstHasResources = first.is_set_resources(); |
| boolean secondHasResources = second.is_set_resources(); |
| if (!firstHasResources && !secondHasResources) { |
| return true; |
| } |
| if (firstHasResources && secondHasResources) { |
| WorkerResources firstResources = first.get_resources(); |
| WorkerResources secondResources = second.get_resources(); |
| return customWorkerResourcesEquality(firstResources, secondResources); |
| } |
| } |
| } |
| } |
| return false; |
| } |
| |
| static DynamicState stateMachineStep(DynamicState dynamicState, StaticState staticState) throws Exception { |
| LOG.debug("STATE {}", dynamicState.state); |
| switch (dynamicState.state) { |
| case EMPTY: |
| return handleEmpty(dynamicState, staticState); |
| case RUNNING: |
| return handleRunning(dynamicState, staticState); |
| case WAITING_FOR_WORKER_START: |
| return handleWaitingForWorkerStart(dynamicState, staticState); |
| case KILL_BLOB_UPDATE: |
| return handleKillBlobUpdate(dynamicState, staticState); |
| case KILL_AND_RELAUNCH: |
| return handleKillAndRelaunch(dynamicState, staticState); |
| case KILL: |
| return handleKill(dynamicState, staticState); |
| case WAITING_FOR_BLOB_LOCALIZATION: |
| return handleWaitingForBlobLocalization(dynamicState, staticState); |
| case WAITING_FOR_BLOB_UPDATE: |
| return handleWaitingForBlobUpdate(dynamicState, staticState); |
| default: |
| throw new IllegalStateException("Code not ready to handle a state of " + dynamicState.state); |
| } |
| } |
| |
| /** |
| * Prepare for a new assignment by downloading new required blobs, or going to empty if there is nothing to download. |
| * PRECONDITION: The slot should be empty |
| * @param dynamicState current state |
| * @param staticState static data |
| * @return the next state |
| * @throws IOException on any error |
| */ |
| private static DynamicState prepareForNewAssignmentNoWorkersRunning(DynamicState dynamicState, |
| StaticState staticState) throws IOException { |
| assert (dynamicState.container == null); |
| assert dynamicState.currentAssignment == null; |
| |
| //We're either going to empty, or starting fresh blob download. Either way, the changing blob notifications are outdated. |
| dynamicState = drainAllChangingBlobs(dynamicState); |
| if (dynamicState.newAssignment == null) { |
| return dynamicState.withState(MachineState.EMPTY); |
| } |
| Future<Void> pendingDownload = staticState.localizer.requestDownloadTopologyBlobs(dynamicState.newAssignment, |
| staticState.port, staticState.changingCallback); |
| return dynamicState.withPendingLocalization(dynamicState.newAssignment, pendingDownload) |
| .withState(MachineState.WAITING_FOR_BLOB_LOCALIZATION); |
| } |
| |
| private static DynamicState killContainerFor(KillReason reason, DynamicState dynamicState, StaticState staticState) |
| throws Exception { |
| assert dynamicState.container != null; |
| |
| //Skip special case if `storm kill_workers` is already invoked |
| Boolean isDead = dynamicState.container.areAllProcessesDead(); |
| if (!isDead) { |
| if (reason == KillReason.ASSIGNMENT_CHANGED || reason == KillReason.BLOB_CHANGED) { |
| staticState.supervisor.killedWorker(staticState.port); |
| } |
| dynamicState.container.kill(); |
| } |
| staticState.slotMetrics.numWorkersKilledFor.get(reason).mark(); |
| |
| DynamicState next; |
| switch (reason) { |
| case ASSIGNMENT_CHANGED: |
| Future<Void> pendingDownload = null; |
| if (dynamicState.newAssignment != null) { |
| pendingDownload = staticState.localizer.requestDownloadTopologyBlobs( |
| dynamicState.newAssignment, staticState.port, staticState.changingCallback); |
| } |
| next = dynamicState.withState(MachineState.KILL) |
| .withPendingLocalization(dynamicState.newAssignment, pendingDownload); |
| break; |
| |
| case BLOB_CHANGED: |
| next = dynamicState.withState(MachineState.KILL_BLOB_UPDATE); |
| break; |
| |
| case PROCESS_EXIT: |
| case MEMORY_VIOLATION: |
| case HB_TIMEOUT: |
| case HB_NULL: |
| //any stop profile actions that hadn't timed out yet, we should restart after the worker is running again. |
| HashSet<TopoProfileAction> mod = new HashSet<>(dynamicState.profileActions); |
| mod.addAll(dynamicState.pendingStopProfileActions); |
| next = dynamicState.withState(MachineState.KILL_AND_RELAUNCH).withProfileActions(mod, Collections.emptySet()); |
| break; |
| |
| default: |
| throw new IllegalArgumentException("Unknown reason for killing a container"); |
| } |
| |
| if (!isDead) { |
| Time.sleep(staticState.killSleepMs); |
| } |
| return next; |
| } |
| |
| /** |
| * Clean up a container. |
| * PRECONDITION: All of the processes have died. |
| * @param dynamicState current state |
| * @param staticState static data |
| * @param nextState the next MachineState to go to. |
| * @return the next state. |
| */ |
| private static DynamicState cleanupCurrentContainer(DynamicState dynamicState, StaticState staticState, MachineState nextState) throws |
| Exception { |
| assert (dynamicState.container != null); |
| assert (dynamicState.currentAssignment != null); |
| assert (dynamicState.container.areAllProcessesDead()); |
| |
| dynamicState.container.cleanUp(); |
| staticState.localizer.releaseSlotFor(dynamicState.currentAssignment, staticState.port); |
| DynamicState ret = dynamicState.withCurrentAssignment(null, null); |
| if (nextState != null) { |
| ret = ret.withState(nextState); |
| } |
| return ret; |
| } |
| |
| /** |
| * Drop all of the changingBlobs and pendingChangingBlobs. |
| * |
| * <p>PRECONDITION: container is null |
| * |
| * @param dynamicState current state. |
| * @return the next state. |
| */ |
| private static DynamicState drainAllChangingBlobs(DynamicState dynamicState) { |
| assert dynamicState.container == null; |
| if (!dynamicState.changingBlobs.isEmpty()) { |
| for (BlobChanging rc : dynamicState.changingBlobs) { |
| rc.latch.countDown(); |
| } |
| dynamicState = dynamicState.withChangingBlobs(Collections.emptySet()); |
| } |
| |
| if (!dynamicState.pendingChangingBlobs.isEmpty()) { |
| dynamicState = dynamicState.withPendingChangingBlobs(Collections.emptySet(), null); |
| } |
| |
| return dynamicState; |
| } |
| |
| /** |
| * Informs the async localizer for all of blobs that the worker acknowledged the change of blobs. |
| * Worker has stop as of now. |
| * |
| * <p>PRECONDITION: container is null |
| * PRECONDITION: changingBlobs should only be for the given assignment. |
| * |
| * @param dynamicState the current state |
| * @return the futures for the current assignment. |
| */ |
| private static DynamicState informChangedBlobs(DynamicState dynamicState, LocalAssignment assignment) { |
| assert dynamicState.container == null; |
| assert dynamicState.changingBlobs.stream().allMatch((cr) -> forSameTopology(cr.assignment, assignment)); |
| |
| Set<Future<Void>> futures = new HashSet<>(dynamicState.changingBlobs.size()); |
| |
| // We need to add the new futures to the existing ones |
| if (forSameTopology(dynamicState.pendingChangingBlobsAssignment, assignment)) { |
| futures.addAll(dynamicState.pendingChangingBlobs); |
| } |
| |
| // Acknowledge all changing blobs as futures |
| for (BlobChanging rc : dynamicState.changingBlobs) { |
| futures.add(rc.latch.countDown()); |
| } |
| |
| LOG.debug("found changing blobs {} moving them to pending...", dynamicState.changingBlobs); |
| return dynamicState.withChangingBlobs(Collections.emptySet()) |
| .withPendingChangingBlobs(futures, assignment); |
| } |
| |
| /** |
| * Filter all of the changing blobs to just be for those compatible with the given assignment. |
| * All others will be released appropriately. |
| * |
| * @param dynamicState the current state |
| * @param assignment the assignment to look for |
| * @return the updated dynamicState |
| */ |
| private static DynamicState filterChangingBlobsFor(DynamicState dynamicState, final LocalAssignment assignment) { |
| if (dynamicState.changingBlobs.isEmpty()) { |
| return dynamicState; |
| } |
| |
| HashSet<BlobChanging> savedBlobs = new HashSet<>(dynamicState.changingBlobs.size()); |
| for (BlobChanging rc : dynamicState.changingBlobs) { |
| if (forSameTopology(assignment, rc.assignment)) { |
| savedBlobs.add(rc); |
| } else { |
| rc.latch.countDown(); |
| } |
| } |
| return dynamicState.withChangingBlobs(savedBlobs); |
| } |
| |
| /** |
| * State Transitions for WAITING_FOR_BLOB_LOCALIZATION state, when the slot is waiting for |
| * blobs of the pending assignment to be completely downloaded, before the container is launched/relaunched. |
| * PRECONDITION: neither pendingLocalization nor pendingDownload is null. |
| * PRECONDITION: The slot should be empty |
| * @param dynamicState current state |
| * @param staticState static data |
| * @return the next state |
| * @throws Exception on any error |
| */ |
| private static DynamicState handleWaitingForBlobLocalization(DynamicState dynamicState, StaticState staticState) throws Exception { |
| assert (dynamicState.pendingLocalization != null); |
| assert (dynamicState.pendingDownload != null); |
| assert (dynamicState.container == null); |
| assert dynamicState.currentAssignment == null; |
| |
| //Ignore changes to scheduling while downloading the topology blobs |
| // We don't support canceling the download through the future yet, |
| // because pending blobs may be shared by multiple workers and cancel it |
| // may lead to race condition |
| // To keep everything in sync, just wait for all workers |
| try { |
| //Release things that don't need to wait for us to finish downloading. |
| dynamicState = filterChangingBlobsFor(dynamicState, dynamicState.pendingLocalization); |
| if (!dynamicState.changingBlobs.isEmpty()) { |
| //Unblock downloading by accepting the futures. |
| dynamicState = informChangedBlobs(dynamicState, dynamicState.pendingLocalization); |
| } |
| |
| if (!equivalent(dynamicState.newAssignment, dynamicState.pendingLocalization)) { |
| //Scheduling changed |
| staticState.localizer.releaseSlotFor(dynamicState.pendingLocalization, staticState.port); |
| // Switch to the new assignment even if localization hasn't completed, or go to empty state |
| // if no new assignment. |
| return prepareForNewAssignmentNoWorkersRunning(dynamicState |
| .withPendingLocalization(null, null), |
| staticState); |
| } |
| |
| // Wait until time out |
| dynamicState.pendingDownload.get(1000, TimeUnit.MILLISECONDS); |
| //Downloading of all blobs finished. This is the precondition for all codes below. |
| if (!dynamicState.pendingChangingBlobs.isEmpty()) { |
| LOG.info("There are pending changes, waiting for them to finish before launching container..."); |
| //We cannot launch the container yet the resources may still be updating |
| return dynamicState.withState(MachineState.WAITING_FOR_BLOB_UPDATE) |
| .withPendingLocalization(null, null); |
| } |
| |
| staticState.slotMetrics.numWorkersLaunched.mark(); |
| Container c = |
| staticState.containerLauncher.launchContainer(staticState.port, dynamicState.pendingLocalization, staticState.localState); |
| return dynamicState |
| .withCurrentAssignment(c, dynamicState.pendingLocalization).withState(MachineState.WAITING_FOR_WORKER_START) |
| .withPendingLocalization(null, null); |
| } catch (TimeoutException e) { |
| //We waited for 1 second loop around and try again.... |
| return dynamicState; |
| } catch (ExecutionException e) { |
| if (e.getCause() instanceof AuthorizationException) { |
| LOG.error("{}", ((AuthorizationException) e.getCause()).get_msg()); |
| } else if (e.getCause() instanceof KeyNotFoundException) { |
| LOG.error("{}", ((KeyNotFoundException) e.getCause()).get_msg()); |
| } else { |
| LOG.error("{}", e.getCause().getMessage()); |
| } |
| // release the reference on all blobs associated with this topology. |
| staticState.localizer.releaseSlotFor(dynamicState.pendingLocalization, staticState.port); |
| // we wait for 3 seconds |
| Time.sleepSecs(3); |
| //Try again, or go to empty if assignment has been nulled |
| return prepareForNewAssignmentNoWorkersRunning(dynamicState |
| .withPendingLocalization(null, null), |
| staticState); |
| } |
| } |
| |
| /** |
| * State Transitions for WAITING_FOR_BLOB_UPDATE state. |
| * |
| * <p>PRECONDITION: container is null |
| * PRECONDITION: pendingChangingBlobs is not empty (otherwise why did we go to this state) |
| * PRECONDITION: pendingChangingBlobsAssignment is not null. |
| * |
| * @param dynamicState current state |
| * @param staticState static data |
| * @return the next state |
| * @throws Exception on any error |
| */ |
| private static DynamicState handleWaitingForBlobUpdate(DynamicState dynamicState, StaticState staticState) |
| throws Exception { |
| assert dynamicState.container == null; |
| assert dynamicState.pendingChangingBlobsAssignment != null; |
| assert !dynamicState.pendingChangingBlobs.isEmpty(); |
| assert dynamicState.pendingDownload == null; |
| assert dynamicState.pendingLocalization == null; |
| |
| if (!equivalent(dynamicState.newAssignment, dynamicState.currentAssignment)) { |
| //We were rescheduled while waiting for the resources to be updated, |
| // but the container is already not running. |
| LOG.info("SLOT {}: Assignment Changed from {} to {}", staticState.port, |
| dynamicState.currentAssignment, dynamicState.newAssignment); |
| if (dynamicState.currentAssignment != null) { |
| staticState.localizer.releaseSlotFor(dynamicState.currentAssignment, staticState.port); |
| } |
| staticState.localizer.releaseSlotFor(dynamicState.pendingChangingBlobsAssignment, staticState.port); |
| return prepareForNewAssignmentNoWorkersRunning(dynamicState.withCurrentAssignment(null, null), |
| staticState); |
| } |
| |
| dynamicState = filterChangingBlobsFor(dynamicState, dynamicState.pendingChangingBlobsAssignment); |
| if (!dynamicState.changingBlobs.isEmpty()) { |
| dynamicState = informChangedBlobs(dynamicState, dynamicState.pendingChangingBlobsAssignment); |
| } |
| |
| //We only have a set amount of time we can wait for before looping around again |
| long start = Time.nanoTime(); |
| try { |
| for (Future<Void> pending : dynamicState.pendingChangingBlobs) { |
| long now = Time.nanoTime(); |
| long timeLeft = ONE_SEC_IN_NANO - (now - start); |
| if (timeLeft <= 0) { |
| throw new TimeoutException(); |
| } |
| pending.get(timeLeft, TimeUnit.NANOSECONDS); |
| } |
| //All done we can launch the worker now |
| Container c = staticState.containerLauncher.launchContainer(staticState.port, dynamicState.pendingChangingBlobsAssignment, |
| staticState.localState); |
| return dynamicState |
| .withCurrentAssignment(c, dynamicState.pendingChangingBlobsAssignment).withState(MachineState.WAITING_FOR_WORKER_START) |
| .withPendingChangingBlobs(Collections.emptySet(), null); |
| } catch (TimeoutException ex) { |
| return dynamicState; |
| } |
| } |
| |
| /** |
| * State Transitions for KILL state. |
| * PRECONDITION: container.kill() was called |
| * PRECONDITION: container != null && currentAssignment != null |
| * @param dynamicState current state |
| * @param staticState static data |
| * @return the next state |
| * @throws Exception on any error |
| */ |
| private static DynamicState handleKill(DynamicState dynamicState, StaticState staticState) throws Exception { |
| assert (dynamicState.container != null); |
| assert (dynamicState.currentAssignment != null); |
| assert dynamicState.pendingChangingBlobs.isEmpty(); |
| assert dynamicState.pendingChangingBlobsAssignment == null; |
| |
| if (dynamicState.container.areAllProcessesDead()) { |
| LOG.info("SLOT {} all processes are dead...", staticState.port); |
| return cleanupCurrentContainer(dynamicState, |
| staticState, |
| dynamicState.pendingLocalization == null |
| ? MachineState.EMPTY |
| : MachineState.WAITING_FOR_BLOB_LOCALIZATION); |
| } |
| |
| LOG.warn("SLOT {} force kill and wait...", staticState.port); |
| dynamicState.container.forceKill(); |
| Time.sleep(staticState.killSleepMs); |
| return dynamicState; |
| } |
| |
| /** |
| * State Transitions for KILL_AND_RELAUNCH state. |
| * PRECONDITION: container.kill() was called |
| * PRECONDITION: container != null && currentAssignment != null |
| * @param dynamicState current state |
| * @param staticState static data |
| * @return the next state |
| * @throws Exception on any error |
| */ |
| private static DynamicState handleKillAndRelaunch(DynamicState dynamicState, StaticState staticState) throws Exception { |
| assert (dynamicState.container != null); |
| assert (dynamicState.currentAssignment != null); |
| assert dynamicState.pendingChangingBlobs.isEmpty(); |
| assert dynamicState.pendingChangingBlobsAssignment == null; |
| assert dynamicState.pendingLocalization == null; |
| assert dynamicState.pendingDownload == null; |
| |
| if (dynamicState.container.areAllProcessesDead()) { |
| if (equivalent(dynamicState.newAssignment, dynamicState.currentAssignment)) { |
| dynamicState.container.cleanUpForRestart(); |
| dynamicState.container.relaunch(); |
| return dynamicState.withState(MachineState.WAITING_FOR_WORKER_START); |
| } |
| //Scheduling changed after we killed all of the processes |
| return prepareForNewAssignmentNoWorkersRunning(cleanupCurrentContainer(dynamicState, staticState, null), staticState); |
| } |
| //The child processes typically exit in < 1 sec. If 2 mins later they are still around something is wrong |
| if ((Time.currentTimeMillis() - dynamicState.startTime) > 120_000) { |
| throw new RuntimeException("Not all processes in " + dynamicState.container + " exited after 120 seconds"); |
| } |
| dynamicState.container.forceKill(); |
| Time.sleep(staticState.killSleepMs); |
| return dynamicState; |
| } |
| |
| /** |
| * State Transitions for KILL_BLOB_UPDATE state. |
| * PRECONDITION: container.kill() was called |
| * PRECONDITION: container != null && currentAssignment != null |
| * |
| * @param dynamicState current state |
| * @param staticState static data |
| * @return the next state |
| * @throws Exception on any error |
| */ |
| private static DynamicState handleKillBlobUpdate(DynamicState dynamicState, StaticState staticState) throws Exception { |
| assert (dynamicState.container != null); |
| assert (dynamicState.currentAssignment != null); |
| assert dynamicState.pendingChangingBlobs.isEmpty(); |
| assert dynamicState.pendingChangingBlobsAssignment == null; |
| assert dynamicState.pendingDownload == null; |
| assert dynamicState.pendingLocalization == null; |
| |
| //Release things that don't need to wait for us |
| dynamicState = filterChangingBlobsFor(dynamicState, dynamicState.currentAssignment); |
| |
| if (dynamicState.container.areAllProcessesDead()) { |
| if (equivalent(dynamicState.newAssignment, dynamicState.currentAssignment)) { |
| dynamicState.container.cleanUp(); |
| dynamicState = dynamicState.withCurrentAssignment(null, dynamicState.currentAssignment); |
| return informChangedBlobs(dynamicState, dynamicState.currentAssignment) |
| .withState(MachineState.WAITING_FOR_BLOB_UPDATE); |
| } |
| //Scheduling changed after we killed all of the processes |
| return prepareForNewAssignmentNoWorkersRunning(cleanupCurrentContainer(dynamicState, staticState, null), staticState); |
| } |
| //The child processes typically exit in < 1 sec. If 2 mins later they are still around something is wrong |
| if ((Time.currentTimeMillis() - dynamicState.startTime) > 120_000) { |
| throw new RuntimeException("Not all processes in " + dynamicState.container + " exited after 120 seconds"); |
| } |
| dynamicState.container.forceKill(); |
| Time.sleep(staticState.killSleepMs); |
| return dynamicState; |
| } |
| |
| /** |
| * State Transitions for WAITING_FOR_WORKER_START state. |
| * PRECONDITION: container != null && currentAssignment != null |
| * @param dynamicState current state |
| * @param staticState static data |
| * @return the next state |
| * @throws Exception on any error |
| */ |
| private static DynamicState handleWaitingForWorkerStart(DynamicState dynamicState, StaticState staticState) throws Exception { |
| assert (dynamicState.container != null); |
| assert (dynamicState.currentAssignment != null); |
| assert dynamicState.pendingChangingBlobs.isEmpty(); |
| assert dynamicState.pendingChangingBlobsAssignment == null; |
| assert dynamicState.pendingDownload == null; |
| assert dynamicState.pendingLocalization == null; |
| |
| LSWorkerHeartbeat hb = dynamicState.container.readHeartbeat(); |
| if (hb != null) { |
| long hbAgeMs = (Time.currentTimeSecs() - hb.get_time_secs()) * 1000; |
| long hbTimeoutMs = getHbTimeoutMs(staticState, dynamicState); |
| if (hbAgeMs <= hbTimeoutMs) { |
| return dynamicState.withState(MachineState.RUNNING); |
| } |
| } |
| |
| if (!equivalent(dynamicState.newAssignment, dynamicState.currentAssignment)) { |
| //We were rescheduled while waiting for the worker to come up |
| LOG.info("SLOT {}: Assignment Changed from {} to {}", staticState.port, dynamicState.currentAssignment, |
| dynamicState.newAssignment); |
| return killContainerFor(KillReason.ASSIGNMENT_CHANGED, dynamicState, staticState); |
| } |
| dynamicState = updateAssignmentIfNeeded(dynamicState); |
| |
| long timeDiffms = (Time.currentTimeMillis() - dynamicState.startTime); |
| long hbFirstTimeoutMs = getFirstHbTimeoutMs(staticState, dynamicState); |
| if (timeDiffms > hbFirstTimeoutMs) { |
| staticState.slotMetrics.numWorkerStartTimedOut.mark(); |
| LOG.warn("SLOT {}: Container {} failed to launch in {} ms.", staticState.port, dynamicState.container, |
| hbFirstTimeoutMs); |
| return killContainerFor(KillReason.HB_TIMEOUT, dynamicState, staticState); |
| } |
| |
| dynamicState = filterChangingBlobsFor(dynamicState, dynamicState.currentAssignment); |
| if (!dynamicState.changingBlobs.isEmpty()) { |
| //Kill the container and restart it |
| return killContainerFor(KillReason.BLOB_CHANGED, dynamicState, staticState); |
| } |
| Time.sleep(1000); |
| return dynamicState; |
| } |
| |
| /** |
| * State Transitions for RUNNING state. |
| * PRECONDITION: container != null && currentAssignment != null |
| * @param dynamicState current state |
| * @param staticState static data |
| * @return the next state |
| * @throws Exception on any error |
| */ |
| private static DynamicState handleRunning(DynamicState dynamicState, StaticState staticState) throws Exception { |
| assert (dynamicState.container != null); |
| assert (dynamicState.currentAssignment != null); |
| assert dynamicState.pendingChangingBlobs.isEmpty(); |
| assert dynamicState.pendingChangingBlobsAssignment == null; |
| assert dynamicState.pendingDownload == null; |
| assert dynamicState.pendingLocalization == null; |
| |
| if (!equivalent(dynamicState.newAssignment, dynamicState.currentAssignment)) { |
| LOG.info("SLOT {}: Assignment Changed from {} to {}", staticState.port, dynamicState.currentAssignment, |
| dynamicState.newAssignment); |
| //Scheduling changed while running... |
| return killContainerFor(KillReason.ASSIGNMENT_CHANGED, dynamicState, staticState); |
| } |
| dynamicState = updateAssignmentIfNeeded(dynamicState); |
| |
| dynamicState = filterChangingBlobsFor(dynamicState, dynamicState.currentAssignment); |
| if (!dynamicState.changingBlobs.isEmpty()) { |
| //Kill the container and restart it |
| return killContainerFor(KillReason.BLOB_CHANGED, dynamicState, staticState); |
| } |
| |
| if (dynamicState.container.didMainProcessExit()) { |
| LOG.warn("SLOT {}: main process has exited", staticState.port); |
| return killContainerFor(KillReason.PROCESS_EXIT, dynamicState, staticState); |
| } |
| |
| if (dynamicState.container.isMemoryLimitViolated(dynamicState.currentAssignment)) { |
| LOG.warn("SLOT {}: violated memory limits", staticState.port); |
| return killContainerFor(KillReason.MEMORY_VIOLATION, dynamicState, staticState); |
| } |
| |
| LSWorkerHeartbeat hb = dynamicState.container.readHeartbeat(); |
| if (hb == null) { |
| LOG.warn("SLOT {}: HB returned as null", staticState.port); |
| //This can happen if the supervisor crashed after launching a |
| // worker that never came up. |
| return killContainerFor(KillReason.HB_NULL, dynamicState, staticState); |
| } |
| |
| long timeDiffMs = (Time.currentTimeSecs() - hb.get_time_secs()) * 1000; |
| long hbTimeoutMs = getHbTimeoutMs(staticState, dynamicState); |
| if (timeDiffMs > hbTimeoutMs) { |
| LOG.warn("SLOT {}: HB is too old {} > {}", staticState.port, timeDiffMs, hbTimeoutMs); |
| return killContainerFor(KillReason.HB_TIMEOUT, dynamicState, staticState); |
| } |
| |
| //The worker is up and running check for profiling requests |
| if (!dynamicState.profileActions.isEmpty()) { |
| HashSet<TopoProfileAction> mod = new HashSet<>(dynamicState.profileActions); |
| HashSet<TopoProfileAction> modPending = new HashSet<>(dynamicState.pendingStopProfileActions); |
| Iterator<TopoProfileAction> iter = mod.iterator(); |
| while (iter.hasNext()) { |
| TopoProfileAction action = iter.next(); |
| if (!action.topoId.equals(dynamicState.currentAssignment.get_topology_id())) { |
| iter.remove(); |
| LOG.warn("Dropping {} wrong topology is running", action); |
| //Not for this topology so skip it |
| } else { |
| if (modPending.contains(action)) { |
| boolean isTimeForStop = Time.currentTimeMillis() > action.request.get_time_stamp(); |
| if (isTimeForStop) { |
| if (dynamicState.container.runProfiling(action.request, true)) { |
| LOG.debug("Stopped {} action finished", action); |
| iter.remove(); |
| modPending.remove(action); |
| } else { |
| LOG.warn("Stopping {} failed, will be retried", action); |
| } |
| } else { |
| LOG.debug("Still pending {} now: {}", action, Time.currentTimeMillis()); |
| } |
| } else { |
| //J_PROFILE_START is not used. When you see a J_PROFILE_STOP |
| // start profiling and save it away to stop when timeout happens |
| if (action.request.get_action() == ProfileAction.JPROFILE_STOP) { |
| if (dynamicState.container.runProfiling(action.request, false)) { |
| modPending.add(action); |
| LOG.debug("Started {} now: {}", action, Time.currentTimeMillis()); |
| } else { |
| LOG.warn("Starting {} failed, will be retried", action); |
| } |
| } else { |
| if (dynamicState.container.runProfiling(action.request, false)) { |
| LOG.debug("Started {} action finished", action); |
| iter.remove(); |
| } else { |
| LOG.warn("Starting {} failed, will be retried", action); |
| } |
| } |
| } |
| } |
| } |
| dynamicState = dynamicState.withProfileActions(mod, modPending); |
| } |
| |
| dynamicState.container.processMetrics(staticState.metricsExec, staticState.metricsProcessor); |
| |
| Time.sleep(staticState.monitorFreqMs); |
| return dynamicState; |
| } |
| |
| static DynamicState handleEmpty(DynamicState dynamicState, StaticState staticState) throws InterruptedException, IOException { |
| assert dynamicState.currentAssignment == null; |
| assert dynamicState.container == null; |
| assert dynamicState.pendingChangingBlobs.isEmpty(); |
| assert dynamicState.pendingChangingBlobsAssignment == null; |
| assert dynamicState.pendingDownload == null; |
| assert dynamicState.pendingLocalization == null; |
| if (!equivalent(dynamicState.newAssignment, dynamicState.currentAssignment)) { |
| return prepareForNewAssignmentNoWorkersRunning(dynamicState, staticState); |
| } |
| dynamicState = updateAssignmentIfNeeded(dynamicState); |
| |
| //Both assignments are null, just wait |
| if (dynamicState.profileActions != null && !dynamicState.profileActions.isEmpty()) { |
| //Nothing is scheduled here so throw away all of the profileActions |
| LOG.warn("Dropping {} no topology is running", dynamicState.profileActions); |
| dynamicState = dynamicState.withProfileActions(Collections.emptySet(), Collections.emptySet()); |
| } |
| //Drop the change notifications we are not running anything right now |
| dynamicState = drainAllChangingBlobs(dynamicState); |
| Time.sleep(1000); |
| return dynamicState; |
| } |
| |
| MachineState getMachineState() { |
| return dynamicState.state; |
| } |
| |
| /* |
| * Get worker heartbeat timeout time in ms. Use topology specified timeout if provided. |
| */ |
| private static long getHbTimeoutMs(StaticState staticState, DynamicState dynamicState) { |
| long hbTimeoutMs = staticState.hbTimeoutMs; |
| Map<String, Object> topoConf = dynamicState.container.topoConf; |
| |
| if (topoConf != null && topoConf.containsKey(Config.TOPOLOGY_WORKER_TIMEOUT_SECS)) { |
| long topoHbTimeoutMs = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_WORKER_TIMEOUT_SECS)) * 1000; |
| topoHbTimeoutMs = Math.max(topoHbTimeoutMs, hbTimeoutMs); |
| hbTimeoutMs = topoHbTimeoutMs; |
| } |
| |
| return hbTimeoutMs; |
| } |
| |
| /* |
| * Get worker heartbeat timeout when waiting for worker to start. |
| * If topology specific timeout if set, ensure first heartbeat timeout >= topology specific timeout. |
| */ |
| private static long getFirstHbTimeoutMs(StaticState staticState, DynamicState dynamicState) { |
| return Math.max(getHbTimeoutMs(staticState, dynamicState), staticState.firstHbTimeoutMs); |
| } |
| |
| /** |
| * Set a new assignment asynchronously. |
| * @param newAssignment the new assignment for this slot to run, null to run nothing |
| */ |
| public final void setNewAssignment(LocalAssignment newAssignment) { |
| this.newAssignment.set(newAssignment == null |
| ? null |
| : new TimerDecoratedAssignment(newAssignment, staticState.slotMetrics.workerLaunchDuration)); |
| } |
| |
| @Override |
| public void blobChanging(LocalAssignment assignment, int port, LocallyCachedBlob blob, GoodToGo go) { |
| assert port == staticState.port : "got a callback that is not for us " + port + " != " + staticState.port; |
| //This is called async so lets assume that it is something we care about |
| try { |
| changingBlobs.put(new BlobChanging(assignment, blob, go.getLatch())); |
| } catch (InterruptedException e) { |
| throw new RuntimeException("This should not have happened, but it did (the queue is unbounded)", e); |
| } |
| } |
| |
| public void addProfilerActions(Set<TopoProfileAction> actions) { |
| if (actions != null) { |
| while (true) { |
| Set<TopoProfileAction> orig = profiling.get(); |
| Set<TopoProfileAction> newActions = new HashSet<>(orig); |
| newActions.addAll(actions); |
| if (profiling.compareAndSet(orig, newActions)) { |
| return; |
| } |
| } |
| } |
| } |
| |
| /** |
| * get the workerID (nullable) from CURRENT container, if existed, or return null. |
| * @return workerID |
| */ |
| public String getWorkerId() { |
| String workerId = null; |
| Container c = dynamicState.container; |
| if (c != null) { |
| workerId = c.getWorkerId(); |
| } |
| return workerId; |
| } |
| |
| private void saveNewAssignment(LocalAssignment assignment) { |
| synchronized (staticState.localState) { |
| Map<Integer, LocalAssignment> assignments = staticState.localState.getLocalAssignmentsMap(); |
| if (assignments == null) { |
| assignments = new HashMap<>(); |
| } |
| if (assignment == null) { |
| assignments.remove(staticState.port); |
| } else { |
| assignments.put(staticState.port, assignment); |
| } |
| staticState.localState.setLocalAssignmentsMap(assignments); |
| } |
| Map<Long, LocalAssignment> update = null; |
| Map<Long, LocalAssignment> orig = null; |
| do { |
| Long lport = new Long(staticState.port); |
| orig = cachedCurrentAssignments.get(); |
| update = new HashMap<>(orig); |
| if (assignment == null) { |
| update.remove(lport); |
| } else { |
| update.put(lport, assignment); |
| } |
| } while (!cachedCurrentAssignments.compareAndSet(orig, update)); |
| } |
| |
| @Override |
| public void run() { |
| try { |
| while (!done) { |
| Set<TopoProfileAction> origProfileActions = new HashSet<>(profiling.get()); |
| |
| Set<BlobChanging> changingResourcesToHandle = dynamicState.changingBlobs; |
| if (!changingBlobs.isEmpty()) { |
| changingResourcesToHandle = new HashSet<>(changingResourcesToHandle); |
| changingBlobs.drainTo(changingResourcesToHandle); |
| Iterator<BlobChanging> it = changingResourcesToHandle.iterator(); |
| |
| //Remove/Clean up changed requests that are not for us |
| while (it.hasNext()) { |
| BlobChanging rc = it.next(); |
| if (!forSameTopology(rc.assignment, dynamicState.currentAssignment) |
| && !forSameTopology(rc.assignment, dynamicState.newAssignment)) { |
| rc.latch.countDown(); //Ignore the future |
| it.remove(); |
| } |
| } |
| } |
| |
| DynamicState nextState = |
| stateMachineStep(dynamicState.withNewAssignment(newAssignment.get()) |
| .withProfileActions(origProfileActions, dynamicState.pendingStopProfileActions) |
| .withChangingBlobs(changingResourcesToHandle), staticState); |
| |
| if (LOG.isDebugEnabled() || dynamicState.state != nextState.state) { |
| LOG.info("STATE {} -> {}", dynamicState, nextState); |
| } |
| //Save the current state for recovery |
| if ((nextState.currentAssignment != null |
| && !nextState.currentAssignment.equals(dynamicState.currentAssignment)) |
| || (dynamicState.currentAssignment != null |
| && !dynamicState.currentAssignment.equals(nextState.currentAssignment))) { |
| LOG.info("SLOT {}: Changing current assignment from {} to {}", staticState.port, dynamicState.currentAssignment, |
| nextState.currentAssignment); |
| saveNewAssignment(nextState.currentAssignment); |
| } |
| |
| if (equivalent(nextState.newAssignment, nextState.currentAssignment) |
| && nextState.currentAssignment != null |
| && nextState.currentAssignment.get_owner() == null |
| && nextState.newAssignment != null |
| && nextState.newAssignment.get_owner() != null) { |
| //This is an odd case for a rolling upgrade where the user on the old assignment may be null, |
| // but not on the new one. Although in all other ways they are the same. |
| // If this happens we want to use the assignment with the owner. |
| LOG.info("Updating assignment to save owner {}", nextState.newAssignment.get_owner()); |
| saveNewAssignment(nextState.newAssignment); |
| nextState = nextState.withCurrentAssignment(nextState.container, nextState.newAssignment); |
| } |
| |
| // clean up the profiler actions that are not being processed |
| Set<TopoProfileAction> removed = new HashSet<>(origProfileActions); |
| removed.removeAll(dynamicState.profileActions); |
| removed.removeAll(dynamicState.pendingStopProfileActions); |
| for (TopoProfileAction action : removed) { |
| try { |
| clusterState.deleteTopologyProfileRequests(action.topoId, action.request); |
| } catch (Exception e) { |
| LOG.error("Error trying to remove profiling request, it will be retried", e); |
| } |
| } |
| Set<TopoProfileAction> orig; |
| Set<TopoProfileAction> copy; |
| do { |
| orig = profiling.get(); |
| copy = new HashSet<>(orig); |
| copy.removeAll(removed); |
| } while (!profiling.compareAndSet(orig, copy)); |
| dynamicState = nextState; |
| } |
| } catch (Throwable e) { |
| if (!Utils.exceptionCauseIsInstanceOf(InterruptedException.class, e)) { |
| LOG.error("Error when processing event", e); |
| Utils.exitProcess(20, "Error when processing an event"); |
| } |
| } |
| } |
| |
| @Override |
| public void close() throws Exception { |
| done = true; |
| this.interrupt(); |
| this.join(); |
| } |
| |
| enum MachineState { |
| /** |
| * Slot is empty, and no worker is running. |
| */ |
| EMPTY, |
| /** |
| * Slot is running a worker. |
| */ |
| RUNNING, |
| /** |
| * Slot has launched a worker, and is waiting for it to heartbeat. |
| */ |
| WAITING_FOR_WORKER_START, |
| /** |
| * Slot has just killed its worker, and is now waiting for it to die so it can be relaunched in the same container. |
| */ |
| KILL_AND_RELAUNCH, |
| /** |
| * Slot has just killed its worker, and is now waiting for it to die so the container can be deleted. |
| */ |
| KILL, |
| /** |
| * Slot has just killed its worker, and is now waiting for it to die so the localizer can update a blob. |
| */ |
| KILL_BLOB_UPDATE, |
| /** |
| * The slot is empty, and is waiting for blobs to download before the worker can be launched. |
| */ |
| WAITING_FOR_BLOB_LOCALIZATION, |
| /** |
| * The slot is empty, and is waiting for blobs to be updated before the worker can be (re)launched. |
| */ |
| WAITING_FOR_BLOB_UPDATE; |
| |
| @Override |
| public String toString() { |
| return EnumUtil.toMetricName(this); |
| } |
| } |
| |
| static class StaticState { |
| public final AsyncLocalizer localizer; |
| public final long hbTimeoutMs; |
| public final long firstHbTimeoutMs; |
| public final long killSleepMs; |
| public final long monitorFreqMs; |
| public final ContainerLauncher containerLauncher; |
| public final int port; |
| public final String host; |
| public final ISupervisor supervisor; |
| public final LocalState localState; |
| public final BlobChangingCallback changingCallback; |
| public final OnlyLatestExecutor<Integer> metricsExec; |
| public final WorkerMetricsProcessor metricsProcessor; |
| public final SlotMetrics slotMetrics; |
| |
| StaticState(AsyncLocalizer localizer, long hbTimeoutMs, long firstHbTimeoutMs, |
| long killSleepMs, long monitorFreqMs, |
| ContainerLauncher containerLauncher, String host, int port, |
| ISupervisor supervisor, LocalState localState, |
| BlobChangingCallback changingCallback, |
| OnlyLatestExecutor<Integer> metricsExec, |
| WorkerMetricsProcessor metricsProcessor, |
| SlotMetrics slotMetrics) { |
| this.localizer = localizer; |
| this.hbTimeoutMs = hbTimeoutMs; |
| this.firstHbTimeoutMs = firstHbTimeoutMs; |
| this.containerLauncher = containerLauncher; |
| this.killSleepMs = killSleepMs; |
| this.monitorFreqMs = monitorFreqMs; |
| this.host = host; |
| this.port = port; |
| this.supervisor = supervisor; |
| this.localState = localState; |
| this.changingCallback = changingCallback; |
| this.metricsExec = metricsExec; |
| this.metricsProcessor = metricsProcessor; |
| this.slotMetrics = slotMetrics; |
| } |
| } |
| |
| static class DynamicState { |
| public final MachineState state; |
| |
| /** |
| * Latest assignment assigned to this worker, updated asynchronously. |
| */ |
| public final LocalAssignment newAssignment; |
| |
| /** |
| * Assignment the worker is running on, when the state machine enters this state. |
| */ |
| public final LocalAssignment currentAssignment; |
| |
| /** |
| * Assignment assigned to the worker and waiting for blob download. |
| * This is the same as newAssignment if no newer assignment is arrived before |
| * localization completes. |
| */ |
| public final LocalAssignment pendingLocalization; |
| |
| public final Container container; |
| |
| /** |
| * Signals that blobs have been downloaded for the first time. |
| */ |
| public final Future<Void> pendingDownload; |
| public final Set<TopoProfileAction> profileActions; |
| public final Set<TopoProfileAction> pendingStopProfileActions; |
| |
| /** |
| * Blobs that are changed and need to be synced. The localizer notifies the Slot about changing blobs on every state step. |
| * Blob updates are blocked until the Slot unblocks them, at which point they go in {@link #pendingChangingBlobs}. |
| * Updates are blocked until the Slot worker is dead, since blobs may otherwise be actively used. |
| */ |
| public final Set<BlobChanging> changingBlobs; |
| |
| /** |
| * The assignment {@link #pendingChangingBlobs} belongs to. |
| */ |
| public final LocalAssignment pendingChangingBlobsAssignment; |
| |
| public final Set<Future<Void>> pendingChangingBlobs; |
| |
| /** |
| * The entering time when the machine transitions to current state. |
| */ |
| public final long startTime; |
| private final SlotMetrics slotMetrics; |
| |
| DynamicState(final LocalAssignment currentAssignment, Container container, final LocalAssignment newAssignment, |
| SlotMetrics slotMetrics) { |
| this.currentAssignment = currentAssignment; |
| this.container = container; |
| if ((currentAssignment == null) ^ (container == null)) { |
| throw new IllegalArgumentException("Container and current assignment must both be null, or neither can be null"); |
| } |
| |
| if (currentAssignment == null) { |
| state = MachineState.EMPTY; |
| } else { |
| state = MachineState.RUNNING; |
| } |
| slotMetrics.transitionIntoState.get(state).mark(); |
| |
| this.startTime = Time.currentTimeMillis(); |
| this.newAssignment = newAssignment; |
| this.pendingLocalization = null; |
| this.pendingDownload = null; |
| this.profileActions = Collections.emptySet(); |
| this.pendingStopProfileActions = Collections.emptySet(); |
| this.changingBlobs = Collections.emptySet(); |
| this.pendingChangingBlobsAssignment = null; |
| this.pendingChangingBlobs = Collections.emptySet(); |
| this.slotMetrics = slotMetrics; |
| } |
| |
| DynamicState(final MachineState state, final LocalAssignment newAssignment, |
| final Container container, final LocalAssignment currentAssignment, |
| final LocalAssignment pendingLocalization, final long startTime, |
| final Future<Void> pendingDownload, final Set<TopoProfileAction> profileActions, |
| final Set<TopoProfileAction> pendingStopProfileActions, |
| final Set<BlobChanging> changingBlobs, |
| final Set<Future<Void>> pendingChangingBlobs, final LocalAssignment pendingChaningBlobsAssignment, |
| final SlotMetrics slotMetrics) { |
| assert pendingChangingBlobs != null; |
| assert pendingChangingBlobs.isEmpty() == (pendingChaningBlobsAssignment == null); |
| this.state = state; |
| this.newAssignment = newAssignment; |
| this.currentAssignment = currentAssignment; |
| this.container = container; |
| this.pendingLocalization = pendingLocalization; |
| this.startTime = startTime; |
| this.pendingDownload = pendingDownload; |
| this.profileActions = profileActions; |
| this.pendingStopProfileActions = pendingStopProfileActions; |
| this.changingBlobs = changingBlobs; |
| this.pendingChangingBlobs = pendingChangingBlobs; |
| this.pendingChangingBlobsAssignment = pendingChaningBlobsAssignment; |
| this.slotMetrics = slotMetrics; |
| } |
| |
| public String toString() { |
| StringBuffer sb = new StringBuffer(); |
| sb.append(state); |
| sb.append(" msInState: "); |
| sb.append(Time.currentTimeMillis() - startTime); |
| if (container != null) { |
| sb.append(" "); |
| sb.append(container); |
| } |
| return sb.toString(); |
| } |
| |
| /** |
| * Set the new assignment for the state. This should never be called from within the state machine. |
| * It is an input from outside. |
| * @param newAssignment the new assignment to set |
| * @return the updated DynamicState. |
| */ |
| public DynamicState withNewAssignment(LocalAssignment newAssignment) { |
| return new DynamicState(this.state, newAssignment, |
| this.container, this.currentAssignment, |
| this.pendingLocalization, this.startTime, |
| this.pendingDownload, this.profileActions, |
| this.pendingStopProfileActions, this.changingBlobs, |
| this.pendingChangingBlobs, this.pendingChangingBlobsAssignment, this.slotMetrics); |
| } |
| |
| public DynamicState withPendingLocalization(LocalAssignment pendingLocalization, Future<Void> pendingDownload) { |
| return new DynamicState(this.state, this.newAssignment, |
| this.container, this.currentAssignment, |
| pendingLocalization, this.startTime, |
| pendingDownload, this.profileActions, |
| this.pendingStopProfileActions, this.changingBlobs, |
| this.pendingChangingBlobs, this.pendingChangingBlobsAssignment, this.slotMetrics); |
| } |
| |
| public DynamicState withPendingLocalization(Future<Void> pendingDownload) { |
| return withPendingLocalization(this.pendingLocalization, pendingDownload); |
| } |
| |
| /** |
| * Transition to the given state. Notice that it's possible to transition to |
| * the same state. |
| * @param state The state to transition into |
| * @return New dynamicState |
| */ |
| public DynamicState withState(final MachineState state) { |
| long newStartTime = Time.currentTimeMillis(); |
| //We may (though unlikely) lose metering here if state transition is too frequent (less than a millisecond) |
| slotMetrics.timeSpentInState.get(this.state).update(newStartTime - startTime, TimeUnit.MILLISECONDS); |
| slotMetrics.transitionIntoState.get(state).mark(); |
| |
| LocalAssignment assignment = this.currentAssignment; |
| if (MachineState.RUNNING != this.state && MachineState.RUNNING == state |
| && this.currentAssignment instanceof TimerDecoratedAssignment) { |
| ((TimerDecoratedAssignment) assignment).stopTiming(); |
| //Timer is discarded after the initial launch of an assignment |
| assignment = new LocalAssignment(this.currentAssignment); |
| } |
| |
| return new DynamicState(state, this.newAssignment, |
| this.container, assignment, |
| this.pendingLocalization, newStartTime, |
| this.pendingDownload, this.profileActions, |
| this.pendingStopProfileActions, this.changingBlobs, |
| this.pendingChangingBlobs, this.pendingChangingBlobsAssignment, this.slotMetrics); |
| } |
| |
| public DynamicState withCurrentAssignment(final Container container, final LocalAssignment currentAssignment) { |
| return new DynamicState(this.state, this.newAssignment, |
| container, currentAssignment, |
| this.pendingLocalization, this.startTime, |
| this.pendingDownload, this.profileActions, |
| this.pendingStopProfileActions, this.changingBlobs, |
| this.pendingChangingBlobs, this.pendingChangingBlobsAssignment, this.slotMetrics); |
| } |
| |
| public DynamicState withProfileActions(Set<TopoProfileAction> profileActions, Set<TopoProfileAction> pendingStopProfileActions) { |
| return new DynamicState(this.state, this.newAssignment, |
| this.container, this.currentAssignment, |
| this.pendingLocalization, this.startTime, |
| this.pendingDownload, profileActions, |
| pendingStopProfileActions, this.changingBlobs, |
| this.pendingChangingBlobs, this.pendingChangingBlobsAssignment, this.slotMetrics); |
| } |
| |
| /** |
| * Set the blocked changing blobs. This is an input from the outside, and should never be called by the state machine steps. |
| */ |
| public DynamicState withChangingBlobs(Set<BlobChanging> changingBlobs) { |
| if (changingBlobs == this.changingBlobs) { |
| return this; |
| } |
| return new DynamicState(this.state, this.newAssignment, |
| this.container, this.currentAssignment, |
| this.pendingLocalization, this.startTime, |
| this.pendingDownload, profileActions, |
| this.pendingStopProfileActions, changingBlobs, |
| this.pendingChangingBlobs, this.pendingChangingBlobsAssignment, this.slotMetrics); |
| } |
| |
| public DynamicState withPendingChangingBlobs(Set<Future<Void>> pendingChangingBlobs, |
| LocalAssignment pendingChangingBlobsAssignment) { |
| return new DynamicState(this.state, this.newAssignment, |
| this.container, this.currentAssignment, |
| this.pendingLocalization, this.startTime, |
| this.pendingDownload, profileActions, |
| this.pendingStopProfileActions, this.changingBlobs, |
| pendingChangingBlobs, |
| pendingChangingBlobsAssignment, this.slotMetrics); |
| } |
| } |
| |
| static class TopoProfileAction { |
| public final String topoId; |
| public final ProfileRequest request; |
| |
| TopoProfileAction(String topoId, ProfileRequest request) { |
| this.topoId = topoId; |
| this.request = request; |
| } |
| |
| @Override |
| public int hashCode() { |
| return (37 * topoId.hashCode()) + request.hashCode(); |
| } |
| |
| @Override |
| public boolean equals(Object other) { |
| if (!(other instanceof TopoProfileAction)) { |
| return false; |
| } |
| TopoProfileAction o = (TopoProfileAction) other; |
| return topoId.equals(o.topoId) && request.equals(o.request); |
| } |
| |
| @Override |
| public String toString() { |
| return "{ " + topoId + ": " + request + " }"; |
| } |
| } |
| |
| /** |
| * Holds the information about a blob that is changing. |
| */ |
| static class BlobChanging { |
| private final LocalAssignment assignment; |
| private final LocallyCachedBlob blob; |
| private final GoodToGo.GoodToGoLatch latch; |
| |
| BlobChanging(LocalAssignment assignment, LocallyCachedBlob blob, GoodToGo.GoodToGoLatch latch) { |
| this.assignment = assignment; |
| this.blob = blob; |
| this.latch = latch; |
| } |
| |
| @Override |
| public String toString() { |
| return "BLOB CHANGING " + blob + " " + assignment; |
| } |
| } |
| } |