// blob: f2e401a6ba493ed251b317c0620d9bb487f7430a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationMaster;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationMasterPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeReportPBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationMasterProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeReportProto;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
/**
 * {@link Store} implementation that keeps ResourceManager recovery state in
 * ZooKeeper.
 *
 * <p>Znode layout used by this class (all paths are absolute, as the
 * ZooKeeper client requires):
 * <ul>
 * <li>{@code /nodes/nodeid} - a serialized {@link NodeId}, read back during
 * {@link #restore()};</li>
 * <li>{@code /nodes/<child>} - one serialized {@link NodeReport} per stored
 * node;</li>
 * <li>{@code /apps/<appId>} - the serialized
 * {@link ApplicationSubmissionContext} of an application, with the children
 * {@code master}, {@code mastercontainer} and one znode per container id.</li>
 * </ul>
 * This class never creates the {@code /nodes} and {@code /apps} parents
 * itself.
 *
 * <p>Every mutating operation is a no-op until {@link #doneWithRecovery()}
 * has been called.
 */
public class ZKStore implements Store {
  private static final Log LOG = LogFactory.getLog(ZKStore.class);

  private static final String ZK_PATH_SEPARATOR = "/";
  /** Absolute parent znode of all node related data. */
  private static final String NODES = ZK_PATH_SEPARATOR + "nodes";
  /** Absolute parent znode of all application related data. */
  private static final String APPS = ZK_PATH_SEPARATOR + "apps";
  private static final String NODE_ID = "nodeid";
  private static final String APP_MASTER = "master";
  private static final String APP_MASTER_CONTAINER = "mastercontainer";
  /** Splits {@code host:port}; group 1 is the host, group 2 the port. */
  private static final Pattern HOST_PORT_PATTERN = Pattern.compile("(.*):(.*)");

  private final Configuration conf;
  private final ZooKeeper zkClient;
  private final RecordFactory recordFactory =
      RecordFactoryProvider.getRecordFactory(null);
  private final String ZK_ADDRESS;
  private final int ZK_TIMEOUT;
  private boolean doneWithRecovery = false;
  /** TODO make this generic **/
  private NodeIdPBImpl nodeId = new NodeIdPBImpl();

  /**
   * TODO fix this for later to handle all kinds of events
   * of connection and session events.
   */
  private static class ZKWatcher implements Watcher {
    @Override
    public void process(WatchedEvent arg0) {
      // Intentionally empty: events are currently ignored.
    }
  }

  /**
   * Connects to the ZooKeeper ensemble named by
   * {@code YarnConfiguration.RM_ZK_STORE_ADDRESS}.
   *
   * @param conf configuration supplying the ZooKeeper address and session
   *             timeout
   * @throws IOException if the ZooKeeper client cannot be created
   */
  public ZKStore(Configuration conf) throws IOException {
    this.conf = conf;
    this.ZK_ADDRESS = conf.get(YarnConfiguration.RM_ZK_STORE_ADDRESS);
    this.ZK_TIMEOUT = conf.getInt(YarnConfiguration.RM_ZK_STORE_TIMEOUT_MS,
        YarnConfiguration.DEFAULT_RM_ZK_STORE_TIMEOUT_MS);
    // NOTE: createZKWatcher() is an overridable hook invoked from the
    // constructor; overrides must not rely on subclass state.
    zkClient = new ZooKeeper(this.ZK_ADDRESS, this.ZK_TIMEOUT,
        createZKWatcher());
    // TODO: FIXMEVinodkv
    // this.nodeId.setId(0);
  }

  /**
   * Creates the watcher handed to the ZooKeeper client.
   *
   * @return a watcher that ignores every event
   */
  protected Watcher createZKWatcher() {
    return new ZKWatcher();
  }

  /** Builds the absolute path of a znode directly below {@code /nodes}. */
  private static String nodePath(String child) {
    return NODES + ZK_PATH_SEPARATOR + child;
  }

  /** Builds the absolute path of a znode below {@code /apps}. */
  private static String appPath(String relativePath) {
    return APPS + ZK_PATH_SEPARATOR + relativePath;
  }

  /**
   * Creates a persistent znode holding {@code data}.
   *
   * NOTE(review): the ACL argument is still {@code null}, as in the original
   * code. The ZooKeeper API documents that a null ACL is rejected with
   * InvalidACLException, so an explicit ACL (for example
   * ZooDefs.Ids.OPEN_ACL_UNSAFE, or a configured one) has to be supplied
   * here. Which ACL to use is a security decision and needs a new import, so
   * it is left for a follow-up; this helper is the single place to change.
   */
  private void createPersistent(String path, byte[] data)
      throws KeeperException, InterruptedException {
    zkClient.create(path, data, null, CreateMode.PERSISTENT);
  }

