blob: ef4b54d225a79276de0f5dd22bcf9343ba0ca62d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.daemon.supervisor;
import org.apache.commons.lang.StringUtils;
import org.apache.storm.Config;
import org.apache.storm.generated.LSWorkerHeartbeat;
import org.apache.storm.localizer.LocalResource;
import org.apache.storm.localizer.Localizer;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.LocalState;
import org.apache.storm.utils.Utils;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
public class SupervisorUtils {
private static final Logger LOG = LoggerFactory.getLogger(SupervisorUtils.class);
private static final SupervisorUtils INSTANCE = new SupervisorUtils();
private static SupervisorUtils _instance = INSTANCE;
public static void setInstance(SupervisorUtils u) {
_instance = u;
}
public static void resetInstance() {
_instance = INSTANCE;
}
static Process processLauncher(Map<String, Object> conf, String user, List<String> commandPrefix, List<String> args, Map<String, String> environment, final String logPreFix,
final ExitCodeCallback exitCodeCallback, File dir) throws IOException {
if (StringUtils.isBlank(user)) {
throw new IllegalArgumentException("User cannot be blank when calling processLauncher.");
}
String wlinitial = (String) (conf.get(Config.SUPERVISOR_WORKER_LAUNCHER));
String stormHome = ConfigUtils.concatIfNotNull(System.getProperty("storm.home"));
String wl;
if (StringUtils.isNotBlank(wlinitial)) {
wl = wlinitial;
} else {
wl = stormHome + "/bin/worker-launcher";
}
List<String> commands = new ArrayList<>();
if (commandPrefix != null){
commands.addAll(commandPrefix);
}
commands.add(wl);
commands.add(user);
commands.addAll(args);
LOG.info("Running as user: {} command: {}", user, commands);
return SupervisorUtils.launchProcess(commands, environment, logPreFix, exitCodeCallback, dir);
}
public static int processLauncherAndWait(Map<String, Object> conf, String user, List<String> args, final Map<String, String> environment, final String logPreFix)
throws IOException {
int ret = 0;
Process process = processLauncher(conf, user, null, args, environment, logPreFix, null, null);
if (StringUtils.isNotBlank(logPreFix))
Utils.readAndLogStream(logPreFix, process.getInputStream());
try {
process.waitFor();
} catch (InterruptedException e) {
LOG.info("{} interrupted.", logPreFix);
}
ret = process.exitValue();
return ret;
}
public static void setupStormCodeDir(Map<String, Object> conf, String user, String dir) throws IOException {
if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
String logPrefix = "Storm Code Dir Setup for " + dir;
List<String> commands = new ArrayList<>();
commands.add("code-dir");
commands.add(dir);
processLauncherAndWait(conf, user, commands, null, logPrefix);
}
}
public static void setupWorkerArtifactsDir(Map<String, Object> conf, String user, String dir) throws IOException {
if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
String logPrefix = "Worker Artifacts Setup for " + dir;
List<String> commands = new ArrayList<>();
commands.add("artifacts-dir");
commands.add(dir);
processLauncherAndWait(conf, user, commands, null, logPrefix);
}
}
public static void rmrAsUser(Map<String, Object> conf, String id, String path) throws IOException {
String user = Utils.getFileOwner(path);
String logPreFix = "rmr " + id;
List<String> commands = new ArrayList<>();
commands.add("rmr");
commands.add(path);
SupervisorUtils.processLauncherAndWait(conf, user, commands, null, logPreFix);
if (Utils.checkFileExists(path)) {
throw new RuntimeException(path + " was not deleted.");
}
}
/**
* Given the blob information returns the value of the uncompress field, handling it either being a string or a boolean value, or if it's not specified then
* returns false
*
* @param blobInfo
* @return
*/
public static Boolean shouldUncompressBlob(Map<String, Object> blobInfo) {
return Utils.getBoolean(blobInfo.get("uncompress"), false);
}
/**
* Returns a list of LocalResources based on the blobstore-map passed in
*
* @param blobstoreMap
* @return
*/
public static List<LocalResource> blobstoreMapToLocalresources(Map<String, Map<String, Object>> blobstoreMap) {
List<LocalResource> localResourceList = new ArrayList<>();
if (blobstoreMap != null) {
for (Map.Entry<String, Map<String, Object>> map : blobstoreMap.entrySet()) {
LocalResource localResource = new LocalResource(map.getKey(), shouldUncompressBlob(map.getValue()));
localResourceList.add(localResource);
}
}
return localResourceList;
}
/**
* For each of the downloaded topologies, adds references to the blobs that the topologies are using. This is used to reconstruct the cache on restart.
*
* @param localizer
* @param stormId
* @param conf
*/
static void addBlobReferences(Localizer localizer, String stormId, Map<String, Object> conf, String user) throws IOException {
Map<String, Object> stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
String topoName = (String) stormConf.get(Config.TOPOLOGY_NAME);
List<LocalResource> localresources = SupervisorUtils.blobstoreMapToLocalresources(blobstoreMap);
if (blobstoreMap != null) {
localizer.addReferences(localresources, user, topoName);
}
}
public static Set<String> readDownloadedTopologyIds(Map<String, Object> conf) throws IOException {
Set<String> stormIds = new HashSet<>();
String path = ConfigUtils.supervisorStormDistRoot(conf);
Collection<String> rets = Utils.readDirContents(path);
for (String ret : rets) {
stormIds.add(URLDecoder.decode(ret));
}
return stormIds;
}
public static Collection<String> supervisorWorkerIds(Map<String, Object> conf) {
String workerRoot = ConfigUtils.workerRoot(conf);
return Utils.readDirContents(workerRoot);
}
static boolean doRequiredTopoFilesExist(Map<String, Object> conf, String stormId) throws IOException {
String stormroot = ConfigUtils.supervisorStormDistRoot(conf, stormId);
String stormjarpath = ConfigUtils.supervisorStormJarPath(stormroot);
String stormcodepath = ConfigUtils.supervisorStormCodePath(stormroot);
String stormconfpath = ConfigUtils.supervisorStormConfPath(stormroot);
if (!Utils.checkFileExists(stormroot))
return false;
if (!Utils.checkFileExists(stormcodepath))
return false;
if (!Utils.checkFileExists(stormconfpath))
return false;
if (ConfigUtils.isLocalMode(conf) || Utils.checkFileExists(stormjarpath))
return true;
return false;
}
/**
* map from worker id to heartbeat
*
* @param conf
* @return
* @throws Exception
*/
public static Map<String, LSWorkerHeartbeat> readWorkerHeartbeats(Map<String, Object> conf) throws Exception {
return _instance.readWorkerHeartbeatsImpl(conf);
}
public Map<String, LSWorkerHeartbeat> readWorkerHeartbeatsImpl(Map<String, Object> conf) throws Exception {
Map<String, LSWorkerHeartbeat> workerHeartbeats = new HashMap<>();
Collection<String> workerIds = SupervisorUtils.supervisorWorkerIds(conf);
for (String workerId : workerIds) {
LSWorkerHeartbeat whb = readWorkerHeartbeat(conf, workerId);
// ATTENTION: whb can be null
workerHeartbeats.put(workerId, whb);
}
return workerHeartbeats;
}
/**
* get worker heartbeat by workerId
*
* @param conf
* @param workerId
* @return
* @throws IOException
*/
private static LSWorkerHeartbeat readWorkerHeartbeat(Map<String, Object> conf, String workerId) {
return _instance.readWorkerHeartbeatImpl(conf, workerId);
}
protected LSWorkerHeartbeat readWorkerHeartbeatImpl(Map<String, Object> conf, String workerId) {
try {
LocalState localState = ConfigUtils.workerState(conf, workerId);
return localState.getWorkerHeartBeat();
} catch (Exception e) {
LOG.warn("Failed to read local heartbeat for workerId : {},Ignoring exception.", workerId, e);
return null;
}
}
public static boolean isWorkerHbTimedOut(int now, LSWorkerHeartbeat whb, Map<String, Object> conf) {
return _instance.isWorkerHbTimedOutImpl(now, whb, conf);
}
private boolean isWorkerHbTimedOutImpl(int now, LSWorkerHeartbeat whb, Map<String, Object> conf) {
return (now - whb.get_time_secs()) > Utils.getInt(conf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS));
}
/**
* Launch a new process as per {@link java.lang.ProcessBuilder} with a given
* callback.
* @param command the command to be executed in the new process
* @param environment the environment to be applied to the process. Can be
* null.
* @param logPrefix a prefix for log entries from the output of the process.
* Can be null.
* @param exitCodeCallback code to be called passing the exit code value
* when the process completes
* @param dir the working directory of the new process
* @return the new process
* @throws IOException
* @see java.lang.ProcessBuilder
*/
public static Process launchProcess(List<String> command,
Map<String,String> environment,
final String logPrefix,
final ExitCodeCallback exitCodeCallback,
File dir)
throws IOException {
ProcessBuilder builder = new ProcessBuilder(command);
Map<String,String> procEnv = builder.environment();
if (dir != null) {
builder.directory(dir);
}
builder.redirectErrorStream(true);
if (environment != null) {
procEnv.putAll(environment);
}
final Process process = builder.start();
if (logPrefix != null || exitCodeCallback != null) {
Utils.asyncLoop(new Callable<Object>() {
public Object call() {
if (logPrefix != null ) {
Utils.readAndLogStream(logPrefix,
process.getInputStream());
}
if (exitCodeCallback != null) {
try {
process.waitFor();
exitCodeCallback.call(process.exitValue());
} catch (InterruptedException ie) {
LOG.info("{} interrupted", logPrefix);
exitCodeCallback.call(-1);
}
}
return null; // Run only once.
}
});
}
return process;
}
static List<ACL> supervisorZkAcls() {
final List<ACL> acls = new ArrayList<>();
acls.add(ZooDefs.Ids.CREATOR_ALL_ACL.get(0));
acls.add(new ACL((ZooDefs.Perms.READ ^ ZooDefs.Perms.CREATE), ZooDefs.Ids.ANYONE_ID_UNSAFE));
return acls;
}
}