blob: 6260c75f692039f57a418dba37b9cdff4f97c413 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.Charset;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* File System based implementation for TimelineReader. This implementation may
* not provide a complete implementation of all the necessary features. This
* implementation is provided solely for basic testing purposes, and should not
* be used in a non-test situation.
*/
public class FileSystemTimelineReaderImpl extends AbstractService
implements TimelineReader {
private static final Logger LOG =
LoggerFactory.getLogger(FileSystemTimelineReaderImpl.class);
private FileSystem fs;
private Path rootPath;
private Path entitiesPath;
private static final String ENTITIES_DIR = "entities";
/** Default extension for output files. */
private static final String TIMELINE_SERVICE_STORAGE_EXTENSION = ".thist";
@VisibleForTesting
/** Default extension for output files. */
static final String APP_FLOW_MAPPING_FILE = "app_flow_mapping.csv";
/** Config param for timeline service file system storage root. */
public static final String TIMELINE_SERVICE_STORAGE_DIR_ROOT =
YarnConfiguration.TIMELINE_SERVICE_PREFIX + "fs-writer.root-dir";
/** Default value for storage location on local disk. */
private static final String STORAGE_DIR_ROOT = "timeline_service_data";
private final CSVFormat csvFormat =
CSVFormat.DEFAULT.withHeader("APP", "USER", "FLOW", "FLOWRUN");
public FileSystemTimelineReaderImpl() {
super(FileSystemTimelineReaderImpl.class.getName());
}
@VisibleForTesting
String getRootPath() {
return rootPath.toString();
}
private static ObjectMapper mapper;
static {
mapper = new ObjectMapper();
YarnJacksonJaxbJsonProvider.configObjectMapper(mapper);
}
/**
* Deserialize a POJO object from a JSON string.
*
* @param <T> Describes the type of class to be returned.
* @param clazz class to be deserialized.
* @param jsonString JSON string to deserialize.
* @return An object based on class type. Used typically for
* <cite>TimelineEntity</cite> object.
* @throws IOException if the underlying input source has problems during
* parsing.
* @throws JsonMappingException if parser has problems parsing content.
* @throws JsonGenerationException if there is a problem in JSON writing.
*/
public static <T> T getTimelineRecordFromJSON(
String jsonString, Class<T> clazz)
throws JsonGenerationException, JsonMappingException, IOException {
return mapper.readValue(jsonString, clazz);
}
private static void fillFields(TimelineEntity finalEntity,
TimelineEntity real, EnumSet<Field> fields) {
if (fields.contains(Field.ALL)) {
fields = EnumSet.allOf(Field.class);
}
for (Field field : fields) {
switch(field) {
case CONFIGS:
finalEntity.setConfigs(real.getConfigs());
break;
case METRICS:
finalEntity.setMetrics(real.getMetrics());
break;
case INFO:
finalEntity.setInfo(real.getInfo());
break;
case IS_RELATED_TO:
finalEntity.setIsRelatedToEntities(real.getIsRelatedToEntities());
break;
case RELATES_TO:
finalEntity.setIsRelatedToEntities(real.getIsRelatedToEntities());
break;
case EVENTS:
finalEntity.setEvents(real.getEvents());
break;
default:
continue;
}
}
}
private String getFlowRunPath(String userId, String clusterId,
String flowName, Long flowRunId, String appId) throws IOException {
if (userId != null && flowName != null && flowRunId != null) {
return userId + File.separator + flowName + File.separator + flowRunId;
}
if (clusterId == null || appId == null) {
throw new IOException("Unable to get flow info");
}
Path clusterIdPath = new Path(entitiesPath, clusterId);
Path appFlowMappingFilePath = new Path(clusterIdPath,
APP_FLOW_MAPPING_FILE);
try (BufferedReader reader =
new BufferedReader(new InputStreamReader(
fs.open(appFlowMappingFilePath), Charset.forName("UTF-8")));
CSVParser parser = new CSVParser(reader, csvFormat)) {
for (CSVRecord record : parser.getRecords()) {
if (record.size() < 4) {
continue;
}
String applicationId = record.get("APP");
if (applicationId != null && !applicationId.trim().isEmpty() &&
!applicationId.trim().equals(appId)) {
continue;
}
return record.get(1).trim() + File.separator + record.get(2).trim() +
File.separator + record.get(3).trim();
}
parser.close();
}
throw new IOException("Unable to get flow info");
}
private static TimelineEntity createEntityToBeReturned(TimelineEntity entity,
EnumSet<Field> fieldsToRetrieve) {
TimelineEntity entityToBeReturned = new TimelineEntity();
entityToBeReturned.setIdentifier(entity.getIdentifier());
entityToBeReturned.setCreatedTime(entity.getCreatedTime());
if (fieldsToRetrieve != null) {
fillFields(entityToBeReturned, entity, fieldsToRetrieve);
}
return entityToBeReturned;
}
private static boolean isTimeInRange(Long time, Long timeBegin,
Long timeEnd) {
return (time >= timeBegin) && (time <= timeEnd);
}
private static void mergeEntities(TimelineEntity entity1,
TimelineEntity entity2) {
// Ideally created time wont change except in the case of issue from client.
if (entity2.getCreatedTime() != null && entity2.getCreatedTime() > 0) {
entity1.setCreatedTime(entity2.getCreatedTime());
}
for (Entry<String, String> configEntry : entity2.getConfigs().entrySet()) {
entity1.addConfig(configEntry.getKey(), configEntry.getValue());
}
for (Entry<String, Object> infoEntry : entity2.getInfo().entrySet()) {
entity1.addInfo(infoEntry.getKey(), infoEntry.getValue());
}
for (Entry<String, Set<String>> isRelatedToEntry :
entity2.getIsRelatedToEntities().entrySet()) {
String type = isRelatedToEntry.getKey();
for (String entityId : isRelatedToEntry.getValue()) {
entity1.addIsRelatedToEntity(type, entityId);
}
}
for (Entry<String, Set<String>> relatesToEntry :
entity2.getRelatesToEntities().entrySet()) {
String type = relatesToEntry.getKey();
for (String entityId : relatesToEntry.getValue()) {
entity1.addRelatesToEntity(type, entityId);
}
}
for (TimelineEvent event : entity2.getEvents()) {
entity1.addEvent(event);
}
for (TimelineMetric metric2 : entity2.getMetrics()) {
boolean found = false;
for (TimelineMetric metric1 : entity1.getMetrics()) {
if (metric1.getId().equals(metric2.getId())) {
metric1.addValues(metric2.getValues());
found = true;
break;
}
}
if (!found) {
entity1.addMetric(metric2);
}
}
}
private static TimelineEntity readEntityFromFile(BufferedReader reader)
throws IOException {
TimelineEntity entity =
getTimelineRecordFromJSON(reader.readLine(), TimelineEntity.class);
String entityStr = "";
while ((entityStr = reader.readLine()) != null) {
if (entityStr.trim().isEmpty()) {
continue;
}
TimelineEntity anotherEntity =
getTimelineRecordFromJSON(entityStr, TimelineEntity.class);
if (!entity.getId().equals(anotherEntity.getId()) ||
!entity.getType().equals(anotherEntity.getType())) {
continue;
}
mergeEntities(entity, anotherEntity);
}
return entity;
}
private Set<TimelineEntity> getEntities(Path dir, String entityType,
TimelineEntityFilters filters, TimelineDataToRetrieve dataToRetrieve)
throws IOException {
// First sort the selected entities based on created/start time.
Map<Long, Set<TimelineEntity>> sortedEntities =
new TreeMap<>(
new Comparator<Long>() {
@Override
public int compare(Long l1, Long l2) {
return l2.compareTo(l1);
}
}
);
if (dir != null) {
RemoteIterator<LocatedFileStatus> fileStatuses = fs.listFiles(dir,
false);
if (fileStatuses != null) {
while (fileStatuses.hasNext()) {
LocatedFileStatus locatedFileStatus = fileStatuses.next();
Path entityFile = locatedFileStatus.getPath();
if (!entityFile.getName()
.contains(TIMELINE_SERVICE_STORAGE_EXTENSION)) {
continue;
}
try (BufferedReader reader = new BufferedReader(
new InputStreamReader(fs.open(entityFile),
Charset.forName("UTF-8")))) {
TimelineEntity entity = readEntityFromFile(reader);
if (!entity.getType().equals(entityType)) {
continue;
}
if (!isTimeInRange(entity.getCreatedTime(),
filters.getCreatedTimeBegin(),
filters.getCreatedTimeEnd())) {
continue;
}
if (filters.getRelatesTo() != null &&
!filters.getRelatesTo().getFilterList().isEmpty() &&
!TimelineStorageUtils.matchRelatesTo(entity,
filters.getRelatesTo())) {
continue;
}
if (filters.getIsRelatedTo() != null &&
!filters.getIsRelatedTo().getFilterList().isEmpty() &&
!TimelineStorageUtils.matchIsRelatedTo(entity,
filters.getIsRelatedTo())) {
continue;
}
if (filters.getInfoFilters() != null &&
!filters.getInfoFilters().getFilterList().isEmpty() &&
!TimelineStorageUtils.matchInfoFilters(entity,
filters.getInfoFilters())) {
continue;
}
if (filters.getConfigFilters() != null &&
!filters.getConfigFilters().getFilterList().isEmpty() &&
!TimelineStorageUtils.matchConfigFilters(entity,
filters.getConfigFilters())) {
continue;
}
if (filters.getMetricFilters() != null &&
!filters.getMetricFilters().getFilterList().isEmpty() &&
!TimelineStorageUtils.matchMetricFilters(entity,
filters.getMetricFilters())) {
continue;
}
if (filters.getEventFilters() != null &&
!filters.getEventFilters().getFilterList().isEmpty() &&
!TimelineStorageUtils.matchEventFilters(entity,
filters.getEventFilters())) {
continue;
}
TimelineEntity entityToBeReturned = createEntityToBeReturned(
entity, dataToRetrieve.getFieldsToRetrieve());
Set<TimelineEntity> entitiesCreatedAtSameTime =
sortedEntities.get(entityToBeReturned.getCreatedTime());
if (entitiesCreatedAtSameTime == null) {
entitiesCreatedAtSameTime = new HashSet<TimelineEntity>();
}
entitiesCreatedAtSameTime.add(entityToBeReturned);
sortedEntities.put(entityToBeReturned.getCreatedTime(),
entitiesCreatedAtSameTime);
}
}
}
}
Set<TimelineEntity> entities = new HashSet<TimelineEntity>();
long entitiesAdded = 0;
for (Set<TimelineEntity> entitySet : sortedEntities.values()) {
for (TimelineEntity entity : entitySet) {
entities.add(entity);
++entitiesAdded;
if (entitiesAdded >= filters.getLimit()) {
return entities;
}
}
}
return entities;
}
@Override
public void serviceInit(Configuration conf) throws Exception {
String outputRoot = conf.get(TIMELINE_SERVICE_STORAGE_DIR_ROOT,
conf.get("hadoop.tmp.dir") + File.separator + STORAGE_DIR_ROOT);
rootPath = new Path(outputRoot);
entitiesPath = new Path(rootPath, ENTITIES_DIR);
fs = rootPath.getFileSystem(conf);
super.serviceInit(conf);
}
@Override
public TimelineEntity getEntity(TimelineReaderContext context,
TimelineDataToRetrieve dataToRetrieve) throws IOException {
String flowRunPathStr = getFlowRunPath(context.getUserId(),
context.getClusterId(), context.getFlowName(), context.getFlowRunId(),
context.getAppId());
Path clusterIdPath = new Path(entitiesPath, context.getClusterId());
Path flowRunPath = new Path(clusterIdPath, flowRunPathStr);
Path appIdPath = new Path(flowRunPath, context.getAppId());
Path entityTypePath = new Path(appIdPath, context.getEntityType());
Path entityFilePath = new Path(entityTypePath,
context.getEntityId() + TIMELINE_SERVICE_STORAGE_EXTENSION);
try (BufferedReader reader =
new BufferedReader(new InputStreamReader(
fs.open(entityFilePath), Charset.forName("UTF-8")))) {
TimelineEntity entity = readEntityFromFile(reader);
return createEntityToBeReturned(
entity, dataToRetrieve.getFieldsToRetrieve());
} catch (FileNotFoundException e) {
LOG.info("Cannot find entity {id:" + context.getEntityId() + " , type:" +
context.getEntityType() + "}. Will send HTTP 404 in response.");
return null;
}
}
@Override
public Set<TimelineEntity> getEntities(TimelineReaderContext context,
TimelineEntityFilters filters, TimelineDataToRetrieve dataToRetrieve)
throws IOException {
String flowRunPathStr = getFlowRunPath(context.getUserId(),
context.getClusterId(), context.getFlowName(), context.getFlowRunId(),
context.getAppId());
Path clusterIdPath = new Path(entitiesPath, context.getClusterId());
Path flowRunPath = new Path(clusterIdPath, flowRunPathStr);
Path appIdPath = new Path(flowRunPath, context.getAppId());
Path entityTypePath = new Path(appIdPath, context.getEntityType());
return getEntities(entityTypePath, context.getEntityType(), filters,
dataToRetrieve);
}
@Override public Set<String> getEntityTypes(TimelineReaderContext context)
throws IOException {
Set<String> result = new TreeSet<>();
String flowRunPathStr = getFlowRunPath(context.getUserId(),
context.getClusterId(), context.getFlowName(), context.getFlowRunId(),
context.getAppId());
Path clusterIdPath = new Path(entitiesPath, context.getClusterId());
Path flowRunPath = new Path(clusterIdPath, flowRunPathStr);
Path appIdPath = new Path(flowRunPath, context.getAppId());
FileStatus[] fileStatuses = fs.listStatus(appIdPath);
for (FileStatus fileStatus : fileStatuses) {
if (fileStatus.isDirectory()) {
result.add(fileStatus.getPath().getName());
}
}
return result;
}
}