  /**
   * Converts a stored node into its {@link NodeReport} record form.
   * Currently only referenced by the disabled code in
   * {@link #storeNode(RMNode)}.
   */
  private NodeReportPBImpl createNodeManagerInfo(RMNode rmNode) {
    NodeReport node = recordFactory.newRecordInstance(NodeReport.class);
    node.setNodeId(rmNode.getNodeID());
    node.setRackName(rmNode.getRackName());
    node.setCapability(rmNode.getTotalCapability());
    // TODO: FIXME
    // node.setUsed(nodeInfo.getUsedResource());
    // TODO: acm: refactor2 FIXME
    // node.setNumContainers(rmNode.getNumContainers());
    return (NodeReportPBImpl) node;
  }

  /**
   * Persists a node. The actual write is currently disabled (see TODO), so
   * this method has no effect.
   */
  @Override
  public synchronized void storeNode(RMNode node) throws IOException {
    /** create a storage node and store it in zk **/
    if (!doneWithRecovery) {
      return;
    }
    // TODO: FIXMEVinodkv
    // NodeReportPBImpl nodeManagerInfo = createNodeManagerInfo(node);
    // byte[] bytes = nodeManagerInfo.getProto().toByteArray();
    // try {
    //   createPersistent(
    //       nodePath(Integer.toString(node.getNodeID().getId())), bytes);
    // } catch(InterruptedException ie) {
    //   throw convertToIOException(ie);
    // } catch(KeeperException ke) {
    //   LOG.info("Keeper exception", ke);
    //   throw convertToIOException(ke);
    // }
  }

  /**
   * Removes a node. The actual delete is currently disabled (see TODO), so
   * this method has no effect.
   */
  @Override
  public synchronized void removeNode(RMNode node) throws IOException {
    if (!doneWithRecovery) {
      return;
    }
    // TODO: FIXME VINODKV
    // /** remove a storage node **/
    // try {
    //   zkClient.delete(
    //       nodePath(Integer.toString(node.getNodeID().getId())), -1);
    // } catch(InterruptedException ie) {
    //   throw convertToIOException(ie);
    // } catch(KeeperException ke) {
    //   LOG.info("Keeper exception", ke);
    //   throw convertToIOException(ke);
    // }
  }

  /**
   * Wraps a ZooKeeper failure in an {@link IOException}, keeping the
   * original message and the exception itself as the cause.
   */
  private static IOException convertToIOException(KeeperException ke) {
    return new IOException(ke.getMessage(), ke);
  }

  /**
   * Logs an interruption, restores the thread's interrupt flag and returns an
   * {@link InterruptedIOException} carrying the original exception as cause.
   */
  private static IOException convertToIOException(InterruptedException ie) {
    LOG.info("Interrupted", ie);
    Thread.currentThread().interrupt();
    InterruptedIOException iioe = new InterruptedIOException(ie.getMessage());
    iioe.initCause(ie);
    return iioe;
  }

  /**
   * Returns the current node id. Incrementing and persisting the id is
   * currently disabled (see TODO), so the same instance is returned on every
   * call.
   */
  @Override
  public synchronized NodeId getNextNodeId() throws IOException {
    // TODO: FIXME VINODKV
    // int num = nodeId.getId();
    // num++;
    // nodeId.setId(num);
    // try {
    //   zkClient.setData(nodePath(NODE_ID),
    //       nodeId.getProto().toByteArray() , -1);
    // } catch(InterruptedException ie) {
    //   throw convertToIOException(ie);
    // } catch(KeeperException ke) {
    //   throw convertToIOException(ke);
    // }
    return nodeId;
  }

  /**
   * Returns the path of a container znode relative to {@code /apps}:
   * {@code <appId>/<containerId>}.
   */
  private String containerPathFromContainerId(ContainerId containerId) {
    String appString = ConverterUtils.toString(
        containerId.getApplicationAttemptId().getApplicationId());
    return appString + ZK_PATH_SEPARATOR + containerId.getId();
  }

  /**
   * Per-application view of the store. All writes are skipped until the
   * enclosing store has finished recovery.
   */
  private class ZKApplicationStore implements ApplicationStore {
    private final ApplicationId applicationId;

