| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.timeline; |
| |
| import org.apache.commons.lang3.tuple.Pair; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.FileContext; |
| import org.apache.hadoop.fs.FileContextTestHelper; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.hdfs.HdfsConfiguration; |
| import org.apache.hadoop.hdfs.MiniDFSCluster; |
| import org.apache.hadoop.metrics2.lib.MutableCounterLong; |
| import org.apache.hadoop.metrics2.lib.MutableStat; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.test.GenericTestUtils; |
| import org.apache.hadoop.util.ApplicationClassLoader; |
| import org.apache.hadoop.util.JarFinder; |
| import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; |
| import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; |
| import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.server.timeline.EntityGroupFSTimelineStore.AppState; |
| import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field; |
| import org.apache.hadoop.yarn.util.ConverterUtils; |
| import org.junit.jupiter.api.AfterAll; |
| import org.junit.jupiter.api.AfterEach; |
| import org.junit.jupiter.api.BeforeAll; |
| import org.junit.jupiter.api.BeforeEach; |
| import org.junit.jupiter.api.Test; |
| import org.junit.jupiter.api.TestInfo; |
| |
| import com.fasterxml.jackson.annotation.JsonInclude; |
| import com.fasterxml.jackson.core.JsonFactory; |
| import com.fasterxml.jackson.core.JsonGenerator; |
| import com.fasterxml.jackson.core.util.MinimalPrettyPrinter; |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import com.fasterxml.jackson.databind.type.TypeFactory; |
| import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector; |
| |
| import java.io.DataInputStream; |
| import java.io.DataOutputStream; |
| import java.io.File; |
| import java.io.IOException; |
| import java.io.OutputStream; |
| import java.net.URL; |
| import java.net.URLClassLoader; |
| import java.util.ArrayList; |
| import java.util.EnumSet; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.function.Supplier; |
| |
| import static org.assertj.core.api.Assertions.assertThat; |
| import static org.junit.jupiter.api.Assertions.assertEquals; |
| import static org.junit.jupiter.api.Assertions.assertFalse; |
| import static org.junit.jupiter.api.Assertions.assertNotEquals; |
| import static org.junit.jupiter.api.Assertions.assertNotNull; |
| import static org.junit.jupiter.api.Assertions.assertTrue; |
| import static org.junit.jupiter.api.Assertions.fail; |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.when; |
| |
| public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils { |
| |
| private static final String SAMPLE_APP_PREFIX_CACHE_TEST = "1234_000"; |
| private static final int CACHE_TEST_CACHE_SIZE = 5; |
| |
| private static final String TEST_SUMMARY_LOG_FILE_NAME |
| = EntityGroupFSTimelineStore.SUMMARY_LOG_PREFIX + "test"; |
| private static final String TEST_DOMAIN_LOG_FILE_NAME |
| = EntityGroupFSTimelineStore.DOMAIN_LOG_PREFIX + "test"; |
| |
| private static final Path TEST_ROOT_DIR |
| = new Path(System.getProperty("test.build.data", |
| System.getProperty("java.io.tmpdir")), |
| TestEntityGroupFSTimelineStore.class.getSimpleName()); |
| |
| private static Configuration config = new YarnConfiguration(); |
| private static MiniDFSCluster hdfsCluster; |
| private static FileSystem fs; |
| private static FileContext fc; |
| private static FileContextTestHelper fileContextTestHelper = |
| new FileContextTestHelper("/tmp/TestEntityGroupFSTimelineStore"); |
| |
| private static List<ApplicationId> sampleAppIds; |
| private static ApplicationId mainTestAppId; |
| private static Path mainTestAppDirPath; |
| private static Path testDoneDirPath; |
| private static Path testActiveDirPath; |
| private static String mainEntityLogFileName; |
| |
| private EntityGroupFSTimelineStore store; |
| private TimelineEntity entityNew; |
| |
| private File rootDir; |
| private File testJar; |
| |
| @BeforeAll |
| public static void setupClass() throws Exception { |
| config.setBoolean(YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE, false); |
| config.set( |
| YarnConfiguration |
| .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_ENTITY_TYPES, |
| "YARN_APPLICATION,YARN_APPLICATION_ATTEMPT,YARN_CONTAINER"); |
| config.setInt( |
| YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_APP_CACHE_SIZE, |
| CACHE_TEST_CACHE_SIZE); |
| config.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR.toString()); |
| HdfsConfiguration hdfsConfig = new HdfsConfiguration(); |
| hdfsCluster |
| = new MiniDFSCluster.Builder(hdfsConfig).numDataNodes(1).build(); |
| fs = hdfsCluster.getFileSystem(); |
| fc = FileContext.getFileContext(hdfsCluster.getURI(0), config); |
| |
| sampleAppIds = new ArrayList<>(CACHE_TEST_CACHE_SIZE + 1); |
| for (int i = 0; i < CACHE_TEST_CACHE_SIZE + 1; i++) { |
| ApplicationId appId = ApplicationId.fromString( |
| ConverterUtils.APPLICATION_PREFIX + "_" + SAMPLE_APP_PREFIX_CACHE_TEST |
| + i); |
| sampleAppIds.add(appId); |
| } |
| testActiveDirPath = getTestRootPath("active"); |
| // Among all sample applicationIds, choose the first one for most of the |
| // tests. |
| mainTestAppId = sampleAppIds.get(0); |
| mainTestAppDirPath = new Path(testActiveDirPath, mainTestAppId.toString()); |
| mainEntityLogFileName = EntityGroupFSTimelineStore.ENTITY_LOG_PREFIX |
| + EntityGroupPlugInForTest.getStandardTimelineGroupId(mainTestAppId); |
| |
| testDoneDirPath = getTestRootPath("done"); |
| config.set(YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR, |
| testDoneDirPath.toString()); |
| config.set( |
| YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR, |
| testActiveDirPath.toString()); |
| } |
| |
| @BeforeEach |
| public void setup(TestInfo testInfo) throws Exception { |
| for (ApplicationId appId : sampleAppIds) { |
| Path attemotDirPath = |
| new Path(new Path(testActiveDirPath, appId.toString()), |
| getAttemptDirName(appId)); |
| createTestFiles(appId, attemotDirPath); |
| } |
| |
| store = new EntityGroupFSTimelineStore(); |
| if (testInfo.getTestMethod().get().getName().contains("Plugin")) { |
| rootDir = GenericTestUtils.getTestDir(getClass() |
| .getSimpleName()); |
| if (!rootDir.exists()) { |
| rootDir.mkdirs(); |
| } |
| testJar = null; |
| testJar = JarFinder.makeClassLoaderTestJar(this.getClass(), rootDir, |
| "test-runjar.jar", 2048, |
| EntityGroupPlugInForTest.class.getName()); |
| config.set( |
| YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSPATH, |
| testJar.getAbsolutePath()); |
| // add "-org.apache.hadoop." as system classes |
| String systemClasses = "-org.apache.hadoop." + "," + |
| ApplicationClassLoader.SYSTEM_CLASSES_DEFAULT; |
| config.set( |
| YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_SYSTEM_CLASSES, |
| systemClasses); |
| |
| config.set(YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES, |
| EntityGroupPlugInForTest.class.getName()); |
| } |
| store.init(config); |
| store.setFs(fs); |
| store.start(); |
| } |
| |
| @AfterEach |
| public void tearDown() throws Exception { |
| store.stop(); |
| for (ApplicationId appId : sampleAppIds) { |
| fs.delete(new Path(testActiveDirPath,appId.toString()), true); |
| } |
| if (testJar != null) { |
| testJar.delete(); |
| rootDir.delete(); |
| } |
| } |
| |
| @AfterAll |
| public static void tearDownClass() throws Exception { |
| hdfsCluster.shutdown(); |
| FileContext fileContext = FileContext.getLocalFSFileContext(); |
| fileContext.delete(new Path( |
| config.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH)), true); |
| } |
| |
| @Test |
| void testAppLogsScanLogs() throws Exception { |
| EntityGroupFSTimelineStore.AppLogs appLogs = |
| store.new AppLogs(mainTestAppId, mainTestAppDirPath, |
| AppState.COMPLETED); |
| appLogs.scanForLogs(); |
| List<LogInfo> summaryLogs = appLogs.getSummaryLogs(); |
| List<LogInfo> detailLogs = appLogs.getDetailLogs(); |
| assertEquals(2, summaryLogs.size()); |
| assertEquals(1, detailLogs.size()); |
| |
| for (LogInfo log : summaryLogs) { |
| String fileName = log.getFilename(); |
| assertTrue(fileName.equals(TEST_SUMMARY_LOG_FILE_NAME) |
| || fileName.equals(TEST_DOMAIN_LOG_FILE_NAME)); |
| } |
| |
| for (LogInfo log : detailLogs) { |
| String fileName = log.getFilename(); |
| assertEquals(fileName, mainEntityLogFileName); |
| } |
| } |
| |
| @Test |
| void testAppLogsDomainLogLastlyScanned() throws Exception { |
| EntityGroupFSTimelineStore.AppLogs appLogs = |
| store.new AppLogs(mainTestAppId, mainTestAppDirPath, |
| AppState.COMPLETED); |
| Path attemptDirPath = new Path(new Path(testActiveDirPath, |
| mainTestAppId.toString()), |
| getAttemptDirName(mainTestAppId)); |
| //Delete the domain log from AppDirPath so first scan won't find it |
| fs.delete(new Path(attemptDirPath, TEST_DOMAIN_LOG_FILE_NAME), false); |
| appLogs.scanForLogs(); |
| List<LogInfo> summaryLogs = appLogs.getSummaryLogs(); |
| assertEquals(1, summaryLogs.size()); |
| assertEquals(TEST_SUMMARY_LOG_FILE_NAME, summaryLogs.get(0).getFilename()); |
| |
| //Generate the domain log |
| FSDataOutputStream out = fs.create( |
| new Path(attemptDirPath, TEST_DOMAIN_LOG_FILE_NAME)); |
| out.close(); |
| |
| appLogs.scanForLogs(); |
| assertEquals(2, summaryLogs.size()); |
| assertEquals(TEST_DOMAIN_LOG_FILE_NAME, summaryLogs.get(0).getFilename()); |
| } |
| |
| @Test |
| void testMoveToDone() throws Exception { |
| EntityGroupFSTimelineStore.AppLogs appLogs = |
| store.new AppLogs(mainTestAppId, mainTestAppDirPath, |
| AppState.COMPLETED); |
| Path pathBefore = appLogs.getAppDirPath(); |
| appLogs.moveToDone(); |
| Path pathAfter = appLogs.getAppDirPath(); |
| assertNotEquals(pathBefore, pathAfter); |
| assertTrue(pathAfter.toString().contains(testDoneDirPath.toString())); |
| |
| fs.delete(pathAfter, true); |
| } |
| |
| @Test |
| void testParseSummaryLogs() throws Exception { |
| TimelineDataManager tdm = PluginStoreTestUtils.getTdmWithMemStore(config); |
| MutableCounterLong scanned = store.metrics.getEntitiesReadToSummary(); |
| long beforeScan = scanned.value(); |
| EntityGroupFSTimelineStore.AppLogs appLogs = |
| store.new AppLogs(mainTestAppId, mainTestAppDirPath, |
| AppState.COMPLETED); |
| appLogs.scanForLogs(); |
| appLogs.parseSummaryLogs(tdm); |
| PluginStoreTestUtils.verifyTestEntities(tdm); |
| assertEquals(beforeScan + 2L, scanned.value()); |
| } |
| |
| @Test |
| void testWithAnonymousUser() throws Exception { |
| try { |
| TimelineDataManager tdm = PluginStoreTestUtils.getTdmWithMemStore(config); |
| EntityGroupFSTimelineStore.AppLogs appLogs = |
| store.new AppLogs(mainTestAppId, mainTestAppDirPath, |
| AppState.COMPLETED); |
| FileStatus fileStatus = mock(FileStatus.class); |
| when(fileStatus.getOwner()).thenReturn(null); |
| appLogs.scanForLogs(); |
| appLogs.parseSummaryLogs(tdm); |
| PluginStoreTestUtils.verifyTestEntities(tdm); |
| } catch (IllegalArgumentException ie) { |
| fail("No exception needs to be thrown as anonymous user is configured"); |
| } |
| } |
| |
| @Test |
| void testCleanLogs() throws Exception { |
| // Create test dirs and files |
| // Irrelevant file, should not be reclaimed |
| String appDirName = mainTestAppId.toString(); |
| String attemptDirName = ApplicationAttemptId.appAttemptIdStrPrefix |
| + appDirName + "_1"; |
| Path irrelevantFilePath = new Path( |
| testDoneDirPath, "irrelevant.log"); |
| FSDataOutputStream stream = fs.create(irrelevantFilePath); |
| stream.close(); |
| // Irrelevant directory, should not be reclaimed |
| Path irrelevantDirPath = new Path(testDoneDirPath, "irrelevant"); |
| fs.mkdirs(irrelevantDirPath); |
| |
| Path doneAppHomeDir = new Path(new Path(new Path(testDoneDirPath, |
| Long.toString(mainTestAppId.getClusterTimestamp())), "0000"), "001"); |
| // First application, untouched after creation |
| Path appDirClean = new Path(doneAppHomeDir, appDirName); |
| Path attemptDirClean = new Path(appDirClean, attemptDirName); |
| fs.mkdirs(attemptDirClean); |
| Path filePath = new Path(attemptDirClean, "test.log"); |
| stream = fs.create(filePath); |
| stream.close(); |
| // Second application, one file touched after creation |
| Path appDirHoldByFile = new Path(doneAppHomeDir, appDirName + "1"); |
| Path attemptDirHoldByFile |
| = new Path(appDirHoldByFile, attemptDirName); |
| fs.mkdirs(attemptDirHoldByFile); |
| Path filePathHold = new Path(attemptDirHoldByFile, "test1.log"); |
| stream = fs.create(filePathHold); |
| stream.close(); |
| // Third application, one dir touched after creation |
| Path appDirHoldByDir = new Path(doneAppHomeDir, appDirName + "2"); |
| Path attemptDirHoldByDir = new Path(appDirHoldByDir, attemptDirName); |
| fs.mkdirs(attemptDirHoldByDir); |
| Path dirPathHold = new Path(attemptDirHoldByDir, "hold"); |
| fs.mkdirs(dirPathHold); |
| // Fourth application, empty dirs |
| Path appDirEmpty = new Path(doneAppHomeDir, appDirName + "3"); |
| Path attemptDirEmpty = new Path(appDirEmpty, attemptDirName); |
| fs.mkdirs(attemptDirEmpty); |
| Path dirPathEmpty = new Path(attemptDirEmpty, "empty"); |
| fs.mkdirs(dirPathEmpty); |
| |
| // Should retain all logs after this run |
| MutableCounterLong dirsCleaned = store.metrics.getLogsDirsCleaned(); |
| long before = dirsCleaned.value(); |
| store.cleanLogs(testDoneDirPath, 10000); |
| assertTrue(fs.exists(irrelevantDirPath)); |
| assertTrue(fs.exists(irrelevantFilePath)); |
| assertTrue(fs.exists(filePath)); |
| assertTrue(fs.exists(filePathHold)); |
| assertTrue(fs.exists(dirPathHold)); |
| assertTrue(fs.exists(dirPathEmpty)); |
| |
| // Make sure the created dir is old enough |
| Thread.sleep(2000); |
| // Touch the second application |
| stream = fs.append(filePathHold); |
| stream.writeBytes("append"); |
| stream.close(); |
| // Touch the third application by creating a new dir |
| fs.mkdirs(new Path(dirPathHold, "holdByMe")); |
| |
| store.cleanLogs(testDoneDirPath, 1000); |
| |
| // Verification after the second cleaner call |
| assertTrue(fs.exists(irrelevantDirPath)); |
| assertTrue(fs.exists(irrelevantFilePath)); |
| assertTrue(fs.exists(filePathHold)); |
| assertTrue(fs.exists(dirPathHold)); |
| assertTrue(fs.exists(doneAppHomeDir)); |
| |
| // appDirClean and appDirEmpty should be cleaned up |
| assertFalse(fs.exists(appDirClean)); |
| assertFalse(fs.exists(appDirEmpty)); |
| assertEquals(before + 2L, dirsCleaned.value()); |
| } |
| |
| @Test |
| void testCleanBuckets() throws Exception { |
| // ClusterTimeStampDir with App Log Dirs |
| Path clusterTimeStampDir1 = new Path(testDoneDirPath, |
| Long.toString(sampleAppIds.get(0).getClusterTimestamp())); |
| Path appDir1 = new Path(new Path(new Path( |
| clusterTimeStampDir1, "0000"), "000"), sampleAppIds.get(0).toString()); |
| Path appDir2 = new Path(new Path(new Path( |
| clusterTimeStampDir1, "0000"), "001"), sampleAppIds.get(1).toString()); |
| Path appDir3 = new Path(new Path(new Path( |
| clusterTimeStampDir1, "0000"), "002"), sampleAppIds.get(2).toString()); |
| Path appDir4 = new Path(new Path(new Path( |
| clusterTimeStampDir1, "0001"), "000"), sampleAppIds.get(3).toString()); |
| |
| // ClusterTimeStampDir with no App Log Dirs |
| Path clusterTimeStampDir2 = new Path(testDoneDirPath, "1235"); |
| |
| // Irrevelant ClusterTimeStampDir |
| Path clusterTimeStampDir3 = new Path(testDoneDirPath, "irrevelant"); |
| Path appDir5 = new Path(new Path(new Path( |
| clusterTimeStampDir3, "0000"), "000"), sampleAppIds.get(4).toString()); |
| |
| fs.mkdirs(appDir1); |
| fs.mkdirs(appDir2); |
| fs.mkdirs(appDir3); |
| fs.mkdirs(appDir4); |
| fs.mkdirs(clusterTimeStampDir2); |
| fs.mkdirs(appDir5); |
| |
| Thread.sleep(2000); |
| |
| store.cleanLogs(testDoneDirPath, 1000); |
| |
| // ClusterTimeStampDir will be removed only if no App Log Dir Present |
| assertTrue(fs.exists(clusterTimeStampDir1)); |
| assertFalse(fs.exists(appDir1)); |
| assertFalse(fs.exists(appDir2)); |
| assertFalse(fs.exists(appDir3)); |
| assertFalse(fs.exists(appDir4)); |
| assertFalse(fs.exists(clusterTimeStampDir2)); |
| assertTrue(fs.exists(appDir5)); |
| |
| store.cleanLogs(testDoneDirPath, 1000); |
| assertFalse(fs.exists(clusterTimeStampDir1)); |
| } |
| |
| @Test |
| void testNullCheckGetEntityTimelines() throws Exception { |
| try { |
| store.getEntityTimelines("YARN_APPLICATION", null, null, null, null, |
| null); |
| } catch (NullPointerException e) { |
| fail("NPE when getEntityTimelines called with Null EntityIds"); |
| } |
| } |
| |
| @Test |
| void testPluginRead() throws Exception { |
| // Verify precondition |
| assertEquals(EntityGroupPlugInForTest.class.getName(), |
| store.getConfig().get( |
| YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES)); |
| List<TimelineEntityGroupPlugin> currPlugins = store.getPlugins(); |
| for (TimelineEntityGroupPlugin plugin : currPlugins) { |
| ClassLoader pluginClassLoader = plugin.getClass().getClassLoader(); |
| assertTrue(pluginClassLoader instanceof ApplicationClassLoader, |
| "Should set up ApplicationClassLoader"); |
| URL[] paths = ((URLClassLoader) pluginClassLoader).getURLs(); |
| boolean foundJAR = false; |
| for (URL path : paths) { |
| if (path.toString().contains(testJar.getAbsolutePath())) { |
| foundJAR = true; |
| } |
| } |
| assertTrue(foundJAR, "Not found path " + testJar.getAbsolutePath() |
| + " for plugin " + plugin.getClass().getName()); |
| } |
| // Load data and cache item, prepare timeline store by making a cache item |
| EntityGroupFSTimelineStore.AppLogs appLogs = |
| store.new AppLogs(mainTestAppId, mainTestAppDirPath, |
| AppState.COMPLETED); |
| EntityCacheItem cacheItem = new EntityCacheItem( |
| EntityGroupPlugInForTest.getStandardTimelineGroupId(mainTestAppId), |
| config); |
| cacheItem.setAppLogs(appLogs); |
| store.setCachedLogs( |
| EntityGroupPlugInForTest.getStandardTimelineGroupId(mainTestAppId), |
| cacheItem); |
| MutableCounterLong detailLogEntityRead = |
| store.metrics.getGetEntityToDetailOps(); |
| MutableStat cacheRefresh = store.metrics.getCacheRefresh(); |
| long numEntityReadBefore = detailLogEntityRead.value(); |
| long cacheRefreshBefore = cacheRefresh.lastStat().numSamples(); |
| |
| // Generate TDM |
| TimelineDataManager tdm |
| = PluginStoreTestUtils.getTdmWithStore(config, store); |
| |
| // Verify single entity read |
| TimelineEntity entity3 = tdm.getEntity("type_3", mainTestAppId.toString(), |
| EnumSet.allOf(TimelineReader.Field.class), |
| UserGroupInformation.getLoginUser()); |
| assertNotNull(entity3); |
| assertEquals(entityNew.getStartTime(), entity3.getStartTime()); |
| // Verify multiple entities read |
| NameValuePair primaryFilter = new NameValuePair( |
| EntityGroupPlugInForTest.APP_ID_FILTER_NAME, mainTestAppId.toString()); |
| TimelineEntities entities = tdm.getEntities("type_3", primaryFilter, null, |
| null, null, null, null, null, EnumSet.allOf(TimelineReader.Field.class), |
| UserGroupInformation.getLoginUser()); |
| assertEquals(1, entities.getEntities().size()); |
| for (TimelineEntity entity : entities.getEntities()) { |
| assertEquals(entityNew.getStartTime(), entity.getStartTime()); |
| } |
| // Verify metrics |
| assertEquals(numEntityReadBefore + 2L, detailLogEntityRead.value()); |
| assertEquals(cacheRefreshBefore + 1L, cacheRefresh.lastStat().numSamples()); |
| } |
| |
| @Test |
| void testSummaryRead() throws Exception { |
| // Load data |
| EntityGroupFSTimelineStore.AppLogs appLogs = |
| store.new AppLogs(mainTestAppId, mainTestAppDirPath, |
| AppState.COMPLETED); |
| MutableCounterLong summaryLogEntityRead |
| = store.metrics.getGetEntityToSummaryOps(); |
| long numEntityReadBefore = summaryLogEntityRead.value(); |
| TimelineDataManager tdm |
| = PluginStoreTestUtils.getTdmWithStore(config, store); |
| appLogs.scanForLogs(); |
| appLogs.parseSummaryLogs(tdm); |
| |
| // Verify single entity read |
| PluginStoreTestUtils.verifyTestEntities(tdm); |
| // Verify multiple entities read |
| TimelineEntities entities = tdm.getEntities("type_1", null, null, null, |
| null, null, null, null, EnumSet.allOf(TimelineReader.Field.class), |
| UserGroupInformation.getLoginUser()); |
| assertThat(entities.getEntities()).hasSize(1); |
| for (TimelineEntity entity : entities.getEntities()) { |
| assertEquals((Long) 123L, entity.getStartTime()); |
| } |
| // Verify metrics |
| assertEquals(numEntityReadBefore + 5L, summaryLogEntityRead.value()); |
| |
| } |
| |
| @Test |
| void testGetEntityPluginRead() throws Exception { |
| EntityGroupFSTimelineStore store = null; |
| ApplicationId appId = |
| ApplicationId.fromString("application_1501509265053_0001"); |
| String user = UserGroupInformation.getCurrentUser().getShortUserName(); |
| Path userBase = new Path(testActiveDirPath, user); |
| Path userAppRoot = new Path(userBase, appId.toString()); |
| Path attemotDirPath = new Path(userAppRoot, getAttemptDirName(appId)); |
| |
| try { |
| store = createAndStartTimelineStore(AppState.ACTIVE); |
| String logFileName = EntityGroupFSTimelineStore.ENTITY_LOG_PREFIX |
| + EntityGroupPlugInForTest.getStandardTimelineGroupId(appId); |
| createTestFiles(appId, attemotDirPath, logFileName); |
| TimelineEntity entity = store.getEntity(entityNew.getEntityId(), |
| entityNew.getEntityType(), EnumSet.allOf(Field.class)); |
| assertNotNull(entity); |
| assertEquals(entityNew.getEntityId(), entity.getEntityId()); |
| assertEquals(entityNew.getEntityType(), entity.getEntityType()); |
| } finally { |
| if (store != null) { |
| store.stop(); |
| } |
| fs.delete(userBase, true); |
| } |
| } |
| |
| @Test |
| void testScanActiveLogsWithInvalidFile() throws Exception { |
| Path invalidFile = new Path(testActiveDirPath, "invalidfile"); |
| try { |
| if (!fs.exists(invalidFile)) { |
| fs.createNewFile(invalidFile); |
| } |
| store.scanActiveLogs(); |
| } catch (StackOverflowError error) { |
| fail("EntityLogScanner crashed with StackOverflowError"); |
| } finally { |
| if (fs.exists(invalidFile)) { |
| fs.delete(invalidFile, false); |
| } |
| } |
| } |
| |
| @Test |
| void testScanActiveLogsAndMoveToDonePluginRead() throws Exception { |
| EntityGroupFSTimelineStore store = null; |
| ApplicationId appId = |
| ApplicationId.fromString("application_1501509265053_0002"); |
| String user = UserGroupInformation.getCurrentUser().getShortUserName(); |
| Path userBase = new Path(testActiveDirPath, user); |
| Path userAppRoot = new Path(userBase, appId.toString()); |
| Path attemotDirPath = new Path(userAppRoot, getAttemptDirName(appId)); |
| |
| try { |
| store = createAndStartTimelineStore(AppState.COMPLETED); |
| String logFileName = EntityGroupFSTimelineStore.ENTITY_LOG_PREFIX |
| + EntityGroupPlugInForTest.getStandardTimelineGroupId(appId); |
| createTestFiles(appId, attemotDirPath, logFileName); |
| store.scanActiveLogs(); |
| |
| TimelineEntity entity = store.getEntity(entityNew.getEntityId(), |
| entityNew.getEntityType(), EnumSet.allOf(Field.class)); |
| assertNotNull(entity); |
| assertEquals(entityNew.getEntityId(), entity.getEntityId()); |
| assertEquals(entityNew.getEntityType(), entity.getEntityType()); |
| } finally { |
| if (store != null) { |
| store.stop(); |
| } |
| fs.delete(userBase, true); |
| } |
| } |
| |
| // TestTimelineStore to validate the put entities call |
| static class TestTimelineStore extends LeveldbTimelineStore { |
| static final AtomicInteger ENTITIES_COUNT = new AtomicInteger(0); |
| |
| TestTimelineStore() { |
| super(); |
| } |
| |
| @Override |
| public TimelinePutResponse put(TimelineEntities entities) { |
| ENTITIES_COUNT.getAndAdd(entities.getEntities().size()); |
| return new TimelinePutResponse(); |
| } |
| |
| public static int getEntitiesCount() { |
| return ENTITIES_COUNT.get(); |
| } |
| } |
| |
| @Test |
| void testIfAnyDuplicateEntities() throws Exception { |
| // Create an application with some entities |
| ApplicationId appId = |
| ApplicationId.fromString("application_1501509265053_0002"); |
| String user = UserGroupInformation.getCurrentUser().getShortUserName(); |
| Path activeDirPath = getTestRootPath("active1"); |
| Path doneDirPath = getTestRootPath("done1"); |
| Path userBase = new Path(activeDirPath, user); |
| Path userAppRoot = new Path(userBase, appId.toString()); |
| Path attemptDirPath = new Path(userAppRoot, getAttemptDirName(appId)); |
| |
| String logFileName = EntityGroupFSTimelineStore.ENTITY_LOG_PREFIX |
| + EntityGroupPlugInForTest.getStandardTimelineGroupId(appId); |
| createTestFiles(appId, attemptDirPath, logFileName); |
| |
| // stop the default store before creating new store to get the lock |
| store.stop(); |
| EntityGroupFSTimelineStore newStore = new EntityGroupFSTimelineStore() { |
| @Override |
| protected AppState getAppState(ApplicationId appId) throws IOException { |
| return AppState.ACTIVE; |
| } |
| }; |
| |
| try { |
| // Start ATS with TestTimelineStore |
| Configuration newConfig = new YarnConfiguration(config); |
| newConfig.set(YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_STORE, |
| TestTimelineStore.class.getName()); |
| newConfig.set(YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR, |
| doneDirPath.toString()); |
| newConfig.set( |
| YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR, |
| activeDirPath.toString()); |
| newStore.init(newConfig); |
| newStore.setFs(fs); |
| newStore.start(); |
| |
| // Validate if the initial entities count are correct |
| newStore.scanActiveLogs(); |
| GenericTestUtils.waitFor(new Supplier<Boolean>() { |
| @Override |
| public Boolean get() { |
| return TestTimelineStore.getEntitiesCount() == 2; |
| } |
| }, 100, 10000); |
| assertEquals(2, TestTimelineStore.getEntitiesCount(), "Wrong Initial Entities Count"); |
| |
| // Append the Summary log file with few more entities |
| TimelineEntities entities = PluginStoreTestUtils.generateTestEntities(); |
| FSDataOutputStream outStream = fs.append( |
| new Path(attemptDirPath, TEST_SUMMARY_LOG_FILE_NAME)); |
| JsonGenerator jsonGenerator |
| = new JsonFactory().createGenerator((OutputStream) outStream); |
| jsonGenerator.setPrettyPrinter(new MinimalPrettyPrinter("\n")); |
| ObjectMapper objMapper = new ObjectMapper(); |
| objMapper.setAnnotationIntrospector( |
| new JaxbAnnotationIntrospector(TypeFactory.defaultInstance())); |
| objMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL); |
| for (TimelineEntity entity : entities.getEntities()) { |
| objMapper.writeValue(jsonGenerator, entity); |
| } |
| outStream.close(); |
| |
| // Validate if there are any duplicates |
| newStore.scanActiveLogs(); |
| GenericTestUtils.waitFor(new Supplier<Boolean>() { |
| @Override |
| public Boolean get() { |
| return TestTimelineStore.getEntitiesCount() == 4; |
| } |
| }, 100, 10000); |
| assertEquals(4, TestTimelineStore.getEntitiesCount(), "Duplicate Entities present"); |
| |
| } finally { |
| if (newStore != null) { |
| newStore.stop(); |
| } |
| fs.delete(userAppRoot, true); |
| } |
| } |
| |
| @Test |
| void testStateStoreAndRecovery() throws Exception { |
| // Prepare the AppLogs Data |
| EntityGroupFSTimelineStore.AppLogs appLogs = |
| store.new AppLogs(mainTestAppId, mainTestAppDirPath, AppState.COMPLETED); |
| appLogs.scanForLogs(); |
| List<LogInfo> summaryLogs = appLogs.getSummaryLogs(); |
| List<EntityGroupFSTimelineStore.AppLogs> logsList = new ArrayList<>(); |
| logsList.add(appLogs); |
| |
| // Store the Log files |
| Path checkpointFile = new Path(fs.getHomeDirectory(), "atscheckpoint"); |
| try (DataOutputStream dataOutputStream = fs.create(checkpointFile)) { |
| store.storeLogFiles(logsList, dataOutputStream); |
| } catch (IOException e) { |
| fail("Failed to store the log files"); |
| } |
| |
| // Recover the Log files and validate the contents |
| try (DataInputStream dataInputStream = fs.open(checkpointFile)) { |
| HashMap<String, Pair<Long, Long>> logFiles = |
| store.recoverLogFiles(dataInputStream); |
| assertEquals(summaryLogs.size(), logFiles.size()); |
| for (LogInfo logInfo : summaryLogs) { |
| String logFileName = logInfo.getAttemptDirName() + |
| Path.SEPARATOR + logInfo.getFilename(); |
| Pair<Long, Long> pair = logFiles.get(logFileName); |
| assertNotNull(pair, "Failed to recover " + logFileName); |
| assertTrue(logInfo.getLastProcessedTime() == pair.getLeft(), |
| "LastProcessedTime is not same"); |
| assertTrue(logInfo.getOffset() == pair.getRight(), |
| "Offset is not same"); |
| } |
| } catch (IOException e) { |
| fail("Failed to recover the log files"); |
| } |
| } |
| |
| |
| private EntityGroupFSTimelineStore createAndStartTimelineStore( |
| AppState appstate) { |
| // stop before creating new store to get the lock |
| store.stop(); |
| |
| EntityGroupFSTimelineStore newStore = new EntityGroupFSTimelineStore() { |
| @Override |
| protected AppState getAppState(ApplicationId appId) throws IOException { |
| return appstate; |
| } |
| }; |
| newStore.init(config); |
| newStore.setFs(fs); |
| newStore.start(); |
| return newStore; |
| } |
| |
| private void createTestFiles(ApplicationId appId, Path attemptDirPath) |
| throws IOException { |
| createTestFiles(appId, attemptDirPath, mainEntityLogFileName); |
| } |
| |
| private void createTestFiles(ApplicationId appId, Path attemptDirPath, |
| String logPath) throws IOException { |
| TimelineEntities entities = PluginStoreTestUtils.generateTestEntities(); |
| PluginStoreTestUtils.writeEntities(entities, |
| new Path(attemptDirPath, TEST_SUMMARY_LOG_FILE_NAME), fs); |
| Map<String, Set<Object>> primaryFilters = new HashMap<>(); |
| Set<Object> appSet = new HashSet<Object>(); |
| appSet.add(appId.toString()); |
| primaryFilters.put(EntityGroupPlugInForTest.APP_ID_FILTER_NAME, appSet); |
| entityNew = PluginStoreTestUtils |
| .createEntity(appId.toString(), "type_3", 789L, null, null, |
| primaryFilters, null, "domain_id_1"); |
| TimelineEntities entityList = new TimelineEntities(); |
| entityList.addEntity(entityNew); |
| PluginStoreTestUtils.writeEntities(entityList, |
| new Path(attemptDirPath, logPath), fs); |
| |
| FSDataOutputStream out = fs.create( |
| new Path(attemptDirPath, TEST_DOMAIN_LOG_FILE_NAME)); |
| out.close(); |
| } |
| |
| private static Path getTestRootPath(String pathString) { |
| return fileContextTestHelper.getTestRootPath(fc, pathString); |
| } |
| |
| private static String getAttemptDirName(ApplicationId appId) { |
| return ApplicationAttemptId.appAttemptIdStrPrefix + appId.toString() + "_1"; |
| } |
| } |