blob: 83d97e05b829e6e549437a2333a52fce900af6a2 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.UnknownHostException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.service.CompositeService;
/**
 * A composite service that hosts one ResourceManager and a configurable
 * number of NodeManagers inside the current JVM, for use by tests.
 *
 * The ResourceManager and each NodeManager are wrapped in small
 * {@link AbstractService} adapters that are registered as child services of
 * this {@link CompositeService}; the adapters configure, launch and stop the
 * underlying daemons.
 */
public class MiniYARNCluster extends CompositeService {

  private static final Log LOG = LogFactory.getLog(MiniYARNCluster.class);

  // temp fix until metrics system can auto-detect itself running in unit test:
  static {
    DefaultMetricsSystem.setMiniClusterMode(true);
  }

  private final NodeManager[] nodeManagers;
  // Assigned by ResourceManagerWrapper.start(); null until then.
  private ResourceManager resourceManager;

  private final ResourceManagerWrapper resourceManagerWrapper;

  private final File testWorkDir;

  // Number of nm-local-dirs per nodemanager
  private final int numLocalDirs;
  // Number of nm-log-dirs per nodemanager
  private final int numLogDirs;

  /**
   * Builds the cluster skeleton. Any previous content of the test work
   * directory (<code>target/&lt;testName without '$'&gt;</code>) is deleted
   * recursively; nothing is started until the service is started.
   *
   * @param testName name of the test
   * @param noOfNodeManagers the number of node managers in the cluster
   * @param numLocalDirs the number of nm-local-dirs per nodemanager
   * @param numLogDirs the number of nm-log-dirs per nodemanager
   * @throws YarnException if the test work directory cannot be cleaned up
   */
  public MiniYARNCluster(String testName, int noOfNodeManagers,
      int numLocalDirs, int numLogDirs) {
    super(testName.replace("$", ""));
    this.numLocalDirs = numLocalDirs;
    this.numLogDirs = numLogDirs;
    this.testWorkDir = new File("target", testName.replace("$", ""));
    try {
      FileContext.getLocalFSFileContext().delete(
          new Path(testWorkDir.getAbsolutePath()), true);
    } catch (Exception e) {
      LOG.warn("COULD NOT CLEANUP", e);
      throw new YarnException("could not cleanup test dir", e);
    }

    resourceManagerWrapper = new ResourceManagerWrapper();
    addService(resourceManagerWrapper);

    nodeManagers = new CustomNodeManager[noOfNodeManagers];
    for (int index = 0; index < noOfNodeManagers; index++) {
      addService(new NodeManagerWrapper(index));
      nodeManagers[index] = new CustomNodeManager();
    }
  }

  /**
   * Initializes the cluster, wrapping <code>conf</code> in a
   * {@link YarnConfiguration} when it is not one already.
   *
   * @param conf the configuration to initialize the child services with
   */
  @Override
  public void init(Configuration conf) {
    super.init(conf instanceof YarnConfiguration ? conf
        : new YarnConfiguration(conf));
  }

  /**
   * @return the per-test work directory under <code>target</code>
   */
  public File getTestWorkDir() {
    return testWorkDir;
  }

  /**
   * @return the ResourceManager, or <code>null</code> if the
   *         ResourceManager wrapper has not been started yet
   */
  public ResourceManager getResourceManager() {
    return this.resourceManager;
  }

  /**
   * @param i zero-based index of the node manager
   * @return the node manager at position <code>i</code>
   * @throws ArrayIndexOutOfBoundsException if <code>i</code> is not a valid
   *         node manager index
   */
  public NodeManager getNodeManager(int i) {
    return this.nodeManagers[i];
  }

  /**
   * @return the host name of the local host
   * @throws RuntimeException wrapping the {@link UnknownHostException} if
   *         the local host name cannot be resolved
   */
  public static String getHostname() {
    try {
      return InetAddress.getLocalHost().getHostName();
    } catch (UnknownHostException ex) {
      throw new RuntimeException(ex);
    }
  }

  /**
   * Child service that configures, launches and stops the ResourceManager.
   */
  private class ResourceManagerWrapper extends AbstractService {

    public ResourceManagerWrapper() {
      super(ResourceManagerWrapper.class.getName());
    }

