blob: e41151758d79fb710345bb2f65ae90133506521c [file] [log] [blame]
/*
* Copyright 2009-2010 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.hyracks.control.cc;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Hashtable;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
import java.util.Vector;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import jol.core.Runtime;
import jol.core.Runtime.DebugLevel;
import org.apache.commons.io.IOUtils;
import org.eclipse.jetty.http.HttpMethods;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.json.JSONArray;
import org.json.JSONObject;
import edu.uci.ics.hyracks.api.client.ClusterControllerInfo;
import edu.uci.ics.hyracks.api.client.IHyracksClientInterface;
import edu.uci.ics.hyracks.api.comm.Endpoint;
import edu.uci.ics.hyracks.api.control.CCConfig;
import edu.uci.ics.hyracks.api.control.IClusterController;
import edu.uci.ics.hyracks.api.control.INodeController;
import edu.uci.ics.hyracks.api.control.NodeParameters;
import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.PortInstanceId;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobPlan;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.control.cc.web.WebServer;
import edu.uci.ics.hyracks.control.cc.web.handlers.util.IJSONOutputFunction;
import edu.uci.ics.hyracks.control.cc.web.handlers.util.JSONOutputRequestHandler;
import edu.uci.ics.hyracks.control.cc.web.handlers.util.RoutingHandler;
import edu.uci.ics.hyracks.control.common.AbstractRemoteService;
import edu.uci.ics.hyracks.control.common.application.ApplicationContext;
import edu.uci.ics.hyracks.control.common.context.ServerContext;
public class ClusterControllerService extends AbstractRemoteService implements IClusterController,
IHyracksClientInterface {
private static final long serialVersionUID = 1L;
// Configuration handed to the constructor; read for ports, heartbeat period,
// profile dump period and the maximum number of missed heartbeat periods.
private CCConfig ccConfig;
private static Logger LOGGER = Logger.getLogger(ClusterControllerService.class.getName());
// Registered node controllers keyed by node id. Every access visible in this
// file happens while holding this service's monitor.
private final Map<String, NodeControllerState> nodeRegistry;
// Applications keyed by name (a Hashtable); createApplication additionally
// synchronizes on the map for its check-then-put.
private final Map<String, ApplicationContext> applications;
private final ServerContext serverCtx;
// Embedded web server hosting the /admin, /applications and /state handlers.
private WebServer webServer;
// Created in start(); carries the web port handed to registering nodes.
private ClusterControllerInfo info;
private final IJobManager jobManager;
// Runs remote calls to node controllers off the calling thread.
private final Executor taskExecutor;
// Daemon timer that drives the periodic DeadNodeSweeper.
private final Timer timer;
private Runtime jolRuntime;
/**
 * Builds the cluster controller state: the node and application registries, the
 * server context, the JOL runtime with its job manager, the task executor, the
 * web server with its three handlers, and the daemon timer. Nothing is started
 * here; see {@link #start()}.
 *
 * @param ccConfig configuration used later for ports and heartbeat settings
 * @throws Exception if any of the collaborating components fails to initialize
 */
public ClusterControllerService(CCConfig ccConfig) throws Exception {
    this.ccConfig = ccConfig;
    this.nodeRegistry = new LinkedHashMap<String, NodeControllerState>();
    this.applications = new Hashtable<String, ApplicationContext>();

    // The server context directory is named after this class.
    final File serverDir = new File(ClusterControllerService.class.getName());
    this.serverCtx = new ServerContext(ServerContext.ServerType.CLUSTER_CONTROLLER, serverDir);

    // Full JOL debugging only when FINE logging is enabled for this class.
    final Set<DebugLevel> jolDebugLevel;
    if (LOGGER.isLoggable(Level.FINE)) {
        jolDebugLevel = Runtime.DEBUG_ALL;
    } else {
        jolDebugLevel = new HashSet<DebugLevel>();
    }
    this.jolRuntime = (Runtime) Runtime.create(jolDebugLevel, System.err);
    this.jobManager = new JOLJobManagerImpl(this, this.jolRuntime);
    this.taskExecutor = Executors.newCachedThreadPool();

    this.webServer = new WebServer();
    this.webServer.addHandler(getAdminConsoleHandler());
    this.webServer.addHandler(getApplicationInstallationHandler());
    this.webServer.addHandler(getRestAPIHandler());

    // Daemon timer: it must not keep the JVM alive on its own.
    this.timer = new Timer(true);
}
/**
 * Brings the service online: creates an RMI registry on the configured port,
 * binds this object under both the client and the cluster-controller interface
 * names, starts the web server on the configured HTTP port, records the actual
 * listening port in the controller info, and schedules the dead-node sweeper
 * once per heartbeat period.
 *
 * @throws Exception if the registry or the web server cannot be started
 */
