| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline; |
| |
| import java.io.ByteArrayOutputStream; |
| import java.io.File; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.EnumSet; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Set; |
| import java.util.SortedSet; |
| import java.util.TreeMap; |
| import java.util.concurrent.locks.ReentrantLock; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import org.apache.commons.collections.map.LRUMap; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.classification.InterfaceAudience; |
| import org.apache.hadoop.classification.InterfaceStability; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.io.IOUtils; |
| import org.apache.hadoop.io.WritableComparator; |
| import org.apache.hadoop.service.AbstractService; |
| import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; |
| import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; |
| import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; |
| import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents; |
| import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEntity; |
| import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; |
| import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.fusesource.leveldbjni.JniDBFactory; |
| import org.iq80.leveldb.DB; |
| import org.iq80.leveldb.DBIterator; |
| import org.iq80.leveldb.Options; |
| import org.iq80.leveldb.WriteBatch; |
| |
| import static org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.GenericObjectMapper.readReverseOrderedLong; |
| import static org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.GenericObjectMapper.writeReverseOrderedLong; |
| |
| /** |
| * An implementation of a timeline store backed by leveldb. |
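| * |
| * <p>A minimal lifecycle sketch (assuming {@code conf} carries a writable |
| * {@code YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH} and |
| * {@code entities} is a populated {@link TimelineEntities}): |
| * <pre> |
| * TimelineStore store = new LeveldbTimelineStore(); |
| * store.init(conf); |
| * store.start(); |
| * TimelinePutResponse response = store.put(entities); |
| * store.stop(); |
| * </pre> |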
| */ |
| @InterfaceAudience.Private |
| @InterfaceStability.Unstable |
| public class LeveldbTimelineStore extends AbstractService |
| implements TimelineStore { |
| private static final Log LOG = LogFactory |
| .getLog(LeveldbTimelineStore.class); |
| |
| private static final String FILENAME = "leveldb-timeline-store.ldb"; |
| |
| private static final byte[] START_TIME_LOOKUP_PREFIX = "k".getBytes(); |
| private static final byte[] ENTITY_ENTRY_PREFIX = "e".getBytes(); |
| private static final byte[] INDEXED_ENTRY_PREFIX = "i".getBytes(); |
| |
| private static final byte[] PRIMARY_FILTER_COLUMN = "f".getBytes(); |
| private static final byte[] OTHER_INFO_COLUMN = "i".getBytes(); |
| private static final byte[] RELATED_COLUMN = "r".getBytes(); |
| private static final byte[] TIME_COLUMN = "t".getBytes(); |
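| |
| // Key schema, as encoded by KeyBuilder below. String elements are |
| // terminated by a 0x0 separator; longs are written with |
| // writeReverseOrderedLong so that larger (more recent) values sort |
| // first lexicographically. |
| // |
| // start time entry: "k" + entitytype + entityid -> revstarttime |
| // entity entry: "e" + entitytype + revstarttime + entityid |
| // + column + column-specific remainder |
| // index entry: "i" + filtername + filtervalue + entity entry key |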
| |
| private static final byte[] EMPTY_BYTES = new byte[0]; |
| |
| private static final int DEFAULT_START_TIME_READ_CACHE_SIZE = 10000; |
| private static final int DEFAULT_START_TIME_WRITE_CACHE_SIZE = 10000; |
| |
| // default number of entities/events returned when the caller does not |
| // specify a limit (used by getEntities and getEntityTimelines) |
| private static final long DEFAULT_LIMIT = 100; |
| |
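| // bounded LRU caches of entity -> start time, maintained separately |
| // for the write (put) and read (get) paths |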
| private Map<EntityIdentifier, Long> startTimeWriteCache; |
| private Map<EntityIdentifier, Long> startTimeReadCache; |
| |
| /** |
| * Per-entity locks are obtained when writing. |
| */ |
| private final LockMap<EntityIdentifier> writeLocks = |
| new LockMap<EntityIdentifier>(); |
| |
| private DB db; |
| |
| public LeveldbTimelineStore() { |
| super(LeveldbTimelineStore.class.getName()); |
| } |
| |
| @Override |
| @SuppressWarnings("unchecked") |
| protected void serviceInit(Configuration conf) throws Exception { |
| Options options = new Options(); |
| options.createIfMissing(true); |
| JniDBFactory factory = new JniDBFactory(); |
| String path = conf.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH); |
| File p = new File(path); |
| if (!p.exists()) { |
| if (!p.mkdirs()) { |
| throw new IOException("Couldn't create directory for leveldb " + |
| "timeline store " + path); |
| } |
| } |
| LOG.info("Using leveldb path " + path); |
| db = factory.open(new File(path, FILENAME), options); |
| startTimeWriteCache = |
| Collections.synchronizedMap(new LRUMap(getStartTimeWriteCacheSize( |
| conf))); |
| startTimeReadCache = |
| Collections.synchronizedMap(new LRUMap(getStartTimeReadCacheSize( |
| conf))); |
| super.serviceInit(conf); |
| } |
| |
| @Override |
| protected void serviceStop() throws Exception { |
| IOUtils.cleanup(LOG, db); |
| super.serviceStop(); |
| } |
| |
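| /** |
| * A map of reference-counted locks: a lock is created on the first |
| * request for a key and removed again once every caller that obtained |
| * it has returned it, so the map only holds locks that are in use. |
| */ |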
| private static class LockMap<K> { |
| private static class CountingReentrantLock<K> extends ReentrantLock { |
| private int count; |
| private K key; |
| |
| CountingReentrantLock(K key) { |
| super(); |
| this.count = 0; |
| this.key = key; |
| } |
| } |
| |
| private Map<K, CountingReentrantLock<K>> locks = |
| new HashMap<K, CountingReentrantLock<K>>(); |
| |
| synchronized CountingReentrantLock<K> getLock(K key) { |
| CountingReentrantLock<K> lock = locks.get(key); |
| if (lock == null) { |
| lock = new CountingReentrantLock<K>(key); |
| locks.put(key, lock); |
| } |
| |
| lock.count++; |
| return lock; |
| } |
| |
| synchronized void returnLock(CountingReentrantLock<K> lock) { |
| if (lock.count == 0) { |
| throw new IllegalStateException("Returned lock more times than it " + |
| "was retrieved"); |
| } |
| lock.count--; |
| |
| if (lock.count == 0) { |
| locks.remove(lock.key); |
| } |
| } |
| } |
| |
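| /** |
| * Builds composite leveldb keys. String elements are followed by a 0x0 |
| * separator; raw byte[] elements are appended as-is. A sketch of the |
| * difference between the two serializations (values illustrative): |
| * <pre> |
| * KeyBuilder kb = KeyBuilder.newInstance() |
| * .add(ENTITY_ENTRY_PREFIX).add("APP").add(revStartTime).add("app_1"); |
| * kb.getBytes(); // "e" + "APP" + 0x0 + revStartTime + "app_1" |
| * kb.getBytesForLookup(); // as above, plus a trailing 0x0 after "app_1" |
| * </pre> |
| */ |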
| private static class KeyBuilder { |
| private static final int MAX_NUMBER_OF_KEY_ELEMENTS = 10; |
| private byte[][] b; |
| private boolean[] useSeparator; |
| private int index; |
| private int length; |
| |
| public KeyBuilder(int size) { |
| b = new byte[size][]; |
| useSeparator = new boolean[size]; |
| index = 0; |
| length = 0; |
| } |
| |
| public static KeyBuilder newInstance() { |
| return new KeyBuilder(MAX_NUMBER_OF_KEY_ELEMENTS); |
| } |
| |
| public KeyBuilder add(String s) { |
| return add(s.getBytes(), true); |
| } |
| |
| public KeyBuilder add(byte[] t) { |
| return add(t, false); |
| } |
| |
| public KeyBuilder add(byte[] t, boolean sep) { |
| b[index] = t; |
| useSeparator[index] = sep; |
| length += t.length; |
| if (sep) |
| length++; |
| index++; |
| return this; |
| } |
| |
| public byte[] getBytes() throws IOException { |
| ByteArrayOutputStream baos = new ByteArrayOutputStream(length); |
| for (int i = 0; i < index; i++) { |
| baos.write(b[i]); |
| if (i < index-1 && useSeparator[i]) |
| baos.write(0x0); |
| } |
| return baos.toByteArray(); |
| } |
| |
| public byte[] getBytesForLookup() throws IOException { |
| ByteArrayOutputStream baos = new ByteArrayOutputStream(length); |
| for (int i = 0; i < index; i++) { |
| baos.write(b[i]); |
| if (useSeparator[i]) |
| baos.write(0x0); |
| } |
| return baos.toByteArray(); |
| } |
| } |
| |
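| /** |
| * Walks a composite key produced by {@link KeyBuilder}: strings are |
| * read up to the next 0x0 separator and longs as 8-byte reverse-ordered |
| * values, advancing an internal offset. |
| */ |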
| private static class KeyParser { |
| private final byte[] b; |
| private int offset; |
| |
| public KeyParser(byte[] b, int offset) { |
| this.b = b; |
| this.offset = offset; |
| } |
| |
| public String getNextString() throws IOException { |
| if (offset >= b.length) |
| throw new IOException( |
| "tried to read nonexistent string from byte array"); |
| int i = 0; |
| while (offset+i < b.length && b[offset+i] != 0x0) |
| i++; |
| String s = new String(b, offset, i); |
| offset = offset + i + 1; |
| return s; |
| } |
| |
| public long getNextLong() throws IOException { |
| // reading a long consumes 8 bytes, which requires offset + 8 <= b.length |
| if (offset + 8 > b.length) |
| throw new IOException("byte array ran out when trying to read long"); |
| long l = readReverseOrderedLong(b, offset); |
| offset += 8; |
| return l; |
| } |
| |
| public int getOffset() { |
| return offset; |
| } |
| } |
| |
| @Override |
| public TimelineEntity getEntity(String entityId, String entityType, |
| EnumSet<Field> fields) throws IOException { |
| DBIterator iterator = null; |
| try { |
| byte[] revStartTime = getStartTime(entityId, entityType); |
| if (revStartTime == null) |
| return null; |
| byte[] prefix = KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX) |
| .add(entityType).add(revStartTime).add(entityId).getBytesForLookup(); |
| |
| iterator = db.iterator(); |
| iterator.seek(prefix); |
| |
| return getEntity(entityId, entityType, |
| readReverseOrderedLong(revStartTime, 0), fields, iterator, prefix, |
| prefix.length); |
| } finally { |
| IOUtils.cleanup(LOG, iterator); |
| } |
| } |
| |
| /** |
| * Read entity from a db iterator. If no information is found in the |
| * specified fields for this entity, return null. |
| */ |
| private static TimelineEntity getEntity(String entityId, String entityType, |
| Long startTime, EnumSet<Field> fields, DBIterator iterator, |
| byte[] prefix, int prefixlen) throws IOException { |
| if (fields == null) |
| fields = EnumSet.allOf(Field.class); |
| |
| TimelineEntity entity = new TimelineEntity(); |
| boolean events = false; |
| boolean lastEvent = false; |
| if (fields.contains(Field.EVENTS)) { |
| events = true; |
| entity.setEvents(new ArrayList<TimelineEvent>()); |
| } else if (fields.contains(Field.LAST_EVENT_ONLY)) { |
| lastEvent = true; |
| entity.setEvents(new ArrayList<TimelineEvent>()); |
| } else { |
| entity.setEvents(null); |
| } |
| boolean relatedEntities = false; |
| if (fields.contains(Field.RELATED_ENTITIES)) { |
| relatedEntities = true; |
| } else { |
| entity.setRelatedEntities(null); |
| } |
| boolean primaryFilters = false; |
| if (fields.contains(Field.PRIMARY_FILTERS)) { |
| primaryFilters = true; |
| } else { |
| entity.setPrimaryFilters(null); |
| } |
| boolean otherInfo = false; |
| if (fields.contains(Field.OTHER_INFO)) { |
| otherInfo = true; |
| entity.setOtherInfo(new HashMap<String, Object>()); |
| } else { |
| entity.setOtherInfo(null); |
| } |
| |
| // iterate through the entity's entries, parsing each column only if it |
| // is part of a requested field |
| for (; iterator.hasNext(); iterator.next()) { |
| byte[] key = iterator.peekNext().getKey(); |
| if (!prefixMatches(prefix, prefixlen, key)) |
| break; |
| if (key[prefixlen] == PRIMARY_FILTER_COLUMN[0]) { |
| if (primaryFilters) { |
| addPrimaryFilter(entity, key, |
| prefixlen + PRIMARY_FILTER_COLUMN.length); |
| } |
| } else if (key[prefixlen] == OTHER_INFO_COLUMN[0]) { |
| if (otherInfo) { |
| entity.addOtherInfo(parseRemainingKey(key, |
| prefixlen + OTHER_INFO_COLUMN.length), |
| GenericObjectMapper.read(iterator.peekNext().getValue())); |
| } |
| } else if (key[prefixlen] == RELATED_COLUMN[0]) { |
| if (relatedEntities) { |
| addRelatedEntity(entity, key, |
| prefixlen + RELATED_COLUMN.length); |
| } |
| } else if (key[prefixlen] == TIME_COLUMN[0]) { |
| if (events || (lastEvent && entity.getEvents().isEmpty())) { |
| TimelineEvent event = getEntityEvent(null, key, prefixlen + |
| TIME_COLUMN.length, iterator.peekNext().getValue()); |
| if (event != null) { |
| entity.addEvent(event); |
| } |
| } |
| } else { |
| LOG.warn(String.format("Found unexpected column for entity %s of " + |
| "type %s (0x%02x)", entityId, entityType, key[prefixlen])); |
| } |
| } |
| |
| entity.setEntityId(entityId); |
| entity.setEntityType(entityType); |
| entity.setStartTime(startTime); |
| |
| return entity; |
| } |
| |
| @Override |
| public TimelineEvents getEntityTimelines(String entityType, |
| SortedSet<String> entityIds, Long limit, Long windowStart, |
| Long windowEnd, Set<String> eventType) throws IOException { |
| TimelineEvents events = new TimelineEvents(); |
| if (entityIds == null || entityIds.isEmpty()) |
| return events; |
| // create a lexicographically-ordered map from start time to entities |
| Map<byte[], List<EntityIdentifier>> startTimeMap = new TreeMap<byte[], |
| List<EntityIdentifier>>(new Comparator<byte[]>() { |
| @Override |
| public int compare(byte[] o1, byte[] o2) { |
| return WritableComparator.compareBytes(o1, 0, o1.length, o2, 0, |
| o2.length); |
| } |
| }); |
| DBIterator iterator = null; |
| try { |
| // look up start times for the specified entities |
| // skip entities with no start time |
| for (String entity : entityIds) { |
| byte[] startTime = getStartTime(entity, entityType); |
| if (startTime != null) { |
| List<EntityIdentifier> entities = startTimeMap.get(startTime); |
| if (entities == null) { |
| entities = new ArrayList<EntityIdentifier>(); |
| startTimeMap.put(startTime, entities); |
| } |
| entities.add(new EntityIdentifier(entity, entityType)); |
| } |
| } |
| for (Entry<byte[], List<EntityIdentifier>> entry : |
| startTimeMap.entrySet()) { |
| // look up the events matching the given parameters (limit, |
| // start time, end time, event types) for entities whose start times |
| // were found and add the entities to the return list |
| byte[] revStartTime = entry.getKey(); |
| for (EntityIdentifier entityID : entry.getValue()) { |
| EventsOfOneEntity entity = new EventsOfOneEntity(); |
| entity.setEntityId(entityID.getId()); |
| entity.setEntityType(entityType); |
| events.addEvent(entity); |
| KeyBuilder kb = KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX) |
| .add(entityType).add(revStartTime).add(entityID.getId()) |
| .add(TIME_COLUMN); |
| byte[] prefix = kb.getBytesForLookup(); |
| if (windowEnd == null) { |
| windowEnd = Long.MAX_VALUE; |
| } |
| byte[] revts = writeReverseOrderedLong(windowEnd); |
| kb.add(revts); |
| byte[] first = kb.getBytesForLookup(); |
| byte[] last = null; |
| if (windowStart != null) { |
| last = KeyBuilder.newInstance().add(prefix) |
| .add(writeReverseOrderedLong(windowStart)).getBytesForLookup(); |
| } |
| if (limit == null) { |
| limit = DEFAULT_LIMIT; |
| } |
| // release the iterator from the previous entity, if any, before |
| // opening a new one so iterators are not leaked across iterations |
| IOUtils.cleanup(LOG, iterator); |
| iterator = db.iterator(); |
| for (iterator.seek(first); entity.getEvents().size() < limit && |
| iterator.hasNext(); iterator.next()) { |
| byte[] key = iterator.peekNext().getKey(); |
| if (!prefixMatches(prefix, prefix.length, key) || (last != null && |
| WritableComparator.compareBytes(key, 0, key.length, last, 0, |
| last.length) > 0)) |
| break; |
| TimelineEvent event = getEntityEvent(eventType, key, prefix.length, |
| iterator.peekNext().getValue()); |
| if (event != null) |
| entity.addEvent(event); |
| } |
| } |
| } |
| } finally { |
| IOUtils.cleanup(LOG, iterator); |
| } |
| return events; |
| } |
| |
| /** |
| * Returns true if the byte array begins with the specified prefix. |
| */ |
| private static boolean prefixMatches(byte[] prefix, int prefixlen, |
| byte[] b) { |
| if (b.length < prefixlen) |
| return false; |
| return WritableComparator.compareBytes(prefix, 0, prefixlen, b, 0, |
| prefixlen) == 0; |
| } |
| |
| @Override |
| public TimelineEntities getEntities(String entityType, |
| Long limit, Long windowStart, Long windowEnd, |
| NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters, |
| EnumSet<Field> fields) throws IOException { |
| if (primaryFilter == null) { |
| // if no primary filter is specified, prefix the lookup with |
| // ENTITY_ENTRY_PREFIX |
| return getEntityByTime(ENTITY_ENTRY_PREFIX, entityType, limit, |
| windowStart, windowEnd, secondaryFilters, fields); |
| } else { |
| // if a primary filter is specified, prefix the lookup with |
| // INDEXED_ENTRY_PREFIX + primaryFilterName + primaryFilterValue + |
| // ENTITY_ENTRY_PREFIX |
| byte[] base = KeyBuilder.newInstance().add(INDEXED_ENTRY_PREFIX) |
| .add(primaryFilter.getName()) |
| .add(GenericObjectMapper.write(primaryFilter.getValue()), true) |
| .add(ENTITY_ENTRY_PREFIX).getBytesForLookup(); |
| return getEntityByTime(base, entityType, limit, windowStart, windowEnd, |
| secondaryFilters, fields); |
| } |
| } |
| |
| /** |
| * Retrieves a list of entities satisfying given parameters. |
| * |
| * @param base A byte array prefix for the lookup |
| * @param entityType The type of the entity |
| * @param limit A limit on the number of entities to return |
| * @param starttime The earliest entity start time to retrieve (exclusive) |
| * @param endtime The latest entity start time to retrieve (inclusive) |
| * @param secondaryFilters Filter pairs that the entities should match |
| * @param fields The set of fields to retrieve |
| * @return A list of entities |
| * @throws IOException |
| */ |
| private TimelineEntities getEntityByTime(byte[] base, |
| String entityType, Long limit, Long starttime, Long endtime, |
| Collection<NameValuePair> secondaryFilters, EnumSet<Field> fields) |
| throws IOException { |
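| // Start times are encoded in reverse order, so seeking to |
| // base + entitytype + rev(endtime) positions the iterator at the |
| // newest entity within the window and iteration proceeds toward |
| // older entities until rev(starttime) is passed. |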
| DBIterator iterator = null; |
| try { |
| KeyBuilder kb = KeyBuilder.newInstance().add(base).add(entityType); |
| // only db keys matching the prefix (base + entity type) will be parsed |
| byte[] prefix = kb.getBytesForLookup(); |
| if (endtime == null) { |
| // if end time is null, place no restriction on end time |
| endtime = Long.MAX_VALUE; |
| } |
| // using end time, construct a first key that will be seeked to |
| byte[] revts = writeReverseOrderedLong(endtime); |
| kb.add(revts); |
| byte[] first = kb.getBytesForLookup(); |
| byte[] last = null; |
| if (starttime != null) { |
| // if start time is not null, set a last key that will not be |
| // iterated past |
| last = KeyBuilder.newInstance().add(base).add(entityType) |
| .add(writeReverseOrderedLong(starttime)).getBytesForLookup(); |
| } |
| if (limit == null) { |
| // if limit is not specified, use the default |
| limit = DEFAULT_LIMIT; |
| } |
| |
| TimelineEntities entities = new TimelineEntities(); |
| iterator = db.iterator(); |
| iterator.seek(first); |
| // iterate until one of the following conditions is met: limit is |
| // reached, there are no more keys, the key prefix no longer matches, |
| // or a start time has been specified and reached/exceeded |
| while (entities.getEntities().size() < limit && iterator.hasNext()) { |
| byte[] key = iterator.peekNext().getKey(); |
| if (!prefixMatches(prefix, prefix.length, key) || (last != null && |
| WritableComparator.compareBytes(key, 0, key.length, last, 0, |
| last.length) > 0)) |
| break; |
| // read the start time and entityId from the current key |
| KeyParser kp = new KeyParser(key, prefix.length); |
| Long startTime = kp.getNextLong(); |
| String entityId = kp.getNextString(); |
| // parse the entity that owns this key, iterating over all keys for |
| // the entity |
| TimelineEntity entity = getEntity(entityId, entityType, startTime, |
| fields, iterator, key, kp.getOffset()); |
| if (entity == null) |
| continue; |
| // determine if the retrieved entity matches the provided secondary |
| // filters, and if so add it to the list of entities to return |
| boolean filterPassed = true; |
| if (secondaryFilters != null) { |
| for (NameValuePair filter : secondaryFilters) { |
| Object v = entity.getOtherInfo().get(filter.getName()); |
| if (v == null) { |
| Set<Object> vs = entity.getPrimaryFilters() |
| .get(filter.getName()); |
| if (vs != null && !vs.contains(filter.getValue())) { |
| filterPassed = false; |
| break; |
| } |
| } else if (!v.equals(filter.getValue())) { |
| filterPassed = false; |
| break; |
| } |
| } |
| } |
| if (filterPassed) |
| entities.addEntity(entity); |
| } |
| return entities; |
| } finally { |
| IOUtils.cleanup(LOG, iterator); |
| } |
| } |
| |
| /** |
| * Put a single entity. If there is an error, add a TimelinePutError to the given |
| * response. |
| */ |
| private void put(TimelineEntity entity, TimelinePutResponse response) { |
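| // Under the entity's write lock, a put stages everything in a single |
| // atomic leveldb batch: the start time entry (if new), one entry per |
| // event, reverse entries for related entities, one entry per primary |
| // filter value, one entry per other info key, plus an indexed copy of |
| // each event/filter/other-info entry for every primary filter value. |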
| LockMap.CountingReentrantLock<EntityIdentifier> lock = |
| writeLocks.getLock(new EntityIdentifier(entity.getEntityId(), |
| entity.getEntityType())); |
| lock.lock(); |
| WriteBatch writeBatch = null; |
| try { |
| writeBatch = db.createWriteBatch(); |
| List<TimelineEvent> events = entity.getEvents(); |
| // look up the start time for the entity |
| byte[] revStartTime = getAndSetStartTime(entity.getEntityId(), |
| entity.getEntityType(), entity.getStartTime(), events, |
| writeBatch); |
| if (revStartTime == null) { |
| // if no start time is found, add an error and return |
| TimelinePutError error = new TimelinePutError(); |
| error.setEntityId(entity.getEntityId()); |
| error.setEntityType(entity.getEntityType()); |
| error.setErrorCode(TimelinePutError.NO_START_TIME); |
| response.addError(error); |
| return; |
| } |
| Long revStartTimeLong = readReverseOrderedLong(revStartTime, 0); |
| Map<String, Set<Object>> primaryFilters = entity.getPrimaryFilters(); |
| |
| // write event entries |
| if (events != null && !events.isEmpty()) { |
| for (TimelineEvent event : events) { |
| byte[] revts = writeReverseOrderedLong(event.getTimestamp()); |
| byte[] key = createEntityEventKey(entity.getEntityId(), |
| entity.getEntityType(), revStartTime, revts, |
| event.getEventType()); |
| byte[] value = GenericObjectMapper.write(event.getEventInfo()); |
| writeBatch.put(key, value); |
| writePrimaryFilterEntries(writeBatch, primaryFilters, key, value); |
| } |
| } |
| |
| // write related entity entries |
| Map<String, Set<String>> relatedEntities = |
| entity.getRelatedEntities(); |
| if (relatedEntities != null && !relatedEntities.isEmpty()) { |
| for (Entry<String, Set<String>> relatedEntityList : |
| relatedEntities.entrySet()) { |
| String relatedEntityType = relatedEntityList.getKey(); |
| for (String relatedEntityId : relatedEntityList.getValue()) { |
| // look up start time of related entity |
| byte[] relatedEntityStartTime = getAndSetStartTime(relatedEntityId, |
| relatedEntityType, null, null, writeBatch); |
| if (relatedEntityStartTime == null) { |
| // if start time is not found, set start time of the related |
| // entity to the start time of this entity, and write it to the |
| // db and the cache |
| relatedEntityStartTime = revStartTime; |
| writeBatch.put(createStartTimeLookupKey(relatedEntityId, |
| relatedEntityType), relatedEntityStartTime); |
| startTimeWriteCache.put(new EntityIdentifier(relatedEntityId, |
| relatedEntityType), revStartTimeLong); |
| } |
| // write reverse entry (related entity -> entity) |
| byte[] key = createRelatedEntityKey(relatedEntityId, |
| relatedEntityType, relatedEntityStartTime, |
| entity.getEntityId(), entity.getEntityType()); |
| writeBatch.put(key, EMPTY_BYTES); |
| // TODO: write forward entry (entity -> related entity)? |
| } |
| } |
| } |
| |
| // write primary filter entries |
| if (primaryFilters != null && !primaryFilters.isEmpty()) { |
| for (Entry<String, Set<Object>> primaryFilter : |
| primaryFilters.entrySet()) { |
| for (Object primaryFilterValue : primaryFilter.getValue()) { |
| byte[] key = createPrimaryFilterKey(entity.getEntityId(), |
| entity.getEntityType(), revStartTime, |
| primaryFilter.getKey(), primaryFilterValue); |
| writeBatch.put(key, EMPTY_BYTES); |
| writePrimaryFilterEntries(writeBatch, primaryFilters, key, |
| EMPTY_BYTES); |
| } |
| } |
| } |
| |
| // write other info entries |
| Map<String, Object> otherInfo = entity.getOtherInfo(); |
| if (otherInfo != null && !otherInfo.isEmpty()) { |
| for (Entry<String, Object> i : otherInfo.entrySet()) { |
| byte[] key = createOtherInfoKey(entity.getEntityId(), |
| entity.getEntityType(), revStartTime, i.getKey()); |
| byte[] value = GenericObjectMapper.write(i.getValue()); |
| writeBatch.put(key, value); |
| writePrimaryFilterEntries(writeBatch, primaryFilters, key, value); |
| } |
| } |
| db.write(writeBatch); |
| } catch (IOException e) { |
| LOG.error("Error putting entity " + entity.getEntityId() + |
| " of type " + entity.getEntityType(), e); |
| TimelinePutError error = new TimelinePutError(); |
| error.setEntityId(entity.getEntityId()); |
| error.setEntityType(entity.getEntityType()); |
| error.setErrorCode(TimelinePutError.IO_EXCEPTION); |
| response.addError(error); |
| } finally { |
| lock.unlock(); |
| writeLocks.returnLock(lock); |
| IOUtils.cleanup(LOG, writeBatch); |
| } |
| } |
| |
| /** |
| * For a given key / value pair that has been written to the db, |
| * write additional entries to the db for each primary filter. |
| */ |
| private static void writePrimaryFilterEntries(WriteBatch writeBatch, |
| Map<String, Set<Object>> primaryFilters, byte[] key, byte[] value) |
| throws IOException { |
| if (primaryFilters != null && !primaryFilters.isEmpty()) { |
| for (Entry<String, Set<Object>> pf : primaryFilters.entrySet()) { |
| for (Object pfval : pf.getValue()) { |
| writeBatch.put(addPrimaryFilterToKey(pf.getKey(), pfval, |
| key), value); |
| } |
| } |
| } |
| } |
| |
| @Override |
| public TimelinePutResponse put(TimelineEntities entities) { |
| TimelinePutResponse response = new TimelinePutResponse(); |
| for (TimelineEntity entity : entities.getEntities()) { |
| put(entity, response); |
| } |
| return response; |
| } |
| |
| /** |
| * Get the unique start time for a given entity as a byte array that sorts |
| * the timestamps in reverse order (see {@link |
| * GenericObjectMapper#writeReverseOrderedLong(long)}). |
| * |
| * @param entityId The id of the entity |
| * @param entityType The type of the entity |
| * @return A byte array |
| * @throws IOException |
| */ |
| private byte[] getStartTime(String entityId, String entityType) |
| throws IOException { |
| EntityIdentifier entity = new EntityIdentifier(entityId, entityType); |
| // look up the start time, checking the read cache before the db |
| if (startTimeReadCache.containsKey(entity)) { |
| // found the start time in the cache |
| return writeReverseOrderedLong(startTimeReadCache.get(entity)); |
| } else { |
| // try to look up the start time in the db |
| byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType()); |
| byte[] v = db.get(b); |
| if (v == null) { |
| // did not find the start time in the db |
| return null; |
| } else { |
| // found the start time in the db |
| startTimeReadCache.put(entity, readReverseOrderedLong(v, 0)); |
| return v; |
| } |
| } |
| } |
| |
| /** |
| * Get the unique start time for a given entity as a byte array that sorts |
| * the timestamps in reverse order (see {@link |
| * GenericObjectMapper#writeReverseOrderedLong(long)}). If the start time |
| * doesn't exist, set it based on the information provided. |
| * |
| * @param entityId The id of the entity |
| * @param entityType The type of the entity |
| * @param startTime The start time of the entity, or null |
| * @param events A list of events for the entity, or null |
| * @param writeBatch A leveldb write batch, if the method is called by a |
| * put as opposed to a get |
| * @return A byte array |
| * @throws IOException |
| */ |
| private byte[] getAndSetStartTime(String entityId, String entityType, |
| Long startTime, List<TimelineEvent> events, WriteBatch writeBatch) |
| throws IOException { |
| EntityIdentifier entity = new EntityIdentifier(entityId, entityType); |
| if (startTime == null) { |
| // start time is not provided, so try to look it up |
| if (startTimeWriteCache.containsKey(entity)) { |
| // found the start time in the cache |
| startTime = startTimeWriteCache.get(entity); |
| return writeReverseOrderedLong(startTime); |
| } else { |
| if (events != null) { |
| // prepare a start time from events in case it is needed |
| Long min = Long.MAX_VALUE; |
| for (TimelineEvent e : events) { |
| if (min > e.getTimestamp()) { |
| min = e.getTimestamp(); |
| } |
| } |
| startTime = min; |
| } |
| return checkStartTimeInDb(entity, startTime, writeBatch); |
| } |
| } else { |
| // start time is provided |
| if (startTimeWriteCache.containsKey(entity)) { |
| // check the provided start time matches the cache |
| if (!startTime.equals(startTimeWriteCache.get(entity))) { |
| // the start time is already in the cache, |
| // and it is different from the provided start time, |
| // so use the one from the cache |
| startTime = startTimeWriteCache.get(entity); |
| } |
| return writeReverseOrderedLong(startTime); |
| } else { |
| // check the provided start time matches the db |
| return checkStartTimeInDb(entity, startTime, writeBatch); |
| } |
| } |
| } |
| |
| /** |
| * Checks the db for an existing start time and returns it if found. If |
| * it doesn't exist, writes the suggested start time (if it is not |
| * null). This is only called on a cache miss, so in either case the |
| * resulting start time is added back into both caches. |
| */ |
| private byte[] checkStartTimeInDb(EntityIdentifier entity, |
| Long suggestedStartTime, WriteBatch writeBatch) throws IOException { |
| // create lookup key for start time |
| byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType()); |
| // retrieve value for key |
| byte[] v = db.get(b); |
| byte[] revStartTime; |
| if (v == null) { |
| // start time doesn't exist in db |
| if (suggestedStartTime == null) { |
| return null; |
| } |
| // write suggested start time |
| revStartTime = writeReverseOrderedLong(suggestedStartTime); |
| writeBatch.put(b, revStartTime); |
| } else { |
| // found start time in db, so ignore suggested start time |
| suggestedStartTime = readReverseOrderedLong(v, 0); |
| revStartTime = v; |
| } |
| startTimeWriteCache.put(entity, suggestedStartTime); |
| startTimeReadCache.put(entity, suggestedStartTime); |
| return revStartTime; |
| } |
| |
| /** |
| * Creates a key for looking up the start time of a given entity, |
| * of the form START_TIME_LOOKUP_PREFIX + entitytype + entity. |
| */ |
| private static byte[] createStartTimeLookupKey(String entity, |
| String entitytype) throws IOException { |
| return KeyBuilder.newInstance().add(START_TIME_LOOKUP_PREFIX) |
| .add(entitytype).add(entity).getBytes(); |
| } |
| |
| /** |
| * Creates an index entry for the given key of the form |
| * INDEXED_ENTRY_PREFIX + primaryfiltername + primaryfiltervalue + key. |
| */ |
| private static byte[] addPrimaryFilterToKey(String primaryFilterName, |
| Object primaryFilterValue, byte[] key) throws IOException { |
| return KeyBuilder.newInstance().add(INDEXED_ENTRY_PREFIX) |
| .add(primaryFilterName) |
| .add(GenericObjectMapper.write(primaryFilterValue), true).add(key) |
| .getBytes(); |
| } |
| |
| /** |
| * Creates an event key, serializing ENTITY_ENTRY_PREFIX + entitytype + |
| * revstarttime + entity + TIME_COLUMN + reveventtimestamp + eventtype. |
| */ |
| private static byte[] createEntityEventKey(String entity, String entitytype, |
| byte[] revStartTime, byte[] reveventtimestamp, String eventtype) |
| throws IOException { |
| return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX) |
| .add(entitytype).add(revStartTime).add(entity).add(TIME_COLUMN) |
| .add(reveventtimestamp).add(eventtype).getBytes(); |
| } |
| |
| /** |
| * Creates an event object from the given key, offset, and value. If the |
| * event type is not contained in the specified set of event types, |
| * returns null. |
| */ |
| private static TimelineEvent getEntityEvent(Set<String> eventTypes, byte[] key, |
| int offset, byte[] value) throws IOException { |
| KeyParser kp = new KeyParser(key, offset); |
| long ts = kp.getNextLong(); |
| String tstype = kp.getNextString(); |
| if (eventTypes == null || eventTypes.contains(tstype)) { |
| TimelineEvent event = new TimelineEvent(); |
| event.setTimestamp(ts); |
| event.setEventType(tstype); |
| Object o = GenericObjectMapper.read(value); |
| if (o == null) { |
| event.setEventInfo(null); |
| } else if (o instanceof Map) { |
| @SuppressWarnings("unchecked") |
| Map<String, Object> m = (Map<String, Object>) o; |
| event.setEventInfo(m); |
| } else { |
| throw new IOException("Couldn't deserialize event info map"); |
| } |
| return event; |
| } |
| return null; |
| } |
| |
| /** |
| * Creates a primary filter key, serializing ENTITY_ENTRY_PREFIX + |
| * entitytype + revstarttime + entity + PRIMARY_FILTER_COLUMN + name + value. |
| */ |
| private static byte[] createPrimaryFilterKey(String entity, |
| String entitytype, byte[] revStartTime, String name, Object value) |
| throws IOException { |
| return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX).add(entitytype) |
| .add(revStartTime).add(entity).add(PRIMARY_FILTER_COLUMN).add(name) |
| .add(GenericObjectMapper.write(value)).getBytes(); |
| } |
| |
| /** |
| * Parses the primary filter from the given key at the given offset and |
| * adds it to the given entity. |
| */ |
| private static void addPrimaryFilter(TimelineEntity entity, byte[] key, |
| int offset) throws IOException { |
| KeyParser kp = new KeyParser(key, offset); |
| String name = kp.getNextString(); |
| Object value = GenericObjectMapper.read(key, kp.getOffset()); |
| entity.addPrimaryFilter(name, value); |
| } |
| |
| /** |
| * Creates an other info key, serializing ENTITY_ENTRY_PREFIX + entitytype + |
| * revstarttime + entity + OTHER_INFO_COLUMN + name. |
| */ |
| private static byte[] createOtherInfoKey(String entity, String entitytype, |
| byte[] revStartTime, String name) throws IOException { |
| return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX).add(entitytype) |
| .add(revStartTime).add(entity).add(OTHER_INFO_COLUMN).add(name) |
| .getBytes(); |
| } |
| |
| /** |
| * Creates a string representation of the byte array from the given offset |
| * to the end of the array (for parsing other info keys). |
| */ |
| private static String parseRemainingKey(byte[] b, int offset) { |
| return new String(b, offset, b.length - offset); |
| } |
| |
| /** |
| * Creates a related entity key, serializing ENTITY_ENTRY_PREFIX + |
| * entitytype + revstarttime + entity + RELATED_COLUMN + relatedentitytype + |
| * relatedentity. |
| */ |
| private static byte[] createRelatedEntityKey(String entity, |
| String entitytype, byte[] revStartTime, String relatedEntity, |
| String relatedEntityType) throws IOException { |
| return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX).add(entitytype) |
| .add(revStartTime).add(entity).add(RELATED_COLUMN) |
| .add(relatedEntityType).add(relatedEntity).getBytes(); |
| } |
| |
| /** |
| * Parses the related entity from the given key at the given offset and |
| * adds it to the given entity. |
| */ |
| private static void addRelatedEntity(TimelineEntity entity, byte[] key, |
| int offset) throws IOException { |
| KeyParser kp = new KeyParser(key, offset); |
| String type = kp.getNextString(); |
| String id = kp.getNextString(); |
| entity.addRelatedEntity(type, id); |
| } |
| |
| /** |
| * Clears the start time caches so that tests can verify start times |
| * are reloaded from leveldb (only for testing). |
| */ |
| @VisibleForTesting |
| void clearStartTimeCache() { |
| startTimeWriteCache.clear(); |
| startTimeReadCache.clear(); |
| } |
| |
| @VisibleForTesting |
| static int getStartTimeReadCacheSize(Configuration conf) { |
| return conf.getInt( |
| YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE, |
| DEFAULT_START_TIME_READ_CACHE_SIZE); |
| } |
| |
| @VisibleForTesting |
| static int getStartTimeWriteCacheSize(Configuration conf) { |
| return conf.getInt( |
| YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE, |
| DEFAULT_START_TIME_WRITE_CACHE_SIZE); |
| } |
| } |