    /**
     * Creates the ResourceManager, starts it on a separate thread and waits
     * (polling up to 60 times, 1.5s apart) for it to leave the INITED state.
     *
     * @throws YarnException wrapping any failure, including the
     *         ResourceManager not reaching the STARTED state
     */
    @Override
    public synchronized void start() {
      try {
        getConfig().setBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, true);
        if (!getConfig().getBoolean(
            YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS,
            YarnConfiguration.DEFAULT_YARN_MINICLUSTER_FIXED_PORTS)) {
          // pick free random ports.
          String anyPort = MiniYARNCluster.getHostname() + ":0";
          getConfig().set(YarnConfiguration.RM_ADDRESS, anyPort);
          getConfig().set(YarnConfiguration.RM_ADMIN_ADDRESS, anyPort);
          getConfig().set(YarnConfiguration.RM_SCHEDULER_ADDRESS, anyPort);
          getConfig().set(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
              anyPort);
          getConfig().set(YarnConfiguration.RM_WEBAPP_ADDRESS, anyPort);
        }
        Store store = StoreFactory.getStore(getConfig());
        resourceManager = new ResourceManager(store) {
          @Override
          protected void doSecureLogin() throws IOException {
            // Don't try to login using keytab in the testcase.
          }
        };
        resourceManager.init(getConfig());
        new Thread() {
          @Override
          public void run() {
            resourceManager.start();
          }
        }.start();
        int waitCount = 0;
        while (resourceManager.getServiceState() == STATE.INITED
            && waitCount++ < 60) {
          LOG.info("Waiting for RM to start...");
          Thread.sleep(1500);
        }
        if (resourceManager.getServiceState() != STATE.STARTED) {
          // RM could have failed.
          throw new IOException(
              "ResourceManager failed to start. Final state is "
                  + resourceManager.getServiceState());
        }
        super.start();
      } catch (InterruptedException ie) {
        // Keep the interrupt visible to callers further up the stack.
        Thread.currentThread().interrupt();
        throw new YarnException(ie);
      } catch (Throwable t) {
        throw new YarnException(t);
      }
      LOG.info("MiniYARN ResourceManager address: " +
          getConfig().get(YarnConfiguration.RM_ADDRESS));
      LOG.info("MiniYARN ResourceManager web address: " +
          getConfig().get(YarnConfiguration.RM_WEBAPP_ADDRESS));
    }