@Override
public void start() throws Exception {
    LOGGER.info("Starting ClusterControllerService");

    final Registry rmiRegistry = LocateRegistry.createRegistry(ccConfig.port);
    final String clientBindingName = IHyracksClientInterface.class.getName();
    final String controllerBindingName = IClusterController.class.getName();
    rmiRegistry.rebind(clientBindingName, this);
    rmiRegistry.rebind(controllerBindingName, this);

    webServer.setPort(ccConfig.httpPort);
    webServer.start();

    info = new ClusterControllerInfo();
    info.setWebPort(webServer.getListeningPort());

    // First sweep runs immediately, then once every heartbeat period.
    final TimerTask sweeper = new DeadNodeSweeper();
    timer.schedule(sweeper, 0, ccConfig.heartbeatPeriod);

    LOGGER.info("Started ClusterControllerService");
}
/**
 * Shuts the service down: cancels the dead-node sweeper timer and stops the
 * web server.
 *
 * @throws Exception if the web server fails to stop
 */
@Override
public void stop() throws Exception {
    LOGGER.log(Level.INFO, "Stopping ClusterControllerService");
    // Cancel first so the sweeper cannot keep expiring nodes (and notifying the
    // job manager) once the service is being torn down. Timer.cancel() never
    // throws and may be called repeatedly.
    timer.cancel();
    webServer.stop();
    LOGGER.log(Level.INFO, "Stopped ClusterControllerService");
}
/**
 * Creates a job for a previously created application.
 *
 * @param appName  name of the application the job belongs to
 * @param jobSpec  serialized {@link JobSpecification}, deserialized through the
 *                 application's context
 * @param jobFlags flags forwarded to the job manager
 * @return the id assigned by the job manager
 * @throws Exception a {@link HyracksException} if no such application exists, or
 *                   whatever deserialization / the job manager throws
 */
@Override
public UUID createJob(String appName, byte[] jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
    final ApplicationContext appCtx = applications.get(appName);
    if (appCtx != null) {
        final JobSpecification spec = (JobSpecification) appCtx.deserialize(jobSpec);
        return jobManager.createJob(appName, spec, jobFlags);
    }
    throw new HyracksException("No application with id " + appName + " found");
}
/**
 * Registers a node controller under its id, notifies it of the registration,
 * informs the job manager, and returns the parameters the node should run with.
 *
 * @param nodeController the node controller announcing itself
 * @return parameters carrying the controller info, heartbeat period and
 *         profile dump period
 * @throws Exception if a node with the same id is already registered, or if
 *                   any of the notifications fails
 */
@Override
public NodeParameters registerNode(INodeController nodeController) throws Exception {
    final String nodeId = nodeController.getId();
    final NodeControllerState nodeState = new NodeControllerState(nodeController);

    // Check-then-put must be atomic with respect to other registry users.
    synchronized (this) {
        final boolean duplicate = nodeRegistry.containsKey(nodeId);
        if (duplicate) {
            throw new Exception("Node with this name already registered.");
        }
        nodeRegistry.put(nodeId, nodeState);
    }

    nodeController.notifyRegistration(this);
    jobManager.registerNode(nodeId);
    LOGGER.info("Registered INodeController: id = " + nodeId);

    final NodeParameters nodeParams = new NodeParameters();
    nodeParams.setClusterControllerInfo(info);
    nodeParams.setHeartbeatPeriod(ccConfig.heartbeatPeriod);
    nodeParams.setProfileDumpPeriod(ccConfig.profileDumpPeriod);
    return nodeParams;
}
/**
 * Removes a node controller from the registry. Removing an id that is not
 * registered is a no-op.
 *
 * @param nodeController the node controller to remove, identified by its id
 * @throws Exception if the node's id cannot be obtained
 */
