blob: 3a605f72395121b28860b56217be52220542d9d1 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.junit.Assert;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.Random;
/**
 * Unit tests for {@link HadoopArchiveLogs}: selection of eligible
 * applications, capping by max-eligible, filtering by log aggregation
 * status, script generation and working-dir preparation.
 */
public class TestHadoopArchiveLogs {

  private static final long CLUSTER_TIMESTAMP = System.currentTimeMillis();
  private static final String USER = System.getProperty("user.name");
  /** Size in bytes of one "unit" written by {@link #createFile}. */
  private static final int FILE_SIZE_INCREMENT = 4096;
  private static final byte[] DUMMY_DATA = new byte[FILE_SIZE_INCREMENT];

  static {
    new Random().nextBytes(DUMMY_DATA);
  }

  /**
   * Lays out one app log dir per rejection reason (no files, too few files,
   * too large, already has a har) plus one acceptable app dir, and checks
   * that only the acceptable one is seeded as eligible.
   */
  @Test(timeout = 10000)
  public void testCheckFilesAndSeedApps() throws Exception {
    Configuration conf = new Configuration();
    HadoopArchiveLogs hal = new HadoopArchiveLogs(conf);
    FileSystem fs = FileSystem.getLocal(conf);
    Path rootLogDir = new Path("target", "logs");
    String suffix = "logs";
    Path logDir = new Path(rootLogDir, new Path(USER, suffix));
    // Start from an empty log dir. CLUSTER_TIMESTAMP changes on every run, so
    // app dirs left behind by a previous run have different ids and could
    // otherwise be picked up as extra eligible apps.
    fs.delete(logDir, true);
    Assert.assertFalse(fs.exists(logDir));
    fs.mkdirs(logDir);

    // no files found
    ApplicationId appId1 = ApplicationId.newInstance(CLUSTER_TIMESTAMP, 1);
    Path app1Path = new Path(logDir, appId1.toString());
    fs.mkdirs(app1Path);
    // too few files
    ApplicationId appId2 = ApplicationId.newInstance(CLUSTER_TIMESTAMP, 2);
    Path app2Path = new Path(logDir, appId2.toString());
    fs.mkdirs(app2Path);
    createFile(fs, new Path(app2Path, "file1"), 1);
    hal.minNumLogFiles = 2;
    // too large
    ApplicationId appId3 = ApplicationId.newInstance(CLUSTER_TIMESTAMP, 3);
    Path app3Path = new Path(logDir, appId3.toString());
    fs.mkdirs(app3Path);
    createFile(fs, new Path(app3Path, "file1"), 2);
    createFile(fs, new Path(app3Path, "file2"), 5);
    hal.maxTotalLogsSize = FILE_SIZE_INCREMENT * 6;
    // has har already
    ApplicationId appId4 = ApplicationId.newInstance(CLUSTER_TIMESTAMP, 4);
    Path app4Path = new Path(logDir, appId4.toString());
    fs.mkdirs(app4Path);
    createFile(fs, new Path(app4Path, appId4 + ".har"), 1);
    // just right
    ApplicationId appId5 = ApplicationId.newInstance(CLUSTER_TIMESTAMP, 5);
    Path app5Path = new Path(logDir, appId5.toString());
    fs.mkdirs(app5Path);
    createFile(fs, new Path(app5Path, "file1"), 2);
    createFile(fs, new Path(app5Path, "file2"), 3);

    Assert.assertEquals(0, hal.eligibleApplications.size());
    hal.checkFilesAndSeedApps(fs, rootLogDir, suffix);
    Assert.assertEquals(1, hal.eligibleApplications.size());
    Assert.assertEquals(appId5.toString(),
        hal.eligibleApplications.iterator().next().getAppId());
  }

