blob: de29ace0b6b15773d7faf4b16d14cebbf44b50a2 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
 * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
 * and limitations under the License.
 */
package org.apache.storm.daemon.supervisor;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.storm.Config;
import org.apache.storm.DaemonConfig;
import org.apache.storm.cluster.IStormClusterState;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.ExecutorInfo;
import org.apache.storm.generated.KeyNotFoundException;
import org.apache.storm.generated.LSWorkerHeartbeat;
import org.apache.storm.generated.LocalAssignment;
import org.apache.storm.generated.ProfileAction;
import org.apache.storm.generated.ProfileRequest;
import org.apache.storm.generated.WorkerResources;
import org.apache.storm.localizer.AsyncLocalizer;
import org.apache.storm.localizer.BlobChangingCallback;
import org.apache.storm.localizer.GoodToGo;
import org.apache.storm.localizer.LocallyCachedBlob;
import org.apache.storm.metricstore.WorkerMetricsProcessor;
import org.apache.storm.scheduler.ISupervisor;
import org.apache.storm.utils.EnumUtil;
import org.apache.storm.utils.LocalState;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Slot extends Thread implements AutoCloseable, BlobChangingCallback {
// Class-wide SLF4J logger.
private static final Logger LOG = LoggerFactory.getLogger(Slot.class);
// Reasons a container may be killed. toString() delegates to EnumUtil.toMetricName(this).
// NOTE(review): the enum constants and the closing braces are missing from this copy of the file.
// ASSIGNMENT_CHANGED, BLOB_CHANGED, HB_NULL, HB_TIMEOUT, PROCESS_EXIT and MEMORY_VIOLATION are the
// constants referenced elsewhere in this file - confirm the full list against the original source.
enum KillReason {
public String toString() {
return EnumUtil.toMetricName(this);
// One second expressed in nanoseconds; used as the total wait budget in handleWaitingForBlobUpdate.
private static final long ONE_SEC_IN_NANO = TimeUnit.NANOSECONDS.convert(1, TimeUnit.SECONDS);
// Holder for the most recently requested assignment; starts out empty (get() returns null).
private final AtomicReference<LocalAssignment> newAssignment = new AtomicReference<>();
// Holder for the set of profile actions; starts out as an empty set.
private final AtomicReference<Set<TopoProfileAction>> profiling = new AtomicReference<>(new HashSet<>());
// Queue of blob-change notifications. LinkedBlockingQueue without a capacity argument is effectively unbounded.
private final BlockingQueue<BlobChanging> changingBlobs = new LinkedBlockingQueue<>();
// Immutable per-slot configuration and collaborators, built once in the constructor.
private final StaticState staticState;
// Cluster state handle supplied to the constructor.
private final IStormClusterState clusterState;
// Shared assignment cache supplied to the constructor (keys are Long; presumably ports - TODO confirm).
private final AtomicReference<Map<Long, LocalAssignment>> cachedCurrentAssignments;
// Executor supplied to the constructor; also handed to StaticState.
private final OnlyLatestExecutor<Integer> metricsExec;
// Volatile flag, initially false. Presumably signals the slot thread to stop - its readers and writers are outside this view.
private volatile boolean done = false;
// Current state-machine snapshot; volatile because it is read via getMachineState().
private volatile DynamicState dynamicState;
/**
 * Creates the slot thread (named "SLOT_" + port), captures its static configuration and tries to recover a
 * container for any assignment that localState still records for this port.
 *
 * @param localizer passed to StaticState; also used to recover blobs when the slot starts in RUNNING
 * @param conf configuration map the four timeout/frequency settings are read from
 * @param containerLauncher used to recover a container for an assignment found in localState
 * @param host not used by any statement visible in this copy of the constructor
 * @param port the port this slot manages; also the key into the local assignments map
 * @param localState source of the previously stored local assignments
 * @param clusterState stored in a field
 * @param supervisor not used by any statement visible in this copy of the constructor
 * @param cachedCurrentAssignments stored in a field
 * @param metricsExec stored in a field and passed to StaticState
 * @param metricsProcessor passed to StaticState
 * @param slotMetrics passed to StaticState and DynamicState
 * @throws Exception declared by the constructor; the visible code does not show which call raises it
 */
public Slot(AsyncLocalizer localizer, Map<String, Object> conf,
ContainerLauncher containerLauncher, String host,
int port, LocalState localState,
IStormClusterState clusterState,
ISupervisor supervisor,
AtomicReference<Map<Long, LocalAssignment>> cachedCurrentAssignments,
OnlyLatestExecutor<Integer> metricsExec,
WorkerMetricsProcessor metricsProcessor,
SlotMetrics slotMetrics) throws Exception {
super("SLOT_" + port);
this.metricsExec = metricsExec;
this.cachedCurrentAssignments = cachedCurrentAssignments;
this.clusterState = clusterState;
// The four settings below are configured in seconds; "* 1000" converts each of them to milliseconds.
// NOTE(review): the StaticState argument list looks shortened in this copy (host, port, localState and
// containerLauncher are read from staticState elsewhere in the file) - confirm against the original source.
this.staticState = new StaticState(localizer,
ObjectReader.getInt(conf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS)) * 1000,
ObjectReader.getInt(conf.get(DaemonConfig.SUPERVISOR_WORKER_START_TIMEOUT_SECS)) * 1000,
ObjectReader.getInt(conf.get(Config.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS)) * 1000,
ObjectReader.getInt(conf.get(DaemonConfig.SUPERVISOR_MONITOR_FREQUENCY_SECS)) * 1000,
metricsExec, metricsProcessor, slotMetrics);
LocalAssignment currentAssignment = null;
Container container = null;
// NOTE(review): this local shadows the newAssignment field declared on the class.
LocalAssignment newAssignment = null;
Map<Integer, LocalAssignment> assignments = localState.getLocalAssignmentsMap();
if (assignments != null) {
currentAssignment = assignments.get(port);
if (currentAssignment != null) {
try {
// For now we do not make a transaction when removing a topology assignment from local, an overdue
// assignment may be left on local disk.
// So we should check if the local disk assignment is valid when initializing:
// if topology files does not exist, the worker[possibly alive] will be reassigned if it is timed-out;
// if topology files exist but the topology id is invalid, just let Supervisor make a sync;
// if topology files exist and topology files is valid, recover the container.
if (ClientSupervisorUtils.doRequiredTopoFilesExist(conf, currentAssignment.get_topology_id())) {
container = containerLauncher.recoverContainer(port, currentAssignment, localState);
} else {
// Make the assignment null to let slot clean up the disk assignment.
currentAssignment = null;
} catch (ContainerRecoveryException e) {
//We could not recover container will be null.
currentAssignment = null;
// NOTE(review): this assigns the local variable, while the DynamicState below is built from the field
// (this.newAssignment.get()). No visible statement stores into the field - confirm whether a set() call
// is missing from this copy.
newAssignment = currentAssignment;
//if the current assignment is already running, new assignment will never be promoted to currAssignment,
// because Timer is not being compared in #equals or #equivalent, meaning newAssignment always equals to currAssignment.
// Therefore the timer in newAssignment won't be invoked
this.dynamicState = new DynamicState(currentAssignment, container, this.newAssignment.get(), slotMetrics);
if (MachineState.RUNNING == dynamicState.state) {
//We are running so we should recover the blobs.
staticState.localizer.recoverRunningTopology(currentAssignment, port, this);
// NOTE(review): the line below is a garbled start-up log statement (its call prefix, one argument and its tail are missing).
}"SLOT {}:{} Starting in state {} - assignment {}",, staticState.port, dynamicState.state,
//In some cases the new LocalAssignment may be equivalent to the old, but
// It is not equal. In those cases we want to update the current assignment to
// be the same as the new assignment
//PRECONDITION: The new and current assignments must be equivalent
private static DynamicState updateAssignmentIfNeeded(DynamicState dynamicState) {
assert equivalent(dynamicState.newAssignment, dynamicState.currentAssignment);
if (dynamicState.newAssignment != null
&& !dynamicState.newAssignment.equals(dynamicState.currentAssignment)) {
dynamicState =
dynamicState.withCurrentAssignment(dynamicState.container, dynamicState.newAssignment);
return dynamicState;
static boolean forSameTopology(LocalAssignment a, LocalAssignment b) {
if (a == null && b == null) {
return true;
if (a != null && b != null) {
if (a.get_topology_id().equals(b.get_topology_id())) {
return true;
return false;
* This method compares WorkerResources while considering any resources are NULL to be 0.0
* @param first WorkerResources A
* @param second WorkerResources B
* @return True if A and B are equivalent, treating the absent resources as 0.0
static boolean customWorkerResourcesEquality(WorkerResources first, WorkerResources second) {
if (first == null) {
return false;
if (first == second) {
return true;
if (first.equals(second)) {
return true;
if (first.get_cpu() != second.get_cpu()) {
return false;
if (first.get_mem_on_heap() != second.get_mem_on_heap()) {
return false;
if (first.get_mem_off_heap() != second.get_mem_off_heap()) {
return false;
if (first.get_shared_mem_off_heap() != second.get_shared_mem_off_heap()) {
return false;
if (first.get_shared_mem_on_heap() != second.get_shared_mem_on_heap()) {
return false;
if (!customResourceMapEquality(first.get_resources(), second.get_resources())) {
return false;
if (!customResourceMapEquality(first.get_shared_resources(), second.get_shared_resources())) {
return false;
return true;
* This method compares Resource Maps while considering any resources are NULL to be 0.0
* @param firstMap Resource Map A
* @param secondMap Resource Map B
* @return True if A and B are equivalent, treating the absent resources as 0.0
private static boolean customResourceMapEquality(Map<String, Double> firstMap, Map<String, Double> secondMap) {
if (firstMap == null && secondMap == null) {
return true;
if (firstMap == null) {
firstMap = new HashMap<>();
if (secondMap == null) {
secondMap = new HashMap<>();
Set<String> keys = new HashSet<>(firstMap.keySet());
for (String key : keys) {
if (firstMap.getOrDefault(key, 0.0).doubleValue() != secondMap.getOrDefault(key, 0.0).doubleValue()) {
return false;
return true;
* Decide the equivalence of two local assignments, ignoring the order of executors This is different from #equal method.
* @param first Local assignment A
* @param second Local assignment B
* @return True if A and B are equivalent, ignoring the order of the executors
static boolean equivalent(LocalAssignment first, LocalAssignment second) {
if (first == null && second == null) {
return true;
if (first != null && second != null) {
if (first.get_topology_id().equals(second.get_topology_id())) {
Set<ExecutorInfo> aexec = new HashSet<>(first.get_executors());
Set<ExecutorInfo> bexec = new HashSet<>(second.get_executors());
if (aexec.equals(bexec)) {
boolean firstHasResources = first.is_set_resources();
boolean secondHasResources = second.is_set_resources();
if (!firstHasResources && !secondHasResources) {
return true;
if (firstHasResources && secondHasResources) {
WorkerResources firstResources = first.get_resources();
WorkerResources secondResources = second.get_resources();
return customWorkerResourcesEquality(firstResources, secondResources);
return false;
static DynamicState stateMachineStep(DynamicState dynamicState, StaticState staticState) throws Exception {
LOG.debug("STATE {}", dynamicState.state);
switch (dynamicState.state) {
case EMPTY:
return handleEmpty(dynamicState, staticState);
return handleRunning(dynamicState, staticState);
return handleWaitingForWorkerStart(dynamicState, staticState);
return handleKillBlobUpdate(dynamicState, staticState);
return handleKillAndRelaunch(dynamicState, staticState);
case KILL:
return handleKill(dynamicState, staticState);
return handleWaitingForBlobLocalization(dynamicState, staticState);
return handleWaitingForBlobUpdate(dynamicState, staticState);
throw new IllegalStateException("Code not ready to handle a state of " + dynamicState.state);
* Prepare for a new assignment by downloading new required blobs, or going to empty if there is nothing to download.
* PRECONDITION: The slot should be empty
* @param dynamicState current state
* @param staticState static data
* @return the next state
* @throws IOException on any error
private static DynamicState prepareForNewAssignmentNoWorkersRunning(DynamicState dynamicState,
StaticState staticState) throws IOException {
assert (dynamicState.container == null);
assert dynamicState.currentAssignment == null;
//We're either going to empty, or starting fresh blob download. Either way, the changing blob notifications are outdated.
dynamicState = drainAllChangingBlobs(dynamicState);
if (dynamicState.newAssignment == null) {
return dynamicState.withState(MachineState.EMPTY);
Future<Void> pendingDownload = staticState.localizer.requestDownloadTopologyBlobs(dynamicState.newAssignment,
staticState.port, staticState.changingCallback);
return dynamicState.withPendingLocalization(dynamicState.newAssignment, pendingDownload)
// Starts killing the current container for the given reason and picks the follow-up state.
// Visible transitions: KILL (with a pending localization for the new assignment), KILL_BLOB_UPDATE, and
// KILL_AND_RELAUNCH (keeping the profile actions and clearing the pending stop actions). An unknown reason
// raises IllegalArgumentException.
// PRECONDITION: container != null.
// NOTE(review): most case labels, break statements, closing braces and the bodies of the two
// "if (!isDead)" blocks are missing from this copy of the file - confirm against the original source.
private static DynamicState killContainerFor(KillReason reason, DynamicState dynamicState, StaticState staticState)
throws Exception {
assert dynamicState.container != null;
//Skip special case if `storm kill_workers` is already invoked
Boolean isDead = dynamicState.container.areAllProcessesDead();
if (!isDead) {
if (reason == KillReason.ASSIGNMENT_CHANGED || reason == KillReason.BLOB_CHANGED) {
DynamicState next;
switch (reason) {
// Start downloading the blobs of the new assignment (if there is one) while the old container is dying.
Future<Void> pendingDownload = null;
if (dynamicState.newAssignment != null) {
pendingDownload = staticState.localizer.requestDownloadTopologyBlobs(
dynamicState.newAssignment, staticState.port, staticState.changingCallback);
next = dynamicState.withState(MachineState.KILL)
.withPendingLocalization(dynamicState.newAssignment, pendingDownload);
next = dynamicState.withState(MachineState.KILL_BLOB_UPDATE);
case HB_NULL:
//any stop profile actions that hadn't timed out yet, we should restart after the worker is running again.
HashSet<TopoProfileAction> mod = new HashSet<>(dynamicState.profileActions);
next = dynamicState.withState(MachineState.KILL_AND_RELAUNCH).withProfileActions(mod, Collections.emptySet());
throw new IllegalArgumentException("Unknown reason for killing a container");
if (!isDead) {
return next;
* Clean up a container.
* PRECONDITION: All of the processes have died.
* @param dynamicState current state
* @param staticState static data
* @param nextState the next MachineState to go to.
* @return the next state.
private static DynamicState cleanupCurrentContainer(DynamicState dynamicState, StaticState staticState, MachineState nextState) throws
Exception {
// All three preconditions are only checked when assertions are enabled.
assert (dynamicState.container != null);
assert (dynamicState.currentAssignment != null);
assert (dynamicState.container.areAllProcessesDead());
// NOTE(review): only the localizer release is visible here; confirm whether a container clean-up call is missing from this copy.
staticState.localizer.releaseSlotFor(dynamicState.currentAssignment, staticState.port);
// Clear both the container and the current assignment.
DynamicState ret = dynamicState.withCurrentAssignment(null, null);
// A null nextState leaves the state unchanged so the caller can choose the transition.
if (nextState != null) {
ret = ret.withState(nextState);
return ret;
* Drop all of the changingBlobs and pendingChangingBlobs.
* <p>PRECONDITION: container is null
* @param dynamicState current state.
* @return the next state.
private static DynamicState drainAllChangingBlobs(DynamicState dynamicState) {
assert dynamicState.container == null;
if (!dynamicState.changingBlobs.isEmpty()) {
// NOTE(review): the body of this loop is missing from this copy of the file; what is done with each
// BlobChanging before the set is cleared cannot be determined from here.
for (BlobChanging rc : dynamicState.changingBlobs) {
dynamicState = dynamicState.withChangingBlobs(Collections.emptySet());
// Pending changes are dropped as well, together with the assignment they belonged to.
if (!dynamicState.pendingChangingBlobs.isEmpty()) {
dynamicState = dynamicState.withPendingChangingBlobs(Collections.emptySet(), null);
return dynamicState;
/**
 * Informs the async localizer for all of the blobs that the worker acknowledged the change of blobs.
 * The worker has stopped as of now.
 *
 * <p>PRECONDITION: container is null
 * PRECONDITION: changingBlobs should only be for the given assignment.
 * @param dynamicState the current state
 * @param assignment the assignment the changing blobs belong to
 * @return the next state, with the changing blobs cleared and their futures recorded as pending for the given assignment.
 */
private static DynamicState informChangedBlobs(DynamicState dynamicState, LocalAssignment assignment) {
assert dynamicState.container == null;
// NOTE(review): the assertion below is garbled in this copy (the expression before "->" is missing).
// It appears to check that every changing blob is for the same topology as the given assignment.
assert -> forSameTopology(cr.assignment, assignment));
Set<Future<Void>> futures = new HashSet<>(dynamicState.changingBlobs.size());
// We need to add the new futures to the existing ones
// NOTE(review): the body of this if and of the loop below are missing from this copy of the file.
if (forSameTopology(dynamicState.pendingChangingBlobsAssignment, assignment)) {
// Acknowledge all changing blobs as futures
for (BlobChanging rc : dynamicState.changingBlobs) {
LOG.debug("found changing blobs {} moving them to pending...", dynamicState.changingBlobs);
// The changing blobs are cleared and the collected futures become the pending set for this assignment.
return dynamicState.withChangingBlobs(Collections.emptySet())
.withPendingChangingBlobs(futures, assignment);
* Filter all of the changing blobs to just be for those compatible with the given assignment.
* All others will be released appropriately.
* @param dynamicState the current state
* @param assignment the assignment to look for
* @return the updated dynamicState
private static DynamicState filterChangingBlobsFor(DynamicState dynamicState, final LocalAssignment assignment) {
// Fast path: nothing to filter.
if (dynamicState.changingBlobs.isEmpty()) {
return dynamicState;
HashSet<BlobChanging> savedBlobs = new HashSet<>(dynamicState.changingBlobs.size());
for (BlobChanging rc : dynamicState.changingBlobs) {
// NOTE(review): both branch bodies are missing from this copy of the file. Going by the surrounding
// documentation, matching entries are presumably kept in savedBlobs and the others released - confirm.
if (forSameTopology(assignment, rc.assignment)) {
} else {
return dynamicState.withChangingBlobs(savedBlobs);
* State Transitions for WAITING_FOR_BLOB_LOCALIZATION state, when the slot is waiting for
* blobs of the pending assignment to be completely downloaded, before the container is launched/relaunched.
* PRECONDITION: neither pendingLocalization nor pendingDownload is null.
* PRECONDITION: The slot should be empty
* @param dynamicState current state
* @param staticState static data
* @return the next state
* @throws Exception on any error
private static DynamicState handleWaitingForBlobLocalization(DynamicState dynamicState, StaticState staticState) throws Exception {
// Outcomes visible in this method: restart preparation when scheduling changed or the download failed,
// stay in this state on a timeout, move to WAITING_FOR_BLOB_UPDATE when blob changes are still pending,
// otherwise launch the container and move to WAITING_FOR_WORKER_START.
// NOTE(review): closing braces, the staticState argument of two calls and at least one log call prefix
// are missing from this copy of the file.
assert (dynamicState.pendingLocalization != null);
assert (dynamicState.pendingDownload != null);
assert (dynamicState.container == null);
assert dynamicState.currentAssignment == null;
//Ignore changes to scheduling while downloading the topology blobs
// We don't support canceling the download through the future yet,
// because pending blobs may be shared by multiple workers and cancel it
// may lead to race condition
// To keep everything in sync, just wait for all workers
try {
//Release things that don't need to wait for us to finish downloading.
dynamicState = filterChangingBlobsFor(dynamicState, dynamicState.pendingLocalization);
if (!dynamicState.changingBlobs.isEmpty()) {
//Unblock downloading by accepting the futures.
dynamicState = informChangedBlobs(dynamicState, dynamicState.pendingLocalization);
if (!equivalent(dynamicState.newAssignment, dynamicState.pendingLocalization)) {
//Scheduling changed
staticState.localizer.releaseSlotFor(dynamicState.pendingLocalization, staticState.port);
// Switch to the new assignment even if localization hasn't completed, or go to empty state
// if no new assignment.
return prepareForNewAssignmentNoWorkersRunning(dynamicState
.withPendingLocalization(null, null),
// Wait until time out
// Blocks for at most one second; a TimeoutException is handled below by returning the unchanged state.
dynamicState.pendingDownload.get(1000, TimeUnit.MILLISECONDS);
//Downloading of all blobs finished. This is the precondition for all codes below.
if (!dynamicState.pendingChangingBlobs.isEmpty()) {"There are pending changes, waiting for them to finish before launching container...");
//We cannot launch the container yet the resources may still be updating
return dynamicState.withState(MachineState.WAITING_FOR_BLOB_UPDATE)
.withPendingLocalization(null, null);
Container c =
staticState.containerLauncher.launchContainer(staticState.port, dynamicState.pendingLocalization, staticState.localState);
// The freshly launched container becomes current and the pending localization is cleared.
return dynamicState
.withCurrentAssignment(c, dynamicState.pendingLocalization).withState(MachineState.WAITING_FOR_WORKER_START)
.withPendingLocalization(null, null);
} catch (TimeoutException e) {
//We waited for 1 second loop around and try again....
return dynamicState;
} catch (ExecutionException e) {
// Only the message of the cause is logged; AuthorizationException and KeyNotFoundException expose it through get_msg().
if (e.getCause() instanceof AuthorizationException) {
LOG.error("{}", ((AuthorizationException) e.getCause()).get_msg());
} else if (e.getCause() instanceof KeyNotFoundException) {
LOG.error("{}", ((KeyNotFoundException) e.getCause()).get_msg());
} else {
LOG.error("{}", e.getCause().getMessage());
// release the reference on all blobs associated with this topology.
staticState.localizer.releaseSlotFor(dynamicState.pendingLocalization, staticState.port);
// we wait for 3 seconds
// NOTE(review): no wait statement is visible after the comment above - confirm whether a sleep is missing from this copy.
//Try again, or go to empty if assignment has been nulled
return prepareForNewAssignmentNoWorkersRunning(dynamicState
.withPendingLocalization(null, null),
* State Transitions for WAITING_FOR_BLOB_UPDATE state.
* <p>PRECONDITION: container is null
* PRECONDITION: pendingChangingBlobs is not empty (otherwise why did we go to this state)
* PRECONDITION: pendingChangingBlobsAssignment is not null.
* @param dynamicState current state
* @param staticState static data
* @return the next state
* @throws Exception on any error
private static DynamicState handleWaitingForBlobUpdate(DynamicState dynamicState, StaticState staticState)
throws Exception {
// Waits (up to about one second per call) for every pending blob change to finish, then launches the
// container for pendingChangingBlobsAssignment. If scheduling changed in the meantime, the held slots
// are released and preparation starts over for the new assignment.
// NOTE(review): closing braces, a log call prefix and the tail of two calls are missing from this copy of the file.
assert dynamicState.container == null;
assert dynamicState.pendingChangingBlobsAssignment != null;
assert !dynamicState.pendingChangingBlobs.isEmpty();
assert dynamicState.pendingDownload == null;
assert dynamicState.pendingLocalization == null;
if (!equivalent(dynamicState.newAssignment, dynamicState.currentAssignment)) {
//We were rescheduled while waiting for the resources to be updated,
// but the container is already not running."SLOT {}: Assignment Changed from {} to {}", staticState.port,
dynamicState.currentAssignment, dynamicState.newAssignment);
if (dynamicState.currentAssignment != null) {
staticState.localizer.releaseSlotFor(dynamicState.currentAssignment, staticState.port);
staticState.localizer.releaseSlotFor(dynamicState.pendingChangingBlobsAssignment, staticState.port);
return prepareForNewAssignmentNoWorkersRunning(dynamicState.withCurrentAssignment(null, null),
// Keep only the change notifications for the assignment being waited on, then acknowledge them.
dynamicState = filterChangingBlobsFor(dynamicState, dynamicState.pendingChangingBlobsAssignment);
if (!dynamicState.changingBlobs.isEmpty()) {
dynamicState = informChangedBlobs(dynamicState, dynamicState.pendingChangingBlobsAssignment);
//We only have a set amount of time we can wait for before looping around again
long start = Time.nanoTime();
try {
for (Future<Void> pending : dynamicState.pendingChangingBlobs) {
// The one-second budget is shared by all futures: each get() only receives what is left of it.
long now = Time.nanoTime();
long timeLeft = ONE_SEC_IN_NANO - (now - start);
if (timeLeft <= 0) {
throw new TimeoutException();
pending.get(timeLeft, TimeUnit.NANOSECONDS);
//All done we can launch the worker now
Container c = staticState.containerLauncher.launchContainer(staticState.port, dynamicState.pendingChangingBlobsAssignment,
return dynamicState
.withCurrentAssignment(c, dynamicState.pendingChangingBlobsAssignment).withState(MachineState.WAITING_FOR_WORKER_START)
.withPendingChangingBlobs(Collections.emptySet(), null);
} catch (TimeoutException ex) {
// Budget exhausted: return the state unchanged.
return dynamicState;
* State Transitions for KILL state.
* PRECONDITION: container.kill() was called
* PRECONDITION: container != null && currentAssignment != null
* @param dynamicState current state
* @param staticState static data
* @return the next state
* @throws Exception on any error
private static DynamicState handleKill(DynamicState dynamicState, StaticState staticState) throws Exception {
// Once every process of the container is dead the container is cleaned up; the follow-up state is EMPTY
// when no localization is pending. Otherwise a warning is logged and the state is returned unchanged.
// NOTE(review): the log call prefix, the second branch of the conditional expression, closing braces and
// any kill/sleep statements are missing from this copy of the file.
assert (dynamicState.container != null);
assert (dynamicState.currentAssignment != null);
assert dynamicState.pendingChangingBlobs.isEmpty();
assert dynamicState.pendingChangingBlobsAssignment == null;
if (dynamicState.container.areAllProcessesDead()) {"SLOT {} all processes are dead...", staticState.port);
return cleanupCurrentContainer(dynamicState,
dynamicState.pendingLocalization == null
? MachineState.EMPTY
LOG.warn("SLOT {} force kill and wait...", staticState.port);
return dynamicState;
* State Transitions for KILL_AND_RELAUNCH state.
* PRECONDITION: container.kill() was called
* PRECONDITION: container != null && currentAssignment != null
* @param dynamicState current state
* @param staticState static data
* @return the next state
* @throws Exception on any error
private static DynamicState handleKillAndRelaunch(DynamicState dynamicState, StaticState staticState) throws Exception {
// When all processes are dead: go to WAITING_FOR_WORKER_START if the assignment is still equivalent,
// otherwise clean up and prepare for the new assignment. While processes are still alive the state is
// returned unchanged, and a RuntimeException is raised after more than 120 seconds.
// NOTE(review): closing braces are missing, and no relaunch or kill call is visible in this copy of the
// file - confirm against the original source.
assert (dynamicState.container != null);
assert (dynamicState.currentAssignment != null);
assert dynamicState.pendingChangingBlobs.isEmpty();
assert dynamicState.pendingChangingBlobsAssignment == null;
assert dynamicState.pendingLocalization == null;
assert dynamicState.pendingDownload == null;
if (dynamicState.container.areAllProcessesDead()) {
if (equivalent(dynamicState.newAssignment, dynamicState.currentAssignment)) {
return dynamicState.withState(MachineState.WAITING_FOR_WORKER_START);
//Scheduling changed after we killed all of the processes
return prepareForNewAssignmentNoWorkersRunning(cleanupCurrentContainer(dynamicState, staticState, null), staticState);
//The child processes typically exit in < 1 sec. If 2 mins later they are still around something is wrong
// 120_000 ms = 2 minutes, measured from dynamicState.startTime.
if ((Time.currentTimeMillis() - dynamicState.startTime) > 120_000) {
throw new RuntimeException("Not all processes in " + dynamicState.container + " exited after 120 seconds");
return dynamicState;
* State Transitions for KILL_BLOB_UPDATE state.
* PRECONDITION: container.kill() was called
* PRECONDITION: container != null && currentAssignment != null
* @param dynamicState current state
* @param staticState static data
* @return the next state
* @throws Exception on any error
private static DynamicState handleKillBlobUpdate(DynamicState dynamicState, StaticState staticState) throws Exception {
// When all processes are dead: if the assignment is still equivalent, drop the container (keeping the
// assignment) and acknowledge the changed blobs; otherwise clean up and prepare for the new assignment.
// While processes are still alive the state is returned unchanged, and a RuntimeException is raised
// after more than 120 seconds.
// NOTE(review): closing braces and the tail of the informChangedBlobs return statement are missing from
// this copy of the file; no kill call is visible either.
assert (dynamicState.container != null);
assert (dynamicState.currentAssignment != null);
assert dynamicState.pendingChangingBlobs.isEmpty();
assert dynamicState.pendingChangingBlobsAssignment == null;
assert dynamicState.pendingDownload == null;
assert dynamicState.pendingLocalization == null;
//Release things that don't need to wait for us
dynamicState = filterChangingBlobsFor(dynamicState, dynamicState.currentAssignment);
if (dynamicState.container.areAllProcessesDead()) {
if (equivalent(dynamicState.newAssignment, dynamicState.currentAssignment)) {
// informChangedBlobs requires container == null, so the container is cleared first while the assignment is kept.
dynamicState = dynamicState.withCurrentAssignment(null, dynamicState.currentAssignment);
return informChangedBlobs(dynamicState, dynamicState.currentAssignment)
//Scheduling changed after we killed all of the processes
return prepareForNewAssignmentNoWorkersRunning(cleanupCurrentContainer(dynamicState, staticState, null), staticState);
//The child processes typically exit in < 1 sec. If 2 mins later they are still around something is wrong
// 120_000 ms = 2 minutes, measured from dynamicState.startTime.
if ((Time.currentTimeMillis() - dynamicState.startTime) > 120_000) {
throw new RuntimeException("Not all processes in " + dynamicState.container + " exited after 120 seconds");
return dynamicState;
* State Transitions for WAITING_FOR_WORKER_START state.
* PRECONDITION: container != null && currentAssignment != null
* @param dynamicState current state
* @param staticState static data
* @return the next state
* @throws Exception on any error
private static DynamicState handleWaitingForWorkerStart(DynamicState dynamicState, StaticState staticState) throws Exception {
// Moves to RUNNING as soon as a sufficiently fresh heartbeat is read. Otherwise the container is killed
// when the assignment changed, when the start timeout elapsed, or when a blob of the current assignment
// changed; in all other cases the state is returned (after updateAssignmentIfNeeded and blob filtering).
// NOTE(review): closing braces, a log call prefix and the tails of two log statements are missing from
// this copy of the file.
assert (dynamicState.container != null);
assert (dynamicState.currentAssignment != null);
assert dynamicState.pendingChangingBlobs.isEmpty();
assert dynamicState.pendingChangingBlobsAssignment == null;
assert dynamicState.pendingDownload == null;
assert dynamicState.pendingLocalization == null;
LSWorkerHeartbeat hb = dynamicState.container.readHeartbeat();
if (hb != null) {
// Heartbeat age in milliseconds, derived from second-resolution timestamps.
// NOTE(review): if both operands are int, the "* 1000" is evaluated in int before widening to long - confirm the types.
long hbAgeMs = (Time.currentTimeSecs() - hb.get_time_secs()) * 1000;
long hbTimeoutMs = getHbTimeoutMs(staticState, dynamicState);
if (hbAgeMs <= hbTimeoutMs) {
return dynamicState.withState(MachineState.RUNNING);
if (!equivalent(dynamicState.newAssignment, dynamicState.currentAssignment)) {
//We were rescheduled while waiting for the worker to come up"SLOT {}: Assignment Changed from {} to {}", staticState.port, dynamicState.currentAssignment,
return killContainerFor(KillReason.ASSIGNMENT_CHANGED, dynamicState, staticState);
dynamicState = updateAssignmentIfNeeded(dynamicState);
// Time since dynamicState.startTime, compared against the first-heartbeat timeout.
long timeDiffms = (Time.currentTimeMillis() - dynamicState.startTime);
long hbFirstTimeoutMs = getFirstHbTimeoutMs(staticState, dynamicState);
if (timeDiffms > hbFirstTimeoutMs) {
LOG.warn("SLOT {}: Container {} failed to launch in {} ms.", staticState.port, dynamicState.container,
return killContainerFor(KillReason.HB_TIMEOUT, dynamicState, staticState);
dynamicState = filterChangingBlobsFor(dynamicState, dynamicState.currentAssignment);
if (!dynamicState.changingBlobs.isEmpty()) {
//Kill the container and restart it
return killContainerFor(KillReason.BLOB_CHANGED, dynamicState, staticState);
return dynamicState;
* State Transitions for RUNNING state.
* PRECONDITION: container != null && currentAssignment != null
* @param dynamicState current state
* @param staticState static data
* @return the next state
* @throws Exception on any error
private static DynamicState handleRunning(DynamicState dynamicState, StaticState staticState) throws Exception {
// Checks, in order: assignment change, blob change, main process exit, memory limit violation, missing
// heartbeat and stale heartbeat - each of which kills the container with the matching KillReason.
// If none applies, outstanding profile actions are processed and worker metrics are handed off.
// NOTE(review): closing braces, a log call prefix, the iterator advance/removal statements and the
// updates of mod/modPending are missing from this copy of the file.
assert (dynamicState.container != null);
assert (dynamicState.currentAssignment != null);
assert dynamicState.pendingChangingBlobs.isEmpty();
assert dynamicState.pendingChangingBlobsAssignment == null;
assert dynamicState.pendingDownload == null;
assert dynamicState.pendingLocalization == null;
if (!equivalent(dynamicState.newAssignment, dynamicState.currentAssignment)) {"SLOT {}: Assignment Changed from {} to {}", staticState.port, dynamicState.currentAssignment,
//Scheduling changed while running...
return killContainerFor(KillReason.ASSIGNMENT_CHANGED, dynamicState, staticState);
dynamicState = updateAssignmentIfNeeded(dynamicState);
dynamicState = filterChangingBlobsFor(dynamicState, dynamicState.currentAssignment);
if (!dynamicState.changingBlobs.isEmpty()) {
//Kill the container and restart it
return killContainerFor(KillReason.BLOB_CHANGED, dynamicState, staticState);
if (dynamicState.container.didMainProcessExit()) {
LOG.warn("SLOT {}: main process has exited for topology: {}",
staticState.port, dynamicState.currentAssignment.get_topology_id());
return killContainerFor(KillReason.PROCESS_EXIT, dynamicState, staticState);
if (dynamicState.container.isMemoryLimitViolated(dynamicState.currentAssignment)) {
LOG.warn("SLOT {}: violated memory limits for topology: {}",
staticState.port, dynamicState.currentAssignment.get_topology_id());
return killContainerFor(KillReason.MEMORY_VIOLATION, dynamicState, staticState);
LSWorkerHeartbeat hb = dynamicState.container.readHeartbeat();
if (hb == null) {
LOG.warn("SLOT {}: HB returned as null for topology: {}",
staticState.port, dynamicState.currentAssignment.get_topology_id());
//This can happen if the supervisor crashed after launching a
// worker that never came up.
return killContainerFor(KillReason.HB_NULL, dynamicState, staticState);
// Heartbeat age in milliseconds, derived from second-resolution timestamps.
// NOTE(review): if both operands are int, the "* 1000" is evaluated in int before widening to long - confirm the types.
long timeDiffMs = (Time.currentTimeSecs() - hb.get_time_secs()) * 1000;
long hbTimeoutMs = getHbTimeoutMs(staticState, dynamicState);
if (timeDiffMs > hbTimeoutMs) {
LOG.warn("SLOT {}: HB is too old {} > {} for topology: {}",
staticState.port, timeDiffMs, hbTimeoutMs, dynamicState.currentAssignment.get_topology_id());
return killContainerFor(KillReason.HB_TIMEOUT, dynamicState, staticState);
//The worker is up and running check for profiling requests
if (!dynamicState.profileActions.isEmpty()) {
// Work on copies so the sets held by dynamicState are not mutated; the copies are stored back below.
HashSet<TopoProfileAction> mod = new HashSet<>(dynamicState.profileActions);
HashSet<TopoProfileAction> modPending = new HashSet<>(dynamicState.pendingStopProfileActions);
Iterator<TopoProfileAction> iter = mod.iterator();
while (iter.hasNext()) {
// NOTE(review): the right-hand side of this assignment is missing in this copy of the file.
TopoProfileAction action =;
if (!action.topoId.equals(dynamicState.currentAssignment.get_topology_id())) {
LOG.warn("Dropping {} wrong topology is running", action);
//Not for this topology so skip it
} else {
// An action already in modPending is waiting for its stop time.
if (modPending.contains(action)) {
boolean isTimeForStop = Time.currentTimeMillis() > action.request.get_time_stamp();
if (isTimeForStop) {
// The second argument of runProfiling is true here and false where profiling is started below.
if (dynamicState.container.runProfiling(action.request, true)) {
LOG.debug("Stopped {} action finished", action);
} else {
LOG.warn("Stopping {} failed, will be retried", action);
} else {
LOG.debug("Still pending {} now: {}", action, Time.currentTimeMillis());
} else {
//J_PROFILE_START is not used. When you see a J_PROFILE_STOP
// start profiling and save it away to stop when timeout happens
if (action.request.get_action() == ProfileAction.JPROFILE_STOP) {
if (dynamicState.container.runProfiling(action.request, false)) {
LOG.debug("Started {} now: {}", action, Time.currentTimeMillis());
} else {
LOG.warn("Starting {} failed, will be retried", action);
} else {
if (dynamicState.container.runProfiling(action.request, false)) {
LOG.debug("Started {} action finished", action);
} else {
LOG.warn("Starting {} failed, will be retried", action);
dynamicState = dynamicState.withProfileActions(mod, modPending);
dynamicState.container.processMetrics(staticState.metricsExec, staticState.metricsProcessor);
return dynamicState;
static DynamicState handleEmpty(DynamicState dynamicState, StaticState staticState) throws InterruptedException, IOException {
assert dynamicState.currentAssignment == null;
assert dynamicState.container == null;
assert dynamicState.pendingChangingBlobs.isEmpty();
assert dynamicState.pendingChangingBlobsAssignment == null;
assert dynamicState.pendingDownload == null;
assert dynamicState.pendingLocalization == null;
if (!equivalent(dynamicState.newAssignment, dynamicState.currentAssignment)) {
return prepareForNewAssignmentNoWorkersRunning(dynamicState, staticState);
dynamicState = updateAssignmentIfNeeded(dynamicState);
//Both assignments are null, just wait
if (dynamicState.profileActions != null && !dynamicState.profileActions.isEmpty()) {
//Nothing is scheduled here so throw away all of the profileActions
LOG.warn("Dropping {} no topology is running", dynamicState.profileActions);
dynamicState = dynamicState.withProfileActions(Collections.emptySet(), Collections.emptySet());
//Drop the change notifications we are not running anything right now
dynamicState = drainAllChangingBlobs(dynamicState);
return dynamicState;
MachineState getMachineState() {
return dynamicState.state;
* Get worker heartbeat timeout time in ms. Use topology specified timeout if provided.
private static long getHbTimeoutMs(StaticState staticState, DynamicState dynamicState) {
long hbTimeoutMs = staticState.hbTimeoutMs;
Map<String, Object> topoConf = dynamicState.container.topoConf;
if (topoConf != null && topoConf.containsKey(Config.TOPOLOGY_WORKER_TIMEOUT_SECS)) {
long topoHbTimeoutMs = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_WORKER_TIMEOUT_SECS)) * 1000;
topoHbTimeoutMs = Math.max(topoHbTimeoutMs, hbTimeoutMs);
hbTimeoutMs = topoHbTimeoutMs;
return hbTimeoutMs;
* Get worker heartbeat timeout when waiting for worker to start.
* If topology specific timeout if set, ensure first heartbeat timeout >= topology specific timeout.
private static long getFirstHbTimeoutMs(StaticState staticState, DynamicState dynamicState) {
return Math.max(getHbTimeoutMs(staticState, dynamicState), staticState.firstHbTimeoutMs);
* Set a new assignment asynchronously.
* @param newAssignment the new assignment for this slot to run, null to run nothing
public final void setNewAssignment(LocalAssignment newAssignment) {
this.newAssignment.set(newAssignment == null
? null
: new TimerDecoratedAssignment(newAssignment, staticState.slotMetrics.workerLaunchDuration));
public void blobChanging(LocalAssignment assignment, int port, LocallyCachedBlob blob, GoodToGo go) {
assert port == staticState.port : "got a callback that is not for us " + port + " != " + staticState.port;
//This is called async so lets assume that it is something we care about
try {
changingBlobs.put(new BlobChanging(assignment, blob, go.getLatch()));
} catch (InterruptedException e) {
throw new RuntimeException("This should not have happened, but it did (the queue is unbounded)", e);
public void addProfilerActions(Set<TopoProfileAction> actions) {
if (actions != null) {
while (true) {
Set<TopoProfileAction> orig = profiling.get();
Set<TopoProfileAction> newActions = new HashSet<>(orig);
if (profiling.compareAndSet(orig, newActions)) {
* get the workerID (nullable) from CURRENT container, if existed, or return null.
* @return workerID
public String getWorkerId() {
String workerId = null;
Container c = dynamicState.container;
if (c != null) {
workerId = c.getWorkerId();
return workerId;
private void saveNewAssignment(LocalAssignment assignment) {
synchronized (staticState.localState) {
Map<Integer, LocalAssignment> assignments = staticState.localState.getLocalAssignmentsMap();
if (assignments == null) {
assignments = new HashMap<>();
if (assignment == null) {
} else {
assignments.put(staticState.port, assignment);
Map<Long, LocalAssignment> update = null;
Map<Long, LocalAssignment> orig = null;
do {
Long lport = new Long(staticState.port);
orig = cachedCurrentAssignments.get();
update = new HashMap<>(orig);
if (assignment == null) {
} else {
update.put(lport, assignment);
} while (!cachedCurrentAssignments.compareAndSet(orig, update));
// Main loop of the slot: until done is set, gathers the latest inputs, computes the next
// DynamicState and stores it in dynamicState.
public void run() {
try {
while (!done) {
// Snapshot of the profile actions requested so far.
Set<TopoProfileAction> origProfileActions = new HashSet<>(profiling.get());
Set<BlobChanging> changingResourcesToHandle = dynamicState.changingBlobs;
// Only copy the set when the queue filled by blobChanging() has something waiting.
if (!changingBlobs.isEmpty()) {
changingResourcesToHandle = new HashSet<>(changingResourcesToHandle);
Iterator<BlobChanging> it = changingResourcesToHandle.iterator();
//Remove/Clean up changed requests that are not for us
while (it.hasNext()) {
// NOTE(review): the right-hand side of this assignment appears to be missing — confirm.
BlobChanging rc =;
// A notification that matches neither the current nor the new assignment has its latch released.
if (!forSameTopology(rc.assignment, dynamicState.currentAssignment)
&& !forSameTopology(rc.assignment, dynamicState.newAssignment)) {
rc.latch.countDown(); //Ignore the future
// NOTE(review): this statement starts with '.withProfileActions' and ends with ', staticState)';
// the receiver and the enclosing call appear to be missing — confirm.
DynamicState nextState =
.withProfileActions(origProfileActions, dynamicState.pendingStopProfileActions)
.withChangingBlobs(changingResourcesToHandle), staticState);
// NOTE(review): the format strings on the next lines have no visible logging call — confirm.
if (LOG.isDebugEnabled() || dynamicState.state != nextState.state) {"STATE {} -> {}", dynamicState, nextState);
//Save the current state for recovery
// True when the current assignment differs between the old and the next state (null-safe in both directions).
if ((nextState.currentAssignment != null
&& !nextState.currentAssignment.equals(dynamicState.currentAssignment))
|| (dynamicState.currentAssignment != null
&& !dynamicState.currentAssignment.equals(nextState.currentAssignment))) {"SLOT {}: Changing current assignment from {} to {}", staticState.port, dynamicState.currentAssignment,
if (equivalent(nextState.newAssignment, nextState.currentAssignment)
&& nextState.currentAssignment != null
&& nextState.currentAssignment.get_owner() == null
&& nextState.newAssignment != null
&& nextState.newAssignment.get_owner() != null) {
//This is an odd case for a rolling upgrade where the user on the old assignment may be null,
// but not on the new one. Although in all other ways they are the same.
// If this happens we want to use the assignment with the owner."Updating assignment to save owner {}", nextState.newAssignment.get_owner());
nextState = nextState.withCurrentAssignment(nextState.container, nextState.newAssignment);
// clean up the profiler actions that are not being processed
// NOTE(review): as written, removed is an unfiltered copy of origProfileActions, so every
// snapshotted action is deleted from cluster state — confirm whether in-progress ones should be excluded.
Set<TopoProfileAction> removed = new HashSet<>(origProfileActions);
for (TopoProfileAction action : removed) {
try {
clusterState.deleteTopologyProfileRequests(action.topoId, action.request);
} catch (Exception e) {
// A failed delete is only logged; the loop carries on.
LOG.error("Error trying to remove profiling request, it will be retried", e);
Set<TopoProfileAction> orig;
Set<TopoProfileAction> copy;
// Compare-and-set retry loop on profiling.
// NOTE(review): as written, copy is an unchanged copy of orig — confirm.
do {
orig = profiling.get();
copy = new HashSet<>(orig);
} while (!profiling.compareAndSet(orig, copy));
// Publish the freshly computed state for the next iteration.
dynamicState = nextState;
} catch (Throwable e) {
// Failures not caused by an InterruptedException are logged; Utils.exitProcess(20, ...) follows.
if (!Utils.exceptionCauseIsInstanceOf(InterruptedException.class, e)) {
LOG.error("Error when processing event", e);
Utils.exitProcess(20, "Error when processing an event");
public void close() throws Exception {
done = true;
// States of the slot state machine; toString() delegates to EnumUtil.toMetricName(this).
// NOTE(review): only the state descriptions are present below. No constant declarations are
// visible, although MachineState.EMPTY and MachineState.RUNNING are referenced by DynamicState — confirm.
enum MachineState {
* Slot is empty, and no worker is running.
* Slot is running a worker.
* Slot has launched a worker, and is waiting for it to heartbeat.
* Slot has just killed its worker, and is now waiting for it to die so it can be relaunched in the same container.
* Slot has just killed its worker, and is now waiting for it to die so the container can be deleted.
* Slot has just killed its worker, and is now waiting for it to die so the localizer can update a blob.
* The slot is empty, and is waiting for blobs to download before the worker can be launched.
* The slot is empty, and is waiting for blobs to be updated before the worker can be (re)launched.
// String form used wherever the state is printed.
public String toString() {
return EnumUtil.toMetricName(this);
static class StaticState {
public final AsyncLocalizer localizer;
public final long hbTimeoutMs;
public final long firstHbTimeoutMs;
public final long killSleepMs;
public final long monitorFreqMs;
public final ContainerLauncher containerLauncher;
public final int port;
public final String host;
public final ISupervisor supervisor;
public final LocalState localState;
public final BlobChangingCallback changingCallback;
public final OnlyLatestExecutor<Integer> metricsExec;
public final WorkerMetricsProcessor metricsProcessor;
public final SlotMetrics slotMetrics;
StaticState(AsyncLocalizer localizer, long hbTimeoutMs, long firstHbTimeoutMs,
long killSleepMs, long monitorFreqMs,
ContainerLauncher containerLauncher, String host, int port,
ISupervisor supervisor, LocalState localState,
BlobChangingCallback changingCallback,
OnlyLatestExecutor<Integer> metricsExec,
WorkerMetricsProcessor metricsProcessor,
SlotMetrics slotMetrics) {
this.localizer = localizer;
this.hbTimeoutMs = hbTimeoutMs;
this.firstHbTimeoutMs = firstHbTimeoutMs;
this.containerLauncher = containerLauncher;
this.killSleepMs = killSleepMs;
this.monitorFreqMs = monitorFreqMs; = host;
this.port = port;
this.supervisor = supervisor;
this.localState = localState;
this.changingCallback = changingCallback;
this.metricsExec = metricsExec;
this.metricsProcessor = metricsProcessor;
this.slotMetrics = slotMetrics;
static class DynamicState {
public final MachineState state;
* Latest assignment assigned to this worker, updated asynchronously.
public final LocalAssignment newAssignment;
* Assignment the worker is running on, when the state machine enters this state.
public final LocalAssignment currentAssignment;
* Assignment assigned to the worker and waiting for blob download.
* This is the same as newAssignment if no newer assignment is arrived before
* localization completes.
public final LocalAssignment pendingLocalization;
public final Container container;
* Signals that blobs have been downloaded for the first time.
public final Future<Void> pendingDownload;
public final Set<TopoProfileAction> profileActions;
public final Set<TopoProfileAction> pendingStopProfileActions;
* Blobs that are changed and need to be synced. The localizer notifies the Slot about changing blobs on every state step.
* Blob updates are blocked until the Slot unblocks them, at which point they go in {@link #pendingChangingBlobs}.
* Updates are blocked until the Slot worker is dead, since blobs may otherwise be actively used.
public final Set<BlobChanging> changingBlobs;
* The assignment {@link #pendingChangingBlobs} belongs to.
public final LocalAssignment pendingChangingBlobsAssignment;
public final Set<Future<Void>> pendingChangingBlobs;
* The entering time when the machine transitions to current state.
public final long startTime;
private final SlotMetrics slotMetrics;
DynamicState(final LocalAssignment currentAssignment, Container container, final LocalAssignment newAssignment,
SlotMetrics slotMetrics) {
this.currentAssignment = currentAssignment;
this.container = container;
if ((currentAssignment == null) ^ (container == null)) {
throw new IllegalArgumentException("Container and current assignment must both be null, or neither can be null");
if (currentAssignment == null) {
state = MachineState.EMPTY;
} else {
state = MachineState.RUNNING;
this.startTime = Time.currentTimeMillis();
this.newAssignment = newAssignment;
this.pendingLocalization = null;
this.pendingDownload = null;
this.profileActions = Collections.emptySet();
this.pendingStopProfileActions = Collections.emptySet();
this.changingBlobs = Collections.emptySet();
this.pendingChangingBlobsAssignment = null;
this.pendingChangingBlobs = Collections.emptySet();
this.slotMetrics = slotMetrics;
DynamicState(final MachineState state, final LocalAssignment newAssignment,
final Container container, final LocalAssignment currentAssignment,
final LocalAssignment pendingLocalization, final long startTime,
final Future<Void> pendingDownload, final Set<TopoProfileAction> profileActions,
final Set<TopoProfileAction> pendingStopProfileActions,
final Set<BlobChanging> changingBlobs,
final Set<Future<Void>> pendingChangingBlobs, final LocalAssignment pendingChaningBlobsAssignment,
final SlotMetrics slotMetrics) {
assert pendingChangingBlobs != null;
assert pendingChangingBlobs.isEmpty() == (pendingChaningBlobsAssignment == null);
this.state = state;
this.newAssignment = newAssignment;
this.currentAssignment = currentAssignment;
this.container = container;
this.pendingLocalization = pendingLocalization;
this.startTime = startTime;
this.pendingDownload = pendingDownload;
this.profileActions = profileActions;
this.pendingStopProfileActions = pendingStopProfileActions;
this.changingBlobs = changingBlobs;
this.pendingChangingBlobs = pendingChangingBlobs;
this.pendingChangingBlobsAssignment = pendingChaningBlobsAssignment;
this.slotMetrics = slotMetrics;
public String toString() {
StringBuffer sb = new StringBuffer();
sb.append(" msInState: ");
sb.append(Time.currentTimeMillis() - startTime);
if (container != null) {
sb.append(" ");
return sb.toString();
* Set the new assignment for the state. This should never be called from within the state machine.
* It is an input from outside.
* @param newAssignment the new assignment to set
* @return the updated DynamicState.
public DynamicState withNewAssignment(LocalAssignment newAssignment) {
return new DynamicState(this.state, newAssignment,
this.container, this.currentAssignment,
this.pendingLocalization, this.startTime,
this.pendingDownload, this.profileActions,
this.pendingStopProfileActions, this.changingBlobs,
this.pendingChangingBlobs, this.pendingChangingBlobsAssignment, this.slotMetrics);
public DynamicState withPendingLocalization(LocalAssignment pendingLocalization, Future<Void> pendingDownload) {
return new DynamicState(this.state, this.newAssignment,
this.container, this.currentAssignment,
pendingLocalization, this.startTime,
pendingDownload, this.profileActions,
this.pendingStopProfileActions, this.changingBlobs,
this.pendingChangingBlobs, this.pendingChangingBlobsAssignment, this.slotMetrics);
public DynamicState withPendingLocalization(Future<Void> pendingDownload) {
return withPendingLocalization(this.pendingLocalization, pendingDownload);
* Transition to the given state. Notice that it's possible to transition to
* the same state.
* @param state The state to transition into
* @return New dynamicState
public DynamicState withState(final MachineState state) {
long newStartTime = Time.currentTimeMillis();
//We may (though unlikely) lose metering here if state transition is too frequent (less than a millisecond)
slotMetrics.timeSpentInState.get(this.state).update(newStartTime - startTime, TimeUnit.MILLISECONDS);
LocalAssignment assignment = this.currentAssignment;
if (MachineState.RUNNING != this.state && MachineState.RUNNING == state
&& this.currentAssignment instanceof TimerDecoratedAssignment) {
((TimerDecoratedAssignment) assignment).stopTiming();
//Timer is discarded after the initial launch of an assignment
assignment = new LocalAssignment(this.currentAssignment);
return new DynamicState(state, this.newAssignment,
this.container, assignment,
this.pendingLocalization, newStartTime,
this.pendingDownload, this.profileActions,
this.pendingStopProfileActions, this.changingBlobs,
this.pendingChangingBlobs, this.pendingChangingBlobsAssignment, this.slotMetrics);
public DynamicState withCurrentAssignment(final Container container, final LocalAssignment currentAssignment) {
return new DynamicState(this.state, this.newAssignment,
container, currentAssignment,
this.pendingLocalization, this.startTime,
this.pendingDownload, this.profileActions,
this.pendingStopProfileActions, this.changingBlobs,
this.pendingChangingBlobs, this.pendingChangingBlobsAssignment, this.slotMetrics);
public DynamicState withProfileActions(Set<TopoProfileAction> profileActions, Set<TopoProfileAction> pendingStopProfileActions) {
return new DynamicState(this.state, this.newAssignment,
this.container, this.currentAssignment,
this.pendingLocalization, this.startTime,
this.pendingDownload, profileActions,
pendingStopProfileActions, this.changingBlobs,
this.pendingChangingBlobs, this.pendingChangingBlobsAssignment, this.slotMetrics);
* Set the blocked changing blobs. This is an input from the outside, and should never be called by the state machine steps.
public DynamicState withChangingBlobs(Set<BlobChanging> changingBlobs) {
if (changingBlobs == this.changingBlobs) {
return this;
return new DynamicState(this.state, this.newAssignment,
this.container, this.currentAssignment,
this.pendingLocalization, this.startTime,
this.pendingDownload, profileActions,
this.pendingStopProfileActions, changingBlobs,
this.pendingChangingBlobs, this.pendingChangingBlobsAssignment, this.slotMetrics);
public DynamicState withPendingChangingBlobs(Set<Future<Void>> pendingChangingBlobs,
LocalAssignment pendingChangingBlobsAssignment) {
return new DynamicState(this.state, this.newAssignment,
this.container, this.currentAssignment,
this.pendingLocalization, this.startTime,
this.pendingDownload, profileActions,
this.pendingStopProfileActions, this.changingBlobs,
pendingChangingBlobsAssignment, this.slotMetrics);
static class TopoProfileAction {
public final String topoId;
public final ProfileRequest request;
TopoProfileAction(String topoId, ProfileRequest request) {
this.topoId = topoId;
this.request = request;
public int hashCode() {
return (37 * topoId.hashCode()) + request.hashCode();
public boolean equals(Object other) {
if (!(other instanceof TopoProfileAction)) {
return false;
TopoProfileAction o = (TopoProfileAction) other;
return topoId.equals(o.topoId) && request.equals(o.request);
public String toString() {
return "{ " + topoId + ": " + request + " }";
* Holds the information about a blob that is changing.
static class BlobChanging {
private final LocalAssignment assignment;
private final LocallyCachedBlob blob;
private final GoodToGo.GoodToGoLatch latch;
BlobChanging(LocalAssignment assignment, LocallyCachedBlob blob, GoodToGo.GoodToGoLatch latch) {
this.assignment = assignment;
this.blob = blob;
this.latch = latch;
public String toString() {
return "BLOB CHANGING " + blob + " " + assignment;