blob: 6766f8064bc66280b53a802f1fd1acebaf55642d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.logaggregation;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.Writer;
import java.nio.charset.Charset;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.commons.io.input.BoundedInputStream;
import org.apache.commons.io.output.WriterOutputStream;
import org.apache.commons.math3.util.Pair;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SecureIOUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.file.tfile.TFile;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.util.Times;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
@Public
@Evolving
public class AggregatedLogFormat {
/** Logger used throughout this file. */
private static final Logger LOG =
    LoggerFactory.getLogger(AggregatedLogFormat.class);

// Keys of the metadata entries that are stored next to the container entries.
private static final LogKey APPLICATION_ACL_KEY =
    new LogKey("APPLICATION_ACL");
private static final LogKey APPLICATION_OWNER_KEY =
    new LogKey("APPLICATION_OWNER");
private static final LogKey VERSION_KEY = new LogKey("VERSION");

/** The metadata keys above, indexed by their string form. */
private static final Map<String, LogKey> RESERVED_KEYS;

//Maybe write out the retention policy.
//Maybe write out a list of containerLogs skipped by the retention policy.
/** Value written for the VERSION entry. */
private static final int VERSION = 1;

/**
 * Umask for the log file.
 */
private static final FsPermission APP_LOG_FILE_UMASK =
    FsPermission.createImmutable((short) (0640 ^ 0777));

static {
  Map<String, LogKey> reserved = new HashMap<String, LogKey>();
  for (LogKey metaKey : Arrays.asList(
      APPLICATION_ACL_KEY, APPLICATION_OWNER_KEY, VERSION_KEY)) {
    reserved.put(metaKey.toString(), metaKey);
  }
  RESERVED_KEYS = reserved;
}
/**
 * Key of an entry in the aggregated log file. The key is a plain string:
 * either a container id in string form or an arbitrary name.
 */
@Public
public static class LogKey implements Writable {
  /** String form of the key; null until a constructor or readFields sets it. */
  private String keyString;

  /** Creates an empty key, to be filled in by {@link #readFields}. */
  public LogKey() {
  }

  /**
   * Creates a key from a container id.
   * @param containerId the container whose string form becomes the key
   */
  public LogKey(ContainerId containerId) {
    this.keyString = containerId.toString();
  }

  /**
   * Creates a key from a string.
   * @param keyString the key text
   */
  public LogKey(String keyString) {
    this.keyString = keyString;
  }

  @Override
  public int hashCode() {
    if (keyString == null) {
      return 0;
    }
    return keyString.hashCode();
  }

  @Override
  public boolean equals(Object obj) {
    if (!(obj instanceof LogKey)) {
      return false;
    }
    String otherKey = ((LogKey) obj).keyString;
    // Two keys without a string are equal to each other only.
    return keyString == null ? otherKey == null : keyString.equals(otherKey);
  }

  /** Serializes the key string with {@code writeUTF}. */
  @Private
  @Override
  public void write(DataOutput out) throws IOException {
    out.writeUTF(keyString);
  }

  /** Replaces the key string with one read via {@code readUTF}. */
  @Private
  @Override
  public void readFields(DataInput in) throws IOException {
    keyString = in.readUTF();
  }

