// blob: 9b0872d06cc4c30a9759000014bfc6b2aad6ce0b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.storage.kv
import java.io.File
import java.util.Arrays
import java.util.Random
import scala.collection.JavaConversions._
import org.apache.samza.serializers.IntegerSerde
import org.iq80.leveldb.Options
import org.junit.After
import org.junit.Assert._
import org.junit.Before
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.runners.Parameterized.Parameters
import org.apache.samza.serializers.StringSerde
import org.apache.samza.util.TestUtil._
import org.apache.samza.serializers.Serde
import org.scalatest.Assertions.intercept
@RunWith(value = classOf[Parameterized])
class TestKeyValueStores(typeOfStore: String) {
import TestKeyValueStores._
// Single-letter strings "a" through "z", in alphabetical order; used as keys and values by the tests.
val letters = "abcdefghijklmnopqrstuvwxyz".map(_.toString)
// Scratch directory under java.io.tmpdir with a random numeric suffix; handed to the LevelDB store in setup().
val dir = new File(System.getProperty("java.io.tmpdir"), "leveldb-test-" + new Random().nextInt(Int.MaxValue))
// Store under test; assigned in setup().
var store: KeyValueStore[Array[Byte], Array[Byte]] = null
// Set to true by setup() when the store under test includes a CachedStore layer.
var cache = false
// Set to true by setup() when the store under test includes a SerializedKeyValueStore layer.
var serde = false
/**
 * Creates the scratch directory and builds the store stack selected by
 * `typeOfStore` on top of a fresh LevelDB store, then wraps the result in a
 * NullSafeKeyValueStore. The `cache` and `serde` flags record which layers
 * were added so individual tests can adapt their checks.
 */
@Before
def setup(): Unit = {
  dir.mkdirs()
  val leveldb = new LevelDbKeyValueStore(dir, new Options)
  // Serde that hands byte arrays through untouched, so the serialized layers
  // can sit on top of a store that already speaks Array[Byte].
  val identitySerde = new Serde[Array[Byte]] {
    def toBytes(obj: Array[Byte]) = obj
    def fromBytes(bytes: Array[Byte]) = bytes
  }
  val underTest: KeyValueStore[Array[Byte], Array[Byte]] = typeOfStore match {
    case "cache" =>
      cache = true
      new CachedStore(leveldb, CacheSize, BatchSize)
    case "serde" =>
      serde = true
      new SerializedKeyValueStore(leveldb, identitySerde, identitySerde)
    case "cache-and-serde" =>
      val serialized = new SerializedKeyValueStore(leveldb, identitySerde, identitySerde)
      serde = true
      cache = true
      new CachedStore(serialized, CacheSize, BatchSize)
    case _ =>
      // Any other parameter value exercises the bare LevelDB store.
      leveldb
  }
  store = new NullSafeKeyValueStore(underTest)
}
/**
 * Closes the store under test and removes the scratch directory.
 *
 * Cleanup of the directory happens in a `finally` block so that a failing
 * `close` does not leave temporary files behind.
 */
@After
def teardown(): Unit = {
  try {
    // JUnit runs @After methods even when @Before threw, in which case the
    // store may never have been assigned.
    if (store != null) store.close()
  } finally {
    // File.listFiles returns null (not an empty array) when the directory is
    // missing or cannot be read, so guard before iterating.
    Option(dir.listFiles).foreach { files =>
      files.foreach(file => file.delete())
    }
    dir.delete()
  }
}
/** Reading a key that was never written must yield null. */
@Test
def getNonExistantIsNull(): Unit = {
  val missing = store.get(b("hello"))
  assertNull(missing)
}
/** A value written under a key is returned when the same key bytes are read back. */
@Test
def putAndGet(): Unit = {
  store.put(b("k"), b("v"))
  // The lookup deliberately uses a freshly built key array, as the write did.
  val stored = store.get(b("k"))
  assertTrue(Arrays.equals(b("v"), stored))
}
/** Repeated writes to one key leave only the most recent value visible. */
@Test
def doublePutAndGet(): Unit = {
  val key = b("k2")
  Seq("v1", "v2", "v3").foreach(value => store.put(key, b(value)))
  val latest = store.get(key)
  assertTrue(Arrays.equals(b("v3"), latest))
}
/**
 * Checks that null keys and null values are rejected with a
 * NullPointerException carrying the NullSafeKeyValueStore error messages.
 * The checks only run for parameterisations where the `serde` flag is set.
 */
@Test
def testNullsWithSerde(): Unit = {
  if (serde) {
    val key = b("a")
    val npe = classOf[NullPointerException]
    val keyError = Some(NullSafeKeyValueStore.KEY_ERROR_MSG)
    val valueError = Some(NullSafeKeyValueStore.VAL_ERROR_MSG)
    val nullValueEntry = new Entry[Array[Byte], Array[Byte]](key, null)
    val nullKeyEntry = new Entry[Array[Byte], Array[Byte]](null, key)

    // Single-entry operations.
    expect(npe, keyError) { store.get(null) }
    expect(npe, keyError) { store.delete(null) }
    expect(npe, keyError) { store.put(null, key) }
    expect(npe, valueError) { store.put(key, null) }

    // Batch writes: a bad entry anywhere in the batch must be rejected.
    expect(npe, valueError) { store.putAll(List(new Entry(key, key), nullValueEntry)) }
    expect(npe, keyError) { store.putAll(List(nullKeyEntry)) }

    // Range scans with a null bound on either side.
    expect(npe, keyError) { store.range(key, null) }
    expect(npe, keyError) { store.range(null, key) }
  }
}
/** Writes a batch with putAll and verifies every entry can be read back. */
@Test
def testPutAll(): Unit = {
  // Use CacheSize - 1 so we fully fill the cache, but don't write any data
  // out. The cached check below compares references, and using
  // numEntries >= CacheSize would result in the LRU cache dropping some
  // entries. We would then get the correct byte array back from the cache's
  // underlying store (leveldb), but the reference comparison would fail.
  val numEntries = CacheSize - 1
  val entries = for (i <- 0 until numEntries) yield new Entry(b("k" + i), b("v" + i))
  store.putAll(entries)
  if (cache) {
    // Arrays compare by reference here: the cache must hand back the very
    // same array instance that was written.
    val allCached = entries.forall(entry => store.get(entry.getKey) eq entry.getValue)
    assertTrue("All values should be found and cached.", allCached)
  } else {
    val allFound = entries.forall(entry => Arrays.equals(store.get(entry.getKey), entry.getValue))
    assertTrue("All values should be found.", allFound)
  }
}
/**
 * Writes every letter as both key and value, then verifies that a full scan
 * returns exactly those entries in alphabetical order.
 */
@Test
def testIterateAll(): Unit = {
  for (letter <- letters)
    store.put(b(letter), b(letter))
  val iter = store.all
  try {
    checkRange(letters, iter)
  } finally {
    // Release the iterator even when an assertion fails, so teardown does not
    // close the store with a scan still open.
    iter.close()
  }
}
/**
 * Writes every letter as both key and value, then verifies that a range scan
 * returns the entries from index `from` (inclusive) up to index `to`
 * (exclusive) in alphabetical order.
 */
@Test
def testRange(): Unit = {
  val from = 5
  val to = 20
  for (letter <- letters)
    store.put(b(letter), b(letter))
  val iter = store.range(b(letters(from)), b(letters(to)))
  try {
    // slice excludes `to`, and checkRange asserts nothing follows the last
    // expected entry, so the upper bound is expected to be exclusive.
    checkRange(letters.slice(from, to), iter)
  } finally {
    // Release the iterator even when an assertion fails.
    iter.close()
  }
}
/** A key is absent before it is written, present after, and absent again once deleted. */
@Test
def testDelete(): Unit = {
  // The same array instance serves as both key and value.
  val bytes = b("a")
  assertNull(store.get(bytes))

  store.put(bytes, bytes)
  val readBack = store.get(bytes)
  assertTrue(Arrays.equals(bytes, readBack))

  store.delete(bytes)
  assertNull(store.get(bytes))
}
/**
 * Walks all letters through a write / re-read / delete cycle, checking the
 * store's view after each phase.
 */
@Test
def testSimpleScenario(): Unit = {
  // One array per letter, used as both key and value.
  val entries = for (letter <- letters) yield b(letter)

  // Phase 1: each key starts out absent and is readable right after its write.
  entries.foreach { bytes =>
    assertNull(store.get(bytes))
    store.put(bytes, bytes)
    assertTrue(Arrays.equals(bytes, store.get(bytes)))
  }

  // Phase 2: everything is still readable once all writes are done.
  for (bytes <- entries) assertTrue(Arrays.equals(bytes, store.get(bytes)))

  // Phase 3: delete everything, then confirm nothing is left.
  for (bytes <- entries) store.delete(bytes)
  for (bytes <- entries) assertNull(store.get(bytes))
}
/**
* This test specifically targets an issue in Scala 2.8.1's DoubleLinkedList
* implementation. The issue is that it doesn't work. More specifically,
* creating a DoubleLinkedList from an existing list does not update the
* "prev" field of the existing list's head to point to the new head. As a
* result, in Scala 2.8.1, every DoubleLinkedList node's prev field is null.
* Samza gets around this by manually updating the field itself. See SAMZA-80
* for details.
*
* This issue is exposed in Samza's KV cache implementation, which uses
* DoubleLinkedList, so all comments in this method are discussing the cached
* implementation, but the test is still useful as a sanity check for
* non-cached stores.
*
* The test has no explicit assertions: it passes if the final flush completes
* without throwing.
*/
@Test
def testBrokenScalaDoubleLinkedList() {
// Empty value written under every key; only the keys matter here.
val something = b("")
// One byte array per letter, indexable by position.
val keys = letters
.map(b(_))
.toArray
// Load the cache to capacity.
// NOTE(review): this pass (and the read pass further down) builds fresh byte
// arrays via b(_) instead of reusing `keys` -- presumably deliberate; confirm
// before refactoring.
letters
.slice(0, TestKeyValueStores.CacheSize)
.map(b(_))
.foreach(store.put(_, something))
// Now keep everything in the cache, but with an empty dirty list.
store.flush
// Dirty list is now empty, and every CacheEntry has dirty=null.
// Corrupt the dirty list by creating two dirty lists that toggle back and
// forth depending on whether the last dirty write was to 1 or 0. The trick
// here is that every element in the cache is treated as the "head" of the
// DoubleLinkedList (prev==null), even though it's not necessarily. Thus,
// you can end up with multiple nodes each having their own version of the
// dirty list with different elements in them.
store.put(keys(1), something)
store.put(keys(0), something)
store.put(keys(1), something)
store.flush
// The dirty list is now empty, but 0's dirty field actually has 0 and 1.
store.put(keys(0), something)
// The dirty list now has 0 and 1, but 1's dirty field is null in the
// cache because it was just flushed.
// Get rid of 1 from the cache by reading every other element, and then
// putting one new element.
letters
.slice(2, TestKeyValueStores.CacheSize)
.map(b(_))
.foreach(store.get(_))
store.put(keys(TestKeyValueStores.CacheSize), something)
// Now try and trigger an NPE since the dirty list has an element (1)
// that's no longer in the cache.
store.flush
}
/**
 * A little test that tries to simulate a few common patterns:
 * read-modify-write, and do-some-stuff-then-delete (windowing).
 *
 * Each of the 100 rounds performs 30 random read-then-write operations,
 * verifies every written key against a shadow map, and then deletes
 * everything through a full scan, expecting the shadow map to end up empty.
 */
@Test
def testRandomReadWriteRemove(): Unit = {
  // Make test deterministic by seeding the random number generator.
  val rand = new Random(12345)
  val keys = letters
    .map(b(_))
    .toArray
  // Map from letter to key byte array used for letter, and expected value.
  // We have to go through some acrobatics here since Java's byte array uses
  // object identity for .equals. Two byte arrays can have identical byte
  // elements, but not be equal.
  var expected = Map[String, (Array[Byte], String)]()
  for (_ <- 0 until 100) {
    for (_ <- 0 until 30) {
      // Draw order matters for determinism: key index first, then value.
      val idx = rand.nextInt(keys.length)
      val randomValue = letters(rand.nextInt(keys.length))
      val key = keys(idx)
      // Read before writing to mimic read-modify-write; the value itself is
      // not needed.
      store.get(key)
      store.put(key, b(randomValue))
      expected += letters(idx) -> ((key, randomValue))
    }
    // Everything written in this round must be readable with its last value.
    for ((keyBytes, value) <- expected.values) {
      val bytes = store.get(keyBytes)
      assertNotNull(bytes)
      assertEquals(value, new String(bytes, "UTF-8"))
    }
    // Delete every entry the store reports and mirror that in the shadow map.
    val iterator = store.all
    try {
      while (iterator.hasNext) {
        val key = iterator.next.getKey
        store.delete(key)
        expected -= new String(key, "UTF-8")
      }
    } finally {
      // Release the iterator even if a delete throws.
      iterator.close
    }
    assertEquals(0, expected.size)
  }
}
/**
 * Asserts that `iter` yields exactly the entries named by `vals`, in order,
 * with each key and value decoding to the same string, and that the iterator
 * is exhausted afterwards (hasNext is false and next() throws
 * NoSuchElementException). The iterator is not closed here.
 */
def checkRange(vals: IndexedSeq[String], iter: KeyValueIterator[Array[Byte], Array[Byte]]): Unit = {
  vals.foreach { expected =>
    assertTrue(iter.hasNext)
    val current = iter.next()
    assertEquals(expected, s(current.getKey))
    assertEquals(expected, s(current.getValue))
  }
  assertFalse(iter.hasNext)
  intercept[NoSuchElementException] { iter.next() }
}
/**
 * Convert string to byte buffer.
 *
 * Encodes as UTF-8 explicitly, rather than with the platform default charset,
 * so the bytes match the explicit "UTF-8" decoding used elsewhere in this
 * class.
 */
def b(s: String): Array[Byte] =
  s.getBytes("UTF-8")
/**
 * Convert byte buffer to string.
 *
 * Decodes as UTF-8 explicitly, rather than with the platform default charset,
 * so results do not vary between machines.
 */
def s(b: Array[Byte]): String =
  new String(b, "UTF-8")
}
/**
 * Shared constants for the test class, plus the list of store types that the
 * Parameterized runner feeds to its constructor.
 */
object TestKeyValueStores {
  // Sizing arguments passed to CachedStore by the test's setup.
  val CacheSize = 10
  val BatchSize = 5

  /** One single-element argument array per store type, in the listed order. */
  @Parameters
  def parameters: java.util.Collection[Array[String]] = {
    val storeTypes = Seq("cache", "serde", "cache-and-serde", "leveldb")
    Arrays.asList(storeTypes.map(name => Array(name)): _*)
  }
}