/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.samza.storage.kv

import java.io._

import org.apache.samza.config.Config
import org.apache.samza.container.SamzaContainerContext
import org.fusesource.leveldbjni.JniDBFactory._
import org.iq80.leveldb._

import grizzled.slf4j.{ Logger, Logging }
object LevelDbKeyValueStore {
  private lazy val logger = Logger(classOf[LevelDbKeyValueStore])

  def options(storeConfig: Config, containerContext: SamzaContainerContext) = {
    val cacheSize = storeConfig.getLong("container.cache.size.bytes", 100 * 1024 * 1024L)
    val writeBufSize = storeConfig.getLong("container.write.buffer.size.bytes", 32 * 1024 * 1024L)
    val options = new Options

    // Cache size and write buffer size are specified on a per-container
    // basis, so divide them evenly among this container's partitions.
    options.cacheSize(cacheSize / containerContext.partitions.size)
    options.writeBufferSize((writeBufSize / containerContext.partitions.size).toInt)
    options.blockSize(storeConfig.getInt("leveldb.block.size.bytes", 4096))
    options.compressionType(
      storeConfig.get("leveldb.compression", "snappy") match {
        case "snappy" => CompressionType.SNAPPY
        case "none" => CompressionType.NONE
        case codec =>
          logger.warn("Unknown leveldb.compression codec %s, defaulting to Snappy" format codec)
          CompressionType.SNAPPY
      })
    options.createIfMissing(true)
    options.errorIfExists(true)
    options
  }
}
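
// A minimal construction sketch (the path, config, and threshold below are
// illustrative, not part of this file):
//
//   val options = LevelDbKeyValueStore.options(storeConfig, containerContext)
//   val store = new LevelDbKeyValueStore(
//     dir = new File("/tmp/state/my-store"),
//     options = options,
//     deleteCompactionThreshold = 1000) // force a compaction every 1000 deletes
//   store.put("k".getBytes("UTF-8"), "v".getBytes("UTF-8"))
//   store.close()
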
class LevelDbKeyValueStore(
  val dir: File,
  val options: Options,
  /**
   * How many deletes must occur before we will force a compaction. This is
   * to work around performance issues discovered in SAMZA-254. A value of
   * -1 disables this feature.
   */
  val deleteCompactionThreshold: Int = -1,
  val metrics: LevelDbKeyValueStoreMetrics = new LevelDbKeyValueStoreMetrics) extends KeyValueStore[Array[Byte], Array[Byte]] with Logging {

  private lazy val db = factory.open(dir, options)
  private val lexicographic = new LexicographicComparator()
  private var deletesSinceLastCompaction = 0
  def get(key: Array[Byte]): Array[Byte] = {
    maybeCompact
    metrics.gets.inc
    require(key != null, "Null key not allowed.")
    val found = db.get(key)
    if (found != null) {
      metrics.bytesRead.inc(found.size)
    }
    found
  }
  def put(key: Array[Byte], value: Array[Byte]) {
    metrics.puts.inc
    require(key != null, "Null key not allowed.")
    if (value == null) {
      // A null value is treated as a delete.
      db.delete(key)
      deletesSinceLastCompaction += 1
    } else {
      metrics.bytesWritten.inc(key.size + value.size)
      db.put(key, value)
    }
  }
  def putAll(entries: java.util.List[Entry[Array[Byte], Array[Byte]]]) {
    val batch = db.createWriteBatch()
    val iter = entries.iterator
    var wrote = 0
    var deletes = 0
    while (iter.hasNext) {
      wrote += 1
      val curr = iter.next()
      if (curr.getValue == null) {
        deletes += 1
        batch.delete(curr.getKey)
      } else {
        val key = curr.getKey
        val value = curr.getValue
        metrics.bytesWritten.inc(key.size + value.size)
        batch.put(key, value)
      }
    }
    db.write(batch)
    batch.close
    // "wrote" counts every entry in the batch, including deletes, so a
    // deleted entry shows up in both the puts and deletes metrics.
    metrics.puts.inc(wrote)
    metrics.deletes.inc(deletes)
    deletesSinceLastCompaction += deletes
  }
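
  // A putAll sketch (names are illustrative, not part of this file). Entries
  // with null values are deleted; everything goes through one write batch:
  //
  //   val entries = java.util.Arrays.asList(
  //     new Entry[Array[Byte], Array[Byte]]("a".getBytes("UTF-8"), "1".getBytes("UTF-8")),
  //     new Entry[Array[Byte], Array[Byte]]("b".getBytes("UTF-8"), null)) // deletes "b"
  //   store.putAll(entries)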
  def delete(key: Array[Byte]) {
    metrics.deletes.inc
    // put with a null value performs the actual delete and bumps the
    // delete-compaction counter (note it also increments the puts metric).
    put(key, null)
  }
  def range(from: Array[Byte], to: Array[Byte]): KeyValueIterator[Array[Byte], Array[Byte]] = {
    maybeCompact
    metrics.ranges.inc
    require(from != null && to != null, "Null bound not allowed.")
    // The iterator covers [from, to): "from" is inclusive, "to" is exclusive.
    new LevelDbRangeIterator(db.iterator, from, to)
  }
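
  // A range sketch (the key scheme is illustrative). With the default
  // lexicographic comparator this scans every key prefixed "user-", since
  // "." is the next byte value after "-" and the upper bound is exclusive:
  //
  //   val iter = store.range("user-".getBytes("UTF-8"), "user.".getBytes("UTF-8"))
  //   try {
  //     while (iter.hasNext) { val entry = iter.next() /* ... */ }
  //   } finally {
  //     iter.close() // always close iterators, or native resources leak
  //   }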
  def all(): KeyValueIterator[Array[Byte], Array[Byte]] = {
    maybeCompact
    metrics.alls.inc
    val iter = db.iterator()
    iter.seekToFirst()
    new LevelDbIterator(iter)
  }
  /**
   * Trigger a complete compaction of the LevelDB store if there have been at
   * least deleteCompactionThreshold deletes since the last compaction.
   */
  def maybeCompact = {
    if (deleteCompactionThreshold >= 0 && deletesSinceLastCompaction >= deleteCompactionThreshold) {
      compact
    }
  }
  /**
   * Trigger a complete compaction of the LevelDB store.
   */
  def compact {
    // According to LevelDB's docs:
    // begin==NULL is treated as a key before all keys in the database.
    // end==NULL is treated as a key after all keys in the database.
    db.compactRange(null, null)
    deletesSinceLastCompaction = 0
  }
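
  // An illustrative timeline, assuming deleteCompactionThreshold = 2:
  //
  //   store.delete(k1) // deletesSinceLastCompaction = 1
  //   store.delete(k2) // deletesSinceLastCompaction = 2
  //   store.get(k3)    // maybeCompact fires: compactRange(null, null),
  //                    // and the counter resets to 0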
  def flush {
    metrics.flushes.inc
    // TODO can't find a flush for leveldb
    trace("Flushing, but flush in LevelDbKeyValueStore doesn't do anything.")
  }

  def close() {
    trace("Closing.")
    db.close()
  }
  class LevelDbIterator(iter: DBIterator) extends KeyValueIterator[Array[Byte], Array[Byte]] {
    private var open = true

    def close() = {
      open = false
      iter.close()
    }

    def remove() = iter.remove()

    def hasNext() = iter.hasNext()

    def next() = {
      if (!hasNext()) {
        throw new NoSuchElementException
      }
      val curr = iter.next
      val key = curr.getKey
      val value = curr.getValue
      metrics.bytesRead.inc(key.size)
      if (value != null) {
        metrics.bytesRead.inc(value.size)
      }
      new Entry(key, value)
    }

    override def finalize() {
      if (open) {
        System.err.println("Leaked reference to level db iterator, forcing close.")
        close()
      }
    }
  }
  class LevelDbRangeIterator(iter: DBIterator, from: Array[Byte], to: Array[Byte]) extends LevelDbIterator(iter) {
    val comparator = if (options.comparator == null) lexicographic else options.comparator
    iter.seek(from)

    override def hasNext() = {
      // Stop before "to": the upper bound is exclusive.
      iter.hasNext() && comparator.compare(iter.peekNext.getKey, to) < 0
    }
  }
  /**
   * Compare two byte arrays lexicographically using unsigned byte arithmetic.
   */
  class LexicographicComparator extends DBComparator {
    def compare(k1: Array[Byte], k2: Array[Byte]): Int = {
      val l = math.min(k1.length, k2.length)
      var i = 0
      while (i < l) {
        if (k1(i) != k2(i))
          return (k1(i) & 0xff) - (k2(i) & 0xff)
        i += 1
      }
      // Prefixes are equal, so the shorter array sorts first.
      k1.length - k2.length
    }

    def name(): String = "lexicographic"

    def findShortestSeparator(start: Array[Byte], limit: Array[Byte]) = start

    def findShortSuccessor(key: Array[Byte]) = key
  }
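
  // Why the 0xff mask matters (a sketch): JVM bytes are signed, so a plain
  // byte comparison would order 0x80 (-128) before 0x7f (127) and break the
  // unsigned key order LevelDB expects. Masking compares raw byte values:
  //
  //   compare(Array(0x7f.toByte), Array(0x80.toByte)) < 0 // 127 < 128, correct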
}