package org.apache.storm.daemon.supervisor;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import java.io.File;
import java.io.IOException;
import java.io.Writer;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang.StringUtils;
import org.apache.storm.Config;
import org.apache.storm.DaemonConfig;
import org.apache.storm.container.ResourceIsolationInterface;
import org.apache.storm.generated.LSWorkerHeartbeat;
import org.apache.storm.generated.LocalAssignment;
import org.apache.storm.generated.ProfileRequest;
import org.apache.storm.generated.WorkerMetricList;
import org.apache.storm.generated.WorkerMetricPoint;
import org.apache.storm.generated.WorkerMetrics;
import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.metricstore.MetricException;
import org.apache.storm.metricstore.WorkerMetricsProcessor;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.LocalState;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.ServerConfigUtils;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;
/** Represents a container that a worker will run in. */
public abstract class Container implements Killable {
// Class-wide logger.
private static final Logger LOG = LoggerFactory.getLogger(Container.class);
// Metric name used when building the memory WorkerMetricPoint in processMetrics.
private static final String MEMORY_USED_METRIC = "UsedMemory";
// NOTE(review): the three constants below look like placeholder component/executor/stream
// identifiers for the supervisor-generated memory metric - confirm against processMetrics.
private static final String SYSTEM_COMPONENT_ID = "System";
private static final String INVALID_EXECUTOR_ID = "-1";
private static final String INVALID_STREAM_ID = "None";
// NOTE(review): not referenced in the visible part of this class; presumably a user-name to
// uid cache shared with subclasses - confirm before relying on it.
private static final Map<String, Integer> cachedUserToUidMap = new ConcurrentHashMap<>();
// Registered in the constructor as "supervisor:num-cleanup-exceptions".
private final Meter numCleanupExceptions;
// Registered in the constructor as "supervisor:num-kill-exceptions".
private final Meter numKillExceptions;
// Registered in the constructor as "supervisor:num-force-kill-exceptions".
private final Meter numForceKillExceptions;
// Registered in the constructor as "supervisor:num-workers-force-kill".
private final Meter numForceKill;
// Registered in the constructor as "supervisor:worker-shutdown-duration-ns"; kill() starts a context on it.
private final Timer shutdownDuration;
// Registered in the constructor as "supervisor:worker-per-call-clean-up-duration-ns"; cleanUp() times itself with it.
private final Timer cleanupDuration;
// The supervisor config passed to the constructor.
protected final Map<String, Object> conf;
protected final Map<String, Object> topoConf; //Not set if RECOVER_PARTIAL
protected final String topologyId; //Not set if RECOVER_PARTIAL
protected final String supervisorId;
protected final int supervisorPort;
protected final int port; //Not set if RECOVER_PARTIAL
protected final LocalAssignment assignment; //Not set if RECOVER_PARTIAL
// File system operations; the constructor creates one via AdvancedFSOps.make(conf) when none is supplied.
protected final AdvancedFSOps ops;
// May be null; kill/forceKill/areAllProcessesDead/cleanUpForRestart all null-check it before use.
protected final ResourceIsolationInterface resourceIsolationManager;
// Read from Config.DISABLE_SYMLINKS in the constructor.
protected final boolean symlinksDisabled;
// Set to null by cleanUpForRestart().
protected String workerId;
protected ContainerType type;
protected ContainerMemoryTracker containerMemoryTracker;
// Wall-clock millis written by processMetrics; used there to throttle metric processing.
private long lastMetricProcessTime = 0L;
// Started by the first kill() call; cleared by areAllProcessesDead() once everything is dead.
private Timer.Context shutdownTimer = null;
// Read from Config.SUPERVISOR_RUN_WORKER_AS_USER in the constructor.
protected boolean runAsUser;
// Memoized result of getWorkerUser().
private String cachedUser;
/**
 * Create a new Container.
 *
 * <p>For an only-killable (partial recovery) container the topology id and topology conf are left
 * null. Otherwise the topology id is taken from the assignment, the required topology files must
 * already exist, and the topology conf is read from disk unless one is supplied.
 *
 * @param type the type of container being made.
 * @param conf the supervisor config
 * @param supervisorId the ID of the supervisor this is a part of.
 * @param supervisorPort the thrift server port of the supervisor this is a part of.
 * @param port the port the container is on. Should be {@code <= 0} if only a partial recovery
 * @param assignment the assignment for this container. Should be null if only a partial recovery.
 * @param resourceIsolationManager used to isolate resources for a container can be null if no isolation is used.
 * @param workerId the id of the worker to use. Must not be null if doing a partial recovery.
 * @param topoConf the config of the topology (mostly for testing) if null and not a partial recovery the real conf is read.
 * @param ops file system operations (mostly for testing) if null a new one is made
 * @param metricsRegistry The metrics registry.
 * @param containerMemoryTracker The shared memory tracker for the supervisor's containers
 * @throws IOException on any error.
 * @throws ContainerRecoveryException if the required topology files are missing.
 * @throws UnsupportedOperationException if run-as-user is requested on Windows.
 */
protected Container(ContainerType type, Map<String, Object> conf, String supervisorId, int supervisorPort,
                    int port, LocalAssignment assignment, ResourceIsolationInterface resourceIsolationManager,
                    String workerId, Map<String, Object> topoConf, AdvancedFSOps ops,
                    StormMetricsRegistry metricsRegistry, ContainerMemoryTracker containerMemoryTracker) throws IOException {
    assert (type != null);
    assert (conf != null);
    assert (supervisorId != null);

    // Read through ObjectReader (as runAsUser is below) so a key that is present with a null
    // value falls back to the default instead of failing while unboxing.
    symlinksDisabled = ObjectReader.getBoolean(conf.get(Config.DISABLE_SYMLINKS), false);

    if (ops == null) {
        ops = AdvancedFSOps.make(conf);
    }

    this.workerId = workerId;
    this.type = type;
    this.port = port;
    this.ops = ops;
    this.conf = conf;
    this.supervisorId = supervisorId;
    this.supervisorPort = supervisorPort;
    this.resourceIsolationManager = resourceIsolationManager;
    this.assignment = assignment;

    runAsUser = ObjectReader.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false);
    if (runAsUser && Utils.isOnWindows()) {
        throw new UnsupportedOperationException("ERROR: Windows doesn't support running workers as different users yet");
    }

