blob: fa5a5870c4ebe2203823542804ba92f3dc39a3d6 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.logaggregation;
import java.io.IOException;
import java.net.URI;
import java.util.Arrays;
import java.util.List;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.Lists;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
import org.apache.hadoop.yarn.logaggregation.filecontroller.ifile.LogAggregationIndexedFileController;
import org.apache.hadoop.yarn.logaggregation.filecontroller.tfile.LogAggregationTFileController;
import org.apache.hadoop.yarn.logaggregation.testutils.LogAggregationTestcase;
import org.apache.hadoop.yarn.logaggregation.testutils.LogAggregationTestcaseBuilder;
import org.apache.hadoop.yarn.logaggregation.testutils.LogAggregationTestcaseBuilder.AppDescriptor;
import org.apache.log4j.Level;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.LOG_AGGREGATION_FILE_CONTROLLER_FMT;
import static org.apache.hadoop.yarn.logaggregation.LogAggregationTestUtils.enableFileControllers;
import static org.apache.hadoop.yarn.logaggregation.testutils.LogAggregationTestcaseBuilder.NO_TIMEOUT;
import static org.mockito.Mockito.mock;
/**
 * Tests for the aggregated log deletion service. Each test builds a directory
 * layout on a mocked {@code mockfs://} file system via
 * {@link LogAggregationTestcaseBuilder}, runs the deletion logic and verifies
 * which application directories / files were (not) removed.
 */
public class TestAggregatedLogDeletionService {
  private static final String T_FILE = "TFile";
  private static final String I_FILE = "IFile";
  private static final String USER_ME = "me";
  private static final String DIR_HOST1 = "host1";
  private static final String DIR_HOST2 = "host2";
  private static final String ROOT = "mockfs://foo/";
  private static final String REMOTE_ROOT_LOG_DIR = ROOT + "tmp/logs/";
  private static final String SUFFIX = "logs";
  private static final int TEN_DAYS_IN_SECONDS = 10 * 24 * 3600;

  /**
   * Controller classes, listed in the same order as
   * {@link #ALL_FILE_CONTROLLER_NAMES} (IFile, TFile); both lists are passed
   * together to {@code enableFileControllers}.
   */
  private static final List<Class<? extends LogAggregationFileController>>
      ALL_FILE_CONTROLLERS = Arrays.asList(
          LogAggregationIndexedFileController.class,
          LogAggregationTFileController.class);
  public static final List<String> ALL_FILE_CONTROLLER_NAMES = Arrays.asList(I_FILE, T_FILE);

  /**
   * Raises the root log4j logger to DEBUG for this test class.
   * NOTE(review): the previous level is not restored after the class runs.
   */
  @BeforeAll
  public static void beforeClass() {
    org.apache.log4j.Logger.getRootLogger().setLevel(Level.DEBUG);
  }

  /**
   * Clears the FileSystem cache before every test so each test gets a fresh
   * {@link MockFileSystem} instance for the {@code mockfs} scheme.
   */
  @BeforeEach
  public void closeFilesystems() throws IOException {
    // prevent the same mockfs instance from being reused due to FS cache
    FileSystem.closeAll();
  }

