blob: 89e4cec6eca2ef46ca3d9ae0116047f841f360cf [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
package org.apache.storm.metricstore.rocksdb;
import com.codahale.metrics.Meter;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.storm.DaemonConfig;
import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.metricstore.AggLevel;
import org.apache.storm.metricstore.FilterOptions;
import org.apache.storm.metricstore.Metric;
import org.apache.storm.metricstore.MetricException;
import org.apache.storm.metricstore.MetricStore;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.ObjectReader;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.IndexType;
import org.rocksdb.Options;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RocksDbStore implements MetricStore, AutoCloseable {
static final int INVALID_METADATA_STRING_ID = 0;
private static final Logger LOG = LoggerFactory.getLogger(RocksDbStore.class);
private static final int MAX_QUEUE_CAPACITY = 4000;
RocksDB db;
private ReadOnlyStringMetadataCache readOnlyStringMetadataCache = null;
private BlockingQueue queue = new LinkedBlockingQueue(MAX_QUEUE_CAPACITY);
private RocksDbMetricsWriter metricsWriter = null;
private MetricsCleaner metricsCleaner = null;
private Meter failureMeter = null;
* Create metric store instance using the configurations provided via the config map.
* @param config Storm config map
* @param metricsRegistry The Nimbus daemon metrics registry
* @throws MetricException on preparation error
public void prepare(Map<String, Object> config, StormMetricsRegistry metricsRegistry) throws MetricException {
this.failureMeter = metricsRegistry.registerMeter("RocksDB:metric-failures");
boolean createIfMissing = ObjectReader.getBoolean(config.get(DaemonConfig.STORM_ROCKSDB_CREATE_IF_MISSING), false);
try (Options options = new Options().setCreateIfMissing(createIfMissing)) {
// use the hash index for prefix searches
BlockBasedTableConfig tfc = new BlockBasedTableConfig();
String path = getRocksDbAbsoluteDir(config);"Opening RocksDB from {}", path);
db =, path);
} catch (RocksDBException e) {
String message = "Error opening RockDB database";
LOG.error(message, e);
throw new MetricException(message, e);
// create thread to delete old metrics and metadata
Integer retentionHours = Integer.parseInt(config.get(DaemonConfig.STORM_ROCKSDB_METRIC_RETENTION_HOURS).toString());
Integer deletionPeriod = 0;
if (config.containsKey(DaemonConfig.STORM_ROCKSDB_METRIC_DELETION_PERIOD_HOURS)) {
deletionPeriod = Integer.parseInt(config.get(DaemonConfig.STORM_ROCKSDB_METRIC_DELETION_PERIOD_HOURS).toString());
metricsCleaner = new MetricsCleaner(this, retentionHours, deletionPeriod, failureMeter, metricsRegistry);
// create thread to process insertion of all metrics
metricsWriter = new RocksDbMetricsWriter(this, this.queue, this.failureMeter);
int cacheCapacity = Integer.parseInt(config.get(DaemonConfig.STORM_ROCKSDB_METADATA_STRING_CACHE_CAPACITY).toString());
StringMetadataCache.init(metricsWriter, cacheCapacity);
readOnlyStringMetadataCache = StringMetadataCache.getReadOnlyStringMetadataCache();
metricsWriter.init(); // init the writer once the cache is setup
// start threads after metadata cache created
Thread thread = new Thread(metricsCleaner, "RocksDbMetricsCleaner");
thread = new Thread(metricsWriter, "RocksDbMetricsWriter");
* Implements configuration validation of Metrics Store, validates storm configuration for Metrics Store.
* @param config Storm config to specify which store type, location of store and creation policy
* @throws MetricException if there is a missing required configuration or if the store does not exist but
* the config specifies not to create the store
private void validateConfig(Map<String, Object> config) throws MetricException {
if (!(config.containsKey(DaemonConfig.STORM_ROCKSDB_LOCATION))) {
throw new MetricException("Not a vaild RocksDB configuration - Missing store location " + DaemonConfig.STORM_ROCKSDB_LOCATION);
if (!(config.containsKey(DaemonConfig.STORM_ROCKSDB_CREATE_IF_MISSING))) {
throw new MetricException("Not a vaild RocksDB configuration - Does not specify creation policy "
// validate path defined
String storePath = getRocksDbAbsoluteDir(config);
boolean createIfMissing = ObjectReader.getBoolean(config.get(DaemonConfig.STORM_ROCKSDB_CREATE_IF_MISSING), false);
if (!createIfMissing) {
if (!(new File(storePath).exists())) {
throw new MetricException("Configuration specifies not to create a store but no store currently exists at " + storePath);
if (!(config.containsKey(DaemonConfig.STORM_ROCKSDB_METADATA_STRING_CACHE_CAPACITY))) {
throw new MetricException("Not a valid RocksDB configuration - Missing metadata string cache size "
if (!config.containsKey(DaemonConfig.STORM_ROCKSDB_METRIC_RETENTION_HOURS)) {
throw new MetricException("Not a valid RocksDB configuration - Missing metric retention "
private String getRocksDbAbsoluteDir(Map<String, Object> conf) throws MetricException {
String storePath = (String) conf.get(DaemonConfig.STORM_ROCKSDB_LOCATION);
if (storePath == null) {
throw new MetricException("Not a vaild RocksDB configuration - Missing store location " + DaemonConfig.STORM_ROCKSDB_LOCATION);
} else {
if (new File(storePath).isAbsolute()) {
return storePath;
} else {
String stormHome = System.getProperty(ConfigUtils.STORM_HOME);
if (stormHome == null) {
throw new MetricException(ConfigUtils.STORM_HOME + " not set");
return (stormHome + File.separator + storePath);
* Stores metrics in the store.
* @param metric Metric to store
* @throws MetricException if database write fails
public void insert(Metric metric) throws MetricException {
try {
// don't bother blocking on a full queue, just drop metrics in case we can't keep up
if (queue.remainingCapacity() <= 0) {"Metrics q full, dropping metric");
} catch (Exception e) {
String message = "Failed to insert metric";
LOG.error(message, e);
if (this.failureMeter != null) {
throw new MetricException(message, e);
* Fill out the numeric values for a metric.
* @param metric Metric to populate
* @return true if the metric was populated, false otherwise
* @throws MetricException if read from database fails
public boolean populateValue(Metric metric) throws MetricException {
Map<String, Integer> localLookupCache = new HashMap<>(6);
int topologyId = lookupMetadataString(KeyType.TOPOLOGY_STRING, metric.getTopologyId(), localLookupCache);
return false;
int metricId = lookupMetadataString(KeyType.METRIC_STRING, metric.getMetricName(), localLookupCache);
return false;
int componentId = lookupMetadataString(KeyType.COMPONENT_STRING, metric.getComponentId(), localLookupCache);
if (INVALID_METADATA_STRING_ID == componentId) {
return false;
int executorId = lookupMetadataString(KeyType.EXEC_ID_STRING, metric.getExecutorId(), localLookupCache);
return false;
int hostId = lookupMetadataString(KeyType.HOST_STRING, metric.getHostname(), localLookupCache);
return false;
int streamId = lookupMetadataString(KeyType.STREAM_ID_STRING, metric.getStreamId(), localLookupCache);
return false;
RocksDbKey key = RocksDbKey.createMetricKey(metric.getAggLevel(), topologyId, metric.getTimestamp(), metricId,
componentId, executorId, hostId, metric.getPort(), streamId);
return populateFromKey(key, metric);
// populate metric values using the provided key
boolean populateFromKey(RocksDbKey key, Metric metric) throws MetricException {
try {
byte[] value = db.get(key.getRaw());
if (value == null) {
return false;
RocksDbValue rdbValue = new RocksDbValue(value);
} catch (Exception e) {
String message = "Failed to populate metric";
LOG.error(message, e);
if (this.failureMeter != null) {
throw new MetricException(message, e);
return true;
// attempts to lookup the unique Id for a string that may not exist yet. Returns INVALID_METADATA_STRING_ID
// if it does not exist.
private int lookupMetadataString(KeyType type, String s, Map<String, Integer> lookupCache) throws MetricException {
if (s == null) {
if (this.failureMeter != null) {
throw new MetricException("No string for metric metadata string type " + type);
// attempt to find it in the string cache, this will update the LRU
StringMetadata stringMetadata = readOnlyStringMetadataCache.get(s);
if (stringMetadata != null) {
return stringMetadata.getStringId();
// attempt to find it in callers cache
Integer id = lookupCache.get(s);
if (id != null) {
return id;
// attempt to find the string in the database
try {
stringMetadata = rocksDbGetStringMetadata(type, s);
} catch (RocksDBException e) {
throw new MetricException("Error reading metric data", e);
if (stringMetadata != null) {
id = stringMetadata.getStringId();
// add to the callers cache. We can't add it to the stringMetadataCache, since that could cause an eviction
// database write, which we want to only occur from the inserting DB thread.
lookupCache.put(s, id);
return id;
// string does not exist
// scans the database to look for a metadata string and returns the metadata info
StringMetadata rocksDbGetStringMetadata(KeyType type, String s) throws RocksDBException {
RocksDbKey firstKey = RocksDbKey.getInitialKey(type);
RocksDbKey lastKey = RocksDbKey.getLastKey(type);
final AtomicReference<StringMetadata> reference = new AtomicReference<>();
scanRange(firstKey, lastKey, (key, value) -> {
if (s.equals(value.getMetdataString())) {
return false;
} else {
return true; // haven't found string, keep searching
return reference.get();
// scans from key start to the key before end, calling back until callback indicates not to process further
void scanRange(RocksDbKey start, RocksDbKey end, RocksDbScanCallback fn) throws RocksDBException {
try (ReadOptions ro = new ReadOptions()) {
try (RocksIterator iterator = db.newIterator(ro)) {
for (; iterator.isValid(); {
RocksDbKey key = new RocksDbKey(iterator.key());
if (key.compareTo(end) >= 0) { // past limit, quit
RocksDbValue val = new RocksDbValue(iterator.value());
if (!fn.cb(key, val)) {
// if cb returns false, we are done with this section of rows
* Shutdown the store.
public void close() {
* Scans all metrics in the store and returns the ones matching the specified filtering options.
* Callback returns Metric class results.
* @param filter options to filter by
* @param scanCallback callback for each Metric found
* @throws MetricException on error
public void scan(FilterOptions filter, ScanCallback scanCallback) throws MetricException {
scanInternal(filter, scanCallback, null);
* Scans all metrics in the store and returns the ones matching the specified filtering options.
* Callback returns raw key/value data.
* @param filter options to filter by
* @param rawCallback callback for each Metric found
* @throws MetricException on error
private void scanRaw(FilterOptions filter, RocksDbScanCallback rawCallback) throws MetricException {
scanInternal(filter, null, rawCallback);
// perform a scan given filter options, and return results in either Metric or raw data.
private void scanInternal(FilterOptions filter, ScanCallback scanCallback, RocksDbScanCallback rawCallback) throws MetricException {
Map<String, Integer> stringToIdCache = new HashMap<>();
Map<Integer, String> idToStringCache = new HashMap<>();
int startTopologyId = 0;
int endTopologyId = 0xFFFFFFFF;
String filterTopologyId = filter.getTopologyId();
if (filterTopologyId != null) {
int topologyId = lookupMetadataString(KeyType.TOPOLOGY_STRING, filterTopologyId, stringToIdCache);
return; // string does not exist in database
startTopologyId = topologyId;
endTopologyId = topologyId;
long startTime = filter.getStartTime();
long endTime = filter.getEndTime();
int startMetricId = 0;
int endMetricId = 0xFFFFFFFF;
String filterMetricName = filter.getMetricName();
if (filterMetricName != null) {
int metricId = lookupMetadataString(KeyType.METRIC_STRING, filterMetricName, stringToIdCache);
return; // string does not exist in database
startMetricId = metricId;
endMetricId = metricId;
int startComponentId = 0;
int endComponentId = 0xFFFFFFFF;
String filterComponentId = filter.getComponentId();
if (filterComponentId != null) {
int componentId = lookupMetadataString(KeyType.COMPONENT_STRING, filterComponentId, stringToIdCache);
if (INVALID_METADATA_STRING_ID == componentId) {
return; // string does not exist in database
startComponentId = componentId;
endComponentId = componentId;
int startExecutorId = 0;
int endExecutorId = 0xFFFFFFFF;
String filterExecutorName = filter.getExecutorId();
if (filterExecutorName != null) {
int executorId = lookupMetadataString(KeyType.EXEC_ID_STRING, filterExecutorName, stringToIdCache);
return; // string does not exist in database
startExecutorId = executorId;
endExecutorId = executorId;
int startHostId = 0;
int endHostId = 0xFFFFFFFF;
String filterHostId = filter.getHostId();
if (filterHostId != null) {
int hostId = lookupMetadataString(KeyType.HOST_STRING, filterHostId, stringToIdCache);
return; // string does not exist in database
startHostId = hostId;
endHostId = hostId;
int startPort = 0;
int endPort = 0xFFFFFFFF;
Integer filterPort = filter.getPort();
if (filterPort != null) {
startPort = filterPort;
endPort = filterPort;
int startStreamId = 0;
int endStreamId = 0xFFFFFFFF;
String filterStreamId = filter.getStreamId();
if (filterStreamId != null) {
int streamId = lookupMetadataString(KeyType.HOST_STRING, filterStreamId, stringToIdCache);
return; // string does not exist in database
startStreamId = streamId;
endStreamId = streamId;
try (ReadOptions ro = new ReadOptions()) {
for (AggLevel aggLevel : filter.getAggLevels()) {
RocksDbKey startKey = RocksDbKey.createMetricKey(aggLevel, startTopologyId, startTime, startMetricId,
startComponentId, startExecutorId, startHostId, startPort, startStreamId);
RocksDbKey endKey = RocksDbKey.createMetricKey(aggLevel, endTopologyId, endTime, endMetricId,
endComponentId, endExecutorId, endHostId, endPort, endStreamId);
try (RocksIterator iterator = db.newIterator(ro)) {
for (; iterator.isValid(); {
RocksDbKey key = new RocksDbKey(iterator.key());
if (key.compareTo(endKey) > 0) { // past limit, quit
if (startTopologyId != 0 && key.getTopologyId() != startTopologyId) {
long timestamp = key.getTimestamp();
if (timestamp < startTime || timestamp > endTime) {
if (startMetricId != 0 && key.getMetricId() != startMetricId) {
if (startComponentId != 0 && key.getComponentId() != startComponentId) {
if (startExecutorId != 0 && key.getExecutorId() != startExecutorId) {
if (startHostId != 0 && key.getHostnameId() != startHostId) {
if (startPort != 0 && key.getPort() != startPort) {
if (startStreamId != 0 && key.getStreamId() != startStreamId) {
RocksDbValue val = new RocksDbValue(iterator.value());
if (scanCallback != null) {
try {
// populate a metric
String metricName = metadataIdToString(KeyType.METRIC_STRING, key.getMetricId(), idToStringCache);
String topologyId = metadataIdToString(KeyType.TOPOLOGY_STRING, key.getTopologyId(), idToStringCache);
String componentId = metadataIdToString(KeyType.COMPONENT_STRING, key.getComponentId(), idToStringCache);
String executorId = metadataIdToString(KeyType.EXEC_ID_STRING, key.getExecutorId(), idToStringCache);
String hostname = metadataIdToString(KeyType.HOST_STRING, key.getHostnameId(), idToStringCache);
String streamId = metadataIdToString(KeyType.STREAM_ID_STRING, key.getStreamId(), idToStringCache);
Metric metric = new Metric(metricName, timestamp, topologyId, 0.0, componentId, executorId, hostname,
streamId, key.getPort(), aggLevel);
// callback to caller
} catch (MetricException e) {
LOG.warn("Failed to report found metric: {}", e.getMessage());
} else {
try {
if (!rawCallback.cb(key, val)) {
} catch (RocksDBException e) {
throw new MetricException("Error reading metrics data", e);
// Finds the metadata string that matches the string Id and type provided. The string should exist, as it is
// referenced from a metric.
private String metadataIdToString(KeyType type, int id, Map<Integer, String> lookupCache) throws MetricException {
String s = readOnlyStringMetadataCache.getMetadataString(id);
if (s != null) {
return s;
s = lookupCache.get(id);
if (s != null) {
return s;
// get from DB and add to lookup cache
RocksDbKey key = new RocksDbKey(type, id);
try {
byte[] value = db.get(key.getRaw());
if (value == null) {
throw new MetricException("Failed to find metadata string for id " + id + " of type " + type);
RocksDbValue rdbValue = new RocksDbValue(value);
s = rdbValue.getMetdataString();
lookupCache.put(id, s);
return s;
} catch (RocksDBException e) {
if (this.failureMeter != null) {
throw new MetricException("Failed to get from RocksDb", e);
// deletes metrics matching the filter options
void deleteMetrics(FilterOptions filter) throws MetricException {
try (WriteBatch writeBatch = new WriteBatch();
WriteOptions writeOps = new WriteOptions()) {
scanRaw(filter, (RocksDbKey key, RocksDbValue value) -> {
return true;
if (writeBatch.count() > 0) {"Deleting {} metrics", writeBatch.count());
try {
db.write(writeOps, writeBatch);
} catch (Exception e) {
String message = "Failed delete metrics";
LOG.error(message, e);
if (this.failureMeter != null) {
throw new MetricException(message, e);
// deletes metadata strings before the provided timestamp
void deleteMetadataBefore(long firstValidTimestamp) throws MetricException {
if (firstValidTimestamp < 1L) {
if (this.failureMeter != null) {
throw new MetricException("Invalid timestamp for deleting metadata: " + firstValidTimestamp);
try (WriteBatch writeBatch = new WriteBatch();
WriteOptions writeOps = new WriteOptions()) {
// search all metadata strings
RocksDbKey topologyMetadataPrefix = RocksDbKey.getPrefix(KeyType.METADATA_STRING_START);
RocksDbKey lastPrefix = RocksDbKey.getPrefix(KeyType.METADATA_STRING_END);
try {
scanRange(topologyMetadataPrefix, lastPrefix, (key, value) -> {
// we'll assume the metadata was recently used if still in the cache.
if (!readOnlyStringMetadataCache.contains(key.getMetadataStringId())) {
if (value.getLastTimestamp() < firstValidTimestamp) {
return true;
} catch (RocksDBException e) {
throw new MetricException("Error reading metric data", e);
if (writeBatch.count() > 0) {"Deleting {} metadata strings", writeBatch.count());
try {
db.write(writeOps, writeBatch);
} catch (Exception e) {
String message = "Failed delete metadata strings";
LOG.error(message, e);
if (this.failureMeter != null) {
throw new MetricException(message, e);
interface RocksDbScanCallback {
boolean cb(RocksDbKey key, RocksDbValue val) throws RocksDBException; // return false to stop scan