  /**
   * Seeds seven apps with assorted (and missing) finish times, then lowers
   * {@code maxEligible} step by step and checks which app is dropped at each
   * step. A negative {@code maxEligible} must leave the set untouched.
   */
  @Test(timeout = 10000)
  public void testCheckMaxEligible() throws Exception {
    Configuration conf = new Configuration();
    HadoopArchiveLogs.AppInfo app1 = new HadoopArchiveLogs.AppInfo(
        ApplicationId.newInstance(CLUSTER_TIMESTAMP, 1).toString(), USER);
    app1.setFinishTime(CLUSTER_TIMESTAMP - 5);
    HadoopArchiveLogs.AppInfo app2 = new HadoopArchiveLogs.AppInfo(
        ApplicationId.newInstance(CLUSTER_TIMESTAMP, 2).toString(), USER);
    app2.setFinishTime(CLUSTER_TIMESTAMP - 10);
    HadoopArchiveLogs.AppInfo app3 = new HadoopArchiveLogs.AppInfo(
        ApplicationId.newInstance(CLUSTER_TIMESTAMP, 3).toString(), USER);
    // app3 has no finish time set
    HadoopArchiveLogs.AppInfo app4 = new HadoopArchiveLogs.AppInfo(
        ApplicationId.newInstance(CLUSTER_TIMESTAMP, 4).toString(), USER);
    app4.setFinishTime(CLUSTER_TIMESTAMP + 5);
    HadoopArchiveLogs.AppInfo app5 = new HadoopArchiveLogs.AppInfo(
        ApplicationId.newInstance(CLUSTER_TIMESTAMP, 5).toString(), USER);
    app5.setFinishTime(CLUSTER_TIMESTAMP + 10);
    HadoopArchiveLogs.AppInfo app6 = new HadoopArchiveLogs.AppInfo(
        ApplicationId.newInstance(CLUSTER_TIMESTAMP, 6).toString(), USER);
    // app6 has no finish time set
    HadoopArchiveLogs.AppInfo app7 = new HadoopArchiveLogs.AppInfo(
        ApplicationId.newInstance(CLUSTER_TIMESTAMP, 7).toString(), USER);
    app7.setFinishTime(CLUSTER_TIMESTAMP);
    HadoopArchiveLogs hal = new HadoopArchiveLogs(conf);
    Assert.assertEquals(0, hal.eligibleApplications.size());
    hal.eligibleApplications.add(app1);
    hal.eligibleApplications.add(app2);
    hal.eligibleApplications.add(app3);
    hal.eligibleApplications.add(app4);
    hal.eligibleApplications.add(app5);
    hal.eligibleApplications.add(app6);
    hal.eligibleApplications.add(app7);
    Assert.assertEquals(7, hal.eligibleApplications.size());
    hal.maxEligible = -1;
    hal.checkMaxEligible();
    Assert.assertEquals(7, hal.eligibleApplications.size());
    hal.maxEligible = 6;
    hal.checkMaxEligible();
    Assert.assertEquals(6, hal.eligibleApplications.size());
    Assert.assertFalse(hal.eligibleApplications.contains(app5));
    hal.maxEligible = 5;
    hal.checkMaxEligible();
    Assert.assertEquals(5, hal.eligibleApplications.size());
    Assert.assertFalse(hal.eligibleApplications.contains(app4));
    hal.maxEligible = 4;
    hal.checkMaxEligible();
    Assert.assertEquals(4, hal.eligibleApplications.size());
    Assert.assertFalse(hal.eligibleApplications.contains(app7));
    hal.maxEligible = 3;
    hal.checkMaxEligible();
    Assert.assertEquals(3, hal.eligibleApplications.size());
    Assert.assertFalse(hal.eligibleApplications.contains(app1));
    hal.maxEligible = 2;
    hal.checkMaxEligible();
    Assert.assertEquals(2, hal.eligibleApplications.size());
    Assert.assertFalse(hal.eligibleApplications.contains(app2));
    hal.maxEligible = 1;
    hal.checkMaxEligible();
    Assert.assertEquals(1, hal.eligibleApplications.size());
    Assert.assertFalse(hal.eligibleApplications.contains(app6));
    Assert.assertTrue(hal.eligibleApplications.contains(app3));
  }