@Override
public void unregisterNode(INodeController nodeController) throws Exception {
    final String nodeId = nodeController.getId();
    synchronized (this) {
        nodeRegistry.remove(nodeId);
    }
    LOGGER.info("Unregistered INodeController");
}
/**
 * Looks up the state of a registered node.
 *
 * @param id node id
 * @return the node's state, or {@code null} when no node is registered under
 *         that id
 */
public synchronized NodeControllerState lookupNode(String id) throws Exception {
    final NodeControllerState nodeState = nodeRegistry.get(id);
    return nodeState;
}
/**
 * @return the executor this service uses to run remote node operations
 */
public Executor getExecutor() {
    return this.taskExecutor;
}
/**
 * Asks every registered node controller to clean up after a finished job.
 * Each request is submitted to the task executor, so this method does not wait
 * for the nodes. Clean-up is best effort: a failure on one node is logged and
 * does not affect the others.
 *
 * @param jobId id of the completed job
 */
public synchronized void notifyJobComplete(final UUID jobId) {
    for (final Map.Entry<String, NodeControllerState> entry : nodeRegistry.entrySet()) {
        final String nodeId = entry.getKey();
        final NodeControllerState ns = entry.getValue();
        taskExecutor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    ns.getNodeController().cleanUpJob(jobId);
                } catch (Exception e) {
                    // Still best effort, but no longer silent: a node that fails
                    // to clean up may hold on to job resources.
                    LOGGER.log(Level.WARNING, "Failed to clean up job " + jobId + " on node " + nodeId, e);
                }
            }
        });
    }
}
/**
 * Forwards a stagelet-completion report, unchanged, to the job manager.
 *
 * @param jobId      job the stagelet belongs to
 * @param stageId    stage the stagelet belongs to
 * @param attempt    attempt number being reported
 * @param nodeId     id of the reporting node
 * @param statistics statistics supplied by the node
 */
@Override
public void notifyStageletComplete(UUID jobId, UUID stageId, int attempt, String nodeId,
        Map<String, Long> statistics) throws Exception {
    this.jobManager.notifyStageletComplete(jobId, stageId, attempt, nodeId, statistics);
}
/**
 * Forwards a stagelet-failure report, unchanged, to the job manager.
 *
 * @param jobId   job the stagelet belongs to
 * @param stageId stage the stagelet belongs to
 * @param attempt attempt number being reported
 * @param nodeId  id of the reporting node
 */
@Override
public void notifyStageletFailure(UUID jobId, UUID stageId, int attempt, String nodeId) throws Exception {
    this.jobManager.notifyStageletFailure(jobId, stageId, attempt, nodeId);
}
/**
 * @param jobId id of the job to query
 * @return the status reported by the job manager for that job
 */
@Override
public JobStatus getJobStatus(UUID jobId) throws Exception {
    final JobStatus status = jobManager.getJobStatus(jobId);
    return status;
}
/**
 * Asks the job manager to start a previously created job.
 *
 * @param jobId id of the job to start
 */
@Override
public void start(UUID jobId) throws Exception {
    this.jobManager.start(jobId);
}
/**
 * Delegates to the job manager's {@code waitForCompletion} for the given job.
 *
 * @param jobId id of the job to wait for
 */
@Override
public void waitForCompletion(UUID jobId) throws Exception {
    this.jobManager.waitForCompletion(jobId);
}
/**
 * Receives a profile counter dump from a node, logs it at INFO level when that
 * level is enabled, and hands it to the job manager.
 *
 * @param id          id of the reporting node
 * @param counterDump counters as supplied by the node, keyed by UUID and then
 *                    by counter name
 */
@Override
public void reportProfile(String id, Map<UUID, Map<String, Long>> counterDump) throws Exception {
    final boolean infoEnabled = LOGGER.isLoggable(Level.INFO);
    if (infoEnabled) {
        // Build the message only when it will actually be emitted.
        final String message = "Profile: " + id + ": " + counterDump;
        LOGGER.info(message);
    }
    jobManager.reportProfile(id, counterDump);
}
/**
 * Builds the handler mounted at {@code /state}. It routes {@code jobs} to a
 * JSON function that behaves as follows, based on the argument array it gets:
 * <ul>
 * <li>no arguments, or a single empty argument: all job summaries;</li>
 * <li>two arguments {@code <jobId>, spec}: that job's specification;</li>
 * <li>two arguments {@code <jobId>, profile}: that job's profile;</li>
 * <li>anything else: an empty JSON object.</li>
 * </ul>
 * Results are stored under the {@code "result"} key.
 */