    public ZKApplicationStore(ApplicationId applicationId) {
      this.applicationId = applicationId;
    }

    /**
     * Overwrites the {@code mastercontainer} znode of the application the
     * container belongs to.
     */
    @Override
    public void storeMasterContainer(Container container) throws IOException {
      if (!doneWithRecovery) {
        return;
      }
      ContainerPBImpl containerPBImpl = (ContainerPBImpl) container;
      String appString = ConverterUtils.toString(
          container.getId().getApplicationAttemptId().getApplicationId());
      String znode =
          appPath(appString) + ZK_PATH_SEPARATOR + APP_MASTER_CONTAINER;
      try {
        // Version -1 matches any version of the znode.
        zkClient.setData(znode, containerPBImpl.getProto().toByteArray(), -1);
      } catch (InterruptedException ie) {
        throw convertToIOException(ie);
      } catch (KeeperException ke) {
        LOG.info("Keeper exception", ke);
        throw convertToIOException(ke);
      }
    }

    /** Creates a znode for the container below its application znode. */
    @Override
    public synchronized void storeContainer(Container container)
        throws IOException {
      if (!doneWithRecovery) {
        return;
      }
      ContainerPBImpl containerPBImpl = (ContainerPBImpl) container;
      try {
        createPersistent(
            appPath(containerPathFromContainerId(container.getId())),
            containerPBImpl.getProto().toByteArray());
      } catch (InterruptedException ie) {
        throw convertToIOException(ie);
      } catch (KeeperException ke) {
        LOG.info("Keeper exception", ke);
        throw convertToIOException(ke);
      }
    }

    /** Deletes the container's znode, whatever its version. */
    @Override
    public synchronized void removeContainer(Container container)
        throws IOException {
      if (!doneWithRecovery) {
        return;
      }
      try {
        zkClient.delete(
            appPath(containerPathFromContainerId(container.getId())), -1);
      } catch (InterruptedException ie) {
        throw convertToIOException(ie);
      } catch (KeeperException ke) {
        LOG.info("Keeper exception", ke);
        throw convertToIOException(ke);
      }
    }

    /**
     * Overwrites the data of this application's own znode with the
     * serialized master.
     *
     * NOTE(review): createApplicationStore() stores the submission context in
     * this same znode and the master in the "master" child, which is what
     * restore() reads back - confirm whether this write should target the
     * "master" child instead. Behaviour is left as in the original.
     */
    @Override
    public void updateApplicationState(ApplicationMaster master)
        throws IOException {
      if (!doneWithRecovery) {
        return;
      }
      String appString = appPath(ConverterUtils.toString(applicationId));
      ApplicationMasterPBImpl masterPBImpl = (ApplicationMasterPBImpl) master;
      try {
        zkClient.setData(appString, masterPBImpl.getProto().toByteArray(), -1);
      } catch (InterruptedException ie) {
        throw convertToIOException(ie);
      } catch (KeeperException ke) {
        LOG.info("Keeper exception", ke);
        throw convertToIOException(ke);
      }
    }

    @Override
    public boolean isLoggable() {
      return doneWithRecovery;
    }
  }

  /**
   * Returns a store for the given application. Once recovery is done this
   * also creates the application znode (holding the submission context) and
   * its {@code master} and {@code mastercontainer} children, the latter two
   * initialised with empty records.
   *
   * @throws IOException if ZooKeeper rejects a create or the thread is
   *                     interrupted
   */
  @Override
  public synchronized ApplicationStore createApplicationStore(
      ApplicationId application, ApplicationSubmissionContext context)
      throws IOException {
    if (!doneWithRecovery) {
      return new ZKApplicationStore(application);
    }
    ApplicationSubmissionContextPBImpl contextPBImpl =
        (ApplicationSubmissionContextPBImpl) context;
    String appString = appPath(ConverterUtils.toString(application));
    ApplicationMasterPBImpl masterPBImpl = new ApplicationMasterPBImpl();
    ContainerPBImpl container = new ContainerPBImpl();
    try {
      createPersistent(appString, contextPBImpl.getProto().toByteArray());
      createPersistent(appString + ZK_PATH_SEPARATOR + APP_MASTER,
          masterPBImpl.getProto().toByteArray());
      createPersistent(appString + ZK_PATH_SEPARATOR + APP_MASTER_CONTAINER,
          container.getProto().toByteArray());
    } catch (InterruptedException ie) {
      throw convertToIOException(ie);
    } catch (KeeperException ke) {
      LOG.info("Keeper exception", ke);
      throw convertToIOException(ke);
    }
    return new ZKApplicationStore(application);
  }

