blob: d65b096bbe8c5c24d1cfad4150a49eff20f1ba3d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager;
import static org.mockito.Mockito.mock;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.service.Service;
import org.apache.hadoop.yarn.service.Service.STATE;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class TestNodeStatusUpdater {
// temp fix until metrics system can auto-detect itself running in unit test:
static {
  DefaultMetricsSystem.setMiniClusterMode(true);
}

static final Log LOG = LogFactory.getLog(TestNodeStatusUpdater.class);
static final Path basedir =
    new Path("target", TestNodeStatusUpdater.class.getName());
private static final RecordFactory recordFactory = RecordFactoryProvider
    .getRecordFactory(null);

/**
 * Count of heartbeats seen by the fake resource trackers. It is incremented
 * inside their nodeHeartbeat callbacks (presumably on the status updater's
 * thread) and polled by the tests in sleep loops, so it is volatile for
 * visibility. {@code heartBeatID++} is not atomic; this assumes only one
 * heartbeat caller runs at a time.
 */
volatile int heartBeatID = 0;
/** Failure thrown by the background thread that starts the NM, if any. */
volatile Throwable nmStartError = null;
/**
 * Node ids registered with MyResourceTracker. Appended from the NM start-up
 * path and read by the test thread, hence the synchronized wrapper.
 */
private final List<NodeId> registeredNodes =
    Collections.synchronizedList(new ArrayList<NodeId>());
private final Configuration conf = createNMConfig();
private NodeManager nm;
/**
 * Replacement NodeManager published by getNodeManager's createNewNodeManager
 * override. testNodeReboot polls for it, which suggests it is assigned
 * asynchronously, so it is volatile.
 */
protected volatile NodeManager rebootedNodeManager;
/**
 * Resets the shared per-test state, stops the NM if it is still running and
 * shuts the metrics system down.
 */
@After
public void tearDown() {
  registeredNodes.clear();
  heartBeatID = 0;
  boolean stillRunning = nm != null && nm.getServiceState() == STATE.STARTED;
  if (stillRunning) {
    nm.stop();
  }
  DefaultMetricsSystem.shutdown();
}
/**
 * Fake ResourceTracker used by testNMRegistration.
 *
 * <p>Registration verifies the advertised node id and memory. Heartbeat 1
 * hands the NM one container and heartbeat 2 a second one, both by inserting
 * them into the NM context. Heartbeats 2 and 3 check what the NM reports back.
 */
private class MyResourceTracker implements ResourceTracker {
  private final Context context;

  public MyResourceTracker(Context context) {
    this.context = context;
  }

  @Override
  public RegisterNodeManagerResponse registerNodeManager(
      RegisterNodeManagerRequest request) throws YarnRemoteException {
    NodeId nodeId = request.getNodeId();
    Resource resource = request.getResource();
    LOG.info("Registering " + nodeId.toString());
    // NOTE: this really should be checking against the config value
    InetSocketAddress expected = NetUtils.getConnectAddress(
        conf.getSocketAddr(YarnConfiguration.NM_ADDRESS, null, -1));
    Assert.assertEquals(NetUtils.getHostPortString(expected), nodeId.toString());
    // createNMConfig() sets NM_PMEM_MB to 5 * 1024.
    Assert.assertEquals(5 * 1024, resource.getMemory());
    registeredNodes.add(nodeId);
    RegistrationResponse regResponse = recordFactory
        .newRecordInstance(RegistrationResponse.class);
    RegisterNodeManagerResponse response = recordFactory
        .newRecordInstance(RegisterNodeManagerResponse.class);
    response.setRegistrationResponse(regResponse);
    return response;
  }

  // NOTE(review): these records are reused and mutated on later heartbeats.
  // The single-application check on heartbeat 3 appears to depend on both
  // containers sharing applicationID/appAttemptID; confirm before changing.
  ApplicationId applicationID = recordFactory
      .newRecordInstance(ApplicationId.class);
  ApplicationAttemptId appAttemptID = recordFactory
      .newRecordInstance(ApplicationAttemptId.class);
  ContainerId firstContainerID = recordFactory
      .newRecordInstance(ContainerId.class);
  ContainerId secondContainerID = recordFactory
      .newRecordInstance(ContainerId.class);

  /** Groups the reported container statuses by their application id. */
  private Map<ApplicationId, List<ContainerStatus>> getAppToContainerStatusMap(
      List<ContainerStatus> containers) {
    Map<ApplicationId, List<ContainerStatus>> map =
        new HashMap<ApplicationId, List<ContainerStatus>>();
    for (ContainerStatus cs : containers) {
      ApplicationId applicationId =
          cs.getContainerId().getApplicationAttemptId().getApplicationId();
      List<ContainerStatus> appContainers = map.get(applicationId);
      if (appContainers == null) {
        appContainers = new ArrayList<ContainerStatus>();
        map.put(applicationId, appContainers);
      }
      appContainers.add(cs);
    }
    return map;
  }

  @Override
  public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
      throws YarnRemoteException {
    NodeStatus nodeStatus = request.getNodeStatus();
    LOG.info("Got heartbeat number " + heartBeatID);
    nodeStatus.setResponseId(heartBeatID++);

    Map<ApplicationId, List<ContainerStatus>> appToContainers =
        getAppToContainerStatusMap(nodeStatus.getContainersStatuses());

    if (heartBeatID == 1) {
      Assert.assertEquals(0, nodeStatus.getContainersStatuses().size());

      // Give a container to the NM.
      applicationID.setId(heartBeatID);
      appAttemptID.setApplicationId(applicationID);
      firstContainerID.setApplicationAttemptId(appAttemptID);
      firstContainerID.setId(heartBeatID);
      ContainerLaunchContext launchContext = recordFactory
          .newRecordInstance(ContainerLaunchContext.class);
      launchContext.setContainerId(firstContainerID);
      launchContext.setResource(recordFactory.newRecordInstance(Resource.class));
      launchContext.getResource().setMemory(2);
      Container container = new ContainerImpl(conf, null, launchContext, null, null);
      this.context.getContainers().put(firstContainerID, container);
    } else if (heartBeatID == 2) {
      // Checks on the RM end.
      // Count distinct applications, as the message says and as the
      // heartbeat-3 check does, rather than raw container statuses.
      Assert.assertEquals("Number of applications should only be one!", 1,
          appToContainers.size());
      Assert.assertEquals("Number of container for the app should be one!",
          1, appToContainers.get(applicationID).size());

      // Checks on the NM end
      ConcurrentMap<ContainerId, Container> activeContainers =
          this.context.getContainers();
      Assert.assertEquals(1, activeContainers.size());

      // Give another container to the NM.
      applicationID.setId(heartBeatID);
      appAttemptID.setApplicationId(applicationID);
      secondContainerID.setApplicationAttemptId(appAttemptID);
      secondContainerID.setId(heartBeatID);
      ContainerLaunchContext launchContext = recordFactory
          .newRecordInstance(ContainerLaunchContext.class);
      launchContext.setContainerId(secondContainerID);
      launchContext.setResource(recordFactory.newRecordInstance(Resource.class));
      launchContext.getResource().setMemory(3);
      Container container = new ContainerImpl(conf, null, launchContext, null, null);
      this.context.getContainers().put(secondContainerID, container);
    } else if (heartBeatID == 3) {
      // Checks on the RM end
      Assert.assertEquals("Number of applications should only be one!", 1,
          appToContainers.size());
      Assert.assertEquals("Number of container for the app should be two!",
          2, appToContainers.get(applicationID).size());

      // Checks on the NM end
      ConcurrentMap<ContainerId, Container> activeContainers =
          this.context.getContainers();
      Assert.assertEquals(2, activeContainers.size());
    }

    HeartbeatResponse response = recordFactory
        .newRecordInstance(HeartbeatResponse.class);
    response.setResponseId(heartBeatID);

    NodeHeartbeatResponse nhResponse = recordFactory
        .newRecordInstance(NodeHeartbeatResponse.class);
    nhResponse.setHeartbeatResponse(response);
    return nhResponse;
  }
}
/**
 * Status updater that talks to an in-process fake tracker instead of a real
 * RM. By default it uses MyResourceTracker; tests may overwrite
 * {@link #resourceTracker} after construction.
 */
private class MyNodeStatusUpdater extends NodeStatusUpdaterImpl {
  public ResourceTracker resourceTracker;
  private Context context;

  public MyNodeStatusUpdater(Context context, Dispatcher dispatcher,
      NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
    super(context, dispatcher, healthChecker, metrics);
    this.context = context;
    // Built here rather than in a field initializer: initializers run before
    // the assignment above, which would hand the tracker a null context.
    this.resourceTracker = new MyResourceTracker(this.context);
  }

  @Override
  protected ResourceTracker getRMClient() {
    return resourceTracker;
  }
}
/**
 * Status updater for testApplicationKeepAlive. It reports to MyResourceTracker3
 * and always has token keep-alive switched on.
 */
private class MyNodeStatusUpdater3 extends NodeStatusUpdaterImpl {
  public ResourceTracker resourceTracker;
  private Context context;

  public MyNodeStatusUpdater3(Context context, Dispatcher dispatcher,
      NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
    super(context, dispatcher, healthChecker, metrics);
    this.context = context;
    resourceTracker = new MyResourceTracker3(context);
  }

  @Override
  protected ResourceTracker getRMClient() {
    return resourceTracker;
  }

  @Override
  protected boolean isTokenKeepAliveEnabled(Configuration conf) {
    // The configuration is ignored: keep-alive is forced on for this updater.
    return true;
  }
}
/**
 * NodeManager that installs a MyNodeStatusUpdater3 and exposes it to the test.
 */
private class MyNodeManager extends NodeManager {
  private MyNodeStatusUpdater3 nodeStatusUpdater;

  @Override
  protected NodeStatusUpdater createNodeStatusUpdater(Context context,
      Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
    MyNodeStatusUpdater3 updater =
        new MyNodeStatusUpdater3(context, dispatcher, healthChecker, metrics);
    nodeStatusUpdater = updater;
    return updater;
  }

  /** Returns the updater created by createNodeStatusUpdater (null before init). */
  protected MyNodeStatusUpdater3 getNodeStatusUpdater() {
    return nodeStatusUpdater;
  }
}
// Fake RM whose registration and heartbeat responses carry configurable NodeActions.
/**
 * Fake ResourceTracker that answers registration with
 * {@link #registerNodeAction} and every heartbeat with
 * {@link #heartBeatNodeAction}. Both default to NORMAL.
 */
private class MyResourceTracker2 implements ResourceTracker {
  public NodeAction heartBeatNodeAction = NodeAction.NORMAL;
  public NodeAction registerNodeAction = NodeAction.NORMAL;

  @Override
  public RegisterNodeManagerResponse registerNodeManager(
      RegisterNodeManagerRequest request) throws YarnRemoteException {
    RegisterNodeManagerResponse reply =
        recordFactory.newRecordInstance(RegisterNodeManagerResponse.class);
    RegistrationResponse registration =
        recordFactory.newRecordInstance(RegistrationResponse.class);
    registration.setNodeAction(registerNodeAction);
    reply.setRegistrationResponse(registration);
    return reply;
  }

  @Override
  public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
      throws YarnRemoteException {
    NodeStatus status = request.getNodeStatus();
    // Tag the status with the current count, then advance the shared counter.
    status.setResponseId(heartBeatID++);

    HeartbeatResponse heartbeat =
        recordFactory.newRecordInstance(HeartbeatResponse.class);
    heartbeat.setResponseId(heartBeatID);
    heartbeat.setNodeAction(heartBeatNodeAction);

    NodeHeartbeatResponse reply =
        recordFactory.newRecordInstance(NodeHeartbeatResponse.class);
    reply.setHeartbeatResponse(heartbeat);
    return reply;
  }
}
/**
 * Fake ResourceTracker used by testApplicationKeepAlive.
 *
 * <p>On heartbeat 2 it puts a mock application ({@link #appId}) into the NM
 * context and lists it for cleanup. On every heartbeat it records a timestamp
 * for each application the NM asks to keep alive.
 */
private class MyResourceTracker3 implements ResourceTracker {
  public NodeAction heartBeatNodeAction = NodeAction.NORMAL;
  public NodeAction registerNodeAction = NodeAction.NORMAL;
  // Written from nodeHeartbeat and read by testApplicationKeepAlive on the
  // test thread, so both the map and the per-application lists are
  // synchronized.
  private final Map<ApplicationId, List<Long>> keepAliveRequests =
      Collections.synchronizedMap(new HashMap<ApplicationId, List<Long>>());
  private ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
  private final Context context;

  MyResourceTracker3(Context context) {
    this.context = context;
  }

  @Override
  public RegisterNodeManagerResponse registerNodeManager(
      RegisterNodeManagerRequest request) throws YarnRemoteException {
    RegisterNodeManagerResponse response =
        recordFactory.newRecordInstance(RegisterNodeManagerResponse.class);
    RegistrationResponse regResponse =
        recordFactory.newRecordInstance(RegistrationResponse.class);
    regResponse.setNodeAction(registerNodeAction);
    response.setRegistrationResponse(regResponse);
    return response;
  }

  @Override
  public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
      throws YarnRemoteException {
    LOG.info("Got heartBeatId: [" + heartBeatID +"]");
    NodeStatus nodeStatus = request.getNodeStatus();
    nodeStatus.setResponseId(heartBeatID++);
    HeartbeatResponse response =
        recordFactory.newRecordInstance(HeartbeatResponse.class);
    response.setResponseId(heartBeatID);
    response.setNodeAction(heartBeatNodeAction);

    if (nodeStatus.getKeepAliveApplications() != null) {
      // Distinct loop variable name so it does not shadow the appId field.
      for (ApplicationId keepAliveAppId : nodeStatus.getKeepAliveApplications()) {
        List<Long> timestamps = keepAliveRequests.get(keepAliveAppId);
        if (timestamps == null) {
          timestamps = Collections.synchronizedList(new LinkedList<Long>());
          keepAliveRequests.put(keepAliveAppId, timestamps);
        }
        timestamps.add(System.currentTimeMillis());
      }
    }

    if (heartBeatID == 2) {
      LOG.info("Sending FINISH_APP for application: [" + appId + "]");
      this.context.getApplications().put(appId, mock(Application.class));
      response.addAllApplicationsToCleanup(Collections.singletonList(appId));
    }

    NodeHeartbeatResponse nhResponse =
        recordFactory.newRecordInstance(NodeHeartbeatResponse.class);
    nhResponse.setHeartbeatResponse(response);
    return nhResponse;
  }
}
/** Forgets any start-up failure recorded by a previous test. */
@Before
public void clearError() {
  this.nmStartError = null;
}
/** Recursively deletes the per-test base directory on the local file system. */
@After
public void deleteBaseDir() throws IOException {
  FileContext.getLocalFSFileContext().delete(basedir, true);
}
/**
 * Starts an NM against MyResourceTracker on a background thread. Verifies that
 * the status updater is the last service, that the NM starts, that more than
 * three heartbeats arrive, and that exactly one node registers.
 */
@Test
public void testNMRegistration() throws InterruptedException {
  nm = new NodeManager() {
    @Override
    protected NodeStatusUpdater createNodeStatusUpdater(Context context,
        Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
      return new MyNodeStatusUpdater(context, dispatcher, healthChecker,
          metrics);
    }
  };

  YarnConfiguration conf = createNMConfig();
  nm.init(conf);

  // verify that the last service is the nodeStatusUpdater (ie registration
  // with RM)
  Object[] services = nm.getServices().toArray();
  Object lastService = services[services.length-1];
  Assert.assertTrue("last service is NOT the node status updater",
      lastService instanceof NodeStatusUpdater);

  new Thread() {
    @Override
    public void run() {
      try {
        nm.start();
      } catch (Throwable e) {
        TestNodeStatusUpdater.this.nmStartError = e;
        throw new YarnException(e);
      }
    }
  }.start();

  LOG.info(" ----- thread already started.." + nm.getServiceState());

  int waitCount = 0;
  while (nm.getServiceState() == STATE.INITED && waitCount++ != 20) {
    LOG.info("Waiting for NM to start..");
    Throwable startError = nmStartError;
    if (startError != null) {
      LOG.error("Error during startup. ", startError);
      // The start exception may have no cause. Fall back to the exception
      // itself so the failure is reported instead of an NPE.
      Throwable reported =
          startError.getCause() != null ? startError.getCause() : startError;
      Assert.fail(reported.getMessage());
    }
    Thread.sleep(1000);
  }
  if (nm.getServiceState() != STATE.STARTED) {
    // NM could have failed.
    Assert.fail("NodeManager failed to start");
  }

  waitCount = 0;
  while (heartBeatID <= 3 && waitCount++ != 20) {
    Thread.sleep(500);
  }
  Assert.assertFalse(heartBeatID <= 3);
  Assert.assertEquals("Number of registered NMs is wrong!!", 1,
      this.registeredNodes.size());

  nm.stop();
}
/**
 * The fake RM answers heartbeats with SHUTDOWN while the test also calls
 * stop() directly. The NM must end STOPPED with containers cleaned up once.
 */
@Test
public void testStopReentrant() throws Exception {
  final AtomicInteger numCleanups = new AtomicInteger(0);
  nm = new NodeManager() {
    @Override
    protected NodeStatusUpdater createNodeStatusUpdater(Context context,
        Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
      MyNodeStatusUpdater myNodeStatusUpdater = new MyNodeStatusUpdater(
          context, dispatcher, healthChecker, metrics);
      MyResourceTracker2 myResourceTracker2 = new MyResourceTracker2();
      myResourceTracker2.heartBeatNodeAction = NodeAction.SHUTDOWN;
      myNodeStatusUpdater.resourceTracker = myResourceTracker2;
      return myNodeStatusUpdater;
    }

    @Override
    protected void cleanupContainers() {
      super.cleanupContainers();
      numCleanups.incrementAndGet();
    }
  };

  YarnConfiguration conf = createNMConfig();
  nm.init(conf);
  nm.start();

  int waitCount = 0;
  while (heartBeatID < 1 && waitCount++ != 20) {
    Thread.sleep(500);
  }
  Assert.assertFalse(heartBeatID < 1);

  // Meanwhile call stop directly as the shutdown hook would
  nm.stop();

  // NM takes a while to reach the STOPPED state.
  waitCount = 0;
  while (nm.getServiceState() != STATE.STOPPED && waitCount++ != 20) {
    LOG.info("Waiting for NM to stop..");
    Thread.sleep(1000);
  }

  Assert.assertEquals(STATE.STOPPED, nm.getServiceState());
  // JUnit's assertEquals takes (expected, actual).
  Assert.assertEquals(1, numCleanups.get());
}
/**
 * The fake RM answers every heartbeat with SHUTDOWN. After the first heartbeat
 * the NM must reach STOPPED within the polling window.
 */
@Test
public void testNodeDecommision() throws Exception {
  nm = getNodeManager(NodeAction.SHUTDOWN);
  YarnConfiguration conf = createNMConfig();
  nm.init(conf);
  Assert.assertEquals(STATE.INITED, nm.getServiceState());
  nm.start();

  // Poll up to 20 x 500 ms for the first heartbeat.
  for (int attempt = 0; heartBeatID < 1 && attempt < 20; attempt++) {
    Thread.sleep(500);
  }
  Assert.assertFalse(heartBeatID < 1);

  // NM takes a while to reach the STOPPED state.
  for (int attempt = 0;
      nm.getServiceState() != STATE.STOPPED && attempt < 20; attempt++) {
    LOG.info("Waiting for NM to stop..");
    Thread.sleep(1000);
  }
  Assert.assertEquals(STATE.STOPPED, nm.getServiceState());
}
/**
 * The fake RM answers heartbeats with REBOOT. The original NM must stop, and a
 * replacement NM (captured in rebootedNodeManager) must start and then stop
 * cleanly.
 */
@Test
public void testNodeReboot() throws Exception {
  nm = getNodeManager(NodeAction.REBOOT);
  YarnConfiguration conf = createNMConfig();
  nm.init(conf);
  Assert.assertEquals(STATE.INITED, nm.getServiceState());
  nm.start();

  int waitCount = 0;
  while (heartBeatID < 1 && waitCount++ != 20) {
    Thread.sleep(500);
  }
  Assert.assertFalse(heartBeatID < 1);

  // NM takes a while to reach the STOPPED state.
  waitCount = 0;
  while (nm.getServiceState() != STATE.STOPPED && waitCount++ != 20) {
    LOG.info("Waiting for NM to stop..");
    Thread.sleep(1000);
  }
  Assert.assertEquals(STATE.STOPPED, nm.getServiceState());

  waitCount = 0;
  while (null == rebootedNodeManager && waitCount++ != 20) {
    LOG.info("Waiting for NM to reinitialize..");
    Thread.sleep(1000);
  }
  // Fail with a clear message instead of an NPE below if no replacement NM
  // was ever created.
  Assert.assertNotNull("NM was not re-created after REBOOT",
      rebootedNodeManager);

  waitCount = 0;
  while (rebootedNodeManager.getServiceState() != STATE.STARTED && waitCount++ != 20) {
    LOG.info("Waiting for NM to start..");
    Thread.sleep(1000);
  }
  Assert.assertEquals(STATE.STARTED, rebootedNodeManager.getServiceState());

  rebootedNodeManager.stop();
  waitCount = 0;
  while (rebootedNodeManager.getServiceState() != STATE.STOPPED && waitCount++ != 20) {
    LOG.info("Waiting for NM to stop..");
    Thread.sleep(1000);
  }
  Assert.assertEquals(STATE.STOPPED, rebootedNodeManager.getServiceState());
}
/**
 * The fake RM rejects registration with SHUTDOWN, so nm.start() must fail with
 * the registration error and the NM must stay INITED.
 */
@Test
public void testNMShutdownForRegistrationFailure() {
  nm = new NodeManager() {
    @Override
    protected NodeStatusUpdater createNodeStatusUpdater(Context context,
        Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
      MyNodeStatusUpdater updater = new MyNodeStatusUpdater(
          context, dispatcher, healthChecker, metrics);
      MyResourceTracker2 rejectingTracker = new MyResourceTracker2();
      rejectingTracker.registerNodeAction = NodeAction.SHUTDOWN;
      updater.resourceTracker = rejectingTracker;
      return updater;
    }
  };
  String expectedMessage = "org.apache.hadoop.yarn.YarnException: "
      + "Recieved SHUTDOWN signal from Resourcemanager ,Registration of NodeManager failed";
  verifyNodeStartFailure(expectedMessage);
}
/**
 * Checks that an NM whose ContainerManager RPC server fails to start never
 * registers with the RM. Otherwise the RM would consider the half-started NM
 * alive and would only retire it once NM_EXPIRY elapses. See MAPREDUCE-2749.
 */
@Test
public void testNoRegistrationWhenNMServicesFail() {
  nm = new NodeManager() {
    @Override
    protected NodeStatusUpdater createNodeStatusUpdater(Context context,
        Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
      return new MyNodeStatusUpdater(context, dispatcher, healthChecker,
          metrics);
    }

    @Override
    protected ContainerManagerImpl createContainerManager(Context context,
        ContainerExecutor exec, DeletionService del,
        NodeStatusUpdater nodeStatusUpdater,
        ApplicationACLsManager aclsManager,
        LocalDirsHandlerService diskhandler) {
      ContainerManagerImpl failingManager = new ContainerManagerImpl(context,
          exec, del, nodeStatusUpdater, metrics, aclsManager, diskhandler) {
        @Override
        public void start() {
          // Pretend the RPC server could not be brought up.
          throw new YarnException("Starting of RPC Server failed");
        }
      };
      return failingManager;
    }
  };

  verifyNodeStartFailure("Starting of RPC Server failed");
}
/**
 * On heartbeat 2, MyResourceTracker3 asks the NM to clean up an application.
 * With token keep-alive enabled, the NM should request keep-alive for that
 * application a bounded number of times (2 or 3), and make no further requests
 * after the application leaves the context.
 */
@Test
public void testApplicationKeepAlive() throws Exception {
  // Local NM, not the nm field, so tearDown() will not stop it; the finally
  // block below does.
  MyNodeManager keepAliveNm = new MyNodeManager();
  try {
    YarnConfiguration conf = createNMConfig();
    conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
    conf.setLong(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, 4000L);
    keepAliveNm.init(conf);
    keepAliveNm.start();

    // HB 2 -> app cancelled by RM.
    // Bounded wait so a stalled NM fails the test instead of hanging it.
    int waitCount = 0;
    while (heartBeatID < 12 && waitCount++ != 60) {
      Thread.sleep(1000L);
    }
    Assert.assertTrue("Timed out waiting for 12 heartbeats, got " + heartBeatID,
        heartBeatID >= 12);

    MyResourceTracker3 rt =
        (MyResourceTracker3) keepAliveNm.getNodeStatusUpdater().getRMClient();
    rt.context.getApplications().remove(rt.appId);
    Assert.assertEquals(1, rt.keepAliveRequests.size());
    int numKeepAliveRequests = rt.keepAliveRequests.get(rt.appId).size();
    LOG.info("Number of Keep Alive Requests: [" + numKeepAliveRequests + "]");
    Assert.assertTrue(numKeepAliveRequests == 2 || numKeepAliveRequests == 3);

    waitCount = 0;
    while (heartBeatID < 20 && waitCount++ != 60) {
      Thread.sleep(1000L);
    }
    Assert.assertTrue("Timed out waiting for 20 heartbeats, got " + heartBeatID,
        heartBeatID >= 20);

    int numKeepAliveRequests2 = rt.keepAliveRequests.get(rt.appId).size();
    Assert.assertEquals(numKeepAliveRequests, numKeepAliveRequests2);
  } finally {
    if (keepAliveNm.getServiceState() == STATE.STARTED) {
      keepAliveNm.stop();
    }
  }
}
/**
 * Initializes and starts {@code nm}, expecting start() to throw.
 *
 * @param errMessage expected message of the thrown exception's cause
 */
private void verifyNodeStartFailure(String errMessage) {
  YarnConfiguration conf = createNMConfig();
  nm.init(conf);
  try {
    nm.start();
    Assert.fail("NM should have failed to start. Didn't get exception!!");
  } catch (Exception e) {
    // The expected text lives on the cause. Require a cause so that a bare
    // exception fails with a clear message rather than an NPE.
    Assert.assertNotNull("Start-up exception has no cause: " + e,
        e.getCause());
    Assert.assertEquals(errMessage, e.getCause()
        .getMessage());
  }

  // the state change to stopped occurs only if the startup is success, else
  // state change doesn't occur
  Assert.assertEquals("NM state is wrong!", Service.STATE.INITED, nm
      .getServiceState());

  Assert.assertEquals("Number of registered nodes is wrong!", 0,
      this.registeredNodes.size());
}
/**
 * Builds the NM configuration shared by these tests. It sets 5 GB of physical
 * memory, loopback NM/localizer addresses, and log, remote-log and local
 * directories under {@code basedir}.
 */
private YarnConfiguration createNMConfig() {
  String logDir = new Path(basedir, "logs").toUri().getPath();
  String remoteLogDir = new Path(basedir, "remotelogs").toUri().getPath();
  String localDir = new Path(basedir, "nm0").toUri().getPath();

  YarnConfiguration config = new YarnConfiguration();
  config.setInt(YarnConfiguration.NM_PMEM_MB, 5 * 1024); // 5GB
  config.set(YarnConfiguration.NM_ADDRESS, "127.0.0.1:12345");
  config.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, "127.0.0.1:12346");
  config.set(YarnConfiguration.NM_LOG_DIRS, logDir);
  config.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteLogDir);
  config.set(YarnConfiguration.NM_LOCAL_DIRS, localDir);
  return config;
}
/**
 * Creates a NodeManager whose fake RM (MyResourceTracker2) answers every
 * heartbeat with {@code nodeHeartBeatAction}.
 *
 * <p>Its createNewNodeManager override, presumably invoked on reboot (see
 * testNodeReboot), builds a replacement whose RM answers NORMAL and publishes
 * it in {@code rebootedNodeManager}.
 */
private NodeManager getNodeManager(final NodeAction nodeHeartBeatAction) {
  NodeManager manager = new NodeManager() {
    @Override
    protected NodeStatusUpdater createNodeStatusUpdater(Context context,
        Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
      MyNodeStatusUpdater updater = new MyNodeStatusUpdater(
          context, dispatcher, healthChecker, metrics);
      MyResourceTracker2 tracker = new MyResourceTracker2();
      tracker.heartBeatNodeAction = nodeHeartBeatAction;
      updater.resourceTracker = tracker;
      return updater;
    }

    @Override
    NodeManager createNewNodeManager() {
      rebootedNodeManager = getNodeManager(NodeAction.NORMAL);
      return rebootedNodeManager;
    }
  };
  return manager;
}
}