private Handler getRestAPIHandler() {
    final IJSONOutputFunction jobsFunction = new IJSONOutputFunction() {
        @Override
        public JSONObject invoke(String[] arguments) throws Exception {
            final JSONObject result = new JSONObject();
            final int argc = arguments.length;
            final boolean listAll = argc == 0 || (argc == 1 && "".equals(arguments[0]));
            if (listAll) {
                result.put("result", jobManager.getQueryInterface().getAllJobSummaries());
            } else if (argc == 2) {
                final UUID jobId = UUID.fromString(arguments[0]);
                final String view = arguments[1];
                if ("spec".equalsIgnoreCase(view)) {
                    result.put("result", jobManager.getQueryInterface().getJobSpecification(jobId));
                } else if ("profile".equalsIgnoreCase(view)) {
                    result.put("result", jobManager.getQueryInterface().getJobProfile(jobId));
                }
            }
            return result;
        }
    };

    final RoutingHandler router = new RoutingHandler();
    router.addHandler("jobs", new JSONOutputRequestHandler(jobsFunction));

    final ContextHandler stateContext = new ContextHandler("/state");
    stateContext.setHandler(router);
    return stateContext;
}
/**
 * Builds the handler mounted at {@code /admin}. Only the context root
 * ({@code "/"}) is answered; other targets are left unhandled. The page is a
 * small HTML table with one row per registered node controller, showing the
 * node id and the value of its last-heartbeat-duration counter.
 */
private Handler getAdminConsoleHandler() {
    ContextHandler handler = new ContextHandler("/admin");
    handler.setHandler(new AbstractHandler() {
        @Override
        public void handle(String target, Request baseRequest, HttpServletRequest request,
                HttpServletResponse response) throws IOException, ServletException {
            if (!"/".equals(target)) {
                return;
            }
            response.setContentType("text/html;charset=utf-8");
            response.setStatus(HttpServletResponse.SC_OK);
            baseRequest.setHandled(true);
            PrintWriter writer = response.getWriter();
            writer.println("<html><head><title>Hyracks Admin Console</title></head><body>");
            writer.println("<h1>Hyracks Admin Console</h1>");
            writer.println("<h2>Node Controllers</h2>");
            // The second column prints getLastHeartbeatDuration(), so label it
            // accordingly (it used to be mislabelled "Host").
            writer.println("<table><tr><td>Node Id</td><td>Last Heartbeat Duration</td></tr>");
            // Hold the service monitor so the registry cannot change mid-render.
            synchronized (ClusterControllerService.this) {
                for (Map.Entry<String, NodeControllerState> e : nodeRegistry.entrySet()) {
                    try {
                        writer.print("<tr><td>");
                        writer.print(e.getKey());
                        writer.print("</td><td>");
                        writer.print(e.getValue().getLastHeartbeatDuration());
                        writer.print("</td></tr>");
                    } catch (Exception ex) {
                        // One bad row must not break the page, but leave a trace.
                        LOGGER.log(Level.WARNING, "Unable to render admin console row for node " + e.getKey(), ex);
                    }
                }
            }
            writer.println("</table>");
            writer.println("</body></html>");
            writer.flush();
        }
    });
    return handler;
}
/**
 * Builds the handler mounted at {@code /applications}. After stripping leading
 * and trailing slashes the target must be a single path segment naming an
 * existing application; otherwise the request is left unhandled.
 * <ul>
 * <li>{@code PUT} copies the request body into the application's HAR output
 * stream.</li>
 * <li>{@code GET} streams the application's HAR back as
 * {@code application/octet-stream}, or answers 404 when the application has no
 * HAR.</li>
 * </ul>
 * Any {@link IOException} has its stack trace printed and is rethrown.
 */
