blob: 27e42b892e8fb7ace148ae835a9868c8de045ddd [file] [log] [blame]
// Copyright 2016 Twitter. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.twitter.heron.scheduler.local;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.twitter.heron.api.utils.TopologyUtils;
import com.twitter.heron.common.basics.SysUtils;
import com.twitter.heron.proto.scheduler.Scheduler;
import com.twitter.heron.scheduler.UpdateTopologyManager;
import com.twitter.heron.scheduler.utils.Runtime;
import com.twitter.heron.scheduler.utils.SchedulerUtils;
import com.twitter.heron.scheduler.utils.SchedulerUtils.ExecutorPort;
import com.twitter.heron.spi.common.Config;
import com.twitter.heron.spi.packing.PackingPlan;
import com.twitter.heron.spi.scheduler.IScalable;
import com.twitter.heron.spi.scheduler.IScheduler;
import com.twitter.heron.spi.utils.ShellUtils;
/**
 * Scheduler that runs each container of a topology as a local child process.
 *
 * <p>One executor process is started per container; container 0 is started for the TMaster.
 * Every process is watched by a monitor task that relaunches it when it exits, unless the
 * topology has been killed or the container was removed on purpose.
 *
 * <p>Threading: monitor tasks run on {@code monitorService} concurrently with the scheduler
 * callbacks. Every compound check-then-act sequence on {@code processToContainer} is performed
 * while holding that map's monitor, so a monitor relaunch cannot race with
 * {@link #onKill}, {@link #addContainers} or {@link #removeContainers}.
 */
public class LocalScheduler implements IScheduler, IScalable {
  private static final Logger LOG = Logger.getLogger(LocalScheduler.class.getName());

  // executor service for monitoring all the containers
  private final ExecutorService monitorService = Executors.newCachedThreadPool();

  // map from each running executor process to the container id it serves; compound updates
  // are guarded by synchronizing on this map
  private final Map<Process, Integer> processToContainer = new ConcurrentHashMap<>();

  private Config config;
  private Config runtime;
  private UpdateTopologyManager updateTopologyManager;

  // has the topology been killed? Read by monitor tasks before relaunching an executor.
  private volatile boolean isTopologyKilled = false;

  /**
   * Stores the configuration and creates the manager used to apply topology updates.
   *
   * @param mConfig static scheduler configuration
   * @param mRuntime runtime configuration for this topology
   */
  @Override
  public void initialize(Config mConfig, Config mRuntime) {
    this.config = mConfig;
    this.runtime = mRuntime;
    this.updateTopologyManager =
        new UpdateTopologyManager(config, runtime, Optional.<IScalable>of(this));
  }

  /**
   * Stops all monitor tasks, forgets every tracked process and closes the update manager.
   * Executor processes themselves are not destroyed here.
   */
  @Override
  public void close() {
    // Shut down the ExecutorService for monitoring (interrupts tasks blocked in waitFor)
    monitorService.shutdownNow();
    // Clear the map
    processToContainer.clear();
    if (updateTopologyManager != null) {
      updateTopologyManager.close();
    }
  }

  /**
   * Start executor process via running an async shell process.
   *
   * @param container id of the container to launch
   * @param instances instances hosted by the container; may be {@code null}
   * @return the launched process
   */
  @VisibleForTesting
  protected Process startExecutorProcess(int container, Set<PackingPlan.InstancePlan> instances) {
    return ShellUtils.runASyncProcess(
        getExecutorCommand(container, instances),
        new File(LocalContext.workingDirectory(config)),
        Integer.toString(container));
  }

  /**
   * Start the executor for the given container, record it and begin monitoring it.
   *
   * @param container id of the container to launch
   * @param instances instances hosted by the container; may be {@code null}
   */
  @VisibleForTesting
  protected void startExecutor(final int container, Set<PackingPlan.InstancePlan> instances) {
    LOG.info("Starting a new executor for container: " + container);
    // create a process with the executor command and topology working directory
    final Process containerExecutor = startExecutorProcess(container, instances);
    // associate the process and its container id
    processToContainer.put(containerExecutor, container);
    LOG.info("Started the executor for container: " + container);
    // add the container for monitoring
    startExecutorMonitor(container, containerExecutor, instances);
  }