    /**
     * Stops the ResourceManager (if it was created) and then this wrapper.
     * The wrapper is stopped even when stopping the ResourceManager throws.
     */
    @Override
    public synchronized void stop() {
      try {
        if (resourceManager != null) {
          resourceManager.stop();
        }
      } finally {
        super.stop();
      }
    }
  }

  /**
   * Child service that configures, launches and stops the NodeManager at a
   * given index of {@link MiniYARNCluster#nodeManagers}.
   */
  private class NodeManagerWrapper extends AbstractService {

    int index = 0;

    public NodeManagerWrapper(int i) {
      super(NodeManagerWrapper.class.getName() + "_" + i);
      index = i;
    }

    /**
     * Initializes this wrapper with a new {@link YarnConfiguration} built
     * from <code>conf</code>, so the settings applied in {@link #start()}
     * are written to this wrapper's own configuration object.
     */
    @Override
    public synchronized void init(Configuration conf) {
      Configuration config = new YarnConfiguration(conf);
      super.init(config);
    }

    /**
     * Create local/log directories
     * @param dirType type of directories i.e. local dirs or log dirs
     * @param numDirs number of directories
     * @return the created directories as a comma delimited String
     */
    private String prepareDirs(String dirType, int numDirs) {
      StringBuilder dirsString = new StringBuilder();
      for (int i = 0; i < numDirs; i++) {
        File dir = new File(testWorkDir, MiniYARNCluster.this.getName()
            + "-" + dirType + "Dir-nm-" + index + "_" + i);
        // mkdirs(): the constructor wiped testWorkDir, so the parent
        // directory may not exist at this point.
        if (dir.mkdirs() || dir.isDirectory()) {
          LOG.info("Created " + dirType + "Dir in " + dir.getAbsolutePath());
        } else {
          LOG.warn("Could not create " + dirType + "Dir in "
              + dir.getAbsolutePath());
        }
        if (i > 0) {
          dirsString.append(',');
        }
        dirsString.append(dir.getAbsolutePath());
      }
      return dirsString.toString();
    }

    /**
     * Prepares the local, log and remote-log directories, points the
     * NodeManager addresses at port 0 on the local host, then starts the
     * NodeManager on a separate thread and waits (polling up to 60 times,
     * 1s apart) for it to leave the INITED state.
     *
     * @throws YarnException wrapping any failure, including the NodeManager
     *         not reaching the STARTED state
     */
    @Override
    public synchronized void start() {
      try {
        // create nm-local-dirs and configure them for the nodemanager
        String localDirsString = prepareDirs("local", numLocalDirs);
        getConfig().set(YarnConfiguration.NM_LOCAL_DIRS, localDirsString);
        // create nm-log-dirs and configure them for the nodemanager
        String logDirsString = prepareDirs("log", numLogDirs);
        getConfig().set(YarnConfiguration.NM_LOG_DIRS, logDirsString);

        File remoteLogDir =
            new File(testWorkDir, MiniYARNCluster.this.getName()
                + "-remoteLogDir-nm-" + index);
        if (!remoteLogDir.mkdirs() && !remoteLogDir.isDirectory()) {
          LOG.warn("Could not create remoteLogDir in "
              + remoteLogDir.getAbsolutePath());
        }
        getConfig().set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
            remoteLogDir.getAbsolutePath());
        // By default AM + 2 containers
        getConfig().setInt(YarnConfiguration.NM_PMEM_MB, 4 * 1024);
        String anyPort = MiniYARNCluster.getHostname() + ":0";
        getConfig().set(YarnConfiguration.NM_ADDRESS, anyPort);
        getConfig().set(YarnConfiguration.NM_LOCALIZER_ADDRESS, anyPort);
        getConfig().set(YarnConfiguration.NM_WEBAPP_ADDRESS, anyPort);
        LOG.info("Starting NM: " + index);
        nodeManagers[index].init(getConfig());
        new Thread() {
          @Override
          public void run() {
            nodeManagers[index].start();
          }
        }.start();
        int waitCount = 0;
        while (nodeManagers[index].getServiceState() == STATE.INITED
            && waitCount++ < 60) {
          LOG.info("Waiting for NM " + index + " to start...");
          Thread.sleep(1000);
        }
        if (nodeManagers[index].getServiceState() != STATE.STARTED) {
          // NM could have failed.
          throw new IOException("NodeManager " + index + " failed to start");
        }
        super.start();
      } catch (InterruptedException ie) {
        // Keep the interrupt visible to callers further up the stack.
        Thread.currentThread().interrupt();
        throw new YarnException(ie);
      } catch (Throwable t) {
        throw new YarnException(t);
      }
    }

    /**
     * Stops the wrapped NodeManager (if present) and then this wrapper.
     * The wrapper is stopped even when stopping the NodeManager throws.
     */
    @Override
    public synchronized void stop() {
      try {
        if (nodeManagers[index] != null) {
          nodeManagers[index].stop();
        }
      } finally {
        super.stop();
      }
    }
  }

  /**
   * NodeManager variant that skips the keytab login and reports to the
   * cluster's ResourceManager through direct method calls.
   */
  private class CustomNodeManager extends NodeManager {

    @Override
    protected void doSecureLogin() throws IOException {
      // Don't try to login using keytab in the testcase.
    }

    /**
     * Returns a status updater whose RM client forwards registration and
     * heartbeat requests straight to the ResourceManager's
     * {@link ResourceTrackerService}.
     */
    @Override
    protected NodeStatusUpdater createNodeStatusUpdater(Context context,
        Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
      return new NodeStatusUpdaterImpl(context, dispatcher,
          healthChecker, metrics) {
        @Override
        protected ResourceTracker getRMClient() {
          final ResourceTrackerService rt = resourceManager
              .getResourceTrackerService();
          final RecordFactory recordFactory =
              RecordFactoryProvider.getRecordFactory(null);

          // For in-process communication without RPC
          return new ResourceTracker() {
            @Override
            public NodeHeartbeatResponse nodeHeartbeat(
                NodeHeartbeatRequest request) throws YarnRemoteException {
              NodeHeartbeatResponse response =
                  recordFactory.newRecordInstance(
                      NodeHeartbeatResponse.class);
              try {
                response.setHeartbeatResponse(rt.nodeHeartbeat(request)
                    .getHeartbeatResponse());
              } catch (IOException ioe) {
                LOG.info("Exception in heartbeat from node " +
                    request.getNodeStatus().getNodeId(), ioe);
                throw RPCUtil.getRemoteException(ioe);
              }
              return response;
            }

            @Override
            public RegisterNodeManagerResponse registerNodeManager(
                RegisterNodeManagerRequest request)
                throws YarnRemoteException {
              RegisterNodeManagerResponse response =
                  recordFactory.newRecordInstance(
                      RegisterNodeManagerResponse.class);
              try {
                response.setRegistrationResponse(rt
                    .registerNodeManager(request)
                    .getRegistrationResponse());
              } catch (IOException ioe) {
                LOG.info("Exception in node registration from "
                    + request.getNodeId().toString(), ioe);
                throw RPCUtil.getRemoteException(ioe);
              }
              return response;
            }
          };
        }
      };
    }
  }
}