blob: 269dbab0760f8d0d369d0fd1789634af48bf90c7 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
import org.apache.hadoop.yarn.logaggregation.filecontroller.tfile.LogAggregationTFileController;
import org.apache.hadoop.yarn.server.api.ContainerLogContext;
import org.apache.hadoop.yarn.server.api.ContainerType;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
/**
* Unit tests of AppLogAggregatorImpl class.
*/
public class TestAppLogAggregatorImpl {
private static final File LOCAL_LOG_DIR = new File("target",
TestAppLogAggregatorImpl.class.getName() + "-localLogDir");
private static final File REMOTE_LOG_FILE = new File("target",
TestAppLogAggregatorImpl.class.getName() + "-remoteLogFile");
@Before
public void setUp() throws IOException {
if(LOCAL_LOG_DIR.exists()) {
FileUtils.cleanDirectory(LOCAL_LOG_DIR);
}
if(REMOTE_LOG_FILE.exists()) {
FileUtils.cleanDirectory(REMOTE_LOG_FILE);
}
}
@After
public void cleanUp() throws IOException {
FileUtils.deleteDirectory(LOCAL_LOG_DIR);
FileUtils.deleteQuietly(REMOTE_LOG_FILE);
}
@Test
public void testAggregatorWithRetentionPolicyDisabledShouldUploadAllFiles()
throws Exception {
final ApplicationId applicationId =
ApplicationId.newInstance(System.currentTimeMillis(), 0);
final ApplicationAttemptId attemptId =
ApplicationAttemptId.newInstance(applicationId, 0);
final ContainerId containerId = ContainerId.newContainerId(attemptId, 0);
// create artificial log files
final File appLogDir = new File(LOCAL_LOG_DIR, applicationId.toString());
final File containerLogDir = new File(appLogDir, containerId.toString());
containerLogDir.mkdirs();
final Set<File> logFiles = createContainerLogFiles(containerLogDir, 3);
final long logRetentionSecs = 10000;
final long recoveredLogInitedTime = -1;
verifyLogAggregationWithExpectedFiles2DeleteAndUpload(
applicationId, containerId, logRetentionSecs,
recoveredLogInitedTime, logFiles, logFiles);
}
@Test
public void testAggregatorWhenNoFileOlderThanRetentionPolicyShouldUploadAll()
throws IOException {
final ApplicationId applicationId =
ApplicationId.newInstance(System.currentTimeMillis(), 0);
final ApplicationAttemptId attemptId =
ApplicationAttemptId.newInstance(applicationId, 0);
final ContainerId containerId = ContainerId.newContainerId(attemptId, 0);
// create artificial log files
final File appLogDir = new File(LOCAL_LOG_DIR,
applicationId.toString());
final File containerLogDir = new File(appLogDir,
containerId.toString());
containerLogDir.mkdirs();
final Set<File> logFiles = createContainerLogFiles(containerLogDir, 3);
// set log retention period to 1 week.
final long logRententionSec = 7 * 24 * 60 * 60;
final long recoveredLogInitedTimeMillis =
System.currentTimeMillis() - 60*60;
verifyLogAggregationWithExpectedFiles2DeleteAndUpload(applicationId,
containerId, logRententionSec, recoveredLogInitedTimeMillis,
logFiles, logFiles);
}
@Test
public void testAggregatorWhenAllFilesOlderThanRetentionShouldUploadNone()
throws IOException {
final ApplicationId applicationId =
ApplicationId.newInstance(System.currentTimeMillis(), 0);
final ApplicationAttemptId attemptId =
ApplicationAttemptId.newInstance(applicationId, 0);
final ContainerId containerId = ContainerId.newContainerId(attemptId, 0);
// create artificial log files
final File appLogDir = new File(LOCAL_LOG_DIR,
applicationId.toString());
final File containerLogDir = new File(appLogDir,
containerId.toString());
containerLogDir.mkdirs();
final Set<File> logFiles = createContainerLogFiles(containerLogDir, 3);
final long week = 7 * 24 * 60 * 60;
final long recoveredLogInitedTimeMillis = System.currentTimeMillis() -
2 * week * 1000;
verifyLogAggregationWithExpectedFiles2DeleteAndUpload(
applicationId, containerId, week, recoveredLogInitedTimeMillis,
logFiles, new HashSet<File>());
}
/**
* Create the given number of log files under the container log directory.
* @param containerLogDir the directory to create container log files
* @param numOfFiles the number of log files to create
* @return the set of log files created
*/
private static Set<File> createContainerLogFiles(File containerLogDir,
int numOfFiles) throws IOException {
assert(numOfFiles >= 0);
assert(containerLogDir.exists());
Set<File> containerLogFiles = new HashSet<>();
for(int i = 0; i < numOfFiles; i++) {
final File logFile = new File(containerLogDir, "logfile" + i);
logFile.createNewFile();
containerLogFiles.add(logFile);
}
return containerLogFiles;
}
/**
* Verify if the application log aggregator, configured with given log
* retention period and the recovered log initialization time of
* the application, uploads and deletes the set of log files as expected.
* @param appId application id
* @param containerId container id
* @param logRetentionSecs log retention period
* @param recoveredLogInitedTimeMillis recovered log initialization time
* @param expectedFilesToDelete the set of files expected to be deleted
* @param expectedFilesToUpload the set of files expected to be uploaded.
*/
public void verifyLogAggregationWithExpectedFiles2DeleteAndUpload(
ApplicationId appId, ContainerId containerId, long logRetentionSecs,
long recoveredLogInitedTimeMillis, Set<File> expectedFilesToDelete,
Set<File> expectedFilesToUpload) throws IOException {
final Set<String> filesExpected2Delete = new HashSet<>();
for(File file: expectedFilesToDelete) {
filesExpected2Delete.add(file.getAbsolutePath());
}
final Set<String> filesExpected2Upload = new HashSet<>();
for(File file: expectedFilesToUpload) {
filesExpected2Upload.add(file.getAbsolutePath());
}
// deletion service with verification to check files to delete
DeletionService deletionServiceWithExpectedFiles =
createDeletionServiceWithExpectedFile2Delete(filesExpected2Delete);
final YarnConfiguration config = new YarnConfiguration();
config.setLong(
YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, logRetentionSecs);
final AppLogAggregatorInTest appLogAggregator =
createAppLogAggregator(appId, LOCAL_LOG_DIR.getAbsolutePath(),
config, recoveredLogInitedTimeMillis,
deletionServiceWithExpectedFiles);
appLogAggregator.startContainerLogAggregation(
new ContainerLogContext(containerId, ContainerType.TASK, 0));
// set app finished flag first
appLogAggregator.finishLogAggregation();
appLogAggregator.run();
// verify uploaded files
ArgumentCaptor<LogValue> logValCaptor =
ArgumentCaptor.forClass(LogValue.class);
verify(appLogAggregator.getLogAggregationFileController()).write(
any(LogKey.class), logValCaptor.capture());
Set<String> filesUploaded = new HashSet<>();
LogValue logValue = logValCaptor.getValue();
for(File file: logValue.getPendingLogFilesToUploadForThisContainer()) {
filesUploaded.add(file.getAbsolutePath());
}
verifyFilesUploaded(filesUploaded , filesExpected2Upload);
}
private static void verifyFilesUploaded(Set<String> filesUploaded,
Set<String> filesExpected) {
final String errMsgPrefix = "The set of files uploaded are not the same " +
"as expected";
if(filesUploaded.size() != filesExpected.size()) {
fail(errMsgPrefix + ": actual size: " + filesUploaded.size() + " vs " +
"expected size: " + filesExpected.size());
}
for(String file: filesExpected) {
if(!filesUploaded.contains(file)) {
fail(errMsgPrefix + ": expecting " + file);
}
}
}
private static AppLogAggregatorInTest createAppLogAggregator(
ApplicationId applicationId, String rootLogDir,
YarnConfiguration config, long recoveredLogInitedTimeMillis,
DeletionService deletionServiceWithFilesToExpect)
throws IOException {
final Dispatcher dispatcher = createNullDispatcher();
final NodeId nodeId = NodeId.newInstance("localhost", 0);
final String userId = "AppLogAggregatorTest";
final UserGroupInformation ugi =
UserGroupInformation.createRemoteUser(userId);
final LocalDirsHandlerService dirsService =
createLocalDirsHandlerService(config, rootLogDir);
final DeletionService deletionService = deletionServiceWithFilesToExpect;
final LogAggregationContext logAggregationContext = null;
final Map<ApplicationAccessType, String> appAcls = new HashMap<>();
final Context context = createContext(config);
final FileContext fakeLfs = mock(FileContext.class);
final Path remoteLogDirForApp = new Path(REMOTE_LOG_FILE.getAbsolutePath());
LogAggregationTFileController format = spy(
new LogAggregationTFileController());
format.initialize(config, "TFile");
return new AppLogAggregatorInTest(dispatcher, deletionService,
config, applicationId, ugi, nodeId, dirsService,
remoteLogDirForApp, appAcls, logAggregationContext,
context, fakeLfs, recoveredLogInitedTimeMillis, format);
}
/**
* Create a deletionService that verifies the paths of container log files
* passed to the delete method of DeletionService by AppLogAggregatorImpl.
* This approach is taken due to lack of support of varargs captor in the
* current mockito version 1.8.5 (The support is added in 1.10.x).
**/
private static DeletionService createDeletionServiceWithExpectedFile2Delete(
final Set<String> expectedPathsForDeletion) {
DeletionService deletionServiceWithExpectedFiles = mock(DeletionService
.class);
// verify paths passed to first invocation of delete method against
// expected paths
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
Set<String> paths = new HashSet<>();
Object[] tasks = invocationOnMock.getArguments();
for(int i = 0; i < tasks.length; i++) {
FileDeletionTask task = (FileDeletionTask) tasks[i];
for (Path path: task.getBaseDirs()) {
paths.add(new File(path.toUri().getRawPath()).getAbsolutePath());
}
}
verifyFilesToDelete(expectedPathsForDeletion, paths);
return null;
}
}).doNothing().when(deletionServiceWithExpectedFiles).delete(
any(FileDeletionTask.class));
return deletionServiceWithExpectedFiles;
}
private static void verifyFilesToDelete(Set<String> files2ToDelete,
Set<String> filesExpected) {
final String errMsgPrefix = "The set of paths for deletion are not the " +
"same as expected";
if(files2ToDelete.size() != filesExpected.size()) {
fail(errMsgPrefix + ": actual size: " + files2ToDelete.size() + " vs " +
"expected size: " + filesExpected.size());
}
for(String file: filesExpected) {
if(!files2ToDelete.contains(file)) {
fail(errMsgPrefix + ": expecting " + file);
}
}
}
private static Dispatcher createNullDispatcher() {
return new Dispatcher() {
@Override
public EventHandler getEventHandler() {
return new EventHandler() {
@Override
public void handle(Event event) {
// do nothing
}
};
}
@Override
public void register(Class<? extends Enum> eventType,
EventHandler handler) {
// do nothing
}
};
}
private static LocalDirsHandlerService createLocalDirsHandlerService(
YarnConfiguration conf, final String rootLogDir) {
LocalDirsHandlerService dirsHandlerService = new LocalDirsHandlerService() {
@Override
public List<String> getLogDirsForRead() {
return new ArrayList<String>() {
{
add(rootLogDir);
}
};
}
@Override
public List<String> getLogDirsForCleanup() {
return new ArrayList<String>() {
{
add(rootLogDir);
}
};
}
};
dirsHandlerService.init(conf);
// appLogAggregator only calls LocalDirsHandlerServer for local directories
// so it is ok to not start the service.
return dirsHandlerService;
}
private static Context createContext(YarnConfiguration conf) {
return new NodeManager.NMContext(
new NMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInNM(),
null,
new ApplicationACLsManager(conf),
new NMNullStateStoreService(), false, conf);
}
private static final class AppLogAggregatorInTest extends
AppLogAggregatorImpl {
final DeletionService deletionService;
final ApplicationId applicationId;
final ArgumentCaptor<LogValue> logValue;
public AppLogAggregatorInTest(Dispatcher dispatcher,
DeletionService deletionService, Configuration conf,
ApplicationId appId, UserGroupInformation ugi, NodeId nodeId,
LocalDirsHandlerService dirsHandler, Path remoteNodeLogFileForApp,
Map<ApplicationAccessType, String> appAcls,
LogAggregationContext logAggregationContext, Context context,
FileContext lfs, long recoveredLogInitedTime,
LogAggregationTFileController format) throws IOException {
super(dispatcher, deletionService, conf, appId, ugi, nodeId,
dirsHandler, remoteNodeLogFileForApp, appAcls,
logAggregationContext, context, lfs, -1, recoveredLogInitedTime,
format);
this.applicationId = appId;
this.deletionService = deletionService;
this.logValue = ArgumentCaptor.forClass(LogValue.class);
}
}
}