blob: 96bca8d1f90bf8602fc54b6de3d395fe02ab9d70 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.timeline;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.map.MappingIterator;
import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
/**
 * Describes one timeline log file inside an application attempt directory:
 * which attempt it belongs to, its file name, its owner, and how many bytes
 * of it have already been parsed (the offset). Subclasses implement
 * {@link #doParse} to interpret the file's JSON content (entities vs.
 * domains). Not thread-safe; callers are expected to serialize access.
 */
abstract class LogInfo {
  public static final String ENTITY_FILE_NAME_DELIMITERS = "_.";

  private static final Logger LOG = LoggerFactory.getLogger(LogInfo.class);

  private String attemptDirName;
  private String filename;
  private String user;
  // Byte offset up to which this file has been successfully parsed; advanced
  // by subclasses via setOffset() as batches are posted to the store.
  private long offset = 0;

  public LogInfo(String attemptDirName, String file, String owner) {
    this.attemptDirName = attemptDirName;
    filename = file;
    user = owner;
  }

  public String getAttemptDirName() {
    return attemptDirName;
  }

  public long getOffset() {
    return offset;
  }

  public void setOffset(long newOffset) {
    this.offset = newOffset;
  }

  /** @return the full path of this log file under the given app root dir. */
  public Path getPath(Path rootPath) {
    Path attemptPath = new Path(rootPath, attemptDirName);
    return new Path(attemptPath, filename);
  }

  public String getFilename() {
    return filename;
  }

  public boolean matchesGroupId(TimelineEntityGroupId groupId) {
    return matchesGroupId(groupId.toString());
  }

  @InterfaceAudience.Private
  @VisibleForTesting
  boolean matchesGroupId(String groupId) {
    // Return true if the group id appears in the file name as a segment
    // terminated by one of the delimiters (_ or .) or by the end of the
    // string. Scan every occurrence, not only the first, so that e.g.
    // "abc" still matches "abcd_abc.log" (the original indexOf-once check
    // missed delimiter-terminated later occurrences).
    int pos = filename.indexOf(groupId);
    while (pos >= 0) {
      int end = pos + groupId.length();
      if (filename.length() == end
          || ENTITY_FILE_NAME_DELIMITERS.contains(
              String.valueOf(filename.charAt(end)))) {
        return true;
      }
      pos = filename.indexOf(groupId, pos + 1);
    }
    return false;
  }

  /**
   * Parses this log file from the current offset and posts its contents to
   * the given {@link TimelineDataManager}. A missing file, or a corrupted /
   * still-being-written file, is skipped rather than treated as fatal.
   *
   * @param tdm timeline data manager to post parsed records into
   * @param appDirPath application root directory containing the attempt dirs
   * @param appCompleted whether the owning application has finished; when
   *     false, JSON errors are treated as EOF of an incomplete file
   * @param jsonFactory factory used to create the JSON parser
   * @param objMapper mapper used to deserialize records
   * @param fs filesystem holding the log
   * @return number of records parsed in this pass
   * @throws IOException on unrecoverable read/post failures
   */
  public long parseForStore(TimelineDataManager tdm, Path appDirPath,
      boolean appCompleted, JsonFactory jsonFactory, ObjectMapper objMapper,
      FileSystem fs) throws IOException {
    LOG.debug("Parsing for log dir {} on attempt {}", appDirPath,
        attemptDirName);
    Path logPath = getPath(appDirPath);
    FileStatus status;
    try {
      // getFileStatus never returns null; it throws FileNotFoundException
      // when the file is gone, so catch that to implement the "skip" intent
      // (the former null-check branch was unreachable).
      status = fs.getFileStatus(logPath);
    } catch (FileNotFoundException e) {
      LOG.warn("{} no longer exists. Skip for scanning. ", logPath);
      return 0;
    }
    long numParsed = 0;
    long startTime = Time.monotonicNow();
    try {
      LOG.debug("Parsing {} at offset {}", logPath, offset);
      long count = parsePath(tdm, logPath, appCompleted, jsonFactory,
          objMapper, fs);
      LOG.info("Parsed {} entities from {} in {} msec",
          count, logPath, Time.monotonicNow() - startTime);
      numParsed += count;
    } catch (RuntimeException e) {
      // If AppLogs cannot parse this log, it may be corrupted or just empty
      if (e.getCause() instanceof JsonParseException &&
          (status.getLen() > 0 || offset > 0)) {
        // log on parse problems if the file has been read in the past or
        // is visibly non-empty
        LOG.info("Log {} appears to be corrupted. Skip. ", logPath);
      } else {
        // Previously swallowed silently; keep scanning but surface the
        // failure so it is diagnosable.
        LOG.warn("Error while parsing log {}. Skip. ", logPath, e);
      }
    }
    return numParsed;
  }