private Handler getApplicationInstallationHandler() {
    final AbstractHandler harHandler = new AbstractHandler() {
        @Override
        public void handle(String target, Request baseRequest, HttpServletRequest request,
                HttpServletResponse response) throws IOException, ServletException {
            try {
                // Trim every leading and trailing '/' from the target.
                int begin = 0;
                int end = target.length();
                while (begin < end && target.charAt(begin) == '/') {
                    begin++;
                }
                while (end > begin && target.charAt(end - 1) == '/') {
                    end--;
                }
                final String[] segments = target.substring(begin, end).split("/");
                if (segments.length != 1) {
                    return;
                }

                final ApplicationContext appCtx = applications.get(segments[0]);
                if (appCtx == null) {
                    return;
                }

                final String method = request.getMethod();
                if (HttpMethods.PUT.equals(method)) {
                    final OutputStream harOut = appCtx.getHarOutputStream();
                    try {
                        IOUtils.copyLarge(request.getInputStream(), harOut);
                    } finally {
                        harOut.close();
                    }
                } else if (HttpMethods.GET.equals(method)) {
                    if (appCtx.containsHar()) {
                        final InputStream harIn = appCtx.getHarInputStream();
                        response.setContentType("application/octet-stream");
                        response.setStatus(HttpServletResponse.SC_OK);
                        try {
                            IOUtils.copyLarge(harIn, response.getOutputStream());
                        } finally {
                            harIn.close();
                        }
                    } else {
                        response.setStatus(HttpServletResponse.SC_NOT_FOUND);
                    }
                }
                baseRequest.setHandled(true);
            } catch (IOException e) {
                e.printStackTrace();
                throw e;
            }
        }
    };

    final ContextHandler applicationsContext = new ContextHandler("/applications");
    applicationsContext.setHandler(harHandler);
    return applicationsContext;
}
/**
 * Records a heartbeat from a node. Heartbeats from ids that are not in the
 * registry are logged (at INFO, when enabled) and otherwise ignored.
 *
 * @param id id of the node sending the heartbeat
 */
@Override
public synchronized void nodeHeartbeat(String id) throws Exception {
    final boolean infoEnabled = LOGGER.isLoggable(Level.INFO);
    if (infoEnabled) {
        LOGGER.info("Heartbeat from: " + id);
    }
    final NodeControllerState nodeState = nodeRegistry.get(id);
    if (nodeState == null) {
        return;
    }
    nodeState.notifyHeartbeat();
}
/**
 * Drops a node from the registry and reports its failure to the job manager.
 * This method takes no lock itself; its only caller in this file invokes it
 * while holding the service monitor.
 *
 * @param nodeId id of the node to remove
 */
private void killNode(String nodeId) throws Exception {
    this.nodeRegistry.remove(nodeId);
    this.jobManager.notifyNodeFailure(nodeId);
}
/**
 * Creates and registers a new application context under the given name.
 *
 * @param appName name of the application to create
 * @throws Exception a {@link HyracksException} if an application with that name
 *                   already exists
 */
@Override
public void createApplication(String appName) throws Exception {
    // Lock the map so the existence check and the insert happen atomically.
    synchronized (applications) {
        final boolean alreadyExists = applications.containsKey(appName);
        if (alreadyExists) {
            throw new HyracksException("Duplicate application with name: " + appName + " being created.");
        }
        applications.put(appName, new ApplicationContext(serverCtx, appName));
    }
}
/**
 * Removes an application, asks every registered node controller to destroy it,
 * and then deinitializes its context. Unknown application names are ignored.
 *
 * @param appName name of the application to destroy
 * @throws Exception if a node controller call or the deinitialization fails
 */
@Override
public void destroyApplication(String appName) throws Exception {
    final ApplicationContext removed = applications.remove(appName);
    if (removed == null) {
        return;
    }
    // Nodes are contacted one after another while holding the service monitor.
    synchronized (this) {
        for (NodeControllerState nodeState : nodeRegistry.values()) {
            final INodeController node = nodeState.getNodeController();
            node.destroyApplication(appName);
        }
    }
    removed.deinitialize();
}
/**
 * Initializes an application's context and asks every currently registered
 * node controller to create the application, running the remote calls through
 * {@code runRemote}.
 *
 * @param appName name of a previously created application
 * @throws Exception a {@link HyracksException} if no application with that name
 *                   exists (this used to surface as a NullPointerException), or
 *                   whatever initialization / a remote call throws
 */