  /**
   * Registers RM apps reporting each LogAggregationStatus, plus one app the
   * RM does not know about. After filtering, only the SUCCEEDED, TIME_OUT and
   * unknown-to-RM apps must remain eligible.
   */
  @Test(timeout = 30000)
  public void testFilterAppsByAggregatedStatus() throws Exception {
    try (MiniYARNCluster yarnCluster =
        new MiniYARNCluster(TestHadoopArchiveLogs.class.getSimpleName(),
            1, 1, 1, 1)) {
      Configuration conf = new Configuration();
      conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
      yarnCluster.init(conf);
      yarnCluster.start();
      // Use the cluster's effective configuration from here on.
      conf = yarnCluster.getConfig();

      RMContext rmContext = yarnCluster.getResourceManager().getRMContext();
      RMAppImpl appImpl1 = (RMAppImpl)createRMApp(1, conf, rmContext,
          LogAggregationStatus.DISABLED);
      RMAppImpl appImpl2 = (RMAppImpl)createRMApp(2, conf, rmContext,
          LogAggregationStatus.FAILED);
      RMAppImpl appImpl3 = (RMAppImpl)createRMApp(3, conf, rmContext,
          LogAggregationStatus.NOT_START);
      RMAppImpl appImpl4 = (RMAppImpl)createRMApp(4, conf, rmContext,
          LogAggregationStatus.SUCCEEDED);
      RMAppImpl appImpl5 = (RMAppImpl)createRMApp(5, conf, rmContext,
          LogAggregationStatus.RUNNING);
      RMAppImpl appImpl6 = (RMAppImpl)createRMApp(6, conf, rmContext,
          LogAggregationStatus.RUNNING_WITH_FAILURE);
      RMAppImpl appImpl7 = (RMAppImpl)createRMApp(7, conf, rmContext,
          LogAggregationStatus.TIME_OUT);
      RMAppImpl appImpl8 = (RMAppImpl)createRMApp(8, conf, rmContext,
          LogAggregationStatus.SUCCEEDED);
      rmContext.getRMApps().put(appImpl1.getApplicationId(), appImpl1);
      rmContext.getRMApps().put(appImpl2.getApplicationId(), appImpl2);
      rmContext.getRMApps().put(appImpl3.getApplicationId(), appImpl3);
      rmContext.getRMApps().put(appImpl4.getApplicationId(), appImpl4);
      rmContext.getRMApps().put(appImpl5.getApplicationId(), appImpl5);
      rmContext.getRMApps().put(appImpl6.getApplicationId(), appImpl6);
      rmContext.getRMApps().put(appImpl7.getApplicationId(), appImpl7);
      // appImpl8 is not in the RM

      HadoopArchiveLogs hal = new HadoopArchiveLogs(conf);
      Assert.assertEquals(0, hal.eligibleApplications.size());
      hal.eligibleApplications.add(
          new HadoopArchiveLogs.AppInfo(appImpl1.getApplicationId().toString(),
              USER));
      hal.eligibleApplications.add(
          new HadoopArchiveLogs.AppInfo(appImpl2.getApplicationId().toString(),
              USER));
      hal.eligibleApplications.add(
          new HadoopArchiveLogs.AppInfo(appImpl3.getApplicationId().toString(),
              USER));
      HadoopArchiveLogs.AppInfo app4 =
          new HadoopArchiveLogs.AppInfo(appImpl4.getApplicationId().toString(),
              USER);
      hal.eligibleApplications.add(app4);
      hal.eligibleApplications.add(
          new HadoopArchiveLogs.AppInfo(appImpl5.getApplicationId().toString(),
              USER));
      hal.eligibleApplications.add(
          new HadoopArchiveLogs.AppInfo(appImpl6.getApplicationId().toString(),
              USER));
      HadoopArchiveLogs.AppInfo app7 =
          new HadoopArchiveLogs.AppInfo(appImpl7.getApplicationId().toString(),
              USER);
      hal.eligibleApplications.add(app7);
      HadoopArchiveLogs.AppInfo app8 =
          new HadoopArchiveLogs.AppInfo(appImpl8.getApplicationId().toString(),
              USER);
      hal.eligibleApplications.add(app8);
      Assert.assertEquals(8, hal.eligibleApplications.size());

      hal.filterAppsByAggregatedStatus();
      Assert.assertEquals(3, hal.eligibleApplications.size());
      Assert.assertTrue(hal.eligibleApplications.contains(app4));
      Assert.assertTrue(hal.eligibleApplications.contains(app7));
      Assert.assertTrue(hal.eligibleApplications.contains(app8));
    }
  }

  /** Checks the generated script both with and without proxying. */
  @Test(timeout = 10000)
  public void testGenerateScript() throws Exception {
    _testGenerateScript(false);
    _testGenerateScript(true);
  }

