/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.server.resourcemanager.metrics;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryServer;
import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
import org.apache.hadoop.yarn.server.timeline.MemoryTimelineStore;
import org.apache.hadoop.yarn.server.timeline.TimelineStore;
import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field;
import org.apache.hadoop.yarn.server.timeline.recovery.MemoryTimelineStateStore;
import org.apache.hadoop.yarn.server.timeline.recovery.TimelineStateStore;
import org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineCollector;
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineReaderImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
import org.apache.hadoop.yarn.util.TimelineServiceHelper;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;

import org.junit.Assert;
import org.junit.Test;

/**
* Tests that a CombinedSystemMetricsPublisher publishes metrics for timeline
* services (v1/v2) as specified by the configuration.
*/
public class TestCombinedSystemMetricsPublisher {
/**
* The folder where the FileSystemTimelineWriterImpl writes the entities.
*/
private static File testRootDir = new File("target",
TestCombinedSystemMetricsPublisher.class.getName() + "-localDir")
.getAbsoluteFile();
private static ApplicationHistoryServer timelineServer;
private static CombinedSystemMetricsPublisher metricsPublisher;
private static TimelineStore store;
private static ConcurrentMap<ApplicationId, RMApp> rmAppsMapInContext;
private static RMTimelineCollectorManager rmTimelineCollectorManager;
private static DrainDispatcher dispatcher;
private static YarnConfiguration conf;
private static TimelineServiceV1Publisher publisherV1;
private static TimelineServiceV2Publisher publisherV2;
private static ApplicationAttemptId appAttemptId;
private static RMApp app;
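
  /**
   * Starts a timeline server and, depending on the enable flags, v1 and/or
   * v2 publishers, wiring them into a single CombinedSystemMetricsPublisher.
   */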
private void testSetup(boolean enableV1, boolean enableV2) throws Exception {
if (testRootDir.exists()) {
      // Clean up leftovers from a previous run beforehand.
FileContext.getLocalFSFileContext().delete(
new Path(testRootDir.getAbsolutePath()), true);
}
conf = getConf(enableV1, enableV2);
RMContext rmContext = mock(RMContext.class);
rmAppsMapInContext = new ConcurrentHashMap<ApplicationId, RMApp>();
when(rmContext.getRMApps()).thenReturn(rmAppsMapInContext);
ResourceManager rm = mock(ResourceManager.class);
when(rm.getRMContext()).thenReturn(rmContext);
if (enableV2) {
dispatcher = new DrainDispatcher();
rmTimelineCollectorManager = new RMTimelineCollectorManager(rm);
when(rmContext.getRMTimelineCollectorManager()).thenReturn(
rmTimelineCollectorManager);
rmTimelineCollectorManager.init(conf);
rmTimelineCollectorManager.start();
} else {
dispatcher = null;
rmTimelineCollectorManager = null;
}
timelineServer = new ApplicationHistoryServer();
timelineServer.init(conf);
timelineServer.start();
store = timelineServer.getTimelineStore();
if (enableV2) {
dispatcher.init(conf);
dispatcher.start();
}
List<SystemMetricsPublisher> publishers =
new ArrayList<SystemMetricsPublisher>();
if (YarnConfiguration.timelineServiceV1Enabled(conf)) {
Assert.assertTrue(enableV1);
publisherV1 = new TimelineServiceV1Publisher();
publishers.add(publisherV1);
publisherV1.init(conf);
publisherV1.start();
} else {
Assert.assertFalse(enableV1);
publisherV1 = null;
}
if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
Assert.assertTrue(enableV2);
publisherV2 = new TimelineServiceV2Publisher(
rmTimelineCollectorManager) {
@Override
protected Dispatcher getDispatcher() {
return dispatcher;
}
};
publishers.add(publisherV2);
publisherV2.init(conf);
publisherV2.start();
} else {
Assert.assertFalse(enableV2);
publisherV2 = null;
}
if (publishers.isEmpty()) {
NoOpSystemMetricPublisher noopPublisher =
new NoOpSystemMetricPublisher();
publishers.add(noopPublisher);
}
metricsPublisher = new CombinedSystemMetricsPublisher(publishers);
}
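
  /**
   * Stops the publishers, the timeline server, and the collector manager,
   * and deletes the local test directory.
   */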
private void testCleanup() throws Exception {
if (publisherV1 != null) {
publisherV1.stop();
}
if (publisherV2 != null) {
publisherV2.stop();
}
if (timelineServer != null) {
timelineServer.stop();
}
if (testRootDir.exists()) {
FileContext.getLocalFSFileContext().delete(
new Path(testRootDir.getAbsolutePath()), true);
}
if (rmTimelineCollectorManager != null) {
rmTimelineCollectorManager.stop();
}
}
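
  /**
   * Builds a YarnConfiguration enabling timeline service v1 and/or v2, with
   * in-memory stores for v1 and a local-filesystem writer for v2.
   */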
private static YarnConfiguration getConf(boolean v1Enabled,
boolean v2Enabled) {
YarnConfiguration yarnConf = new YarnConfiguration();
if (v1Enabled || v2Enabled) {
yarnConf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
} else {
yarnConf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, false);
}
if (v1Enabled) {
yarnConf.set(YarnConfiguration.TIMELINE_SERVICE_VERSION, "1.0");
yarnConf.setClass(YarnConfiguration.TIMELINE_SERVICE_STORE,
MemoryTimelineStore.class, TimelineStore.class);
yarnConf.setClass(YarnConfiguration.TIMELINE_SERVICE_STATE_STORE_CLASS,
MemoryTimelineStateStore.class, TimelineStateStore.class);
}
if (v2Enabled) {
yarnConf.set(YarnConfiguration.TIMELINE_SERVICE_VERSION, "2.0");
yarnConf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED,
true);
yarnConf.setBoolean(
YarnConfiguration.RM_PUBLISH_CONTAINER_EVENTS_ENABLED, true);
yarnConf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
FileSystemTimelineWriterImpl.class, TimelineWriter.class);
try {
yarnConf.set(
FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT,
testRootDir.getCanonicalPath());
      } catch (IOException e) {
        Assert.fail("Exception while setting "
            + "TIMELINE_SERVICE_STORAGE_DIR_ROOT: " + e);
      }
}
if (v1Enabled && v2Enabled) {
yarnConf.set(YarnConfiguration.TIMELINE_SERVICE_VERSION, "1.0");
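      // Note: Java's Float.parseFloat accepts a trailing "f", so "2.0f"
      // below still reads as version 2.0 when the enabled versions are
      // parsed as floats.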
yarnConf.set(YarnConfiguration.TIMELINE_SERVICE_VERSIONS, "1.0,2.0f");
}
yarnConf.setInt(
YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE, 2);
return yarnConf;
}

  /**
   * Runs one scenario: validates that timeline events are published if and
   * only if the corresponding service version (v1, v2) is enabled.
   */
private void runTest(boolean v1Enabled, boolean v2Enabled) throws Exception {
testSetup(v1Enabled, v2Enabled);
publishEvents(v1Enabled, v2Enabled);
validateV1(v1Enabled);
validateV2(v2Enabled);
testCleanup();
}

  @Test(timeout = 10000)
  public void testTimelineServiceEventPublishingV1V2Enabled()
      throws Exception {
    runTest(true, true);
  }

  @Test(timeout = 10000)
  public void testTimelineServiceEventPublishingV1Enabled() throws Exception {
    runTest(true, false);
  }

  @Test(timeout = 10000)
  public void testTimelineServiceEventPublishingV2Enabled() throws Exception {
    runTest(false, true);
  }

  @Test(timeout = 10000)
  public void testTimelineServiceEventPublishingNoService() throws Exception {
    runTest(false, false);
  }

  @Test(timeout = 10000)
  public void testTimelineServiceConfiguration()
      throws Exception {
Configuration config = new Configuration(false);
config.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
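
    // When both keys are set, TIMELINE_SERVICE_VERSIONS takes precedence
    // over TIMELINE_SERVICE_VERSION. "1.5" enables both v1.5 and v1.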
config.set(YarnConfiguration.TIMELINE_SERVICE_VERSIONS, "2.0,1.5");
config.set(YarnConfiguration.TIMELINE_SERVICE_VERSION, "2.0");
Assert.assertTrue(YarnConfiguration.timelineServiceV2Enabled(config));
Assert.assertTrue(YarnConfiguration.timelineServiceV15Enabled(config));
Assert.assertTrue(YarnConfiguration.timelineServiceV1Enabled(config));
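
    // "1" in the versions list enables v1 but not v1.5.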
config.set(YarnConfiguration.TIMELINE_SERVICE_VERSIONS, "2.0,1");
config.set(YarnConfiguration.TIMELINE_SERVICE_VERSION, "1.5");
Assert.assertTrue(YarnConfiguration.timelineServiceV2Enabled(config));
Assert.assertFalse(YarnConfiguration.timelineServiceV15Enabled(config));
Assert.assertTrue(YarnConfiguration.timelineServiceV1Enabled(config));
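
    // With only "2.0" in TIMELINE_SERVICE_VERSIONS, the "1.5" set in
    // TIMELINE_SERVICE_VERSION is ignored entirely.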
config.set(YarnConfiguration.TIMELINE_SERVICE_VERSIONS, "2.0");
config.set(YarnConfiguration.TIMELINE_SERVICE_VERSION, "1.5");
Assert.assertTrue(YarnConfiguration.timelineServiceV2Enabled(config));
Assert.assertFalse(YarnConfiguration.timelineServiceV15Enabled(config));
Assert.assertFalse(YarnConfiguration.timelineServiceV1Enabled(config));
}
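
  /**
   * Publishes an app attempt "registered" and a "finished" event through the
   * combined publisher, registering a v2 collector for the app when v2 is on.
   */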
private void publishEvents(boolean v1Enabled, boolean v2Enabled) {
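    // Vary the cluster timestamp and id per scenario so each run gets a
    // unique ApplicationId.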
long timestamp = (v1Enabled) ? 1 : 2;
int id = (v2Enabled) ? 3 : 4;
ApplicationId appId = ApplicationId.newInstance(timestamp, id);
app = createRMApp(appId);
rmAppsMapInContext.putIfAbsent(appId, app);
if (v2Enabled) {
AppLevelTimelineCollector collector =
new AppLevelTimelineCollector(appId);
rmTimelineCollectorManager.putIfAbsent(appId, collector);
}
appAttemptId =
ApplicationAttemptId.newInstance(appId, 1);
RMAppAttempt appAttempt = createRMAppAttempt(true);
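    // Timestamps beyond the int range help surface accidental truncation.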
metricsPublisher.appAttemptRegistered(appAttempt, Integer.MAX_VALUE + 1L);
metricsPublisher.appAttemptFinished(appAttempt, RMAppAttemptState.FINISHED,
app, Integer.MAX_VALUE + 2L);
if (v2Enabled) {
dispatcher.await();
}
}
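
  /**
   * Checks the v1 timeline store: when v1 is enabled, polls until both the
   * registered and finished events appear; otherwise verifies that no entity
   * was written.
   */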
private void validateV1(boolean v1Enabled) throws Exception {
TimelineEntity entity = null;
if (!v1Enabled) {
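      // Give any stray asynchronous publish a chance to land before
      // asserting that nothing was written.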
Thread.sleep(1000);
entity =
store.getEntity(appAttemptId.toString(),
AppAttemptMetricsConstants.ENTITY_TYPE,
EnumSet.allOf(Field.class));
Assert.assertNull(entity);
return;
}
do {
entity =
store.getEntity(appAttemptId.toString(),
AppAttemptMetricsConstants.ENTITY_TYPE,
EnumSet.allOf(Field.class));
Thread.sleep(100);
      // Loop until both events have been published.
} while (entity == null || entity.getEvents().size() < 2);
boolean hasRegisteredEvent = false;
boolean hasFinishedEvent = false;
for (org.apache.hadoop.yarn.api.records.timeline.TimelineEvent event :
entity.getEvents()) {
if (event.getEventType().equals(
AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE)) {
hasRegisteredEvent = true;
} else if (event.getEventType().equals(
AppAttemptMetricsConstants.FINISHED_EVENT_TYPE)) {
hasFinishedEvent = true;
Assert.assertEquals(
FinalApplicationStatus.UNDEFINED.toString(),
event.getEventInfo().get(
AppAttemptMetricsConstants.FINAL_STATUS_INFO));
Assert.assertEquals(
YarnApplicationAttemptState.FINISHED.toString(),
event.getEventInfo().get(
AppAttemptMetricsConstants.STATE_INFO));
}
Assert
.assertEquals(appAttemptId.toString(), entity.getEntityId());
}
Assert.assertTrue(hasRegisteredEvent && hasFinishedEvent);
}
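
  /**
   * Checks the v2 file-system writer output: the entity directory must exist
   * if and only if v2 is enabled, and the attempt's entity file must contain
   * both published events.
   */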
private void validateV2(boolean v2Enabled) throws Exception {
String outputDirApp =
getTimelineEntityDir() + "/"
+ TimelineEntityType.YARN_APPLICATION_ATTEMPT + "/";
File entityFolder = new File(outputDirApp);
Assert.assertEquals(v2Enabled, entityFolder.isDirectory());
if (v2Enabled) {
String timelineServiceFileName = appAttemptId.toString()
+ FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
File entityFile = new File(outputDirApp, timelineServiceFileName);
Assert.assertTrue(entityFile.exists());
long idPrefix = TimelineServiceHelper
.invertLong(appAttemptId.getAttemptId());
verifyEntity(entityFile, 2,
AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE, 0, idPrefix);
}
}
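
  /**
   * Parses each JSON line of the entity file written by
   * FileSystemTimelineWriterImpl and verifies the event count, metrics
   * count, id prefix, and created time.
   */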
private void verifyEntity(File entityFile, long expectedEvents,
String eventForCreatedTime, long expectedMetrics, long idPrefix)
throws IOException {
    long count = 0;
    long metricsCount = 0;
    // try-with-resources closes the reader and avoids an NPE in close() if
    // opening the file fails.
    try (BufferedReader reader =
        new BufferedReader(new FileReader(entityFile))) {
      String strLine;
      while ((strLine = reader.readLine()) != null) {
        if (strLine.trim().length() > 0) {
          org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
              entity = FileSystemTimelineReaderImpl
                  .getTimelineRecordFromJSON(strLine.trim(),
                      org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity.class);
          metricsCount = entity.getMetrics().size();
          assertEquals(idPrefix, entity.getIdPrefix());
          for (TimelineEvent event : entity.getEvents()) {
            if (event.getId().equals(eventForCreatedTime)) {
              assertTrue(entity.getCreatedTime() > 0);
              break;
            }
          }
          count++;
        }
      }
    }
assertEquals("Expected " + expectedEvents + " events to be published",
expectedEvents, count);
assertEquals("Expected " + expectedMetrics + " metrics is incorrect",
expectedMetrics, metricsCount);
}
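
  /**
   * Builds the output directory used by FileSystemTimelineWriterImpl:
   * root/entities/cluster/user/flow-name/flow-version/flow-run/app-id,
   * where the flow name is the app name and the flow run is the start time.
   */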
private String getTimelineEntityDir() {
String outputDirApp =
testRootDir.getAbsolutePath() + "/"
+ FileSystemTimelineWriterImpl.ENTITIES_DIR + "/"
+ YarnConfiguration.DEFAULT_RM_CLUSTER_ID + "/"
+ app.getUser() + "/"
+ app.getName() + "/"
+ TimelineUtils.DEFAULT_FLOW_VERSION + "/"
+ app.getStartTime() + "/"
+ app.getApplicationId();
return outputDirApp;
}
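
  /**
   * Mocks an RMAppAttempt; a master container is attached only for managed
   * AM attempts.
   */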
private static RMAppAttempt createRMAppAttempt(boolean unmanagedAMAttempt) {
RMAppAttempt appAttempt = mock(RMAppAttempt.class);
when(appAttempt.getAppAttemptId()).thenReturn(appAttemptId);
when(appAttempt.getHost()).thenReturn("test host");
when(appAttempt.getRpcPort()).thenReturn(-100);
if (!unmanagedAMAttempt) {
Container container = mock(Container.class);
when(container.getId())
.thenReturn(ContainerId.newContainerId(appAttemptId, 1));
when(appAttempt.getMasterContainer()).thenReturn(container);
}
when(appAttempt.getDiagnostics()).thenReturn("test diagnostics info");
when(appAttempt.getTrackingUrl()).thenReturn("test tracking url");
when(appAttempt.getOriginalTrackingUrl()).thenReturn(
"test original tracking url");
return appAttempt;
}
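
  /**
   * Mocks an RMApp with one attempt plus the metrics, submission context,
   * and AM launch context that the publishers read.
   */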
private static RMApp createRMApp(ApplicationId appId) {
RMApp rmApp = mock(RMAppImpl.class);
when(rmApp.getApplicationId()).thenReturn(appId);
when(rmApp.getName()).thenReturn("test app");
when(rmApp.getApplicationType()).thenReturn("test app type");
when(rmApp.getUser()).thenReturn("testUser");
when(rmApp.getQueue()).thenReturn("test queue");
when(rmApp.getSubmitTime()).thenReturn(Integer.MAX_VALUE + 1L);
when(rmApp.getStartTime()).thenReturn(Integer.MAX_VALUE + 2L);
when(rmApp.getFinishTime()).thenReturn(Integer.MAX_VALUE + 3L);
when(rmApp.getDiagnostics()).thenReturn(
new StringBuilder("test diagnostics info"));
RMAppAttempt appAttempt = mock(RMAppAttempt.class);
when(appAttempt.getAppAttemptId()).thenReturn(
ApplicationAttemptId.newInstance(appId, 1));
when(rmApp.getCurrentAppAttempt()).thenReturn(appAttempt);
when(rmApp.getFinalApplicationStatus()).thenReturn(
FinalApplicationStatus.UNDEFINED);
Map<String, Long> resourceMap = new HashMap<>();
resourceMap
.put(ResourceInformation.MEMORY_MB.getName(), (long) Integer.MAX_VALUE);
resourceMap.put(ResourceInformation.VCORES.getName(), Long.MAX_VALUE);
Map<String, Long> preemptedMap = new HashMap<>();
preemptedMap
.put(ResourceInformation.MEMORY_MB.getName(), (long) Integer.MAX_VALUE);
when(rmApp.getRMAppMetrics()).thenReturn(
new RMAppMetrics(Resource.newInstance(0, 0), 0, 0, resourceMap,
preemptedMap));
when(rmApp.getApplicationTags()).thenReturn(
Collections.<String> emptySet());
ApplicationSubmissionContext appSubmissionContext =
mock(ApplicationSubmissionContext.class);
when(appSubmissionContext.getPriority())
.thenReturn(Priority.newInstance(0));
ContainerLaunchContext containerLaunchContext =
mock(ContainerLaunchContext.class);
when(containerLaunchContext.getCommands())
.thenReturn(Collections.singletonList("java -Xmx1024m"));
when(appSubmissionContext.getAMContainerSpec())
.thenReturn(containerLaunchContext);
when(rmApp.getApplicationPriority()).thenReturn(Priority.newInstance(10));
when(rmApp.getApplicationSubmissionContext())
.thenReturn(appSubmissionContext);
return rmApp;
}
}