blob: f0eb3a5e5acd875a94d053de1cbf3a5d10387499 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package org.apache.storm.container;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.storm.Config;
import org.apache.storm.daemon.supervisor.ClientSupervisorUtils;
import org.apache.storm.daemon.supervisor.ExitCodeCallback;
import org.apache.storm.daemon.supervisor.SupervisorUtils;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.ServerUtils;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This is the default class to manage worker processes, including launching, killing, profiling and etc.
*/
public class DefaultResourceIsolationManager implements ResourceIsolationInterface {
private static final Logger LOG = LoggerFactory.getLogger(DefaultResourceIsolationManager.class);
protected Map<String, Object> conf;
protected boolean runAsUser;
@Override
public void prepare(Map<String, Object> conf) throws IOException {
this.conf = conf;
runAsUser = ObjectReader.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false);
}
@Override
public void reserveResourcesForWorker(String workerId, Integer workerMemory, Integer workerCpu, String numaId) {
//NO OP
}
@Override
public void releaseResourcesForWorker(String workerId) {
//NO OP
}
@Override
public void cleanup(String user, String workerId, int port) throws IOException {
//NO OP
}
@Override
public void launchWorkerProcess(String user, String topologyId, Map<String, Object> topoConf,
int port, String workerId, List<String> command, Map<String, String> env,
String logPrefix, ExitCodeCallback processExitCallback, File targetDir) throws IOException {
if (runAsUser) {
String workerDir = targetDir.getAbsolutePath();
List<String> args = Arrays.asList("worker", workerDir, ServerUtils.writeScript(workerDir, command, env));
ClientSupervisorUtils.processLauncher(
conf, user, null, args, null, logPrefix,
processExitCallback, targetDir
);
} else {
ClientSupervisorUtils.launchProcess(command, env, logPrefix, processExitCallback, targetDir);
}
}
@Override
public long getMemoryUsage(String user, String workerId, int port) throws IOException {
return 0;
}
@Override
public long getSystemFreeMemoryMb() throws IOException {
return 0;
}
@Override
public void kill(String user, String workerId) throws IOException {
Set<Long> pids = getAllPids(workerId);
for (Long pid : pids) {
kill(pid, user);
}
}
/**
* Kill a given process.
* @param pid the id of the process to kill
* @throws IOException on I/O exception
*/
private void kill(long pid, String user) throws IOException {
if (runAsUser) {
signal(pid, 15, user);
} else {
ServerUtils.killProcessWithSigTerm(String.valueOf(pid));
}
}
@Override
public void forceKill(String user, String workerId) throws IOException {
Set<Long> pids = getAllPids(workerId);
for (Long pid : pids) {
forceKill(pid, user);
}
}
/**
* Kill a given process forcefully.
* @param pid the id of the process to kill
* @throws IOException on I/O exception
*/
private void forceKill(long pid, String user) throws IOException {
if (runAsUser) {
signal(pid, 9, user);
} else {
ServerUtils.forceKillProcess(String.valueOf(pid));
}
}
/**
* Get all the pids that are a part of the container.
* @return all of the pids that are a part of this container
*/
protected Set<Long> getAllPids(String workerId) throws IOException {
Set<Long> ret = new HashSet<>();
for (String listing : ConfigUtils.readDirContents(ConfigUtils.workerPidsRoot(conf, workerId))) {
ret.add(Long.valueOf(listing));
}
return ret;
}
private void signal(long pid, int signal, String user) throws IOException {
List<String> commands = Arrays.asList("signal", String.valueOf(pid), String.valueOf(signal));
String logPrefix = "kill -" + signal + " " + pid;
ClientSupervisorUtils.processLauncherAndWait(conf, user, commands, null, logPrefix);
}
@Override
public boolean areAllProcessesDead(String user, String workerId) throws IOException {
Set<Long> pids = getAllPids(workerId);
return ServerUtils.areAllProcessesDead(conf, user, workerId, pids);
}
@Override
public boolean runProfilingCommand(String user, String workerId, List<String> command, Map<String, String> env,
String logPrefix, File targetDir) throws IOException, InterruptedException {
if (runAsUser) {
String td = targetDir.getAbsolutePath();
LOG.info("Running as user: {} command: {}", user, command);
String containerFile = ServerUtils.containerFilePath(td);
if (Utils.checkFileExists(containerFile)) {
SupervisorUtils.rmrAsUser(conf, containerFile, containerFile);
}
String scriptFile = ServerUtils.scriptFilePath(td);
if (Utils.checkFileExists(scriptFile)) {
SupervisorUtils.rmrAsUser(conf, scriptFile, scriptFile);
}
String script = ServerUtils.writeScript(td, command, env);
List<String> args = Arrays.asList("profiler", td, script);
int ret = ClientSupervisorUtils.processLauncherAndWait(conf, user, args, env, logPrefix);
return ret == 0;
} else {
Process p = ClientSupervisorUtils.launchProcess(command, env, logPrefix, null, targetDir);
int ret = p.waitFor();
return ret == 0;
}
}
/**
* This class doesn't really manage resources.
* @return false
*/
@Override
public boolean isResourceManaged() {
return false;
}
}