blob: 67e5849ed6a925908d3e988d6a974b7f779c7b8c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.reader;
import java.io.IOException;
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader;
/**
* This class wraps over the timeline reader store implementation. It does some
* non trivial manipulation of the timeline data before or after getting
* it from the backend store.
*/
@Private
@Unstable
public class TimelineReaderManager extends AbstractService {
private TimelineReader reader;
public TimelineReaderManager(TimelineReader timelineReader) {
super(TimelineReaderManager.class.getName());
this.reader = timelineReader;
}
/**
* Gets cluster ID from config yarn.resourcemanager.cluster-id
* if not supplied by client.
* @param clusterId
* @param conf
* @return clusterId
*/
private static String getClusterID(String clusterId, Configuration conf) {
if (clusterId == null || clusterId.isEmpty()) {
return conf.get(
YarnConfiguration.RM_CLUSTER_ID,
YarnConfiguration.DEFAULT_RM_CLUSTER_ID);
}
return clusterId;
}
private static TimelineEntityType getTimelineEntityType(String entityType) {
if (entityType == null) {
return null;
}
try {
return TimelineEntityType.valueOf(entityType);
} catch (IllegalArgumentException e) {
return null;
}
}
/**
* Fill UID in the info field of entity based on the query(identified by
* entity type).
* @param entityType Entity type of query.
* @param entity Timeline Entity.
* @param context Context defining the query.
*/
private static void fillUID(TimelineEntityType entityType,
TimelineEntity entity, TimelineReaderContext context) {
if (entityType != null) {
switch(entityType) {
case YARN_FLOW_ACTIVITY:
FlowActivityEntity activityEntity = (FlowActivityEntity)entity;
context.setUserId(activityEntity.getUser());
context.setFlowName(activityEntity.getFlowName());
entity.setUID(TimelineReaderUtils.UID_KEY,
TimelineUIDConverter.FLOW_UID.encodeUID(context));
return;
case YARN_FLOW_RUN:
FlowRunEntity runEntity = (FlowRunEntity)entity;
context.setFlowRunId(runEntity.getRunId());
entity.setUID(TimelineReaderUtils.UID_KEY,
TimelineUIDConverter.FLOWRUN_UID.encodeUID(context));
return;
case YARN_APPLICATION:
context.setAppId(entity.getId());
entity.setUID(TimelineReaderUtils.UID_KEY,
TimelineUIDConverter.APPLICATION_UID.encodeUID(context));
return;
default:
break;
}
}
context.setEntityType(entity.getType());
context.setEntityIdPrefix(entity.getIdPrefix());
context.setEntityId(entity.getId());
if (context.getDoAsUser() != null) {
entity.setUID(TimelineReaderUtils.UID_KEY,
TimelineUIDConverter.SUB_APPLICATION_ENTITY_UID.encodeUID(context));
} else {
entity.setUID(TimelineReaderUtils.UID_KEY,
TimelineUIDConverter.GENERIC_ENTITY_UID.encodeUID(context));
}
}
/**
* Get a set of entities matching given predicates by making a call to
* backend storage implementation. The meaning of each argument has been
* documented in detail with {@link TimelineReader#getEntities}.If cluster ID
* has not been supplied by the client, fills the cluster id from config
* before making a call to backend storage. After fetching entities from
* backend, fills the appropriate UID based on entity type for each entity.
*
* @param context Timeline context within the scope of which entities have to
* be fetched.
* @param filters Filters which limit the number of entities to be returned.
* @param dataToRetrieve Data to carry in each entity fetched.
* @return a set of <cite>TimelineEntity</cite> objects.
* @throws IOException if any problem occurs while getting entities.
* @see TimelineReader#getEntities
*/
public Set<TimelineEntity> getEntities(TimelineReaderContext context,
TimelineEntityFilters filters, TimelineDataToRetrieve dataToRetrieve)
throws IOException {
context.setClusterId(getClusterID(context.getClusterId(), getConfig()));
Set<TimelineEntity> entities = reader.getEntities(
new TimelineReaderContext(context), filters, dataToRetrieve);
if (entities != null) {
TimelineEntityType type = getTimelineEntityType(context.getEntityType());
for (TimelineEntity entity : entities) {
fillUID(type, entity, context);
}
}
return entities;
}
/**
* Get single timeline entity by making a call to backend storage
* implementation. The meaning of each argument in detail has been
* documented with {@link TimelineReader#getEntity}. If cluster ID has not
* been supplied by the client, fills the cluster id from config before making
* a call to backend storage. After fetching entity from backend, fills the
* appropriate UID based on entity type.
*
* @param context Timeline context within the scope of which entity has to be
* fetched.
* @param dataToRetrieve Data to carry in the entity fetched.
* @return A <cite>TimelineEntity</cite> object if found, null otherwise.
* @throws IOException if any problem occurs while getting entity.
* @see TimelineReader#getEntity
*/
public TimelineEntity getEntity(TimelineReaderContext context,
TimelineDataToRetrieve dataToRetrieve) throws IOException {
context.setClusterId(
getClusterID(context.getClusterId(), getConfig()));
TimelineEntity entity = reader.getEntity(
new TimelineReaderContext(context), dataToRetrieve);
if (entity != null) {
TimelineEntityType type = getTimelineEntityType(context.getEntityType());
fillUID(type, entity, context);
}
return entity;
}
/**
* Gets a list of available timeline entity types for an application. This can
* be done by making a call to the backend storage implementation. The meaning
* of each argument in detail is the same as {@link TimelineReader#getEntity}.
* If cluster ID has not been supplied by the client, fills the cluster id
* from config before making a call to backend storage.
*
* @param context Timeline context within the scope of which entity types
* have to be fetched. Entity type field of this context should
* be null.
* @return A set which contains available timeline entity types, represented
* as strings if found, empty otherwise.
* @throws IOException if any problem occurs while getting entity types.
*/
public Set<String> getEntityTypes(TimelineReaderContext context)
throws IOException{
context.setClusterId(getClusterID(context.getClusterId(), getConfig()));
return reader.getEntityTypes(new TimelineReaderContext(context));
}
}