blob: 86ac1f81107c615552a068d1fc9858b892c89543 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
/**
* In-memory implementation of {@link TimelineStore}. This
* implementation is for test purpose only. If users improperly instantiate it,
* they may encounter reading and writing history data in different memory
* store.
*
*/
@Private
@Unstable
public class MemoryTimelineStore
extends AbstractService implements TimelineStore {
private Map<EntityIdentifier, TimelineEntity> entities =
new HashMap<EntityIdentifier, TimelineEntity>();
private Map<EntityIdentifier, Long> entityInsertTimes =
new HashMap<EntityIdentifier, Long>();
public MemoryTimelineStore() {
super(MemoryTimelineStore.class.getName());
}
@Override
public TimelineEntities getEntities(String entityType, Long limit,
Long windowStart, Long windowEnd, String fromId, Long fromTs,
NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
EnumSet<Field> fields) {
if (limit == null) {
limit = DEFAULT_LIMIT;
}
if (windowStart == null) {
windowStart = Long.MIN_VALUE;
}
if (windowEnd == null) {
windowEnd = Long.MAX_VALUE;
}
if (fields == null) {
fields = EnumSet.allOf(Field.class);
}
Iterator<TimelineEntity> entityIterator = null;
if (fromId != null) {
TimelineEntity firstEntity = entities.get(new EntityIdentifier(fromId,
entityType));
if (firstEntity == null) {
return new TimelineEntities();
} else {
entityIterator = new TreeSet<TimelineEntity>(entities.values())
.tailSet(firstEntity, true).iterator();
}
}
if (entityIterator == null) {
entityIterator = new PriorityQueue<TimelineEntity>(entities.values())
.iterator();
}
List<TimelineEntity> entitiesSelected = new ArrayList<TimelineEntity>();
while (entityIterator.hasNext()) {
TimelineEntity entity = entityIterator.next();
if (entitiesSelected.size() >= limit) {
break;
}
if (!entity.getEntityType().equals(entityType)) {
continue;
}
if (entity.getStartTime() <= windowStart) {
continue;
}
if (entity.getStartTime() > windowEnd) {
continue;
}
if (fromTs != null && entityInsertTimes.get(new EntityIdentifier(
entity.getEntityId(), entity.getEntityType())) > fromTs) {
continue;
}
if (primaryFilter != null &&
!matchPrimaryFilter(entity.getPrimaryFilters(), primaryFilter)) {
continue;
}
if (secondaryFilters != null) { // AND logic
boolean flag = true;
for (NameValuePair secondaryFilter : secondaryFilters) {
if (secondaryFilter != null && !matchPrimaryFilter(
entity.getPrimaryFilters(), secondaryFilter) &&
!matchFilter(entity.getOtherInfo(), secondaryFilter)) {
flag = false;
break;
}
}
if (!flag) {
continue;
}
}
entitiesSelected.add(entity);
}
List<TimelineEntity> entitiesToReturn = new ArrayList<TimelineEntity>();
for (TimelineEntity entitySelected : entitiesSelected) {
entitiesToReturn.add(maskFields(entitySelected, fields));
}
Collections.sort(entitiesToReturn);
TimelineEntities entitiesWrapper = new TimelineEntities();
entitiesWrapper.setEntities(entitiesToReturn);
return entitiesWrapper;
}
@Override
public TimelineEntity getEntity(String entityId, String entityType,
EnumSet<Field> fieldsToRetrieve) {
if (fieldsToRetrieve == null) {
fieldsToRetrieve = EnumSet.allOf(Field.class);
}
TimelineEntity entity = entities.get(new EntityIdentifier(entityId, entityType));
if (entity == null) {
return null;
} else {
return maskFields(entity, fieldsToRetrieve);
}
}
@Override
public TimelineEvents getEntityTimelines(String entityType,
SortedSet<String> entityIds, Long limit, Long windowStart,
Long windowEnd,
Set<String> eventTypes) {
TimelineEvents allEvents = new TimelineEvents();
if (entityIds == null) {
return allEvents;
}
if (limit == null) {
limit = DEFAULT_LIMIT;
}
if (windowStart == null) {
windowStart = Long.MIN_VALUE;
}
if (windowEnd == null) {
windowEnd = Long.MAX_VALUE;
}
for (String entityId : entityIds) {
EntityIdentifier entityID = new EntityIdentifier(entityId, entityType);
TimelineEntity entity = entities.get(entityID);
if (entity == null) {
continue;
}
EventsOfOneEntity events = new EventsOfOneEntity();
events.setEntityId(entityId);
events.setEntityType(entityType);
for (TimelineEvent event : entity.getEvents()) {
if (events.getEvents().size() >= limit) {
break;
}
if (event.getTimestamp() <= windowStart) {
continue;
}
if (event.getTimestamp() > windowEnd) {
continue;
}
if (eventTypes != null && !eventTypes.contains(event.getEventType())) {
continue;
}
events.addEvent(event);
}
allEvents.addEvent(events);
}
return allEvents;
}
@Override
public TimelinePutResponse put(TimelineEntities data) {
TimelinePutResponse response = new TimelinePutResponse();
for (TimelineEntity entity : data.getEntities()) {
EntityIdentifier entityId =
new EntityIdentifier(entity.getEntityId(), entity.getEntityType());
// store entity info in memory
TimelineEntity existingEntity = entities.get(entityId);
if (existingEntity == null) {
existingEntity = new TimelineEntity();
existingEntity.setEntityId(entity.getEntityId());
existingEntity.setEntityType(entity.getEntityType());
existingEntity.setStartTime(entity.getStartTime());
entities.put(entityId, existingEntity);
entityInsertTimes.put(entityId, System.currentTimeMillis());
}
if (entity.getEvents() != null) {
if (existingEntity.getEvents() == null) {
existingEntity.setEvents(entity.getEvents());
} else {
existingEntity.addEvents(entity.getEvents());
}
Collections.sort(existingEntity.getEvents());
}
// check startTime
if (existingEntity.getStartTime() == null) {
if (existingEntity.getEvents() == null
|| existingEntity.getEvents().isEmpty()) {
TimelinePutError error = new TimelinePutError();
error.setEntityId(entityId.getId());
error.setEntityType(entityId.getType());
error.setErrorCode(TimelinePutError.NO_START_TIME);
response.addError(error);
entities.remove(entityId);
entityInsertTimes.remove(entityId);
continue;
} else {
Long min = Long.MAX_VALUE;
for (TimelineEvent e : entity.getEvents()) {
if (min > e.getTimestamp()) {
min = e.getTimestamp();
}
}
existingEntity.setStartTime(min);
}
}
if (entity.getPrimaryFilters() != null) {
if (existingEntity.getPrimaryFilters() == null) {
existingEntity.setPrimaryFilters(new HashMap<String, Set<Object>>());
}
for (Entry<String, Set<Object>> pf :
entity.getPrimaryFilters().entrySet()) {
for (Object pfo : pf.getValue()) {
existingEntity.addPrimaryFilter(pf.getKey(), maybeConvert(pfo));
}
}
}
if (entity.getOtherInfo() != null) {
if (existingEntity.getOtherInfo() == null) {
existingEntity.setOtherInfo(new HashMap<String, Object>());
}
for (Entry<String, Object> info : entity.getOtherInfo().entrySet()) {
existingEntity.addOtherInfo(info.getKey(),
maybeConvert(info.getValue()));
}
}
// relate it to other entities
if (entity.getRelatedEntities() == null) {
continue;
}
for (Map.Entry<String, Set<String>> partRelatedEntities : entity
.getRelatedEntities().entrySet()) {
if (partRelatedEntities == null) {
continue;
}
for (String idStr : partRelatedEntities.getValue()) {
EntityIdentifier relatedEntityId =
new EntityIdentifier(idStr, partRelatedEntities.getKey());
TimelineEntity relatedEntity = entities.get(relatedEntityId);
if (relatedEntity != null) {
relatedEntity.addRelatedEntity(
existingEntity.getEntityType(), existingEntity.getEntityId());
} else {
relatedEntity = new TimelineEntity();
relatedEntity.setEntityId(relatedEntityId.getId());
relatedEntity.setEntityType(relatedEntityId.getType());
relatedEntity.setStartTime(existingEntity.getStartTime());
relatedEntity.addRelatedEntity(existingEntity.getEntityType(),
existingEntity.getEntityId());
entities.put(relatedEntityId, relatedEntity);
entityInsertTimes.put(relatedEntityId, System.currentTimeMillis());
}
}
}
}
return response;
}
private static TimelineEntity maskFields(
TimelineEntity entity, EnumSet<Field> fields) {
// Conceal the fields that are not going to be exposed
TimelineEntity entityToReturn = new TimelineEntity();
entityToReturn.setEntityId(entity.getEntityId());
entityToReturn.setEntityType(entity.getEntityType());
entityToReturn.setStartTime(entity.getStartTime());
entityToReturn.setEvents(fields.contains(Field.EVENTS) ?
entity.getEvents() : fields.contains(Field.LAST_EVENT_ONLY) ?
Arrays.asList(entity.getEvents().get(0)) : null);
entityToReturn.setRelatedEntities(fields.contains(Field.RELATED_ENTITIES) ?
entity.getRelatedEntities() : null);
entityToReturn.setPrimaryFilters(fields.contains(Field.PRIMARY_FILTERS) ?
entity.getPrimaryFilters() : null);
entityToReturn.setOtherInfo(fields.contains(Field.OTHER_INFO) ?
entity.getOtherInfo() : null);
return entityToReturn;
}
private static boolean matchFilter(Map<String, Object> tags,
NameValuePair filter) {
Object value = tags.get(filter.getName());
if (value == null) { // doesn't have the filter
return false;
} else if (!value.equals(filter.getValue())) { // doesn't match the filter
return false;
}
return true;
}
private static boolean matchPrimaryFilter(Map<String, Set<Object>> tags,
NameValuePair filter) {
Set<Object> value = tags.get(filter.getName());
if (value == null) { // doesn't have the filter
return false;
} else {
return value.contains(filter.getValue());
}
}
private static Object maybeConvert(Object o) {
if (o instanceof Long) {
Long l = (Long)o;
if (l >= Integer.MIN_VALUE && l <= Integer.MAX_VALUE) {
return l.intValue();
}
}
return o;
}
}