blob: b2681eafe22fb0a3aa3a2a7c61f0752835111119 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.daemon.supervisor;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.storm.container.ResourceIsolationInterface;
import org.apache.storm.generated.LocalAssignment;
import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.utils.LocalState;
import org.apache.storm.utils.ServerUtils;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RunAsUserContainer extends BasicContainer {
private static final Logger LOG = LoggerFactory.getLogger(RunAsUserContainer.class);
public RunAsUserContainer(Container.ContainerType type, Map<String, Object> conf, String supervisorId,
int supervisorPort, int port, LocalAssignment assignment,
ResourceIsolationInterface resourceIsolationManager, LocalState localState,
String workerId, StormMetricsRegistry metricsRegistry,
ContainerMemoryTracker containerMemoryTracker) throws IOException {
this(type, conf, supervisorId, supervisorPort, port, assignment, resourceIsolationManager, localState, workerId, metricsRegistry,
containerMemoryTracker, null, null, null);
}
RunAsUserContainer(Container.ContainerType type, Map<String, Object> conf, String supervisorId, int supervisorPort,
int port, LocalAssignment assignment, ResourceIsolationInterface resourceIsolationManager,
LocalState localState, String workerId, StormMetricsRegistry metricsRegistry,
ContainerMemoryTracker containerMemoryTracker, Map<String, Object> topoConf,
AdvancedFSOps ops, String profileCmd) throws IOException {
super(type, conf, supervisorId, supervisorPort, port, assignment, resourceIsolationManager, localState,
workerId, metricsRegistry, containerMemoryTracker, topoConf, ops, profileCmd);
if (Utils.isOnWindows()) {
throw new UnsupportedOperationException("ERROR: Windows doesn't support running workers as different users yet");
}
}
private void signal(long pid, int signal) throws IOException {
List<String> commands = Arrays.asList("signal", String.valueOf(pid), String.valueOf(signal));
String user = getWorkerUser();
String logPrefix = "kill -" + signal + " " + pid;
ClientSupervisorUtils.processLauncherAndWait(conf, user, commands, null, logPrefix);
}
@Override
protected void kill(long pid) throws IOException {
signal(pid, 15);
}
@Override
protected void forceKill(long pid) throws IOException {
signal(pid, 9);
}
@Override
protected boolean runProfilingCommand(List<String> command, Map<String, String> env, String logPrefix, File targetDir) throws
IOException, InterruptedException {
String user = this.getWorkerUser();
String td = targetDir.getAbsolutePath();
LOG.info("Running as user: {} command: {}", user, command);
String containerFile = ServerUtils.containerFilePath(td);
if (Utils.checkFileExists(containerFile)) {
SupervisorUtils.rmrAsUser(conf, containerFile, containerFile);
}
String scriptFile = ServerUtils.scriptFilePath(td);
if (Utils.checkFileExists(scriptFile)) {
SupervisorUtils.rmrAsUser(conf, scriptFile, scriptFile);
}
String script = ServerUtils.writeScript(td, command, env);
List<String> args = Arrays.asList("profiler", td, script);
int ret = ClientSupervisorUtils.processLauncherAndWait(conf, user, args, env, logPrefix);
return ret == 0;
}
@Override
protected void launchWorkerProcess(List<String> command, Map<String, String> env,
String logPrefix, ExitCodeCallback processExitCallback, File targetDir) throws IOException {
String workerDir = targetDir.getAbsolutePath();
String user = this.getWorkerUser();
List<String> args = Arrays.asList("worker", workerDir, ServerUtils.writeScript(workerDir, command, env));
List<String> commandPrefix = null;
if (resourceIsolationManager != null) {
commandPrefix = resourceIsolationManager.getLaunchCommandPrefix(workerId);
}
ClientSupervisorUtils.processLauncher(conf, user, commandPrefix, args, null, logPrefix, processExitCallback, targetDir);
}
/**
* If 'supervisor.run.worker.as.user' is set, worker will be launched as the user that launched the topology.
*/
@Override
protected String getRunWorkerAsUser() {
try {
return getWorkerUser();
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
}