  @Override
  public String toString() {
    return keyString;
  }
}
@Private
public static class LogValue {
/** Root log directories to search, sorted lexically. */
private final List<String> rootLogDirs;
/** Container whose log files this value covers. */
private final ContainerId containerId;
/** User the log files are opened for. */
private final String user;
/** Source of the include/exclude patterns; may be null. */
private final LogAggregationContext logAggregationContext;
/** Files whose content was fully copied by write(). */
private Set<File> uploadedFiles = new HashSet<>();
/** Metadata strings of files that are filtered out of the candidates. */
private final Set<String> alreadyUploadedLogFiles;
/** Metadata strings of every file found while listing log directories. */
private Set<String> allExistingFileMeta = new HashSet<>();
private final boolean appFinished;
private final boolean containerFinished;
/**
 * The retention context to determine if log files are older than
 * the retention policy configured.
 */
private final LogRetentionContext logRetentionContext;
/**
 * The set of log files that are older than retention policy that will
 * not be uploaded but ready for deletion.
 */
private final Set<File> obseleteRetentionLogFiles = new HashSet<>();

// TODO Maybe add a version string here. Instead of changing the version of
// the entire k-v format

/**
 * Creates a value with no aggregation context, no retention context and
 * both the application and the container flagged as finished.
 */
public LogValue(List<String> rootLogDirs, ContainerId containerId,
    String user) {
  this(rootLogDirs, containerId, user, null, new HashSet<String>(),
      null, true, true);
}

/**
 * Creates a value for one container.
 * The given directory list is copied and the copy is sorted.
 */
public LogValue(List<String> rootLogDirs, ContainerId containerId,
    String user, LogAggregationContext logAggregationContext,
    Set<String> alreadyUploadedLogFiles,
    LogRetentionContext retentionContext, boolean appFinished,
    boolean containerFinished) {
  List<String> sortedDirs = new ArrayList<>(rootLogDirs);
  // Ensure logs are processed in lexical order
  Collections.sort(sortedDirs);
  this.rootLogDirs = sortedDirs;
  this.containerId = containerId;
  this.user = user;
  this.logAggregationContext = logAggregationContext;
  this.alreadyUploadedLogFiles = alreadyUploadedLogFiles;
  this.logRetentionContext = retentionContext;
  this.appFinished = appFinished;
  this.containerFinished = containerFinished;
}
/**
 * Gathers the pending log files of this container from every root log
 * directory. The container directory is looked up as
 * {@code <root>/<applicationId>/<containerId>}.
 *
 * @return the files selected for upload; empty if none were found
 */
@VisibleForTesting
public Set<File> getPendingLogFilesToUploadForThisContainer() {
  Set<File> pending = new HashSet<>();
  for (String rootLogDir : rootLogDirs) {
    String appDirName = containerId.getApplicationAttemptId()
        .getApplicationId().toString();
    File containerLogDir =
        new File(new File(rootLogDir, appDirName), containerId.toString());
    // ContainerDir may have been deleted by the user.
    if (containerLogDir.isDirectory()) {
      pending.addAll(getPendingLogFilesToUpload(containerLogDir));
    }
  }
  return pending;
}
/**
 * Writes the given log files to the value stream, one record per file.
 * A record is the file name and the file length, both written with
 * {@code writeUTF}, followed by the file content. Files are processed in
 * sorted order; directories and files that cannot be opened are skipped.
 * Files whose content was copied without an I/O error are remembered as
 * uploaded.
 *
 * @param out the value stream the records are appended to
 * @param pendingUploadFiles the files to write
 * @throws IOException if the record header or the error text written after
 *         a failed copy cannot be written to {@code out}
 */
public void write(DataOutputStream out, Set<File> pendingUploadFiles)
throws IOException {
// Sort a copy so the records are written in a deterministic order.
List<File> fileList = new ArrayList<File>(pendingUploadFiles);
Collections.sort(fileList);
for (File logFile : fileList) {
// We only aggregate top level files.
// Ignore anything inside sub-folders.
if (logFile.isDirectory()) {
LOG.warn(logFile.getAbsolutePath() + " is a directory. Ignore it.");
continue;
}
FileInputStream in = null;
try {
in = secureOpenFile(logFile);
} catch (IOException e) {
// Nothing has been written for this file yet, so it can be skipped
// without affecting the records already in the stream.
logErrorMessage(logFile, e);
IOUtils.cleanupWithLogger(LOG, in);
continue;
}
// The length is sampled once, after the file is open; at most this many
// bytes are copied below even if the file keeps growing.
final long fileLength = logFile.length();
// Write the logFile Type
out.writeUTF(logFile.getName());
// Write the log length as UTF so that it is printable
out.writeUTF(String.valueOf(fileLength));
// Write the log itself
try {
byte[] buf = new byte[65535];
int len = 0;
long bytesLeft = fileLength;
while ((len = in.read(buf)) != -1) {
//If buffer contents within fileLength, write
if (len < bytesLeft) {
out.write(buf, 0, len);
bytesLeft-=len;
}
//else only write contents within fileLength, then exit early
else {
out.write(buf, 0, (int)bytesLeft);
break;
}
}
// NOTE(review): if the file shrank after its length was sampled, fewer
// bytes than the declared length have been written at this point.
long newLength = logFile.length();
if(fileLength < newLength) {
LOG.warn("Aggregated logs truncated by approximately "+
(newLength-fileLength) +" bytes.");
}
this.uploadedFiles.add(logFile);
} catch (IOException e) {
// NOTE(review): the error text is appended after the header (and any
// partial content), so the payload may not match the declared length.
String message = logErrorMessage(logFile, e);
out.write(message.getBytes(Charset.forName("UTF-8")));
} finally {
IOUtils.cleanupWithLogger(LOG, in);
}
}
}
/**
 * Opens a log file for reading through {@code SecureIOUtils}, passing the
 * user of this value as the expected owner and no expected group.
 *
 * @param logFile the file to open
 * @return an input stream on the file
 * @throws IOException if the file cannot be opened
 */
@VisibleForTesting
public FileInputStream secureOpenFile(File logFile) throws IOException {
  final String expectedOwner = getUser();
  return SecureIOUtils.openForRead(logFile, expectedOwner, null);
}
/**
 * Builds the error text for a log file that could not be aggregated, logs
 * it together with the exception and returns it.
 */
private static String logErrorMessage(File logFile, Exception e) {
  StringBuilder text =
      new StringBuilder("Error aggregating log file. Log file : ");
  text.append(logFile.getAbsolutePath());
  text.append(". ");
  text.append(e.getMessage());
  String message = text.toString();
  LOG.error(message, e);
  return message;
}
/**
 * Returns the user given at construction time. Added for testing purpose.
 *
 * @return the user
 */
public String getUser() {
  return this.user;
}
/**
 * Selects the files of one container log directory that should be uploaded.
 * Every listed file is recorded in the set of existing file metadata. If the
 * retention context says logs should no longer be retained, all files are
 * recorded as obsolete and nothing is selected. Otherwise the patterns of
 * the aggregation context, when present, narrow down the selection.
 */
private Set<File> getPendingLogFilesToUpload(File containerLogDir) {
  File[] filesList =
      containerLogDir == null ? null : containerLogDir.listFiles();
  if (filesList == null) {
    return new HashSet<>(0);
  }
  Set<File> candidates = new HashSet<File>(Arrays.asList(filesList));
  for (File logFile : candidates) {
    allExistingFileMeta.add(getLogFileMetaData(logFile));
  }

  // if log files are older than retention policy, do not upload them.
  // but schedule them for deletion.
  boolean pastRetention =
      logRetentionContext != null && !logRetentionContext.shouldRetainLog();
  if (pastRetention) {
    obseleteRetentionLogFiles.addAll(candidates);
    candidates.clear();
    return candidates;
  }

  // Without patterns (or without files) everything listed is selected.
  if (logAggregationContext == null || candidates.isEmpty()) {
    return new HashSet<File>(candidates);
  }

  Set<File> selected =
      getFileCandidates(new HashSet<File>(candidates), appFinished);
  if (!appFinished && containerFinished) {
    // The container is done although the application is not: additionally
    // apply the regular patterns to the full listing.
    selected.addAll(getFileCandidates(new HashSet<File>(candidates), true));
  }
  return selected;
}
/**
 * Applies the include pattern, then the exclude pattern, to the candidates
 * (modifying the given set) and returns a new set without the files whose
 * metadata is listed as already uploaded.
 *
 * @param candidates files to filter; entries are removed in place
 * @param useRegularPattern true to use the regular include/exclude
 *        patterns, false to use the rolled-logs patterns
 */
private Set<File> getFileCandidates(Set<File> candidates,
    boolean useRegularPattern) {
  String includePattern;
  if (useRegularPattern) {
    includePattern = logAggregationContext.getIncludePattern();
  } else {
    includePattern = logAggregationContext.getRolledLogsIncludePattern();
  }
  filterFiles(includePattern, candidates, false);

  String excludePattern;
  if (useRegularPattern) {
    excludePattern = logAggregationContext.getExcludePattern();
  } else {
    excludePattern = logAggregationContext.getRolledLogsExcludePattern();
  }
  filterFiles(excludePattern, candidates, true);

  Set<File> notYetUploaded = new HashSet<File>();
  for (File candidate : candidates) {
    if (!alreadyUploadedLogFiles.contains(getLogFileMetaData(candidate))) {
      notYetUploaded.add(candidate);
    }
  }
  return notYetUploaded;
}
/**
 * Removes files from the candidates according to a regular expression that
 * is searched for in each file name. A null or empty pattern leaves the set
 * untouched.
 *
 * @param pattern the regular expression, may be null or empty
 * @param candidates the files to filter; entries are removed in place
 * @param exclusion true to drop matching files, false to drop the
 *        non-matching ones
 */
private void filterFiles(String pattern, Set<File> candidates,
    boolean exclusion) {
  if (pattern == null || pattern.isEmpty()) {
    return;
  }
  Pattern filterPattern = Pattern.compile(pattern);
  Iterator<File> remaining = candidates.iterator();
  while (remaining.hasNext()) {
    String fileName = remaining.next().getName();
    boolean match = filterPattern.matcher(fileName).find();
    // Drop matches when excluding, drop non-matches when including.
    if (match == exclusion) {
      remaining.remove();
    }
  }
}
/**
 * Returns the absolute paths of the files recorded as uploaded.
 *
 * @return a new set of paths, one per uploaded file
 */
public Set<Path> getCurrentUpLoadedFilesPath() {
  Set<Path> uploadedPaths = new HashSet<>();
  Iterator<File> uploaded = uploadedFiles.iterator();
  while (uploaded.hasNext()) {
    uploadedPaths.add(new Path(uploaded.next().getAbsolutePath()));
  }
  return uploadedPaths;
}
/**
 * Returns the metadata strings of the files recorded as uploaded.
 *
 * @return a new set of metadata strings, one per uploaded file
 */
public Set<String> getCurrentUpLoadedFileMeta() {
  Set<String> uploadedMeta = new HashSet<>();
  Iterator<File> uploaded = uploadedFiles.iterator();
  while (uploaded.hasNext()) {
    uploadedMeta.add(getLogFileMetaData(uploaded.next()));
  }
  return uploadedMeta;
}
/**
 * Returns the absolute paths of the files that were set aside because the
 * retention context no longer retains them.
 *
 * @return a new set of paths, one per obsolete file
 */
public Set<Path> getObseleteRetentionLogFiles() {
  Set<Path> obsoletePaths = new HashSet<>();
  Iterator<File> obsolete = obseleteRetentionLogFiles.iterator();
  while (obsolete.hasNext()) {
    obsoletePaths.add(new Path(obsolete.next().getAbsolutePath()));
  }
  return obsoletePaths;
}
/**
 * Returns the metadata strings of every file seen while listing the
 * container log directories. The internal set itself is returned, not a
 * copy.
 *
 * @return the set of metadata strings
 */
public Set<String> getAllExistingFilesMeta() {
  return allExistingFileMeta;
}
/**
 * Builds the metadata string of a file:
 * {@code <containerId>_<fileName>_<lastModified>}.
 */
private String getLogFileMetaData(File file) {
  StringBuilder meta = new StringBuilder(containerId.toString());
  meta.append("_").append(file.getName());
  meta.append("_").append(file.lastModified());
  return meta.toString();
}
}
/**
 * A context for log retention to determine if files are older than
 * the retention policy configured in YarnConfiguration.
 */
public static class LogRetentionContext {
  /**
   * The time used with logRetentionMillis, to determine ages of
   * log files and if files need to be uploaded.
   */
  private final long logInitedTimeMillis;
  /**
   * The numbers of milli seconds since a log file is created to determine
   * if we should upload it. -1 if disabled.
   * see YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS for details.
   */
  private final long logRetentionMillis;

