blob: 6836288ed1cd1616277fab629cf5b4246ab2182a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.logaggregationstatus;
import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.event.InlineDispatcher;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
public class TestRMAppLogAggregationStatus {
private RMContext rmContext;
private YarnScheduler scheduler;
private SchedulerEventType eventType;
private ApplicationId appId;
private final class TestSchedulerEventDispatcher implements
EventHandler<SchedulerEvent> {
@Override
public void handle(SchedulerEvent event) {
scheduler.handle(event);
}
}
@Before
public void setUp() throws Exception {
InlineDispatcher rmDispatcher = new InlineDispatcher();
rmContext =
new RMContextImpl(rmDispatcher, null, null, null,
null, null, null, null, null);
rmContext.setSystemMetricsPublisher(mock(SystemMetricsPublisher.class));
rmContext.setRMApplicationHistoryWriter(mock(RMApplicationHistoryWriter.class));
rmContext
.setRMTimelineCollectorManager(mock(RMTimelineCollectorManager.class));
scheduler = mock(YarnScheduler.class);
doAnswer(
new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
final SchedulerEvent event = (SchedulerEvent)(invocation.getArguments()[0]);
eventType = event.getType();
if (eventType == SchedulerEventType.NODE_UPDATE) {
//DO NOTHING
}
return null;
}
}
).when(scheduler).handle(any(SchedulerEvent.class));
rmDispatcher.register(SchedulerEventType.class,
new TestSchedulerEventDispatcher());
appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
}
@After
public void tearDown() throws Exception {
}
@Test
public void testLogAggregationStatus() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
conf.setLong(YarnConfiguration.LOG_AGGREGATION_STATUS_TIME_OUT_MS, 1500);
RMApp rmApp = createRMApp(conf);
this.rmContext.getRMApps().put(appId, rmApp);
rmApp.handle(new RMAppEvent(this.appId, RMAppEventType.START));
rmApp.handle(new RMAppEvent(this.appId, RMAppEventType.APP_NEW_SAVED));
rmApp.handle(new RMAppEvent(this.appId, RMAppEventType.APP_ACCEPTED));
// This application will be running on two nodes
NodeId nodeId1 = NodeId.newInstance("localhost", 1234);
Resource capability = Resource.newInstance(4096, 4);
RMNodeImpl node1 =
new RMNodeImpl(nodeId1, rmContext, null, 0, 0, null, capability, null);
NodeStatus mockNodeStatus = createMockNodeStatus();
node1.handle(new RMNodeStartedEvent(nodeId1, null, null, mockNodeStatus));
rmApp.handle(new RMAppRunningOnNodeEvent(this.appId, nodeId1));
NodeId nodeId2 = NodeId.newInstance("localhost", 2345);
RMNodeImpl node2 =
new RMNodeImpl(nodeId2, rmContext, null, 0, 0, null, capability, null);
node2.handle(new RMNodeStartedEvent(node2.getNodeID(), null, null,
mockNodeStatus));
rmApp.handle(new RMAppRunningOnNodeEvent(this.appId, nodeId2));
// The initial log aggregation status for these two nodes
// should be NOT_STARTED
Map<NodeId, LogAggregationReport> logAggregationStatus =
rmApp.getLogAggregationReportsForApp();
Assert.assertEquals(2, logAggregationStatus.size());
Assert.assertTrue(logAggregationStatus.containsKey(nodeId1));
Assert.assertTrue(logAggregationStatus.containsKey(nodeId2));
for (Entry<NodeId, LogAggregationReport> report : logAggregationStatus
.entrySet()) {
Assert.assertEquals(LogAggregationStatus.NOT_START, report.getValue()
.getLogAggregationStatus());
}
List<LogAggregationReport> node1ReportForApp =
new ArrayList<LogAggregationReport>();
String messageForNode1_1 =
"node1 logAggregation status updated at " + System.currentTimeMillis();
LogAggregationReport report1 =
LogAggregationReport.newInstance(appId, LogAggregationStatus.RUNNING,
messageForNode1_1);
node1ReportForApp.add(report1);
NodeStatus nodeStatus1 = NodeStatus.newInstance(node1.getNodeID(), 0,
new ArrayList<ContainerStatus>(), null,
NodeHealthStatus.newInstance(true, null, 0), null, null, null);
node1.handle(new RMNodeStatusEvent(node1.getNodeID(), nodeStatus1,
node1ReportForApp));
List<LogAggregationReport> node2ReportForApp =
new ArrayList<LogAggregationReport>();
String messageForNode2_1 =
"node2 logAggregation status updated at " + System.currentTimeMillis();
LogAggregationReport report2 =
LogAggregationReport.newInstance(appId,
LogAggregationStatus.RUNNING, messageForNode2_1);
node2ReportForApp.add(report2);
NodeStatus nodeStatus2 = NodeStatus.newInstance(node2.getNodeID(), 0,
new ArrayList<ContainerStatus>(), null,
NodeHealthStatus.newInstance(true, null, 0), null, null, null);
node2.handle(new RMNodeStatusEvent(node2.getNodeID(), nodeStatus2,
node2ReportForApp));
// node1 and node2 has updated its log aggregation status
// verify that the log aggregation status for node1, node2
// has been changed
logAggregationStatus = rmApp.getLogAggregationReportsForApp();
Assert.assertEquals(2, logAggregationStatus.size());
Assert.assertTrue(logAggregationStatus.containsKey(nodeId1));
Assert.assertTrue(logAggregationStatus.containsKey(nodeId2));
for (Entry<NodeId, LogAggregationReport> report : logAggregationStatus
.entrySet()) {
if (report.getKey().equals(node1.getNodeID())) {
Assert.assertEquals(LogAggregationStatus.RUNNING, report.getValue()
.getLogAggregationStatus());
Assert.assertEquals(messageForNode1_1, report.getValue()
.getDiagnosticMessage());
} else if (report.getKey().equals(node2.getNodeID())) {
Assert.assertEquals(LogAggregationStatus.RUNNING, report.getValue()
.getLogAggregationStatus());
Assert.assertEquals(messageForNode2_1, report.getValue()
.getDiagnosticMessage());
} else {
// should not contain log aggregation report for other nodes
Assert
.fail("should not contain log aggregation report for other nodes");
}
}
// node1 updates its log aggregation status again
List<LogAggregationReport> node1ReportForApp2 =
new ArrayList<LogAggregationReport>();
String messageForNode1_2 =
"node1 logAggregation status updated at " + System.currentTimeMillis();
LogAggregationReport report1_2 =
LogAggregationReport.newInstance(appId,
LogAggregationStatus.RUNNING, messageForNode1_2);
node1ReportForApp2.add(report1_2);
node1.handle(new RMNodeStatusEvent(node1.getNodeID(), nodeStatus1,
node1ReportForApp2));
// verify that the log aggregation status for node1
// has been changed
// verify that the log aggregation status for node2
// does not change
logAggregationStatus = rmApp.getLogAggregationReportsForApp();
Assert.assertEquals(2, logAggregationStatus.size());
Assert.assertTrue(logAggregationStatus.containsKey(nodeId1));
Assert.assertTrue(logAggregationStatus.containsKey(nodeId2));
for (Entry<NodeId, LogAggregationReport> report : logAggregationStatus
.entrySet()) {
if (report.getKey().equals(node1.getNodeID())) {
Assert.assertEquals(LogAggregationStatus.RUNNING, report.getValue()
.getLogAggregationStatus());
Assert.assertEquals(
messageForNode1_1 + "\n" + messageForNode1_2, report
.getValue().getDiagnosticMessage());
} else if (report.getKey().equals(node2.getNodeID())) {
Assert.assertEquals(LogAggregationStatus.RUNNING, report.getValue()
.getLogAggregationStatus());
Assert.assertEquals(messageForNode2_1, report.getValue()
.getDiagnosticMessage());
} else {
// should not contain log aggregation report for other nodes
Assert
.fail("should not contain log aggregation report for other nodes");
}
}
// kill the application
rmApp.handle(new RMAppEvent(appId, RMAppEventType.KILL));
rmApp.handle(new RMAppEvent(appId, RMAppEventType.ATTEMPT_KILLED));
rmApp.handle(new RMAppEvent(appId, RMAppEventType.APP_UPDATE_SAVED));
Assert.assertEquals(RMAppState.KILLED, rmApp.getState());
// wait for 1500 ms
Thread.sleep(1500);
// the log aggregation status for both nodes should be changed
// to TIME_OUT
logAggregationStatus = rmApp.getLogAggregationReportsForApp();
Assert.assertEquals(2, logAggregationStatus.size());
Assert.assertTrue(logAggregationStatus.containsKey(nodeId1));
Assert.assertTrue(logAggregationStatus.containsKey(nodeId2));
for (Entry<NodeId, LogAggregationReport> report : logAggregationStatus
.entrySet()) {
Assert.assertEquals(LogAggregationStatus.TIME_OUT, report.getValue()
.getLogAggregationStatus());
}
// Finally, node1 finished its log aggregation and sent out its final
// log aggregation status. The log aggregation status for node1 should
// be changed from TIME_OUT to SUCCEEDED
List<LogAggregationReport> node1ReportForApp3 =
new ArrayList<LogAggregationReport>();
LogAggregationReport report1_3;
for (int i = 0; i < 10 ; i ++) {
report1_3 =
LogAggregationReport.newInstance(appId,
LogAggregationStatus.RUNNING, "test_message_" + i);
node1ReportForApp3.add(report1_3);
}
node1ReportForApp3.add(LogAggregationReport.newInstance(appId,
LogAggregationStatus.SUCCEEDED, ""));
// For every logAggregationReport cached in memory, we can only save at most
// 10 diagnostic messages/failure messages
node1.handle(new RMNodeStatusEvent(node1.getNodeID(), nodeStatus1,
node1ReportForApp3));
logAggregationStatus = rmApp.getLogAggregationReportsForApp();
Assert.assertEquals(2, logAggregationStatus.size());
Assert.assertTrue(logAggregationStatus.containsKey(nodeId1));
Assert.assertTrue(logAggregationStatus.containsKey(nodeId2));
for (Entry<NodeId, LogAggregationReport> report : logAggregationStatus
.entrySet()) {
if (report.getKey().equals(node1.getNodeID())) {
Assert.assertEquals(LogAggregationStatus.SUCCEEDED, report.getValue()
.getLogAggregationStatus());
StringBuilder builder = new StringBuilder();
for (int i = 0; i < 9; i ++) {
builder.append("test_message_" + i);
builder.append("\n");
}
builder.append("test_message_" + 9);
Assert.assertEquals(builder.toString(), report.getValue()
.getDiagnosticMessage());
} else if (report.getKey().equals(node2.getNodeID())) {
Assert.assertEquals(LogAggregationStatus.TIME_OUT, report.getValue()
.getLogAggregationStatus());
} else {
// should not contain log aggregation report for other nodes
Assert
.fail("should not contain log aggregation report for other nodes");
}
}
// update log aggregationStatus for node2 as FAILED,
// so the log aggregation status for the App will become FAILED,
// and we only keep the log aggregation reports whose status is FAILED,
// so the log aggregation report for node1 will be removed.
List<LogAggregationReport> node2ReportForApp2 =
new ArrayList<LogAggregationReport>();
LogAggregationReport report2_2 =
LogAggregationReport.newInstance(appId,
LogAggregationStatus.RUNNING_WITH_FAILURE, "Fail_Message");
LogAggregationReport report2_3 =
LogAggregationReport.newInstance(appId,
LogAggregationStatus.FAILED, "");
node2ReportForApp2.add(report2_2);
node2ReportForApp2.add(report2_3);
node2.handle(new RMNodeStatusEvent(node2.getNodeID(), nodeStatus2,
node2ReportForApp2));
Assert.assertEquals(LogAggregationStatus.FAILED,
rmApp.getLogAggregationStatusForAppReport());
logAggregationStatus = rmApp.getLogAggregationReportsForApp();
Assert.assertTrue(logAggregationStatus.size() == 1);
Assert.assertTrue(logAggregationStatus.containsKey(node2.getNodeID()));
Assert.assertTrue(!logAggregationStatus.containsKey(node1.getNodeID()));
Assert.assertEquals("Fail_Message",
((RMAppImpl)rmApp).getLogAggregationFailureMessagesForNM(nodeId2));
}
@Test (timeout = 10000)
public void testGetLogAggregationStatusForAppReport() {
YarnConfiguration conf = new YarnConfiguration();
// Disable the log aggregation
conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false);
RMAppImpl rmApp = (RMAppImpl)createRMApp(conf);
// The log aggregation status should be DISABLED.
Assert.assertEquals(LogAggregationStatus.DISABLED,
rmApp.getLogAggregationStatusForAppReport());
// Enable the log aggregation
conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
rmApp = (RMAppImpl)createRMApp(conf);
// If we do not know any NodeManagers for this application , and
// the log aggregation is enabled, the log aggregation status will
// return NOT_START
Assert.assertEquals(LogAggregationStatus.NOT_START,
rmApp.getLogAggregationStatusForAppReport());
NodeId nodeId1 = NodeId.newInstance("localhost", 1111);
NodeId nodeId2 = NodeId.newInstance("localhost", 2222);
NodeId nodeId3 = NodeId.newInstance("localhost", 3333);
NodeId nodeId4 = NodeId.newInstance("localhost", 4444);
// If the log aggregation status for all NMs are NOT_START,
// the log aggregation status for this app will return NOT_START
rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance(
rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance(
rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance(
rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance(
rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
Assert.assertEquals(LogAggregationStatus.NOT_START,
rmApp.getLogAggregationStatusForAppReport());
rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance(
rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance(
rmApp.getApplicationId(), LogAggregationStatus.RUNNING, ""));
rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance(
rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, ""));
rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance(
rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, ""));
Assert.assertEquals(LogAggregationStatus.RUNNING,
rmApp.getLogAggregationStatusForAppReport());
rmApp.handle(new RMAppEvent(rmApp.getApplicationId(), RMAppEventType.KILL));
Assert.assertTrue(RMAppImpl.isAppInFinalState(rmApp));
// If at least of one log aggregation status for one NM is TIME_OUT,
// others are SUCCEEDED, the log aggregation status for this app will
// return TIME_OUT
rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance(
rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, ""));
rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance(
rmApp.getApplicationId(), LogAggregationStatus.TIME_OUT, ""));
rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance(
rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, ""));
rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance(
rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, ""));
Assert.assertEquals(LogAggregationStatus.TIME_OUT,
rmApp.getLogAggregationStatusForAppReport());
rmApp = (RMAppImpl)createRMApp(conf);
rmApp.handle(new RMAppEvent(rmApp.getApplicationId(), RMAppEventType.KILL));
// If the log aggregation status for all NMs are SUCCEEDED and Application
// is at the final state, the log aggregation status for this app will
// return SUCCEEDED
rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance(
rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, ""));
rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance(
rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, ""));
rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance(
rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, ""));
rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance(
rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, ""));
Assert.assertEquals(LogAggregationStatus.SUCCEEDED,
rmApp.getLogAggregationStatusForAppReport());
rmApp = (RMAppImpl)createRMApp(conf);
// If the log aggregation status for at least one of NMs are RUNNING,
// the log aggregation status for this app will return RUNNING
rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance(
rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance(
rmApp.getApplicationId(), LogAggregationStatus.RUNNING, ""));
rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance(
rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance(
rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
Assert.assertEquals(LogAggregationStatus.RUNNING,
rmApp.getLogAggregationStatusForAppReport());
// If the log aggregation status for at least one of NMs
// are RUNNING_WITH_FAILURE, the log aggregation status
// for this app will return RUNNING_WITH_FAILURE
rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance(
rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance(
rmApp.getApplicationId(), LogAggregationStatus.RUNNING, ""));
rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance(
rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance(
rmApp.getApplicationId(), LogAggregationStatus.RUNNING_WITH_FAILURE,
""));
Assert.assertEquals(LogAggregationStatus.RUNNING_WITH_FAILURE,
rmApp.getLogAggregationStatusForAppReport());
// For node4, the previous log aggregation status is RUNNING_WITH_FAILURE,
// it will not be changed even it get a new log aggregation status
// as RUNNING
rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance(
rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance(
rmApp.getApplicationId(), LogAggregationStatus.RUNNING, ""));
rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance(
rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance(
rmApp.getApplicationId(), LogAggregationStatus.RUNNING, ""));
Assert.assertEquals(LogAggregationStatus.RUNNING_WITH_FAILURE,
rmApp.getLogAggregationStatusForAppReport());
rmApp.handle(new RMAppEvent(rmApp.getApplicationId(), RMAppEventType.KILL));
Assert.assertTrue(RMAppImpl.isAppInFinalState(rmApp));
// If at least of one log aggregation status for one NM is FAILED,
// others are either SUCCEEDED or TIME_OUT, and this application is
// at the final state, the log aggregation status for this app
// will return FAILED
rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance(
rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, ""));
rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance(
rmApp.getApplicationId(), LogAggregationStatus.TIME_OUT, ""));
rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance(
rmApp.getApplicationId(), LogAggregationStatus.FAILED, ""));
rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance(
rmApp.getApplicationId(), LogAggregationStatus.FAILED, ""));
Assert.assertEquals(LogAggregationStatus.FAILED,
rmApp.getLogAggregationStatusForAppReport());
}
private RMApp createRMApp(Configuration conf) {
ApplicationSubmissionContext submissionContext =
ApplicationSubmissionContext
.newInstance(appId, "test", "default", Priority.newInstance(0),
mock(ContainerLaunchContext.class), false, true, 2,
Resource.newInstance(10, 2), "test");
return new RMAppImpl(this.appId, this.rmContext,
conf, "test", "test", "default", submissionContext,
scheduler,
this.rmContext.getApplicationMasterService(),
System.currentTimeMillis(), "test",
null, null);
}
}