blob: d208b2fb516c4888ae5dcafa4972b322db6ec7ad [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.NodeHealthCheckerService;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.service.Service.STATE;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
 * Exercises NodeManager registration and heartbeating against an in-process
 * stub {@link ResourceTracker}.
 *
 * <p>The stub verifies the registration request, then uses the first three
 * heartbeats to place containers directly into the NodeManager's
 * {@link Context} and to check that later heartbeats report them.
 *
 * <p>The stub's callbacks do not run on the JUnit thread (that thread only
 * polls and sleeps while they happen), so state shared with the test method is
 * {@code volatile} and failures raised inside the callbacks are recorded in
 * fields and rethrown from the test method.
 */
public class TestNodeStatusUpdater {

  static final Log LOG = LogFactory.getLog(TestNodeStatusUpdater.class);

  /** Scratch directory for NM local/log dirs; deleted after every test. */
  static final Path basedir =
      new Path("target", TestNodeStatusUpdater.class.getName());

  private static final RecordFactory recordFactory =
      RecordFactoryProvider.getRecordFactory(null);

  /** Upper bound on how long the test waits for the expected heartbeats. */
  private static final long HEARTBEAT_WAIT_TIMEOUT_MS = 60 * 1000L;

  /**
   * Number of heartbeats received so far. Written by the heartbeat callback
   * and polled by the test thread, hence volatile.
   */
  volatile int heartBeatID = 0;

  /** Failure raised by {@code nm.start()} on the starter thread, if any. */
  volatile Error nmStartError = null;

  /** First failure raised while handling a heartbeat, if any. */
  private volatile Error heartbeatError = null;

  /** Remembers the first heartbeat failure so the test thread can rethrow it. */
  private void recordHeartbeatError(Error error) {
    if (heartbeatError == null) {
      heartbeatError = error;
    }
  }

  /** Rethrows, on the calling thread, any failure recorded by a heartbeat. */
  private void failOnHeartbeatError() {
    Error error = heartbeatError;
    if (error != null) {
      throw error;
    }
  }

  /**
   * Stub resource tracker that validates what the NodeManager sends and hands
   * out containers by writing them straight into the NM {@link Context}.
   */
  private class MyResourceTracker implements ResourceTracker {

    private final Context context;

    // NOTE(review): applicationID is a single mutable record whose id is
    // re-set on each allocation; whether container ids that were already
    // handed out observe that change depends on the record implementation.
    ApplicationId applicationID =
        recordFactory.newRecordInstance(ApplicationId.class);
    ContainerId firstContainerID =
        recordFactory.newRecordInstance(ContainerId.class);
    ContainerId secondContainerID =
        recordFactory.newRecordInstance(ContainerId.class);

    public MyResourceTracker(Context context) {
      this.context = context;
    }

    /**
     * Checks that the NM registers as {@code <local host address>:12345} with
     * {@code 5 * 1024} units of memory, and returns an empty registration
     * response.
     */
    @Override
    public RegisterNodeManagerResponse registerNodeManager(
        RegisterNodeManagerRequest request) throws YarnRemoteException {
      NodeId nodeId = request.getNodeId();
      Resource resource = request.getResource();
      LOG.info("Registering " + nodeId.toString());
      try {
        Assert.assertEquals(InetAddress.getLocalHost().getHostAddress()
            + ":12345", nodeId.toString());
      } catch (UnknownHostException e) {
        Assert.fail(e.getMessage());
      }
      Assert.assertEquals(5 * 1024, resource.getMemory());

      RegistrationResponse regResponse =
          recordFactory.newRecordInstance(RegistrationResponse.class);
      RegisterNodeManagerResponse response =
          recordFactory.newRecordInstance(RegisterNodeManagerResponse.class);
      response.setRegistrationResponse(regResponse);
      return response;
    }

    /**
     * Counts the heartbeat, runs the checks that belong to it and answers with
     * the new heartbeat count as response id.
     *
     * <p>Any {@link Error} or {@link RuntimeException} raised by the checks is
     * recorded for the test thread before being rethrown to the caller.
     */
    @Override
    public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
        throws YarnRemoteException {
      NodeStatus nodeStatus = request.getNodeStatus();
      LOG.info("Got heartbeat number " + heartBeatID);
      nodeStatus.setResponseId(heartBeatID++);
      // Read the counter once so every check below sees the same value.
      final int beat = heartBeatID;

      try {
        checkHeartbeat(nodeStatus, beat);
      } catch (Error e) {
        recordHeartbeatError(e);
        throw e;
      } catch (RuntimeException e) {
        // Wrap so it fits the Error-typed field while keeping the cause.
        recordHeartbeatError(new AssertionError(e));
        throw e;
      }

      HeartbeatResponse response =
          recordFactory.newRecordInstance(HeartbeatResponse.class);
      response.setResponseId(beat);
      NodeHeartbeatResponse nhResponse =
          recordFactory.newRecordInstance(NodeHeartbeatResponse.class);
      nhResponse.setHeartbeatResponse(response);
      return nhResponse;
    }