  /**
   * Opens the log, seeks to the stored offset, and delegates record
   * iteration to {@link #doParse}. Parser-creation errors on an incomplete
   * (still-being-written) file are treated as EOF and return 0.
   */
  private long parsePath(TimelineDataManager tdm, Path logPath,
      boolean appCompleted, JsonFactory jsonFactory, ObjectMapper objMapper,
      FileSystem fs) throws IOException {
    UserGroupInformation ugi =
        UserGroupInformation.createRemoteUser(user);
    FSDataInputStream in = fs.open(logPath);
    JsonParser parser = null;
    try {
      in.seek(offset);
      try {
        parser = jsonFactory.createJsonParser(in);
        // Keep the underlying stream open; it is closed explicitly below.
        parser.configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, false);
      } catch (IOException e) {
        // if app hasn't completed then there may be errors due to the
        // incomplete file which are treated as EOF until app completes
        if (appCompleted) {
          throw e;
        } else {
          LOG.debug("Exception in parse path: {}", e.getMessage());
          return 0;
        }
      }
      return doParse(tdm, parser, objMapper, ugi, appCompleted);
    } finally {
      IOUtils.closeStream(parser);
      IOUtils.closeStream(in);
    }
  }

  /**
   * Reads records from the given parser and posts them to the store as the
   * given user, advancing {@link #setOffset} after each successful post.
   *
   * @return the number of records read in this pass
   */
  protected abstract long doParse(TimelineDataManager tdm, JsonParser parser,
      ObjectMapper objMapper, UserGroupInformation ugi, boolean appCompleted)
      throws IOException;
}
/**
 * A log file containing JSON-serialized {@link TimelineEntity} records,
 * posted to the timeline store one entity at a time so the parse offset can
 * advance after each successful post.
 */
class EntityLogInfo extends LogInfo {
  private static final Logger LOG = LoggerFactory.getLogger(
      EntityGroupFSTimelineStore.class);

  public EntityLogInfo(String attemptId,
      String file, String owner) {
    super(attemptId, file, owner);
  }

  @Override
  protected long doParse(TimelineDataManager tdm, JsonParser parser,
      ObjectMapper objMapper, UserGroupInformation ugi, boolean appCompleted)
      throws IOException {
    long count = 0;
    TimelineEntities entities = new TimelineEntities();
    // Reused single-element batch: filled, posted, then cleared per entity.
    ArrayList<TimelineEntity> entityList = new ArrayList<TimelineEntity>(1);
    long bytesParsed;
    long bytesParsedLastBatch = 0;
    // Distinguishes post failures (always rethrown) from read errors on an
    // incomplete file (tolerated until the app completes).
    boolean postError = false;
    try {
      MappingIterator<TimelineEntity> iter = objMapper.readValues(parser,
          TimelineEntity.class);
      while (iter.hasNext()) {
        TimelineEntity entity = iter.next();
        String etype = entity.getEntityType();
        String eid = entity.getEntityId();
        LOG.trace("Read entity {}", etype);
        ++count;
        bytesParsed = parser.getCurrentLocation().getCharOffset() + 1;
        LOG.trace("Parser now at offset {}", bytesParsed);
        try {
          LOG.debug("Adding {}({}) to store", eid, etype);
          entityList.add(entity);
          entities.setEntities(entityList);
          TimelinePutResponse response = tdm.postEntities(entities, ugi);
          for (TimelinePutResponse.TimelinePutError e
              : response.getErrors()) {
            LOG.warn("Error putting entity: {} ({}): {}",
                e.getEntityId(), e.getEntityType(), e.getErrorCode());
          }
          // Advance the stored offset only after a successful post so a
          // failed batch is re-read on the next scan.
          setOffset(getOffset() + bytesParsed - bytesParsedLastBatch);
          bytesParsedLastBatch = bytesParsed;
          entityList.clear();
        } catch (YarnException | IOException e) {
          // Both exception types were wrapped identically; collapsed the
          // duplicated catch blocks into one multi-catch.
          postError = true;
          throw new IOException("Error posting entities", e);
        }
      }
    } catch (IOException e) {
      // if app hasn't completed then there may be errors due to the
      // incomplete file which are treated as EOF until app completes
      if (appCompleted || postError) {
        throw e;
      }
    } catch (RuntimeException e) {
      if (appCompleted || !(e.getCause() instanceof JsonParseException)) {
        throw e;
      }
    }
    return count;
  }
}
class DomainLogInfo extends LogInfo {
private static final Logger LOG = LoggerFactory.getLogger(
EntityGroupFSTimelineStore.class);
public DomainLogInfo(String attemptDirName, String file,
String owner) {
super(attemptDirName, file, owner);
}
protected long doParse(TimelineDataManager tdm, JsonParser parser,
ObjectMapper objMapper, UserGroupInformation ugi, boolean appCompleted)
throws IOException {
long count = 0;
long bytesParsed;
long bytesParsedLastBatch = 0;
boolean putError = false;
try {
MappingIterator<TimelineDomain> iter = objMapper.readValues(parser,
TimelineDomain.class);
while (iter.hasNext()) {
TimelineDomain domain = iter.next();
domain.setOwner(ugi.getShortUserName());
LOG.trace("Read domain {}", domain.getId());
++count;
bytesParsed = parser.getCurrentLocation().getCharOffset() + 1;
LOG.trace("Parser now at offset {}", bytesParsed);
try {
tdm.putDomain(domain, ugi);
setOffset(getOffset() + bytesParsed - bytesParsedLastBatch);
bytesParsedLastBatch = bytesParsed;
} catch (YarnException e) {
putError = true;
throw new IOException("Error posting domain", e);
} catch (IOException e) {
putError = true;
throw new IOException("Error posting domain", e);
}
}
} catch (IOException e) {
// if app hasn't completed then there may be errors due to the
// incomplete file which are treated as EOF until app completes
if (appCompleted || putError) {
throw e;
}
} catch (RuntimeException e) {
if (appCompleted || !(e.getCause() instanceof JsonParseException)) {
throw e;
}
}
return count;
}
}