  /**
   * Start the monitor of a given executor. When the process exits, it is relaunched unless the
   * topology was killed or the process is no longer tracked (i.e. it was removed on purpose).
   *
   * @param container id of the monitored container
   * @param containerExecutor the process to wait on
   * @param instances instances to pass along when relaunching the container
   */
  @VisibleForTesting
  protected void startExecutorMonitor(final int container,
                                      final Process containerExecutor,
                                      final Set<PackingPlan.InstancePlan> instances) {
    Runnable monitor = new Runnable() {
      @Override
      public void run() {
        try {
          LOG.info("Waiting for container " + container + " to finish.");
          containerExecutor.waitFor();
        } catch (InterruptedException e) {
          if (!isTopologyKilled) {
            LOG.log(Level.SEVERE, "Process is interrupted: ", e);
          }
          // restore the interrupt status for the pool thread running this task
          Thread.currentThread().interrupt();
          return;
        }
        LOG.log(Level.INFO,
            "Container {0} is completed. Exit status: {1}",
            new Object[]{container, containerExecutor.exitValue()});

        // Check-and-relaunch must be atomic with respect to onKill/removeContainers, which
        // mutate the map while holding the same lock; otherwise remove() may return null.
        synchronized (processToContainer) {
          if (isTopologyKilled) {
            LOG.info("Topology is killed. Not to start new executors.");
            return;
          }
          Integer containerId = processToContainer.remove(containerExecutor);
          if (containerId == null) {
            LOG.log(Level.INFO, "Container {0} is killed. No need to relaunch.", container);
            return;
          }
          LOG.log(Level.INFO, "Trying to restart container {0}", container);
          try {
            // restart the container
            startExecutor(containerId, instances);
          } catch (RuntimeException e) {
            // submit() would otherwise capture this in an unread Future and hide it
            LOG.log(Level.SEVERE, "Failed to restart container " + containerId, e);
          }
        }
      }
    };
    monitorService.submit(monitor);
  }

  /**
   * Builds the executor command line, allocating a free port for every required executor port
   * and, when remote debugging is enabled, one debugger port per instance.
   *
   * @param container id of the container to launch
   * @param instances instances hosted by the container; may be {@code null}
   * @return the command line to execute
   * @throws RuntimeException if no free port can be found
   */
  private String[] getExecutorCommand(int container, Set<PackingPlan.InstancePlan> instances) {
    Map<ExecutorPort, String> ports = new HashMap<>();
    for (ExecutorPort executorPort : ExecutorPort.getRequiredPorts()) {
      ports.put(executorPort, findFreePort());
    }

    if (TopologyUtils.getTopologyRemoteDebuggingEnabled(Runtime.topology(runtime))
        && instances != null) {
      List<String> remoteDebuggingPorts = new LinkedList<>();
      for (int i = 0; i < instances.size(); i++) {
        remoteDebuggingPorts.add(findFreePort());
      }
      ports.put(ExecutorPort.JVM_REMOTE_DEBUGGER_PORTS,
          String.join(",", remoteDebuggingPorts));
    }

    String[] executorCmd = SchedulerUtils.getExecutorCommand(config, runtime, container, ports);
    LOG.info("Executor command line: " + Arrays.toString(executorCmd));
    return executorCmd;
  }

  /**
   * Returns a free local port as a string.
   *
   * @throws RuntimeException if {@code SysUtils.getFreePort()} reports failure (-1)
   */
  private static String findFreePort() {
    int port = SysUtils.getFreePort();
    if (port == -1) {
      throw new RuntimeException("Failed to find available ports for executor");
    }
    return String.valueOf(port);
  }

  /**
   * Schedule the provided packed plan: start the TMaster executor (container 0) and then one
   * executor per container in the plan.
   *
   * @param packing the packing plan to deploy
   * @return always {@code true}
   */
  @Override
  public boolean onSchedule(PackingPlan packing) {
    LOG.info("Starting to deploy topology: " + LocalContext.topologyName(config));

    synchronized (processToContainer) {
      LOG.info("Starting executor for TMaster");
      startExecutor(0, null);

      // for each container, run its own executor
      for (PackingPlan.ContainerPlan container : packing.getContainers()) {
        startExecutor(container.getId(), container.getInstances());
      }
    }

    LOG.info("Executor for each container have been started.");
    return true;
  }

  /**
   * The local scheduler exposes no job links.
   *
   * @return a new, empty, mutable list
   */
  @Override
  public List<String> getJobLinks() {
    return new ArrayList<>();
  }

  /**
   * Handler to kill topology: marks the topology as killed so monitors stop relaunching, then
   * destroys every tracked executor process.
   *
   * @param request the kill request (not inspected)
   * @return always {@code true}
   */
  @Override
  public boolean onKill(Scheduler.KillTopologyRequest request) {
    // get the topology name
    String topologyName = LocalContext.topologyName(config);
    LOG.info("Command to kill topology: " + topologyName);

    // set the flag before destroying so monitors observe it and do not relaunch
    isTopologyKilled = true;

    synchronized (processToContainer) {
      // iterate entries so the container id cannot vanish between lookup and use
      for (Map.Entry<Process, Integer> entry : processToContainer.entrySet()) {
        int index = entry.getValue();
        LOG.info("Killing executor for container: " + index);
        entry.getKey().destroy();
        LOG.info("Killed executor for container: " + index);
      }

      // clear the mapping between process and container ids
      processToContainer.clear();
    }

    return true;
  }