    /** Runs the assertions and container hand-outs for heartbeat {@code beat}. */
    private void checkHeartbeat(NodeStatus nodeStatus, int beat)
        throws YarnRemoteException {
      if (beat == 1) {
        Assert.assertEquals(0, nodeStatus.getAllContainers().size());

        // Give a container to the NM.
        giveContainer(firstContainerID, beat, 2);
      } else if (beat == 2) {
        // Checks on the RM end
        Assert.assertEquals("Number of applications should only be one!", 1,
            nodeStatus.getAllContainers().size());
        Assert.assertEquals("Number of container for the app should be one!",
            1, nodeStatus.getContainers(applicationID).size());
        Assert.assertEquals(2, nodeStatus.getContainers(applicationID).get(0)
            .getResource().getMemory());

        // Checks on the NM end
        ConcurrentMap<ContainerId, Container> activeContainers =
            this.context.getContainers();
        Assert.assertEquals(1, activeContainers.size());

        // Give another container to the NM.
        giveContainer(secondContainerID, beat, 3);
      } else if (beat == 3) {
        // Checks on the RM end
        Assert.assertEquals("Number of applications should only be one!", 1,
            nodeStatus.getAllContainers().size());
        Assert.assertEquals("Number of container for the app should be two!",
            2, nodeStatus.getContainers(applicationID).size());
        Assert.assertEquals(2, nodeStatus.getContainers(applicationID).get(0)
            .getResource().getMemory());
        Assert.assertEquals(3, nodeStatus.getContainers(applicationID).get(1)
            .getResource().getMemory());

        // Checks on the NM end
        ConcurrentMap<ContainerId, Container> activeContainers =
            this.context.getContainers();
        Assert.assertEquals(2, activeContainers.size());
      }
    }

    /**
     * Builds a container with the given id and memory for the shared
     * application record and registers it in the NM context.
     */
    private void giveContainer(ContainerId containerId, int id, int memory)
        throws YarnRemoteException {
      applicationID.setId(id);
      containerId.setAppId(applicationID);
      containerId.setId(id);
      ContainerLaunchContext launchContext =
          recordFactory.newRecordInstance(ContainerLaunchContext.class);
      launchContext.setContainerId(containerId);
      launchContext.setResource(recordFactory.newRecordInstance(Resource.class));
      launchContext.getResource().setMemory(memory);
      Container container = new ContainerImpl(null, launchContext, null, null);
      this.context.getContainers().put(containerId, container);
    }
  }

  /** Status updater whose RM client is the in-process stub tracker. */
  private class MyNodeStatusUpdater extends NodeStatusUpdaterImpl {

    private final Context context;

    public MyNodeStatusUpdater(Context context, Dispatcher dispatcher,
        NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
      super(context, dispatcher, healthChecker, metrics);
      this.context = context;
    }

    @Override
    protected ResourceTracker getRMClient() {
      return new MyResourceTracker(this.context);
    }
  }

  /** Resets the failures recorded by background threads. */
  @Before
  public void clearError() {
    nmStartError = null;
    heartbeatError = null;
  }

  /** Removes the scratch directory created under {@code target}. */
  @After
  public void deleteBaseDir() throws IOException {
    FileContext lfs = FileContext.getLocalFSFileContext();
    lfs.delete(basedir, true);
  }

  /**
   * Starts a NodeManager wired to the stub tracker, waits until more than
   * three heartbeats have been handled and then stops it.
   *
   * <p>Fails if the NodeManager does not reach {@code STARTED}, if a check in
   * the stub tracker fails, or if the heartbeats do not arrive within
   * {@link #HEARTBEAT_WAIT_TIMEOUT_MS}.
   */
  @Test
  public void testNMRegistration() throws InterruptedException {
    final NodeManager nm = new NodeManager() {
      @Override
      protected NodeStatusUpdater createNodeStatusUpdater(Context context,
          Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
        return new MyNodeStatusUpdater(context, dispatcher, healthChecker,
            metrics);
      }
    };

    YarnConfiguration conf = new YarnConfiguration();
    conf.setInt(NMConfig.NM_VMEM_GB, 5); // 5GB
    conf.set(NMConfig.NM_BIND_ADDRESS, "127.0.0.1:12345");
    conf.set(NMConfig.NM_LOCALIZER_BIND_ADDRESS, "127.0.0.1:12346");
    conf.set(NMConfig.NM_LOG_DIR, new Path(basedir, "logs").toUri().getPath());
    conf.set(NMConfig.REMOTE_USER_LOG_DIR, new Path(basedir, "remotelogs")
        .toUri().getPath());
    conf.set(NMConfig.NM_LOCAL_DIR, new Path(basedir, "nm0").toUri().getPath());
    nm.init(conf);

    try {
      new Thread() {
        @Override
        public void run() {
          try {
            nm.start();
          } catch (Error e) {
            TestNodeStatusUpdater.this.nmStartError = e;
          } catch (RuntimeException e) {
            // Keep the cause instead of losing it on this thread.
            TestNodeStatusUpdater.this.nmStartError = new AssertionError(e);
          }
        }
      }.start();

      LOG.info(" ----- thread already started.." + nm.getServiceState());

      // Wait up to ~20 seconds for the NM to leave INITED, or for start to fail.
      int waitCount = 0;
      while (nm.getServiceState() == STATE.INITED && nmStartError == null
          && waitCount++ != 20) {
        LOG.info("Waiting for NM to start..");
        Thread.sleep(1000);
      }

      Error startError = nmStartError;
      if (startError != null) {
        throw startError;
      }
      if (nm.getServiceState() != STATE.STARTED) {
        // NM could have failed.
        Assert.fail("NodeManager failed to start");
      }

      // Wait for the heartbeat after the third one, surfacing stub failures
      // and giving up at the deadline instead of hanging.
      long deadline = System.currentTimeMillis() + HEARTBEAT_WAIT_TIMEOUT_MS;
      while (heartBeatID <= 3) {
        failOnHeartbeatError();
        if (System.currentTimeMillis() > deadline) {
          Assert.fail("Timed out waiting for heartbeats; last heartbeat ID was "
              + heartBeatID);
        }
        Thread.sleep(500);
      }
      failOnHeartbeatError();
    } finally {
      // Only a started NM is stopped, so a failed start is not masked by stop().
      if (nm.getServiceState() == STATE.STARTED) {
        nm.stop();
      }
    }
  }
}