  /**
   * Generates the shell script for two eligible apps and checks it line by
   * line.
   *
   * @param proxy value assigned to {@code hal.proxy}; when false the runner
   *              command line is expected to end with {@code -noProxy}
   */
  private void _testGenerateScript(boolean proxy) throws Exception {
    Configuration conf = new Configuration();
    HadoopArchiveLogs hal = new HadoopArchiveLogs(conf);
    ApplicationId app1 = ApplicationId.newInstance(CLUSTER_TIMESTAMP, 1);
    ApplicationId app2 = ApplicationId.newInstance(CLUSTER_TIMESTAMP, 2);
    hal.eligibleApplications.add(new HadoopArchiveLogs.AppInfo(app1.toString(),
        USER));
    hal.eligibleApplications.add(new HadoopArchiveLogs.AppInfo(app2.toString(),
        USER));
    hal.proxy = proxy;

    File localScript = new File("target", "script.sh");
    Path workingDir = new Path("/tmp", "working");
    Path remoteRootLogDir = new Path("/tmp", "logs");
    String suffix = "logs";
    localScript.delete();
    Assert.assertFalse(localScript.exists());
    hal.generateScript(localScript, workingDir, remoteRootLogDir, suffix);
    Assert.assertTrue(localScript.exists());

    String script = IOUtils.toString(localScript.toURI());
    // NOTE(review): assumes generateScript terminates lines with the platform
    // line separator — confirm if this test is run on Windows.
    String[] lines = script.split(System.lineSeparator());
    Assert.assertEquals(16, lines.length);
    Assert.assertEquals("#!/bin/bash", lines[0]);
    Assert.assertEquals("set -e", lines[1]);
    Assert.assertEquals("set -x", lines[2]);
    Assert.assertEquals("if [ \"$YARN_SHELL_ID\" == \"1\" ]; then", lines[3]);
    // The two apps may be emitted in either order, so accept both mappings.
    if (lines[4].contains(app1.toString())) {
      Assert.assertEquals("\tappId=\"" + app1.toString() + "\"", lines[4]);
      Assert.assertEquals("\tappId=\"" + app2.toString() + "\"", lines[7]);
    } else {
      Assert.assertEquals("\tappId=\"" + app2.toString() + "\"", lines[4]);
      Assert.assertEquals("\tappId=\"" + app1.toString() + "\"", lines[7]);
    }
    Assert.assertEquals("\tuser=\"" + USER + "\"", lines[5]);
    Assert.assertEquals("elif [ \"$YARN_SHELL_ID\" == \"2\" ]; then", lines[6]);
    Assert.assertEquals("\tuser=\"" + USER + "\"", lines[8]);
    Assert.assertEquals("else", lines[9]);
    Assert.assertEquals("\techo \"Unknown Mapping!\"", lines[10]);
    Assert.assertEquals("\texit 1", lines[11]);
    Assert.assertEquals("fi", lines[12]);
    Assert.assertEquals("export HADOOP_CLIENT_OPTS=\"-Xmx1024m\"", lines[13]);
    Assert.assertTrue(lines[14].startsWith("export HADOOP_CLASSPATH="));
    if (proxy) {
      Assert.assertEquals(
          "\"$HADOOP_PREFIX\"/bin/hadoop org.apache.hadoop.tools." +
          "HadoopArchiveLogsRunner -appId \"$appId\" -user \"$user\" " +
          "-workingDir " + workingDir.toString() + " -remoteRootLogDir " +
          remoteRootLogDir.toString() + " -suffix " + suffix,
          lines[15]);
    } else {
      Assert.assertEquals(
          "\"$HADOOP_PREFIX\"/bin/hadoop org.apache.hadoop.tools." +
          "HadoopArchiveLogsRunner -appId \"$appId\" -user \"$user\" " +
          "-workingDir " + workingDir.toString() + " -remoteRootLogDir " +
          remoteRootLogDir.toString() + " -suffix " + suffix + " -noProxy",
          lines[15]);
    }
  }

  /**
   * If this test fails, then a Log Aggregation Status was added, removed or
   * reordered. Make sure that
   * {@link HadoopArchiveLogs#filterAppsByAggregatedStatus()} and this test
   * are updated as well, if necessary.
   * @throws Exception
   */
  @Test(timeout = 5000)
  public void testStatuses() throws Exception {
    LogAggregationStatus[] statuses = new LogAggregationStatus[7];
    statuses[0] = LogAggregationStatus.DISABLED;
    statuses[1] = LogAggregationStatus.NOT_START;
    statuses[2] = LogAggregationStatus.RUNNING;
    statuses[3] = LogAggregationStatus.RUNNING_WITH_FAILURE;
    statuses[4] = LogAggregationStatus.SUCCEEDED;
    statuses[5] = LogAggregationStatus.FAILED;
    statuses[6] = LogAggregationStatus.TIME_OUT;
    Assert.assertArrayEquals(statuses, LogAggregationStatus.values());
  }

