Make key-value stores generic
diff --git a/dependency-versions.gradle b/dependency-versions.gradle
index 9c3d571..fa8d1cd 100644
--- a/dependency-versions.gradle
+++ b/dependency-versions.gradle
@@ -29,6 +29,7 @@
dependency('info.picocli:picocli:4.0.0-alpha-2')
dependency('io.lettuce:lettuce-core:5.1.3.RELEASE')
dependency('io.vertx:vertx-core:3.6.2')
+ dependency('javax.persistence:javax.persistence-api:2.2')
dependencySet(group: 'org.antlr', version: '4.7.1') {
entry 'antlr4'
entry 'antlr4-runtime'
diff --git a/eth-repository/src/main/kotlin/org/apache/tuweni/eth/repository/BlockchainRepository.kt b/eth-repository/src/main/kotlin/org/apache/tuweni/eth/repository/BlockchainRepository.kt
index b7c478e..9c4e30a 100644
--- a/eth-repository/src/main/kotlin/org/apache/tuweni/eth/repository/BlockchainRepository.kt
+++ b/eth-repository/src/main/kotlin/org/apache/tuweni/eth/repository/BlockchainRepository.kt
@@ -41,10 +41,10 @@
* @param blockchainIndex the blockchain index to index values
*/
(
- private val chainMetadata: KeyValueStore,
- private val blockBodyStore: KeyValueStore,
- private val blockHeaderStore: KeyValueStore,
- private val transactionReceiptsStore: KeyValueStore,
+ private val chainMetadata: KeyValueStore<Bytes, Bytes>,
+ private val blockBodyStore: KeyValueStore<Bytes, Bytes>,
+ private val blockHeaderStore: KeyValueStore<Bytes, Bytes>,
+ private val transactionReceiptsStore: KeyValueStore<Bytes, Bytes>,
private val blockchainIndex: BlockchainIndex
) {
@@ -58,10 +58,10 @@
* @return a new blockchain repository made from the metadata passed in parameter.
*/
suspend fun init(
- blockBodyStore: KeyValueStore,
- blockHeaderStore: KeyValueStore,
- chainMetadata: KeyValueStore,
- transactionReceiptsStore: KeyValueStore,
+ blockBodyStore: KeyValueStore<Bytes, Bytes>,
+ blockHeaderStore: KeyValueStore<Bytes, Bytes>,
+ chainMetadata: KeyValueStore<Bytes, Bytes>,
+ transactionReceiptsStore: KeyValueStore<Bytes, Bytes>,
blockchainIndex: BlockchainIndex,
genesisBlock: Block
): BlockchainRepository {
diff --git a/kv/build.gradle b/kv/build.gradle
index 0f20ad0..d05b5c2 100644
--- a/kv/build.gradle
+++ b/kv/build.gradle
@@ -19,6 +19,7 @@
compile 'org.jetbrains.kotlinx:kotlinx-coroutines-guava'
compile 'org.jetbrains.kotlinx:kotlinx-coroutines-jdk8'
compile 'org.jetbrains.kotlin:kotlin-stdlib-jdk8'
+ compile 'javax.persistence:javax.persistence-api'
compileOnly 'com.jolbox:bonecp'
compileOnly 'io.lettuce:lettuce-core'
compileOnly 'org.fusesource.leveldbjni:leveldbjni-all'
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/EntityManagerKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/EntityManagerKeyValueStore.kt
new file mode 100644
index 0000000..8b653ad
--- /dev/null
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/EntityManagerKeyValueStore.kt
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.kv
+
+import kotlin.jvm.Throws
+import kotlinx.coroutines.Dispatchers
+
+import java.io.IOException
+import java.util.function.Function
+import java.util.function.Supplier
+import java.util.stream.Stream
+import javax.persistence.EntityManager
+import kotlin.coroutines.CoroutineContext
+
+class EntityManagerKeyValueStore<K, V>
+ @Throws(IOException::class)
+ constructor(
+ private val entityManagerProvider: () -> EntityManager,
+ private val entityClass: Class<V>,
+ private val idAccessor: (V) -> K,
+ override val coroutineContext: CoroutineContext = Dispatchers.IO
+ ) : KeyValueStore<K, V> {
+
+ companion object {
+ /**
+ * Open a relational database backed key-value store using a JPA entity manager.
+ *
+ * @param entityManagerProvider The supplier of entity manager to operate.
+ * @return A key-value store.
+ * @throws IOException If an I/O error occurs.
+ */
+ @JvmStatic
+ @Throws(IOException::class)
+ fun <K, V> open(
+ entityManagerProvider: Supplier<EntityManager>,
+ entityClass: Class<V>,
+ idAccessor: Function<V, K>
+ ) = EntityManagerKeyValueStore<K, V>(entityManagerProvider::get, entityClass, idAccessor::apply)
+ }
+
+ override suspend fun get(key: K): V? {
+ val em = entityManagerProvider()
+ em.transaction.begin()
+ try {
+ return em.find(entityClass, key)
+ } finally {
+ em.transaction.commit()
+ em.close()
+ }
+ }
+
+ override suspend fun put(key: K, value: V) {
+ val em = entityManagerProvider()
+ em.transaction.begin()
+ try {
+ em.merge(value)
+ } finally {
+ em.transaction.commit()
+ em.close()
+ }
+ }
+
+ override suspend fun keys(): Iterable<K> {
+ val em = entityManagerProvider()
+ val query = em.createQuery(em.criteriaBuilder.createQuery(entityClass))
+ val resultStream: Stream<V> = query.resultStream
+ return Iterable { resultStream.map(idAccessor).iterator() }
+ }
+
+ override fun close() {
+ }
+}
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/InfinispanKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/InfinispanKeyValueStore.kt
index 85a8b92..50e353f 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/InfinispanKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/InfinispanKeyValueStore.kt
@@ -18,7 +18,7 @@
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.future.await
-import org.apache.tuweni.bytes.Bytes
+import org.checkerframework.checker.units.qual.K
import org.infinispan.Cache
import kotlin.coroutines.CoroutineContext
@@ -26,10 +26,10 @@
* A key-value store backed by [Infinispan](https://infinispan.org)
*
*/
-class InfinispanKeyValueStore constructor(
- private val cache: Cache<Bytes, Bytes>,
+class InfinispanKeyValueStore<K, V> constructor(
+ private val cache: Cache<K, V>,
override val coroutineContext: CoroutineContext = Dispatchers.IO
-) : KeyValueStore {
+) : KeyValueStore<K, V> {
companion object {
@@ -40,16 +40,16 @@
* @return A key-value store.
*/
@JvmStatic
- fun open(cache: Cache<Bytes, Bytes>) = InfinispanKeyValueStore(cache)
+ fun <K, V> open(cache: Cache<K, V>) = InfinispanKeyValueStore(cache)
}
- override suspend fun get(key: Bytes): Bytes? = cache.getAsync(key).await()
+ override suspend fun get(key: K): V? = cache.getAsync(key).await()
- override suspend fun put(key: Bytes, value: Bytes) {
+ override suspend fun put(key: K, value: V) {
cache.putAsync(key, value).await()
}
- override suspend fun keys(): Iterable<Bytes> = cache.keys
+ override suspend fun keys(): Iterable<K> = cache.keys
/**
* The cache is managed outside the scope of this key-value store.
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/KeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/KeyValueStore.kt
index 588eddf..7f2775a 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/KeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/KeyValueStore.kt
@@ -17,7 +17,6 @@
package org.apache.tuweni.kv
import kotlinx.coroutines.CoroutineScope
-import org.apache.tuweni.bytes.Bytes
import org.apache.tuweni.concurrent.AsyncCompletion
import org.apache.tuweni.concurrent.AsyncResult
import org.apache.tuweni.concurrent.coroutines.asyncCompletion
@@ -27,7 +26,7 @@
/**
* A key-value store.
*/
-interface KeyValueStore : Closeable, CoroutineScope {
+interface KeyValueStore<K, V> : Closeable, CoroutineScope {
/**
* Retrieves data from the store.
@@ -35,7 +34,7 @@
* @param key The key for the content.
* @return The stored data, or null if no data was stored under the specified key.
*/
- suspend fun get(key: Bytes): Bytes?
+ suspend fun get(key: K): V?
/**
* Retrieves data from the store.
@@ -44,7 +43,7 @@
* @return An [AsyncResult] that will complete with the stored content,
* or an empty optional if no content was available.
*/
- fun getAsync(key: Bytes): AsyncResult<Bytes?> = asyncResult { get(key) }
+ fun getAsync(key: K): AsyncResult<V?> = asyncResult { get(key) }
/**
* Puts data into the store.
@@ -52,7 +51,7 @@
* @param key The key to associate with the data, for use when retrieving.
* @param value The data to store.
*/
- suspend fun put(key: Bytes, value: Bytes)
+ suspend fun put(key: K, value: V)
/**
* Puts data into the store.
@@ -64,19 +63,19 @@
* @param value The data to store.
* @return An [AsyncCompletion] that will complete when the content is stored.
*/
- fun putAsync(key: Bytes, value: Bytes): AsyncCompletion = asyncCompletion { put(key, value) }
+ fun putAsync(key: K, value: V): AsyncCompletion = asyncCompletion { put(key, value) }
/**
* Provides an iterator over the keys of the store.
*
* @return An [Iterable] allowing to iterate over the set of keys.
*/
- suspend fun keys(): Iterable<Bytes>
+ suspend fun keys(): Iterable<K>
/**
* Provides an iterator over the keys of the store.
*
* @return An [Iterable] allowing to iterate over the set of keys.
*/
- fun keysAsync(): AsyncResult<Iterable<Bytes>> = asyncResult { keys() }
+ fun keysAsync(): AsyncResult<Iterable<K>> = asyncResult { keys() }
}
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/LevelDBKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/LevelDBKeyValueStore.kt
index 21ba81d..f0e8985 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/LevelDBKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/LevelDBKeyValueStore.kt
@@ -25,49 +25,91 @@
import java.io.IOException
import java.nio.file.Files
import java.nio.file.Path
+import java.util.function.Function
import kotlin.coroutines.CoroutineContext
/**
* A key-value store backed by LevelDB.
*
* @param dbPath The path to the levelDB database.
+ * @param keySerializer the serializer of key objects to bytes
+ * @param valueSerializer the serializer of value objects to bytes
+ * @param keyDeserializer the deserializer of keys from bytes
+ * @param valueDeserializer the deserializer of values from bytes
* @param options Options for the levelDB database.
* @param coroutineContext The co-routine context for blocking tasks.
* @return A key-value store.
* @throws IOException If an I/O error occurs.
* @constructor Open a LevelDB-backed key-value store.
*/
-class LevelDBKeyValueStore
+class LevelDBKeyValueStore<K, V>
@Throws(IOException::class)
constructor(
dbPath: Path,
+ private val keySerializer: (K) -> Bytes,
+ private val valueSerializer: (V) -> Bytes,
+ private val keyDeserializer: (Bytes) -> K,
+ private val valueDeserializer: (Bytes) -> V,
options: Options = Options().createIfMissing(true).cacheSize((100 * 1048576).toLong()),
override val coroutineContext: CoroutineContext = Dispatchers.IO
-) : KeyValueStore {
+) : KeyValueStore<K, V> {
companion object {
+
/**
* Open a LevelDB-backed key-value store.
*
* @param dbPath The path to the levelDB database.
+ * @param keySerializer the serializer of key objects to bytes
+ * @param valueSerializer the serializer of value objects to bytes
+ * @param keyDeserializer the deserializer of keys from bytes
+ * @param valueDeserializer the deserializer of values from bytes
* @return A key-value store.
* @throws IOException If an I/O error occurs.
*/
@JvmStatic
@Throws(IOException::class)
- fun open(dbPath: Path) = LevelDBKeyValueStore(dbPath)
+ fun <K, V> open(
+ dbPath: Path,
+ keySerializer: Function<K, Bytes>,
+ valueSerializer: Function<V, Bytes>,
+ keyDeserializer: Function<Bytes, K>,
+ valueDeserializer: Function<Bytes, V>
+ ): LevelDBKeyValueStore<K, V> =
+ LevelDBKeyValueStore(dbPath,
+ keySerializer::apply,
+ valueSerializer::apply,
+ keyDeserializer::apply,
+ valueDeserializer::apply)
/**
* Open a LevelDB-backed key-value store.
*
* @param dbPath The path to the levelDB database.
+ * @param keySerializer the serializer of key objects to bytes
+ * @param valueSerializer the serializer of value objects to bytes
+ * @param keyDeserializer the deserializer of keys from bytes
+ * @param valueDeserializer the deserializer of values from bytes
* @param options Options for the levelDB database.
* @return A key-value store.
* @throws IOException If an I/O error occurs.
*/
@JvmStatic
@Throws(IOException::class)
- fun open(dbPath: Path, options: Options) = LevelDBKeyValueStore(dbPath, options)
+ fun <K, V> open(
+ dbPath: Path,
+ keySerializer: Function<K, Bytes>,
+ valueSerializer: Function<V, Bytes>,
+ keyDeserializer: Function<Bytes, K>,
+ valueDeserializer: Function<Bytes, V>,
+ options: Options
+ ) =
+ LevelDBKeyValueStore(dbPath,
+ keySerializer::apply,
+ valueSerializer::apply,
+ keyDeserializer::apply,
+ valueDeserializer::apply,
+ options)
}
private val db: DB
@@ -77,27 +119,28 @@
db = JniDBFactory.factory.open(dbPath.toFile(), options)
}
- override suspend fun get(key: Bytes): Bytes? {
- val rawValue = db[key.toArrayUnsafe()]
+ override suspend fun get(key: K): V? {
+ val rawValue = db[keySerializer(key).toArrayUnsafe()]
return if (rawValue == null) {
null
} else {
- Bytes.wrap(rawValue)
+ valueDeserializer(Bytes.wrap(rawValue))
}
}
- override suspend fun put(key: Bytes, value: Bytes) = db.put(key.toArrayUnsafe(), value.toArrayUnsafe())
+ override suspend fun put(key: K, value: V) = db.put(keySerializer(key).toArrayUnsafe(),
+ valueSerializer(value).toArrayUnsafe())
- private class BytesIterator(val iter: DBIterator) : Iterator<Bytes> {
+ private class KIterator<K>(val iter: DBIterator, val keyDeserializer: (Bytes) -> K) : Iterator<K> {
override fun hasNext(): Boolean = iter.hasNext()
- override fun next(): Bytes = Bytes.wrap(iter.next().key)
+ override fun next(): K = keyDeserializer(Bytes.wrap(iter.next().key))
}
- override suspend fun keys(): Iterable<Bytes> {
+ override suspend fun keys(): Iterable<K> {
val iter = db.iterator()
iter.seekToFirst()
- return Iterable { BytesIterator(iter) }
+ return Iterable { KIterator(iter, keyDeserializer) }
}
/**
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/MapDBKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/MapDBKeyValueStore.kt
index f5bf128..190108f 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/MapDBKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/MapDBKeyValueStore.kt
@@ -32,29 +32,48 @@
* A key-value store backed by a MapDB instance.
*
* @param dbPath The path to the MapDB database.
+ * @param keySerializer the serializer of key objects to bytes
+ * @param valueSerializer the serializer of value objects to bytes
+ * @param keyDeserializer the deserializer of keys from bytes
+ * @param valueDeserializer the deserializer of values from bytes
* @param coroutineContext The co-routine context for blocking tasks.
* @return A key-value store.
* @throws IOException If an I/O error occurs.
* @constructor Open a MapDB-backed key-value store.
*/
-class MapDBKeyValueStore
+class MapDBKeyValueStore<K, V>
@Throws(IOException::class)
constructor(
dbPath: Path,
+ private val keySerializer: (K) -> Bytes,
+ private val valueSerializer: (V) -> Bytes,
+ private val keyDeserializer: (Bytes) -> K,
+ private val valueDeserializer: (Bytes?) -> V?,
override val coroutineContext: CoroutineContext = Dispatchers.IO
-) : KeyValueStore {
+) : KeyValueStore<K, V> {
companion object {
/**
* Open a MapDB-backed key-value store.
*
* @param dbPath The path to the MapDB database.
+ * @param keySerializer the serializer of key objects to bytes
+ * @param valueSerializer the serializer of value objects to bytes
+ * @param keyDeserializer the deserializer of keys from bytes
+ * @param valueDeserializer the deserializer of values from bytes
* @return A key-value store.
* @throws IOException If an I/O error occurs.
*/
@JvmStatic
@Throws(IOException::class)
- fun open(dbPath: Path) = MapDBKeyValueStore(dbPath)
+ fun <K, V> open(
+ dbPath: Path,
+ keySerializer: (K) -> Bytes,
+ valueSerializer: (V) -> Bytes,
+ keyDeserializer: (Bytes) -> K,
+ valueDeserializer: (Bytes?) -> V?
+ ) =
+ MapDBKeyValueStore<K, V>(dbPath, keySerializer, valueSerializer, keyDeserializer, valueDeserializer)
}
private val db: DB
@@ -70,14 +89,14 @@
).createOrOpen()
}
- override suspend fun get(key: Bytes): Bytes? = storageData[key]
+ override suspend fun get(key: K): V? = valueDeserializer(storageData[keySerializer(key)])
- override suspend fun put(key: Bytes, value: Bytes) {
- storageData[key] = value
+ override suspend fun put(key: K, value: V) {
+ storageData[keySerializer(key)] = valueSerializer(value)
db.commit()
}
- override suspend fun keys(): Iterable<Bytes> = storageData.keys
+ override suspend fun keys(): Iterable<K> = storageData.keys.map(keyDeserializer)
/**
* Closes the underlying MapDB instance.
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/MapKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/MapKeyValueStore.kt
index bc15460..5eaca56 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/MapKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/MapKeyValueStore.kt
@@ -17,7 +17,6 @@
package org.apache.tuweni.kv
import kotlinx.coroutines.Dispatchers
-import org.apache.tuweni.bytes.Bytes
import kotlin.coroutines.CoroutineContext
/**
@@ -27,11 +26,11 @@
* @return A key-value store.
* @constructor Open an in-memory key-value store.
*/
-class MapKeyValueStore
+class MapKeyValueStore<K, V>
constructor(
- private val map: MutableMap<Bytes, Bytes> = HashMap(),
+ private val map: MutableMap<K, V> = HashMap(),
override val coroutineContext: CoroutineContext = Dispatchers.IO
-) : KeyValueStore {
+) : KeyValueStore<K, V> {
companion object {
/**
@@ -42,7 +41,7 @@
* @return A key-value store.
*/
@JvmStatic
- fun open(): MapKeyValueStore = MapKeyValueStore()
+ fun <K, V> open() = MapKeyValueStore<K, V>()
/**
* Open an in-memory key-value store.
@@ -51,16 +50,16 @@
* @return A key-value store.
*/
@JvmStatic
- fun open(map: MutableMap<Bytes, Bytes>) = MapKeyValueStore(map)
+ fun <K, V> open(map: MutableMap<K, V>) = MapKeyValueStore(map)
}
- override suspend fun get(key: Bytes): Bytes? = map[key]
+ override suspend fun get(key: K): V? = map[key]
- override suspend fun put(key: Bytes, value: Bytes) {
+ override suspend fun put(key: K, value: V) {
map[key] = value
}
- override suspend fun keys(): Iterable<Bytes> = map.keys
+ override suspend fun keys(): Iterable<K> = map.keys
/**
* Has no effect in this KeyValueStore implementation.
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/RedisKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/RedisKeyValueStore.kt
index 8e8717c..f22f0cd 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/RedisKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/RedisKeyValueStore.kt
@@ -24,58 +24,131 @@
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.future.await
import org.apache.tuweni.bytes.Bytes
+import org.checkerframework.checker.units.qual.K
import java.net.InetAddress
import java.util.concurrent.CompletionStage
+import java.util.function.Function
import kotlin.coroutines.CoroutineContext
/**
* A key-value store backed by Redis.
*
* @param uri The uri to the Redis store.
+ * @param keySerializer the serializer of key objects to bytes
+ * @param valueSerializer the serializer of value objects to bytes
+ * @param keyDeserializer the deserializer of keys from bytes
+ * @param valueDeserializer the deserializer of values from bytes
+ * @param coroutineContext the co-routine context in which this store executes
* @constructor Open a Redis-backed key-value store.
*/
-class RedisKeyValueStore(
+class RedisKeyValueStore<K, V>(
uri: String,
+ private val keySerializer: (K) -> Bytes,
+ private val valueSerializer: (V) -> Bytes,
+ private val keyDeserializer: (Bytes) -> K,
+ private val valueDeserializer: (Bytes) -> V,
override val coroutineContext: CoroutineContext = Dispatchers.IO
-) : KeyValueStore {
+) : KeyValueStore<K, V> {
companion object {
/**
* Open a Redis-backed key-value store.
*
* @param uri The uri to the Redis store.
+ * @param keySerializer the serializer of key objects to bytes
+ * @param valueSerializer the serializer of value objects to bytes
+ * @param keyDeserializer the deserializer of keys from bytes
+ * @param valueDeserializer the deserializer of values from bytes
* @return A key-value store.
*/
@JvmStatic
- fun open(uri: String) = RedisKeyValueStore(uri)
+ fun <K, V> open(
+ uri: String,
+ keySerializer: Function<K, Bytes>,
+ valueSerializer: Function<V, Bytes>,
+ keyDeserializer: Function<Bytes, K>,
+ valueDeserializer: Function<Bytes, V>
+ ) = RedisKeyValueStore(uri,
+ keySerializer::apply,
+ valueSerializer::apply,
+ keyDeserializer::apply,
+ valueDeserializer::apply)
/**
* Open a Redis-backed key-value store.
*
* @param port The port for the Redis store.
+ * @param keySerializer the serializer of key objects to bytes
+ * @param valueSerializer the serializer of value objects to bytes
+ * @param keyDeserializer the deserializer of keys from bytes
+ * @param valueDeserializer the deserializer of values from bytes
* @return A key-value store.
*/
@JvmStatic
- fun open(port: Int) = RedisKeyValueStore(port)
+ fun <K, V> open(
+ port: Int,
+ keySerializer: Function<K, Bytes>,
+ valueSerializer: Function<V, Bytes>,
+ keyDeserializer: Function<Bytes, K>,
+ valueDeserializer: Function<Bytes, V>
+ ) =
+ RedisKeyValueStore(port = port,
+ keySerializer = keySerializer::apply,
+ valueSerializer = valueSerializer::apply,
+ keyDeserializer = keyDeserializer::apply,
+ valueDeserializer = valueDeserializer::apply)
/**
* Open a Redis-backed key-value store.
*
* @param address The address for the Redis store.
+ * @param keySerializer the serializer of key objects to bytes
+ * @param valueSerializer the serializer of value objects to bytes
+ * @param keyDeserializer the deserializer of keys from bytes
+ * @param valueDeserializer the deserializer of values from bytes
* @return A key-value store.
*/
@JvmStatic
- fun open(address: InetAddress) = RedisKeyValueStore(6379, address)
+ fun <K, V> open(
+ address: InetAddress,
+ keySerializer: Function<K, Bytes>,
+ valueSerializer: Function<V, Bytes>,
+ keyDeserializer: Function<Bytes, K>,
+ valueDeserializer: Function<Bytes, V>
+ ) =
+ RedisKeyValueStore(6379,
+ address,
+ keySerializer::apply,
+ valueSerializer::apply,
+ keyDeserializer::apply,
+ valueDeserializer::apply)
/**
* Open a Redis-backed key-value store.
*
* @param port The port for the Redis store.
* @param address The address for the Redis store.
+ * @param keySerializer the serializer of key objects to bytes
+ * @param valueSerializer the serializer of value objects to bytes
+ * @param keyDeserializer the deserializer of keys from bytes
+ * @param valueDeserializer the deserializer of values from bytes
* @return A key-value store.
*/
@JvmStatic
- fun open(port: Int, address: InetAddress) = RedisKeyValueStore(port, address)
+ fun <K, V> open(
+ port: Int,
+ address: InetAddress,
+ keySerializer: Function<K, Bytes>,
+ valueSerializer: Function<V, Bytes>,
+ keyDeserializer: Function<Bytes, K>,
+ valueDeserializer: Function<Bytes, V>
+ ) =
+ RedisKeyValueStore(port,
+ address,
+ keySerializer::apply,
+ valueSerializer::apply,
+ keyDeserializer::apply,
+ valueDeserializer::apply)
/**
* A [RedisCodec] for working with Bytes classes.
@@ -94,12 +167,24 @@
*
* @param port The port for the Redis store.
* @param address The address for the Redis store.
+ * @param keySerializer the serializer of key objects to bytes
+ * @param valueSerializer the serializer of value objects to bytes
+ * @param keyDeserializer the deserializer of keys from bytes
+ * @param valueDeserializer the deserializer of values from bytes
*/
@JvmOverloads
constructor(
port: Int = 6379,
- address: InetAddress = InetAddress.getLoopbackAddress()
- ) : this(RedisURI.create(address.hostAddress, port).toURI().toString())
+ address: InetAddress = InetAddress.getLoopbackAddress(),
+ keySerializer: (K) -> Bytes,
+ valueSerializer: (V) -> Bytes,
+ keyDeserializer: (Bytes) -> K,
+ valueDeserializer: (Bytes) -> V
+ ) : this(RedisURI.create(address.hostAddress, port).toURI().toString(),
+ keySerializer,
+ valueSerializer,
+ keyDeserializer,
+ valueDeserializer)
init {
val redisClient = RedisClient.create(uri)
@@ -107,14 +192,20 @@
asyncCommands = conn.async()
}
- override suspend fun get(key: Bytes): Bytes? = asyncCommands.get(key).await()
+ override suspend fun get(key: K): V? = asyncCommands.get(keySerializer(key)).thenApply {
+ if (it == null) {
+ null
+ } else {
+ valueDeserializer(it)
+ }
+ }.await()
- override suspend fun put(key: Bytes, value: Bytes) {
- val future: CompletionStage<String> = asyncCommands.set(key, value)
+ override suspend fun put(key: K, value: V) {
+ val future: CompletionStage<String> = asyncCommands.set(keySerializer(key), valueSerializer(value))
future.await()
}
- override suspend fun keys(): Iterable<Bytes> = asyncCommands.keys(Bytes.EMPTY).await()
+ override suspend fun keys(): Iterable<K> = asyncCommands.keys(Bytes.EMPTY).await().map(keyDeserializer)
override fun close() {
conn.close()
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/RocksDBKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/RocksDBKeyValueStore.kt
index 7911803..30e8dad 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/RocksDBKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/RocksDBKeyValueStore.kt
@@ -25,6 +25,7 @@
import java.nio.file.Files
import java.nio.file.Path
import java.util.concurrent.atomic.AtomicBoolean
+import java.util.function.Function
import kotlin.coroutines.CoroutineContext
/**
@@ -37,37 +38,72 @@
* @throws IOException If an I/O error occurs.
* @constructor Open a RocksDB-backed key-value store.
*/
-class RocksDBKeyValueStore
+class RocksDBKeyValueStore<K, V>
@Throws(IOException::class)
constructor(
dbPath: Path,
+ private val keySerializer: (K) -> Bytes,
+ private val valueSerializer: (V) -> Bytes,
+ private val keyDeserializer: (Bytes) -> K,
+ private val valueDeserializer: (Bytes) -> V,
options: Options = Options().setCreateIfMissing(true).setWriteBufferSize(268435456).setMaxOpenFiles(-1),
override val coroutineContext: CoroutineContext = Dispatchers.IO
-) : KeyValueStore {
+) : KeyValueStore<K, V> {
companion object {
/**
* Open a RocksDB-backed key-value store.
*
* @param dbPath The path to the RocksDB database.
+ * @param keySerializer the serializer of key objects to bytes
+ * @param valueSerializer the serializer of value objects to bytes
+ * @param keyDeserializer the deserializer of keys from bytes
+ * @param valueDeserializer the deserializer of values from bytes
* @return A key-value store.
* @throws IOException If an I/O error occurs.
*/
@JvmStatic
@Throws(IOException::class)
- fun open(dbPath: Path) = RocksDBKeyValueStore(dbPath)
+ fun <K, V> open(
+ dbPath: Path,
+ keySerializer: Function<K, Bytes>,
+ valueSerializer: Function<V, Bytes>,
+ keyDeserializer: Function<Bytes, K>,
+ valueDeserializer: Function<Bytes, V>
+ ) = RocksDBKeyValueStore(dbPath,
+ keySerializer::apply,
+ valueSerializer::apply,
+ keyDeserializer::apply,
+ valueDeserializer::apply)
/**
* Open a RocksDB-backed key-value store.
*
* @param dbPath The path to the RocksDB database.
+ * @param keySerializer the serializer of key objects to bytes
+ * @param valueSerializer the serializer of value objects to bytes
+ * @param keyDeserializer the deserializer of keys from bytes
+ * @param valueDeserializer the deserializer of values from bytes
* @param options Options for the RocksDB database.
* @return A key-value store.
* @throws IOException If an I/O error occurs.
*/
@JvmStatic
@Throws(IOException::class)
- fun open(dbPath: Path, options: Options) = RocksDBKeyValueStore(dbPath, options)
+ fun <K, V> open(
+ dbPath: Path,
+ keySerializer: Function<K, Bytes>,
+ valueSerializer: Function<V, Bytes>,
+ keyDeserializer: Function<Bytes, K>,
+ valueDeserializer: Function<Bytes, V>,
+ options: Options
+ ) =
+ RocksDBKeyValueStore(dbPath,
+ keySerializer::apply,
+ valueSerializer::apply,
+ keyDeserializer::apply,
+ valueDeserializer::apply,
+ options)
}
private val db: RocksDB
@@ -79,43 +115,43 @@
db = RocksDB.open(options, dbPath.toAbsolutePath().toString())
}
- override suspend fun get(key: Bytes): Bytes? {
+ override suspend fun get(key: K): V? {
if (closed.get()) {
throw IllegalStateException("Closed DB")
}
- val rawValue = db[key.toArrayUnsafe()]
+ val rawValue = db[keySerializer(key).toArrayUnsafe()]
return if (rawValue == null) {
null
} else {
- Bytes.wrap(rawValue)
+ valueDeserializer(Bytes.wrap(rawValue))
}
}
- override suspend fun put(key: Bytes, value: Bytes) {
+ override suspend fun put(key: K, value: V) {
if (closed.get()) {
throw IllegalStateException("Closed DB")
}
- db.put(key.toArrayUnsafe(), value.toArrayUnsafe())
+ db.put(keySerializer(key).toArrayUnsafe(), valueSerializer(value).toArrayUnsafe())
}
- private class BytesIterator(val rIterator: RocksIterator) : Iterator<Bytes> {
+ private class BytesIterator<K>(val rIterator: RocksIterator, val keyDeserializer: (Bytes) -> K) : Iterator<K> {
override fun hasNext(): Boolean = rIterator.isValid
- override fun next(): Bytes {
- val key = Bytes.wrap(rIterator.key())
+ override fun next(): K {
+ val key = rIterator.key()
rIterator.next()
- return key
+ return keyDeserializer(Bytes.wrap(key))
}
}
- override suspend fun keys(): Iterable<Bytes> {
+ override suspend fun keys(): Iterable<K> {
if (closed.get()) {
throw IllegalStateException("Closed DB")
}
val iter = db.newIterator()
iter.seekToFirst()
- return Iterable { BytesIterator(iter) }
+ return Iterable { BytesIterator(iter, keyDeserializer) }
}
/**
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/SQLKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/SQLKeyValueStore.kt
index aa07336..46c6e93 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/SQLKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/SQLKeyValueStore.kt
@@ -38,27 +38,45 @@
* @throws IOException If an I/O error occurs.
* @constructor Open a relational database backed key-value store.
*/
-class SQLKeyValueStore
+class SQLKeyValueStore<K, V>
@Throws(IOException::class)
constructor(
jdbcurl: String,
val tableName: String = "store",
val keyColumn: String = "key",
val valueColumn: String = "value",
+ private val keySerializer: (K) -> Bytes,
+ private val valueSerializer: (V) -> Bytes,
+ private val keyDeserializer: (Bytes) -> K,
+ private val valueDeserializer: (Bytes?) -> V?,
override val coroutineContext: CoroutineContext = Dispatchers.IO
-) : KeyValueStore {
+) : KeyValueStore<K, V> {
companion object {
/**
* Open a relational database backed key-value store.
*
* @param jdbcUrl The JDBC url to connect to the database.
+ * @param keySerializer the serializer of key objects to bytes
+ * @param valueSerializer the serializer of value objects to bytes
+ * @param keyDeserializer the deserializer of keys from bytes
+ * @param valueDeserializer the deserializer of values from bytes
* @return A key-value store.
* @throws IOException If an I/O error occurs.
*/
@JvmStatic
@Throws(IOException::class)
- fun open(jdbcUrl: String) = SQLKeyValueStore(jdbcUrl)
+ fun <K, V> open(
+ jdbcUrl: String,
+ keySerializer: (K) -> Bytes,
+ valueSerializer: (V) -> Bytes,
+ keyDeserializer: (Bytes) -> K,
+ valueDeserializer: (Bytes?) -> V?
+ ) = SQLKeyValueStore<K, V>(jdbcurl = jdbcUrl,
+ keySerializer = keySerializer,
+ valueSerializer = valueSerializer,
+ keyDeserializer = keyDeserializer,
+ valueDeserializer = valueDeserializer)
/**
* Open a relational database backed key-value store.
@@ -67,13 +85,33 @@
* @param tableName the name of the table to use for storage.
* @param keyColumn the key column of the store.
* @param valueColumn the value column of the store.
+ * @param keySerializer the serializer of key objects to bytes
+ * @param valueSerializer the serializer of value objects to bytes
+ * @param keyDeserializer the deserializer of keys from bytes
+ * @param valueDeserializer the deserializer of values from bytes
* @return A key-value store.
* @throws IOException If an I/O error occurs.
*/
@JvmStatic
@Throws(IOException::class)
- fun open(jdbcUrl: String, tableName: String, keyColumn: String, valueColumn: String) =
- SQLKeyValueStore(jdbcUrl, tableName, keyColumn, valueColumn)
+ fun <K, V> open(
+ jdbcUrl: String,
+ tableName: String,
+ keyColumn: String,
+ valueColumn: String,
+ keySerializer: (K) -> Bytes,
+ valueSerializer: (V) -> Bytes,
+ keyDeserializer: (Bytes) -> K,
+ valueDeserializer: (Bytes?) -> V?
+ ) =
+ SQLKeyValueStore<K, V>(jdbcUrl,
+ tableName,
+ keyColumn,
+ valueColumn,
+ keySerializer,
+ valueSerializer,
+ keyDeserializer,
+ valueDeserializer)
}
private val connectionPool: BoneCP
@@ -85,34 +123,34 @@
connectionPool = BoneCP(config)
}
- override suspend fun get(key: Bytes): Bytes? {
+ override suspend fun get(key: K): V? {
connectionPool.asyncConnection.await().use {
val stmt = it.prepareStatement("SELECT $valueColumn FROM $tableName WHERE $keyColumn = ?")
- stmt.setBytes(1, key.toArrayUnsafe())
+ stmt.setBytes(1, keySerializer(key).toArrayUnsafe())
stmt.execute()
val rs = stmt.resultSet
return if (rs.next()) {
- Bytes.wrap(rs.getBytes(1))
+ valueDeserializer(Bytes.wrap(rs.getBytes(1)))
} else {
null
}
}
}
- override suspend fun put(key: Bytes, value: Bytes) {
+ override suspend fun put(key: K, value: V) {
connectionPool.asyncConnection.await().use {
val stmt = it.prepareStatement("INSERT INTO $tableName($keyColumn, $valueColumn) VALUES(?,?)")
- stmt.setBytes(1, key.toArrayUnsafe())
- stmt.setBytes(2, value.toArrayUnsafe())
+ stmt.setBytes(1, keySerializer(key).toArrayUnsafe())
+ stmt.setBytes(2, valueSerializer(value).toArrayUnsafe())
it.autoCommit = false
try {
stmt.execute()
} catch (e: SQLException) {
val updateStmt = it.prepareStatement("UPDATE $tableName SET $valueColumn=? WHERE $keyColumn=?")
- updateStmt.setBytes(1, value.toArrayUnsafe())
- updateStmt.setBytes(2, key.toArrayUnsafe())
+ updateStmt.setBytes(1, valueSerializer(value).toArrayUnsafe())
+ updateStmt.setBytes(2, keySerializer(key).toArrayUnsafe())
updateStmt.execute()
}
it.commit()
@@ -120,24 +158,24 @@
}
}
- private class SQLIterator(val resultSet: ResultSet) : Iterator<Bytes> {
+ private class SQLIterator<K>(val resultSet: ResultSet, val keyDeserializer: (Bytes) -> K) : Iterator<K> {
private var next = resultSet.next()
override fun hasNext(): Boolean = next
- override fun next(): Bytes {
- val key = Bytes.wrap(resultSet.getBytes(1))
+ override fun next(): K {
+ val key = keyDeserializer(Bytes.wrap(resultSet.getBytes(1)))
next = resultSet.next()
return key
}
}
- override suspend fun keys(): Iterable<Bytes> {
+ override suspend fun keys(): Iterable<K> {
connectionPool.asyncConnection.await().use {
val stmt = it.prepareStatement("SELECT $keyColumn FROM $tableName")
stmt.execute()
- return Iterable { SQLIterator(stmt.resultSet) }
+ return Iterable { SQLIterator(stmt.resultSet, keyDeserializer) }
}
}
diff --git a/kv/src/test/java/org/apache/tuweni/kv/KeyValueStoreTest.java b/kv/src/test/java/org/apache/tuweni/kv/KeyValueStoreTest.java
index b5d8c2a..19e0f37 100644
--- a/kv/src/test/java/org/apache/tuweni/kv/KeyValueStoreTest.java
+++ b/kv/src/test/java/org/apache/tuweni/kv/KeyValueStoreTest.java
@@ -27,6 +27,7 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.function.Function;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -34,10 +35,12 @@
@ExtendWith(TempDirectoryExtension.class)
class KeyValueStoreTest {
+ private static final Function<Bytes, Bytes> bytesIdentityFn = Function.identity();
+
@Test
void testPutAndGet() throws Exception {
Map<Bytes, Bytes> map = new HashMap<>();
- KeyValueStore store = MapKeyValueStore.open(map);
+ KeyValueStore<Bytes, Bytes> store = MapKeyValueStore.open(map);
AsyncCompletion completion = store.putAsync(Bytes.of(123), Bytes.of(10, 12, 13));
completion.join();
Bytes value = store.getAsync(Bytes.of(123)).get();
@@ -49,14 +52,14 @@
@Test
void testNoValue() throws Exception {
Map<Bytes, Bytes> map = new HashMap<>();
- KeyValueStore store = MapKeyValueStore.open(map);
+ KeyValueStore<Bytes, Bytes> store = MapKeyValueStore.open(map);
assertNull(store.getAsync(Bytes.of(123)).get());
}
@Test
void testKeys() throws Exception {
Map<Bytes, Bytes> map = new HashMap<>();
- KeyValueStore store = MapKeyValueStore.open(map);
+ KeyValueStore<Bytes, Bytes> store = MapKeyValueStore.open(map);
AsyncCompletion completion = store.putAsync(Bytes.of(123), Bytes.of(10, 12, 13));
completion.join();
Set<Bytes> keys = new HashSet<>();
@@ -66,7 +69,12 @@
@Test
void testLevelDBWithoutOptions(@TempDirectory Path tempDirectory) throws Exception {
- try (LevelDBKeyValueStore leveldb = LevelDBKeyValueStore.open(tempDirectory.resolve("foo").resolve("bar"))) {
+ try (LevelDBKeyValueStore<Bytes, Bytes> leveldb = LevelDBKeyValueStore.open(
+ tempDirectory.resolve("foo").resolve("bar"),
+ bytesIdentityFn,
+ bytesIdentityFn,
+ bytesIdentityFn,
+ bytesIdentityFn)) {
AsyncCompletion completion = leveldb.putAsync(Bytes.of(123), Bytes.of(10, 12, 13));
completion.join();
Bytes value = leveldb.getAsync(Bytes.of(123)).get();
@@ -77,7 +85,12 @@
@Test
void testRocksDBWithoutOptions(@TempDirectory Path tempDirectory) throws Exception {
- try (RocksDBKeyValueStore rocksdb = RocksDBKeyValueStore.open(tempDirectory.resolve("foo").resolve("bar"))) {
+ try (RocksDBKeyValueStore<Bytes, Bytes> rocksdb = RocksDBKeyValueStore.open(
+ tempDirectory.resolve("foo").resolve("bar"),
+ bytesIdentityFn,
+ bytesIdentityFn,
+ bytesIdentityFn,
+ bytesIdentityFn)) {
AsyncCompletion completion = rocksdb.putAsync(Bytes.of(123), Bytes.of(10, 12, 13));
completion.join();
Bytes value = rocksdb.getAsync(Bytes.of(123)).get();
diff --git a/kv/src/test/java/org/apache/tuweni/kv/RedisKeyValueStoreTest.java b/kv/src/test/java/org/apache/tuweni/kv/RedisKeyValueStoreTest.java
index c218a65..db44ae1 100644
--- a/kv/src/test/java/org/apache/tuweni/kv/RedisKeyValueStoreTest.java
+++ b/kv/src/test/java/org/apache/tuweni/kv/RedisKeyValueStoreTest.java
@@ -22,6 +22,7 @@
import org.apache.tuweni.junit.RedisServerExtension;
import java.net.InetAddress;
+import java.util.function.Function;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
@@ -34,7 +35,8 @@
@Test
void testPutAndGet(@RedisPort Integer redisPort) throws Exception {
- KeyValueStore store = RedisKeyValueStore.open(redisPort);
+ KeyValueStore<Bytes, Bytes> store = RedisKeyValueStore
+ .open(redisPort, Function.identity(), Function.identity(), Function.identity(), Function.identity());
AsyncCompletion completion = store.putAsync(Bytes.of(123), Bytes.of(10, 12, 13));
completion.join();
Bytes value = store.getAsync(Bytes.of(123)).get();
@@ -49,13 +51,24 @@
@Test
void testNoValue(@RedisPort Integer redisPort) throws Exception {
- KeyValueStore store = RedisKeyValueStore.open(redisPort, InetAddress.getLoopbackAddress());
+ KeyValueStore<Bytes, Bytes> store = RedisKeyValueStore.open(
+ redisPort,
+ InetAddress.getLoopbackAddress(),
+ Function.identity(),
+ Function.identity(),
+ Function.identity(),
+ Function.identity());
assertNull(store.getAsync(Bytes.of(124)).get());
}
@Test
void testRedisCloseable(@RedisPort Integer redisPort) throws Exception {
- try (RedisKeyValueStore redis = RedisKeyValueStore.open("redis://127.0.0.1:" + redisPort)) {
+ try (RedisKeyValueStore<Bytes, Bytes> redis = RedisKeyValueStore.open(
+ "redis://127.0.0.1:" + redisPort,
+ Function.identity(),
+ Function.identity(),
+ Function.identity(),
+ Function.identity())) {
AsyncCompletion completion = redis.putAsync(Bytes.of(125), Bytes.of(10, 12, 13));
completion.join();
Bytes value = redis.getAsync(Bytes.of(125)).get();
diff --git a/kv/src/test/kotlin/org/apache/tuweni/kv/KeyValueStoreSpec.kt b/kv/src/test/kotlin/org/apache/tuweni/kv/KeyValueStoreSpec.kt
index ae8c62b..94f4ffb 100644
--- a/kv/src/test/kotlin/org/apache/tuweni/kv/KeyValueStoreSpec.kt
+++ b/kv/src/test/kotlin/org/apache/tuweni/kv/KeyValueStoreSpec.kt
@@ -110,7 +110,7 @@
runBlocking {
kv.put(bar, foo)
kv.put(foo, bar)
- val keys = (kv.keys().map { it })
+ val keys = (kv.keys().map { it })
keys.should.contain(bar)
keys.should.contain(foo)
}
@@ -120,7 +120,12 @@
object MapDBKeyValueStoreSpec : Spek({
val testDir = Files.createTempDirectory("data")
- val kv = MapDBKeyValueStore(testDir.resolve("data.db"))
+ val kv = MapDBKeyValueStore(
+ testDir.resolve("data.db"),
+ keySerializer = { it },
+ valueSerializer = { it },
+ keyDeserializer = { it },
+ valueDeserializer = { it })
describe("a MapDB-backed key value store") {
@@ -141,12 +146,19 @@
runBlocking {
kv.put(foobar, foo)
kv.put(foo, bar)
- kv.keys().should.equal(setOf(foobar, foo))
+ val keys = kv.keys().map { it }
+ keys.should.contain(foo)
+ keys.should.contain(foobar)
}
}
it("should not allow usage after the DB is closed") {
- val kv2 = MapDBKeyValueStore(testDir.resolve("data2.db"))
+ val kv2 = MapDBKeyValueStore(
+ testDir.resolve("data2.db"),
+ keySerializer = { it },
+ valueSerializer = { it },
+ keyDeserializer = { it },
+ valueDeserializer = { it })
kv2.close()
runBlocking {
var caught = false
@@ -168,7 +180,12 @@
object LevelDBKeyValueStoreSpec : Spek({
val path = Files.createTempDirectory("leveldb")
- val kv = LevelDBKeyValueStore(path)
+ val kv = LevelDBKeyValueStore(
+ path,
+ keySerializer = { it },
+ valueSerializer = { it },
+ keyDeserializer = { it },
+ valueDeserializer = { it })
afterGroup {
kv.close()
MoreFiles.deleteRecursively(path, RecursiveDeleteOption.ALLOW_INSECURE)
@@ -197,7 +214,12 @@
}
it("should not allow usage after the DB is closed") {
- val kv2 = LevelDBKeyValueStore(path.resolve("subdb"))
+ val kv2 = LevelDBKeyValueStore(
+ path.resolve("subdb"),
+ keySerializer = { it },
+ valueSerializer = { it },
+ keyDeserializer = { it },
+ valueDeserializer = { it })
kv2.close()
runBlocking {
var caught = false
@@ -214,7 +236,12 @@
object RocksDBKeyValueStoreSpec : Spek({
val path = Files.createTempDirectory("rocksdb")
- val kv = RocksDBKeyValueStore(path)
+ val kv = RocksDBKeyValueStore(
+ path,
+ keySerializer = { it },
+ valueSerializer = { it },
+ keyDeserializer = { it },
+ valueDeserializer = { it })
afterGroup {
kv.close()
MoreFiles.deleteRecursively(path, RecursiveDeleteOption.ALLOW_INSECURE)
@@ -245,7 +272,12 @@
}
it("should not allow usage after the DB is closed") {
- val kv2 = RocksDBKeyValueStore(path.resolve("subdb"))
+ val kv2 = RocksDBKeyValueStore(
+ path.resolve("subdb"),
+ keySerializer = { it },
+ valueSerializer = { it },
+ keyDeserializer = { it },
+ valueDeserializer = { it })
kv2.close()
runBlocking {
var caught = false
@@ -269,8 +301,21 @@
st.executeUpdate("create table store(key binary, value binary, primary key(key))")
st.executeUpdate("create table store2(id binary, val binary, primary key(id))")
}
- val kv = SQLKeyValueStore(jdbcUrl)
- val otherkv = SQLKeyValueStore.open(jdbcUrl, "store2", "id", "val")
+ val kv = SQLKeyValueStore(
+ jdbcUrl,
+ keySerializer = { it },
+ valueSerializer = { it },
+ keyDeserializer = { it },
+ valueDeserializer = { it })
+ val otherkv = SQLKeyValueStore.open(
+ jdbcUrl,
+ "store2",
+ "id",
+ "val",
+ keySerializer = { it },
+ valueSerializer = { it },
+ keyDeserializer = { it },
+ valueDeserializer = { it })
afterGroup {
kv.close()
otherkv.close()
@@ -316,7 +361,12 @@
}
it("should not allow usage after the DB is closed") {
- val kv2 = SQLKeyValueStore("jdbc:h2:mem:testdb")
+ val kv2 = SQLKeyValueStore(
+ "jdbc:h2:mem:testdb",
+ keySerializer = { it },
+ valueSerializer = { it },
+ keyDeserializer = { it },
+ valueDeserializer = { it })
kv2.close()
runBlocking {
var caught = false