@Override
public void startApplication(final String appName) throws Exception {
    ApplicationContext appCtx = applications.get(appName);
    if (appCtx == null) {
        // Same failure mode and message as createJob for an unknown application.
        throw new HyracksException("No application with id " + appName + " found");
    }
    appCtx.initialize();
    final boolean deployHar = appCtx.containsHar();

    // Snapshot the node ids under the lock; the remote calls run without it.
    List<RemoteOp<Void>> opList = new ArrayList<RemoteOp<Void>>();
    synchronized (this) {
        for (final String nodeId : nodeRegistry.keySet()) {
            opList.add(new RemoteOp<Void>() {
                @Override
                public String getNodeId() {
                    return nodeId;
                }

                @Override
                public Void execute(INodeController node) throws Exception {
                    node.createApplication(appName, deployHar);
                    return null;
                }
            });
        }
    }

    // Generic array creation is unavoidable here; every element is a RemoteOp<Void>.
    @SuppressWarnings("unchecked")
    RemoteOp<Void>[] ops = opList.toArray(new RemoteOp[opList.size()]);
    runRemote(ops, null);
}
/**
 * @return the controller info created in {@link #start()}; {@code null} if the
 *         service has not been started yet
 */
@Override
public ClusterControllerInfo getClusterControllerInfo() throws Exception {
    return this.info;
}
/**
 * Periodic task that expires silent nodes. On every run it increments each
 * node's last-heartbeat-duration counter and kills the nodes whose counter has
 * reached {@code ccConfig.maxHeartbeatLapsePeriods}. The whole sweep runs while
 * holding the enclosing service's monitor.
 */
