blob: b55012304cf4da6a832bad0987a1a9f80b47cad2 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.ahs;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.junit.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryStore;
import org.apache.hadoop.yarn.server.applicationhistoryservice.MemoryApplicationHistoryStore;
import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptHistoryData;
import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationHistoryData;
import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerHistoryData;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class TestRMApplicationHistoryWriter {
private static int MAX_RETRIES = 10;
private RMApplicationHistoryWriter writer;
private ApplicationHistoryStore store;
private List<CounterDispatcher> dispatchers =
new ArrayList<CounterDispatcher>();
@Before
public void setup() {
store = new MemoryApplicationHistoryStore();
Configuration conf = new Configuration();
conf.setBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED, true);
conf.setClass(YarnConfiguration.APPLICATION_HISTORY_STORE,
MemoryApplicationHistoryStore.class, ApplicationHistoryStore.class);
writer = new RMApplicationHistoryWriter() {
@Override
protected ApplicationHistoryStore createApplicationHistoryStore(
Configuration conf) {
return store;
}
@Override
protected Dispatcher createDispatcher(Configuration conf) {
MultiThreadedDispatcher dispatcher =
new MultiThreadedDispatcher(
conf
.getInt(
YarnConfiguration.RM_HISTORY_WRITER_MULTI_THREADED_DISPATCHER_POOL_SIZE,
YarnConfiguration.DEFAULT_RM_HISTORY_WRITER_MULTI_THREADED_DISPATCHER_POOL_SIZE));
dispatcher.setDrainEventsOnStop();
return dispatcher;
}
class MultiThreadedDispatcher extends
RMApplicationHistoryWriter.MultiThreadedDispatcher {
public MultiThreadedDispatcher(int num) {
super(num);
}
@Override
protected AsyncDispatcher createDispatcher() {
CounterDispatcher dispatcher = new CounterDispatcher();
dispatchers.add(dispatcher);
return dispatcher;
}
}
};
writer.init(conf);
writer.start();
}
@After
public void tearDown() {
writer.stop();
}
private static RMApp createRMApp(ApplicationId appId) {
RMApp app = mock(RMApp.class);
when(app.getApplicationId()).thenReturn(appId);
when(app.getName()).thenReturn("test app");
when(app.getApplicationType()).thenReturn("test app type");
when(app.getUser()).thenReturn("test user");
when(app.getQueue()).thenReturn("test queue");
when(app.getSubmitTime()).thenReturn(0L);
when(app.getStartTime()).thenReturn(1L);
when(app.getFinishTime()).thenReturn(2L);
when(app.getDiagnostics()).thenReturn(
new StringBuilder("test diagnostics info"));
when(app.getFinalApplicationStatus()).thenReturn(
FinalApplicationStatus.UNDEFINED);
return app;
}
private static RMAppAttempt createRMAppAttempt(
ApplicationAttemptId appAttemptId) {
RMAppAttempt appAttempt = mock(RMAppAttempt.class);
when(appAttempt.getAppAttemptId()).thenReturn(appAttemptId);
when(appAttempt.getHost()).thenReturn("test host");
when(appAttempt.getRpcPort()).thenReturn(-100);
Container container = mock(Container.class);
when(container.getId())
.thenReturn(ContainerId.newContainerId(appAttemptId, 1));
when(appAttempt.getMasterContainer()).thenReturn(container);
when(appAttempt.getDiagnostics()).thenReturn("test diagnostics info");
when(appAttempt.getTrackingUrl()).thenReturn("test url");
when(appAttempt.getFinalApplicationStatus()).thenReturn(
FinalApplicationStatus.UNDEFINED);
return appAttempt;
}
private static RMContainer createRMContainer(ContainerId containerId) {
RMContainer container = mock(RMContainer.class);
when(container.getContainerId()).thenReturn(containerId);
when(container.getAllocatedNode()).thenReturn(
NodeId.newInstance("test host", -100));
when(container.getAllocatedResource()).thenReturn(
Resource.newInstance(-1, -1));
when(container.getAllocatedPriority()).thenReturn(Priority.UNDEFINED);
when(container.getCreationTime()).thenReturn(0L);
when(container.getFinishTime()).thenReturn(1L);
when(container.getDiagnosticsInfo()).thenReturn("test diagnostics info");
when(container.getLogURL()).thenReturn("test log url");
when(container.getContainerExitStatus()).thenReturn(-1);
when(container.getContainerState()).thenReturn(ContainerState.COMPLETE);
return container;
}
@Test
public void testDefaultStoreSetup() throws Exception {
Configuration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED, true);
RMApplicationHistoryWriter writer = new RMApplicationHistoryWriter();
writer.init(conf);
writer.start();
try {
Assert.assertFalse(writer.historyServiceEnabled);
Assert.assertNull(writer.writer);
} finally {
writer.stop();
writer.close();
}
}
@Test
public void testWriteApplication() throws Exception {
RMApp app = createRMApp(ApplicationId.newInstance(0, 1));
writer.applicationStarted(app);
ApplicationHistoryData appHD = null;
for (int i = 0; i < MAX_RETRIES; ++i) {
appHD = store.getApplication(ApplicationId.newInstance(0, 1));
if (appHD != null) {
break;
} else {
Thread.sleep(100);
}
}
Assert.assertNotNull(appHD);
Assert.assertEquals("test app", appHD.getApplicationName());
Assert.assertEquals("test app type", appHD.getApplicationType());
Assert.assertEquals("test user", appHD.getUser());
Assert.assertEquals("test queue", appHD.getQueue());
Assert.assertEquals(0L, appHD.getSubmitTime());
Assert.assertEquals(1L, appHD.getStartTime());
writer.applicationFinished(app, RMAppState.FINISHED);
for (int i = 0; i < MAX_RETRIES; ++i) {
appHD = store.getApplication(ApplicationId.newInstance(0, 1));
if (appHD.getYarnApplicationState() != null) {
break;
} else {
Thread.sleep(100);
}
}
Assert.assertEquals(2L, appHD.getFinishTime());
Assert.assertEquals("test diagnostics info", appHD.getDiagnosticsInfo());
Assert.assertEquals(FinalApplicationStatus.UNDEFINED,
appHD.getFinalApplicationStatus());
Assert.assertEquals(YarnApplicationState.FINISHED,
appHD.getYarnApplicationState());
}
@Test
public void testWriteApplicationAttempt() throws Exception {
RMAppAttempt appAttempt =
createRMAppAttempt(ApplicationAttemptId.newInstance(
ApplicationId.newInstance(0, 1), 1));
writer.applicationAttemptStarted(appAttempt);
ApplicationAttemptHistoryData appAttemptHD = null;
for (int i = 0; i < MAX_RETRIES; ++i) {
appAttemptHD =
store.getApplicationAttempt(ApplicationAttemptId.newInstance(
ApplicationId.newInstance(0, 1), 1));
if (appAttemptHD != null) {
break;
} else {
Thread.sleep(100);
}
}
Assert.assertNotNull(appAttemptHD);
Assert.assertEquals("test host", appAttemptHD.getHost());
Assert.assertEquals(-100, appAttemptHD.getRPCPort());
Assert.assertEquals(ContainerId.newContainerId(
ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 1), 1),
appAttemptHD.getMasterContainerId());
writer.applicationAttemptFinished(appAttempt, RMAppAttemptState.FINISHED);
for (int i = 0; i < MAX_RETRIES; ++i) {
appAttemptHD =
store.getApplicationAttempt(ApplicationAttemptId.newInstance(
ApplicationId.newInstance(0, 1), 1));
if (appAttemptHD.getYarnApplicationAttemptState() != null) {
break;
} else {
Thread.sleep(100);
}
}
Assert.assertEquals("test diagnostics info",
appAttemptHD.getDiagnosticsInfo());
Assert.assertEquals("test url", appAttemptHD.getTrackingURL());
Assert.assertEquals(FinalApplicationStatus.UNDEFINED,
appAttemptHD.getFinalApplicationStatus());
Assert.assertEquals(YarnApplicationAttemptState.FINISHED,
appAttemptHD.getYarnApplicationAttemptState());
}
@Test
public void testWriteContainer() throws Exception {
RMContainer container =
createRMContainer(ContainerId.newContainerId(
ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 1),
1));
writer.containerStarted(container);
ContainerHistoryData containerHD = null;
for (int i = 0; i < MAX_RETRIES; ++i) {
containerHD =
store.getContainer(ContainerId.newContainerId(ApplicationAttemptId
.newInstance(ApplicationId.newInstance(0, 1), 1), 1));
if (containerHD != null) {
break;
} else {
Thread.sleep(100);
}
}
Assert.assertNotNull(containerHD);
Assert.assertEquals(NodeId.newInstance("test host", -100),
containerHD.getAssignedNode());
Assert.assertEquals(Resource.newInstance(-1, -1),
containerHD.getAllocatedResource());
Assert.assertEquals(Priority.UNDEFINED, containerHD.getPriority());
Assert.assertEquals(0L, container.getCreationTime());
writer.containerFinished(container);
for (int i = 0; i < MAX_RETRIES; ++i) {
containerHD =
store.getContainer(ContainerId.newContainerId(ApplicationAttemptId
.newInstance(ApplicationId.newInstance(0, 1), 1), 1));
if (containerHD.getContainerState() != null) {
break;
} else {
Thread.sleep(100);
}
}
Assert.assertEquals("test diagnostics info",
containerHD.getDiagnosticsInfo());
Assert.assertEquals(-1, containerHD.getContainerExitStatus());
Assert.assertEquals(ContainerState.COMPLETE,
containerHD.getContainerState());
}
@Test
public void testParallelWrite() throws Exception {
List<ApplicationId> appIds = new ArrayList<ApplicationId>();
for (int i = 0; i < 10; ++i) {
Random rand = new Random(i);
ApplicationId appId = ApplicationId.newInstance(0, rand.nextInt());
appIds.add(appId);
RMApp app = createRMApp(appId);
writer.applicationStarted(app);
for (int j = 1; j <= 10; ++j) {
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, j);
RMAppAttempt appAttempt = createRMAppAttempt(appAttemptId);
writer.applicationAttemptStarted(appAttempt);
for (int k = 1; k <= 10; ++k) {
ContainerId containerId = ContainerId.newContainerId(appAttemptId, k);
RMContainer container = createRMContainer(containerId);
writer.containerStarted(container);
writer.containerFinished(container);
}
writer.applicationAttemptFinished(
appAttempt, RMAppAttemptState.FINISHED);
}
writer.applicationFinished(app, RMAppState.FINISHED);
}
for (int i = 0; i < MAX_RETRIES; ++i) {
if (allEventsHandled(20 * 10 * 10 + 20 * 10 + 20)) {
break;
} else {
Thread.sleep(500);
}
}
Assert.assertTrue(allEventsHandled(20 * 10 * 10 + 20 * 10 + 20));
// Validate all events of one application are handled by one dispatcher
for (ApplicationId appId : appIds) {
Assert.assertTrue(handledByOne(appId));
}
}
private boolean allEventsHandled(int expected) {
int actual = 0;
for (CounterDispatcher dispatcher : dispatchers) {
for (Integer count : dispatcher.counts.values()) {
actual += count;
}
}
return actual == expected;
}
@Test
public void testRMWritingMassiveHistoryForFairSche() throws Exception {
//test WritingMassiveHistory for Fair Scheduler.
testRMWritingMassiveHistory(true);
}
@Test
public void testRMWritingMassiveHistoryForCapacitySche() throws Exception {
//test WritingMassiveHistory for Capacity Scheduler.
testRMWritingMassiveHistory(false);
}
private void testRMWritingMassiveHistory(boolean isFS) throws Exception {
// 1. Show RM can run with writing history data
// 2. Test additional workload of processing history events
YarnConfiguration conf = new YarnConfiguration();
if (isFS) {
conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, true);
conf.set(YarnConfiguration.RM_SCHEDULER, FairScheduler.class.getName());
} else {
conf.set(YarnConfiguration.RM_SCHEDULER,
CapacityScheduler.class.getName());
}
// don't process history events
MockRM rm = new MockRM(conf) {
@Override
protected RMApplicationHistoryWriter createRMApplicationHistoryWriter() {
return new RMApplicationHistoryWriter() {
@Override
public void applicationStarted(RMApp app) {
}
@Override
public void applicationFinished(RMApp app, RMAppState finalState) {
}
@Override
public void applicationAttemptStarted(RMAppAttempt appAttempt) {
}
@Override
public void applicationAttemptFinished(
RMAppAttempt appAttempt, RMAppAttemptState finalState) {
}
@Override
public void containerStarted(RMContainer container) {
}
@Override
public void containerFinished(RMContainer container) {
}
};
}
};
long startTime1 = System.currentTimeMillis();
testRMWritingMassiveHistory(rm);
long finishTime1 = System.currentTimeMillis();
long elapsedTime1 = finishTime1 - startTime1;
rm = new MockRM(conf);
long startTime2 = System.currentTimeMillis();
testRMWritingMassiveHistory(rm);
long finishTime2 = System.currentTimeMillis();
long elapsedTime2 = finishTime2 - startTime2;
// No more than 10% additional workload
// Should be much less, but computation time is fluctuated
Assert.assertTrue(elapsedTime2 - elapsedTime1 < elapsedTime1 / 10);
}
private void testRMWritingMassiveHistory(MockRM rm) throws Exception {
rm.start();
MockNM nm = rm.registerNode("127.0.0.1:1234", 1024 * 10100);
RMApp app = rm.submitApp(1024);
//Wait to make sure the attempt has the right state
//TODO explore a better way than sleeping for a while (YARN-4929)
Thread.sleep(1000);
nm.nodeHeartbeat(true);
RMAppAttempt attempt = app.getCurrentAppAttempt();
MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
am.registerAppAttempt();
int request = 10000;
am.allocate("127.0.0.1", 1024, request, new ArrayList<ContainerId>());
nm.nodeHeartbeat(true);
List<Container> allocated =
am.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers();
int waitCount = 0;
int allocatedSize = allocated.size();
while (allocatedSize < request && waitCount++ < 200) {
Thread.sleep(300);
allocated =
am.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers();
allocatedSize += allocated.size();
nm.nodeHeartbeat(true);
}
Assert.assertEquals(request, allocatedSize);
am.unregisterAppAttempt();
rm.waitForState(am.getApplicationAttemptId(), RMAppAttemptState.FINISHING);
nm.nodeHeartbeat(am.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
rm.waitForState(am.getApplicationAttemptId(), RMAppAttemptState.FINISHED);
NodeHeartbeatResponse resp = nm.nodeHeartbeat(true);
List<ContainerId> cleaned = resp.getContainersToCleanup();
int cleanedSize = cleaned.size();
waitCount = 0;
while (cleanedSize < allocatedSize && waitCount++ < 200) {
Thread.sleep(300);
resp = nm.nodeHeartbeat(true);
cleaned = resp.getContainersToCleanup();
cleanedSize += cleaned.size();
}
Assert.assertEquals(allocatedSize, cleanedSize);
rm.waitForState(app.getApplicationId(), RMAppState.FINISHED);
rm.stop();
}
private boolean handledByOne(ApplicationId appId) {
int count = 0;
for (CounterDispatcher dispatcher : dispatchers) {
if (dispatcher.counts.containsKey(appId)) {
++count;
}
}
return count == 1;
}
private static class CounterDispatcher extends AsyncDispatcher {
private Map<ApplicationId, Integer> counts =
new HashMap<ApplicationId, Integer>();
@SuppressWarnings("rawtypes")
@Override
protected void dispatch(Event event) {
if (event instanceof WritingApplicationHistoryEvent) {
WritingApplicationHistoryEvent ashEvent =
(WritingApplicationHistoryEvent) event;
switch (ashEvent.getType()) {
case APP_START:
incrementCounts(((WritingApplicationStartEvent) event)
.getApplicationId());
break;
case APP_FINISH:
incrementCounts(((WritingApplicationFinishEvent) event)
.getApplicationId());
break;
case APP_ATTEMPT_START:
incrementCounts(((WritingApplicationAttemptStartEvent) event)
.getApplicationAttemptId().getApplicationId());
break;
case APP_ATTEMPT_FINISH:
incrementCounts(((WritingApplicationAttemptFinishEvent) event)
.getApplicationAttemptId().getApplicationId());
break;
case CONTAINER_START:
incrementCounts(((WritingContainerStartEvent) event)
.getContainerId().getApplicationAttemptId().getApplicationId());
break;
case CONTAINER_FINISH:
incrementCounts(((WritingContainerFinishEvent) event)
.getContainerId().getApplicationAttemptId().getApplicationId());
break;
}
}
super.dispatch(event);
}
private void incrementCounts(ApplicationId appId) {
Integer val = counts.get(appId);
if (val == null) {
counts.put(appId, 1);
} else {
counts.put(appId, val + 1);
}
}
}
}