blob: c305a72038e0a43c6f1b7f0c3cb802f076ef5f15 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.daemon.supervisor;
import java.io.File;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.io.FileUtils;
import org.apache.storm.Config;
import org.apache.storm.StormTimer;
import org.apache.storm.cluster.ClusterStateContext;
import org.apache.storm.cluster.ClusterUtils;
import org.apache.storm.cluster.DaemonType;
import org.apache.storm.cluster.IStormClusterState;
import org.apache.storm.daemon.DaemonCommon;
import org.apache.storm.daemon.supervisor.timer.SupervisorHealthCheck;
import org.apache.storm.daemon.supervisor.timer.SupervisorHeartbeat;
import org.apache.storm.daemon.supervisor.timer.UpdateBlobs;
import org.apache.storm.event.EventManager;
import org.apache.storm.event.EventManagerImp;
import org.apache.storm.generated.LocalAssignment;
import org.apache.storm.localizer.AsyncLocalizer;
import org.apache.storm.localizer.ILocalizer;
import org.apache.storm.localizer.Localizer;
import org.apache.storm.messaging.IContext;
import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.scheduler.ISupervisor;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.LocalState;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.Utils;
import org.apache.storm.utils.VersionInfo;
import org.apache.zookeeper.data.ACL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Supervisor implements DaemonCommon, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(Supervisor.class);
private final Map<String, Object> conf;
private final IContext sharedContext;
private volatile boolean active;
private final ISupervisor iSupervisor;
private final Utils.UptimeComputer upTime;
private final String stormVersion;
private final IStormClusterState stormClusterState;
private final LocalState localState;
private final String supervisorId;
private final String assignmentId;
private final String hostName;
// used for reporting used ports when heartbeating
private final AtomicReference<Map<Long, LocalAssignment>> currAssignment;
private final StormTimer heartbeatTimer;
private final StormTimer eventTimer;
private final StormTimer blobUpdateTimer;
private final Localizer localizer;
private final AsyncLocalizer asyncLocalizer;
private EventManager eventManager;
private ReadClusterState readState;
private Supervisor(ISupervisor iSupervisor) throws IOException {
this(Utils.readStormConfig(), null, iSupervisor);
}
public Supervisor(Map<String, Object> conf, IContext sharedContext, ISupervisor iSupervisor) throws IOException {
this.conf = conf;
this.iSupervisor = iSupervisor;
this.active = true;
this.upTime = Utils.makeUptimeComputer();
this.stormVersion = VersionInfo.getVersion();
this.sharedContext = sharedContext;
iSupervisor.prepare(conf, ConfigUtils.supervisorIsupervisorDir(conf));
List<ACL> acls = null;
if (Utils.isZkAuthenticationConfiguredStormServer(conf)) {
acls = SupervisorUtils.supervisorZkAcls();
}
try {
this.stormClusterState = ClusterUtils.mkStormClusterState(conf, acls, new ClusterStateContext(DaemonType.SUPERVISOR));
} catch (Exception e) {
LOG.error("supervisor can't create stormClusterState");
throw Utils.wrapInRuntime(e);
}
try {
this.localState = ConfigUtils.supervisorState(conf);
this.localizer = Utils.createLocalizer(conf, ConfigUtils.supervisorLocalDir(conf));
this.asyncLocalizer = new AsyncLocalizer(conf, this.localizer);
} catch (IOException e) {
throw Utils.wrapInRuntime(e);
}
this.supervisorId = iSupervisor.getSupervisorId();
this.assignmentId = iSupervisor.getAssignmentId();
try {
this.hostName = Utils.hostname();
} catch (UnknownHostException e) {
throw Utils.wrapInRuntime(e);
}
this.currAssignment = new AtomicReference<Map<Long, LocalAssignment>>(new HashMap<Long,LocalAssignment>());
this.heartbeatTimer = new StormTimer(null, new DefaultUncaughtExceptionHandler());
this.eventTimer = new StormTimer(null, new DefaultUncaughtExceptionHandler());
this.blobUpdateTimer = new StormTimer("blob-update-timer", new DefaultUncaughtExceptionHandler());
}
public String getId() {
return supervisorId;
}
IContext getSharedContext() {
return sharedContext;
}
public Map<String, Object> getConf() {
return conf;
}
public ISupervisor getiSupervisor() {
return iSupervisor;
}
public Utils.UptimeComputer getUpTime() {
return upTime;
}
public String getStormVersion() {
return stormVersion;
}
public IStormClusterState getStormClusterState() {
return stormClusterState;
}
LocalState getLocalState() {
return localState;
}
public String getAssignmentId() {
return assignmentId;
}
public String getHostName() {
return hostName;
}
public AtomicReference<Map<Long, LocalAssignment>> getCurrAssignment() {
return currAssignment;
}
public Localizer getLocalizer() {
return localizer;
}
ILocalizer getAsyncLocalizer() {
return asyncLocalizer;
}
EventManager getEventManger() {
return eventManager;
}
/**
* Launch the supervisor
*/
public void launch() throws Exception {
LOG.info("Starting Supervisor with conf {}", conf);
String path = ConfigUtils.supervisorTmpDir(conf);
FileUtils.cleanDirectory(new File(path));
Localizer localizer = getLocalizer();
SupervisorHeartbeat hb = new SupervisorHeartbeat(conf, this);
hb.run();
// should synchronize supervisor so it doesn't launch anything after being down (optimization)
Integer heartbeatFrequency = Utils.getInt(conf.get(Config.SUPERVISOR_HEARTBEAT_FREQUENCY_SECS));
heartbeatTimer.scheduleRecurring(0, heartbeatFrequency, hb);
this.eventManager = new EventManagerImp(false);
this.readState = new ReadClusterState(this);
Set<String> downloadedTopoIds = SupervisorUtils.readDownloadedTopologyIds(conf);
Map<Integer, LocalAssignment> portToAssignments = localState.getLocalAssignmentsMap();
if (portToAssignments != null) {
Map<String, LocalAssignment> assignments = new HashMap<>();
for (LocalAssignment la : localState.getLocalAssignmentsMap().values()) {
assignments.put(la.get_topology_id(), la);
}
for (String topoId : downloadedTopoIds) {
LocalAssignment la = assignments.get(topoId);
if (la != null) {
SupervisorUtils.addBlobReferences(localizer, topoId, conf, la.get_owner());
} else {
LOG.warn("Could not find an owner for topo {}", topoId);
}
}
}
// do this after adding the references so we don't try to clean things being used
localizer.startCleaner();
UpdateBlobs updateBlobsThread = new UpdateBlobs(this);
if ((Boolean) conf.get(Config.SUPERVISOR_ENABLE)) {
// This isn't strictly necessary, but it doesn't hurt and ensures that the machine stays up
// to date even if callbacks don't all work exactly right
eventTimer.scheduleRecurring(0, 10, new EventManagerPushCallback(readState, eventManager));
// Blob update thread. Starts with 30 seconds delay, every 30 seconds
blobUpdateTimer.scheduleRecurring(30, 30, new EventManagerPushCallback(updateBlobsThread, eventManager));
// supervisor health check
eventTimer.scheduleRecurring(300, 300, new SupervisorHealthCheck(this));
}
LOG.info("Starting supervisor with id {} at host {}.", getId(), getHostName());
}
/**
* start distribute supervisor
*/
public void launchDaemon() {
LOG.info("Starting supervisor for storm version '{}'.", VersionInfo.getVersion());
try {
Map<String, Object> conf = getConf();
if (ConfigUtils.isLocalMode(conf)) {
throw new IllegalArgumentException("Cannot start server in local mode!");
}
launch();
Utils.addShutdownHookWithForceKillIn1Sec(new Runnable(){
@Override
public void run() {
close();
}
});
registerWorkerNumGauge("supervisor:num-slots-used-gauge", conf);
StormMetricsRegistry.startMetricsReporters(conf);
} catch (Exception e) {
LOG.error("Failed to start supervisor\n", e);
System.exit(1);
}
}
private void registerWorkerNumGauge(String name, final Map<String, Object> conf) {
StormMetricsRegistry.registerGauge(name, new Callable<Integer>() {
@Override
public Integer call() throws Exception {
Collection<String> pids = SupervisorUtils.supervisorWorkerIds(conf);
return pids.size();
}
});
}
@Override
public void close() {
try {
LOG.info("Shutting down supervisor {}", getId());
this.active = false;
heartbeatTimer.close();
eventTimer.close();
blobUpdateTimer.close();
if (eventManager != null) {
eventManager.close();
}
if (readState != null) {
readState.close();
}
asyncLocalizer.shutdown();
localizer.shutdown();
getStormClusterState().disconnect();
} catch (Exception e) {
LOG.error("Error Shutting down", e);
}
}
void killWorkers(Collection<String> workerIds, ContainerLauncher launcher) throws InterruptedException, IOException {
HashSet<Killable> containers = new HashSet<>();
for (String workerId : workerIds) {
try {
Killable k = launcher.recoverContainer(workerId, localState);
if (!k.areAllProcessesDead()) {
k.kill();
containers.add(k);
} else {
k.cleanUp();
}
} catch (Exception e) {
LOG.error("Error trying to kill {}", workerId, e);
}
}
int shutdownSleepSecs = Utils.getInt(conf.get(Config.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS), 1);
if (!containers.isEmpty()) {
Time.sleepSecs(shutdownSleepSecs);
}
for (Killable k: containers) {
try {
k.forceKill();
long start = Time.currentTimeMillis();
while(!k.areAllProcessesDead()) {
if ((Time.currentTimeMillis() - start) > 10_000) {
throw new RuntimeException("Giving up on killing " + k
+ " after " + (Time.currentTimeMillis() - start) + " ms");
}
Time.sleep(100);
k.forceKill();
}
k.cleanUp();
} catch (Exception e) {
LOG.error("Error trying to clean up {}", k, e);
}
}
}
public void shutdownAllWorkers(UniFunc<Slot> onWarnTimeout, UniFunc<Slot> onErrorTimeout) {
if (readState != null) {
readState.shutdownAllWorkers(onWarnTimeout, onErrorTimeout);
} else {
try {
ContainerLauncher launcher = ContainerLauncher.make(getConf(), getId(), getSharedContext());
killWorkers(SupervisorUtils.supervisorWorkerIds(conf), launcher);
} catch (Exception e) {
throw Utils.wrapInRuntime(e);
}
}
}
@Override
public boolean isWaiting() {
if (!active) {
return true;
}
if (heartbeatTimer.isTimerWaiting() && eventTimer.isTimerWaiting() && eventManager.waiting()) {
return true;
}
return false;
}
/**
* supervisor daemon enter entrance
*
* @param args
*/
public static void main(String[] args) throws Exception {
Utils.setupDefaultUncaughtExceptionHandler();
@SuppressWarnings("resource")
Supervisor instance = new Supervisor(new StandaloneSupervisor());
instance.launchDaemon();
}
}