private class DeadNodeSweeper extends TimerTask {
    @Override
    public void run() {
        final Set<String> expired = new HashSet<String>();
        synchronized (ClusterControllerService.this) {
            collectExpiredNodes(expired);
            for (String nodeId : expired) {
                try {
                    if (LOGGER.isLoggable(Level.INFO)) {
                        LOGGER.info("Killing node: " + nodeId);
                    }
                    killNode(nodeId);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * Adds to {@code sink} the id of every node whose incremented counter has
     * reached the configured limit. Collection is separate from removal because
     * killNode modifies the registry being iterated here.
     */
    private void collectExpiredNodes(Set<String> sink) {
        for (Map.Entry<String, NodeControllerState> entry : nodeRegistry.entrySet()) {
            final NodeControllerState nodeState = entry.getValue();
            if (nodeState.incrementLastHeartbeatDuration() >= ccConfig.maxHeartbeatLapsePeriods) {
                sink.add(entry.getKey());
            }
        }
    }
}
/**
 * An operation to run against one specific node controller.
 *
 * @param <T> type of the value the operation produces
 */
interface RemoteOp<T> {
    /** @return id of the node this operation targets */
    String getNodeId();

    /**
     * Runs the operation against the given node controller.
     *
     * @param node controller of the node named by {@link #getNodeId()}
     * @return the operation's result
     */
    T execute(INodeController node) throws Exception;
}
/**
 * Collects the results of several remote operations into one combined value.
 *
 * @param <T> type of each individual result
 * @param <R> type of the combined result
 */
interface Accumulator<T, R> {
    /** Folds one individual result into the accumulator. */
    void accumulate(T o);

    /** @return the combined result of everything accumulated so far */
    R getResult();
}
/**
 * Runs the given operations on the task executor and waits for all of them to
 * finish.
 *
 * @param remoteOps   operations to run, each naming its target node
 * @param accumulator optional sink for the individual results; may be
 *                    {@code null}, in which case results are discarded
 * @return the accumulator's result, or {@code null} when no accumulator is given
 * @throws Exception a {@link HyracksException} if a target node is no longer
 *                   registered (this used to surface as a NullPointerException);
 *                   otherwise the first exception recorded from a failed operation
 */
<T, R> R runRemote(final RemoteOp<T>[] remoteOps, final Accumulator<T, R> accumulator) throws Exception {
    // One permit per operation: each dispatch takes a permit and each finished
    // operation returns it, so re-acquiring all permits below waits for all.
    final Semaphore installComplete = new Semaphore(remoteOps.length);
    final List<Exception> errors = new Vector<Exception>();
    for (final RemoteOp<T> remoteOp : remoteOps) {
        final String nodeId = remoteOp.getNodeId();
        NodeControllerState nodeState = lookupNode(nodeId);
        if (nodeState == null) {
            // The node may have been unregistered or swept as dead after the
            // operation was created; fail with a message naming the node.
            throw new HyracksException("Node " + nodeId + " is not registered");
        }
        final INodeController node = nodeState.getNodeController();
        installComplete.acquire();
        Runnable remoteRunner = new Runnable() {
            @Override
            public void run() {
                try {
                    T t = remoteOp.execute(node);
                    if (accumulator != null) {
                        // Operations finish on different threads; serialize accumulation.
                        synchronized (accumulator) {
                            accumulator.accumulate(t);
                        }
                    }
                } catch (Exception e) {
                    errors.add(e);
                } finally {
                    installComplete.release();
                }
            }
        };
        getExecutor().execute(remoteRunner);
    }
    installComplete.acquire(remoteOps.length);
    if (!errors.isEmpty()) {
        throw errors.get(0);
    }
    return accumulator == null ? null : accumulator.getResult();
}
/**
 * Serializes an object with Java serialization.
 *
 * @param o object to serialize
 * @return the serialized bytes
 * @throws IOException if the object cannot be written
 */
private static byte[] serialize(Object o) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    ObjectOutputStream oos = new ObjectOutputStream(baos);
    try {
        oos.writeObject(o);
    } finally {
        // close() flushes buffered data into baos; reading the bytes without
        // a flush or close relied on ObjectOutputStream internals.
        oos.close();
    }
    return baos.toByteArray();
}
/**
 * Remote operation that calls {@code initializeJobletPhase1} on one node,
 * passing the serialized plan, and returns the port-to-endpoint map produced by
 * that call.
 */
static class Phase1Installer implements RemoteOp<Map<PortInstanceId, Endpoint>> {
    private final String nodeId;
    private final UUID jobId;
    private final String appName;
    private final JobPlan plan;
    private final UUID stageId;
    private final int attempt;
    private final Map<ActivityNodeId, Set<Integer>> tasks;
    private final Map<OperatorDescriptorId, Set<Integer>> opPartitions;

    public Phase1Installer(String nodeId, UUID jobId, String appName, JobPlan plan, UUID stageId, int attempt,
            Map<ActivityNodeId, Set<Integer>> tasks, Map<OperatorDescriptorId, Set<Integer>> opPartitions) {
        this.nodeId = nodeId;
        this.jobId = jobId;
        this.appName = appName;
        this.plan = plan;
        this.stageId = stageId;
        this.attempt = attempt;
        this.tasks = tasks;
        this.opPartitions = opPartitions;
    }

    @Override
    public String getNodeId() {
        return nodeId;
    }

    @Override
    public Map<PortInstanceId, Endpoint> execute(INodeController node) throws Exception {
        // The plan is serialized anew on every execution.
        final byte[] planBytes = serialize(plan);
        return node.initializeJobletPhase1(appName, jobId, planBytes, stageId, attempt, tasks, opPartitions);
    }

    @Override
    public String toString() {
        return String.valueOf(jobId).concat(" Distribution Phase 1");
    }
}
/**
 * Remote operation that calls {@code initializeJobletPhase2} on one node,
 * passing the serialized plan together with the global port map.
 */
static class Phase2Installer implements RemoteOp<Void> {
    private final String nodeId;
    private final UUID jobId;
    private final String appName;
    private final JobPlan plan;
    private final UUID stageId;
    private final Map<ActivityNodeId, Set<Integer>> tasks;
    private final Map<OperatorDescriptorId, Set<Integer>> opPartitions;
    private final Map<PortInstanceId, Endpoint> globalPortMap;

    public Phase2Installer(String nodeId, UUID jobId, String appName, JobPlan plan, UUID stageId,
            Map<ActivityNodeId, Set<Integer>> tasks, Map<OperatorDescriptorId, Set<Integer>> opPartitions,
            Map<PortInstanceId, Endpoint> globalPortMap) {
        this.nodeId = nodeId;
        this.jobId = jobId;
        this.appName = appName;
        this.plan = plan;
        this.stageId = stageId;
        this.tasks = tasks;
        this.opPartitions = opPartitions;
        this.globalPortMap = globalPortMap;
    }

    @Override
    public String getNodeId() {
        return nodeId;
    }

    @Override
    public Void execute(INodeController node) throws Exception {
        // The plan is serialized anew on every execution.
        final byte[] planBytes = serialize(plan);
        node.initializeJobletPhase2(appName, jobId, planBytes, stageId, tasks, opPartitions, globalPortMap);
        return null;
    }

    @Override
    public String toString() {
        return String.valueOf(jobId).concat(" Distribution Phase 2");
    }
}
/**
 * Remote operation that calls {@code commitJobletInitialization} on one node
 * for a given job and stage.
 */
static class Phase3Installer implements RemoteOp<Void> {
    private final String nodeId;
    private final UUID jobId;
    private final UUID stageId;

    public Phase3Installer(String nodeId, UUID jobId, UUID stageId) {
        this.nodeId = nodeId;
        this.jobId = jobId;
        this.stageId = stageId;
    }

    @Override
    public String getNodeId() {
        return nodeId;
    }

    @Override
    public Void execute(INodeController node) throws Exception {
        node.commitJobletInitialization(jobId, stageId);
        return null;
    }

    @Override
    public String toString() {
        return String.valueOf(jobId).concat(" Distribution Phase 3");
    }
}
/**
 * Remote operation that calls {@code startStage} on one node for a given job
 * and stage.
 */
static class StageStarter implements RemoteOp<Void> {
    private final String nodeId;
    private final UUID jobId;
    private final UUID stageId;

    public StageStarter(String nodeId, UUID jobId, UUID stageId) {
        this.nodeId = nodeId;
        this.jobId = jobId;
        this.stageId = stageId;
    }

    @Override
    public String getNodeId() {
        return nodeId;
    }

    @Override
    public Void execute(INodeController node) throws Exception {
        node.startStage(jobId, stageId);
        return null;
    }

    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder();
        text.append(jobId).append(" Started Stage: ").append(stageId);
        return text.toString();
    }
}
/**
 * Remote operation that calls {@code abortJoblet} on one node for a given job
 * and stage.
 */
static class JobletAborter implements RemoteOp<Void> {
    private final String nodeId;
    private final UUID jobId;
    private final UUID stageId;

    /**
     * @param nodeId  id of the target node
     * @param jobId   job whose joblet is aborted
     * @param stageId stage whose joblet is aborted
     * @param attempt accepted for signature compatibility; not stored or used
     */
    public JobletAborter(String nodeId, UUID jobId, UUID stageId, int attempt) {
        this.nodeId = nodeId;
        this.jobId = jobId;
        this.stageId = stageId;
    }

    @Override
    public String getNodeId() {
        return nodeId;
    }

    @Override
    public Void execute(INodeController node) throws Exception {
        node.abortJoblet(jobId, stageId);
        return null;
    }

    @Override
    public String toString() {
        return String.valueOf(jobId).concat(" Aborting");
    }
}
/**
 * Remote operation that calls {@code cleanUpJob} on one node for a given job.
 */
static class JobCompleteNotifier implements RemoteOp<Void> {
    private final String nodeId;
    private final UUID jobId;

    public JobCompleteNotifier(String nodeId, UUID jobId) {
        this.nodeId = nodeId;
        this.jobId = jobId;
    }

    @Override
    public String getNodeId() {
        return nodeId;
    }

    @Override
    public Void execute(INodeController node) throws Exception {
        node.cleanUpJob(jobId);
        return null;
    }

    @Override
    public String toString() {
        return String.valueOf(jobId).concat(" Cleaning Up");
    }
}
/**
 * Accumulator that merges per-node port maps into a single map. When the same
 * port instance appears more than once, the entry accumulated last wins.
 */
static class PortMapMergingAccumulator implements
        Accumulator<Map<PortInstanceId, Endpoint>, Map<PortInstanceId, Endpoint>> {
    // Merged view of everything accumulated so far; returned as-is (not copied).
    Map<PortInstanceId, Endpoint> portMap = new HashMap<PortInstanceId, Endpoint>();

    @Override
    public void accumulate(Map<PortInstanceId, Endpoint> o) {
        final Map<PortInstanceId, Endpoint> merged = this.portMap;
        merged.putAll(o);
    }

    @Override
    public Map<PortInstanceId, Endpoint> getResult() {
        return this.portMap;
    }
}
}