blob: c837450f0218b957781438dbefe2cb48345336b0 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.resourcetracker;
import org.junit.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.InlineDispatcher;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.NMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.NodeEventDispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.junit.Before;
import org.junit.Test;
public class TestNMExpiry {
private static final Log LOG = LogFactory.getLog(TestNMExpiry.class);
private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
ResourceTrackerService resourceTrackerService;
private class TestNmLivelinessMonitor extends NMLivelinessMonitor {
public TestNmLivelinessMonitor(Dispatcher dispatcher) {
super(dispatcher);
}
@Override
public void serviceInit(Configuration conf) throws Exception {
conf.setLong(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, 1000);
super.serviceInit(conf);
}
}
@Before
public void setUp() {
Configuration conf = new Configuration();
// Dispatcher that processes events inline
Dispatcher dispatcher = new InlineDispatcher();
RMContext context = new RMContextImpl(dispatcher, null,
null, null, null, null, null, null, null, null);
dispatcher.register(SchedulerEventType.class,
new InlineDispatcher.EmptyEventHandler());
dispatcher.register(RMNodeEventType.class,
new NodeEventDispatcher(context));
NMLivelinessMonitor nmLivelinessMonitor = new TestNmLivelinessMonitor(
dispatcher);
nmLivelinessMonitor.init(conf);
nmLivelinessMonitor.start();
NodesListManager nodesListManager = new NodesListManager(context);
nodesListManager.init(conf);
RMContainerTokenSecretManager containerTokenSecretManager =
new RMContainerTokenSecretManager(conf);
containerTokenSecretManager.start();
NMTokenSecretManagerInRM nmTokenSecretManager =
new NMTokenSecretManagerInRM(conf);
nmTokenSecretManager.start();
resourceTrackerService = new ResourceTrackerService(context,
nodesListManager, nmLivelinessMonitor, containerTokenSecretManager,
nmTokenSecretManager);
resourceTrackerService.init(conf);
resourceTrackerService.start();
}
private class ThirdNodeHeartBeatThread extends Thread {
public void run() {
int lastResponseID = 0;
while (!stopT) {
try {
org.apache.hadoop.yarn.server.api.records.NodeStatus nodeStatus =
recordFactory
.newRecordInstance(org.apache.hadoop.yarn.server.api.records.NodeStatus.class);
nodeStatus.setNodeId(request3.getNodeId());
nodeStatus.setResponseId(lastResponseID);
nodeStatus.setNodeHealthStatus(recordFactory.newRecordInstance(NodeHealthStatus.class));
nodeStatus.getNodeHealthStatus().setIsNodeHealthy(true);
NodeHeartbeatRequest request = recordFactory
.newRecordInstance(NodeHeartbeatRequest.class);
request.setNodeStatus(nodeStatus);
lastResponseID = resourceTrackerService.nodeHeartbeat(request)
.getResponseId();
Thread.sleep(1000);
} catch(Exception e) {
LOG.info("failed to heartbeat ", e);
}
}
}
}
boolean stopT = false;
RegisterNodeManagerRequest request3;
@Test
public void testNMExpiry() throws Exception {
String hostname1 = "localhost1";
String hostname2 = "localhost2";
String hostname3 = "localhost3";
Resource capability = BuilderUtils.newResource(1024, 1);
RegisterNodeManagerRequest request1 = recordFactory
.newRecordInstance(RegisterNodeManagerRequest.class);
NodeId nodeId1 = NodeId.newInstance(hostname1, 0);
request1.setNodeId(nodeId1);
request1.setHttpPort(0);
request1.setResource(capability);
resourceTrackerService.registerNodeManager(request1);
RegisterNodeManagerRequest request2 = recordFactory
.newRecordInstance(RegisterNodeManagerRequest.class);
NodeId nodeId2 = NodeId.newInstance(hostname2, 0);
request2.setNodeId(nodeId2);
request2.setHttpPort(0);
request2.setResource(capability);
resourceTrackerService.registerNodeManager(request2);
int waitCount = 0;
while(ClusterMetrics.getMetrics().getNumLostNMs()!=2 && waitCount ++<20){
synchronized (this) {
wait(100);
}
}
Assert.assertEquals(2, ClusterMetrics.getMetrics().getNumLostNMs());
request3 = recordFactory
.newRecordInstance(RegisterNodeManagerRequest.class);
NodeId nodeId3 = NodeId.newInstance(hostname3, 0);
request3.setNodeId(nodeId3);
request3.setHttpPort(0);
request3.setResource(capability);
resourceTrackerService
.registerNodeManager(request3);
/* test to see if hostanme 3 does not expire */
stopT = false;
new ThirdNodeHeartBeatThread().start();
Assert.assertEquals(2,ClusterMetrics.getMetrics().getNumLostNMs());
stopT = true;
}
}