/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
| |
| package org.apache.samza.storage.kv |
| |
| import java.io.File |
| import java.nio.file.{Path, Paths} |
| import java.util.concurrent.TimeUnit |
| import java.util.concurrent.locks.ReentrantReadWriteLock |
| import java.util.{Comparator, Optional} |
| |
| import org.apache.samza.SamzaException |
| import org.apache.samza.checkpoint.CheckpointId |
| import org.apache.samza.config.Config |
| import org.apache.samza.util.Logging |
| import org.rocksdb.{TtlDB, _} |
| |
object RocksDbKeyValueStore extends Logging {

  /**
   * Opens a RocksDB instance (a TtlDB when "rocksdb.ttl.ms" is configured) in the given
   * directory and registers gauges for a set of RocksDB properties on the store's metrics.
   *
   * @param dir the directory that holds (or will hold) the RocksDB data files
   * @param options the RocksDB options used to open the database
   * @param storeConfig store-scoped config; reads "rocksdb.ttl.ms" and "rocksdb.metrics.list"
   * @param isLoggedStore true if this store has a changelog (not supported for TTL stores)
   * @param storeName the store name, used in log and error messages
   * @param metrics registry on which the per-property RocksDB gauges are created
   * @return the opened RocksDB handle
   * @throws SamzaException if "rocksdb.ttl.ms" is not a number, or the db fails to open
   */
  def openDB(dir: File, options: Options, storeConfig: Config, isLoggedStore: Boolean,
    storeName: String, metrics: KeyValueStoreMetrics): RocksDB = {
    var ttl = 0L
    var useTTL = false

    if (storeConfig.containsKey("rocksdb.ttl.ms")) {
      try {
        ttl = storeConfig.getLong("rocksdb.ttl.ms")

        if (ttl > 0) {
          if (ttl < 1000) {
            warn("The ttl value requested for %s is %d which is less than 1000 (minimum). " +
              "Using 1000 ms instead." format (storeName, ttl))
            ttl = 1000
          }
          // TtlDB expects the TTL in seconds; the configuration value is in milliseconds.
          ttl = TimeUnit.MILLISECONDS.toSeconds(ttl)
        } else {
          warn("Non-positive TTL for RocksDB implies infinite TTL for the data. " +
            "More Info - https://github.com/facebook/rocksdb/wiki/Time-to-Live")
        }

        // Any configured "rocksdb.ttl.ms" (even non-positive) routes through TtlDB.
        useTTL = true
        if (isLoggedStore) {
          warn("%s is a TTL based store. Changelog is not supported for TTL based stores. " +
            "Use at your own discretion." format storeName)
        }
      } catch {
        case nfe: NumberFormatException =>
          throw new SamzaException("rocksdb.ttl.ms configuration value %s for store %s is not a number."
            format (storeConfig.get("rocksdb.ttl.ms"), storeName), nfe)
      }
    }

    try {
      val rocksDb =
        if (useTTL) {
          info("Opening RocksDB store: %s in path: %s with TTL value: %s" format (storeName, dir.toString, ttl))
          // readOnly = false; TtlDB purges expired entries during compaction.
          TtlDB.open(options, dir.toString, ttl.toInt, false)
        } else {
          RocksDB.open(options, dir.toString)
        }

      // See https://github.com/facebook/rocksdb/blob/master/include/rocksdb/db.h for available properties
      val rocksDbMetrics = Set(
        "rocksdb.estimate-table-readers-mem", // indexes and bloom filters
        "rocksdb.cur-size-active-mem-table", // approximate active memtable size in bytes
        "rocksdb.cur-size-all-mem-tables", // approximate active and unflushed memtable size in bytes
        "rocksdb.size-all-mem-tables", // approximate active, unflushed and pinned memtable size in bytes
        "rocksdb.estimate-num-keys" // approximate number keys in the active and unflushed memtable and storage
      )

      // Users may request additional properties via "rocksdb.metrics.list" (comma-separated).
      val configuredMetrics = storeConfig
        .get("rocksdb.metrics.list", "")
        .split(",")
        .map(property => property.trim)
        .filter(!_.isEmpty)
        .toSet

      (configuredMetrics ++ rocksDbMetrics)
        .foreach(property => metrics.newGauge(property, () =>
          // Check isOwningHandle flag. The db is open iff the flag is true. Report "0" once
          // closed, since reading a property through a closed native handle is unsafe.
          if (rocksDb.isOwningHandle) {
            rocksDb.getProperty(property)
          } else {
            "0"
          }
        ))

      rocksDb
    } catch {
      case rocksDBException: RocksDBException =>
        throw new SamzaException("Error opening RocksDB store %s at location %s" format (storeName, dir.toString),
          rocksDBException)
    }
  }
}
| |
/**
 * A [[KeyValueStore]] implementation backed by an on-disk RocksDB instance.
 *
 * The underlying db handle is opened lazily on first use, since the store directories may
 * not exist yet when this object is constructed. Every operation that touches the native
 * handle is guarded by a read/write lock so that access after [[close]] fails with a
 * SamzaException rather than crashing the JVM through a closed native handle.
 *
 * @param dir the directory that holds the RocksDB data files
 * @param options RocksDB options used to open the db
 * @param storeConfig store-scoped configuration
 * @param isLoggedStore true if the store is backed by a changelog
 * @param storeName the store name, used in log and error messages
 * @param writeOptions RocksDB options applied to every write
 * @param flushOptions RocksDB options applied to every flush
 * @param metrics counters and gauges for store operations
 */