    if (this.type.isOnlyKillable()) {
        assert (this.assignment == null);
        assert (this.port <= 0);
        assert (this.workerId != null);
        topologyId = null;
        this.topoConf = null;
    } else {
        assert (assignment != null);
        assert (port > 0);
        topologyId = assignment.get_topology_id();
        if (!this.ops.doRequiredTopoFilesExist(this.conf, topologyId)) {
            // Four placeholders: assignment, supervisor, port, worker id.
            LOG.info(
                "Missing topology storm code, so can't launch worker with assignment {} for this supervisor {} on port {} with id {}",
                this.assignment, this.supervisorId, this.port, this.workerId);
            throw new ContainerRecoveryException("Missing required topology files...");
        }
        if (topoConf == null) {
            this.topoConf = readTopoConf();
        } else {
            //For testing...
            this.topoConf = topoConf;
        }
    }

    this.numCleanupExceptions = metricsRegistry.registerMeter("supervisor:num-cleanup-exceptions");
    this.numKillExceptions = metricsRegistry.registerMeter("supervisor:num-kill-exceptions");
    this.numForceKillExceptions = metricsRegistry.registerMeter("supervisor:num-force-kill-exceptions");
    this.numForceKill = metricsRegistry.registerMeter("supervisor:num-workers-force-kill");
    this.shutdownDuration = metricsRegistry.registerTimer("supervisor:worker-shutdown-duration-ns");
    this.cleanupDuration = metricsRegistry.registerTimer("supervisor:worker-per-call-clean-up-duration-ns");
    this.containerMemoryTracker = containerMemoryTracker;
}
public String toString() {
return "topo:" + topologyId + " worker:" + workerId;
protected Map<String, Object> readTopoConf() throws IOException {
assert (topologyId != null);
return ConfigUtils.readSupervisorStormConf(conf, topologyId);
public void kill() throws IOException {"Killing {}:{}", supervisorId, workerId);
if (shutdownTimer == null) {
shutdownTimer = shutdownDuration.time();
try {
if (resourceIsolationManager != null) {
resourceIsolationManager.kill(getWorkerUser(), workerId);
} catch (IOException e) {
throw e;
public void forceKill() throws IOException {"Force Killing {}:{}", supervisorId, workerId);
try {
if (resourceIsolationManager != null) {
resourceIsolationManager.forceKill(getWorkerUser(), workerId);
} catch (IOException e) {
throw e;
* Read the Heartbeat for the current container.
* @return the Heartbeat
* @throws IOException on any error
public LSWorkerHeartbeat readHeartbeat() throws IOException {
LocalState localState = ConfigUtils.workerState(conf, workerId);
LSWorkerHeartbeat hb = localState.getWorkerHeartBeat();
LOG.trace("{}: Reading heartbeat {}", workerId, hb);
return hb;
public boolean areAllProcessesDead() throws IOException {
boolean allDead = true;
if (resourceIsolationManager != null) {
allDead = resourceIsolationManager.areAllProcessesDead(getWorkerUser(), workerId);
if (allDead && shutdownTimer != null) {
shutdownTimer = null;
return allDead;
public void cleanUp() throws IOException {
try (Timer.Context t = cleanupDuration.time()) {
} catch (IOException e) {
//This may or may not be reported depending on when process exits
throw e;
* Setup the container to run. By default this creates the needed directories/links in the local file system PREREQUISITE: All needed
* blobs and topology, jars/configs have been downloaded and placed in the appropriate locations
* @throws IOException on any error
protected void setup() throws IOException {
if (!ops.doRequiredTopoFilesExist(conf, topologyId)) {"Missing topology storm code, so can't launch worker with assignment {} for this supervisor {} on port {} with id {}",
supervisorId, port, workerId);
throw new IllegalStateException("Not all needed files are here!!!!");
}"Setting up {}:{}", supervisorId, workerId);
ops.forceMkdir(new File(ConfigUtils.workerPidsRoot(conf, workerId)));
ops.forceMkdir(new File(ConfigUtils.workerTmpRoot(conf, workerId)));
ops.forceMkdir(new File(ConfigUtils.workerHeartbeatsRoot(conf, workerId)));
File workerArtifacts = new File(ConfigUtils.workerArtifactsRoot(conf, topologyId, port));
if (!ops.fileExists(workerArtifacts)) {
ops.setupWorkerArtifactsDir(assignment.get_owner(), workerArtifacts);
String user = getWorkerUser();
* Write out the file used by the log viewer to allow/reject log access.
* @param user the user this is going to run as
* @throws IOException on any error
protected void writeLogMetadata(String user) throws IOException {
Map<String, Object> data = new HashMap<>();
data.put(Config.TOPOLOGY_SUBMITTER_USER, user);
data.put("worker-id", workerId);
Set<String> logsGroups = new HashSet<>();
if (topoConf.get(DaemonConfig.LOGS_GROUPS) != null) {
List<String> groups = ObjectReader.getStrings(topoConf.get(DaemonConfig.LOGS_GROUPS));
if (topoConf.get(Config.TOPOLOGY_GROUPS) != null) {
List<String> topGroups = ObjectReader.getStrings(topoConf.get(Config.TOPOLOGY_GROUPS));
data.put(DaemonConfig.LOGS_GROUPS, logsGroups.toArray());
Set<String> logsUsers = new HashSet<>();
if (topoConf.get(DaemonConfig.LOGS_USERS) != null) {
List<String> logUsers = ObjectReader.getStrings(topoConf.get(DaemonConfig.LOGS_USERS));
if (topoConf.get(Config.TOPOLOGY_USERS) != null) {
List<String> topUsers = ObjectReader.getStrings(topoConf.get(Config.TOPOLOGY_USERS));
data.put(DaemonConfig.LOGS_USERS, logsUsers.toArray());
if (topoConf.get(Config.TOPOLOGY_WORKER_TIMEOUT_SECS) != null) {
int topoTimeout = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_WORKER_TIMEOUT_SECS));
int defaultWorkerTimeout = ObjectReader.getInt(conf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS));
topoTimeout = Math.max(topoTimeout, defaultWorkerTimeout);
data.put(Config.TOPOLOGY_WORKER_TIMEOUT_SECS, topoTimeout);
File file = ServerConfigUtils.getLogMetaDataFile(conf, topologyId, port);
Yaml yaml = new Yaml();
try (Writer writer = ops.getWriter(file)) {
yaml.dump(data, writer);
* Create symlink from the containers directory/artifacts to the artifacts directory.
* @throws IOException on any error
protected void createArtifactsLink() throws IOException {
if (!symlinksDisabled) {
File workerDir = new File(ConfigUtils.workerRoot(conf, workerId));
File topoDir = new File(ConfigUtils.workerArtifactsRoot(conf, topologyId, port));
if (ops.fileExists(workerDir)) {
LOG.debug("Creating symlinks for worker-id: {} topology-id: {} to its port artifacts directory", workerId, topologyId);
ops.createSymlink(new File(ConfigUtils.workerArtifactsSymlink(conf, workerId)), topoDir);
* Create symlinks for each of the blobs from the container's directory to corresponding links in the storm dist directory.
* @throws IOException on any error.
protected void createBlobstoreLinks() throws IOException {
String stormRoot = ConfigUtils.supervisorStormDistRoot(conf, topologyId);
String workerRoot = ConfigUtils.workerRoot(conf, workerId);
Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) topoConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
List<String> blobFileNames = new ArrayList<>();
if (blobstoreMap != null) {
for (Map.Entry<String, Map<String, Object>> entry : blobstoreMap.entrySet()) {
String key = entry.getKey();
Map<String, Object> blobInfo = entry.getValue();
String ret = null;
if (blobInfo != null && blobInfo.containsKey("localname")) {
ret = (String) blobInfo.get("localname");
} else {
ret = key;
File targetResourcesDir = new File(stormRoot, ServerConfigUtils.RESOURCES_SUBDIR);
List<String> resourceFileNames = new ArrayList<>();
if (targetResourcesDir.exists()) {
if (!symlinksDisabled) {"Creating symlinks for worker-id: {} storm-id: {} for files({}): {}", workerId, topologyId, resourceFileNames.size(),
if (targetResourcesDir.exists()) {
ops.createSymlink(new File(workerRoot, ServerConfigUtils.RESOURCES_SUBDIR), targetResourcesDir);
} else {"Topology jar for worker-id: {} storm-id: {} does not contain re sources directory {}.", workerId, topologyId,
for (String fileName : blobFileNames) {
ops.createSymlink(new File(workerRoot, fileName),
new File(stormRoot, fileName));
} else if (blobFileNames.size() > 0) {
LOG.warn("Symlinks are disabled, no symlinks created for blobs {}", blobFileNames);
* Get the user of the worker.
* @return the user that some operations should be done as.
* @throws IOException on any error
protected String getWorkerUser() throws IOException {
if (cachedUser != null) {
return cachedUser;
}"GET worker-user for {}", workerId);
File file = new File(ConfigUtils.workerUserFile(conf, workerId));
if (ops.fileExists(file)) {
cachedUser = ops.slurpString(file).trim();
if (!StringUtils.isBlank(cachedUser)) {
return cachedUser;
if (assignment != null && assignment.is_set_owner()) {
cachedUser = assignment.get_owner();
if (!StringUtils.isBlank(cachedUser)) {
return cachedUser;
if (ConfigUtils.isLocalMode(conf)) {
cachedUser = System.getProperty("");
return cachedUser;
} else {
File f = new File(ConfigUtils.workerArtifactsRoot(conf));
if (f.exists()) {
cachedUser = Files.getOwner(f.toPath()).getName();
if (!StringUtils.isBlank(cachedUser)) {
return cachedUser;
throw new IllegalStateException("Could not recover the user for " + workerId);
protected void saveWorkerUser(String user) throws IOException {
type.assertFull();"SET worker-user {} {}", workerId, user);
ops.dump(new File(ConfigUtils.workerUserFile(conf, workerId)), user);
protected void deleteSavedWorkerUser() throws IOException {"REMOVE worker-user {}", workerId);
ops.deleteIfExists(new File(ConfigUtils.workerUserFile(conf, workerId)));
* Clean up the container partly preparing for restart. By default delete all of the temp directories we are going to get a new
* worker_id anyways. POST CONDITION: the workerId will be set to null
* @throws IOException on any error
public void cleanUpForRestart() throws IOException {"Cleaning up {}:{}", supervisorId, workerId);
String user = getWorkerUser();
//clean up for resource isolation if enabled
if (resourceIsolationManager != null) {
resourceIsolationManager.cleanup(user, workerId, port);
//Always make sure to clean up everything else before worker directory
//is removed since that is what is going to trigger the retry for cleanup
ops.deleteIfExists(new File(ConfigUtils.workerHeartbeatsRoot(conf, workerId)), user, workerId);
ops.deleteIfExists(new File(ConfigUtils.workerPidsRoot(conf, workerId)), user, workerId);
ops.deleteIfExists(new File(ConfigUtils.workerTmpRoot(conf, workerId)), user, workerId);
ops.deleteIfExists(new File(ConfigUtils.workerRoot(conf, workerId)), user, workerId);
workerId = null;
* Check if the container is over its memory limit AND needs to be killed. This does not necessarily mean that it just went over the
* limit.
* @throws IOException on any error
public boolean isMemoryLimitViolated(LocalAssignment withUpdatedLimits) throws IOException {
return false;
protected void updateMemoryAccounting() {
long used = getMemoryUsageMb();
long reserved = getMemoryReservationMb();
containerMemoryTracker.setUsedMemoryMb(port, topologyId, used);
containerMemoryTracker.setReservedMemoryMb(port, topologyId, reserved);
* Get the total memory used (on and off heap).
public long getTotalTopologyMemoryUsed() {
return containerMemoryTracker.getUsedMemoryMb(topologyId);
* Get the total memory reserved.
* @param withUpdatedLimits the local assignment with shared memory
* @return the total memory reserved.
public long getTotalTopologyMemoryReserved(LocalAssignment withUpdatedLimits) {
long ret =
if (withUpdatedLimits.is_set_total_node_shared()) {
ret += withUpdatedLimits.get_total_node_shared();
return ret;
* Get the number of workers for this topology.
public long getTotalWorkersForThisTopology() {
return containerMemoryTracker.getAssignedWorkerCount(topologyId);
* Get the current memory usage of this container.
public long getMemoryUsageMb() {
return 0;
* Get the current memory reservation of this container.
public long getMemoryReservationMb() {
return 0;
* Launch the process for the first time. PREREQUISITE: setup has run and passed
* @throws IOException on any error
public abstract void launch() throws IOException;
* Restart the processes in this container. PREREQUISITE: cleanUpForRestart has run and passed
* @throws IOException on any error
public abstract void relaunch() throws IOException;
* Return true if the main process exited, else false. This is just best effort return false if unknown.
public abstract boolean didMainProcessExit();
* Run a profiling request.
* @param request the request to run
* @param stop is this a stop request?
* @return true if it succeeded, else false
* @throws IOException on any error
* @throws InterruptedException if running the command is interrupted.
public abstract boolean runProfiling(ProfileRequest request, boolean stop) throws IOException, InterruptedException;
* Get the id of the container or null if there is no worker id right now.
public String getWorkerId() {
return workerId;
* Send worker metrics to Nimbus.
void processMetrics(OnlyLatestExecutor<Integer> exec, WorkerMetricsProcessor processor) {
try {
Optional<Long> usedMemoryForPort = containerMemoryTracker.getUsedMemoryMb(port);
if (usedMemoryForPort.isPresent()) {
// Make sure we don't process too frequently.
long nextMetricProcessTime = this.lastMetricProcessTime + 60L * 1000L;
long currentTimeMsec = System.currentTimeMillis();
if (currentTimeMsec < nextMetricProcessTime) {
String hostname = Utils.hostname();
// create metric for memory
long timestamp = System.currentTimeMillis();
WorkerMetricPoint workerMetric = new WorkerMetricPoint(MEMORY_USED_METRIC,
WorkerMetricList metricList = new WorkerMetricList();
WorkerMetrics metrics = new WorkerMetrics(topologyId, port, hostname, metricList);
exec.execute(port, () -> {
try {
processor.processWorkerMetrics(conf, metrics);
} catch (MetricException e) {
LOG.error("Failed to process metrics", e);
} catch (Exception e) {
LOG.error("Failed to process metrics", e);
} finally {
this.lastMetricProcessTime = System.currentTimeMillis();
/**
 * The ways a container can come into existence.
 *
 * <p>Each constant records whether it is a recovery and whether the resulting container can only
 * be killed.
 */
public enum ContainerType {
    LAUNCH(false, false),
    RECOVER_FULL(true, false),
    RECOVER_PARTIAL(true, true);

    private final boolean recovery;
    private final boolean onlyKillable;

    ContainerType(boolean recovery, boolean onlyKillable) {
        this.recovery = recovery;
        this.onlyKillable = onlyKillable;
    }

    /**
     * Whether this type is a recovery.
     *
     * @return true for RECOVER_FULL and RECOVER_PARTIAL, false for LAUNCH
     */
    public boolean isRecovery() {
        return recovery;
    }

    /**
     * Guard for operations that are not allowed on an only-killable container.
     *
     * @throws IllegalStateException if this type is only killable
     */
    public void assertFull() {
        if (onlyKillable) {
            throw new IllegalStateException("Container is only Killable.");
        }
    }

    /**
     * Whether a container of this type can only be killed.
     *
     * @return true for RECOVER_PARTIAL, false otherwise
     */
    public boolean isOnlyKillable() {
        return onlyKillable;
    }
}