  /**
   * Builds a configuration with log aggregation enabled, {@code mockfs}
   * mapped to {@link MockFileSystem}, the remote log dir/suffix set to this
   * class' constants and TFile as the only file format.
   *
   * @param retainSeconds value for {@code LOG_AGGREGATION_RETAIN_SECONDS}
   * @param retainCheckIntervalSeconds value for
   *     {@code LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS}
   * @return the new configuration
   */
  private Configuration setupConfiguration(int retainSeconds, int retainCheckIntervalSeconds) {
    Configuration conf = new Configuration();
    conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
    conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
    conf.setInt(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, retainSeconds);
    conf.setInt(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
        retainCheckIntervalSeconds);
    conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, REMOTE_ROOT_LOG_DIR);
    conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, SUFFIX);
    conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, T_FILE);
    conf.set(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT, T_FILE),
        LogAggregationTFileController.class.getName());
    return conf;
  }

  /**
   * With a 1800s retention, apps last modified 2000s ago are candidates for
   * deletion while host dirs modified 1500s ago are kept. Verifies apps 1 and
   * 3 are deleted, apps 2 and 4 are not, and only app 4's first log file is
   * removed.
   */
  @Test
  void testDeletion() throws Exception {
    long now = System.currentTimeMillis();
    long toDeleteTime = now - (2000 * 1000);
    long toKeepTime = now - (1500 * 1000);
    Configuration conf = setupConfiguration(1800, -1);
    long timeout = 2000L;
    LogAggregationTestcaseBuilder.create(conf)
        .withRootPath(ROOT)
        .withRemoteRootLogPath(REMOTE_ROOT_LOG_DIR)
        .withUserDir(USER_ME, toKeepTime)
        .withSuffixDir(SUFFIX, toDeleteTime)
        .withBucketDir(toDeleteTime)
        .withApps(Lists.newArrayList(
            new AppDescriptor(toDeleteTime, Lists.newArrayList()),
            new AppDescriptor(toDeleteTime, Lists.newArrayList(
                Pair.of(DIR_HOST1, toDeleteTime),
                Pair.of(DIR_HOST2, toKeepTime))),
            new AppDescriptor(toDeleteTime, Lists.newArrayList(
                Pair.of(DIR_HOST1, toDeleteTime),
                Pair.of(DIR_HOST2, toDeleteTime))),
            new AppDescriptor(toDeleteTime, Lists.newArrayList(
                Pair.of(DIR_HOST1, toDeleteTime),
                Pair.of(DIR_HOST2, toKeepTime)))))
        .withFinishedApps(1, 2, 3)
        .withRunningApps(4)
        .injectExceptionForAppDirDeletion(3)
        .build()
        .startDeletionService()
        .verifyAppDirsDeleted(timeout, 1, 3)
        .verifyAppDirsNotDeleted(timeout, 2, 4)
        .verifyAppFileDeleted(4, 1, timeout)
        .verifyAppFileNotDeleted(4, 2, timeout)
        .teardown(1);
  }

  /**
   * Starts the service with an 1800s retention, then lowers retention to 50s
   * and the check interval to 2s in the same {@link Configuration}. Verifies
   * the new interval is only observed after
   * {@code refreshLogRetentionSettings()}, after which app 2 is deleted.
   */
  @Test
  void testRefreshLogRetentionSettings() throws Exception {
    long now = System.currentTimeMillis();
    long before2000Secs = now - (2000 * 1000);
    long before50Secs = now - (50 * 1000);
    int checkIntervalSeconds = 2;
    int checkIntervalMilliSeconds = checkIntervalSeconds * 1000;
    Configuration conf = setupConfiguration(1800, 1);
    LogAggregationTestcase testcase = LogAggregationTestcaseBuilder.create(conf)
        .withRootPath(ROOT)
        .withRemoteRootLogPath(REMOTE_ROOT_LOG_DIR)
        .withUserDir(USER_ME, before50Secs)
        .withSuffixDir(SUFFIX, before50Secs)
        .withBucketDir(before50Secs)
        .withApps(Lists.newArrayList(
            //Set time last modified of app1Dir directory and its files to before2000Secs
            new AppDescriptor(before2000Secs, Lists.newArrayList(
                Pair.of(DIR_HOST1, before2000Secs))),
            //Set time last modified of app2Dir directory and its files to before50Secs
            new AppDescriptor(before50Secs, Lists.newArrayList(
                Pair.of(DIR_HOST1, before50Secs))))
        )
        .withFinishedApps(1, 2)
        .withRunningApps()
        .build();

    testcase
        .startDeletionService()
        //app1Dir would be deleted since it is done above log retention period
        .verifyAppDirDeleted(1, 10000L)
        //app2Dir is not expected to be deleted since it is below the threshold
        .verifyAppDirNotDeleted(2, 3000L);

    //Now, let's change the log aggregation retention configs
    conf.setInt(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, 50);
    conf.setInt(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
        checkIntervalSeconds);

    testcase
        //We have not called refreshLogSettings, hence don't expect to see
        // the changed conf values
        .verifyCheckIntervalMilliSecondsNotEqualTo(checkIntervalMilliSeconds)
        //refresh the log settings
        .refreshLogRetentionSettings()
        //Check interval time should reflect the new value
        .verifyCheckIntervalMilliSecondsEqualTo(checkIntervalMilliSeconds)
        //app2Dir should be deleted since it falls above the threshold
        .verifyAppDirDeleted(2, 10000L)
        //Close expected 2 times: once for refresh and once for stopping
        .teardown(2);
  }

  /**
   * Runs the service with a 1s check interval over freshly modified logs,
   * then back-dates app 1 (and its host dir and the bucket dir) by ten days
   * and verifies the periodic check picks the change up and deletes app 1.
   */
  @Test
  void testCheckInterval() throws Exception {
    long now = System.currentTimeMillis();
    // long arithmetic keeps this safe should the retention constant grow
    long toDeleteTime = now - TEN_DAYS_IN_SECONDS * 1000L;
    Configuration conf = setupConfiguration(TEN_DAYS_IN_SECONDS, 1);
    // The FileSystem cache was already cleared by closeFilesystems() (@BeforeEach).
    LogAggregationTestcaseBuilder.create(conf)
        .withRootPath(ROOT)
        .withRemoteRootLogPath(REMOTE_ROOT_LOG_DIR)
        .withUserDir(USER_ME, now)
        .withSuffixDir(SUFFIX, now)
        .withBucketDir(now)
        .withApps(Lists.newArrayList(
            new AppDescriptor(now,
                Lists.newArrayList(Pair.of(DIR_HOST1, now))),
            new AppDescriptor(now)))
        .withFinishedApps(1)
        .withRunningApps()
        .build()
        .startDeletionService()
        .verifyAnyPathListedAtLeast(4, 10000L)
        .verifyAppDirNotDeleted(1, NO_TIMEOUT)
        // modify the timestamp of the logs and verify if it is picked up quickly
        .changeModTimeOfApp(1, toDeleteTime)
        .changeModTimeOfAppLogDir(1, 1, toDeleteTime)
        .changeModTimeOfBucketDir(toDeleteTime)
        .reinitAllPaths()
        .verifyAppDirDeleted(1, 10000L)
        .teardown(1);
  }

  /**
   * Runs a single deletion task over a layout with an extra non-standard app
   * dir ({@code application_a}) and an injected failure for app 1's dir, and
   * verifies app 3 is still deleted.
   */
  @Test
  void testRobustLogDeletion() throws Exception {
    Configuration conf = setupConfiguration(TEN_DAYS_IN_SECONDS, 1);
    // The FileSystem cache was already cleared by closeFilesystems() (@BeforeEach).
    long modTime = 0L;
    LogAggregationTestcaseBuilder.create(conf)
        .withRootPath(ROOT)
        .withRemoteRootLogPath(REMOTE_ROOT_LOG_DIR)
        .withUserDir(USER_ME, modTime)
        .withSuffixDir(SUFFIX, modTime)
        .withBucketDir(modTime, "0")
        .withApps(Lists.newArrayList(
            new AppDescriptor(modTime),
            new AppDescriptor(modTime),
            new AppDescriptor(modTime, Lists.newArrayList(Pair.of(DIR_HOST1, modTime)))))
        .withAdditionalAppDirs(Lists.newArrayList(Pair.of("application_a", modTime)))
        .withFinishedApps(1, 3)
        .withRunningApps()
        .injectExceptionForAppDirDeletion(1)
        .build()
        .runDeletionTask(TEN_DAYS_IN_SECONDS)
        .verifyAppDirDeleted(3, NO_TIMEOUT);
  }

  /**
   * Same scenario as {@link #testDeletion()}, duplicated for both the TFile
   * (apps 1-4) and IFile (apps 5-8) controllers enabled at the same time.
   */
  @Test
  void testDeletionTwoControllers() throws IOException {
    long now = System.currentTimeMillis();
    long toDeleteTime = now - (2000 * 1000);
    long toKeepTime = now - (1500 * 1000);
    Configuration conf = setupConfiguration(1800, -1);
    enableFileControllers(conf, REMOTE_ROOT_LOG_DIR, ALL_FILE_CONTROLLERS,
        ALL_FILE_CONTROLLER_NAMES);
    long timeout = 2000L;
    LogAggregationTestcaseBuilder.create(conf)
        .withRootPath(ROOT)
        .withRemoteRootLogPath(REMOTE_ROOT_LOG_DIR)
        .withBothFileControllers()
        .withUserDir(USER_ME, toKeepTime)
        .withSuffixDir(SUFFIX, toDeleteTime)
        .withBucketDir(toDeleteTime)
        .withApps(//Apps for TFile
            Lists.newArrayList(
                new AppDescriptor(T_FILE, toDeleteTime, Lists.newArrayList()),
                new AppDescriptor(T_FILE, toDeleteTime, Lists.newArrayList(
                    Pair.of(DIR_HOST1, toDeleteTime),
                    Pair.of(DIR_HOST2, toKeepTime))),
                new AppDescriptor(T_FILE, toDeleteTime, Lists.newArrayList(
                    Pair.of(DIR_HOST1, toDeleteTime),
                    Pair.of(DIR_HOST2, toDeleteTime))),
                new AppDescriptor(T_FILE, toDeleteTime, Lists.newArrayList(
                    Pair.of(DIR_HOST1, toDeleteTime),
                    Pair.of(DIR_HOST2, toKeepTime))),
                //Apps for IFile
                new AppDescriptor(I_FILE, toDeleteTime, Lists.newArrayList()),
                new AppDescriptor(I_FILE, toDeleteTime, Lists.newArrayList(
                    Pair.of(DIR_HOST1, toDeleteTime),
                    Pair.of(DIR_HOST2, toKeepTime))),
                new AppDescriptor(I_FILE, toDeleteTime, Lists.newArrayList(
                    Pair.of(DIR_HOST1, toDeleteTime),
                    Pair.of(DIR_HOST2, toDeleteTime))),
                new AppDescriptor(I_FILE, toDeleteTime, Lists.newArrayList(
                    Pair.of(DIR_HOST1, toDeleteTime),
                    Pair.of(DIR_HOST2, toKeepTime)))))
        .withFinishedApps(1, 2, 3, 5, 6, 7)
        .withRunningApps(4, 8)
        .injectExceptionForAppDirDeletion(3, 6)
        .build()
        .startDeletionService()
        .verifyAppDirsDeleted(timeout, 1, 3, 5, 7)
        .verifyAppDirsNotDeleted(timeout, 2, 4, 6, 8)
        .verifyAppFilesDeleted(timeout, Lists.newArrayList(Pair.of(4, 1), Pair.of(8, 1)))
        .verifyAppFilesNotDeleted(timeout, Lists.newArrayList(Pair.of(4, 2), Pair.of(8, 2)))
        .teardown(1);
  }

  /**
   * A {@link FilterFileSystem} wrapping a Mockito mock of {@link FileSystem};
   * registered for the {@code mockfs} scheme by {@code setupConfiguration}.
   */
  static class MockFileSystem extends FilterFileSystem {
    MockFileSystem() {
      super(mock(FileSystem.class));
    }

    /** Intentionally a no-op: skips {@link FilterFileSystem}'s own initialization. */
    @Override
    public void initialize(URI name, Configuration conf) throws IOException {
    }

    /** Reports every path capability as supported, for any path. */
    @Override
    public boolean hasPathCapability(Path path, String capability) {
      return true;
    }
  }
}