  /**
   * Creates a retention context.
   * @param logInitedTimeMillis reference time, in milliseconds, that ages
   *        are measured from
   * @param logRetentionMillis retention period in milliseconds
   */
  public LogRetentionContext(long logInitedTimeMillis, long
      logRetentionMillis) {
    this.logInitedTimeMillis = logInitedTimeMillis;
    this.logRetentionMillis = logRetentionMillis;
  }

  /**
   * Tells whether the retention check is switched off.
   * @return true if either the reference time or the retention period is
   *         negative
   */
  public boolean isDisabled() {
    boolean bothSet = logInitedTimeMillis >= 0 && logRetentionMillis >= 0;
    return !bothSet;
  }

  /**
   * Tells whether logs should still be retained.
   * @return true if the check is disabled or the time elapsed since the
   *         reference time is shorter than the retention period
   */
  public boolean shouldRetainLog() {
    if (isDisabled()) {
      return true;
    }
    long elapsedMillis = System.currentTimeMillis() - logInitedTimeMillis;
    return elapsedMillis < logRetentionMillis;
  }
}
/**
* The writer that writes out the aggregated logs.
*/
@Private
public static class LogWriter implements AutoCloseable {
/** Stream on the remote log file; created by initialize(). */
private FSDataOutputStream fsDataOStream;
/** TFile writer layered on top of the stream; created by initialize(). */
private TFile.Writer writer;
/** File context used to create the remote log file. */
private FileContext fc;

/**
 * Initialize the LogWriter.
 * Must be called just after the instance is created.
 * Creates (or overwrites) the remote log file as the given user, wraps it
 * in a TFile writer and writes the version entry.
 * @param conf Configuration
 * @param remoteAppLogFile remote log file path
 * @param userUgi Ugi of the user
 * @throws IOException Failed to initialize, or the calling thread was
 *         interrupted while creating the file (the interrupt status is
 *         restored before the exception is thrown)
 */
public void initialize(final Configuration conf,
    final Path remoteAppLogFile,
    UserGroupInformation userUgi) throws IOException {
  try {
    this.fsDataOStream =
        userUgi.doAs(new PrivilegedExceptionAction<FSDataOutputStream>() {
          @Override
          public FSDataOutputStream run() throws Exception {
            fc = FileContext.getFileContext(remoteAppLogFile.toUri(), conf);
            fc.setUMask(APP_LOG_FILE_UMASK);
            return fc.create(
                remoteAppLogFile,
                EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
                new Options.CreateOpts[] {});
          }
        });
  } catch (InterruptedException e) {
    // Wrapping the exception would otherwise hide the interruption from
    // the code further up the stack, so put the flag back first.
    Thread.currentThread().interrupt();
    throw new IOException(e);
  }

  // Keys are not sorted: null arg
  // 256KB minBlockSize : Expected log size for each container too
  this.writer =
      new TFile.Writer(this.fsDataOStream, 256 * 1024, conf.get(
          YarnConfiguration.NM_LOG_AGG_COMPRESSION_TYPE,
          YarnConfiguration.DEFAULT_NM_LOG_AGG_COMPRESSION_TYPE), null, conf);
  //Write the version string
  writeVersion();
}
/**
 * Returns the underlying TFile writer.
 *
 * @return the writer, or null if it has not been created yet
 */
@VisibleForTesting
public TFile.Writer getWriter() {
  return writer;
}
/** Appends the version entry: the VERSION key followed by the version int. */
private void writeVersion() throws IOException {
  try (DataOutputStream keyOut = writer.prepareAppendKey(-1)) {
    VERSION_KEY.write(keyOut);
  }
  try (DataOutputStream valueOut = writer.prepareAppendValue(-1)) {
    valueOut.writeInt(VERSION);
  }
}
/**
 * Appends the application owner entry: the owner key followed by the user
 * name written with {@code writeUTF}.
 *
 * @param user the application owner
 * @throws IOException if the entry cannot be written
 */
public void writeApplicationOwner(String user) throws IOException {
  try (DataOutputStream keyOut = writer.prepareAppendKey(-1)) {
    APPLICATION_OWNER_KEY.write(keyOut);
  }
  try (DataOutputStream valueOut = writer.prepareAppendValue(-1)) {
    valueOut.writeUTF(user);
  }
}
/**
 * Appends the application ACL entry: the ACL key followed by one
 * (access type, ACL string) pair per map entry, each written with
 * {@code writeUTF}.
 *
 * @param appAcls the ACLs to store
 * @throws IOException if the entry cannot be written
 */
public void writeApplicationACLs(Map<ApplicationAccessType, String> appAcls)
    throws IOException {
  try (DataOutputStream keyOut = writer.prepareAppendKey(-1)) {
    APPLICATION_ACL_KEY.write(keyOut);
  }
  try (DataOutputStream valueOut = writer.prepareAppendValue(-1)) {
    for (Entry<ApplicationAccessType, String> acl : appAcls.entrySet()) {
      String accessType = acl.getKey().toString();
      valueOut.writeUTF(accessType);
      valueOut.writeUTF(acl.getValue());
    }
  }
}
/**
 * Appends one entry for a container: the key followed by the pending log
 * files of the value. Nothing is written when there are no pending files.
 *
 * @param logKey the key of the entry
 * @param logValue the value providing and writing the log files
 * @throws IOException if the entry cannot be written
 */
public void append(LogKey logKey, LogValue logValue) throws IOException {
  Set<File> filesToUpload =
      logValue.getPendingLogFilesToUploadForThisContainer();
  if (filesToUpload.size() == 0) {
    return;
  }
  try (DataOutputStream keyOut = writer.prepareAppendKey(-1)) {
    logKey.write(keyOut);
  }
  try (DataOutputStream valueOut = writer.prepareAppendValue(-1)) {
    logValue.write(valueOut, filesToUpload);
  }
}
/**
 * Closes the TFile writer, if one was created, and then the underlying
 * stream. A failure to close the writer is logged, not thrown.
 */
@Override
public void close() {
  try {
    if (this.writer != null) {
      this.writer.close();
    }
  } catch (Exception closeFailure) {
    LOG.warn("Exception closing writer", closeFailure);
  } finally {
    // The stream is cleaned up whether or not the writer closed cleanly.
    IOUtils.cleanupWithLogger(LOG, fsDataOStream);
  }
}
}
@Public
@Evolving
public static class LogReader {
/** Stream on the remote aggregated log file. */
private final FSDataInputStream fsDataIStream;
/** Scanner that {@link #next(LogKey)} iterates with. */
private final TFile.Reader.Scanner scanner;
/** TFile reader layered on top of the stream. */
private final TFile.Reader reader;

/**
 * Opens the aggregated log file and positions a scanner on it.
 * If anything fails after the file has been opened, the resources opened
 * so far are closed before the exception is propagated.
 *
 * @param conf Configuration
 * @param remoteAppLogFile path of the aggregated log file
 * @throws IOException if the file cannot be opened or is not readable as
 *         a TFile
 */
public LogReader(Configuration conf, Path remoteAppLogFile)
    throws IOException {
  FileContext fileContext =
      FileContext.getFileContext(remoteAppLogFile.toUri(), conf);
  FSDataInputStream in = fileContext.open(remoteAppLogFile);
  TFile.Reader tfileReader = null;
  TFile.Reader.Scanner tfileScanner = null;
  boolean initialized = false;
  try {
    tfileReader =
        new TFile.Reader(in, fileContext.getFileStatus(
            remoteAppLogFile).getLen(), conf);
    tfileScanner = tfileReader.createScanner();
    initialized = true;
  } finally {
    if (!initialized) {
      // The object is never handed to the caller on failure, so nobody
      // else could close the stream: release it here.
      IOUtils.cleanupWithLogger(LOG, tfileScanner, tfileReader, in);
    }
  }
  this.fsDataIStream = in;
  this.reader = tfileReader;
  this.scanner = tfileScanner;
}

/** True until the first call of {@link #next(LogKey)}. */
private boolean atBeginning = true;
/**
 * Returns the owner of the application.
 * The file is scanned from the start with a scanner of its own, which is
 * closed before returning.
 *
 * @return the application owner, or null if the file has no owner entry.
 * @throws IOException if we can not get the application owner.
 */
public String getApplicationOwner() throws IOException {
  TFile.Reader.Scanner ownerScanner = null;
  try {
    ownerScanner = reader.createScanner();
    LogKey currentKey = new LogKey();
    for (; !ownerScanner.atEnd(); ownerScanner.advance()) {
      TFile.Reader.Scanner.Entry entry = ownerScanner.entry();
      currentKey.readFields(entry.getKeyStream());
      String keyName = currentKey.toString();
      if (keyName.equals(APPLICATION_OWNER_KEY.toString())) {
        return entry.getValueStream().readUTF();
      }
    }
    return null;
  } finally {
    IOUtils.cleanupWithLogger(LOG, ownerScanner);
  }
}
/**
 * Returns ACLs for the application. An empty map is returned if no ACLs are
 * found.
 * The file is scanned from start to end with a scanner of its own, which
 * is closed before returning.
 *
 * @return a map of the Application ACLs.
 * @throws IOException if we can not get the application acls.
 */
public Map<ApplicationAccessType, String> getApplicationAcls()
    throws IOException {
  // TODO Seek directly to the key once a comparator is specified.
  TFile.Reader.Scanner aclScanner = null;
  try {
    aclScanner = reader.createScanner();
    LogKey currentKey = new LogKey();
    Map<ApplicationAccessType, String> acls = new HashMap<>();
    for (; !aclScanner.atEnd(); aclScanner.advance()) {
      TFile.Reader.Scanner.Entry entry = aclScanner.entry();
      currentKey.readFields(entry.getKeyStream());
      if (!currentKey.toString().equals(APPLICATION_ACL_KEY.toString())) {
        continue;
      }
      DataInputStream valueStream = entry.getValueStream();
      // The value is a sequence of (access type, ACL string) pairs.
      while (true) {
        String appAccessOp;
        try {
          appAccessOp = valueStream.readUTF();
        } catch (EOFException e) {
          // Valid end of stream.
          break;
        }
        String aclString;
        try {
          aclString = valueStream.readUTF();
        } catch (EOFException e) {
          // An access type without its ACL string: the pair is incomplete.
          throw new YarnRuntimeException("Error reading ACLs", e);
        }
        acls.put(ApplicationAccessType.valueOf(appAccessOp), aclString);
      }
    }
    return acls;
  } finally {
    IOUtils.cleanupWithLogger(LOG, aclScanner);
  }
}
/**
 * Read the next key and return the value-stream.
 * Entries whose key is one of the reserved metadata keys are skipped.
 *
 * @param key the log key; it is overwritten with the key that was read
 * @return the valueStream if there are more keys or null otherwise
 * @throws IOException if we can not get the dataInputStream
 * for the next key
 */
public DataInputStream next(LogKey key) throws IOException {
  // The scanner already points at the first entry on the very first call.
  if (this.atBeginning) {
    this.atBeginning = false;
  } else {
    this.scanner.advance();
  }
  if (this.scanner.atEnd()) {
    return null;
  }
  TFile.Reader.Scanner.Entry entry = this.scanner.entry();
  key.readFields(entry.getKeyStream());
  // Skip META keys
  boolean metaKey = RESERVED_KEYS.containsKey(key.toString());
  return metaKey ? next(key) : entry.getValueStream();
}
/**
 * Get a ContainerLogsReader to read the logs for
 * the specified container.
 * The search continues from the current position of this reader.
 *
 * @param containerId the containerId
 * @return object to read the container's logs or null if the
 * logs could not be found
 * @throws IOException if we can not get the container log reader.
 */
@Private
public ContainerLogsReader getContainerLogsReader(
    ContainerId containerId) throws IOException {
  final LogKey wantedKey = new LogKey(containerId);
  LogKey currentKey = new LogKey();
  DataInputStream valueStream;
  do {
    valueStream = next(currentKey);
  } while (valueStream != null && !currentKey.equals(wantedKey));
  if (valueStream == null) {
    return null;
  }
  return new ContainerLogsReader(valueStream);
}
//TODO Change Log format and interfaces to be containerId specific.
// Avoid returning completeValueStreams.
// public List<String> getTypesForContainer(DataInputStream valueStream){}
//
// /**
// * @param valueStream
// * The Log stream for the container.
// * @param fileType
// * the log type required.
// * @return An InputStreamReader for the required log type or null if the
// * type is not found.
// * @throws IOException
// */
// public InputStreamReader getLogStreamForType(DataInputStream valueStream,
// String fileType) throws IOException {
// valueStream.reset();
// try {
// while (true) {
// String ft = valueStream.readUTF();
// String fileLengthStr = valueStream.readUTF();
// long fileLength = Long.parseLong(fileLengthStr);
// if (ft.equals(fileType)) {
// BoundedInputStream bis =
// new BoundedInputStream(valueStream, fileLength);
// return new InputStreamReader(bis);
// } else {
// long totalSkipped = 0;
// long currSkipped = 0;
// while (currSkipped != -1 && totalSkipped < fileLength) {
// currSkipped = valueStream.skip(fileLength - totalSkipped);
// totalSkipped += currSkipped;
// }
// // TODO Verify skip behaviour.
// if (currSkipped == -1) {
// return null;
// }
// }
// }
// } catch (EOFException e) {
// return null;
// }
// }
/**
 * Writes all logs for a single container to the provided writer.
 * The logs are encoded as UTF-8 and read until the value stream reports
 * its end. The stream wrapped around the writer is closed before
 * returning.
 * @param valueStream the valueStream
 * @param writer the log writer
 * @param logUploadedTime the time stamp
 * @throws IOException if we can not read the container logs.
 */
public static void readAcontainerLogs(DataInputStream valueStream,
    Writer writer, long logUploadedTime) throws IOException {
  OutputStream os = null;
  PrintStream ps = null;
  try {
    os = new WriterOutputStream(writer, Charset.forName("UTF-8"));
    ps = new PrintStream(os);
    try {
      for (;;) {
        readContainerLogs(valueStream, ps, logUploadedTime, Long.MAX_VALUE);
      }
    } catch (EOFException endOfLogs) {
      // EndOfFile: every log of the container has been written.
    }
  } finally {
    IOUtils.cleanupWithLogger(LOG, ps, os);
  }
}
/**
 * Writes all logs for a single container to the provided writer, without
 * printing a log upload time.
 * @param valueStream the value stream
 * @param writer the log writer
 * @throws IOException if we can not read the container logs.
 */
public static void readAcontainerLogs(DataInputStream valueStream,
    Writer writer) throws IOException {
  // -1 suppresses the "Log Upload Time" line.
  final long noUploadTime = -1;
  readAcontainerLogs(valueStream, writer, noUploadTime);
}
/**
 * Reads one log record (type, length, content) from the value stream and
 * prints it, framed by header and footer lines.
 *
 * @param valueStream the value stream, positioned at the start of a record
 * @param out the print stream the record is printed to
 * @param logUploadedTime the upload time to print, or -1 to omit the line
 * @param bytes how much of the content to print: a non-negative value
 *        prints at most that many bytes from the start, a negative value
 *        prints at most that many bytes from the end
 * @throws IOException if the record cannot be read; an
 *         {@link EOFException} signals that there are no more records
 */
private static void readContainerLogs(DataInputStream valueStream,
    PrintStream out, long logUploadedTime, long bytes)
    throws IOException {
  byte[] buf = new byte[65535];
  String fileType = valueStream.readUTF();
  String fileLengthStr = valueStream.readUTF();
  long fileLength = Long.parseLong(fileLengthStr);
  out.print("LogType:");
  out.println(fileType);
  if (logUploadedTime != -1) {
    out.print("Log Upload Time:");
    out.println(Times.format(logUploadedTime));
  }
  out.print("LogLength:");
  out.println(fileLengthStr);
  out.println("Log Contents:");

  long toSkip = 0;
  long totalBytesToRead = fileLength;
  long skipAfterRead = 0;
  if (bytes < 0) {
    // Math.abs(Long.MIN_VALUE) is still negative, which would yield a
    // negative read size below; treat it as the largest possible count.
    long absBytes = bytes == Long.MIN_VALUE ? Long.MAX_VALUE : -bytes;
    if (absBytes < fileLength) {
      toSkip = fileLength - absBytes;
      totalBytesToRead = absBytes;
    }
    org.apache.hadoop.io.IOUtils.skipFully(
        valueStream, toSkip);
  } else {
    if (bytes < fileLength) {
      totalBytesToRead = bytes;
      // The unread tail must still be consumed to reach the next record.
      skipAfterRead = fileLength - bytes;
    }
  }

  long curRead = 0;
  long pendingRead = totalBytesToRead - curRead;
  int toRead =
      pendingRead > buf.length ? buf.length : (int) pendingRead;
  int len = valueStream.read(buf, 0, toRead);
  while (len != -1 && curRead < totalBytesToRead) {
    out.write(buf, 0, len);
    curRead += len;

    pendingRead = totalBytesToRead - curRead;
    toRead =
        pendingRead > buf.length ? buf.length : (int) pendingRead;
    len = valueStream.read(buf, 0, toRead);
  }
  org.apache.hadoop.io.IOUtils.skipFully(
      valueStream, skipAfterRead);
  out.println("\nEnd of LogType:" + fileType);
  out.println("");
}
/**
 * Keep calling this till you get a {@link EOFException} for getting logs of
 * all types for a single container. Each call prints one complete log.
 *
 * @param valueStream the value stream
 * @param out the print stream
 * @param logUploadedTime the time stamp
 * @throws IOException if we can not read the container log by specifying
 * the container log type.
 */
public static void readAContainerLogsForALogType(
    DataInputStream valueStream, PrintStream out, long logUploadedTime)
    throws IOException {
  // No size limit: print the whole content of the log.
  final long wholeLog = Long.MAX_VALUE;
  readContainerLogs(valueStream, out, logUploadedTime, wholeLog);
}
/**
 * Keep calling this till you get a {@link EOFException} for getting logs of
 * all types for a single container for the specific bytes.
 *
 * @param valueStream the value stream
 * @param out the output print stream
 * @param logUploadedTime the log upload time stamp
 * @param bytes the output size of the log
 * @throws IOException if we can not read the container log
 */
public static void readAContainerLogsForALogType(
    DataInputStream valueStream, PrintStream out, long logUploadedTime,
    long bytes) throws IOException {
  final long sizeLimit = bytes;
  readContainerLogs(valueStream, out, logUploadedTime, sizeLimit);
}
/**
 * Keep calling this till you get a {@link EOFException} for getting logs of
 * all types for a single container. No log upload time is printed.
 *
 * @param valueStream the value stream
 * @param out the output print stream
 * @throws IOException if we can not read the container log
 */
public static void readAContainerLogsForALogType(
    DataInputStream valueStream, PrintStream out)
    throws IOException {
  // -1 suppresses the "Log Upload Time" line.
  final long noUploadTime = -1;
  readAContainerLogsForALogType(valueStream, out, noUploadTime);
}
/**
 * Keep calling this till you get a {@link EOFException} for getting logs of
 * the specific types for a single container. The whole content of a
 * matching log is printed.
 * @param valueStream the value stream
 * @param out the output print stream
 * @param logUploadedTime the log uploaded time stamp
 * @param logType the given log type
 * @return the result of the size-limited overload called without a limit
 * @throws IOException if we can not read the container logs
 */
public static int readContainerLogsForALogType(
    DataInputStream valueStream, PrintStream out, long logUploadedTime,
    List<String> logType) throws IOException {
  final long wholeLog = Long.MAX_VALUE;
  return readContainerLogsForALogType(valueStream, out, logUploadedTime,
      logType, wholeLog);
}
/**
 * Keep calling this till you get a {@link EOFException} for getting logs of
 * the specific types for a single container.
 * Each call consumes one log record: it is printed if its type is one of
 * the requested types and skipped otherwise.
 * @param valueStream the value stream
 * @param out the output print stream
 * @param logUploadedTime the log uploaded time stamp
 * @param logType the given log type
 * @param bytes how much of a matching log to print: a non-negative value
 *        prints at most that many bytes from the start, a negative value
 *        prints at most that many bytes from the end
 * @return 0 if the record was printed, -1 if it was skipped
 * @throws IOException if we can not read the container logs, including
 *         when the stream ends before a skipped record is fully consumed
 */
public static int readContainerLogsForALogType(
    DataInputStream valueStream, PrintStream out, long logUploadedTime,
    List<String> logType, long bytes) throws IOException {
  byte[] buf = new byte[65535];
  String fileType = valueStream.readUTF();
  String fileLengthStr = valueStream.readUTF();
  long fileLength = Long.parseLong(fileLengthStr);

  if (!logType.contains(fileType)) {
    // Not a requested type: consume the content to reach the next record.
    // skipFully fails on a premature end of stream, whereas a plain skip()
    // loop would keep getting 0 back and never terminate.
    org.apache.hadoop.io.IOUtils.skipFully(valueStream, fileLength);
    return -1;
  }

  out.print("LogType:");
  out.println(fileType);
  if (logUploadedTime != -1) {
    out.print("Log Upload Time:");
    out.println(Times.format(logUploadedTime));
  }
  out.print("LogLength:");
  out.println(fileLengthStr);
  out.println("Log Contents:");

  long toSkip = 0;
  long totalBytesToRead = fileLength;
  long skipAfterRead = 0;
  if (bytes < 0) {
    // Math.abs(Long.MIN_VALUE) is still negative, which would yield a
    // negative read size below; treat it as the largest possible count.
    long absBytes = bytes == Long.MIN_VALUE ? Long.MAX_VALUE : -bytes;
    if (absBytes < fileLength) {
      toSkip = fileLength - absBytes;
      totalBytesToRead = absBytes;
    }
    org.apache.hadoop.io.IOUtils.skipFully(
        valueStream, toSkip);
  } else {
    if (bytes < fileLength) {
      totalBytesToRead = bytes;
      // The unread tail must still be consumed to reach the next record.
      skipAfterRead = fileLength - bytes;
    }
  }

  long curRead = 0;
  long pendingRead = totalBytesToRead - curRead;
  int toRead = pendingRead > buf.length ? buf.length : (int) pendingRead;
  int len = valueStream.read(buf, 0, toRead);
  while (len != -1 && curRead < totalBytesToRead) {
    out.write(buf, 0, len);
    curRead += len;

    pendingRead = totalBytesToRead - curRead;
    toRead = pendingRead > buf.length ? buf.length : (int) pendingRead;
    len = valueStream.read(buf, 0, toRead);
  }
  org.apache.hadoop.io.IOUtils.skipFully(
      valueStream, skipAfterRead);
  out.println("\nEnd of LogType:" + fileType);
  out.println("");
  return 0;
}
/**
 * Reads the header of one log record and skips over its content.
 *
 * @param valueStream the value stream, positioned at the start of a record
 * @return the log type and the log length (as the string stored in the
 *         file) of the record
 * @throws IOException if the record cannot be read, including when the
 *         stream ends before the content is fully skipped
 */
@Private
public static Pair<String, String> readContainerMetaDataAndSkipData(
    DataInputStream valueStream) throws IOException {
  String fileType = valueStream.readUTF();
  String fileLengthStr = valueStream.readUTF();
  long fileLength = Long.parseLong(fileLengthStr);
  Pair<String, String> logMeta = new Pair<String, String>(
      fileType, fileLengthStr);
  // skipFully fails on a premature end of stream, whereas a plain skip()
  // loop would keep getting 0 back and never terminate.
  org.apache.hadoop.io.IOUtils.skipFully(valueStream, fileLength);
  return logMeta;
}
/** Closes the scanner, the TFile reader and the underlying stream. */
public void close() {
  IOUtils.cleanupWithLogger(LOG, this.scanner, this.reader,
      this.fsDataIStream);
}
}
/**
 * Reads the log records of one container from its value stream, one log
 * at a time. {@link #nextLog()} moves to the next log; the read and skip
 * methods operate on the log it selected.
 */
@Private
public static class ContainerLogsReader {
  private DataInputStream valueStream;
  private String currentLogType = null;
  private long currentLogLength = 0;
  private BoundedInputStream currentLogData = null;
  private InputStreamReader currentLogISR;