  /**
   * Removes an application znode together with its direct children.
   * ZooKeeper refuses to delete a znode that still has children, and every
   * application znode created by this store has at least the
   * {@code master} and {@code mastercontainer} children.
   */
  @Override
  public synchronized void removeApplication(ApplicationId application)
      throws IOException {
    if (!doneWithRecovery) {
      return;
    }
    String appString = appPath(ConverterUtils.toString(application));
    try {
      // The layout is only one level deep, so no recursion is needed.
      for (String child : zkClient.getChildren(appString, false)) {
        zkClient.delete(appString + ZK_PATH_SEPARATOR + child, -1);
      }
      zkClient.delete(appString, -1);
    } catch (InterruptedException ie) {
      throw convertToIOException(ie);
    } catch (KeeperException ke) {
      LOG.info("Keeper Exception", ke);
      throw convertToIOException(ke);
    }
  }

  @Override
  public boolean isLoggable() {
    return doneWithRecovery;
  }

  /** Enables all mutating operations of this store. */
  @Override
  public void doneWithRecovery() {
    this.doneWithRecovery = true;
  }

  /**
   * Reads the stored state back from ZooKeeper.
   *
   * @return the loaded state
   * @throws IOException if reading from ZooKeeper fails
   */
  @Override
  public synchronized RMState restore() throws IOException {
    ZKRMState rmState = new ZKRMState();
    rmState.load();
    return rmState;
  }

  /** Plain holder for the recovered pieces of one application. */
  private static class ApplicationInfoImpl implements ApplicationInfo {
    private ApplicationMaster master;
    private Container masterContainer;
    private final ApplicationSubmissionContext context;
    private final List<Container> containers = new ArrayList<Container>();

    public ApplicationInfoImpl(ApplicationSubmissionContext context) {
      this.context = context;
    }

    public void setApplicationMaster(ApplicationMaster master) {
      this.master = master;
    }

    public void setMasterContainer(Container container) {
      this.masterContainer = container;
    }

    @Override
    public ApplicationMaster getApplicationMaster() {
      return this.master;
    }

    @Override
    public ApplicationSubmissionContext getApplicationSubmissionContext() {
      return this.context;
    }

    @Override
    public Container getMasterContainer() {
      return this.masterContainer;
    }

    @Override
    public List<Container> getContainers() {
      return this.containers;
    }

    public void addContainer(Container container) {
      containers.add(container);
    }
  }

  /** State recovered from ZooKeeper by {@link ZKStore#restore()}. */
  private class ZKRMState implements RMState {
    private List<RMNode> nodeManagers = new ArrayList<RMNode>();
    private Map<ApplicationId, ApplicationInfo> applications =
        new HashMap<ApplicationId, ApplicationInfo>();

    public ZKRMState() {
      LOG.info("Restoring RM state from ZK");
    }

    /**
     * Reads every node report stored below {@code /nodes}. The
     * {@code nodeid} child is skipped because it holds a {@link NodeId},
     * not a {@link NodeReport}.
     */
    private synchronized List<NodeReport> listStoredNodes()
        throws IOException {
      /** get the list of nodes stored in zk **/
      //TODO PB
      List<NodeReport> nodes = new ArrayList<NodeReport>();
      Stat stat = new Stat();
      try {
        List<String> children = zkClient.getChildren(NODES, false);
        for (String child : children) {
          if (NODE_ID.equals(child)) {
            continue;
          }
          byte[] data = zkClient.getData(nodePath(child), false, stat);
          NodeReportPBImpl nmImpl =
              new NodeReportPBImpl(NodeReportProto.parseFrom(data));
          nodes.add(nmImpl);
        }
      } catch (InterruptedException ie) {
        throw convertToIOException(ie);
      } catch (KeeperException ke) {
        LOG.error("Failed to list nodes", ke);
        throw convertToIOException(ke);
      }
      return nodes;
    }

    @Override
    public List<RMNode> getStoredNodeManagers() {
      return nodeManagers;
    }

    @Override
    public NodeId getLastLoggedNodeId() {
      return nodeId;
    }

