/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.daemon.supervisor;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Writer;
import java.lang.ProcessBuilder.Redirect;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.storm.Config;
import org.apache.storm.generated.LSWorkerHeartbeat;
import org.apache.storm.generated.LocalAssignment;
import org.apache.storm.generated.ProfileRequest;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.LocalState;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;
import static org.apache.storm.utils.Utils.OR;
/**
* Represents a container that a worker will run in.
*/
public abstract class Container implements Killable {
private static final Logger LOG = LoggerFactory.getLogger(Container.class);
public static enum ContainerType {
LAUNCH(false, false),
RECOVER_FULL(true, false),
RECOVER_PARTIAL(true, true);
private final boolean _recovery;
private final boolean _onlyKillable;
ContainerType(boolean recovery, boolean onlyKillable) {
_recovery = recovery;
_onlyKillable = onlyKillable;
}
public boolean isRecovery() {
return _recovery;
}
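/**
* Assert that this container supports full operations, not just kill/cleanup.
* @throws IllegalStateException if the container was only partially recovered
*/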
public void assertFull() {
if (_onlyKillable) {
throw new IllegalStateException("Container is only Killable.");
}
}
public boolean isOnlyKillable() {
return _onlyKillable;
}
}
protected final Map<String, Object> _conf;
protected final Map<String, Object> _topoConf; //Not set if RECOVER_PARTIAL
protected String _workerId;
protected final String _topologyId; //Not set if RECOVER_PARTIAL
protected final String _supervisorId;
protected final int _port; //Not set if RECOVER_PARTIAL
protected final LocalAssignment _assignment; //Not set if RECOVER_PARTIAL
protected final AdvancedFSOps _ops;
protected ContainerType _type;
protected final boolean _symlinksDisabled;
/**
* Create a new Container.
* @param type the type of container being made.
* @param conf the supervisor config
* @param supervisorId the ID of the supervisor this is a part of.
* @param port the port the container is on. Should be <= 0 if only doing a partial recovery
* @param assignment the assignment for this container. Should be null if only doing a partial recovery.
* @param workerId the id of the worker to use. Must not be null if doing a partial recovery.
* @param topoConf the config of the topology (mostly for testing); if null
* and this is not a partial recovery, the real conf is read instead.
* @param ops file system operations (mostly for testing); if null a new one is made
* @throws IOException on any error.
*/
protected Container(ContainerType type, Map<String, Object> conf, String supervisorId,
int port, LocalAssignment assignment,
String workerId, Map<String, Object> topoConf, AdvancedFSOps ops) throws IOException {
assert(type != null);
assert(conf != null);
assert(supervisorId != null);
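//Utils.OR returns the first non-null value, so symlinks stay enabled unless DISABLE_SYMLINKS is explicitly true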
_symlinksDisabled = (boolean)OR(conf.get(Config.DISABLE_SYMLINKS), false);
if (ops == null) {
ops = AdvancedFSOps.make(conf);
}
_workerId = workerId;
_type = type;
_port = port;
_ops = ops;
_conf = conf;
_supervisorId = supervisorId;
_assignment = assignment;
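//An only-killable container (RECOVER_PARTIAL) knows just its worker id: enough to kill and clean up, but not to launch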
if (_type.isOnlyKillable()) {
assert(_assignment == null);
assert(_port <= 0);
assert(_workerId != null);
_topologyId = null;
_topoConf = null;
} else {
assert(assignment != null);
assert(port > 0);
_topologyId = assignment.get_topology_id();
if (!_ops.doRequiredTopoFilesExist(_conf, _topologyId)) {
LOG.info("Missing topology storm code, so can't launch worker with assignment {} for this supervisor {} on port {} with id {}", _assignment,
_supervisorId, _port, _workerId);
throw new ContainerRecoveryException("Missing required topology files...");
}
if (topoConf == null) {
_topoConf = readTopoConf();
} else {
//For testing...
_topoConf = topoConf;
}
}
}
@Override
public String toString() {
return "topo:" + _topologyId + " worker:" + _workerId;
}
protected Map<String, Object> readTopoConf() throws IOException {
assert(_topologyId != null);
return ConfigUtils.readSupervisorStormConf(_conf, _topologyId);
}
/**
* Kill a given process with SIGTERM.
* @param pid the id of the process to kill
* @throws IOException on any error
*/
protected void kill(long pid) throws IOException {
Utils.killProcessWithSigTerm(String.valueOf(pid));
}
/**
* Forcefully kill a given process.
* @param pid the id of the process to kill
* @throws IOException on any error
*/
protected void forceKill(long pid) throws IOException {
Utils.forceKillProcess(String.valueOf(pid));
}
@Override
public void kill() throws IOException {
LOG.info("Killing {}:{}", _supervisorId, _workerId);
Set<Long> pids = getAllPids();
for (Long pid : pids) {
kill(pid);
}
}
@Override
public void forceKill() throws IOException {
LOG.info("Force Killing {}:{}", _supervisorId, _workerId);
Set<Long> pids = getAllPids();
for (Long pid : pids) {
forceKill(pid);
}
}
/**
* Read the Heartbeat for the current container.
* @return the Heartbeat
* @throws IOException on any error
*/
public LSWorkerHeartbeat readHeartbeat() throws IOException {
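//The worker periodically writes its heartbeat into its LocalState; read it back here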
LocalState localState = ConfigUtils.workerState(_conf, _workerId);
LSWorkerHeartbeat hb = localState.getWorkerHeartBeat();
LOG.trace("{}: Reading heartbeat {}", _workerId, hb);
return hb;
}
/**
* Is a process alive and running?
* @param pid the PID of the running process
* @param user the user that is expected to own that process
* @return true if it is, else false
* @throws IOException on any error
*/
protected boolean isProcessAlive(long pid, String user) throws IOException {
if (Utils.IS_ON_WINDOWS) {
return isWindowsProcessAlive(pid, user);
}
return isPosixProcessAlive(pid, user);
}
private boolean isWindowsProcessAlive(long pid, String user) throws IOException {
boolean ret = false;
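//tasklist in list format ("/fo list") prints one "Field: value" line per property; "/v" adds verbose fields including "User Name"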
ProcessBuilder pb = new ProcessBuilder("tasklist", "/fo", "list", "/fi", "pid eq " + pid, "/v");
pb.redirectError(Redirect.INHERIT);
Process p = pb.start();
try (BufferedReader in = new BufferedReader(new InputStreamReader(p.getInputStream()))) {
String read;
while ((read = in.readLine()) != null) {
if (read.contains("User Name:")) { //Check for : in case someone called their user "User Name"
//This line contains the user name for the pid we're looking up
//Example line: "User Name: exampleDomain\exampleUser"
List<String> userNameLineSplitOnColon = Arrays.asList(read.split(":"));
if (userNameLineSplitOnColon.size() == 2) {
List<String> userAndMaybeDomain = Arrays.asList(userNameLineSplitOnColon.get(1).trim().split("\\\\"));
String processUser = userAndMaybeDomain.size() == 2 ? userAndMaybeDomain.get(1) : userAndMaybeDomain.get(0);
if (user.equals(processUser)) {
ret = true;
} else {
LOG.info("Found {} running as {}, but expected it to be {}", pid, processUser, user);
}
} else {
LOG.error("Received unexpected output from tasklist command. Expected one colon in user name line. Line was {}", read);
}
break;
}
}
}
return ret;
}
private boolean isPosixProcessAlive(long pid, String user) throws IOException {
boolean ret = false;
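//"ps -o user -p <pid>" prints a "USER" header line followed by the process owner, or just the header if the pid does not exist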
ProcessBuilder pb = new ProcessBuilder("ps", "-o", "user", "-p", String.valueOf(pid));
pb.redirectError(Redirect.INHERIT);
Process p = pb.start();
try (BufferedReader in = new BufferedReader(new InputStreamReader(p.getInputStream()))) {
String first = in.readLine();
assert("USER".equals(first));
String processUser;
while ((processUser = in.readLine()) != null) {
if (user.equals(processUser)) {
ret = true;
break;
} else {
LOG.info("Found {} running as {}, but expected it to be {}", pid, processUser, user);
}
}
}
return ret;
}
@Override
public boolean areAllProcessesDead() throws IOException {
Set<Long> pids = getAllPids();
String user = getWorkerUser();
boolean allDead = true;
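//One live process is enough to declare the container still alive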
for (Long pid: pids) {
if (!isProcessAlive(pid, user)) {
LOG.debug("{}: PID {} is dead", _workerId, pid);
} else {
allDead = false;
break;
}
}
return allDead;
}
@Override
public void cleanUp() throws IOException {
cleanUpForRestart();
}
/**
* Set up the container to run. By default this creates the needed directories/links in the
* local file system.
* PREREQUISITE: All needed blobs and topology jars/configs have been downloaded and
* placed in the appropriate locations.
* @throws IOException on any error
*/
protected void setup() throws IOException {
_type.assertFull();
if (!_ops.doRequiredTopoFilesExist(_conf, _topologyId)) {
LOG.info("Missing topology storm code, so can't launch worker with assignment {} for this supervisor {} on port {} with id {}", _assignment,
_supervisorId, _port, _workerId);
throw new IllegalStateException("Not all needed files are here!!!!");
}
LOG.info("Setting up {}:{}", _supervisorId, _workerId);
_ops.forceMkdir(new File(ConfigUtils.workerPidsRoot(_conf, _workerId)));
_ops.forceMkdir(new File(ConfigUtils.workerTmpRoot(_conf, _workerId)));
_ops.forceMkdir(new File(ConfigUtils.workerHeartbeatsRoot(_conf, _workerId)));
File workerArtifacts = new File(ConfigUtils.workerArtifactsRoot(_conf, _topologyId, _port));
if (!_ops.fileExists(workerArtifacts)) {
_ops.forceMkdir(workerArtifacts);
_ops.setupWorkerArtifactsDir(_assignment.get_owner(), workerArtifacts);
}
String user = getWorkerUser();
writeLogMetadata(user);
saveWorkerUser(user);
createArtifactsLink();
createBlobstoreLinks();
}
/**
* Write out the file used by the log viewer to allow/reject log access
* @param user the user this is going to run as
* @throws IOException on any error
*/
@SuppressWarnings("unchecked")
protected void writeLogMetadata(String user) throws IOException {
_type.assertFull();
Map<String, Object> data = new HashMap<>();
data.put(Config.TOPOLOGY_SUBMITTER_USER, user);
data.put("worker-id", _workerId);
Set<String> logsGroups = new HashSet<>();
if (_topoConf.get(Config.LOGS_GROUPS) != null) {
List<String> groups = (List<String>) _topoConf.get(Config.LOGS_GROUPS);
logsGroups.addAll(groups);
}
if (_topoConf.get(Config.TOPOLOGY_GROUPS) != null) {
List<String> topGroups = (List<String>) _topoConf.get(Config.TOPOLOGY_GROUPS);
logsGroups.addAll(topGroups);
}
data.put(Config.LOGS_GROUPS, logsGroups.toArray());
Set<String> logsUsers = new HashSet<>();
if (_topoConf.get(Config.LOGS_USERS) != null) {
List<String> logUsers = (List<String>) _topoConf.get(Config.LOGS_USERS);
logsUsers.addAll(logUsers);
}
if (_topoConf.get(Config.TOPOLOGY_USERS) != null) {
List<String> topUsers = (List<String>) _topoConf.get(Config.TOPOLOGY_USERS);
logsUsers.addAll(topUsers);
}
data.put(Config.LOGS_USERS, logsUsers.toArray());
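//Write the collected metadata out as YAML where the logviewer expects to find it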
File file = ConfigUtils.getLogMetaDataFile(_conf, _topologyId, _port);
Yaml yaml = new Yaml();
try (Writer writer = _ops.getWriter(file)) {
yaml.dump(data, writer);
}
}
/**
* Create a symlink from the container's directory ("artifacts") to the per-topology/per-port artifacts directory.
* @throws IOException on any error
*/
protected void createArtifactsLink() throws IOException {
_type.assertFull();
if (!_symlinksDisabled) {
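//Symlink <worker root>/artifacts to the per-topology/per-port artifacts directory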
File workerDir = new File(ConfigUtils.workerRoot(_conf, _workerId));
File topoDir = new File(ConfigUtils.workerArtifactsRoot(_conf, _topologyId, _port));
if (_ops.fileExists(workerDir)) {
LOG.debug("Creating symlinks for worker-id: {} topology-id: {} to its port artifacts directory", _workerId, _topologyId);
_ops.createSymlink(new File(workerDir, "artifacts"), topoDir);
}
}
}
/**
* Create symlinks for each of the blobs from the container's directory to
* the corresponding files in the storm dist directory.
* @throws IOException on any error.
*/
protected void createBlobstoreLinks() throws IOException {
_type.assertFull();
String stormRoot = ConfigUtils.supervisorStormDistRoot(_conf, _topologyId);
String workerRoot = ConfigUtils.workerRoot(_conf, _workerId);
@SuppressWarnings("unchecked")
Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) _topoConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
List<String> blobFileNames = new ArrayList<>();
if (blobstoreMap != null) {
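//A blob entry may specify a "localname" to symlink as; otherwise the blob key itself is used as the file name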
for (Map.Entry<String, Map<String, Object>> entry : blobstoreMap.entrySet()) {
String key = entry.getKey();
Map<String, Object> blobInfo = entry.getValue();
String ret = null;
if (blobInfo != null && blobInfo.containsKey("localname")) {
ret = (String) blobInfo.get("localname");
} else {
ret = key;
}
blobFileNames.add(ret);
}
}
File targetResourcesDir = new File(stormRoot, ConfigUtils.RESOURCES_SUBDIR);
List<String> resourceFileNames = new ArrayList<>();
if (targetResourcesDir.exists()) {
resourceFileNames.add(ConfigUtils.RESOURCES_SUBDIR);
}
resourceFileNames.addAll(blobFileNames);
if (!_symlinksDisabled) {
LOG.info("Creating symlinks for worker-id: {} storm-id: {} for files({}): {}", _workerId, _topologyId, resourceFileNames.size(), resourceFileNames);
if (targetResourcesDir.exists()) {
_ops.createSymlink(new File(workerRoot, ConfigUtils.RESOURCES_SUBDIR), targetResourcesDir);
} else {
LOG.info("Topology jar for worker-id: {} storm-id: {} does not contain resources directory {}.", _workerId, _topologyId, targetResourcesDir.toString());
}
for (String fileName : blobFileNames) {
_ops.createSymlink(new File(workerRoot, fileName),
new File(stormRoot, fileName));
}
} else if (blobFileNames.size() > 0) {
LOG.warn("Symlinks are disabled, no symlinks created for blobs {}", blobFileNames);
}
}
/**
* @return all of the pids that are a part of this container.
*/
protected Set<Long> getAllPids() throws IOException {
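//Worker processes record their PIDs as file names under the worker pids root; parse the listing back into numbers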
Set<Long> ret = new HashSet<>();
for (String listing: Utils.readDirContents(ConfigUtils.workerPidsRoot(_conf, _workerId))) {
ret.add(Long.valueOf(listing));
}
return ret;
}
/**
* @return the user that some operations should be done as.
* @throws IOException on any error
*/
protected String getWorkerUser() throws IOException {
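//Resolution order: saved worker-user file, then the assignment owner, then (local mode) the current JVM user, then the owner of the artifacts root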
LOG.info("GET worker-user for {}", _workerId);
File file = new File(ConfigUtils.workerUserFile(_conf, _workerId));
if (_ops.fileExists(file)) {
return _ops.slurpString(file).trim();
} else if (_assignment != null && _assignment.is_set_owner()) {
return _assignment.get_owner();
}
if (ConfigUtils.isLocalMode(_conf)) {
return System.getProperty("user.name");
} else {
File f = new File(ConfigUtils.workerArtifactsRoot(_conf));
if (f.exists()) {
return Files.getOwner(f.toPath()).getName();
}
throw new IllegalStateException("Could not recover the user for " + _workerId);
}
}
protected void saveWorkerUser(String user) throws IOException {
_type.assertFull();
LOG.info("SET worker-user {} {}", _workerId, user);
_ops.dump(new File(ConfigUtils.workerUserFile(_conf, _workerId)), user);
}
protected void deleteSavedWorkerUser() throws IOException {
LOG.info("REMOVE worker-user {}", _workerId);
_ops.deleteIfExists(new File(ConfigUtils.workerUserFile(_conf, _workerId)));
}
/**
* Partly clean up the container in preparation for restart.
* By default this deletes all of the temp directories, since we are going
* to get a new worker id anyway.
* POST CONDITION: the workerId will be set to null
* @throws IOException on any error
*/
public void cleanUpForRestart() throws IOException {
LOG.info("Cleaning up {}:{}", _supervisorId, _workerId);
Set<Long> pids = getAllPids();
String user = getWorkerUser();
for (Long pid : pids) {
File path = new File(ConfigUtils.workerPidPath(_conf, _workerId, pid));
_ops.deleteIfExists(path, user, _workerId);
}
//Always make sure to clean up everything else before worker directory
//is removed since that is what is going to trigger the retry for cleanup
_ops.deleteIfExists(new File(ConfigUtils.workerHeartbeatsRoot(_conf, _workerId)), user, _workerId);
_ops.deleteIfExists(new File(ConfigUtils.workerPidsRoot(_conf, _workerId)), user, _workerId);
_ops.deleteIfExists(new File(ConfigUtils.workerTmpRoot(_conf, _workerId)), user, _workerId);
_ops.deleteIfExists(new File(ConfigUtils.workerRoot(_conf, _workerId)), user, _workerId);
deleteSavedWorkerUser();
_workerId = null;
}
/**
* Launch the process for the first time
* PREREQUISITE: setup has run and passed
* @throws IOException on any error
*/
public abstract void launch() throws IOException;
/**
* Restart the processes in this container
* PREREQUISITE: cleanUpForRestart has run and passed
* @throws IOException on any error
*/
public abstract void relaunch() throws IOException;
/**
* @return true if the main process exited, else false. This is just best effort; it returns false if unknown.
*/
public abstract boolean didMainProcessExit();
/**
* Run a profiling request
* @param request the request to run
* @param stop is this a stop request?
* @return true if it succeeded, else false
* @throws IOException on any error
* @throws InterruptedException if running the command is interrupted.
*/
public abstract boolean runProfiling(ProfileRequest request, boolean stop) throws IOException, InterruptedException;
/**
* @return the id of the container or null if there is no worker id right now.
*/
public String getWorkerId() {
return _workerId;
}
}