blob: 35148e9a450a4e048559b4d06eec44464feffbd2 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.spy;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AbstractEvent;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter;
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager;
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.util.ControlledClock;
import org.slf4j.event.Level;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentMatcher;
public class TestNodesListManager {
private boolean isRMAppEvent;
private boolean isNodesListEvent;
@Test(timeout = 300000)
public void testNodeUsableEvent() throws Exception {
GenericTestUtils.setRootLogLevel(Level.DEBUG);
final Dispatcher dispatcher = getDispatcher();
YarnConfiguration conf = new YarnConfiguration();
MockRM rm = new MockRM(conf) {
@Override
protected Dispatcher createDispatcher() {
return dispatcher;
}
};
rm.start();
MockNM nm1 = rm.registerNode("h1:1234", 28000);
Resource clusterResource = Resource.newInstance(28000, 8);
RMNode rmnode = MockNodes.newNodeInfo(1, clusterResource);
// Create killing APP
RMApp killRmApp = MockRMAppSubmitter.submitWithMemory(200, rm);
rm.killApp(killRmApp.getApplicationId());
rm.waitForState(killRmApp.getApplicationId(), RMAppState.KILLED);
// Create finish APP
RMApp finshRmApp = MockRMAppSubmitter.submitWithMemory(2000, rm);
nm1.nodeHeartbeat(true);
RMAppAttempt attempt = finshRmApp.getCurrentAppAttempt();
MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
am.registerAppAttempt();
am.unregisterAppAttempt();
nm1.nodeHeartbeat(attempt.getAppAttemptId(), 1, ContainerState.COMPLETE);
rm.waitForState(am.getApplicationAttemptId(), RMAppAttemptState.FINISHED);
// Fire Event for NODE_USABLE
// Should not have RMAppNodeUpdateEvent to AsyncDispatcher.
dispatcher.getEventHandler().handle(new NodesListManagerEvent(
NodesListManagerEventType.NODE_USABLE, rmnode));
Assert.assertFalse("Got unexpected RM app event",
getIsRMAppEvent());
Assert.assertTrue("Received no NodesListManagerEvent",
getIsNodesListEvent());
}
@Test
public void testCachedResolver() throws Exception {
GenericTestUtils.setRootLogLevel(Level.DEBUG);
ControlledClock clock = new ControlledClock();
clock.setTime(0);
final int CACHE_EXPIRY_INTERVAL_SECS = 30;
NodesListManager.CachedResolver resolver =
new NodesListManager.CachedResolver(clock, CACHE_EXPIRY_INTERVAL_SECS);
resolver.init(new YarnConfiguration());
resolver.start();
resolver.addToCache("testCachedResolverHost1", "1.1.1.1");
Assert.assertEquals("1.1.1.1",
resolver.resolve("testCachedResolverHost1"));
resolver.addToCache("testCachedResolverHost2", "1.1.1.2");
Assert.assertEquals("1.1.1.1",
resolver.resolve("testCachedResolverHost1"));
Assert.assertEquals("1.1.1.2",
resolver.resolve("testCachedResolverHost2"));
// test removeFromCache
resolver.removeFromCache("testCachedResolverHost1");
Assert.assertNotEquals("1.1.1.1",
resolver.resolve("testCachedResolverHost1"));
Assert.assertEquals("1.1.1.2",
resolver.resolve("testCachedResolverHost2"));
// test expiry
clock.tickMsec(CACHE_EXPIRY_INTERVAL_SECS * 1000 + 1);
resolver.getExpireChecker().run();
Assert.assertNotEquals("1.1.1.1",
resolver.resolve("testCachedResolverHost1"));
Assert.assertNotEquals("1.1.1.2",
resolver.resolve("testCachedResolverHost2"));
}
@Test
public void testDefaultResolver() throws Exception {
GenericTestUtils.setRootLogLevel(Level.DEBUG);
YarnConfiguration conf = new YarnConfiguration();
MockRM rm = new MockRM(conf);
rm.init(conf);
NodesListManager nodesListManager = rm.getNodesListManager();
NodesListManager.Resolver resolver = nodesListManager.getResolver();
Assert.assertTrue("default resolver should be DirectResolver",
resolver instanceof NodesListManager.DirectResolver);
}
@Test
public void testCachedResolverWithEvent() throws Exception {
GenericTestUtils.setRootLogLevel(Level.DEBUG);
YarnConfiguration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.RM_NODE_IP_CACHE_EXPIRY_INTERVAL_SECS, 30);
MockRM rm = new MockRM(conf);
rm.init(conf);
NodesListManager nodesListManager = rm.getNodesListManager();
nodesListManager.init(conf);
nodesListManager.start();
NodesListManager.CachedResolver resolver =
(NodesListManager.CachedResolver)nodesListManager.getResolver();
resolver.addToCache("testCachedResolverHost1", "1.1.1.1");
resolver.addToCache("testCachedResolverHost2", "1.1.1.2");
Assert.assertEquals("1.1.1.1",
resolver.resolve("testCachedResolverHost1"));
Assert.assertEquals("1.1.1.2",
resolver.resolve("testCachedResolverHost2"));
RMNode rmnode1 = MockNodes.newNodeInfo(1, Resource.newInstance(28000, 8),
1, "testCachedResolverHost1", 1234);
RMNode rmnode2 = MockNodes.newNodeInfo(1, Resource.newInstance(28000, 8),
1, "testCachedResolverHost2", 1234);
nodesListManager.handle(
new NodesListManagerEvent(NodesListManagerEventType.NODE_USABLE,
rmnode1));
Assert.assertNotEquals("1.1.1.1",
resolver.resolve("testCachedResolverHost1"));
Assert.assertEquals("1.1.1.2",
resolver.resolve("testCachedResolverHost2"));
nodesListManager.handle(
new NodesListManagerEvent(NodesListManagerEventType.NODE_USABLE,
rmnode2));
Assert.assertNotEquals("1.1.1.1",
resolver.resolve("testCachedResolverHost1"));
Assert.assertNotEquals("1.1.1.2",
resolver.resolve("testCachedResolverHost2"));
}
/*
* Create dispatcher object
*/
private Dispatcher getDispatcher() {
return new DrainDispatcher() {
@SuppressWarnings("unchecked")
@Override
public EventHandler<Event> getEventHandler() {
class EventArgMatcher implements ArgumentMatcher<AbstractEvent> {
@Override
public boolean matches(AbstractEvent argument) {
if (argument instanceof RMAppNodeUpdateEvent) {
isRMAppEvent = true;
}
if (argument instanceof NodesListManagerEvent) {
isNodesListEvent = true;
}
return false;
}
}
EventHandler handler = spy(super.getEventHandler());
doNothing().when(handler).handle(argThat(new EventArgMatcher()));
return handler;
}
};
}
public boolean getIsNodesListEvent() {
return isNodesListEvent;
}
public boolean getIsRMAppEvent() {
return isRMAppEvent;
}
}