blob: 748cf92747eccfb62dfaeaeee7bfb6f4d3eb4ece [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timeline;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileContextTestHelper;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableStat;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.ApplicationClassLoader;
import org.apache.hadoop.util.JarFinder;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timeline.EntityGroupFSTimelineStore.AppState;
import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.util.MinimalPrettyPrinter;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.type.TypeFactory;
import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
private static final String SAMPLE_APP_PREFIX_CACHE_TEST = "1234_000";
private static final int CACHE_TEST_CACHE_SIZE = 5;
private static final String TEST_SUMMARY_LOG_FILE_NAME
= EntityGroupFSTimelineStore.SUMMARY_LOG_PREFIX + "test";
private static final String TEST_DOMAIN_LOG_FILE_NAME
= EntityGroupFSTimelineStore.DOMAIN_LOG_PREFIX + "test";
private static final Path TEST_ROOT_DIR
= new Path(System.getProperty("test.build.data",
System.getProperty("java.io.tmpdir")),
TestEntityGroupFSTimelineStore.class.getSimpleName());
private static Configuration config = new YarnConfiguration();
private static MiniDFSCluster hdfsCluster;
private static FileSystem fs;
private static FileContext fc;
private static FileContextTestHelper fileContextTestHelper =
new FileContextTestHelper("/tmp/TestEntityGroupFSTimelineStore");
private static List<ApplicationId> sampleAppIds;
private static ApplicationId mainTestAppId;
private static Path mainTestAppDirPath;
private static Path testDoneDirPath;
private static Path testActiveDirPath;
private static String mainEntityLogFileName;
private EntityGroupFSTimelineStore store;
private TimelineEntity entityNew;
private File rootDir;
private File testJar;
@BeforeAll
public static void setupClass() throws Exception {
// One-time bootstrap shared by every test: configure the timeline store,
// spin up a single-datanode MiniDFSCluster, and precompute the sample
// application ids plus the active/done directory paths used throughout.
// Disable TTL so entities written by the tests are never aged out mid-run.
config.setBoolean(YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE, false);
// These entity types go to the summary store rather than detail logs.
config.set(
YarnConfiguration
.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_ENTITY_TYPES,
"YARN_APPLICATION,YARN_APPLICATION_ATTEMPT,YARN_CONTAINER");
// Small cache size so cache-eviction paths are exercised by the tests.
config.setInt(
YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_APP_CACHE_SIZE,
CACHE_TEST_CACHE_SIZE);
config.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR.toString());
HdfsConfiguration hdfsConfig = new HdfsConfiguration();
hdfsCluster
= new MiniDFSCluster.Builder(hdfsConfig).numDataNodes(1).build();
fs = hdfsCluster.getFileSystem();
fc = FileContext.getFileContext(hdfsCluster.getURI(0), config);
// One more app id than the cache can hold, to allow overflow scenarios.
sampleAppIds = new ArrayList<>(CACHE_TEST_CACHE_SIZE + 1);
for (int i = 0; i < CACHE_TEST_CACHE_SIZE + 1; i++) {
ApplicationId appId = ApplicationId.fromString(
ConverterUtils.APPLICATION_PREFIX + "_" + SAMPLE_APP_PREFIX_CACHE_TEST
+ i);
sampleAppIds.add(appId);
}
testActiveDirPath = getTestRootPath("active");
// Among all sample applicationIds, choose the first one for most of the
// tests.
mainTestAppId = sampleAppIds.get(0);
mainTestAppDirPath = new Path(testActiveDirPath, mainTestAppId.toString());
mainEntityLogFileName = EntityGroupFSTimelineStore.ENTITY_LOG_PREFIX
+ EntityGroupPlugInForTest.getStandardTimelineGroupId(mainTestAppId);
testDoneDirPath = getTestRootPath("done");
// Point the store at the HDFS-backed active and done directories.
config.set(YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR,
testDoneDirPath.toString());
config.set(
YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR,
testActiveDirPath.toString());
}
@BeforeEach
public void setup(TestInfo testInfo) throws Exception {
// Per-test setup: write the standard test log files for every sample app
// under the active dir, then build and start a fresh store. Tests whose
// method name contains "Plugin" additionally get a classloader-isolated
// jar containing the test plugin.
for (ApplicationId appId : sampleAppIds) {
Path attemotDirPath =
new Path(new Path(testActiveDirPath, appId.toString()),
getAttemptDirName(appId));
createTestFiles(appId, attemotDirPath);
}
store = new EntityGroupFSTimelineStore();
// Plugin tests need the plugin class packed into a jar so that it is
// loaded through an ApplicationClassLoader rather than the system one.
if (testInfo.getTestMethod().get().getName().contains("Plugin")) {
rootDir = GenericTestUtils.getTestDir(getClass()
.getSimpleName());
if (!rootDir.exists()) {
rootDir.mkdirs();
}
testJar = null;
testJar = JarFinder.makeClassLoaderTestJar(this.getClass(), rootDir,
"test-runjar.jar", 2048,
EntityGroupPlugInForTest.class.getName());
config.set(
YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSPATH,
testJar.getAbsolutePath());
// add "-org.apache.hadoop." as system classes
String systemClasses = "-org.apache.hadoop." + "," +
ApplicationClassLoader.SYSTEM_CLASSES_DEFAULT;
config.set(
YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_SYSTEM_CLASSES,
systemClasses);
config.set(YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES,
EntityGroupPlugInForTest.class.getName());
}
store.init(config);
store.setFs(fs);
store.start();
}
@AfterEach
public void tearDown() throws Exception {
  // Stop the store first so no background scanner touches the directories
  // we are about to remove.
  store.stop();
  // Recursively remove every per-application directory created by setup().
  for (ApplicationId appId : sampleAppIds) {
    Path appDir = new Path(testActiveDirPath, appId.toString());
    fs.delete(appDir, true);
  }
  // Plugin tests build a throwaway jar; remove it and its directory.
  if (testJar != null) {
    testJar.delete();
    rootDir.delete();
  }
}
@AfterAll
public static void tearDownClass() throws Exception {
  // Shut down the mini HDFS cluster backing all tests.
  hdfsCluster.shutdown();
  // Remove the local leveldb directory left behind by the summary store.
  Path leveldbPath = new Path(
      config.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH));
  FileContext localFileContext = FileContext.getLocalFSFileContext();
  localFileContext.delete(leveldbPath, true);
}
@Test
void testAppLogsScanLogs() throws Exception {
  // Scan the main test app's directory and verify the scanner classifies
  // the summary/domain files as summary logs and the entity-group file as
  // a detail log.
  EntityGroupFSTimelineStore.AppLogs appLogs =
      store.new AppLogs(mainTestAppId, mainTestAppDirPath,
          AppState.COMPLETED);
  appLogs.scanForLogs();
  List<LogInfo> summaryLogs = appLogs.getSummaryLogs();
  List<LogInfo> detailLogs = appLogs.getDetailLogs();
  // createTestFiles writes one summary log plus one domain log (both
  // classified as summary) and a single entity-group detail log.
  assertEquals(2, summaryLogs.size());
  assertEquals(1, detailLogs.size());
  for (LogInfo log : summaryLogs) {
    String fileName = log.getFilename();
    assertTrue(fileName.equals(TEST_SUMMARY_LOG_FILE_NAME)
        || fileName.equals(TEST_DOMAIN_LOG_FILE_NAME));
  }
  for (LogInfo log : detailLogs) {
    // Fix: expected value must be the first argument so that a failure
    // message reports expected/actual the right way round.
    assertEquals(mainEntityLogFileName, log.getFilename());
  }
}
@Test
void testAppLogsDomainLogLastlyScanned() throws Exception {
// Verify that a domain log appearing after the first scan is picked up on
// a re-scan and ordered first among the summary logs.
EntityGroupFSTimelineStore.AppLogs appLogs =
store.new AppLogs(mainTestAppId, mainTestAppDirPath,
AppState.COMPLETED);
Path attemptDirPath = new Path(new Path(testActiveDirPath,
mainTestAppId.toString()),
getAttemptDirName(mainTestAppId));
//Delete the domain log from AppDirPath so first scan won't find it
fs.delete(new Path(attemptDirPath, TEST_DOMAIN_LOG_FILE_NAME), false);
appLogs.scanForLogs();
List<LogInfo> summaryLogs = appLogs.getSummaryLogs();
assertEquals(1, summaryLogs.size());
assertEquals(TEST_SUMMARY_LOG_FILE_NAME, summaryLogs.get(0).getFilename());
//Generate the domain log
FSDataOutputStream out = fs.create(
new Path(attemptDirPath, TEST_DOMAIN_LOG_FILE_NAME));
out.close();
appLogs.scanForLogs();
// NOTE(review): these assertions are made on the list instance obtained
// before the second scan, which implies scanForLogs() mutates the same
// backing list rather than replacing it — confirm against AppLogs.
assertEquals(2, summaryLogs.size());
assertEquals(TEST_DOMAIN_LOG_FILE_NAME, summaryLogs.get(0).getFilename());
}
@Test
void testMoveToDone() throws Exception {
  // Moving an app's logs to "done" must change the app dir path, and the
  // new location must live under the configured done directory.
  EntityGroupFSTimelineStore.AppLogs appLogs =
      store.new AppLogs(mainTestAppId, mainTestAppDirPath,
          AppState.COMPLETED);
  Path before = appLogs.getAppDirPath();
  appLogs.moveToDone();
  Path after = appLogs.getAppDirPath();
  assertNotEquals(before, after);
  assertTrue(after.toString().contains(testDoneDirPath.toString()));
  // Clean up so later tests start with an empty done directory.
  fs.delete(after, true);
}
@Test
void testParseSummaryLogs() throws Exception {
  // Parsing the summary logs must feed the expected entities into the
  // TimelineDataManager and bump the read-to-summary metric by two.
  TimelineDataManager tdm = PluginStoreTestUtils.getTdmWithMemStore(config);
  MutableCounterLong scannedCounter =
      store.metrics.getEntitiesReadToSummary();
  long scannedBefore = scannedCounter.value();
  EntityGroupFSTimelineStore.AppLogs appLogs =
      store.new AppLogs(mainTestAppId, mainTestAppDirPath,
          AppState.COMPLETED);
  appLogs.scanForLogs();
  appLogs.parseSummaryLogs(tdm);
  PluginStoreTestUtils.verifyTestEntities(tdm);
  assertEquals(scannedBefore + 2L, scannedCounter.value());
}
@Test
void testWithAnonymousUser() throws Exception {
  // Parsing summary logs must succeed even when no concrete owner is
  // configured (anonymous user); an IllegalArgumentException here would be
  // a regression.
  // Fix: removed a mock(FileStatus.class) that was created and stubbed but
  // never passed to anything — pure dead code.
  try {
    TimelineDataManager tdm = PluginStoreTestUtils.getTdmWithMemStore(config);
    EntityGroupFSTimelineStore.AppLogs appLogs =
        store.new AppLogs(mainTestAppId, mainTestAppDirPath,
            AppState.COMPLETED);
    appLogs.scanForLogs();
    appLogs.parseSummaryLogs(tdm);
    PluginStoreTestUtils.verifyTestEntities(tdm);
  } catch (IllegalArgumentException ie) {
    fail("No exception needs to be thrown as anonymous user is configured");
  }
}
@Test
void testCleanLogs() throws Exception {
// Exercise the done-dir reaper: directories untouched for longer than the
// retain window are removed, while anything recently modified — or not
// matching the expected app-dir layout — is kept.
// Create test dirs and files
// Irrelevant file, should not be reclaimed
String appDirName = mainTestAppId.toString();
String attemptDirName = ApplicationAttemptId.appAttemptIdStrPrefix
+ appDirName + "_1";
Path irrelevantFilePath = new Path(
testDoneDirPath, "irrelevant.log");
FSDataOutputStream stream = fs.create(irrelevantFilePath);
stream.close();
// Irrelevant directory, should not be reclaimed
Path irrelevantDirPath = new Path(testDoneDirPath, "irrelevant");
fs.mkdirs(irrelevantDirPath);
// Bucketed layout: <done>/<clusterTimestamp>/<bucket1>/<bucket2>/<appId>.
Path doneAppHomeDir = new Path(new Path(new Path(testDoneDirPath,
Long.toString(mainTestAppId.getClusterTimestamp())), "0000"), "001");
// First application, untouched after creation
Path appDirClean = new Path(doneAppHomeDir, appDirName);
Path attemptDirClean = new Path(appDirClean, attemptDirName);
fs.mkdirs(attemptDirClean);
Path filePath = new Path(attemptDirClean, "test.log");
stream = fs.create(filePath);
stream.close();
// Second application, one file touched after creation
Path appDirHoldByFile = new Path(doneAppHomeDir, appDirName + "1");
Path attemptDirHoldByFile
= new Path(appDirHoldByFile, attemptDirName);
fs.mkdirs(attemptDirHoldByFile);
Path filePathHold = new Path(attemptDirHoldByFile, "test1.log");
stream = fs.create(filePathHold);
stream.close();
// Third application, one dir touched after creation
Path appDirHoldByDir = new Path(doneAppHomeDir, appDirName + "2");
Path attemptDirHoldByDir = new Path(appDirHoldByDir, attemptDirName);
fs.mkdirs(attemptDirHoldByDir);
Path dirPathHold = new Path(attemptDirHoldByDir, "hold");
fs.mkdirs(dirPathHold);
// Fourth application, empty dirs
Path appDirEmpty = new Path(doneAppHomeDir, appDirName + "3");
Path attemptDirEmpty = new Path(appDirEmpty, attemptDirName);
fs.mkdirs(attemptDirEmpty);
Path dirPathEmpty = new Path(attemptDirEmpty, "empty");
fs.mkdirs(dirPathEmpty);
// Should retain all logs after this run
MutableCounterLong dirsCleaned = store.metrics.getLogsDirsCleaned();
long before = dirsCleaned.value();
// 10s retain window: everything was just created, so nothing qualifies.
store.cleanLogs(testDoneDirPath, 10000);
assertTrue(fs.exists(irrelevantDirPath));
assertTrue(fs.exists(irrelevantFilePath));
assertTrue(fs.exists(filePath));
assertTrue(fs.exists(filePathHold));
assertTrue(fs.exists(dirPathHold));
assertTrue(fs.exists(dirPathEmpty));
// Make sure the created dir is old enough
Thread.sleep(2000);
// Touch the second application
stream = fs.append(filePathHold);
stream.writeBytes("append");
stream.close();
// Touch the third application by creating a new dir
fs.mkdirs(new Path(dirPathHold, "holdByMe"));
// 1s retain window: only apps two and three were touched after the sleep,
// so the untouched first and fourth app dirs should be reclaimed.
store.cleanLogs(testDoneDirPath, 1000);
// Verification after the second cleaner call
assertTrue(fs.exists(irrelevantDirPath));
assertTrue(fs.exists(irrelevantFilePath));
assertTrue(fs.exists(filePathHold));
assertTrue(fs.exists(dirPathHold));
assertTrue(fs.exists(doneAppHomeDir));
// appDirClean and appDirEmpty should be cleaned up
assertFalse(fs.exists(appDirClean));
assertFalse(fs.exists(appDirEmpty));
assertEquals(before + 2L, dirsCleaned.value());
}
@Test
void testCleanBuckets() throws Exception {
// Verify that cluster-timestamp bucket directories are removed only once
// they contain no app log dirs: the first cleanLogs pass removes the app
// dirs, the second pass removes the then-empty timestamp dir.
// ClusterTimeStampDir with App Log Dirs
Path clusterTimeStampDir1 = new Path(testDoneDirPath,
Long.toString(sampleAppIds.get(0).getClusterTimestamp()));
Path appDir1 = new Path(new Path(new Path(
clusterTimeStampDir1, "0000"), "000"), sampleAppIds.get(0).toString());
Path appDir2 = new Path(new Path(new Path(
clusterTimeStampDir1, "0000"), "001"), sampleAppIds.get(1).toString());
Path appDir3 = new Path(new Path(new Path(
clusterTimeStampDir1, "0000"), "002"), sampleAppIds.get(2).toString());
Path appDir4 = new Path(new Path(new Path(
clusterTimeStampDir1, "0001"), "000"), sampleAppIds.get(3).toString());
// ClusterTimeStampDir with no App Log Dirs
Path clusterTimeStampDir2 = new Path(testDoneDirPath, "1235");
// Irrelevant ClusterTimeStampDir (name is not a numeric timestamp)
Path clusterTimeStampDir3 = new Path(testDoneDirPath, "irrevelant");
Path appDir5 = new Path(new Path(new Path(
clusterTimeStampDir3, "0000"), "000"), sampleAppIds.get(4).toString());
fs.mkdirs(appDir1);
fs.mkdirs(appDir2);
fs.mkdirs(appDir3);
fs.mkdirs(appDir4);
fs.mkdirs(clusterTimeStampDir2);
fs.mkdirs(appDir5);
// Let the dirs age past the 1s retain window used below.
Thread.sleep(2000);
store.cleanLogs(testDoneDirPath, 1000);
// ClusterTimeStampDir will be removed only if no App Log Dir Present
assertTrue(fs.exists(clusterTimeStampDir1));
assertFalse(fs.exists(appDir1));
assertFalse(fs.exists(appDir2));
assertFalse(fs.exists(appDir3));
assertFalse(fs.exists(appDir4));
assertFalse(fs.exists(clusterTimeStampDir2));
// The non-timestamp dir and its contents are left untouched.
assertTrue(fs.exists(appDir5));
store.cleanLogs(testDoneDirPath, 1000);
// Second pass: the now-empty timestamp dir is removed.
assertFalse(fs.exists(clusterTimeStampDir1));
}
@Test
void testNullCheckGetEntityTimelines() throws Exception {
  // Passing null for the entity-id set (and all other optional arguments)
  // must be tolerated rather than triggering an NPE.
  try {
    store.getEntityTimelines("YARN_APPLICATION", null, null, null, null,
        null);
  } catch (NullPointerException npe) {
    fail("NPE when getEntityTimelines called with Null EntityIds");
  }
}
@Test
void testPluginRead() throws Exception {
// End-to-end plugin read: confirm the plugin was loaded through an
// isolated ApplicationClassLoader from the test jar, then read entities
// through a pre-populated cache item and check the detail-read and
// cache-refresh metrics.
// Verify precondition
assertEquals(EntityGroupPlugInForTest.class.getName(),
store.getConfig().get(
YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES));
List<TimelineEntityGroupPlugin> currPlugins = store.getPlugins();
for (TimelineEntityGroupPlugin plugin : currPlugins) {
// Each plugin must come from an ApplicationClassLoader whose URL list
// includes the jar built in setup().
ClassLoader pluginClassLoader = plugin.getClass().getClassLoader();
assertTrue(pluginClassLoader instanceof ApplicationClassLoader,
"Should set up ApplicationClassLoader");
URL[] paths = ((URLClassLoader) pluginClassLoader).getURLs();
boolean foundJAR = false;
for (URL path : paths) {
if (path.toString().contains(testJar.getAbsolutePath())) {
foundJAR = true;
}
}
assertTrue(foundJAR, "Not found path " + testJar.getAbsolutePath()
+ " for plugin " + plugin.getClass().getName());
}
// Load data and cache item, prepare timeline store by making a cache item
EntityGroupFSTimelineStore.AppLogs appLogs =
store.new AppLogs(mainTestAppId, mainTestAppDirPath,
AppState.COMPLETED);
EntityCacheItem cacheItem = new EntityCacheItem(
EntityGroupPlugInForTest.getStandardTimelineGroupId(mainTestAppId),
config);
cacheItem.setAppLogs(appLogs);
store.setCachedLogs(
EntityGroupPlugInForTest.getStandardTimelineGroupId(mainTestAppId),
cacheItem);
// Snapshot metrics before the reads so deltas can be asserted below.
MutableCounterLong detailLogEntityRead =
store.metrics.getGetEntityToDetailOps();
MutableStat cacheRefresh = store.metrics.getCacheRefresh();
long numEntityReadBefore = detailLogEntityRead.value();
long cacheRefreshBefore = cacheRefresh.lastStat().numSamples();
// Generate TDM
TimelineDataManager tdm
= PluginStoreTestUtils.getTdmWithStore(config, store);
// Verify single entity read
TimelineEntity entity3 = tdm.getEntity("type_3", mainTestAppId.toString(),
EnumSet.allOf(TimelineReader.Field.class),
UserGroupInformation.getLoginUser());
assertNotNull(entity3);
// entityNew is the "type_3" entity written by createTestFiles in setup().
assertEquals(entityNew.getStartTime(), entity3.getStartTime());
// Verify multiple entities read
NameValuePair primaryFilter = new NameValuePair(
EntityGroupPlugInForTest.APP_ID_FILTER_NAME, mainTestAppId.toString());
TimelineEntities entities = tdm.getEntities("type_3", primaryFilter, null,
null, null, null, null, null, EnumSet.allOf(TimelineReader.Field.class),
UserGroupInformation.getLoginUser());
assertEquals(1, entities.getEntities().size());
for (TimelineEntity entity : entities.getEntities()) {
assertEquals(entityNew.getStartTime(), entity.getStartTime());
}
// Verify metrics
// Two detail reads (single + multi), one cache refresh for the group.
assertEquals(numEntityReadBefore + 2L, detailLogEntityRead.value());
assertEquals(cacheRefreshBefore + 1L, cacheRefresh.lastStat().numSamples());
}
@Test
void testSummaryRead() throws Exception {
  // Reading through the summary store: parse the logs into the TDM, then
  // verify single- and multi-entity reads and the summary-read metric.
  EntityGroupFSTimelineStore.AppLogs appLogs =
      store.new AppLogs(mainTestAppId, mainTestAppDirPath,
          AppState.COMPLETED);
  MutableCounterLong summaryReads =
      store.metrics.getGetEntityToSummaryOps();
  long readsBefore = summaryReads.value();
  TimelineDataManager tdm =
      PluginStoreTestUtils.getTdmWithStore(config, store);
  appLogs.scanForLogs();
  appLogs.parseSummaryLogs(tdm);
  // Single-entity reads are covered by the shared verification helper.
  PluginStoreTestUtils.verifyTestEntities(tdm);
  // Multi-entity read across all fields as the login user.
  TimelineEntities entities = tdm.getEntities("type_1", null, null, null,
      null, null, null, null, EnumSet.allOf(TimelineReader.Field.class),
      UserGroupInformation.getLoginUser());
  assertThat(entities.getEntities()).hasSize(1);
  for (TimelineEntity entity : entities.getEntities()) {
    assertEquals((Long) 123L, entity.getStartTime());
  }
  // verifyTestEntities plus the read above account for five summary reads.
  assertEquals(readsBefore + 5L, summaryReads.value());
}
@Test
void testGetEntityPluginRead() throws Exception {
  // Read a single entity through the plugin path from an ACTIVE application
  // laid out under the per-user active directory. A dedicated store is used
  // so getAppState can be forced to ACTIVE (local name avoids shadowing the
  // 'store' field confusingly).
  ApplicationId appId =
      ApplicationId.fromString("application_1501509265053_0001");
  String user = UserGroupInformation.getCurrentUser().getShortUserName();
  Path userBase = new Path(testActiveDirPath, user);
  Path userAppRoot = new Path(userBase, appId.toString());
  Path attemptDirPath = new Path(userAppRoot, getAttemptDirName(appId));
  EntityGroupFSTimelineStore activeStore = null;
  try {
    activeStore = createAndStartTimelineStore(AppState.ACTIVE);
    String logFileName = EntityGroupFSTimelineStore.ENTITY_LOG_PREFIX
        + EntityGroupPlugInForTest.getStandardTimelineGroupId(appId);
    createTestFiles(appId, attemptDirPath, logFileName);
    TimelineEntity entity = activeStore.getEntity(entityNew.getEntityId(),
        entityNew.getEntityType(), EnumSet.allOf(Field.class));
    assertNotNull(entity);
    assertEquals(entityNew.getEntityId(), entity.getEntityId());
    assertEquals(entityNew.getEntityType(), entity.getEntityType());
  } finally {
    if (activeStore != null) {
      activeStore.stop();
    }
    // Remove the per-user tree so other tests never see this application.
    fs.delete(userBase, true);
  }
}
@Test
void testScanActiveLogsWithInvalidFile() throws Exception {
  // A stray regular file directly under the active directory must not send
  // the log scanner into unbounded recursion.
  Path invalidFile = new Path(testActiveDirPath, "invalidfile");
  try {
    if (!fs.exists(invalidFile)) {
      fs.createNewFile(invalidFile);
    }
    store.scanActiveLogs();
  } catch (StackOverflowError overflow) {
    fail("EntityLogScanner crashed with StackOverflowError");
  } finally {
    // Always remove the bogus file so it cannot disturb other tests.
    if (fs.exists(invalidFile)) {
      fs.delete(invalidFile, false);
    }
  }
}
@Test
void testScanActiveLogsAndMoveToDonePluginRead() throws Exception {
  // A COMPLETED app found by scanActiveLogs should be moved to the done dir
  // and its entities must remain readable through the plugin afterwards.
  ApplicationId appId =
      ApplicationId.fromString("application_1501509265053_0002");
  String user = UserGroupInformation.getCurrentUser().getShortUserName();
  Path userBase = new Path(testActiveDirPath, user);
  Path userAppRoot = new Path(userBase, appId.toString());
  Path attemptDirPath = new Path(userAppRoot, getAttemptDirName(appId));
  EntityGroupFSTimelineStore completedStore = null;
  try {
    completedStore = createAndStartTimelineStore(AppState.COMPLETED);
    String logFileName = EntityGroupFSTimelineStore.ENTITY_LOG_PREFIX
        + EntityGroupPlugInForTest.getStandardTimelineGroupId(appId);
    createTestFiles(appId, attemptDirPath, logFileName);
    completedStore.scanActiveLogs();
    TimelineEntity entity = completedStore.getEntity(entityNew.getEntityId(),
        entityNew.getEntityType(), EnumSet.allOf(Field.class));
    assertNotNull(entity);
    assertEquals(entityNew.getEntityId(), entity.getEntityId());
    assertEquals(entityNew.getEntityType(), entity.getEntityType());
  } finally {
    if (completedStore != null) {
      completedStore.stop();
    }
    // Remove the per-user tree so other tests never see this application.
    fs.delete(userBase, true);
  }
}
// TestTimelineStore to validate the put entities call
static class TestTimelineStore extends LeveldbTimelineStore {
// Counts every entity passed to put() across all instances.
// NOTE(review): the counter is static and never reset, so counts
// accumulate across tests within the same JVM; currently only
// testIfAnyDuplicateEntities relies on it.
static final AtomicInteger ENTITIES_COUNT = new AtomicInteger(0);
TestTimelineStore() {
super();
}
// Records how many entities were offered and swallows the write —
// nothing is persisted, an empty response is always returned.
@Override
public TimelinePutResponse put(TimelineEntities entities) {
ENTITIES_COUNT.getAndAdd(entities.getEntities().size());
return new TimelinePutResponse();
}
// Total number of entities seen by put() so far.
public static int getEntitiesCount() {
return ENTITIES_COUNT.get();
}
}
@Test
void testIfAnyDuplicateEntities() throws Exception {
// Regression test: re-scanning an already-parsed summary log after it has
// been appended to must only deliver the newly appended entities to the
// summary store, never re-deliver the earlier ones.
// Create an application with some entities
ApplicationId appId =
ApplicationId.fromString("application_1501509265053_0002");
String user = UserGroupInformation.getCurrentUser().getShortUserName();
// Separate active/done roots so this test does not interact with the
// directories used by the rest of the class.
Path activeDirPath = getTestRootPath("active1");
Path doneDirPath = getTestRootPath("done1");
Path userBase = new Path(activeDirPath, user);
Path userAppRoot = new Path(userBase, appId.toString());
Path attemptDirPath = new Path(userAppRoot, getAttemptDirName(appId));
String logFileName = EntityGroupFSTimelineStore.ENTITY_LOG_PREFIX
+ EntityGroupPlugInForTest.getStandardTimelineGroupId(appId);
createTestFiles(appId, attemptDirPath, logFileName);
// stop the default store before creating new store to get the lock
store.stop();
// Force ACTIVE so the logs are parsed in place instead of being moved.
EntityGroupFSTimelineStore newStore = new EntityGroupFSTimelineStore() {
@Override
protected AppState getAppState(ApplicationId appId) throws IOException {
return AppState.ACTIVE;
}
};
try {
// Start ATS with TestTimelineStore
Configuration newConfig = new YarnConfiguration(config);
newConfig.set(YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_STORE,
TestTimelineStore.class.getName());
newConfig.set(YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR,
doneDirPath.toString());
newConfig.set(
YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR,
activeDirPath.toString());
newStore.init(newConfig);
newStore.setFs(fs);
newStore.start();
// Validate if the initial entities count are correct
newStore.scanActiveLogs();
// Parsing happens asynchronously, so poll until the count settles.
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return TestTimelineStore.getEntitiesCount() == 2;
}
}, 100, 10000);
assertEquals(2, TestTimelineStore.getEntitiesCount(), "Wrong Initial Entities Count");
// Append the Summary log file with few more entities
TimelineEntities entities = PluginStoreTestUtils.generateTestEntities();
FSDataOutputStream outStream = fs.append(
new Path(attemptDirPath, TEST_SUMMARY_LOG_FILE_NAME));
// Mirror the writer configuration used by the store: newline-separated
// JSON objects with JAXB-aware introspection and nulls omitted.
JsonGenerator jsonGenerator
= new JsonFactory().createGenerator((OutputStream) outStream);
jsonGenerator.setPrettyPrinter(new MinimalPrettyPrinter("\n"));
ObjectMapper objMapper = new ObjectMapper();
objMapper.setAnnotationIntrospector(
new JaxbAnnotationIntrospector(TypeFactory.defaultInstance()));
objMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
for (TimelineEntity entity : entities.getEntities()) {
objMapper.writeValue(jsonGenerator, entity);
}
outStream.close();
// Validate if there are any duplicates
newStore.scanActiveLogs();
// Only the two appended entities should be counted (2 -> 4); a re-parse
// of the original ones would push the count past 4.
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return TestTimelineStore.getEntitiesCount() == 4;
}
}, 100, 10000);
assertEquals(4, TestTimelineStore.getEntitiesCount(), "Duplicate Entities present");
} finally {
if (newStore != null) {
newStore.stop();
}
fs.delete(userAppRoot, true);
}
}
@Test
void testStateStoreAndRecovery() throws Exception {
// Round-trip the scanner checkpoint: persist the summary-log state to a
// checkpoint file, recover it, and verify offsets and last-processed
// times survive intact.
// Prepare the AppLogs Data
EntityGroupFSTimelineStore.AppLogs appLogs =
store.new AppLogs(mainTestAppId, mainTestAppDirPath, AppState.COMPLETED);
appLogs.scanForLogs();
List<LogInfo> summaryLogs = appLogs.getSummaryLogs();
List<EntityGroupFSTimelineStore.AppLogs> logsList = new ArrayList<>();
logsList.add(appLogs);
// Store the Log files
Path checkpointFile = new Path(fs.getHomeDirectory(), "atscheckpoint");
try (DataOutputStream dataOutputStream = fs.create(checkpointFile)) {
store.storeLogFiles(logsList, dataOutputStream);
} catch (IOException e) {
fail("Failed to store the log files");
}
// Recover the Log files and validate the contents
try (DataInputStream dataInputStream = fs.open(checkpointFile)) {
// Keyed by "<attemptDir>/<fileName>"; value is (lastProcessedTime, offset).
HashMap<String, Pair<Long, Long>> logFiles =
store.recoverLogFiles(dataInputStream);
assertEquals(summaryLogs.size(), logFiles.size());
for (LogInfo logInfo : summaryLogs) {
String logFileName = logInfo.getAttemptDirName() +
Path.SEPARATOR + logInfo.getFilename();
Pair<Long, Long> pair = logFiles.get(logFileName);
assertNotNull(pair, "Failed to recover " + logFileName);
// NOTE(review): pair.getLeft()/getRight() are boxed Longs; these ==
// comparisons are only safe if the LogInfo getters return primitive
// long (forcing unboxing). Confirm, or switch to assertEquals.
assertTrue(logInfo.getLastProcessedTime() == pair.getLeft(),
"LastProcessedTime is not same");
assertTrue(logInfo.getOffset() == pair.getRight(),
"Offset is not same");
}
} catch (IOException e) {
fail("Failed to recover the log files");
}
}
// Builds and starts a fresh store whose getAppState always reports the
// given state, regardless of the RM or HDFS contents. The default test
// store is stopped first because only one store may hold the leveldb lock.
private EntityGroupFSTimelineStore createAndStartTimelineStore(
    AppState appstate) {
  // stop before creating new store to get the lock
  store.stop();
  EntityGroupFSTimelineStore timelineStore = new EntityGroupFSTimelineStore() {
    @Override
    protected AppState getAppState(ApplicationId appId) throws IOException {
      return appstate;
    }
  };
  timelineStore.init(config);
  timelineStore.setFs(fs);
  timelineStore.start();
  return timelineStore;
}
// Convenience overload: write the standard test files for the given app
// using the shared main entity log file name computed in setupClass().
private void createTestFiles(ApplicationId appId, Path attemptDirPath)
throws IOException {
createTestFiles(appId, attemptDirPath, mainEntityLogFileName);
}
// Writes the three standard test files into the attempt directory: a
// summary log with the shared test entities, a detail log (named logPath)
// holding a single "type_3" entity, and an empty domain log. The detail
// entity is remembered in entityNew so tests can compare against it.
private void createTestFiles(ApplicationId appId, Path attemptDirPath,
    String logPath) throws IOException {
  // Summary log: the shared set of generated test entities.
  TimelineEntities summaryEntities =
      PluginStoreTestUtils.generateTestEntities();
  PluginStoreTestUtils.writeEntities(summaryEntities,
      new Path(attemptDirPath, TEST_SUMMARY_LOG_FILE_NAME), fs);
  // Detail log: one entity carrying the app id as a primary filter.
  Set<Object> appSet = new HashSet<Object>();
  appSet.add(appId.toString());
  Map<String, Set<Object>> primaryFilters = new HashMap<>();
  primaryFilters.put(EntityGroupPlugInForTest.APP_ID_FILTER_NAME, appSet);
  entityNew = PluginStoreTestUtils
      .createEntity(appId.toString(), "type_3", 789L, null, null,
          primaryFilters, null, "domain_id_1");
  TimelineEntities detailEntities = new TimelineEntities();
  detailEntities.addEntity(entityNew);
  PluginStoreTestUtils.writeEntities(detailEntities,
      new Path(attemptDirPath, logPath), fs);
  // Domain log: only the file's existence matters, contents are irrelevant.
  FSDataOutputStream domainOut = fs.create(
      new Path(attemptDirPath, TEST_DOMAIN_LOG_FILE_NAME));
  domainOut.close();
}
// Resolves pathString under the shared FileContext test root on the
// mini-cluster file system.
private static Path getTestRootPath(String pathString) {
return fileContextTestHelper.getTestRootPath(fc, pathString);
}
// Builds the directory name for the first attempt of an application,
// i.e. the appattempt prefix + app id + "_1".
private static String getAttemptDirName(ApplicationId appId) {
  String appIdString = appId.toString();
  return ApplicationAttemptId.appAttemptIdStrPrefix + appIdString + "_1";
}
}