blob: 1ac170c25056c184a0de83caf47044f90b6f0477 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timeline;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
import org.apache.commons.collections.map.LRUMap;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
import org.apache.hadoop.yarn.server.timeline.RollingLevelDB.RollingWriteBatch;
import org.apache.hadoop.yarn.server.timeline.TimelineDataManager.CheckAcl;
import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.KeyBuilder;
import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.KeyParser;
import org.fusesource.leveldbjni.JniDBFactory;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBException;
import org.iq80.leveldb.DBIterator;
import org.iq80.leveldb.Options;
import org.iq80.leveldb.ReadOptions;
import org.iq80.leveldb.WriteBatch;
import org.nustaq.serialization.FSTConfiguration;
import org.nustaq.serialization.FSTClazzNameRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.readReverseOrderedLong;
import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.writeReverseOrderedLong;
import static org.apache.hadoop.yarn.server.timeline.TimelineDataManager.DEFAULT_DOMAIN_ID;
import static org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.prefixMatches;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BATCH_SIZE;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_TIMELINE_SERVICE_TTL_MS;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_LEVELDB_WRITE_BATCH_SIZE;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_TTL_MS;
import static org.fusesource.leveldbjni.JniDBFactory.bytes;
/**
* <p>
* An implementation of an application timeline store backed by leveldb.
* </p>
*
* <p>
* There are three sections of the db, the start time section, the entity
* section, and the indexed entity section.
* </p>
*
* <p>
* The start time section is used to retrieve the unique start time for a given
* entity. Its values each contain a start time while its keys are of the form:
* </p>
*
* <pre>
* START_TIME_LOOKUP_PREFIX + entity type + entity id
* </pre>
*
* <p>
* The entity section is ordered by entity type, then entity start time
* descending, then entity ID. There are four sub-sections of the entity
* section: events, primary filters, related entities, and other info. The event
* entries have event info serialized into their values. The other info entries
* have values corresponding to the values of the other info name/value map for
* the entry (note the names are contained in the key). All other entries have
* empty values. The key structure is as follows:
* </p>
*
* <pre>
* ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id
*
* ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
* DOMAIN_ID_COLUMN
*
* ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
* EVENTS_COLUMN + reveventtimestamp + eventtype
*
* ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
* PRIMARY_FILTERS_COLUMN + name + value
*
* ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
* OTHER_INFO_COLUMN + name
*
* ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
* RELATED_ENTITIES_COLUMN + relatedentity type + relatedentity id
* </pre>
*
* <p>
* The indexed entity section contains a primary filter name and primary filter
* value as the prefix. Within a given name/value, entire entity entries are
* stored in the same format as described in the entity section above (below,
* "key" represents any one of the possible entity entry keys described above).
* </p>
*
* <pre>
* INDEXED_ENTRY_PREFIX + primaryfilter name + primaryfilter value +
* key
* </pre>
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class RollingLevelDBTimelineStore extends AbstractService implements
TimelineStore {
private static final Logger LOG = LoggerFactory
.getLogger(RollingLevelDBTimelineStore.class);
private static FSTConfiguration fstConf =
FSTConfiguration.createDefaultConfiguration();
// Fall back to 2.24 parsing if 2.50 parsing fails
private static FSTConfiguration fstConf224 =
FSTConfiguration.createDefaultConfiguration();
// Static class code for 2.24
private static final int LINKED_HASH_MAP_224_CODE = 83;
static {
fstConf.setShareReferences(false);
fstConf224.setShareReferences(false);
// YARN-6654 unable to find class for code 83 (LinkedHashMap)
// The linked hash map was changed between 2.24 and 2.50 so that
// the static code for LinkedHashMap (83) was changed to a dynamic
// code.
FSTClazzNameRegistry registry = fstConf224.getClassRegistry();
registry.registerClass(
LinkedHashMap.class, LINKED_HASH_MAP_224_CODE, fstConf224);
}
@Private
@VisibleForTesting
static final String FILENAME = "leveldb-timeline-store";
static final String DOMAIN = "domain-ldb";
static final String ENTITY = "entity-ldb";
static final String INDEX = "indexes-ldb";
static final String STARTTIME = "starttime-ldb";
static final String OWNER = "owner-ldb";
private static final byte[] DOMAIN_ID_COLUMN = "d".getBytes(UTF_8);
private static final byte[] EVENTS_COLUMN = "e".getBytes(UTF_8);
private static final byte[] PRIMARY_FILTERS_COLUMN = "f".getBytes(UTF_8);
private static final byte[] OTHER_INFO_COLUMN = "i".getBytes(UTF_8);
private static final byte[] RELATED_ENTITIES_COLUMN = "r".getBytes(UTF_8);
private static final byte[] DESCRIPTION_COLUMN = "d".getBytes(UTF_8);
private static final byte[] OWNER_COLUMN = "o".getBytes(UTF_8);
private static final byte[] READER_COLUMN = "r".getBytes(UTF_8);
private static final byte[] WRITER_COLUMN = "w".getBytes(UTF_8);
private static final byte[] TIMESTAMP_COLUMN = "t".getBytes(UTF_8);
private static final byte[] EMPTY_BYTES = new byte[0];
private static final String TIMELINE_STORE_VERSION_KEY =
"timeline-store-version";
private static final Version CURRENT_VERSION_INFO = Version.newInstance(1, 0);
private static long writeBatchSize = 10000;
@Private
@VisibleForTesting
static final FsPermission LEVELDB_DIR_UMASK = FsPermission
.createImmutable((short) 0700);
private Map<EntityIdentifier, Long> startTimeWriteCache;
private Map<EntityIdentifier, Long> startTimeReadCache;
private DB domaindb;
private RollingLevelDB entitydb;
private RollingLevelDB indexdb;
private DB starttimedb;
private DB ownerdb;
private Thread deletionThread;
public RollingLevelDBTimelineStore() {
super(RollingLevelDBTimelineStore.class.getName());
}
@Override
@SuppressWarnings("unchecked")
protected void serviceInit(Configuration conf) throws Exception {
Preconditions
.checkArgument(conf.getLong(TIMELINE_SERVICE_TTL_MS,
DEFAULT_TIMELINE_SERVICE_TTL_MS) > 0,
"%s property value should be greater than zero",
TIMELINE_SERVICE_TTL_MS);
Preconditions.checkArgument(conf.getLong(
TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS,
DEFAULT_TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS) > 0,
"%s property value should be greater than zero",
TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS);
Preconditions.checkArgument(conf.getLong(
TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE,
DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE) >= 0,
"%s property value should be greater than or equal to zero",
TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE);
Preconditions.checkArgument(conf.getLong(
TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE,
DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE) > 0,
" %s property value should be greater than zero",
TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE);
Preconditions.checkArgument(conf.getLong(
TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE,
DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE) > 0,
"%s property value should be greater than zero",
TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE);
Preconditions.checkArgument(conf.getLong(
TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES,
DEFAULT_TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES) > 0,
"%s property value should be greater than zero",
TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES);
Preconditions.checkArgument(conf.getLong(
TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE,
DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE) > 0,
"%s property value should be greater than zero",
TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE);
Options options = new Options();
options.createIfMissing(true);
options.cacheSize(conf.getLong(
TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE,
DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE));
JniDBFactory factory = new JniDBFactory();
Path dbPath = new Path(
conf.get(TIMELINE_SERVICE_LEVELDB_PATH), FILENAME);
Path domainDBPath = new Path(dbPath, DOMAIN);
Path starttimeDBPath = new Path(dbPath, STARTTIME);
Path ownerDBPath = new Path(dbPath, OWNER);
try (FileSystem localFS = FileSystem.getLocal(conf)) {
if (!localFS.exists(dbPath)) {
if (!localFS.mkdirs(dbPath)) {
throw new IOException("Couldn't create directory for leveldb "
+ "timeline store " + dbPath);
}
localFS.setPermission(dbPath, LEVELDB_DIR_UMASK);
}
if (!localFS.exists(domainDBPath)) {
if (!localFS.mkdirs(domainDBPath)) {
throw new IOException("Couldn't create directory for leveldb "
+ "timeline store " + domainDBPath);
}
localFS.setPermission(domainDBPath, LEVELDB_DIR_UMASK);
}
if (!localFS.exists(starttimeDBPath)) {
if (!localFS.mkdirs(starttimeDBPath)) {
throw new IOException("Couldn't create directory for leveldb "
+ "timeline store " + starttimeDBPath);
}
localFS.setPermission(starttimeDBPath, LEVELDB_DIR_UMASK);
}
if (!localFS.exists(ownerDBPath)) {
if (!localFS.mkdirs(ownerDBPath)) {
throw new IOException("Couldn't create directory for leveldb "
+ "timeline store " + ownerDBPath);
}
localFS.setPermission(ownerDBPath, LEVELDB_DIR_UMASK);
}
}
options.maxOpenFiles(conf.getInt(
TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES,
DEFAULT_TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES));
options.writeBufferSize(conf.getInt(
TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE,
DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE));
LOG.info("Using leveldb path " + dbPath);
domaindb = factory.open(new File(domainDBPath.toString()), options);
entitydb = new RollingLevelDB(ENTITY);
entitydb.init(conf);
indexdb = new RollingLevelDB(INDEX);
indexdb.init(conf);
starttimedb = factory.open(new File(starttimeDBPath.toString()), options);
ownerdb = factory.open(new File(ownerDBPath.toString()), options);
checkVersion();
startTimeWriteCache = Collections.synchronizedMap(new LRUMap(
getStartTimeWriteCacheSize(conf)));
startTimeReadCache = Collections.synchronizedMap(new LRUMap(
getStartTimeReadCacheSize(conf)));
writeBatchSize = conf.getInt(
TIMELINE_SERVICE_LEVELDB_WRITE_BATCH_SIZE,
DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BATCH_SIZE);
super.serviceInit(conf);
}
@Override
protected void serviceStart() throws Exception {
if (getConfig().getBoolean(TIMELINE_SERVICE_TTL_ENABLE, true)) {
deletionThread = new EntityDeletionThread(getConfig());
deletionThread.start();
}
super.serviceStart();
}
@Override
protected void serviceStop() throws Exception {
if (deletionThread != null) {
deletionThread.interrupt();
LOG.info("Waiting for deletion thread to complete its current action");
try {
deletionThread.join();
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting for deletion thread to complete,"
+ " closing db now", e);
}
}
IOUtils.cleanupWithLogger(LOG, domaindb);
IOUtils.cleanupWithLogger(LOG, starttimedb);
IOUtils.cleanupWithLogger(LOG, ownerdb);
entitydb.stop();
indexdb.stop();
super.serviceStop();
}
private class EntityDeletionThread extends Thread {
private final long ttl;
private final long ttlInterval;
EntityDeletionThread(Configuration conf) {
ttl = conf.getLong(TIMELINE_SERVICE_TTL_MS,
DEFAULT_TIMELINE_SERVICE_TTL_MS);
ttlInterval = conf.getLong(
TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS,
DEFAULT_TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS);
LOG.info("Starting deletion thread with ttl " + ttl + " and cycle "
+ "interval " + ttlInterval);
}
@Override
public void run() {
Thread.currentThread().setName("Leveldb Timeline Store Retention");
while (true) {
long timestamp = System.currentTimeMillis() - ttl;
try {
discardOldEntities(timestamp);
Thread.sleep(ttlInterval);
} catch (IOException e) {
LOG.error(e.toString());
} catch (InterruptedException e) {
LOG.info("Deletion thread received interrupt, exiting");
break;
}
}
}
}
@Override
public TimelineEntity getEntity(String entityId, String entityType,
EnumSet<Field> fields) throws IOException {
Long revStartTime = getStartTimeLong(entityId, entityType);
if (revStartTime == null) {
return null;
}
byte[] prefix = KeyBuilder.newInstance().add(entityType)
.add(writeReverseOrderedLong(revStartTime)).add(entityId)
.getBytesForLookup();
DB db = entitydb.getDBForStartTime(revStartTime);
if (db == null) {
return null;
}
try (DBIterator iterator = db.iterator()) {
iterator.seek(prefix);
return getEntity(entityId, entityType, revStartTime, fields, iterator,
prefix, prefix.length);
}
}
/**
* Read entity from a db iterator. If no information is found in the specified
* fields for this entity, return null.
*/
private static TimelineEntity getEntity(String entityId, String entityType,
Long startTime, EnumSet<Field> fields, DBIterator iterator,
byte[] prefix, int prefixlen) throws IOException {
if (fields == null) {
fields = EnumSet.allOf(Field.class);
}
TimelineEntity entity = new TimelineEntity();
boolean events = false;
boolean lastEvent = false;
if (fields.contains(Field.EVENTS)) {
events = true;
} else if (fields.contains(Field.LAST_EVENT_ONLY)) {
lastEvent = true;
} else {
entity.setEvents(null);
}
boolean relatedEntities = false;
if (fields.contains(Field.RELATED_ENTITIES)) {
relatedEntities = true;
} else {
entity.setRelatedEntities(null);
}
boolean primaryFilters = false;
if (fields.contains(Field.PRIMARY_FILTERS)) {
primaryFilters = true;
} else {
entity.setPrimaryFilters(null);
}
boolean otherInfo = false;
if (fields.contains(Field.OTHER_INFO)) {
otherInfo = true;
} else {
entity.setOtherInfo(null);
}
// iterate through the entity's entry, parsing information if it is part
// of a requested field
for (; iterator.hasNext(); iterator.next()) {
byte[] key = iterator.peekNext().getKey();
if (!prefixMatches(prefix, prefixlen, key)) {
break;
}
if (key.length == prefixlen) {
continue;
}
if (key[prefixlen] == PRIMARY_FILTERS_COLUMN[0]) {
if (primaryFilters) {
addPrimaryFilter(entity, key, prefixlen
+ PRIMARY_FILTERS_COLUMN.length);
}
} else if (key[prefixlen] == OTHER_INFO_COLUMN[0]) {
if (otherInfo) {
Object o = null;
String keyStr = parseRemainingKey(key,
prefixlen + OTHER_INFO_COLUMN.length);
try {
o = fstConf.asObject(iterator.peekNext().getValue());
entity.addOtherInfo(keyStr, o);
} catch (Exception ignore) {
try {
// Fall back to 2.24 parser
o = fstConf224.asObject(iterator.peekNext().getValue());
entity.addOtherInfo(keyStr, o);
} catch (Exception e) {
LOG.warn("Error while decoding "
+ entityId + ":otherInfo:" + keyStr, e);
}
}
}
} else if (key[prefixlen] == RELATED_ENTITIES_COLUMN[0]) {
if (relatedEntities) {
addRelatedEntity(entity, key, prefixlen
+ RELATED_ENTITIES_COLUMN.length);
}
} else if (key[prefixlen] == EVENTS_COLUMN[0]) {
if (events || (lastEvent && entity.getEvents().size() == 0)) {
TimelineEvent event = getEntityEvent(null, key, prefixlen
+ EVENTS_COLUMN.length, iterator.peekNext().getValue());
if (event != null) {
entity.addEvent(event);
}
}
} else if (key[prefixlen] == DOMAIN_ID_COLUMN[0]) {
byte[] v = iterator.peekNext().getValue();
String domainId = new String(v, UTF_8);
entity.setDomainId(domainId);
} else {
LOG.warn(String.format("Found unexpected column for entity %s of "
+ "type %s (0x%02x)", entityId, entityType, key[prefixlen]));
}
}
entity.setEntityId(entityId);
entity.setEntityType(entityType);
entity.setStartTime(startTime);
return entity;
}
@Override
public TimelineEvents getEntityTimelines(String entityType,
SortedSet<String> entityIds, Long limit, Long windowStart,
Long windowEnd, Set<String> eventType) throws IOException {
TimelineEvents events = new TimelineEvents();
if (entityIds == null || entityIds.isEmpty()) {
return events;
}
// create a lexicographically-ordered map from start time to entities
Map<byte[], List<EntityIdentifier>> startTimeMap =
new TreeMap<byte[], List<EntityIdentifier>>(
new Comparator<byte[]>() {
@Override
public int compare(byte[] o1, byte[] o2) {
return WritableComparator.compareBytes(o1, 0, o1.length, o2, 0,
o2.length);
}
});
// look up start times for the specified entities
// skip entities with no start time
for (String entityId : entityIds) {
byte[] startTime = getStartTime(entityId, entityType);
if (startTime != null) {
List<EntityIdentifier> entities = startTimeMap.get(startTime);
if (entities == null) {
entities = new ArrayList<EntityIdentifier>();
startTimeMap.put(startTime, entities);
}
entities.add(new EntityIdentifier(entityId, entityType));
}
}
for (Entry<byte[], List<EntityIdentifier>> entry : startTimeMap
.entrySet()) {
// look up the events matching the given parameters (limit,
// start time, end time, event types) for entities whose start times
// were found and add the entities to the return list
byte[] revStartTime = entry.getKey();
for (EntityIdentifier entityIdentifier : entry.getValue()) {
EventsOfOneEntity entity = new EventsOfOneEntity();
entity.setEntityId(entityIdentifier.getId());
entity.setEntityType(entityType);
events.addEvent(entity);
KeyBuilder kb = KeyBuilder.newInstance().add(entityType)
.add(revStartTime).add(entityIdentifier.getId())
.add(EVENTS_COLUMN);
byte[] prefix = kb.getBytesForLookup();
if (windowEnd == null) {
windowEnd = Long.MAX_VALUE;
}
byte[] revts = writeReverseOrderedLong(windowEnd);
kb.add(revts);
byte[] first = kb.getBytesForLookup();
byte[] last = null;
if (windowStart != null) {
last = KeyBuilder.newInstance().add(prefix)
.add(writeReverseOrderedLong(windowStart)).getBytesForLookup();
}
if (limit == null) {
limit = DEFAULT_LIMIT;
}
DB db = entitydb.getDBForStartTime(readReverseOrderedLong(
revStartTime, 0));
if (db == null) {
continue;
}
try (DBIterator iterator = db.iterator()) {
for (iterator.seek(first); entity.getEvents().size() < limit
&& iterator.hasNext(); iterator.next()) {
byte[] key = iterator.peekNext().getKey();
if (!prefixMatches(prefix, prefix.length, key)
|| (last != null && WritableComparator.compareBytes(key, 0,
key.length, last, 0, last.length) > 0)) {
break;
}
TimelineEvent event = getEntityEvent(eventType, key, prefix.length,
iterator.peekNext().getValue());
if (event != null) {
entity.addEvent(event);
}
}
}
}
}
return events;
}
@Override
public TimelineEntities getEntities(String entityType, Long limit,
Long windowStart, Long windowEnd, String fromId, Long fromTs,
NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
EnumSet<Field> fields, CheckAcl checkAcl) throws IOException {
if (primaryFilter == null) {
// if no primary filter is specified, prefix the lookup with
// ENTITY_ENTRY_PREFIX
return getEntityByTime(EMPTY_BYTES, entityType, limit, windowStart,
windowEnd, fromId, fromTs, secondaryFilters, fields, checkAcl, false);
} else {
// if a primary filter is specified, prefix the lookup with
// INDEXED_ENTRY_PREFIX + primaryFilterName + primaryFilterValue +
// ENTITY_ENTRY_PREFIX
byte[] base = KeyBuilder.newInstance().add(primaryFilter.getName())
.add(fstConf.asByteArray(primaryFilter.getValue()), true)
.getBytesForLookup();
return getEntityByTime(base, entityType, limit, windowStart, windowEnd,
fromId, fromTs, secondaryFilters, fields, checkAcl, true);
}
}
/**
* Retrieves a list of entities satisfying given parameters.
*
* @param base
* A byte array prefix for the lookup
* @param entityType
* The type of the entity
* @param limit
* A limit on the number of entities to return
* @param starttime
* The earliest entity start time to retrieve (exclusive)
* @param endtime
* The latest entity start time to retrieve (inclusive)
* @param fromId
* Retrieve entities starting with this entity
* @param fromTs
* Ignore entities with insert timestamp later than this ts
* @param secondaryFilters
* Filter pairs that the entities should match
* @param fields
* The set of fields to retrieve
* @param usingPrimaryFilter
* true if this query is using a primary filter
* @return A list of entities
* @throws IOException
*/
private TimelineEntities getEntityByTime(byte[] base, String entityType,
Long limit, Long starttime, Long endtime, String fromId, Long fromTs,
Collection<NameValuePair> secondaryFilters, EnumSet<Field> fields,
CheckAcl checkAcl, boolean usingPrimaryFilter) throws IOException {
KeyBuilder kb = KeyBuilder.newInstance().add(base).add(entityType);
// only db keys matching the prefix (base + entity type) will be parsed
byte[] prefix = kb.getBytesForLookup();
if (endtime == null) {
// if end time is null, place no restriction on end time
endtime = Long.MAX_VALUE;
}
// Sanitize the fields parameter
if (fields == null) {
fields = EnumSet.allOf(Field.class);
}
// construct a first key that will be seeked to using end time or fromId
long firstStartTime = Long.MAX_VALUE;
byte[] first = null;
if (fromId != null) {
Long fromIdStartTime = getStartTimeLong(fromId, entityType);
if (fromIdStartTime == null) {
// no start time for provided id, so return empty entities
return new TimelineEntities();
}
if (fromIdStartTime <= endtime) {
// if provided id's start time falls before the end of the window,
// use it to construct the seek key
firstStartTime = fromIdStartTime;
first = kb.add(writeReverseOrderedLong(fromIdStartTime)).add(fromId)
.getBytesForLookup();
}
}
// if seek key wasn't constructed using fromId, construct it using end ts
if (first == null) {
firstStartTime = endtime;
first = kb.add(writeReverseOrderedLong(endtime)).getBytesForLookup();
}
byte[] last = null;
if (starttime != null) {
// if start time is not null, set a last key that will not be
// iterated past
last = KeyBuilder.newInstance().add(base).add(entityType)
.add(writeReverseOrderedLong(starttime)).getBytesForLookup();
}
if (limit == null) {
// if limit is not specified, use the default
limit = DEFAULT_LIMIT;
}
TimelineEntities entities = new TimelineEntities();
RollingLevelDB rollingdb = null;
if (usingPrimaryFilter) {
rollingdb = indexdb;
} else {
rollingdb = entitydb;
}
DB db = rollingdb.getDBForStartTime(firstStartTime);
while (entities.getEntities().size() < limit && db != null) {
try (DBIterator iterator = db.iterator()) {
iterator.seek(first);
// iterate until one of the following conditions is met: limit is
// reached, there are no more keys, the key prefix no longer matches,
// or a start time has been specified and reached/exceeded
while (entities.getEntities().size() < limit && iterator.hasNext()) {
byte[] key = iterator.peekNext().getKey();
if (!prefixMatches(prefix, prefix.length, key)
|| (last != null && WritableComparator.compareBytes(key, 0,
key.length, last, 0, last.length) > 0)) {
break;
}
// read the start time and entity id from the current key
KeyParser kp = new KeyParser(key, prefix.length);
Long startTime = kp.getNextLong();
String entityId = kp.getNextString();
if (fromTs != null) {
long insertTime = readReverseOrderedLong(iterator.peekNext()
.getValue(), 0);
if (insertTime > fromTs) {
byte[] firstKey = key;
while (iterator.hasNext()) {
key = iterator.peekNext().getKey();
iterator.next();
if (!prefixMatches(firstKey, kp.getOffset(), key)) {
break;
}
}
continue;
}
}
// Even if other info and primary filter fields are not included, we
// still need to load them to match secondary filters when they are
// non-empty
EnumSet<Field> queryFields = EnumSet.copyOf(fields);
boolean addPrimaryFilters = false;
boolean addOtherInfo = false;
if (secondaryFilters != null && secondaryFilters.size() > 0) {
if (!queryFields.contains(Field.PRIMARY_FILTERS)) {
queryFields.add(Field.PRIMARY_FILTERS);
addPrimaryFilters = true;
}
if (!queryFields.contains(Field.OTHER_INFO)) {
queryFields.add(Field.OTHER_INFO);
addOtherInfo = true;
}
}
// parse the entity that owns this key, iterating over all keys for
// the entity
TimelineEntity entity = null;
if (usingPrimaryFilter) {
entity = getEntity(entityId, entityType, queryFields);
iterator.next();
} else {
entity = getEntity(entityId, entityType, startTime, queryFields,
iterator, key, kp.getOffset());
}
// determine if the retrieved entity matches the provided secondary
// filters, and if so add it to the list of entities to return
boolean filterPassed = true;
if (secondaryFilters != null) {
for (NameValuePair filter : secondaryFilters) {
Object v = entity.getOtherInfo().get(filter.getName());
if (v == null) {
Set<Object> vs = entity.getPrimaryFilters()
.get(filter.getName());
if (vs == null || !vs.contains(filter.getValue())) {
filterPassed = false;
break;
}
} else if (!v.equals(filter.getValue())) {
filterPassed = false;
break;
}
}
}
if (filterPassed) {
if (entity.getDomainId() == null) {
entity.setDomainId(DEFAULT_DOMAIN_ID);
}
if (checkAcl == null || checkAcl.check(entity)) {
// Remove primary filter and other info if they are added for
// matching secondary filters
if (addPrimaryFilters) {
entity.setPrimaryFilters(null);
}
if (addOtherInfo) {
entity.setOtherInfo(null);
}
entities.addEntity(entity);
}
}
}
db = rollingdb.getPreviousDB(db);
}
}
return entities;
}
/**
* Put a single entity. If there is an error, add a TimelinePutError to the
* given response.
*
* @param entityUpdates
* a map containing all the scheduled writes for this put to the
* entity db
* @param indexUpdates
* a map containing all the scheduled writes for this put to the
* index db
*/
private long putEntities(TreeMap<Long, RollingWriteBatch> entityUpdates,
TreeMap<Long, RollingWriteBatch> indexUpdates, TimelineEntity entity,
TimelinePutResponse response) {
long putCount = 0;
List<EntityIdentifier> relatedEntitiesWithoutStartTimes =
new ArrayList<EntityIdentifier>();
byte[] revStartTime = null;
Map<String, Set<Object>> primaryFilters = null;
try {
List<TimelineEvent> events = entity.getEvents();
// look up the start time for the entity
Long startTime = getAndSetStartTime(entity.getEntityId(),
entity.getEntityType(), entity.getStartTime(), events);
if (startTime == null) {
// if no start time is found, add an error and return
TimelinePutError error = new TimelinePutError();
error.setEntityId(entity.getEntityId());
error.setEntityType(entity.getEntityType());
error.setErrorCode(TimelinePutError.NO_START_TIME);
response.addError(error);
return putCount;
}
// Must have a domain
if (StringUtils.isEmpty(entity.getDomainId())) {
TimelinePutError error = new TimelinePutError();
error.setEntityId(entity.getEntityId());
error.setEntityType(entity.getEntityType());
error.setErrorCode(TimelinePutError.NO_DOMAIN);
response.addError(error);
return putCount;
}
revStartTime = writeReverseOrderedLong(startTime);
long roundedStartTime = entitydb.computeCurrentCheckMillis(startTime);
RollingWriteBatch rollingWriteBatch = entityUpdates.get(roundedStartTime);
if (rollingWriteBatch == null) {
DB db = entitydb.getDBForStartTime(startTime);
if (db != null) {
WriteBatch writeBatch = db.createWriteBatch();
rollingWriteBatch = new RollingWriteBatch(db, writeBatch);
entityUpdates.put(roundedStartTime, rollingWriteBatch);
}
}
if (rollingWriteBatch == null) {
// if no start time is found, add an error and return
TimelinePutError error = new TimelinePutError();
error.setEntityId(entity.getEntityId());
error.setEntityType(entity.getEntityType());
error.setErrorCode(TimelinePutError.EXPIRED_ENTITY);
response.addError(error);
return putCount;
}
WriteBatch writeBatch = rollingWriteBatch.getWriteBatch();
// Save off the getBytes conversion to avoid unnecessary cost
byte[] entityIdBytes = entity.getEntityId().getBytes(UTF_8);
byte[] entityTypeBytes = entity.getEntityType().getBytes(UTF_8);
byte[] domainIdBytes = entity.getDomainId().getBytes(UTF_8);
// write entity marker
byte[] markerKey = KeyBuilder.newInstance(3).add(entityTypeBytes, true)
.add(revStartTime).add(entityIdBytes, true).getBytesForLookup();
writeBatch.put(markerKey, EMPTY_BYTES);
++putCount;
// write domain id entry
byte[] domainkey = KeyBuilder.newInstance(4).add(entityTypeBytes, true)
.add(revStartTime).add(entityIdBytes, true).add(DOMAIN_ID_COLUMN)
.getBytes();
writeBatch.put(domainkey, domainIdBytes);
++putCount;
// write event entries
if (events != null) {
for (TimelineEvent event : events) {
byte[] revts = writeReverseOrderedLong(event.getTimestamp());
byte[] key = KeyBuilder.newInstance().add(entityTypeBytes, true)
.add(revStartTime).add(entityIdBytes, true).add(EVENTS_COLUMN)
.add(revts).add(event.getEventType().getBytes(UTF_8)).getBytes();
byte[] value = fstConf.asByteArray(event.getEventInfo());
writeBatch.put(key, value);
++putCount;
}
}
// write primary filter entries
primaryFilters = entity.getPrimaryFilters();
if (primaryFilters != null) {
for (Entry<String, Set<Object>> primaryFilter : primaryFilters
.entrySet()) {
for (Object primaryFilterValue : primaryFilter.getValue()) {
byte[] key = KeyBuilder.newInstance(6).add(entityTypeBytes, true)
.add(revStartTime).add(entityIdBytes, true)
.add(PRIMARY_FILTERS_COLUMN).add(primaryFilter.getKey())
.add(fstConf.asByteArray(primaryFilterValue)).getBytes();
writeBatch.put(key, EMPTY_BYTES);
++putCount;
}
}
}
// write other info entries
Map<String, Object> otherInfo = entity.getOtherInfo();
if (otherInfo != null) {
for (Entry<String, Object> info : otherInfo.entrySet()) {
byte[] key = KeyBuilder.newInstance(5).add(entityTypeBytes, true)
.add(revStartTime).add(entityIdBytes, true)
.add(OTHER_INFO_COLUMN).add(info.getKey()).getBytes();
byte[] value = fstConf.asByteArray(info.getValue());
writeBatch.put(key, value);
++putCount;
}
}
// write related entity entries
Map<String, Set<String>> relatedEntities = entity.getRelatedEntities();
if (relatedEntities != null) {
for (Entry<String, Set<String>> relatedEntityList : relatedEntities
.entrySet()) {
String relatedEntityType = relatedEntityList.getKey();
for (String relatedEntityId : relatedEntityList.getValue()) {
// look up start time of related entity
Long relatedStartTimeLong = getStartTimeLong(relatedEntityId,
relatedEntityType);
// delay writing the related entity if no start time is found
if (relatedStartTimeLong == null) {
relatedEntitiesWithoutStartTimes.add(new EntityIdentifier(
relatedEntityId, relatedEntityType));
continue;
}
byte[] relatedEntityStartTime =
writeReverseOrderedLong(relatedStartTimeLong);
long relatedRoundedStartTime = entitydb
.computeCurrentCheckMillis(relatedStartTimeLong);
RollingWriteBatch relatedRollingWriteBatch = entityUpdates
.get(relatedRoundedStartTime);
if (relatedRollingWriteBatch == null) {
DB db = entitydb.getDBForStartTime(relatedStartTimeLong);
if (db != null) {
WriteBatch relatedWriteBatch = db.createWriteBatch();
relatedRollingWriteBatch = new RollingWriteBatch(db,
relatedWriteBatch);
entityUpdates.put(relatedRoundedStartTime,
relatedRollingWriteBatch);
}
}
if (relatedRollingWriteBatch == null) {
// if no start time is found, add an error and return
TimelinePutError error = new TimelinePutError();
error.setEntityId(entity.getEntityId());
error.setEntityType(entity.getEntityType());
error.setErrorCode(TimelinePutError.EXPIRED_ENTITY);
response.addError(error);
continue;
}
// This is the existing entity
byte[] relatedDomainIdBytes = relatedRollingWriteBatch.getDB().get(
createDomainIdKey(relatedEntityId, relatedEntityType,
relatedEntityStartTime));
// The timeline data created by the server before 2.6 won't have
// the domain field. We assume this timeline data is in the
// default timeline domain.
String domainId = null;
if (relatedDomainIdBytes == null) {
domainId = TimelineDataManager.DEFAULT_DOMAIN_ID;
} else {
domainId = new String(relatedDomainIdBytes, UTF_8);
}
if (!domainId.equals(entity.getDomainId())) {
// in this case the entity will be put, but the relation will be
// ignored
TimelinePutError error = new TimelinePutError();
error.setEntityId(entity.getEntityId());
error.setEntityType(entity.getEntityType());
error.setErrorCode(TimelinePutError.FORBIDDEN_RELATION);
response.addError(error);
continue;
}
// write "forward" entry (related entity -> entity)
byte[] key = createRelatedEntityKey(relatedEntityId,
relatedEntityType, relatedEntityStartTime,
entity.getEntityId(), entity.getEntityType());
WriteBatch relatedWriteBatch = relatedRollingWriteBatch
.getWriteBatch();
relatedWriteBatch.put(key, EMPTY_BYTES);
++putCount;
}
}
}
// write index entities
RollingWriteBatch indexRollingWriteBatch = indexUpdates
.get(roundedStartTime);
if (indexRollingWriteBatch == null) {
DB db = indexdb.getDBForStartTime(startTime);
if (db != null) {
WriteBatch indexWriteBatch = db.createWriteBatch();
indexRollingWriteBatch = new RollingWriteBatch(db, indexWriteBatch);
indexUpdates.put(roundedStartTime, indexRollingWriteBatch);
}
}
if (indexRollingWriteBatch == null) {
// if no start time is found, add an error and return
TimelinePutError error = new TimelinePutError();
error.setEntityId(entity.getEntityId());
error.setEntityType(entity.getEntityType());
error.setErrorCode(TimelinePutError.EXPIRED_ENTITY);
response.addError(error);
return putCount;
}
WriteBatch indexWriteBatch = indexRollingWriteBatch.getWriteBatch();
putCount += writePrimaryFilterEntries(indexWriteBatch, primaryFilters,
markerKey, EMPTY_BYTES);
} catch (IOException e) {
LOG.error("Error putting entity " + entity.getEntityId() + " of type "
+ entity.getEntityType(), e);
TimelinePutError error = new TimelinePutError();
error.setEntityId(entity.getEntityId());
error.setEntityType(entity.getEntityType());
error.setErrorCode(TimelinePutError.IO_EXCEPTION);
response.addError(error);
}
for (EntityIdentifier relatedEntity : relatedEntitiesWithoutStartTimes) {
try {
Long relatedEntityStartAndInsertTime = getAndSetStartTime(
relatedEntity.getId(), relatedEntity.getType(),
readReverseOrderedLong(revStartTime, 0), null);
if (relatedEntityStartAndInsertTime == null) {
throw new IOException("Error setting start time for related entity");
}
long relatedStartTimeLong = relatedEntityStartAndInsertTime;
long relatedRoundedStartTime = entitydb
.computeCurrentCheckMillis(relatedStartTimeLong);
RollingWriteBatch relatedRollingWriteBatch = entityUpdates
.get(relatedRoundedStartTime);
if (relatedRollingWriteBatch == null) {
DB db = entitydb.getDBForStartTime(relatedStartTimeLong);
if (db != null) {
WriteBatch relatedWriteBatch = db.createWriteBatch();
relatedRollingWriteBatch = new RollingWriteBatch(db,
relatedWriteBatch);
entityUpdates
.put(relatedRoundedStartTime, relatedRollingWriteBatch);
}
}
if (relatedRollingWriteBatch == null) {
// if no start time is found, add an error and return
TimelinePutError error = new TimelinePutError();
error.setEntityId(entity.getEntityId());
error.setEntityType(entity.getEntityType());
error.setErrorCode(TimelinePutError.EXPIRED_ENTITY);
response.addError(error);
continue;
}
WriteBatch relatedWriteBatch = relatedRollingWriteBatch.getWriteBatch();
byte[] relatedEntityStartTime =
writeReverseOrderedLong(relatedEntityStartAndInsertTime);
// This is the new entity, the domain should be the same
byte[] key = createDomainIdKey(relatedEntity.getId(),
relatedEntity.getType(), relatedEntityStartTime);
relatedWriteBatch.put(key, entity.getDomainId().getBytes(UTF_8));
++putCount;
relatedWriteBatch.put(
createRelatedEntityKey(relatedEntity.getId(),
relatedEntity.getType(), relatedEntityStartTime,
entity.getEntityId(), entity.getEntityType()), EMPTY_BYTES);
++putCount;
relatedWriteBatch.put(
createEntityMarkerKey(relatedEntity.getId(),
relatedEntity.getType(), relatedEntityStartTime), EMPTY_BYTES);
++putCount;
} catch (IOException e) {
LOG.error(
"Error putting related entity " + relatedEntity.getId()
+ " of type " + relatedEntity.getType() + " for entity "
+ entity.getEntityId() + " of type " + entity.getEntityType(),
e);
TimelinePutError error = new TimelinePutError();
error.setEntityId(entity.getEntityId());
error.setEntityType(entity.getEntityType());
error.setErrorCode(TimelinePutError.IO_EXCEPTION);
response.addError(error);
}
}
return putCount;
}
/**
* For a given key / value pair that has been written to the db, write
* additional entries to the db for each primary filter.
*/
private static long writePrimaryFilterEntries(WriteBatch writeBatch,
Map<String, Set<Object>> primaryFilters, byte[] key, byte[] value)
throws IOException {
long putCount = 0;
if (primaryFilters != null) {
for (Entry<String, Set<Object>> pf : primaryFilters.entrySet()) {
for (Object pfval : pf.getValue()) {
writeBatch.put(addPrimaryFilterToKey(pf.getKey(), pfval, key), value);
++putCount;
}
}
}
return putCount;
}
@Override
public TimelinePutResponse put(TimelineEntities entities) {
if (LOG.isDebugEnabled()) {
LOG.debug("Starting put");
}
TimelinePutResponse response = new TimelinePutResponse();
TreeMap<Long, RollingWriteBatch> entityUpdates =
new TreeMap<Long, RollingWriteBatch>();
TreeMap<Long, RollingWriteBatch> indexUpdates =
new TreeMap<Long, RollingWriteBatch>();
long entityCount = 0;
long indexCount = 0;
try {
for (TimelineEntity entity : entities.getEntities()) {
entityCount += putEntities(entityUpdates, indexUpdates, entity,
response);
}
for (RollingWriteBatch entityUpdate : entityUpdates.values()) {
entityUpdate.write();
}
for (RollingWriteBatch indexUpdate : indexUpdates.values()) {
indexUpdate.write();
}
} finally {
for (RollingWriteBatch entityRollingWriteBatch : entityUpdates.values()) {
entityRollingWriteBatch.close();
}
for (RollingWriteBatch indexRollingWriteBatch : indexUpdates.values()) {
indexRollingWriteBatch.close();
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Put " + entityCount + " new leveldb entity entries and "
+ indexCount + " new leveldb index entries from "
+ entities.getEntities().size() + " timeline entities");
}
return response;
}
/**
* Get the unique start time for a given entity as a byte array that sorts the
* timestamps in reverse order (see
* {@link GenericObjectMapper#writeReverseOrderedLong(long)}).
*
* @param entityId
* The id of the entity
* @param entityType
* The type of the entity
* @return A byte array, null if not found
* @throws IOException
*/
private byte[] getStartTime(String entityId, String entityType)
throws IOException {
Long l = getStartTimeLong(entityId, entityType);
return l == null ? null : writeReverseOrderedLong(l);
}
/**
* Get the unique start time for a given entity as a Long.
*
* @param entityId
* The id of the entity
* @param entityType
* The type of the entity
* @return A Long, null if not found
* @throws IOException
*/
private Long getStartTimeLong(String entityId, String entityType)
throws IOException {
EntityIdentifier entity = new EntityIdentifier(entityId, entityType);
// start time is not provided, so try to look it up
if (startTimeReadCache.containsKey(entity)) {
// found the start time in the cache
return startTimeReadCache.get(entity);
} else {
// try to look up the start time in the db
byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
byte[] v = starttimedb.get(b);
if (v == null) {
// did not find the start time in the db
return null;
} else {
// found the start time in the db
Long l = readReverseOrderedLong(v, 0);
startTimeReadCache.put(entity, l);
return l;
}
}
}
/**
* Get the unique start time for a given entity as a byte array that sorts the
* timestamps in reverse order (see
* {@link GenericObjectMapper#writeReverseOrderedLong(long)}). If the start
* time doesn't exist, set it based on the information provided.
*
* @param entityId
* The id of the entity
* @param entityType
* The type of the entity
* @param startTime
* The start time of the entity, or null
* @param events
* A list of events for the entity, or null
* @return A StartAndInsertTime
* @throws IOException
*/
private Long getAndSetStartTime(String entityId,
String entityType, Long startTime, List<TimelineEvent> events)
throws IOException {
EntityIdentifier entity = new EntityIdentifier(entityId, entityType);
Long time = startTimeWriteCache.get(entity);
if (time != null) {
// return the value in the cache
return time;
}
if (startTime == null && events != null) {
// calculate best guess start time based on lowest event time
startTime = Long.MAX_VALUE;
for (TimelineEvent e : events) {
if (e.getTimestamp() < startTime) {
startTime = e.getTimestamp();
}
}
}
// check the provided start time matches the db
return checkStartTimeInDb(entity, startTime);
}
/**
* Checks db for start time and returns it if it exists. If it doesn't exist,
* writes the suggested start time (if it is not null). This is only called
* when the start time is not found in the cache, so it adds it back into the
* cache if it is found.
*/
private Long checkStartTimeInDb(EntityIdentifier entity,
Long suggestedStartTime) throws IOException {
Long startAndInsertTime = null;
// create lookup key for start time
byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
synchronized (this) {
// retrieve value for key
byte[] v = starttimedb.get(b);
if (v == null) {
// start time doesn't exist in db
if (suggestedStartTime == null) {
return null;
}
startAndInsertTime = suggestedStartTime;
// write suggested start time
starttimedb.put(b, writeReverseOrderedLong(suggestedStartTime));
} else {
// found start time in db, so ignore suggested start time
startAndInsertTime = readReverseOrderedLong(v, 0);
}
}
startTimeWriteCache.put(entity, startAndInsertTime);
startTimeReadCache.put(entity, startAndInsertTime);
return startAndInsertTime;
}
/**
* Creates a key for looking up the start time of a given entity, of the form
* START_TIME_LOOKUP_PREFIX + entity type + entity id.
*/
private static byte[] createStartTimeLookupKey(String entityId,
String entityType) throws IOException {
return KeyBuilder.newInstance().add(entityType).add(entityId).getBytes();
}
/**
* Creates an entity marker, serializing ENTITY_ENTRY_PREFIX + entity type +
* revstarttime + entity id.
*/
private static byte[] createEntityMarkerKey(String entityId,
String entityType, byte[] revStartTime) throws IOException {
return KeyBuilder.newInstance().add(entityType).add(revStartTime)
.add(entityId).getBytesForLookup();
}
/**
* Creates an index entry for the given key of the form INDEXED_ENTRY_PREFIX +
* primaryfiltername + primaryfiltervalue + key.
*/
private static byte[] addPrimaryFilterToKey(String primaryFilterName,
Object primaryFilterValue, byte[] key) throws IOException {
return KeyBuilder.newInstance().add(primaryFilterName)
.add(fstConf.asByteArray(primaryFilterValue), true).add(key).getBytes();
}
/**
* Creates an event object from the given key, offset, and value. If the event
* type is not contained in the specified set of event types, returns null.
*/
private static TimelineEvent getEntityEvent(Set<String> eventTypes,
byte[] key, int offset, byte[] value) throws IOException {
KeyParser kp = new KeyParser(key, offset);
long ts = kp.getNextLong();
String tstype = kp.getNextString();
if (eventTypes == null || eventTypes.contains(tstype)) {
TimelineEvent event = new TimelineEvent();
event.setTimestamp(ts);
event.setEventType(tstype);
Object o = null;
try {
o = fstConf.asObject(value);
} catch (Exception ignore) {
try {
// Fall back to 2.24 parser
o = fstConf224.asObject(value);
} catch (Exception e) {
LOG.warn("Error while decoding " + tstype, e);
}
}
if (o == null) {
event.setEventInfo(null);
} else if (o instanceof Map) {
@SuppressWarnings("unchecked")
Map<String, Object> m = (Map<String, Object>) o;
event.setEventInfo(m);
} else {
throw new IOException("Couldn't deserialize event info map");
}
return event;
}
return null;
}
/**
* Parses the primary filter from the given key at the given offset and adds
* it to the given entity.
*/
private static void addPrimaryFilter(TimelineEntity entity, byte[] key,
int offset) throws IOException {
KeyParser kp = new KeyParser(key, offset);
String name = kp.getNextString();
byte[] bytes = kp.getRemainingBytes();
Object value = null;
try {
value = fstConf.asObject(bytes);
entity.addPrimaryFilter(name, value);
} catch (Exception ignore) {
try {
// Fall back to 2.24 parser
value = fstConf224.asObject(bytes);
entity.addPrimaryFilter(name, value);
} catch (Exception e) {
LOG.warn("Error while decoding " + name, e);
}
}
}
/**
* Creates a string representation of the byte array from the given offset to
* the end of the array (for parsing other info keys).
*/
private static String parseRemainingKey(byte[] b, int offset) {
return new String(b, offset, b.length - offset, UTF_8);
}
/**
* Creates a related entity key, serializing ENTITY_ENTRY_PREFIX + entity type
* + revstarttime + entity id + RELATED_ENTITIES_COLUMN + relatedentity type +
* relatedentity id.
*/
private static byte[] createRelatedEntityKey(String entityId,
String entityType, byte[] revStartTime, String relatedEntityId,
String relatedEntityType) throws IOException {
return KeyBuilder.newInstance().add(entityType).add(revStartTime)
.add(entityId).add(RELATED_ENTITIES_COLUMN).add(relatedEntityType)
.add(relatedEntityId).getBytes();
}
/**
* Parses the related entity from the given key at the given offset and adds
* it to the given entity.
*/
private static void addRelatedEntity(TimelineEntity entity, byte[] key,
int offset) throws IOException {
KeyParser kp = new KeyParser(key, offset);
String type = kp.getNextString();
String id = kp.getNextString();
entity.addRelatedEntity(type, id);
}
/**
* Creates a domain id key, serializing ENTITY_ENTRY_PREFIX + entity type +
* revstarttime + entity id + DOMAIN_ID_COLUMN.
*/
private static byte[] createDomainIdKey(String entityId, String entityType,
byte[] revStartTime) throws IOException {
return KeyBuilder.newInstance().add(entityType).add(revStartTime)
.add(entityId).add(DOMAIN_ID_COLUMN).getBytes();
}
/**
* Clears the cache to test reloading start times from leveldb (only for
* testing).
*/
@VisibleForTesting
void clearStartTimeCache() {
startTimeWriteCache.clear();
startTimeReadCache.clear();
}
@VisibleForTesting
static int getStartTimeReadCacheSize(Configuration conf) {
return conf
.getInt(
TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE,
DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE);
}
@VisibleForTesting
static int getStartTimeWriteCacheSize(Configuration conf) {
return conf
.getInt(
TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE,
DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE);
}
@VisibleForTesting
long evictOldStartTimes(long minStartTime) throws IOException {
LOG.info("Searching for start times to evict earlier than " + minStartTime);
long batchSize = 0;
long totalCount = 0;
long startTimesCount = 0;
WriteBatch writeBatch = null;
ReadOptions readOptions = new ReadOptions();
readOptions.fillCache(false);
try (DBIterator iterator = starttimedb.iterator(readOptions)) {
// seek to the first start time entry
iterator.seekToFirst();
writeBatch = starttimedb.createWriteBatch();
// evaluate each start time entry to see if it needs to be evicted or not
while (iterator.hasNext()) {
Map.Entry<byte[], byte[]> current = iterator.next();
byte[] entityKey = current.getKey();
byte[] entityValue = current.getValue();
long startTime = readReverseOrderedLong(entityValue, 0);
if (startTime < minStartTime) {
++batchSize;
++startTimesCount;
writeBatch.delete(entityKey);
// a large delete will hold the lock for too long
if (batchSize >= writeBatchSize) {
if (LOG.isDebugEnabled()) {
LOG.debug("Preparing to delete a batch of " + batchSize
+ " old start times");
}
starttimedb.write(writeBatch);
if (LOG.isDebugEnabled()) {
LOG.debug("Deleted batch of " + batchSize
+ ". Total start times deleted so far this cycle: "
+ startTimesCount);
}
IOUtils.cleanupWithLogger(LOG, writeBatch);
writeBatch = starttimedb.createWriteBatch();
batchSize = 0;
}
}
++totalCount;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Preparing to delete a batch of " + batchSize
+ " old start times");
}
starttimedb.write(writeBatch);
if (LOG.isDebugEnabled()) {
LOG.debug("Deleted batch of " + batchSize
+ ". Total start times deleted so far this cycle: "
+ startTimesCount);
}
LOG.info("Deleted " + startTimesCount + "/" + totalCount
+ " start time entities earlier than " + minStartTime);
} finally {
IOUtils.cleanupWithLogger(LOG, writeBatch);
}
return startTimesCount;
}
/**
* Discards entities with start timestamp less than or equal to the given
* timestamp.
*/
@VisibleForTesting
void discardOldEntities(long timestamp) throws IOException,
InterruptedException {
long totalCount = 0;
long t1 = System.currentTimeMillis();
try {
totalCount += evictOldStartTimes(timestamp);
indexdb.evictOldDBs();
entitydb.evictOldDBs();
} finally {
long t2 = System.currentTimeMillis();
LOG.info("Discarded " + totalCount + " entities for timestamp "
+ timestamp + " and earlier in " + (t2 - t1) / 1000.0 + " seconds");
}
}
Version loadVersion() throws IOException {
byte[] data = starttimedb.get(bytes(TIMELINE_STORE_VERSION_KEY));
// if version is not stored previously, treat it as 1.0.
if (data == null || data.length == 0) {
return Version.newInstance(1, 0);
}
Version version = new VersionPBImpl(VersionProto.parseFrom(data));
return version;
}
// Only used for test
@VisibleForTesting
void storeVersion(Version state) throws IOException {
dbStoreVersion(state);
}
private void dbStoreVersion(Version state) throws IOException {
String key = TIMELINE_STORE_VERSION_KEY;
byte[] data = ((VersionPBImpl) state).getProto().toByteArray();
try {
starttimedb.put(bytes(key), data);
} catch (DBException e) {
throw new IOException(e);
}
}
Version getCurrentVersion() {
return CURRENT_VERSION_INFO;
}
/**
* 1) Versioning timeline store: major.minor. For e.g. 1.0, 1.1, 1.2...1.25,
* 2.0 etc. 2) Any incompatible change of TS-store is a major upgrade, and any
* compatible change of TS-store is a minor upgrade. 3) Within a minor
* upgrade, say 1.1 to 1.2: overwrite the version info and proceed as normal.
* 4) Within a major upgrade, say 1.2 to 2.0: throw exception and indicate
* user to use a separate upgrade tool to upgrade timeline store or remove
* incompatible old state.
*/
private void checkVersion() throws IOException {
Version loadedVersion = loadVersion();
LOG.info("Loaded timeline store version info " + loadedVersion);
if (loadedVersion.equals(getCurrentVersion())) {
return;
}
if (loadedVersion.isCompatibleTo(getCurrentVersion())) {
LOG.info("Storing timeline store version info " + getCurrentVersion());
dbStoreVersion(CURRENT_VERSION_INFO);
} else {
String incompatibleMessage = "Incompatible version for timeline store: "
+ "expecting version " + getCurrentVersion()
+ ", but loading version " + loadedVersion;
LOG.error(incompatibleMessage);
throw new IOException(incompatibleMessage);
}
}
// TODO: make data retention work with the domain data as well
@Override
public void put(TimelineDomain domain) throws IOException {
try (WriteBatch domainWriteBatch = domaindb.createWriteBatch();
WriteBatch ownerWriteBatch = ownerdb.createWriteBatch();) {
if (domain.getId() == null || domain.getId().length() == 0) {
throw new IllegalArgumentException("Domain doesn't have an ID");
}
if (domain.getOwner() == null || domain.getOwner().length() == 0) {
throw new IllegalArgumentException("Domain doesn't have an owner.");
}
// Write description
byte[] domainEntryKey = createDomainEntryKey(domain.getId(),
DESCRIPTION_COLUMN);
byte[] ownerLookupEntryKey = createOwnerLookupKey(domain.getOwner(),
domain.getId(), DESCRIPTION_COLUMN);
if (domain.getDescription() != null) {
domainWriteBatch.put(domainEntryKey,
domain.getDescription().getBytes(UTF_8));
ownerWriteBatch.put(ownerLookupEntryKey, domain.getDescription()
.getBytes(UTF_8));
} else {
domainWriteBatch.put(domainEntryKey, EMPTY_BYTES);
ownerWriteBatch.put(ownerLookupEntryKey, EMPTY_BYTES);
}
// Write owner
domainEntryKey = createDomainEntryKey(domain.getId(), OWNER_COLUMN);
ownerLookupEntryKey = createOwnerLookupKey(domain.getOwner(),
domain.getId(), OWNER_COLUMN);
// Null check for owner is done before
domainWriteBatch.put(domainEntryKey, domain.getOwner().getBytes(UTF_8));
ownerWriteBatch.put(ownerLookupEntryKey, domain.getOwner()
.getBytes(UTF_8));
// Write readers
domainEntryKey = createDomainEntryKey(domain.getId(), READER_COLUMN);
ownerLookupEntryKey = createOwnerLookupKey(domain.getOwner(),
domain.getId(), READER_COLUMN);
if (domain.getReaders() != null && domain.getReaders().length() > 0) {
domainWriteBatch.put(domainEntryKey, domain.getReaders()
.getBytes(UTF_8));
ownerWriteBatch.put(ownerLookupEntryKey,
domain.getReaders().getBytes(UTF_8));
} else {
domainWriteBatch.put(domainEntryKey, EMPTY_BYTES);
ownerWriteBatch.put(ownerLookupEntryKey, EMPTY_BYTES);
}
// Write writers
domainEntryKey = createDomainEntryKey(domain.getId(), WRITER_COLUMN);
ownerLookupEntryKey = createOwnerLookupKey(domain.getOwner(),
domain.getId(), WRITER_COLUMN);
if (domain.getWriters() != null && domain.getWriters().length() > 0) {
domainWriteBatch.put(domainEntryKey, domain.getWriters()
.getBytes(UTF_8));
ownerWriteBatch.put(ownerLookupEntryKey,
domain.getWriters().getBytes(UTF_8));
} else {
domainWriteBatch.put(domainEntryKey, EMPTY_BYTES);
ownerWriteBatch.put(ownerLookupEntryKey, EMPTY_BYTES);
}
// Write creation time and modification time
// We put both timestamps together because they are always retrieved
// together, and store them in the same way as we did for the entity's
// start time and insert time.
domainEntryKey = createDomainEntryKey(domain.getId(), TIMESTAMP_COLUMN);
ownerLookupEntryKey = createOwnerLookupKey(domain.getOwner(),
domain.getId(), TIMESTAMP_COLUMN);
long currentTimestamp = System.currentTimeMillis();
byte[] timestamps = domaindb.get(domainEntryKey);
if (timestamps == null) {
timestamps = new byte[16];
writeReverseOrderedLong(currentTimestamp, timestamps, 0);
writeReverseOrderedLong(currentTimestamp, timestamps, 8);
} else {
writeReverseOrderedLong(currentTimestamp, timestamps, 8);
}
domainWriteBatch.put(domainEntryKey, timestamps);
ownerWriteBatch.put(ownerLookupEntryKey, timestamps);
domaindb.write(domainWriteBatch);
ownerdb.write(ownerWriteBatch);
}
}
/**
* Creates a domain entity key with column name suffix, of the form
* DOMAIN_ENTRY_PREFIX + domain id + column name.
*/
private static byte[] createDomainEntryKey(String domainId, byte[] columnName)
throws IOException {
return KeyBuilder.newInstance().add(domainId).add(columnName).getBytes();
}
/**
* Creates an owner lookup key with column name suffix, of the form
* OWNER_LOOKUP_PREFIX + owner + domain id + column name.
*/
private static byte[] createOwnerLookupKey(String owner, String domainId,
byte[] columnName) throws IOException {
return KeyBuilder.newInstance().add(owner).add(domainId).add(columnName)
.getBytes();
}
@Override
public TimelineDomain getDomain(String domainId) throws IOException {
try (DBIterator iterator = domaindb.iterator()) {
byte[] prefix = KeyBuilder.newInstance().add(domainId)
.getBytesForLookup();
iterator.seek(prefix);
return getTimelineDomain(iterator, domainId, prefix);
}
}
@Override
public TimelineDomains getDomains(String owner) throws IOException {
try (DBIterator iterator = ownerdb.iterator()) {
byte[] prefix = KeyBuilder.newInstance().add(owner).getBytesForLookup();
iterator.seek(prefix);
List<TimelineDomain> domains = new ArrayList<TimelineDomain>();
while (iterator.hasNext()) {
byte[] key = iterator.peekNext().getKey();
if (!prefixMatches(prefix, prefix.length, key)) {
break;
}
// Iterator to parse the rows of an individual domain
KeyParser kp = new KeyParser(key, prefix.length);
String domainId = kp.getNextString();
byte[] prefixExt = KeyBuilder.newInstance().add(owner).add(domainId)
.getBytesForLookup();
TimelineDomain domainToReturn = getTimelineDomain(iterator, domainId,
prefixExt);
if (domainToReturn != null) {
domains.add(domainToReturn);
}
}
// Sort the domains to return
Collections.sort(domains, new Comparator<TimelineDomain>() {
@Override
public int compare(TimelineDomain domain1, TimelineDomain domain2) {
int result = domain2.getCreatedTime().compareTo(
domain1.getCreatedTime());
if (result == 0) {
return domain2.getModifiedTime().compareTo(
domain1.getModifiedTime());
} else {
return result;
}
}
});
TimelineDomains domainsToReturn = new TimelineDomains();
domainsToReturn.addDomains(domains);
return domainsToReturn;
}
}
private static TimelineDomain getTimelineDomain(DBIterator iterator,
String domainId, byte[] prefix) throws IOException {
// Iterate over all the rows whose key starts with prefix to retrieve the
// domain information.
TimelineDomain domain = new TimelineDomain();
domain.setId(domainId);
boolean noRows = true;
for (; iterator.hasNext(); iterator.next()) {
byte[] key = iterator.peekNext().getKey();
if (!prefixMatches(prefix, prefix.length, key)) {
break;
}
if (noRows) {
noRows = false;
}
byte[] value = iterator.peekNext().getValue();
if (value != null && value.length > 0) {
if (key[prefix.length] == DESCRIPTION_COLUMN[0]) {
domain.setDescription(new String(value, UTF_8));
} else if (key[prefix.length] == OWNER_COLUMN[0]) {
domain.setOwner(new String(value, UTF_8));
} else if (key[prefix.length] == READER_COLUMN[0]) {
domain.setReaders(new String(value, UTF_8));
} else if (key[prefix.length] == WRITER_COLUMN[0]) {
domain.setWriters(new String(value, UTF_8));
} else if (key[prefix.length] == TIMESTAMP_COLUMN[0]) {
domain.setCreatedTime(readReverseOrderedLong(value, 0));
domain.setModifiedTime(readReverseOrderedLong(value, 8));
} else {
LOG.error("Unrecognized domain column: " + key[prefix.length]);
}
}
}
if (noRows) {
return null;
} else {
return domain;
}
}
}