/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.logaggregation;

import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.StringWriter;
import java.io.UnsupportedEncodingException;
import java.io.Writer;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import org.junit.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.TestContainerId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter;
import org.apache.hadoop.yarn.util.Times;
import org.junit.After;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;

public class TestAggregatedLogFormat {

private static final File testWorkDir = new File("target",
"TestAggregatedLogFormat");
private static final Configuration conf = new Configuration();
private static final FileSystem fs;
private static final char filler = 'x';
private static final Log LOG = LogFactory
.getLog(TestAggregatedLogFormat.class);
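
  // Create the local FileSystem once for all tests; fail fast if it
  // cannot be initialized.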
static {
try {
fs = FileSystem.get(conf);
} catch (IOException e) {
throw new RuntimeException(e);
}
}

  @Before
@After
public void cleanupTestDir() throws Exception {
Path workDirPath = new Path(testWorkDir.getAbsolutePath());
LOG.info("Cleaning test directory [" + workDirPath + "]");
fs.delete(workDirPath, true);
}

  // Test for corrupted aggregated logs: the writer should not emit more
  // data than the recorded length when LogValue.write() is called while
  // the application is still appending to its logs.
@Test
public void testForCorruptedAggregatedLogs() throws Exception {
Configuration conf = new Configuration();
File workDir = new File(testWorkDir, "testReadAcontainerLogs1");
Path remoteAppLogFile =
new Path(workDir.getAbsolutePath(), "aggregatedLogFile");
Path srcFileRoot = new Path(workDir.getAbsolutePath(), "srcFiles");
ContainerId testContainerId = TestContainerId.newContainerId(1, 1, 1, 1);
Path t =
new Path(srcFileRoot, testContainerId.getApplicationAttemptId()
.getApplicationId().toString());
Path srcFilePath = new Path(t, testContainerId.toString());
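    // Use a file large enough that aggregation overlaps with the writer
    // thread, which is still appending when LogValue.write() runs.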
long numChars = 950000;
writeSrcFileAndALog(srcFilePath, "stdout", numChars, remoteAppLogFile,
srcFileRoot, testContainerId);
LogReader logReader = new LogReader(conf, remoteAppLogFile);
LogKey rLogKey = new LogKey();
DataInputStream dis = logReader.next(rLogKey);
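    // next() fills in rLogKey and returns a stream over that container's
    // aggregated logs.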
Writer writer = new StringWriter();
try {
LogReader.readAcontainerLogs(dis, writer);
} catch (Exception e) {
      if (e.toString().contains("NumberFormatException")) {
Assert.fail("Aggregated logs are corrupted.");
}
}
}

  private void writeSrcFileAndALog(Path srcFilePath, String fileName, final long length,
Path remoteAppLogFile, Path srcFileRoot, ContainerId testContainerId)
throws Exception {
File dir = new File(srcFilePath.toString());
if (!dir.exists()) {
if (!dir.mkdirs()) {
throw new IOException("Unable to create directory : " + dir);
}
}
File outputFile = new File(new File(srcFilePath.toString()), fileName);
FileOutputStream os = new FileOutputStream(outputFile);
final OutputStreamWriter osw = new OutputStreamWriter(os, "UTF8");
final int ch = filler;
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
try (LogWriter logWriter = new LogWriter()) {
logWriter.initialize(new Configuration(), remoteAppLogFile, ugi);
LogKey logKey = new LogKey(testContainerId);
LogValue logValue =
spy(new LogValue(Collections.singletonList(srcFileRoot.toString()),
testContainerId, ugi.getShortUserName()));
final CountDownLatch latch = new CountDownLatch(1);
Thread t = new Thread() {
public void run() {
try {
for (int i = 0; i < length / 3; i++) {
osw.write(ch);
}
latch.countDown();
for (int i = 0; i < (2 * length) / 3; i++) {
osw.write(ch);
}
osw.close();
} catch (IOException e) {
            // A failure in the writer thread is surfaced for debugging
            // but is not fatal to the test.
e.printStackTrace();
}
}
};
t.start();
      // Wait until the writer thread has produced the first third of the
      // file; aggregation then runs while the remaining two thirds are
      // still being written.
latch.await();
      // Aggregate the logs.
logWriter.append(logKey, logValue);
}
}

  @Test
public void testReadAcontainerLogs1() throws Exception {
    // Verify the output of readAcontainerLogs(DataInputStream, Writer,
    // logUploadedTime).
testReadAcontainerLog(true);
    // Verify the output of readAcontainerLogs(DataInputStream, Writer).
testReadAcontainerLog(false);
}

  private void testReadAcontainerLog(boolean logUploadedTime) throws Exception {
Configuration conf = new Configuration();
File workDir = new File(testWorkDir, "testReadAcontainerLogs1");
Path remoteAppLogFile =
new Path(workDir.getAbsolutePath(), "aggregatedLogFile");
Path srcFileRoot = new Path(workDir.getAbsolutePath(), "srcFiles");
ContainerId testContainerId = TestContainerId.newContainerId(1, 1, 1, 1);
Path t =
new Path(srcFileRoot, testContainerId.getApplicationAttemptId()
.getApplicationId().toString());
Path srcFilePath = new Path(t, testContainerId.toString());
int numChars = 80000;
    // Create a sub-folder under srcFilePath and write a log file into it.
    // Only top-level files are aggregated, so this file should be ignored.
Path subDir = new Path(srcFilePath, "subDir");
fs.mkdirs(subDir);
writeSrcFile(subDir, "logs", numChars);
    // Create stderr and stdout files in the container log directory.
writeSrcFile(srcFilePath, "stderr", numChars);
writeSrcFile(srcFilePath, "stdout", numChars);
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
try (LogWriter logWriter = new LogWriter()) {
logWriter.initialize(conf, remoteAppLogFile, ugi);
LogKey logKey = new LogKey(testContainerId);
LogValue logValue =
new LogValue(Collections.singletonList(srcFileRoot.toString()),
testContainerId, ugi.getShortUserName());
      // Opening the FileInputStream for stderr will throw an IOException,
      // so log aggregation skips stderr.
LogValue spyLogValue = spy(logValue);
File errorFile = new File((new Path(srcFilePath, "stderr")).toString());
doThrow(new IOException("Mock can not open FileInputStream")).when(
spyLogValue).secureOpenFile(errorFile);
logWriter.append(logKey, spyLogValue);
}
    // Make sure the permissions on the aggregated log file are correct.
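    // 0640 (rw-r-----): readable by the owner and group only, so other
    // users cannot read the aggregated logs.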
FileStatus fsStatus = fs.getFileStatus(remoteAppLogFile);
Assert.assertEquals("permissions on log aggregation file are wrong",
FsPermission.createImmutable((short) 0640), fsStatus.getPermission());
LogReader logReader = new LogReader(conf, remoteAppLogFile);
LogKey rLogKey = new LogKey();
DataInputStream dis = logReader.next(rLogKey);
Writer writer = new StringWriter();
if (logUploadedTime) {
LogReader.readAcontainerLogs(dis, writer, System.currentTimeMillis());
} else {
LogReader.readAcontainerLogs(dis, writer);
}
    // Only stdout should be aggregated; the FileInputStream for stderr
    // could not be opened, so that file is skipped.
String s = writer.toString();
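    // Expected layout written by readAcontainerLogs for a single file:
    //   LogType:stdout
    //   Log Upload Time:<time>   (only when logUploadedTime is true)
    //   LogLength:<numChars>
    //   Log Contents:
    //   <numChars characters>\n
    //   End of LogType:stdout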
int expectedLength = "LogType:stdout".length()
+ (logUploadedTime
? (System.lineSeparator() + "Log Upload Time:"
+ Times.format(System.currentTimeMillis())).length()
: 0)
+ (System.lineSeparator() + "LogLength:" + numChars).length()
+ (System.lineSeparator() + "Log Contents:" + System.lineSeparator())
.length()
+ numChars + ("\n").length() + ("End of LogType:stdout"
+ System.lineSeparator() + System.lineSeparator()).length();
Assert.assertTrue("LogType not matched", s.contains("LogType:stdout"));
Assert.assertTrue("log file:stderr should not be aggregated.", !s.contains("LogType:stderr"));
Assert.assertTrue("log file:logs should not be aggregated.", !s.contains("LogType:logs"));
Assert.assertTrue("LogLength not matched", s.contains("LogLength:" + numChars));
Assert.assertTrue("Log Contents not matched", s.contains("Log Contents"));
StringBuilder sb = new StringBuilder();
    for (int i = 0; i < numChars; i++) {
sb.append(filler);
}
String expectedContent = sb.toString();
Assert.assertTrue("Log content incorrect", s.contains(expectedContent));
Assert.assertEquals(expectedLength, s.length());
}

  @Test(timeout=10000)
  public void testContainerLogsFileAccess() throws IOException {
    // This test runs only when NativeIO is available, since SecureIOUtils
    // requires it.
Assume.assumeTrue(NativeIO.isAvailable());
Configuration conf = new Configuration();
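    // Setting Kerberos authentication puts UserGroupInformation into
    // secure mode, so SecureIOUtils enforces the file-ownership checks
    // this test exercises.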
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
"kerberos");
UserGroupInformation.setConfiguration(conf);
File workDir = new File(testWorkDir, "testContainerLogsFileAccess1");
Path remoteAppLogFile =
new Path(workDir.getAbsolutePath(), "aggregatedLogFile");
Path srcFileRoot = new Path(workDir.getAbsolutePath(), "srcFiles");
String data = "Log File content for container : ";
    // Create files for container1. The log aggregator will try to read
    // log files owned by an invalid user.
ApplicationId applicationId = ApplicationId.newInstance(1, 1);
ApplicationAttemptId applicationAttemptId =
ApplicationAttemptId.newInstance(applicationId, 1);
ContainerId testContainerId1 =
ContainerId.newContainerId(applicationAttemptId, 1);
Path appDir =
new Path(srcFileRoot, testContainerId1.getApplicationAttemptId()
.getApplicationId().toString());
Path srcFilePath1 = new Path(appDir, testContainerId1.toString());
String stdout = "stdout";
String stderr = "stderr";
writeSrcFile(srcFilePath1, stdout, data + testContainerId1.toString()
+ stdout);
writeSrcFile(srcFilePath1, stderr, data + testContainerId1.toString()
+ stderr);
UserGroupInformation ugi =
UserGroupInformation.getCurrentUser();
try (LogWriter logWriter = new LogWriter()) {
logWriter.initialize(conf, remoteAppLogFile, ugi);
LogKey logKey = new LogKey(testContainerId1);
String randomUser = "randomUser";
LogValue logValue =
spy(new LogValue(Collections.singletonList(srcFileRoot.toString()),
testContainerId1, randomUser));
      // Simulate a situation where the first log file is owned by a
      // different user (probably via a symlink) and the second by the user
      // itself. The first file should not be aggregated because its owner
      // name is invalid.
when(logValue.getUser()).thenReturn(randomUser).thenReturn(
ugi.getShortUserName());
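      // Consecutive thenReturn stubs: the first getUser() call returns
      // randomUser; every subsequent call returns the real user.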
logWriter.append(logKey, logValue);
}
BufferedReader in =
new BufferedReader(new FileReader(new File(remoteAppLogFile
.toUri().getRawPath())));
String line;
    StringBuilder sb = new StringBuilder();
while ((line = in.readLine()) != null) {
LOG.info(line);
sb.append(line);
}
line = sb.toString();
String expectedOwner = ugi.getShortUserName();
if (Path.WINDOWS) {
final String adminsGroupString = "Administrators";
if (Arrays.asList(ugi.getGroupNames()).contains(adminsGroupString)) {
expectedOwner = adminsGroupString;
}
}
    // The stderr file should not be aggregated, so its log message must
    // not appear in the aggregated output.
    String stderrFile =
StringUtils.join(
File.separator,
Arrays.asList(new String[] {
workDir.getAbsolutePath(), "srcFiles",
testContainerId1.getApplicationAttemptId().getApplicationId()
.toString(), testContainerId1.toString(), stderr }));
    // The stdout file is expected to be aggregated.
String stdoutFile2 =
StringUtils.join(
File.separator,
Arrays.asList(new String[] {
workDir.getAbsolutePath(), "srcFiles",
testContainerId1.getApplicationAttemptId().getApplicationId()
.toString(), testContainerId1.toString(), stdout }));
String message2 =
"Owner '" + expectedOwner + "' for path "
+ stdoutFile2 + " did not match expected owner '"
+ ugi.getShortUserName() + "'";
Assert.assertFalse(line.contains(message2));
Assert.assertFalse(line.contains(data + testContainerId1.toString()
+ stderr));
Assert.assertTrue(line.contains(data + testContainerId1.toString()
+ stdout));
}

  private void writeSrcFile(Path srcFilePath, String fileName, long length)
throws IOException {
OutputStreamWriter osw = getOutputStreamWriter(srcFilePath, fileName);
int ch = filler;
for (int i = 0; i < length; i++) {
osw.write(ch);
}
osw.close();
}

  private void writeSrcFile(Path srcFilePath, String fileName, String data)
throws IOException {
OutputStreamWriter osw = getOutputStreamWriter(srcFilePath, fileName);
osw.write(data);
osw.close();
}

  private OutputStreamWriter getOutputStreamWriter(Path srcFilePath,
String fileName) throws IOException, FileNotFoundException,
UnsupportedEncodingException {
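    // Create the parent directories on demand, then open a UTF-8 writer
    // over a fresh output file.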
File dir = new File(srcFilePath.toString());
if (!dir.exists()) {
if (!dir.mkdirs()) {
throw new IOException("Unable to create directory : " + dir);
}
}
File outputFile = new File(new File(srcFilePath.toString()), fileName);
FileOutputStream os = new FileOutputStream(outputFile);
OutputStreamWriter osw = new OutputStreamWriter(os, "UTF8");
return osw;
}
}