    /** Replaces the store's node id with the one kept in ZooKeeper. */
    private void readLastNodeId() throws IOException {
      Stat stat = new Stat();
      try {
        byte[] data = zkClient.getData(nodePath(NODE_ID), false, stat);
        nodeId = new NodeIdPBImpl(NodeIdProto.parseFrom(data));
      } catch (InterruptedException ie) {
        throw convertToIOException(ie);
      } catch (KeeperException ke) {
        LOG.info("Keeper Exception", ke);
        throw convertToIOException(ke);
      }
    }

    /**
     * Loads one application: the submission context from the application
     * znode itself, and master, master container and containers from its
     * children.
     *
     * @param app name of the application znode below {@code /apps}
     */
    private ApplicationInfo getAppInfo(String app) throws IOException {
      String appString = appPath(app);
      Stat stat = new Stat();
      try {
        byte[] data = zkClient.getData(appString, false, stat);
        ApplicationSubmissionContext context =
            new ApplicationSubmissionContextPBImpl(
                ApplicationSubmissionContextProto.parseFrom(data));
        ApplicationInfoImpl info = new ApplicationInfoImpl(context);
        List<String> children = zkClient.getChildren(appString, false, stat);
        for (String child : children) {
          // Each record must be parsed from its own child's bytes, not from
          // the parent's submission-context bytes.
          byte[] childdata = zkClient.getData(
              appString + ZK_PATH_SEPARATOR + child, false, stat);
          if (APP_MASTER.equals(child)) {
            info.setApplicationMaster(new ApplicationMasterPBImpl(
                ApplicationMasterProto.parseFrom(childdata)));
          } else if (APP_MASTER_CONTAINER.equals(child)) {
            info.setMasterContainer(
                new ContainerPBImpl(ContainerProto.parseFrom(childdata)));
          } else {
            info.addContainer(
                new ContainerPBImpl(ContainerProto.parseFrom(childdata)));
          }
        }
        return info;
      } catch (InterruptedException ie) {
        throw convertToIOException(ie);
      } catch (KeeperException ke) {
        throw convertToIOException(ke);
      }
    }

    /**
     * Populates this state from ZooKeeper: validates the stored node
     * addresses, reads the last node id and loads every application.
     * Rebuilding {@link RMNode} instances is still disabled (see TODO), so
     * {@link #getStoredNodeManagers()} stays empty.
     */
    private void load() throws IOException {
      List<NodeReport> nodeInfos = listStoredNodes();
      final Matcher m = HOST_PORT_PATTERN.matcher("");
      for (NodeReport node : nodeInfos) {
        String nodeAddress =
            node.getNodeId() == null ? null : node.getNodeId().getHost();
        if (nodeAddress == null || !m.reset(nodeAddress).find()) {
          LOG.info("Skipping node, bad node-address " + nodeAddress);
          continue;
        }
        String hostName = m.group(1);
        String cmPortText = m.group(2);
        String httpAddress = node.getHttpAddress();
        if (httpAddress == null || !m.reset(httpAddress).find()) {
          LOG.info("Skipping node, bad http-address " + httpAddress);
          continue;
        }
        String httpPortText = m.group(2);
        int cmPort;
        int httpPort;
        try {
          cmPort = Integer.parseInt(cmPortText);
          httpPort = Integer.parseInt(httpPortText);
        } catch (NumberFormatException nfe) {
          LOG.info("Skipping node, bad port in " + nodeAddress + " or "
              + httpAddress);
          continue;
        }
        // hostName, cmPort and httpPort feed the disabled code below.
        // TODO: FindBugs warns passing null below. Commenting this for later.
        // RMNode nm = new RMNodeImpl(node.getNodeId(), null,
        //     hostName, cmPort, httpPort,
        //     ResourceTrackerService.resolve(node.getNodeId().getHost()),
        //     node.getCapability());
        // nodeManagers.add(nm);
      }
      readLastNodeId();
      /* make sure we get all the applications */
      List<String> apps = null;
      try {
        apps = zkClient.getChildren(APPS, false);
      } catch (InterruptedException ie) {
        throw convertToIOException(ie);
      } catch (KeeperException ke) {
        throw convertToIOException(ke);
      }
      for (String app : apps) {
        ApplicationInfo info = getAppInfo(app);
        ApplicationMaster master = info.getApplicationMaster();
        if (master == null) {
          // The application znode and its "master" child are created by
          // separate calls, so the child can be missing.
          LOG.warn("Skipping application " + app
              + ", no stored application master");
          continue;
        }
        applications.put(master.getApplicationId(), info);
      }
    }

    @Override
    public Map<ApplicationId, ApplicationInfo> getStoredApplications() {
      return applications;
    }
  }
}