  /**
   * Exercises prepareWorkingDir with and without {@code force}. The dir must
   * end up with rwx for all plus the sticky bit. An existing dir is left
   * untouched unless {@code force} is set, in which case it is recreated
   * empty.
   */
  @Test(timeout = 5000)
  public void testPrepareWorkingDir() throws Exception {
    Configuration conf = new Configuration();
    HadoopArchiveLogs hal = new HadoopArchiveLogs(conf);
    FileSystem fs = FileSystem.getLocal(conf);
    Path workingDir = new Path("target", "testPrepareWorkingDir");
    fs.delete(workingDir, true);
    Assert.assertFalse(fs.exists(workingDir));
    // -force is false and the dir doesn't exist so it will create one
    hal.force = false;
    boolean dirPrepared = hal.prepareWorkingDir(fs, workingDir);
    Assert.assertTrue(dirPrepared);
    Assert.assertTrue(fs.exists(workingDir));
    Assert.assertEquals(
        new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL, true),
        fs.getFileStatus(workingDir).getPermission());
    // Throw a file in the dir
    Path dummyFile = new Path(workingDir, "dummy.txt");
    fs.createNewFile(dummyFile);
    Assert.assertTrue(fs.exists(dummyFile));
    // -force is false and the dir exists, so nothing will happen and the dummy
    // still exists
    dirPrepared = hal.prepareWorkingDir(fs, workingDir);
    Assert.assertFalse(dirPrepared);
    Assert.assertTrue(fs.exists(workingDir));
    Assert.assertTrue(fs.exists(dummyFile));
    Assert.assertEquals(
        new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL, true),
        fs.getFileStatus(workingDir).getPermission());
    // -force is true and the dir exists, so it will recreate it and the dummy
    // won't exist anymore
    hal.force = true;
    dirPrepared = hal.prepareWorkingDir(fs, workingDir);
    Assert.assertTrue(dirPrepared);
    Assert.assertTrue(fs.exists(workingDir));
    Assert.assertEquals(
        new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL, true),
        fs.getFileStatus(workingDir).getPermission());
    Assert.assertFalse(fs.exists(dummyFile));
  }

  /**
   * Creates (or overwrites) {@code p} with {@code sizeMultiple} copies of
   * {@link #DUMMY_DATA}, i.e. {@code sizeMultiple * FILE_SIZE_INCREMENT}
   * bytes, and asserts the file exists afterwards.
   *
   * @param fs           file system to write to
   * @param p            path of the file to create
   * @param sizeMultiple number of {@link #FILE_SIZE_INCREMENT}-byte chunks
   * @throws IOException if the file cannot be created or written
   */
  private static void createFile(FileSystem fs, Path p, long sizeMultiple)
      throws IOException {
    // try-with-resources guarantees the stream is closed even if a write fails.
    try (FSDataOutputStream out = fs.create(p)) {
      // long counter so the loop cannot overflow before reaching a long bound.
      for (long i = 0; i < sizeMultiple; i++) {
        out.write(DUMMY_DATA);
      }
    }
    Assert.assertTrue(fs.exists(p));
  }

  /**
   * Builds an RMAppImpl for application {@code id} whose generated
   * ApplicationReport always reports {@code aggStatus} as its log aggregation
   * status. The app is not registered with {@code rmContext}; callers put it
   * into {@code rmContext.getRMApps()} themselves when needed.
   */
  private static RMApp createRMApp(int id, Configuration conf, RMContext rmContext,
      final LogAggregationStatus aggStatus) {
    ApplicationId appId = ApplicationId.newInstance(CLUSTER_TIMESTAMP, id);
    ApplicationSubmissionContext submissionContext =
        ApplicationSubmissionContext.newInstance(appId, "test", "default",
            Priority.newInstance(0), null, true, true,
            2, Resource.newInstance(10, 2), "test");
    return new RMAppImpl(appId, rmContext, conf, "test",
        USER, "default", submissionContext, rmContext.getScheduler(),
        rmContext.getApplicationMasterService(),
        System.currentTimeMillis(), "test",
        null, null) {
      @Override
      public ApplicationReport createAndGetApplicationReport(
          String clientUserName, boolean allowAccess) {
        ApplicationReport report =
            super.createAndGetApplicationReport(clientUserName, allowAccess);
        // Override whatever status the RM computed with the one under test.
        report.setLogAggregationStatus(aggStatus);
        return report;
      }
    };
  }
}