  /**
   * Handler to restart topology. Processes are only destroyed here; their monitors relaunch
   * them because they remain tracked in {@code processToContainer}.
   *
   * @param request carries the container index to restart, or -1 for every container
   * @return {@code false} if no matching container was found, {@code true} otherwise
   */
  @Override
  public boolean onRestart(Scheduler.RestartTopologyRequest request) {
    int containerId = request.getContainerIndex();
    List<Process> processesToRestart = new LinkedList<>();

    if (containerId == -1) {
      LOG.info("Command to restart the entire topology: " + LocalContext.topologyName(config));
      processesToRestart.addAll(processToContainer.keySet());
    } else {
      // restart that particular container
      LOG.info("Command to restart a container of topology: " + LocalContext.topologyName(config));
      LOG.info("Restart container requested: " + containerId);

      // locate the container; iterate entries to avoid unboxing a concurrently-removed value
      for (Map.Entry<Process, Integer> entry : processToContainer.entrySet()) {
        if (entry.getValue() == containerId) {
          processesToRestart.add(entry.getKey());
        }
      }
    }

    if (processesToRestart.isEmpty()) {
      LOG.severe("Container not exist.");
      return false;
    }

    for (Process process : processesToRestart) {
      process.destroy();
    }

    return true;
  }

  /**
   * Applies a topology update through the {@link UpdateTopologyManager}.
   *
   * @param request carries the current and proposed packing plans
   * @return {@code true} on success, {@code false} if the update failed or was interrupted
   */
  @Override
  public boolean onUpdate(Scheduler.UpdateTopologyRequest request) {
    try {
      updateTopologyManager.updateTopology(
          request.getCurrentPackingPlan(), request.getProposedPackingPlan());
    } catch (ExecutionException | InterruptedException e) {
      LOG.log(Level.SEVERE, "Could not update topology for request: " + request, e);
      return false;
    }
    return true;
  }

  /**
   * Launches an executor for each given container.
   *
   * @param containers containers to launch
   * @return the same set that was passed in
   * @throws RuntimeException if an executor is already tracked for one of the container ids
   */
  @Override
  public Set<PackingPlan.ContainerPlan> addContainers(Set<PackingPlan.ContainerPlan> containers) {
    synchronized (processToContainer) {
      for (PackingPlan.ContainerPlan container : containers) {
        if (processToContainer.containsValue(container.getId())) {
          throw new RuntimeException(String.format("Found active container for %s, "
              + "cannot launch a duplicate container.", container.getId()));
        }
        startExecutor(container.getId(), container.getInstances());
      }
    }
    return containers;
  }

  /**
   * Destroys the executors of the given containers. They are untracked before being destroyed
   * so their monitors do not relaunch them. Unknown container ids are logged and skipped.
   *
   * @param containersToRemove containers whose executors should be stopped
   */
  @Override
  public void removeContainers(Set<PackingPlan.ContainerPlan> containersToRemove) {
    LOG.log(Level.INFO,
        "Kill {0} of {1} containers",
        new Object[]{containersToRemove.size(), processToContainer.size()});

    synchronized (processToContainer) {
      // Create a inverse map to be able to get process instance from container id
      Map<Integer, Process> containerToProcessMap = new HashMap<>();
      for (Map.Entry<Process, Integer> entry : processToContainer.entrySet()) {
        containerToProcessMap.put(entry.getValue(), entry.getKey());
      }

      for (PackingPlan.ContainerPlan containerToRemove : containersToRemove) {
        int containerId = containerToRemove.getId();
        Process process = containerToProcessMap.get(containerId);
        if (process == null) {
          LOG.log(Level.WARNING, "Container for id:{0} not found.", containerId);
          continue;
        }

        // remove the process so that it is not monitored and relaunched
        LOG.info("Killing executor for container: " + containerId);
        processToContainer.remove(process);
        process.destroy();
        LOG.info("Killed executor for container: " + containerId);
      }
    }
  }

  @VisibleForTesting
  boolean isTopologyKilled() {
    return isTopologyKilled;
  }

  @VisibleForTesting
  ExecutorService getMonitorService() {
    return monitorService;
  }

  @VisibleForTesting
  Map<Process, Integer> getProcessToContainer() {
    return processToContainer;
  }
}