  /**
   * Creates a reader on the value stream of a container entry.
   * @param stream the value stream
   */
  public ContainerLogsReader(DataInputStream stream) {
    this.valueStream = stream;
  }

  /**
   * Moves to the next log, discarding whatever is left of the current one.
   *
   * @return the type of the next log, or null if there are no more logs
   * @throws IOException if the next log header cannot be read
   */
  public String nextLog() throws IOException {
    if (currentLogData != null && currentLogLength > 0) {
      // seek to the end of the current log, relying on BoundedInputStream
      // to prevent seeking past the end of the current log
      while (currentLogData.skip(currentLogLength) >= 0
          && currentLogData.read() != -1) {
        // keep draining until the bounded stream is exhausted
      }
    }

    currentLogType = null;
    currentLogLength = 0;
    currentLogData = null;
    currentLogISR = null;

    try {
      String nextType = valueStream.readUTF();
      String nextLengthStr = valueStream.readUTF();
      currentLogLength = Long.parseLong(nextLengthStr);
      BoundedInputStream bounded =
          new BoundedInputStream(valueStream, currentLogLength);
      bounded.setPropagateClose(false);
      currentLogData = bounded;
      currentLogISR = new InputStreamReader(bounded,
          Charset.forName("UTF-8"));
      // Only set once the whole header was read successfully.
      currentLogType = nextType;
    } catch (EOFException ignored) {
      // No further log in the value stream; the null type reports that.
    }

    return currentLogType;
  }

  public String getCurrentLogType() {
    return this.currentLogType;
  }

  public long getCurrentLogLength() {
    return this.currentLogLength;
  }

  /** Skips bytes of the current log. */
  public long skip(long n) throws IOException {
    return this.currentLogData.skip(n);
  }

  /** Reads one byte of the current log. */
  public int read() throws IOException {
    return this.currentLogData.read();
  }

  /** Reads bytes of the current log into the buffer. */
  public int read(byte[] buf, int off, int len) throws IOException {
    return this.currentLogData.read(buf, off, len);
  }

  /** Reads characters of the current log, decoded as UTF-8. */
  public int read(char[] buf, int off, int len) throws IOException {
    return this.currentLogISR.read(buf, off, len);
  }
}
}