/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timeline;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.commons.collections.map.LRUMap;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.timeline.*;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
import org.apache.hadoop.yarn.server.timeline.TimelineDataManager.CheckAcl;
import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.KeyBuilder;
import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.KeyParser;
import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
import org.fusesource.leveldbjni.JniDBFactory;
import org.iq80.leveldb.*;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.readReverseOrderedLong;
import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.writeReverseOrderedLong;
import static org.apache.hadoop.yarn.server.timeline.TimelineDataManager.DEFAULT_DOMAIN_ID;
import static org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.prefixMatches;
import static org.fusesource.leveldbjni.JniDBFactory.bytes;
/**
* <p>An implementation of an application timeline store backed by leveldb.</p>
*
* <p>There are three sections of the db: the start time section,
* the entity section, and the indexed entity section.</p>
*
* <p>The start time section is used to retrieve the unique start time for
* a given entity. Its values each contain a start time while its keys are of
* the form:</p>
* <pre>
* START_TIME_LOOKUP_PREFIX + entity type + entity id</pre>
*
* <p>The entity section is ordered by entity type, then entity start time
* descending, then entity ID. There are four sub-sections of the entity
* section: events, primary filters, related entities,
* and other info. The event entries have event info serialized into their
* values. The other info entries have values corresponding to the values of
* the other info name/value map for the entry (note the names are contained
* in the key). All other entries have empty values. The key structure is as
* follows:</p>
* <pre>
* ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id
*
* ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
* EVENTS_COLUMN + reveventtimestamp + eventtype
*
* ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
* PRIMARY_FILTERS_COLUMN + name + value
*
* ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
* OTHER_INFO_COLUMN + name
*
* ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
* RELATED_ENTITIES_COLUMN + relatedentity type + relatedentity id
*
* ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
* DOMAIN_ID_COLUMN
*
* ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
* INVISIBLE_REVERSE_RELATED_ENTITIES_COLUMN + relatedentity type +
* relatedentity id</pre>
*
* <p>The indexed entity section contains a primary filter name and primary
* filter value as the prefix. Within a given name/value, entire entity
* entries are stored in the same format as described in the entity section
* above (below, "key" represents any one of the possible entity entry keys
* described above).</p>
* <pre>
* INDEXED_ENTRY_PREFIX + primaryfilter name + primaryfilter value +
* key</pre>
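*
* <p>For illustration only (a conceptual sketch, not the exact byte
* encoding produced by KeyBuilder): an entity of type "APP" with id
* "app_1" and start time 1000 has its marker stored under a key of the
* form "e" + "APP" + reverseOrderedLong(1000) + "app_1".</p>
*
* <p>In addition to the three sections above, timeline domains are stored
* under DOMAIN_ENTRY_PREFIX + domain id + column name, with an owner-based
* lookup section of the form OWNER_LOOKUP_PREFIX + owner + domain id +
* column name.</p>
*
* <p>A minimal usage sketch, relying on the standard service lifecycle
* and assuming {@link YarnConfiguration#TIMELINE_SERVICE_LEVELDB_PATH}
* points at a writable local directory:</p>
* <pre>
*   Configuration conf = new YarnConfiguration();
*   LeveldbTimelineStore store = new LeveldbTimelineStore();
*   store.init(conf);
*   store.start();
*   // ... put(entities) and getEntities(...) calls ...
*   store.stop();
* </pre>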
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class LeveldbTimelineStore extends AbstractService
implements TimelineStore {
private static final org.slf4j.Logger LOG = LoggerFactory
.getLogger(LeveldbTimelineStore.class);
@Private
@VisibleForTesting
static final String FILENAME = "leveldb-timeline-store.ldb";
@VisibleForTesting
// Extension to FILENAME where a backup will be stored in case we need to
// invoke LevelDB recovery
static final String BACKUP_EXT = ".backup-";
private static final byte[] START_TIME_LOOKUP_PREFIX = "k".getBytes(Charset.forName("UTF-8"));
private static final byte[] ENTITY_ENTRY_PREFIX = "e".getBytes(Charset.forName("UTF-8"));
private static final byte[] INDEXED_ENTRY_PREFIX = "i".getBytes(Charset.forName("UTF-8"));
private static final byte[] EVENTS_COLUMN = "e".getBytes(Charset.forName("UTF-8"));
private static final byte[] PRIMARY_FILTERS_COLUMN = "f".getBytes(Charset.forName("UTF-8"));
private static final byte[] OTHER_INFO_COLUMN = "i".getBytes(Charset.forName("UTF-8"));
private static final byte[] RELATED_ENTITIES_COLUMN = "r".getBytes(Charset.forName("UTF-8"));
private static final byte[] INVISIBLE_REVERSE_RELATED_ENTITIES_COLUMN =
"z".getBytes(Charset.forName("UTF-8"));
private static final byte[] DOMAIN_ID_COLUMN = "d".getBytes(Charset.forName("UTF-8"));
private static final byte[] DOMAIN_ENTRY_PREFIX = "d".getBytes(Charset.forName("UTF-8"));
private static final byte[] OWNER_LOOKUP_PREFIX = "o".getBytes(Charset.forName("UTF-8"));
private static final byte[] DESCRIPTION_COLUMN = "d".getBytes(Charset.forName("UTF-8"));
private static final byte[] OWNER_COLUMN = "o".getBytes(Charset.forName("UTF-8"));
private static final byte[] READER_COLUMN = "r".getBytes(Charset.forName("UTF-8"));
private static final byte[] WRITER_COLUMN = "w".getBytes(Charset.forName("UTF-8"));
private static final byte[] TIMESTAMP_COLUMN = "t".getBytes(Charset.forName("UTF-8"));
private static final byte[] EMPTY_BYTES = new byte[0];
// default number of entities/events to return when callers pass no limit
private static final long DEFAULT_LIMIT = 100;
private static final String TIMELINE_STORE_VERSION_KEY = "timeline-store-version";
private static final Version CURRENT_VERSION_INFO = Version
.newInstance(1, 0);
@Private
@VisibleForTesting
static final FsPermission LEVELDB_DIR_UMASK = FsPermission
.createImmutable((short) 0700);
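// in-memory LRU caches of entity start times: the write cache holds both
// start and insert times for entities being written, while the read cache
// holds start times only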
private Map<EntityIdentifier, StartAndInsertTime> startTimeWriteCache;
private Map<EntityIdentifier, Long> startTimeReadCache;
/**
* Per-entity locks are obtained when writing.
*/
private final LockMap<EntityIdentifier> writeLocks =
new LockMap<EntityIdentifier>();
private final ReentrantReadWriteLock deleteLock =
new ReentrantReadWriteLock();
private DB db;
private Thread deletionThread;
public LeveldbTimelineStore() {
super(LeveldbTimelineStore.class.getName());
}
private JniDBFactory factory;
@VisibleForTesting
void setFactory(JniDBFactory fact) {
this.factory = fact;
}
@Override
@SuppressWarnings("unchecked")
protected void serviceInit(Configuration conf) throws Exception {
Preconditions.checkArgument(conf.getLong(
YarnConfiguration.TIMELINE_SERVICE_TTL_MS,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_TTL_MS) > 0,
"%s property value should be greater than zero",
YarnConfiguration.TIMELINE_SERVICE_TTL_MS);
Preconditions.checkArgument(conf.getLong(
YarnConfiguration.TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS) > 0,
"%s property value should be greater than zero",
YarnConfiguration.TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS);
Preconditions.checkArgument(conf.getLong(
YarnConfiguration.TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE) >= 0,
"%s property value should be greater than or equal to zero",
YarnConfiguration.TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE);
Preconditions.checkArgument(conf.getLong(
YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE) > 0,
" %s property value should be greater than zero",
YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE);
Preconditions.checkArgument(conf.getLong(
YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE) > 0,
"%s property value should be greater than zero",
YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE);
Options options = new Options();
options.createIfMissing(true);
options.cacheSize(conf.getLong(
YarnConfiguration.TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE));
if (factory == null) {
factory = new JniDBFactory();
}
Path dbPath = new Path(
conf.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH), FILENAME);
FileSystem localFS = null;
try {
localFS = FileSystem.getLocal(conf);
if (!localFS.exists(dbPath)) {
if (!localFS.mkdirs(dbPath)) {
throw new IOException("Couldn't create directory for leveldb " +
"timeline store " + dbPath);
}
localFS.setPermission(dbPath, LEVELDB_DIR_UMASK);
}
} finally {
IOUtils.cleanupWithLogger(LOG, localFS);
}
LOG.info("Using leveldb path " + dbPath);
try {
db = factory.open(new File(dbPath.toString()), options);
} catch (IOException ioe) {
File dbFile = new File(dbPath.toString());
File backupPath = new File(
dbPath.toString() + BACKUP_EXT + Time.monotonicNow());
LOG.warn("Incurred exception while loading LevelDb database. Backing " +
"up at "+ backupPath, ioe);
FileUtils.copyDirectory(dbFile, backupPath);
LOG.warn("Going to try repair");
factory.repair(dbFile, options);
db = factory.open(dbFile, options);
}
checkVersion();
startTimeWriteCache =
Collections.synchronizedMap(new LRUMap(getStartTimeWriteCacheSize(
conf)));
startTimeReadCache =
Collections.synchronizedMap(new LRUMap(getStartTimeReadCacheSize(
conf)));
if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE, true)) {
deletionThread = new EntityDeletionThread(conf);
deletionThread.start();
}
super.serviceInit(conf);
}
@Override
protected void serviceStop() throws Exception {
if (deletionThread != null) {
deletionThread.interrupt();
LOG.info("Waiting for deletion thread to complete its current action");
try {
deletionThread.join();
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting for deletion thread to complete," +
" closing db now", e);
}
}
IOUtils.cleanupWithLogger(LOG, db);
super.serviceStop();
}
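/**
* Immutable pair of an entity's start time and the time at which it was
* first inserted into the store.
*/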
private static class StartAndInsertTime {
final long startTime;
final long insertTime;
public StartAndInsertTime(long startTime, long insertTime) {
this.startTime = startTime;
this.insertTime = insertTime;
}
}
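/**
* Background thread that periodically discards entities whose start time
* is older than the configured TTL.
*/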
private class EntityDeletionThread extends Thread {
private final long ttl;
private final long ttlInterval;
public EntityDeletionThread(Configuration conf) {
ttl = conf.getLong(YarnConfiguration.TIMELINE_SERVICE_TTL_MS,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_TTL_MS);
ttlInterval = conf.getLong(
YarnConfiguration.TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS);
LOG.info("Starting deletion thread with ttl " + ttl + " and cycle " +
"interval " + ttlInterval);
}
@Override
public void run() {
while (true) {
long timestamp = System.currentTimeMillis() - ttl;
try {
discardOldEntities(timestamp);
Thread.sleep(ttlInterval);
} catch (IOException e) {
LOG.error("Error discarding old entities", e);
} catch (InterruptedException e) {
LOG.info("Deletion thread received interrupt, exiting");
break;
}
}
}
}
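/**
* A map of reference-counted locks, one per key. getLock increments the
* count and returnLock decrements it, removing the lock from the map once
* no caller holds a reference, so the map stays bounded by the number of
* in-flight writers.
*/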
private static class LockMap<K> {
private static class CountingReentrantLock<K> extends ReentrantLock {
private static final long serialVersionUID = 1L;
private int count;
private K key;
CountingReentrantLock(K key) {
super();
this.count = 0;
this.key = key;
}
}
private Map<K, CountingReentrantLock<K>> locks =
new HashMap<K, CountingReentrantLock<K>>();
synchronized CountingReentrantLock<K> getLock(K key) {
CountingReentrantLock<K> lock = locks.get(key);
if (lock == null) {
lock = new CountingReentrantLock<K>(key);
locks.put(key, lock);
}
lock.count++;
return lock;
}
synchronized void returnLock(CountingReentrantLock<K> lock) {
if (lock.count == 0) {
throw new IllegalStateException("Returned lock more times than it " +
"was retrieved");
}
lock.count--;
if (lock.count == 0) {
locks.remove(lock.key);
}
}
}
@Override
public TimelineEntity getEntity(String entityId, String entityType,
EnumSet<Field> fields) throws IOException {
Long revStartTime = getStartTimeLong(entityId, entityType);
if (revStartTime == null) {
return null;
}
byte[] prefix = KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX)
.add(entityType).add(writeReverseOrderedLong(revStartTime))
.add(entityId).getBytesForLookup();
LeveldbIterator iterator = null;
try {
iterator = new LeveldbIterator(db);
iterator.seek(prefix);
if (fields == null) {
fields = EnumSet.allOf(Field.class);
}
return getEntity(entityId, entityType, revStartTime, fields, iterator,
prefix, prefix.length);
} catch(DBException e) {
throw new IOException(e);
} finally {
IOUtils.cleanupWithLogger(LOG, iterator);
}
}
/**
* Read an entity from a db iterator, populating only the requested
* fields; fields that are not requested are set to null on the
* returned entity.
*/
private static TimelineEntity getEntity(String entityId, String entityType,
Long startTime, EnumSet<Field> fields, LeveldbIterator iterator,
byte[] prefix, int prefixlen) throws IOException {
TimelineEntity entity = new TimelineEntity();
boolean events = false;
boolean lastEvent = false;
if (fields.contains(Field.EVENTS)) {
events = true;
} else if (fields.contains(Field.LAST_EVENT_ONLY)) {
lastEvent = true;
} else {
entity.setEvents(null);
}
boolean relatedEntities = false;
if (fields.contains(Field.RELATED_ENTITIES)) {
relatedEntities = true;
} else {
entity.setRelatedEntities(null);
}
boolean primaryFilters = false;
if (fields.contains(Field.PRIMARY_FILTERS)) {
primaryFilters = true;
} else {
entity.setPrimaryFilters(null);
}
boolean otherInfo = false;
if (fields.contains(Field.OTHER_INFO)) {
otherInfo = true;
} else {
entity.setOtherInfo(null);
}
// iterate through the entity's entry, parsing information if it is part
// of a requested field
for (; iterator.hasNext(); iterator.next()) {
byte[] key = iterator.peekNext().getKey();
if (!prefixMatches(prefix, prefixlen, key)) {
break;
}
if (key.length == prefixlen) {
continue;
}
if (key[prefixlen] == PRIMARY_FILTERS_COLUMN[0]) {
if (primaryFilters) {
addPrimaryFilter(entity, key,
prefixlen + PRIMARY_FILTERS_COLUMN.length);
}
} else if (key[prefixlen] == OTHER_INFO_COLUMN[0]) {
if (otherInfo) {
entity.addOtherInfo(parseRemainingKey(key,
prefixlen + OTHER_INFO_COLUMN.length),
GenericObjectMapper.read(iterator.peekNext().getValue()));
}
} else if (key[prefixlen] == RELATED_ENTITIES_COLUMN[0]) {
if (relatedEntities) {
addRelatedEntity(entity, key,
prefixlen + RELATED_ENTITIES_COLUMN.length);
}
} else if (key[prefixlen] == EVENTS_COLUMN[0]) {
if (events || (lastEvent &&
entity.getEvents().size() == 0)) {
TimelineEvent event = getEntityEvent(null, key, prefixlen +
EVENTS_COLUMN.length, iterator.peekNext().getValue());
if (event != null) {
entity.addEvent(event);
}
}
} else if (key[prefixlen] == DOMAIN_ID_COLUMN[0]) {
byte[] v = iterator.peekNext().getValue();
String domainId = new String(v, Charset.forName("UTF-8"));
entity.setDomainId(domainId);
} else {
if (key[prefixlen] !=
INVISIBLE_REVERSE_RELATED_ENTITIES_COLUMN[0]) {
LOG.warn(String.format("Found unexpected column for entity %s of " +
"type %s (0x%02x)", entityId, entityType, key[prefixlen]));
}
}
}
entity.setEntityId(entityId);
entity.setEntityType(entityType);
entity.setStartTime(startTime);
return entity;
}
@Override
public TimelineEvents getEntityTimelines(String entityType,
SortedSet<String> entityIds, Long limit, Long windowStart,
Long windowEnd, Set<String> eventType) throws IOException {
TimelineEvents events = new TimelineEvents();
if (entityIds == null || entityIds.isEmpty()) {
return events;
}
// create a lexicographically-ordered map from start time to entities
Map<byte[], List<EntityIdentifier>> startTimeMap = new TreeMap<byte[],
List<EntityIdentifier>>(new Comparator<byte[]>() {
@Override
public int compare(byte[] o1, byte[] o2) {
return WritableComparator.compareBytes(o1, 0, o1.length, o2, 0,
o2.length);
}
});
LeveldbIterator iterator = null;
try {
// look up start times for the specified entities
// skip entities with no start time
for (String entityId : entityIds) {
byte[] startTime = getStartTime(entityId, entityType);
if (startTime != null) {
List<EntityIdentifier> entities = startTimeMap.get(startTime);
if (entities == null) {
entities = new ArrayList<EntityIdentifier>();
startTimeMap.put(startTime, entities);
}
entities.add(new EntityIdentifier(entityId, entityType));
}
}
for (Entry<byte[], List<EntityIdentifier>> entry :
startTimeMap.entrySet()) {
// look up the events matching the given parameters (limit,
// start time, end time, event types) for entities whose start times
// were found and add the entities to the return list
byte[] revStartTime = entry.getKey();
for (EntityIdentifier entityIdentifier : entry.getValue()) {
EventsOfOneEntity entity = new EventsOfOneEntity();
entity.setEntityId(entityIdentifier.getId());
entity.setEntityType(entityType);
events.addEvent(entity);
KeyBuilder kb = KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX)
.add(entityType).add(revStartTime).add(entityIdentifier.getId())
.add(EVENTS_COLUMN);
byte[] prefix = kb.getBytesForLookup();
if (windowEnd == null) {
windowEnd = Long.MAX_VALUE;
}
byte[] revts = writeReverseOrderedLong(windowEnd);
kb.add(revts);
byte[] first = kb.getBytesForLookup();
byte[] last = null;
if (windowStart != null) {
last = KeyBuilder.newInstance().add(prefix)
.add(writeReverseOrderedLong(windowStart)).getBytesForLookup();
}
if (limit == null) {
limit = DEFAULT_LIMIT;
}
// close any iterator left over from the previous entity before
// opening a new one
IOUtils.cleanupWithLogger(LOG, iterator);
iterator = new LeveldbIterator(db);
for (iterator.seek(first); entity.getEvents().size() < limit &&
iterator.hasNext(); iterator.next()) {
byte[] key = iterator.peekNext().getKey();
if (!prefixMatches(prefix, prefix.length, key) || (last != null &&
WritableComparator.compareBytes(key, 0, key.length, last, 0,
last.length) > 0)) {
break;
}
TimelineEvent event = getEntityEvent(eventType, key, prefix.length,
iterator.peekNext().getValue());
if (event != null) {
entity.addEvent(event);
}
}
}
}
} catch(DBException e) {
throw new IOException(e);
} finally {
IOUtils.cleanupWithLogger(LOG, iterator);
}
return events;
}
@Override
public TimelineEntities getEntities(String entityType,
Long limit, Long windowStart, Long windowEnd, String fromId, Long fromTs,
NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
EnumSet<Field> fields, CheckAcl checkAcl) throws IOException {
if (primaryFilter == null) {
// if no primary filter is specified, prefix the lookup with
// ENTITY_ENTRY_PREFIX
return getEntityByTime(ENTITY_ENTRY_PREFIX, entityType, limit,
windowStart, windowEnd, fromId, fromTs, secondaryFilters,
fields, checkAcl);
} else {
// if a primary filter is specified, prefix the lookup with
// INDEXED_ENTRY_PREFIX + primaryFilterName + primaryFilterValue +
// ENTITY_ENTRY_PREFIX
byte[] base = KeyBuilder.newInstance().add(INDEXED_ENTRY_PREFIX)
.add(primaryFilter.getName())
.add(GenericObjectMapper.write(primaryFilter.getValue()), true)
.add(ENTITY_ENTRY_PREFIX).getBytesForLookup();
return getEntityByTime(base, entityType, limit, windowStart, windowEnd,
fromId, fromTs, secondaryFilters, fields, checkAcl);
}
}
/**
* Retrieves a list of entities satisfying given parameters.
*
* @param base A byte array prefix for the lookup
* @param entityType The type of the entity
* @param limit A limit on the number of entities to return
* @param starttime The earliest entity start time to retrieve (exclusive)
* @param endtime The latest entity start time to retrieve (inclusive)
* @param fromId Retrieve entities starting with this entity
* @param fromTs Ignore entities with insert timestamp later than this ts
* @param secondaryFilters Filter pairs that the entities should match
* @param fields The set of fields to retrieve
* @return A list of entities
* @throws IOException
*/
private TimelineEntities getEntityByTime(byte[] base,
String entityType, Long limit, Long starttime, Long endtime,
String fromId, Long fromTs, Collection<NameValuePair> secondaryFilters,
EnumSet<Field> fields, CheckAcl checkAcl) throws IOException {
// Even if other info and primary filter fields are not included, we
// still need to load them to match secondary filters when they are
// non-empty
if (fields == null) {
fields = EnumSet.allOf(Field.class);
}
boolean addPrimaryFilters = false;
boolean addOtherInfo = false;
if (secondaryFilters != null && secondaryFilters.size() > 0) {
if (!fields.contains(Field.PRIMARY_FILTERS)) {
fields.add(Field.PRIMARY_FILTERS);
addPrimaryFilters = true;
}
if (!fields.contains(Field.OTHER_INFO)) {
fields.add(Field.OTHER_INFO);
addOtherInfo = true;
}
}
LeveldbIterator iterator = null;
try {
KeyBuilder kb = KeyBuilder.newInstance().add(base).add(entityType);
// only db keys matching the prefix (base + entity type) will be parsed
byte[] prefix = kb.getBytesForLookup();
if (endtime == null) {
// if end time is null, place no restriction on end time
endtime = Long.MAX_VALUE;
}
// construct a first key that will be seeked to using end time or fromId
byte[] first = null;
if (fromId != null) {
Long fromIdStartTime = getStartTimeLong(fromId, entityType);
if (fromIdStartTime == null) {
// no start time for provided id, so return empty entities
return new TimelineEntities();
}
if (fromIdStartTime <= endtime) {
// if provided id's start time falls before the end of the window,
// use it to construct the seek key
first = kb.add(writeReverseOrderedLong(fromIdStartTime))
.add(fromId).getBytesForLookup();
}
}
// if seek key wasn't constructed using fromId, construct it using end ts
if (first == null) {
first = kb.add(writeReverseOrderedLong(endtime)).getBytesForLookup();
}
byte[] last = null;
if (starttime != null) {
// if start time is not null, set a last key that will not be
// iterated past
last = KeyBuilder.newInstance().add(base).add(entityType)
.add(writeReverseOrderedLong(starttime)).getBytesForLookup();
}
if (limit == null) {
// if limit is not specified, use the default
limit = DEFAULT_LIMIT;
}
TimelineEntities entities = new TimelineEntities();
iterator = new LeveldbIterator(db);
iterator.seek(first);
// iterate until one of the following conditions is met: limit is
// reached, there are no more keys, the key prefix no longer matches,
// or a start time has been specified and reached/exceeded
while (entities.getEntities().size() < limit && iterator.hasNext()) {
byte[] key = iterator.peekNext().getKey();
if (!prefixMatches(prefix, prefix.length, key) || (last != null &&
WritableComparator.compareBytes(key, 0, key.length, last, 0,
last.length) > 0)) {
break;
}
// read the start time and entity id from the current key
KeyParser kp = new KeyParser(key, prefix.length);
Long startTime = kp.getNextLong();
String entityId = kp.getNextString();
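// if the entity was inserted into the db after the given timestamp,
// skip all of this entity's keys entirely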
if (fromTs != null) {
long insertTime = readReverseOrderedLong(iterator.peekNext()
.getValue(), 0);
if (insertTime > fromTs) {
byte[] firstKey = key;
while (iterator.hasNext() && prefixMatches(firstKey,
kp.getOffset(), key)) {
iterator.next();
key = iterator.peekNext().getKey();
}
continue;
}
}
// parse the entity that owns this key, iterating over all keys for
// the entity
TimelineEntity entity = getEntity(entityId, entityType, startTime,
fields, iterator, key, kp.getOffset());
// determine if the retrieved entity matches the provided secondary
// filters, and if so add it to the list of entities to return
boolean filterPassed = true;
if (secondaryFilters != null) {
for (NameValuePair filter : secondaryFilters) {
Object v = entity.getOtherInfo().get(filter.getName());
if (v == null) {
Set<Object> vs = entity.getPrimaryFilters()
.get(filter.getName());
if (vs == null || !vs.contains(filter.getValue())) {
filterPassed = false;
break;
}
} else if (!v.equals(filter.getValue())) {
filterPassed = false;
break;
}
}
}
if (filterPassed) {
if (entity.getDomainId() == null) {
entity.setDomainId(DEFAULT_DOMAIN_ID);
}
if (checkAcl == null || checkAcl.check(entity)) {
// Remove primary filter and other info if they are added for
// matching secondary filters
if (addPrimaryFilters) {
entity.setPrimaryFilters(null);
}
if (addOtherInfo) {
entity.setOtherInfo(null);
}
entities.addEntity(entity);
}
}
}
return entities;
} catch(DBException e) {
throw new IOException(e);
} finally {
IOUtils.cleanupWithLogger(LOG, iterator);
}
}
/**
* Handle error and set it in response.
*/
private static void handleError(TimelineEntity entity,
TimelinePutResponse response, final int errorCode) {
TimelinePutError error = new TimelinePutError();
error.setEntityId(entity.getEntityId());
error.setEntityType(entity.getEntityType());
error.setErrorCode(errorCode);
response.addError(error);
}
/**
* Put a single entity. If there is an error, add a TimelinePutError to the
* given response.
*/
private void put(TimelineEntity entity, TimelinePutResponse response,
boolean allowEmptyDomainId) {
LockMap.CountingReentrantLock<EntityIdentifier> lock =
writeLocks.getLock(new EntityIdentifier(entity.getEntityId(),
entity.getEntityType()));
lock.lock();
WriteBatch writeBatch = null;
List<EntityIdentifier> relatedEntitiesWithoutStartTimes =
new ArrayList<EntityIdentifier>();
byte[] revStartTime = null;
Map<String, Set<Object>> primaryFilters = null;
try {
writeBatch = db.createWriteBatch();
List<TimelineEvent> events = entity.getEvents();
// look up the start time for the entity
StartAndInsertTime startAndInsertTime = getAndSetStartTime(
entity.getEntityId(), entity.getEntityType(),
entity.getStartTime(), events);
if (startAndInsertTime == null) {
// if no start time is found, add an error and return
handleError(entity, response, TimelinePutError.NO_START_TIME);
return;
}
revStartTime = writeReverseOrderedLong(startAndInsertTime
.startTime);
primaryFilters = entity.getPrimaryFilters();
// write entity marker
byte[] markerKey = createEntityMarkerKey(entity.getEntityId(),
entity.getEntityType(), revStartTime);
byte[] markerValue = writeReverseOrderedLong(startAndInsertTime
.insertTime);
writeBatch.put(markerKey, markerValue);
writePrimaryFilterEntries(writeBatch, primaryFilters, markerKey,
markerValue);
// write event entries
if (events != null && !events.isEmpty()) {
for (TimelineEvent event : events) {
byte[] revts = writeReverseOrderedLong(event.getTimestamp());
byte[] key = createEntityEventKey(entity.getEntityId(),
entity.getEntityType(), revStartTime, revts,
event.getEventType());
byte[] value = GenericObjectMapper.write(event.getEventInfo());
writeBatch.put(key, value);
writePrimaryFilterEntries(writeBatch, primaryFilters, key, value);
}
}
// write related entity entries
Map<String, Set<String>> relatedEntities =
entity.getRelatedEntities();
if (relatedEntities != null && !relatedEntities.isEmpty()) {
for (Entry<String, Set<String>> relatedEntityList :
relatedEntities.entrySet()) {
String relatedEntityType = relatedEntityList.getKey();
for (String relatedEntityId : relatedEntityList.getValue()) {
// invisible "reverse" entries (entity -> related entity)
byte[] key = createReverseRelatedEntityKey(entity.getEntityId(),
entity.getEntityType(), revStartTime, relatedEntityId,
relatedEntityType);
writeBatch.put(key, EMPTY_BYTES);
// look up start time of related entity
byte[] relatedEntityStartTime = getStartTime(relatedEntityId,
relatedEntityType);
// delay writing the related entity if no start time is found
if (relatedEntityStartTime == null) {
relatedEntitiesWithoutStartTimes.add(
new EntityIdentifier(relatedEntityId, relatedEntityType));
continue;
} else {
// the related entity already exists; verify it is in the same domain
byte[] domainIdBytes = db.get(createDomainIdKey(
relatedEntityId, relatedEntityType, relatedEntityStartTime));
// The timeline data created by the server before 2.6 won't have
// the domain field. We assume this timeline data is in the
// default timeline domain.
String domainId = null;
if (domainIdBytes == null) {
domainId = TimelineDataManager.DEFAULT_DOMAIN_ID;
} else {
domainId = new String(domainIdBytes, Charset.forName("UTF-8"));
}
if (!domainId.equals(entity.getDomainId())) {
// in this case the entity will be put, but the relation will be
// ignored
handleError(entity, response, TimelinePutError.FORBIDDEN_RELATION);
continue;
}
}
// write "forward" entry (related entity -> entity)
key = createRelatedEntityKey(relatedEntityId,
relatedEntityType, relatedEntityStartTime,
entity.getEntityId(), entity.getEntityType());
writeBatch.put(key, EMPTY_BYTES);
}
}
}
// write primary filter entries
if (primaryFilters != null && !primaryFilters.isEmpty()) {
for (Entry<String, Set<Object>> primaryFilter :
primaryFilters.entrySet()) {
for (Object primaryFilterValue : primaryFilter.getValue()) {
byte[] key = createPrimaryFilterKey(entity.getEntityId(),
entity.getEntityType(), revStartTime,
primaryFilter.getKey(), primaryFilterValue);
writeBatch.put(key, EMPTY_BYTES);
writePrimaryFilterEntries(writeBatch, primaryFilters, key,
EMPTY_BYTES);
}
}
}
// write other info entries
Map<String, Object> otherInfo = entity.getOtherInfo();
if (otherInfo != null && !otherInfo.isEmpty()) {
for (Entry<String, Object> i : otherInfo.entrySet()) {
byte[] key = createOtherInfoKey(entity.getEntityId(),
entity.getEntityType(), revStartTime, i.getKey());
byte[] value = GenericObjectMapper.write(i.getValue());
writeBatch.put(key, value);
writePrimaryFilterEntries(writeBatch, primaryFilters, key, value);
}
}
// write domain id entry
byte[] key = createDomainIdKey(entity.getEntityId(),
entity.getEntityType(), revStartTime);
if (entity.getDomainId() == null ||
entity.getDomainId().length() == 0) {
if (!allowEmptyDomainId) {
handleError(entity, response, TimelinePutError.NO_DOMAIN);
return;
}
} else {
writeBatch.put(key, entity.getDomainId().getBytes(Charset.forName("UTF-8")));
writePrimaryFilterEntries(writeBatch, primaryFilters, key,
entity.getDomainId().getBytes(Charset.forName("UTF-8")));
}
db.write(writeBatch);
} catch (DBException de) {
LOG.error("Error putting entity " + entity.getEntityId() +
" of type " + entity.getEntityType(), de);
handleError(entity, response, TimelinePutError.IO_EXCEPTION);
} catch (IOException e) {
LOG.error("Error putting entity " + entity.getEntityId() +
" of type " + entity.getEntityType(), e);
handleError(entity, response, TimelinePutError.IO_EXCEPTION);
} finally {
lock.unlock();
writeLocks.returnLock(lock);
IOUtils.cleanupWithLogger(LOG, writeBatch);
}
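// Write out related entities whose start times were not found during the
// main write: each is created with this entity's start time and inherits
// its domain id.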
for (EntityIdentifier relatedEntity : relatedEntitiesWithoutStartTimes) {
lock = writeLocks.getLock(relatedEntity);
lock.lock();
try {
StartAndInsertTime relatedEntityStartAndInsertTime =
getAndSetStartTime(relatedEntity.getId(), relatedEntity.getType(),
readReverseOrderedLong(revStartTime, 0), null);
if (relatedEntityStartAndInsertTime == null) {
throw new IOException("Error setting start time for related entity");
}
byte[] relatedEntityStartTime = writeReverseOrderedLong(
relatedEntityStartAndInsertTime.startTime);
// the related entity is newly created here, so it inherits this
// entity's domain
byte[] key = createDomainIdKey(relatedEntity.getId(),
relatedEntity.getType(), relatedEntityStartTime);
db.put(key, entity.getDomainId().getBytes(Charset.forName("UTF-8")));
db.put(createRelatedEntityKey(relatedEntity.getId(),
relatedEntity.getType(), relatedEntityStartTime,
entity.getEntityId(), entity.getEntityType()), EMPTY_BYTES);
db.put(createEntityMarkerKey(relatedEntity.getId(),
relatedEntity.getType(), relatedEntityStartTime),
writeReverseOrderedLong(relatedEntityStartAndInsertTime
.insertTime));
} catch (DBException de) {
LOG.error("Error putting related entity " + relatedEntity.getId() +
" of type " + relatedEntity.getType() + " for entity " +
entity.getEntityId() + " of type " + entity.getEntityType(), de);
handleError(entity, response, TimelinePutError.IO_EXCEPTION);
} catch (IOException e) {
LOG.error("Error putting related entity " + relatedEntity.getId() +
" of type " + relatedEntity.getType() + " for entity " +
entity.getEntityId() + " of type " + entity.getEntityType(), e);
handleError(entity, response, TimelinePutError.IO_EXCEPTION);
} finally {
lock.unlock();
writeLocks.returnLock(lock);
}
}
}
/**
* For a given key / value pair that has been written to the db,
* write additional entries to the db for each primary filter.
*/
private static void writePrimaryFilterEntries(WriteBatch writeBatch,
Map<String, Set<Object>> primaryFilters, byte[] key, byte[] value)
throws IOException {
if (primaryFilters != null && !primaryFilters.isEmpty()) {
for (Entry<String, Set<Object>> pf : primaryFilters.entrySet()) {
for (Object pfval : pf.getValue()) {
writeBatch.put(addPrimaryFilterToKey(pf.getKey(), pfval,
key), value);
}
}
}
}
@Override
public TimelinePutResponse put(TimelineEntities entities) {
deleteLock.readLock().lock();
try {
TimelinePutResponse response = new TimelinePutResponse();
for (TimelineEntity entity : entities.getEntities()) {
put(entity, response, false);
}
return response;
} finally {
deleteLock.readLock().unlock();
}
}
@Private
@VisibleForTesting
public TimelinePutResponse putWithNoDomainId(TimelineEntities entities) {
deleteLock.readLock().lock();
try {
TimelinePutResponse response = new TimelinePutResponse();
for (TimelineEntity entity : entities.getEntities()) {
put(entity, response, true);
}
return response;
} finally {
deleteLock.readLock().unlock();
}
}
/**
* Get the unique start time for a given entity as a byte array that sorts
* the timestamps in reverse order (see {@link
* GenericObjectMapper#writeReverseOrderedLong(long)}).
*
* @param entityId The id of the entity
* @param entityType The type of the entity
* @return A byte array, null if not found
* @throws IOException
*/
private byte[] getStartTime(String entityId, String entityType)
throws IOException {
Long l = getStartTimeLong(entityId, entityType);
return l == null ? null : writeReverseOrderedLong(l);
}
/**
* Get the unique start time for a given entity as a Long.
*
* @param entityId The id of the entity
* @param entityType The type of the entity
* @return A Long, null if not found
* @throws IOException
*/
private Long getStartTimeLong(String entityId, String entityType)
throws IOException {
EntityIdentifier entity = new EntityIdentifier(entityId, entityType);
try {
// start time is not provided, so try to look it up
if (startTimeReadCache.containsKey(entity)) {
// found the start time in the cache
return startTimeReadCache.get(entity);
} else {
// try to look up the start time in the db
byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
byte[] v = db.get(b);
if (v == null) {
// did not find the start time in the db
return null;
} else {
// found the start time in the db
Long l = readReverseOrderedLong(v, 0);
startTimeReadCache.put(entity, l);
return l;
}
}
} catch(DBException e) {
throw new IOException(e);
}
}
/**
* Get the unique start time for a given entity as a byte array that sorts
* the timestamps in reverse order (see {@link
* GenericObjectMapper#writeReverseOrderedLong(long)}). If the start time
* doesn't exist, set it based on the information provided. Should only be
* called when a lock has been obtained on the entity.
*
* @param entityId The id of the entity
* @param entityType The type of the entity
* @param startTime The start time of the entity, or null
* @param events A list of events for the entity, or null
* @return A StartAndInsertTime
* @throws IOException
*/
private StartAndInsertTime getAndSetStartTime(String entityId,
String entityType, Long startTime, List<TimelineEvent> events)
throws IOException {
EntityIdentifier entity = new EntityIdentifier(entityId, entityType);
if (startTime == null) {
// start time is not provided, so try to look it up
if (startTimeWriteCache.containsKey(entity)) {
// found the start time in the cache
return startTimeWriteCache.get(entity);
} else {
if (events != null) {
// prepare a start time from events in case it is needed
Long min = Long.MAX_VALUE;
for (TimelineEvent e : events) {
if (min > e.getTimestamp()) {
min = e.getTimestamp();
}
}
startTime = min;
}
return checkStartTimeInDb(entity, startTime);
}
} else {
// start time is provided
if (startTimeWriteCache.containsKey(entity)) {
// always use start time from cache if it exists
return startTimeWriteCache.get(entity);
} else {
// check the provided start time matches the db
return checkStartTimeInDb(entity, startTime);
}
}
}
/**
* Checks db for start time and returns it if it exists. If it doesn't
* exist, writes the suggested start time (if it is not null). This is
* only called when the start time is not found in the cache,
* so it adds it back into the cache if it is found. Should only be called
* when a lock has been obtained on the entity.
*/
private StartAndInsertTime checkStartTimeInDb(EntityIdentifier entity,
Long suggestedStartTime) throws IOException {
StartAndInsertTime startAndInsertTime = null;
// create lookup key for start time
byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
try {
// retrieve value for key
byte[] v = db.get(b);
if (v == null) {
// start time doesn't exist in db
if (suggestedStartTime == null) {
return null;
}
startAndInsertTime = new StartAndInsertTime(suggestedStartTime,
System.currentTimeMillis());
// write suggested start time
v = new byte[16];
writeReverseOrderedLong(suggestedStartTime, v, 0);
writeReverseOrderedLong(startAndInsertTime.insertTime, v, 8);
WriteOptions writeOptions = new WriteOptions();
writeOptions.sync(true);
db.put(b, v, writeOptions);
} else {
// found start time in db, so ignore suggested start time
startAndInsertTime = new StartAndInsertTime(readReverseOrderedLong(v, 0),
readReverseOrderedLong(v, 8));
}
} catch(DBException e) {
throw new IOException(e);
}
startTimeWriteCache.put(entity, startAndInsertTime);
startTimeReadCache.put(entity, startAndInsertTime.startTime);
return startAndInsertTime;
}
/**
* Creates a key for looking up the start time of a given entity,
* of the form START_TIME_LOOKUP_PREFIX + entity type + entity id.
*/
private static byte[] createStartTimeLookupKey(String entityId,
String entityType) throws IOException {
return KeyBuilder.newInstance().add(START_TIME_LOOKUP_PREFIX)
.add(entityType).add(entityId).getBytes();
}
/**
* Creates an entity marker, serializing ENTITY_ENTRY_PREFIX + entity type +
* revstarttime + entity id.
*/
private static byte[] createEntityMarkerKey(String entityId,
String entityType, byte[] revStartTime) throws IOException {
return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX)
.add(entityType).add(revStartTime).add(entityId).getBytesForLookup();
}
/**
* Creates an index entry for the given key of the form
* INDEXED_ENTRY_PREFIX + primaryfiltername + primaryfiltervalue + key.
*/
private static byte[] addPrimaryFilterToKey(String primaryFilterName,
Object primaryFilterValue, byte[] key) throws IOException {
return KeyBuilder.newInstance().add(INDEXED_ENTRY_PREFIX)
.add(primaryFilterName)
.add(GenericObjectMapper.write(primaryFilterValue), true).add(key)
.getBytes();
}
/**
* Creates an event key, serializing ENTITY_ENTRY_PREFIX + entity type +
* revstarttime + entity id + EVENTS_COLUMN + reveventtimestamp + event type.
*/
private static byte[] createEntityEventKey(String entityId,
String entityType, byte[] revStartTime, byte[] revEventTimestamp,
String eventType) throws IOException {
return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX)
.add(entityType).add(revStartTime).add(entityId).add(EVENTS_COLUMN)
.add(revEventTimestamp).add(eventType).getBytes();
}
/**
* Creates an event object from the given key, offset, and value. If the
* event type is not contained in the specified set of event types,
* returns null.
*/
private static TimelineEvent getEntityEvent(Set<String> eventTypes,
byte[] key, int offset, byte[] value) throws IOException {
KeyParser kp = new KeyParser(key, offset);
long ts = kp.getNextLong();
String tstype = kp.getNextString();
if (eventTypes == null || eventTypes.contains(tstype)) {
TimelineEvent event = new TimelineEvent();
event.setTimestamp(ts);
event.setEventType(tstype);
Object o = GenericObjectMapper.read(value);
if (o == null) {
event.setEventInfo(null);
} else if (o instanceof Map) {
@SuppressWarnings("unchecked")
Map<String, Object> m = (Map<String, Object>) o;
event.setEventInfo(m);
} else {
throw new IOException("Couldn't deserialize event info map");
}
return event;
}
return null;
}
/**
* Creates a primary filter key, serializing ENTITY_ENTRY_PREFIX +
* entity type + revstarttime + entity id + PRIMARY_FILTERS_COLUMN + name +
* value.
*/
private static byte[] createPrimaryFilterKey(String entityId,
String entityType, byte[] revStartTime, String name, Object value)
throws IOException {
return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX).add(entityType)
.add(revStartTime).add(entityId).add(PRIMARY_FILTERS_COLUMN).add(name)
.add(GenericObjectMapper.write(value)).getBytes();
}
/**
* Parses the primary filter from the given key at the given offset and
* adds it to the given entity.
*/
private static void addPrimaryFilter(TimelineEntity entity, byte[] key,
int offset) throws IOException {
KeyParser kp = new KeyParser(key, offset);
String name = kp.getNextString();
Object value = GenericObjectMapper.read(key, kp.getOffset());
entity.addPrimaryFilter(name, value);
}
/**
* Creates an other info key, serializing ENTITY_ENTRY_PREFIX + entity type +
* revstarttime + entity id + OTHER_INFO_COLUMN + name.
*/
private static byte[] createOtherInfoKey(String entityId, String entityType,
byte[] revStartTime, String name) throws IOException {
return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX).add(entityType)
.add(revStartTime).add(entityId).add(OTHER_INFO_COLUMN).add(name)
.getBytes();
}
/**
* Creates a string representation of the byte array from the given offset
* to the end of the array (for parsing other info keys).
*/
private static String parseRemainingKey(byte[] b, int offset) {
return new String(b, offset, b.length - offset, Charset.forName("UTF-8"));
}
/**
* Creates a related entity key, serializing ENTITY_ENTRY_PREFIX +
* entity type + revstarttime + entity id + RELATED_ENTITIES_COLUMN +
* relatedentity type + relatedentity id.
*/
private static byte[] createRelatedEntityKey(String entityId,
String entityType, byte[] revStartTime, String relatedEntityId,
String relatedEntityType) throws IOException {
return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX).add(entityType)
.add(revStartTime).add(entityId).add(RELATED_ENTITIES_COLUMN)
.add(relatedEntityType).add(relatedEntityId).getBytes();
}
/**
* Parses the related entity from the given key at the given offset and
* adds it to the given entity.
*/
private static void addRelatedEntity(TimelineEntity entity, byte[] key,
int offset) throws IOException {
KeyParser kp = new KeyParser(key, offset);
String type = kp.getNextString();
String id = kp.getNextString();
entity.addRelatedEntity(type, id);
}
/**
* Creates a reverse related entity key, serializing ENTITY_ENTRY_PREFIX +
* entity type + revstarttime + entity id +
* INVISIBLE_REVERSE_RELATED_ENTITIES_COLUMN +
* relatedentity type + relatedentity id.
*/
private static byte[] createReverseRelatedEntityKey(String entityId,
String entityType, byte[] revStartTime, String relatedEntityId,
String relatedEntityType) throws IOException {
return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX).add(entityType)
.add(revStartTime).add(entityId)
.add(INVISIBLE_REVERSE_RELATED_ENTITIES_COLUMN)
.add(relatedEntityType).add(relatedEntityId).getBytes();
}
/**
* Creates a domain id key, serializing ENTITY_ENTRY_PREFIX +
* entity type + revstarttime + entity id + DOMAIN_ID_COLUMN.
*/
private static byte[] createDomainIdKey(String entityId,
String entityType, byte[] revStartTime) throws IOException {
return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX).add(entityType)
.add(revStartTime).add(entityId).add(DOMAIN_ID_COLUMN).getBytes();
}
/**
* Clears the cache to test reloading start times from leveldb (only for
* testing).
*/
@VisibleForTesting
void clearStartTimeCache() {
startTimeWriteCache.clear();
startTimeReadCache.clear();
}
@VisibleForTesting
static int getStartTimeReadCacheSize(Configuration conf) {
return conf.getInt(
YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE,
YarnConfiguration.
DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE);
}
@VisibleForTesting
static int getStartTimeWriteCacheSize(Configuration conf) {
return conf.getInt(
YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE,
YarnConfiguration.
DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE);
}
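/**
* Returns the distinct entity types currently stored, by scanning the
* entity section and seeking past all keys of each type as it is found.
*/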
@VisibleForTesting
List<String> getEntityTypes() throws IOException {
LeveldbIterator iterator = null;
try {
iterator = getDbIterator(false);
List<String> entityTypes = new ArrayList<String>();
iterator.seek(ENTITY_ENTRY_PREFIX);
while (iterator.hasNext()) {
byte[] key = iterator.peekNext().getKey();
if (key[0] != ENTITY_ENTRY_PREFIX[0]) {
break;
}
KeyParser kp = new KeyParser(key,
ENTITY_ENTRY_PREFIX.length);
String entityType = kp.getNextString();
entityTypes.add(entityType);
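// seek past all remaining keys for this entity type by bumping the
// 0x0 terminator at the end of the type prefix to 0x1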
byte[] lookupKey = KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX)
.add(entityType).getBytesForLookup();
if (lookupKey[lookupKey.length - 1] != 0x0) {
throw new IOException("Found unexpected end byte in lookup key");
}
lookupKey[lookupKey.length - 1] = 0x1;
iterator.seek(lookupKey);
}
return entityTypes;
} catch(DBException e) {
throw new IOException(e);
} finally {
IOUtils.cleanupWithLogger(LOG, iterator);
}
}
/**
* Finds all keys in the db that have a given prefix and deletes them on
* the given write batch.
*/
private void deleteKeysWithPrefix(WriteBatch writeBatch, byte[] prefix,
LeveldbIterator iterator) {
for (iterator.seek(prefix); iterator.hasNext(); iterator.next()) {
byte[] key = iterator.peekNext().getKey();
if (!prefixMatches(prefix, prefix.length, key)) {
break;
}
writeBatch.delete(key);
}
}
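/**
* Deletes the next entity of the given type at the iterator's position,
* seeking to the given reverse timestamp first if not already seeked.
* Also removes the entity's primary filter index entries and the
* forward/reverse links stored under its related entities.
*
* @return true if an entity was deleted, false if no further entity of
* this type qualifies
*/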
@VisibleForTesting
boolean deleteNextEntity(String entityType, byte[] reverseTimestamp,
LeveldbIterator iterator, LeveldbIterator pfIterator, boolean seeked)
throws IOException {
WriteBatch writeBatch = null;
try {
KeyBuilder kb = KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX)
.add(entityType);
byte[] typePrefix = kb.getBytesForLookup();
kb.add(reverseTimestamp);
if (!seeked) {
iterator.seek(kb.getBytesForLookup());
}
if (!iterator.hasNext()) {
return false;
}
byte[] entityKey = iterator.peekNext().getKey();
if (!prefixMatches(typePrefix, typePrefix.length, entityKey)) {
return false;
}
// skip the 8-byte reverse start time and read the entity id from the
// current key
KeyParser kp = new KeyParser(entityKey, typePrefix.length + 8);
String entityId = kp.getNextString();
int prefixlen = kp.getOffset();
byte[] deletePrefix = new byte[prefixlen];
System.arraycopy(entityKey, 0, deletePrefix, 0, prefixlen);
writeBatch = db.createWriteBatch();
if (LOG.isDebugEnabled()) {
LOG.debug("Deleting entity type:" + entityType + " id:" + entityId);
}
// remove start time from cache and db
writeBatch.delete(createStartTimeLookupKey(entityId, entityType));
EntityIdentifier entityIdentifier =
new EntityIdentifier(entityId, entityType);
startTimeReadCache.remove(entityIdentifier);
startTimeWriteCache.remove(entityIdentifier);
// delete current entity
for (; iterator.hasNext(); iterator.next()) {
byte[] key = iterator.peekNext().getKey();
if (!prefixMatches(entityKey, prefixlen, key)) {
break;
}
writeBatch.delete(key);
if (key.length == prefixlen) {
continue;
}
if (key[prefixlen] == PRIMARY_FILTERS_COLUMN[0]) {
kp = new KeyParser(key,
prefixlen + PRIMARY_FILTERS_COLUMN.length);
String name = kp.getNextString();
Object value = GenericObjectMapper.read(key, kp.getOffset());
deleteKeysWithPrefix(writeBatch, addPrimaryFilterToKey(name, value,
deletePrefix), pfIterator);
if (LOG.isDebugEnabled()) {
LOG.debug("Deleting entity type:" + entityType + " id:" +
entityId + " primary filter entry " + name + " " +
value);
}
} else if (key[prefixlen] == RELATED_ENTITIES_COLUMN[0]) {
kp = new KeyParser(key,
prefixlen + RELATED_ENTITIES_COLUMN.length);
String type = kp.getNextString();
String id = kp.getNextString();
byte[] relatedEntityStartTime = getStartTime(id, type);
if (relatedEntityStartTime == null) {
LOG.warn("Found no start time for " +
"related entity " + id + " of type " + type + " while " +
"deleting " + entityId + " of type " + entityType);
continue;
}
writeBatch.delete(createReverseRelatedEntityKey(id, type,
relatedEntityStartTime, entityId, entityType));
if (LOG.isDebugEnabled()) {
LOG.debug("Deleting entity type:" + entityType + " id:" +
entityId + " from invisible reverse related entity " +
"entry of type:" + type + " id:" + id);
}
} else if (key[prefixlen] ==
INVISIBLE_REVERSE_RELATED_ENTITIES_COLUMN[0]) {
kp = new KeyParser(key, prefixlen +
INVISIBLE_REVERSE_RELATED_ENTITIES_COLUMN.length);
String type = kp.getNextString();
String id = kp.getNextString();
byte[] relatedEntityStartTime = getStartTime(id, type);
if (relatedEntityStartTime == null) {
LOG.warn("Found no start time for reverse " +
"related entity " + id + " of type " + type + " while " +
"deleting " + entityId + " of type " + entityType);
continue;
}
writeBatch.delete(createRelatedEntityKey(id, type,
relatedEntityStartTime, entityId, entityType));
if (LOG.isDebugEnabled()) {
LOG.debug("Deleting entity type:" + entityType + " id:" +
entityId + " from related entity entry of type:" +
type + " id:" + id);
}
}
}
WriteOptions writeOptions = new WriteOptions();
writeOptions.sync(true);
db.write(writeBatch, writeOptions);
return true;
} catch(DBException e) {
throw new IOException(e);
} finally {
IOUtils.cleanupWithLogger(LOG, writeBatch);
}
}
/**
* Discards entities with start timestamp less than or equal to the given
* timestamp.
*/
@VisibleForTesting
void discardOldEntities(long timestamp)
throws IOException, InterruptedException {
byte[] reverseTimestamp = writeReverseOrderedLong(timestamp);
long totalCount = 0;
long t1 = System.currentTimeMillis();
try {
List<String> entityTypes = getEntityTypes();
for (String entityType : entityTypes) {
LeveldbIterator iterator = null;
LeveldbIterator pfIterator = null;
long typeCount = 0;
deleteLock.writeLock().lock();
try {
iterator = getDbIterator(false);
pfIterator = getDbIterator(false);
if (deletionThread != null && deletionThread.isInterrupted()) {
throw new InterruptedException();
}
boolean seeked = false;
while (deleteNextEntity(entityType, reverseTimestamp, iterator,
pfIterator, seeked)) {
typeCount++;
totalCount++;
seeked = true;
if (deletionThread != null && deletionThread.isInterrupted()) {
throw new InterruptedException();
}
}
} catch (IOException e) {
LOG.error("Got IOException while deleting entities for type " +
entityType + ", continuing to next type", e);
} finally {
IOUtils.cleanupWithLogger(LOG, iterator, pfIterator);
deleteLock.writeLock().unlock();
if (typeCount > 0) {
LOG.info("Deleted " + typeCount + " entities of type " +
entityType);
}
}
}
} finally {
long t2 = System.currentTimeMillis();
LOG.info("Discarded " + totalCount + " entities for timestamp " +
timestamp + " and earlier in " + (t2 - t1) / 1000.0 + " seconds");
}
}
@VisibleForTesting
LeveldbIterator getDbIterator(boolean fillCache) {
ReadOptions readOptions = new ReadOptions();
readOptions.fillCache(fillCache);
return new LeveldbIterator(db, readOptions);
}
Version loadVersion() throws IOException {
try {
byte[] data = db.get(bytes(TIMELINE_STORE_VERSION_KEY));
// if version is not stored previously, treat it as CURRENT_VERSION_INFO.
if (data == null || data.length == 0) {
return getCurrentVersion();
}
Version version =
new VersionPBImpl(VersionProto.parseFrom(data));
return version;
} catch(DBException e) {
throw new IOException(e);
}
}
// Only used for testing
@VisibleForTesting
void storeVersion(Version state) throws IOException {
dbStoreVersion(state);
}
private void dbStoreVersion(Version state) throws IOException {
String key = TIMELINE_STORE_VERSION_KEY;
byte[] data =
((VersionPBImpl) state).getProto().toByteArray();
try {
db.put(bytes(key), data);
} catch (DBException e) {
throw new IOException(e);
}
}
Version getCurrentVersion() {
return CURRENT_VERSION_INFO;
}
/**
* 1) Versioning the timeline store: major.minor, e.g. 1.0, 1.1, 1.2 ... 1.25, 2.0.
* 2) Any incompatible change of the TS-store is a major upgrade, and any
* compatible change of the TS-store is a minor upgrade.
* 3) Within a minor upgrade, say 1.1 to 1.2:
* overwrite the version info and proceed as normal.
* 4) Within a major upgrade, say 1.2 to 2.0:
* throw an exception and instruct the user to use a separate upgrade tool
* to upgrade the timeline store or remove the incompatible old state.
*/
private void checkVersion() throws IOException {
Version loadedVersion = loadVersion();
LOG.info("Loaded timeline store version info " + loadedVersion);
if (loadedVersion.equals(getCurrentVersion())) {
return;
}
if (loadedVersion.isCompatibleTo(getCurrentVersion())) {
LOG.info("Storing timeline store version info " + getCurrentVersion());
dbStoreVersion(CURRENT_VERSION_INFO);
} else {
String incompatibleMessage =
"Incompatible version for timeline store: expecting version "
+ getCurrentVersion() + ", but loading version " + loadedVersion;
LOG.error(incompatibleMessage);
throw new IOException(incompatibleMessage);
}
}
//TODO: make data retention work with the domain data as well
@Override
public void put(TimelineDomain domain) throws IOException {
WriteBatch writeBatch = null;
try {
writeBatch = db.createWriteBatch();
if (domain.getId() == null || domain.getId().length() == 0) {
throw new IllegalArgumentException("Domain doesn't have an ID");
}
if (domain.getOwner() == null || domain.getOwner().length() == 0) {
throw new IllegalArgumentException("Domain doesn't have an owner.");
}
// Write description
byte[] domainEntryKey = createDomainEntryKey(
domain.getId(), DESCRIPTION_COLUMN);
byte[] ownerLookupEntryKey = createOwnerLookupKey(
domain.getOwner(), domain.getId(), DESCRIPTION_COLUMN);
if (domain.getDescription() != null) {
writeBatch.put(domainEntryKey, domain.getDescription().
getBytes(Charset.forName("UTF-8")));
writeBatch.put(ownerLookupEntryKey, domain.getDescription().
getBytes(Charset.forName("UTF-8")));
} else {
writeBatch.put(domainEntryKey, EMPTY_BYTES);
writeBatch.put(ownerLookupEntryKey, EMPTY_BYTES);
}
// Write owner
domainEntryKey = createDomainEntryKey(domain.getId(), OWNER_COLUMN);
ownerLookupEntryKey = createOwnerLookupKey(
domain.getOwner(), domain.getId(), OWNER_COLUMN);
// Null check for owner is done before
writeBatch.put(domainEntryKey, domain.getOwner().getBytes(Charset.forName("UTF-8")));
writeBatch.put(ownerLookupEntryKey, domain.getOwner().getBytes(Charset.forName("UTF-8")));
// Write readers
domainEntryKey = createDomainEntryKey(domain.getId(), READER_COLUMN);
ownerLookupEntryKey = createOwnerLookupKey(
domain.getOwner(), domain.getId(), READER_COLUMN);
if (domain.getReaders() != null && domain.getReaders().length() > 0) {
writeBatch.put(domainEntryKey, domain.getReaders().getBytes(Charset.forName("UTF-8")));
writeBatch.put(ownerLookupEntryKey, domain.getReaders().
getBytes(Charset.forName("UTF-8")));
} else {
writeBatch.put(domainEntryKey, EMPTY_BYTES);
writeBatch.put(ownerLookupEntryKey, EMPTY_BYTES);
}
// Write writers
domainEntryKey = createDomainEntryKey(domain.getId(), WRITER_COLUMN);
ownerLookupEntryKey = createOwnerLookupKey(
domain.getOwner(), domain.getId(), WRITER_COLUMN);
if (domain.getWriters() != null && domain.getWriters().length() > 0) {
writeBatch.put(domainEntryKey, domain.getWriters().getBytes(Charset.forName("UTF-8")));
writeBatch.put(ownerLookupEntryKey, domain.getWriters().
getBytes(Charset.forName("UTF-8")));
} else {
writeBatch.put(domainEntryKey, EMPTY_BYTES);
writeBatch.put(ownerLookupEntryKey, EMPTY_BYTES);
}
// Write creation time and modification time
// We put both timestamps together because they are always retrieved
// together, and store them in the same way as we did for the entity's
// start time and insert time.
domainEntryKey = createDomainEntryKey(domain.getId(), TIMESTAMP_COLUMN);
ownerLookupEntryKey = createOwnerLookupKey(
domain.getOwner(), domain.getId(), TIMESTAMP_COLUMN);
long currentTimestamp = System.currentTimeMillis();
byte[] timestamps = db.get(domainEntryKey);
if (timestamps == null) {
timestamps = new byte[16];
writeReverseOrderedLong(currentTimestamp, timestamps, 0);
writeReverseOrderedLong(currentTimestamp, timestamps, 8);
} else {
writeReverseOrderedLong(currentTimestamp, timestamps, 8);
}
writeBatch.put(domainEntryKey, timestamps);
writeBatch.put(ownerLookupEntryKey, timestamps);
db.write(writeBatch);
} catch(DBException e) {
throw new IOException(e);
} finally {
IOUtils.cleanupWithLogger(LOG, writeBatch);
}
}
/**
* Creates a domain entity key with column name suffix,
* of the form DOMAIN_ENTRY_PREFIX + domain id + column name.
*/
private static byte[] createDomainEntryKey(String domainId,
byte[] columnName) throws IOException {
return KeyBuilder.newInstance().add(DOMAIN_ENTRY_PREFIX)
.add(domainId).add(columnName).getBytes();
}
/**
* Creates an owner lookup key with column name suffix,
* of the form OWNER_LOOKUP_PREFIX + owner + domain id + column name.
*/
private static byte[] createOwnerLookupKey(
String owner, String domainId, byte[] columnName) throws IOException {
return KeyBuilder.newInstance().add(OWNER_LOOKUP_PREFIX)
.add(owner).add(domainId).add(columnName).getBytes();
}
@Override
public TimelineDomain getDomain(String domainId)
throws IOException {
LeveldbIterator iterator = null;
try {
byte[] prefix = KeyBuilder.newInstance()
.add(DOMAIN_ENTRY_PREFIX).add(domainId).getBytesForLookup();
iterator = new LeveldbIterator(db);
iterator.seek(prefix);
return getTimelineDomain(iterator, domainId, prefix);
} catch(DBException e) {
throw new IOException(e);
} finally {
IOUtils.cleanupWithLogger(LOG, iterator);
}
}
@Override
public TimelineDomains getDomains(String owner)
throws IOException {
LeveldbIterator iterator = null;
try {
byte[] prefix = KeyBuilder.newInstance()
.add(OWNER_LOOKUP_PREFIX).add(owner).getBytesForLookup();
List<TimelineDomain> domains = new ArrayList<TimelineDomain>();
for (iterator = new LeveldbIterator(db), iterator.seek(prefix);
iterator.hasNext();) {
byte[] key = iterator.peekNext().getKey();
if (!prefixMatches(prefix, prefix.length, key)) {
break;
}
// parse the domain id out of the owner-lookup row key
KeyParser kp = new KeyParser(key, prefix.length);
String domainId = kp.getNextString();
byte[] prefixExt = KeyBuilder.newInstance().add(OWNER_LOOKUP_PREFIX)
.add(owner).add(domainId).getBytesForLookup();
TimelineDomain domainToReturn =
getTimelineDomain(iterator, domainId, prefixExt);
if (domainToReturn != null) {
domains.add(domainToReturn);
}
}
// Sort the domains to return
Collections.sort(domains, new Comparator<TimelineDomain>() {
@Override
public int compare(
TimelineDomain domain1, TimelineDomain domain2) {
int result = domain2.getCreatedTime().compareTo(
domain1.getCreatedTime());
if (result == 0) {
return domain2.getModifiedTime().compareTo(
domain1.getModifiedTime());
} else {
return result;
}
}
});
TimelineDomains domainsToReturn = new TimelineDomains();
domainsToReturn.addDomains(domains);
return domainsToReturn;
} catch(DBException e) {
throw new IOException(e);
} finally {
IOUtils.cleanupWithLogger(LOG, iterator);
}
}
private static TimelineDomain getTimelineDomain(
LeveldbIterator iterator, String domainId, byte[] prefix) throws IOException {
// Iterate over all the rows whose key starts with prefix to retrieve the
// domain information.
TimelineDomain domain = new TimelineDomain();
domain.setId(domainId);
boolean noRows = true;
for (; iterator.hasNext(); iterator.next()) {
byte[] key = iterator.peekNext().getKey();
if (!prefixMatches(prefix, prefix.length, key)) {
break;
}
noRows = false;
byte[] value = iterator.peekNext().getValue();
if (value != null && value.length > 0) {
if (key[prefix.length] == DESCRIPTION_COLUMN[0]) {
domain.setDescription(new String(value, Charset.forName("UTF-8")));
} else if (key[prefix.length] == OWNER_COLUMN[0]) {
domain.setOwner(new String(value, Charset.forName("UTF-8")));
} else if (key[prefix.length] == READER_COLUMN[0]) {
domain.setReaders(new String(value, Charset.forName("UTF-8")));
} else if (key[prefix.length] == WRITER_COLUMN[0]) {
domain.setWriters(new String(value, Charset.forName("UTF-8")));
} else if (key[prefix.length] == TIMESTAMP_COLUMN[0]) {
domain.setCreatedTime(readReverseOrderedLong(value, 0));
domain.setModifiedTime(readReverseOrderedLong(value, 8));
} else {
LOG.error("Unrecognized domain column: " + key[prefix.length]);
}
}
}
if (noRows) {
return null;
} else {
return domain;
}
}
}