blob: 47bb45802f866468829ebf6bee19a838e77af797 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
package org.apache.storm.container.docker;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang.StringUtils;
import org.apache.storm.Config;
import org.apache.storm.container.cgroup.core.MemoryCore;
import org.apache.storm.container.oci.OciContainerManager;
import org.apache.storm.daemon.supervisor.ClientSupervisorUtils;
import org.apache.storm.daemon.supervisor.ExitCodeCallback;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.ServerUtils;
import org.apache.storm.utils.ShellCommandRunnerImpl;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* For security, we can launch worker processes inside the docker container.
* This class manages the interaction with docker containers including launching, stopping, profiling and etc.
public class DockerManager extends OciContainerManager {
private static final Logger LOG = LoggerFactory.getLogger(DockerManager.class);
private Map<String, String> workerToCid = new ConcurrentHashMap<>();
public void prepare(Map<String, Object> conf) throws IOException {
private String[] getGroupIdInfo(String userName)
throws IOException {
String[] groupIds;
try {
String output = new ShellCommandRunnerImpl().execCommand("id", "--groups", userName);
groupIds = output.trim().split(" ");
} catch (IOException e) {
LOG.error("Can't get group IDs of the user {}", userName);
throw new IOException(e);
return groupIds;
private String getUserIdInfo(String userName) throws IOException {
String uid = "";
try {
uid = new ShellCommandRunnerImpl().execCommand("id", "--user", userName).trim();
} catch (IOException e) {
LOG.error("Can't get uid of the user {}", userName);
throw e;
return uid;
public void launchWorkerProcess(String user, String topologyId, Map<String, Object> topoConf,
int port, String workerId, List<String> command, Map<String, String> env,
String logPrefix, ExitCodeCallback processExitCallback,
File targetDir) throws IOException {
String dockerImage = getImageName(topoConf);
if (dockerImage == null) {
LOG.error("Image name for {} is not configured properly; will not continue to launch the worker", topologyId);
String workerDir = targetDir.getAbsolutePath();
String uid = getUserIdInfo(user);
String[] groups = getGroupIdInfo(user);
String gid = groups[0];
String dockerUser = uid + ":" + gid;
DockerRunCommand dockerRunCommand = new DockerRunCommand(workerId, dockerUser, dockerImage);
//set of locations to be bind mounted
String workerRootDir = ConfigUtils.workerRoot(conf, workerId);
String workerArtifactsRoot = ConfigUtils.workerArtifactsRoot(conf, topologyId, port);
String workerUserFile = ConfigUtils.workerUserFile(conf, workerId);
String sharedByTopologyDir = ConfigUtils.sharedByTopologyDir(conf, topologyId);
// Theoretically we only need to mount ConfigUtils.supervisorStormDistRoot directory.
// But if supervisorLocalDir is not mounted, the worker will try to create it and fail.
String supervisorLocalDir = ConfigUtils.supervisorLocalDir(conf);
String workerTmpRoot = ConfigUtils.workerTmpRoot(conf, workerId);
//The whole file system of the container will be read-only except specific read-write bind mounts
.addReadOnlyMountLocation(cgroupRootPath, cgroupRootPath, false)
.addReadOnlyMountLocation(stormHome, stormHome, false)
.addReadOnlyMountLocation(supervisorLocalDir, supervisorLocalDir, false)
.addReadWriteMountLocation(workerRootDir, workerRootDir, false)
.addReadWriteMountLocation(workerArtifactsRoot, workerArtifactsRoot, false)
.addReadWriteMountLocation(workerUserFile, workerUserFile, false)
//nscd must be running so that profiling can work properly
.addReadWriteMountLocation(nscdPath, nscdPath, false)
.addReadWriteMountLocation(sharedByTopologyDir, sharedByTopologyDir, false)
//This is to make /tmp directory in container writable. This is very important.
// For example
// 1. jvm needs to write to /tmp/hsperfdata_<user> directory so that jps can work
// 2. jstack needs to create a socket under /tmp directory.
//Otherwise profiling will not work properly.
.addReadWriteMountLocation(workerTmpRoot, TMP_DIR, false)
//a list of read-only bind mount locations
.addAllReadOnlyMountLocations(readonlyBindmounts, false)
.addAllReadWriteMountLocations(readwriteBindmounts, false);
if (workerToCores.containsKey(workerId)) {
workerToCores.get(workerId), workerToMemoryZone.get(workerId)
if (seccompJsonFile != null) {
if (workerToCpu.containsKey(workerId)) {
dockerRunCommand.setCpus(workerToCpu.get(workerId) / 100.0);
if (workerToMemoryMb.containsKey(workerId)) {
dockerRunCommand.setOverrideCommandWithArgs(Arrays.asList("bash", ServerUtils.writeScript(workerDir, command, env, "0027")));
//run docker-run command and launch container in background (-d option).
runDockerCommandWaitFor(conf, user, CmdType.LAUNCH_DOCKER_CONTAINER,
dockerRunCommand.getCommandWithArguments(), null, logPrefix, null, targetDir, "docker-run");
//docker-wait for the container in another thread. processExitCallback will get the container's exit code.
String threadName = "DockerWait_SLOT_" + port;
Utils.asyncLoop(new Callable<Long>() {
public Long call() throws IOException {
DockerWaitCommand dockerWaitCommand = new DockerWaitCommand(workerId);
try {
runDockerCommandWaitFor(conf, user, CmdType.RUN_DOCKER_CMD,
dockerWaitCommand.getCommandWithArguments(), null, logPrefix, processExitCallback, targetDir, "docker-wait");
} catch (IOException e) {
LOG.error("IOException on running docker wait command:", e);
throw e;
return null; // Run only once.
}, threadName, null);
public void releaseResourcesForWorker(String workerId) {
//Get the container ID of the worker
private String getContainerId(String workerId) throws IOException {
String cid = workerToCid.get(workerId);
if (cid == null) {
File cidFile = new File(dockerCidFilePath(workerId));
if (cidFile.exists()) {
List<String> lines = Files.readLines(cidFile, Charset.defaultCharset());
if (lines.isEmpty()) {
LOG.error("cid file {} is empty.", cidFile);
} else {
cid = lines.get(0);
} else {
LOG.error("cid file {} doesn't exist.", cidFile);
if (cid == null) {
LOG.error("Couldn't get container id of the worker {}", workerId);
throw new IOException("Couldn't get container id of the worker " + workerId);
} else {
workerToCid.put(workerId, cid);
return cid;
public long getMemoryUsage(String user, String workerId, int port) throws IOException {
String memoryCgroupPath = memoryCgroupRootPath + File.separator + getContainerId(workerId);
MemoryCore memoryCore = new MemoryCore(memoryCgroupPath);
return memoryCore.getPhysicalUsage();
public void kill(String user, String workerId) throws IOException {
String workerDir = ConfigUtils.workerRoot(conf, workerId);
DockerStopCommand dockerStopCommand = new DockerStopCommand(workerId);
runDockerCommandWaitFor(conf, user, CmdType.RUN_DOCKER_CMD, dockerStopCommand.getCommandWithArguments(),
null, null, null, new File(workerDir), "docker-stop");
DockerRmCommand dockerRmCommand = new DockerRmCommand(workerId);
runDockerCommandWaitFor(conf, user, CmdType.RUN_DOCKER_CMD, dockerRmCommand.getCommandWithArguments(),
null, null, null, new File(workerDir), "docker-rm");
public void forceKill(String user, String workerId) throws IOException {
String workerDir = ConfigUtils.workerRoot(conf, workerId);
DockerRmCommand dockerRmCommand = new DockerRmCommand(workerId);
runDockerCommandWaitFor(conf, user, CmdType.RUN_DOCKER_CMD, dockerRmCommand.getCommandWithArguments(),
null, null, null, new File(workerDir), "docker-force-rm");
* Currently it only checks if the container is alive.
* If the worker process inside the container dies, the container will exit.
* So we only need to check if the container is running to know if the worker process is still alive.
* @param user the user of the processes
* @param workerId the id of the worker to kill
* @return true if all processes are dead
* @throws IOException on I/O exception
public boolean areAllProcessesDead(String user, String workerId) throws IOException {
String workerDir = ConfigUtils.workerRoot(conf, workerId);
DockerPsCommand dockerPsCommand = new DockerPsCommand();
String command = dockerPsCommand.getCommandWithArguments();
Process p = runDockerCommand(conf, user, CmdType.RUN_DOCKER_CMD, command,
null, null, null, new File(workerDir), "docker-ps");
try {
} catch (InterruptedException e) {
LOG.error("running docker command is interrupted", e);
if (p.exitValue() != 0) {
String errorMessage = "The exitValue of the docker command [" + command + "] is non-zero: " + p.exitValue();
throw new IOException(errorMessage);
String output = IOUtils.toString(p.getInputStream(), Charset.forName("UTF-8"));
LOG.debug("The output of the docker command [{}] is: [{}]; the exitValue is {}", command, output, p.exitValue());
//The output might include some things else
//The real output of the docker-ps command is either empty or the container's short ID
output = output.trim();
String[] lines = output.split("\n");
if (lines.length == 0) {
//output is empty, the container is not running
return true;
String lastLine = lines[lines.length - 1].trim();
if (lastLine.isEmpty()) {
return true;
try {
String containerId = getContainerId(workerId);
return !containerId.startsWith(lastLine);
} catch (IOException e) {
LOG.error("Failed to find Container ID for {}, assuming dead", workerId, e);
return true;
* Run profiling command in the container.
* @param user the user that the worker is running as
* @param workerId the id of the worker
* @param command the command to run.
* The profiler to be used is configured in worker-launcher.cfg.
* @param env the environment to run the command
* @param logPrefix the prefix to include in the logs
* @param targetDir the working directory to run the command in
* @return true if the command succeeds, false otherwise.
* @throws IOException on I/O exception
* @throws InterruptedException if interrupted
public boolean runProfilingCommand(String user, String workerId, List<String> command, Map<String, String> env,
String logPrefix, File targetDir) throws IOException, InterruptedException {
String workerDir = targetDir.getAbsolutePath();
String profilingArgs = StringUtils.join(command, " ");
//run nsenter
String nsenterScriptPath = writeToCommandFile(workerDir, profilingArgs, "profile");
List<String> args = Arrays.asList(CmdType.PROFILE_DOCKER_CONTAINER.toString(), workerId, nsenterScriptPath);
Process process = ClientSupervisorUtils.processLauncher(
conf, user, null, args, env, logPrefix, null, targetDir
int exitCode = process.exitValue();
LOG.debug("WorkerId {} : exitCode from {}: {}", workerId, CmdType.PROFILE_DOCKER_CONTAINER.toString(), exitCode);
return exitCode == 0;
public void cleanup(String user, String workerId, int port) throws IOException {
private String dockerCidFilePath(String workerId) {
return ConfigUtils.workerRoot(conf, workerId) + File.separator + "container.cid";
public boolean isResourceManaged() {
return true;
* Run docker command using {@link Config#SUPERVISOR_WORKER_LAUNCHER}.
* @param conf the storm conf
* @param dockerCommand the docker command to run
* @param environment the environment
* @param logPrefix the prefix of logs
* @param exitCodeCallback the exit call back
* @param targetDir the working directory
* @return the Process
* @throws IOException on I/O exception
private Process runDockerCommand(Map<String, Object> conf, String user,
CmdType cmdType, String dockerCommand,
Map<String, String> environment, final String logPrefix,
final ExitCodeCallback exitCodeCallback, File targetDir, String commandTag) throws IOException {
String workerDir = targetDir.getAbsolutePath();
String dockerScriptPath = writeToCommandFile(workerDir, dockerCommand, commandTag);
List<String> args = Arrays.asList(cmdType.toString(), workerDir, dockerScriptPath);
return ClientSupervisorUtils.processLauncher(conf, user, null, args, environment,
logPrefix, exitCodeCallback, targetDir);
* Run docker command using {@link Config#SUPERVISOR_WORKER_LAUNCHER}.
* @param conf the storm conf
* @param dockerCommand the docker command to run
* @param environment the environment
* @param logPrefix the prefix of logs
* @param exitCodeCallback the exit call back
* @param targetDir the working directory
* @return the Process
* @throws IOException on I/O exception
private int runDockerCommandWaitFor(Map<String, Object> conf, String user,
CmdType cmdType, String dockerCommand,
Map<String, String> environment, final String logPrefix,
final ExitCodeCallback exitCodeCallback, File targetDir, String commandTag) throws IOException {
Process p = runDockerCommand(
conf, user, cmdType, dockerCommand, environment, logPrefix,
exitCodeCallback, targetDir, commandTag
try {
} catch (InterruptedException e) {
LOG.error("running docker command is interrupted", e);
return p.exitValue();