class RocksDbKeyValueStore(
  val dir: File,
  val options: Options,
  val storeConfig: Config,
  val isLoggedStore: Boolean,
  val storeName: String,
  val writeOptions: WriteOptions = new WriteOptions(),
  val flushOptions: FlushOptions = new FlushOptions(),
  val metrics: KeyValueStoreMetrics = new KeyValueStoreMetrics) extends KeyValueStore[Array[Byte], Array[Byte]] with Logging {

  // lazy val here is important because the store directories do not exist yet, it can only be opened
  // after the directories are created, which happens much later from now.
  private lazy val db = RocksDbKeyValueStore.openDB(dir, options, storeConfig, isLoggedStore, storeName, metrics)
  private val lexicographic = new LexicographicComparator()

  /**
   * null while the store is open. Set to an Exception holding the stacktrace at the time of first close by #close.
   * Reads and writes to this field must be guarded by stateChangeLock.
   * This is an Exception instead of an Array[StackTraceElement] for ease of logging.
   */
  private var stackAtFirstClose: Exception = null
  private val stateChangeLock = new ReentrantReadWriteLock()

  /** Returns the value for the given key, or null if the key is absent. */
  def get(key: Array[Byte]): Array[Byte] = ifOpen {
    metrics.gets.inc
    require(key != null, "Null key not allowed.")
    val found = db.get(key)
    if (found != null) {
      metrics.bytesRead.inc(found.length)
    }
    found
  }

  /** Returns a map of the values for the given keys; missing keys map to null values. */
  override def getAll(keys: java.util.List[Array[Byte]]): java.util.Map[Array[Byte], Array[Byte]] = ifOpen {
    metrics.getAlls.inc
    require(keys != null, "Null keys not allowed.")
    val map = db.multiGet(keys)
    if (map != null) {
      var bytesRead = 0L
      val iterator = map.values().iterator
      while (iterator.hasNext) {
        val value = iterator.next
        if (value != null) {
          bytesRead += value.length
        }
      }
      metrics.bytesRead.inc(bytesRead)
    }
    map
  }

  /** Writes the given value for the given key; a null value is treated as a delete. */
  def put(key: Array[Byte], value: Array[Byte]): Unit = ifOpen {
    require(key != null, "Null key not allowed.")
    if (value == null) {
      metrics.deletes.inc
      db.delete(writeOptions, key)
    } else {
      metrics.puts.inc
      metrics.bytesWritten.inc(key.length + value.length)
      db.put(writeOptions, key, value)
    }
  }

  /**
   * Atomically applies all given entries in a single RocksDB WriteBatch.
   * Entries with a null value are treated as deletes.
   */
  def putAll(entries: java.util.List[Entry[Array[Byte], Array[Byte]]]): Unit = ifOpen {
    metrics.putAlls.inc()
    val iter = entries.iterator
    var wrote = 0
    var deletes = 0
    val writeBatch = new WriteBatch()
    // WriteBatch holds a native handle; the try/finally guarantees it is released even if
    // building the batch or db.write fails.
    try {
      while (iter.hasNext) {
        val curr = iter.next()
        if (curr.getValue == null) {
          deletes += 1
          writeBatch.remove(curr.getKey)
        } else {
          wrote += 1
          val key = curr.getKey
          val value = curr.getValue
          metrics.bytesWritten.inc(key.length + value.length)
          writeBatch.put(key, value)
        }
      }
      db.write(writeOptions, writeBatch)
    } finally {
      writeBatch.close()
    }
    metrics.puts.inc(wrote)
    metrics.deletes.inc(deletes)
  }

  /** Deletes the given key (implemented as a put of null). */
  def delete(key: Array[Byte]): Unit = ifOpen {
    put(key, null)
  }

  /** Returns an iterator over entries with keys in [from, to), in lexicographic key order. */
  def range(from: Array[Byte], to: Array[Byte]): KeyValueIterator[Array[Byte], Array[Byte]] = ifOpen {
    metrics.ranges.inc
    require(from != null && to != null, "Null bound not allowed.")
    new RocksDbRangeIterator(db.newIterator(), from, to)
  }

  /** Returns an iterator over all entries, in lexicographic key order. */
  def all(): KeyValueIterator[Array[Byte], Array[Byte]] = ifOpen {
    metrics.alls.inc
    val iter = db.newIterator()
    iter.seekToFirst()
    new RocksDbIterator(iter)
  }

  /**
   * Returns a point-in-time snapshot over the range [from, to). The caller must close() the
   * returned snapshot to release the underlying RocksDB snapshot handle.
   */
  override def snapshot(from: Array[Byte], to: Array[Byte]): KeyValueSnapshot[Array[Byte], Array[Byte]] = ifOpen {
    val readOptions = new ReadOptions()
    readOptions.setSnapshot(db.getSnapshot)

    new KeyValueSnapshot[Array[Byte], Array[Byte]] {
      def iterator(): KeyValueIterator[Array[Byte], Array[Byte]] = {
        new RocksDbRangeIterator(db.newIterator(readOptions), from, to)
      }

      def close() = {
        db.releaseSnapshot(readOptions.snapshot())
      }
    }
  }

  /** Flushes all in-memory writes to disk. */
  def flush(): Unit = ifOpen {
    metrics.flushes.inc
    trace("Flushing store: %s" format storeName)
    db.flush(flushOptions)
    trace("Flushed store: %s" format storeName)
  }

  /**
   * Creates a RocksDB checkpoint (hard links to the live files) in a sibling directory
   * named after the store directory suffixed with the checkpoint id.
   */
  override def checkpoint(id: CheckpointId): Optional[Path] = ifOpen {
    val checkpoint = Checkpoint.create(db)
    val checkpointPath = dir.getPath + "-" + id.toString
    checkpoint.createCheckpoint(checkpointPath)
    Optional.of(Paths.get(checkpointPath))
  }

  /**
   * Closes the store and releases the native db handle. Idempotent: subsequent calls log a
   * warning (with the stack of the first close) and are otherwise ignored.
   */
  def close(): Unit = {
    stateChangeLock.writeLock().lock()
    try {
      trace("Closing.")
      if (stackAtFirstClose == null) { // first close
        // If auto-compaction is disabled, e.g., when bulk-loading, compact once before
        // closing. This must only happen on the first close: invoking compactRange on an
        // already-closed native handle is unsafe.
        if (options.disableAutoCompactions()) {
          trace("Auto compaction is disabled, invoking compact range.")
          db.compactRange()
        }
        stackAtFirstClose = new Exception()
        db.close()
      } else {
        warn(new SamzaException("Close called again on a closed store: %s. Ignoring this close." +
          "Stack at first close is under 'Caused By'." format storeName, stackAtFirstClose))
      }
    } finally {
      stateChangeLock.writeLock().unlock()
    }
  }

  /**
   * Runs fn while holding the read lock if the store is still open; otherwise throws a
   * SamzaException that carries the stack trace of the first close as its cause.
   */
  private def ifOpen[T](fn: => T): T = {
    stateChangeLock.readLock().lock()
    try {
      if (stackAtFirstClose == null) {
        fn
      } else {
        throw new SamzaException("Attempted to access a closed store: %s. " +
          "Stack at first close is under 'Caused By'." format storeName, stackAtFirstClose)
      }
    } finally {
      stateChangeLock.readLock().unlock()
    }
  }

  /**
   * Adapts a native RocksIterator to the Samza KeyValueIterator interface.
   * All operations go through ifOpen so use after the store is closed fails fast.
   */
  class RocksDbIterator(iter: RocksIterator) extends KeyValueIterator[Array[Byte], Array[Byte]] {
    private var open = true

    override def close() = ifOpen {
      open = false
      iter.close()
    }

    def isOpen() = ifOpen {
      open
    }

    override def remove() = throw new UnsupportedOperationException("RocksDB iterator doesn't support remove")

    override def hasNext() = ifOpen(iter.isValid)

    // The iterator is already pointing to the next element
    protected def peekKey() = ifOpen {
      getEntry().getKey
    }

    // Materializes the entry the native iterator currently points at, without advancing it.
    protected def getEntry() = ifOpen {
      val key = iter.key
      val value = iter.value
      new Entry(key, value)
    }

    // By virtue of how RocksdbIterator is implemented, the implementation of
    // our iterator is slightly different from standard java iterator next will
    // always point to the current element, when next is called, we return the
    // current element we are pointing to and advance the iterator to the next
    // location (The new location may or may not be valid - this will surface
    // when the next next() call is made, the isValid will fail)
    override def next(): Entry[Array[Byte], Array[Byte]] = ifOpen {
      if (!hasNext()) {
        throw new NoSuchElementException
      }

      val entry = getEntry()
      iter.next()
      metrics.bytesRead.inc(entry.getKey.length)
      if (entry.getValue != null) {
        metrics.bytesRead.inc(entry.getValue.length)
      }
      entry
    }

    // Safety net: release the native iterator if the caller leaked it without closing.
    override def finalize(): Unit = ifOpen {
      if (open) {
        trace("Leaked reference to RocksDB iterator, forcing close.")
        close()
      }
    }
  }

  /**
   * An iterator over the key range [from, to): positioned at `from` on creation and
   * exhausted once the current key is >= `to` under lexicographic byte ordering.
   */
  class RocksDbRangeIterator(iter: RocksIterator, from: Array[Byte], to: Array[Byte]) extends RocksDbIterator(iter) {
    // RocksDB's JNI interface does not expose getters/setters that allow the
    // comparator to be pluggable, and the default is lexicographic, so it's
    // safe to just force lexicographic comparator here for now.
    val comparator: LexicographicComparator = lexicographic
    ifOpen(iter.seek(from))

    override def hasNext() = ifOpen {
      super.hasNext() && comparator.compare(peekKey(), to) < 0
    }

    /** Repositions the iterator at the first key >= the given key. */
    def seek(key: Array[Byte]) = ifOpen {
      iter.seek(key)
    }
  }

  /**
   * A comparator that applies a lexicographical comparison on byte arrays.
   * Bytes are compared as unsigned values; a strict prefix sorts before its extensions.
   */
  class LexicographicComparator extends Comparator[Array[Byte]] {
    def compare(k1: Array[Byte], k2: Array[Byte]): Int = {
      val l = math.min(k1.length, k2.length)
      var i = 0
      while (i < l) {
        if (k1(i) != k2(i))
          // Mask to 0xff so bytes compare as unsigned (e.g. 0x80 > 0x7f).
          return (k1(i) & 0xff) - (k2(i) & 0xff)
        i += 1
      }
      // okay prefixes are equal, the shorter array is less
      k1.length - k2.length
    }
  }
}