blob: 5ee50a750b499a09c040b627bdf50d057236fb46 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package org.apache.storm.metricstore.rocksdb;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.storm.metricstore.MetricException;
import org.apache.storm.utils.LruMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Class to create and use a cache that stores Metadata string information in memory. It allows searching for a
* Metadata string's unique id, or looking up the string by the unique id. The StringMetadata is stored in an
* LRU map. When an entry is added to the cache, an older entry may be evicted, which then needs to be
* immediately stored to the database to provide a consistent view of all the metadata strings.
*
* <p>All write operations adding metadata to RocksDB are done by a single thread (a RocksDbMetricsWriter),
* but multiple threads can read values from the cache. To clarify which permissions are accessible by various
* threads, the ReadOnlyStringMetadataCache and WritableStringMetadataCache are provided to be used.
*/
public class StringMetadataCache implements LruMap.CacheEvictionCallback<String, StringMetadata>,
    WritableStringMetadataCache, ReadOnlyStringMetadataCache {
    private static final Logger LOG = LoggerFactory.getLogger(StringMetadataCache.class);

    // volatile: the static accessors below are called from threads other than the one that ran init()/cleanUp(),
    // so the reference must be safely published to them.
    private static volatile StringMetadataCache instance = null;

    // String -> metadata, bounded by the LruMap capacity; evictions are reported through evictionCallback().
    private final Map<String, StringMetadata> lruStringCache;
    // Reverse lookup: string id -> string, kept in step with lruStringCache by put() and writeMetadataToDisk().
    private final Map<Integer, String> hashToString = new ConcurrentHashMap<>();
    private final RocksDbMetricsWriter dbWriter;

    /**
     * Constructor to create a cache.
     *
     * @param dbWriter The rocks db writer instance the cache should use when evicting data
     * @param capacity The cache size
     */
    private StringMetadataCache(RocksDbMetricsWriter dbWriter, int capacity) {
        // Assign the writer first: "this" is handed to the LruMap as its eviction callback on the next line,
        // and the callback dereferences dbWriter.
        this.dbWriter = dbWriter;
        this.lruStringCache = Collections.synchronizedMap(new LruMap<>(capacity, this));
    }

    /**
     * Initializes the cache instance. Should be called only once in the JVM, subsequent calls
     * will be ignored (and logged as an error) unless preceded by {@link #cleanUp()}.
     *
     * <p>Synchronized so that two concurrent callers cannot both observe a missing instance and
     * each create a cache.
     *
     * @param dbWriter the RocksDB writer instance to handle writing evicted cache data
     * @param capacity the number of StringMetadata instances to hold in memory
     */
    static synchronized void init(RocksDbMetricsWriter dbWriter, int capacity) {
        if (instance == null) {
            instance = new StringMetadataCache(dbWriter, capacity);
        } else {
            LOG.error("Ignoring call to init() since StringMetadataCache already created");
        }
    }

    /**
     * Provides the WritableStringMetadataCache interface to the cache instance.
     *
     * @return the cache instance, never null
     * @throws MetricException if the cache instance was not created
     */
    static WritableStringMetadataCache getWritableStringMetadataCache() throws MetricException {
        return requireInstance();
    }

    /**
     * Provides the ReadOnlyStringMetadataCache interface to the cache instance.
     *
     * @return the cache instance, never null
     * @throws MetricException if the cache instance was not created
     */
    static ReadOnlyStringMetadataCache getReadOnlyStringMetadataCache() throws MetricException {
        return requireInstance();
    }

    /**
     * Returns the current cache instance or fails if there is none.
     *
     * <p>The static field is read exactly once so that a concurrent {@link #cleanUp()} cannot slip in
     * between the null check and the return and cause null to be handed to the caller.
     *
     * @return the cache instance, never null
     * @throws MetricException if the cache instance was not created
     */
    private static StringMetadataCache requireInstance() throws MetricException {
        StringMetadataCache current = instance;
        if (current == null) {
            throw new MetricException("StringMetadataCache was not initialized");
        }
        return current;
    }

    /**
     * Discards the current cache instance, if any, so that a later call to
     * {@link #init(RocksDbMetricsWriter, int)} creates a new one. The contents of the discarded
     * cache are not written anywhere by this method.
     */
    static synchronized void cleanUp() {
        instance = null;
    }

    /**
     * Get the string metadata from the cache.
     *
     * @param s The string to look for
     * @return the metadata associated with the string or null if not found
     */
    @Override
    public StringMetadata get(String s) {
        return lruStringCache.get(s);
    }

    /**
     * Add the string metadata to the cache.
     *
     * <p>NOTE: this can cause data to be evicted from the cache when full. When this occurs, the evictionCallback() method
     * is called to store the metadata back into the RocksDB database.
     *
     * <p>This method is only exposed to the WritableStringMetadataCache interface.
     *
     * @param s The string to add
     * @param stringMetadata The string's metadata
     * @param newEntry Indicates the metadata is being used for the first time and should be written to RocksDB immediately
     * @throws MetricException when evicted data fails to save to the database or when the database is shutdown
     */
    @Override
    public void put(String s, StringMetadata stringMetadata, boolean newEntry) throws MetricException {
        if (dbWriter.isShutdown()) {
            // another thread could be writing out the metadata cache to the database.
            throw new MetricException("Shutting down");
        }
        try {
            if (newEntry) {
                // Persist first-time metadata right away. The helper also drops the id from the reverse
                // lookup, which is restored a few lines below.
                writeMetadataToDisk(s, stringMetadata);
            }
            // May evict another entry, which re-enters this class through evictionCallback().
            lruStringCache.put(s, stringMetadata);
            hashToString.put(stringMetadata.getStringId(), s);
        } catch (Exception e) { // catch any runtime exceptions caused by eviction
            throw new MetricException("Failed to save string in metadata cache", e);
        }
    }

    /**
     * Callback when data is about to be removed from the cache. This method then
     * immediately writes the metadata to RocksDB.
     *
     * <p>Any unchecked exception raised while writing propagates to the caller; {@link #put} wraps it
     * in a MetricException.
     *
     * @param key The evicted string
     * @param val The evicted string's metadata
     */
    @Override
    public void evictionCallback(String key, StringMetadata val) {
        writeMetadataToDisk(key, val);
    }

    /**
     * Hands a string's metadata to the RocksDB writer and removes its id from the reverse lookup map.
     * One key/value pair is submitted for every metadata type reported by the StringMetadata.
     *
     * @param key The string being written
     * @param val The string's metadata
     */
    private void writeMetadataToDisk(String key, StringMetadata val) {
        // Parameterized logging already skips formatting when debug is disabled, so no explicit level check is needed.
        LOG.debug("Writing {} to RocksDB", key);

        // remove reverse lookup from map
        hashToString.remove(val.getStringId());

        // save the key/value to the database immediately
        RocksDbValue rval = new RocksDbValue(val.getLastTimestamp(), key);
        for (KeyType type : val.getMetadataTypes()) { // save the metadata for all types of strings it matches
            RocksDbKey rkey = new RocksDbKey(type, val.getStringId());
            dbWriter.handleEvictedMetadata(rkey, rval);
        }
    }

    /**
     * Determines if a string Id is contained in the cache.
     *
     * @param stringId The string Id to check
     * @return true if the Id is in the cache, false otherwise
     */
    @Override
    public boolean contains(Integer stringId) {
        return hashToString.containsKey(stringId);
    }

    /**
     * Returns the string matching the string Id if in the cache.
     *
     * @param stringId The string Id to check
     * @return the associated string if the Id is in the cache, null otherwise
     */
    @Override
    public String getMetadataString(Integer stringId) {
        return hashToString.get(stringId);
    }

    /**
     * Get the map of the cache contents. Provided to allow writing the data to RocksDB on shutdown.
     *
     * <p>The returned set is a live view of the synchronized LRU map, not a snapshot.
     * {@link Collections#synchronizedMap} does not make iteration over its views atomic, so callers
     * should iterate only when no other thread is calling {@link #put}.
     *
     * @return the string metadata map entrySet
     */
    @Override
    public Set<Map.Entry<String, StringMetadata>> entrySet() {
        return lruStringCache.entrySet();
    }
}