blob: de755a721564e288fd497711052571f513be417b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.logaggregation;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.StringWriter;
import java.io.Writer;
import java.util.Collections;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class TestAggregatedLogFormat {
private static final File testWorkDir = new File("target",
"TestAggregatedLogFormat");
private static final Configuration conf = new Configuration();
private static final FileSystem fs;
private static final char filler = 'x';
private static final Log LOG = LogFactory
.getLog(TestAggregatedLogFormat.class);
static {
try {
fs = FileSystem.get(conf);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Before
@After
public void cleanupTestDir() throws Exception {
Path workDirPath = new Path(testWorkDir.getAbsolutePath());
LOG.info("Cleaning test directory [" + workDirPath + "]");
fs.delete(workDirPath, true);
}
//Verify the output generated by readAContainerLogs(DataInputStream, Writer)
@Test
public void testReadAcontainerLogs1() throws Exception {
Configuration conf = new Configuration();
File workDir = new File(testWorkDir, "testReadAcontainerLogs1");
Path remoteAppLogFile =
new Path(workDir.getAbsolutePath(), "aggregatedLogFile");
Path srcFileRoot = new Path(workDir.getAbsolutePath(), "srcFiles");
ContainerId testContainerId = BuilderUtils.newContainerId(1, 1, 1, 1);
Path t =
new Path(srcFileRoot, testContainerId.getApplicationAttemptId()
.getApplicationId().toString());
Path srcFilePath = new Path(t, testContainerId.toString());
int numChars = 80000;
writeSrcFile(srcFilePath, "stdout", numChars);
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
LogWriter logWriter = new LogWriter(conf, remoteAppLogFile, ugi);
LogKey logKey = new LogKey(testContainerId);
LogValue logValue =
new LogValue(Collections.singletonList(srcFileRoot.toString()),
testContainerId);
logWriter.append(logKey, logValue);
logWriter.closeWriter();
// make sure permission are correct on the file
FileStatus fsStatus = fs.getFileStatus(remoteAppLogFile);
Assert.assertEquals("permissions on log aggregation file are wrong",
FsPermission.createImmutable((short) 0640), fsStatus.getPermission());
LogReader logReader = new LogReader(conf, remoteAppLogFile);
LogKey rLogKey = new LogKey();
DataInputStream dis = logReader.next(rLogKey);
Writer writer = new StringWriter();
LogReader.readAcontainerLogs(dis, writer);
String s = writer.toString();
int expectedLength =
"\n\nLogType:stdout".length() + ("\nLogLength:" + numChars).length()
+ "\nLog Contents:\n".length() + numChars;
Assert.assertTrue("LogType not matched", s.contains("LogType:stdout"));
Assert.assertTrue("LogLength not matched", s.contains("LogLength:" + numChars));
Assert.assertTrue("Log Contents not matched", s.contains("Log Contents"));
StringBuilder sb = new StringBuilder();
for (int i = 0 ; i < numChars ; i++) {
sb.append(filler);
}
String expectedContent = sb.toString();
Assert.assertTrue("Log content incorrect", s.contains(expectedContent));
Assert.assertEquals(expectedLength, s.length());
}
private void writeSrcFile(Path srcFilePath, String fileName, long length)
throws IOException {
File dir = new File(srcFilePath.toString());
if (!dir.exists()) {
if (!dir.mkdirs()) {
throw new IOException("Unable to create directory : " + dir);
}
}
File outputFile = new File(new File(srcFilePath.toString()), fileName);
FileOutputStream os = new FileOutputStream(outputFile);
OutputStreamWriter osw = new OutputStreamWriter(os, "UTF8");
int ch = filler;
for (int i = 0; i < length; i++) {
osw.write(ch);
}
osw.close();
}
}