blob: 251b391f400c9f8332994d083db065a2da7d6dba [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.EnumSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.file.tfile.TFile;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.util.ConverterUtils;
public class AggregatedLogFormat {
static final Log LOG = LogFactory.getLog(AggregatedLogFormat.class);
public static class LogKey implements Writable {
private String containerId;
public LogKey() {
}
public LogKey(ContainerId containerId) {
this.containerId = ConverterUtils.toString(containerId);
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(this.containerId);
}
@Override
public void readFields(DataInput in) throws IOException {
this.containerId = in.readUTF();
}
@Override
public String toString() {
return this.containerId;
}
}
public static class LogValue {
private final String[] rootLogDirs;
private final ContainerId containerId;
public LogValue(String[] rootLogDirs, ContainerId containerId) {
this.rootLogDirs = rootLogDirs;
this.containerId = containerId;
}
public void write(DataOutputStream out) throws IOException {
for (String rootLogDir : this.rootLogDirs) {
File appLogDir =
new File(rootLogDir, ConverterUtils.toString(this.containerId
.getAppId()));
File containerLogDir =
new File(appLogDir, ConverterUtils.toString(this.containerId));
if (!containerLogDir.isDirectory()) {
continue; // ContainerDir may have been deleted by the user.
}
for (File logFile : containerLogDir.listFiles()) {
// Write the logFile Type
out.writeUTF(logFile.getName());
// Write the log length as UTF so that it is printable
out.writeUTF(String.valueOf(logFile.length()));
// Write the log itself
FileInputStream in = null;
try {
in = new FileInputStream(logFile);
byte[] buf = new byte[65535];
int len = 0;
while ((len = in.read(buf)) != -1) {
out.write(buf, 0, len);
}
} finally {
in.close();
}
}
}
}
}
public static class LogWriter {
private final FSDataOutputStream fsDataOStream;
private final TFile.Writer writer;
public LogWriter(final Configuration conf, final Path remoteAppLogFile,
UserGroupInformation userUgi) throws IOException {
try {
this.fsDataOStream =
userUgi.doAs(new PrivilegedExceptionAction<FSDataOutputStream>() {
@Override
public FSDataOutputStream run() throws Exception {
return FileContext.getFileContext(conf).create(
remoteAppLogFile,
EnumSet.of(CreateFlag.CREATE), new Options.CreateOpts[] {});
}
});
} catch (InterruptedException e) {
throw new IOException(e);
}
// Keys are not sorted: null arg
// 256KB minBlockSize : Expected log size for each container too
this.writer =
new TFile.Writer(this.fsDataOStream, 256 * 1024, conf.get(
LogAggregationService.LOG_COMPRESSION_TYPE,
LogAggregationService.DEFAULT_COMPRESSION_TYPE), null, conf);
}
public void append(LogKey logKey, LogValue logValue) throws IOException {
DataOutputStream out = this.writer.prepareAppendKey(-1);
logKey.write(out);
out.close();
out = this.writer.prepareAppendValue(-1);
logValue.write(out);
out.close();
this.fsDataOStream.hflush();
}
public void closeWriter() {
try {
this.writer.close();
} catch (IOException e) {
LOG.warn("Exception closing writer", e);
}
try {
this.fsDataOStream.close();
} catch (IOException e) {
LOG.warn("Exception closing output-stream", e);
}
}
}
public static class LogReader {
private final FSDataInputStream fsDataIStream;
private final TFile.Reader.Scanner scanner;
public LogReader(Configuration conf, Path remoteAppLogFile)
throws IOException {
FileContext fileContext = FileContext.getFileContext(conf);
this.fsDataIStream = fileContext.open(remoteAppLogFile);
TFile.Reader reader =
new TFile.Reader(this.fsDataIStream, fileContext.getFileStatus(
remoteAppLogFile).getLen(), conf);
this.scanner = reader.createScanner();
}
private boolean atBeginning = true;
/**
* Read the next key and return the value-stream.
*
* @param key
* @return the valueStream if there are more keys or null otherwise.
* @throws IOException
*/
public DataInputStream next(LogKey key) throws IOException {
if (!this.atBeginning) {
this.scanner.advance();
} else {
this.atBeginning = false;
}
if (this.scanner.atEnd()) {
return null;
}
TFile.Reader.Scanner.Entry entry = this.scanner.entry();
key.readFields(entry.getKeyStream());
DataInputStream valueStream = entry.getValueStream();
return valueStream;
}
/**
* Keep calling this till you get a {@link EOFException} for getting logs of
* all types for a single container.
*
* @param valueStream
* @param out
* @throws IOException
*/
public static void readAContainerLogsForALogType(
DataInputStream valueStream, DataOutputStream out)
throws IOException {
byte[] buf = new byte[65535];
String fileType = valueStream.readUTF();
String fileLengthStr = valueStream.readUTF();
long fileLength = Long.parseLong(fileLengthStr);
out.writeUTF("\nLogType:");
out.writeUTF(fileType);
out.writeUTF("\nLogLength:");
out.writeUTF(fileLengthStr);
out.writeUTF("\nLog Contents:\n");
int curRead = 0;
long pendingRead = fileLength - curRead;
int toRead =
pendingRead > buf.length ? buf.length : (int) pendingRead;
int len = valueStream.read(buf, 0, toRead);
while (len != -1 && curRead < fileLength) {
out.write(buf, 0, len);
curRead += len;
pendingRead = fileLength - curRead;
toRead =
pendingRead > buf.length ? buf.length : (int) pendingRead;
len = valueStream.read(buf, 0, toRead);
}
}
public void close() throws IOException {
this.scanner.close();
this.fsDataIStream.close();
}
}
}