Make key-value stores generic
diff --git a/dependency-versions.gradle b/dependency-versions.gradle
index 9c3d571..fa8d1cd 100644
--- a/dependency-versions.gradle
+++ b/dependency-versions.gradle
@@ -29,6 +29,7 @@
     dependency('info.picocli:picocli:4.0.0-alpha-2')
     dependency('io.lettuce:lettuce-core:5.1.3.RELEASE')
     dependency('io.vertx:vertx-core:3.6.2')
+    dependency('javax.persistence:javax.persistence-api:2.2')
     dependencySet(group: 'org.antlr', version: '4.7.1') {
       entry 'antlr4'
       entry 'antlr4-runtime'
diff --git a/eth-repository/src/main/kotlin/org/apache/tuweni/eth/repository/BlockchainRepository.kt b/eth-repository/src/main/kotlin/org/apache/tuweni/eth/repository/BlockchainRepository.kt
index b7c478e..9c4e30a 100644
--- a/eth-repository/src/main/kotlin/org/apache/tuweni/eth/repository/BlockchainRepository.kt
+++ b/eth-repository/src/main/kotlin/org/apache/tuweni/eth/repository/BlockchainRepository.kt
@@ -41,10 +41,10 @@
  * @param blockchainIndex the blockchain index to index values
  */
   (
-    private val chainMetadata: KeyValueStore,
-    private val blockBodyStore: KeyValueStore,
-    private val blockHeaderStore: KeyValueStore,
-    private val transactionReceiptsStore: KeyValueStore,
+    private val chainMetadata: KeyValueStore<Bytes, Bytes>,
+    private val blockBodyStore: KeyValueStore<Bytes, Bytes>,
+    private val blockHeaderStore: KeyValueStore<Bytes, Bytes>,
+    private val transactionReceiptsStore: KeyValueStore<Bytes, Bytes>,
     private val blockchainIndex: BlockchainIndex
   ) {
 
@@ -58,10 +58,10 @@
      * @return a new blockchain repository made from the metadata passed in parameter.
      */
     suspend fun init(
-      blockBodyStore: KeyValueStore,
-      blockHeaderStore: KeyValueStore,
-      chainMetadata: KeyValueStore,
-      transactionReceiptsStore: KeyValueStore,
+      blockBodyStore: KeyValueStore<Bytes, Bytes>,
+      blockHeaderStore: KeyValueStore<Bytes, Bytes>,
+      chainMetadata: KeyValueStore<Bytes, Bytes>,
+      transactionReceiptsStore: KeyValueStore<Bytes, Bytes>,
       blockchainIndex: BlockchainIndex,
       genesisBlock: Block
     ): BlockchainRepository {
diff --git a/kv/build.gradle b/kv/build.gradle
index 0f20ad0..d05b5c2 100644
--- a/kv/build.gradle
+++ b/kv/build.gradle
@@ -19,6 +19,7 @@
   compile 'org.jetbrains.kotlinx:kotlinx-coroutines-guava'
   compile 'org.jetbrains.kotlinx:kotlinx-coroutines-jdk8'
   compile 'org.jetbrains.kotlin:kotlin-stdlib-jdk8'
+  compile 'javax.persistence:javax.persistence-api'
   compileOnly 'com.jolbox:bonecp'
   compileOnly 'io.lettuce:lettuce-core'
   compileOnly 'org.fusesource.leveldbjni:leveldbjni-all'
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/EntityManagerKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/EntityManagerKeyValueStore.kt
new file mode 100644
index 0000000..8b653ad
--- /dev/null
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/EntityManagerKeyValueStore.kt
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.kv
+
+import kotlin.jvm.Throws
+import kotlinx.coroutines.Dispatchers
+
+import java.io.IOException
+import java.util.function.Function
+import java.util.function.Supplier
+import java.util.stream.Stream
+import javax.persistence.EntityManager
+import kotlin.coroutines.CoroutineContext
+
+class EntityManagerKeyValueStore<K, V>
+        @Throws(IOException::class)
+        constructor(
+          private val entityManagerProvider: () -> EntityManager,
+          private val entityClass: Class<V>,
+          private val idAccessor: (V) -> K,
+          override val coroutineContext: CoroutineContext = Dispatchers.IO
+        ) : KeyValueStore<K, V> {
+
+  companion object {
+    /**
+     * Open a relational database backed key-value store using a JPA entity manager.
+     *
+     * @param entityManagerProvider The supplier of entity manager to operate.
+     * @return A key-value store.
+     * @throws IOException If an I/O error occurs.
+     */
+    @JvmStatic
+    @Throws(IOException::class)
+    fun <K, V> open(
+      entityManagerProvider: Supplier<EntityManager>,
+      entityClass: Class<V>,
+      idAccessor: Function<V, K>
+    ) = EntityManagerKeyValueStore<K, V>(entityManagerProvider::get, entityClass, idAccessor::apply)
+  }
+
+  override suspend fun get(key: K): V? {
+    val em = entityManagerProvider()
+    em.transaction.begin()
+    try {
+      return em.find(entityClass, key)
+    } finally {
+      em.transaction.commit()
+      em.close()
+    }
+  }
+
+  override suspend fun put(key: K, value: V) {
+    val em = entityManagerProvider()
+    em.transaction.begin()
+    try {
+      em.merge(value)
+    } finally {
+      em.transaction.commit()
+      em.close()
+    }
+  }
+
+  override suspend fun keys(): Iterable<K> {
+    val em = entityManagerProvider()
+    val query = em.createQuery(em.criteriaBuilder.createQuery(entityClass))
+    val resultStream: Stream<V> = query.resultStream
+    return Iterable { resultStream.map(idAccessor).iterator() }
+  }
+
+  override fun close() {
+  }
+}
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/InfinispanKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/InfinispanKeyValueStore.kt
index 85a8b92..50e353f 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/InfinispanKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/InfinispanKeyValueStore.kt
@@ -18,7 +18,7 @@
 
 import kotlinx.coroutines.Dispatchers
 import kotlinx.coroutines.future.await
-import org.apache.tuweni.bytes.Bytes
+import org.checkerframework.checker.units.qual.K
 import org.infinispan.Cache
 import kotlin.coroutines.CoroutineContext
 
@@ -26,10 +26,10 @@
  * A key-value store backed by [Infinispan](https://infinispan.org)
  *
  */
-class InfinispanKeyValueStore constructor(
-  private val cache: Cache<Bytes, Bytes>,
+class InfinispanKeyValueStore<K, V> constructor(
+  private val cache: Cache<K, V>,
   override val coroutineContext: CoroutineContext = Dispatchers.IO
-) : KeyValueStore {
+) : KeyValueStore<K, V> {
 
   companion object {
 
@@ -40,16 +40,16 @@
      * @return A key-value store.
      */
     @JvmStatic
-    fun open(cache: Cache<Bytes, Bytes>) = InfinispanKeyValueStore(cache)
+    fun <K, V> open(cache: Cache<K, V>) = InfinispanKeyValueStore(cache)
   }
 
-  override suspend fun get(key: Bytes): Bytes? = cache.getAsync(key).await()
+  override suspend fun get(key: K): V? = cache.getAsync(key).await()
 
-  override suspend fun put(key: Bytes, value: Bytes) {
+  override suspend fun put(key: K, value: V) {
     cache.putAsync(key, value).await()
   }
 
-  override suspend fun keys(): Iterable<Bytes> = cache.keys
+  override suspend fun keys(): Iterable<K> = cache.keys
 
   /**
    * The cache is managed outside the scope of this key-value store.
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/KeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/KeyValueStore.kt
index 588eddf..7f2775a 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/KeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/KeyValueStore.kt
@@ -17,7 +17,6 @@
 package org.apache.tuweni.kv
 
 import kotlinx.coroutines.CoroutineScope
-import org.apache.tuweni.bytes.Bytes
 import org.apache.tuweni.concurrent.AsyncCompletion
 import org.apache.tuweni.concurrent.AsyncResult
 import org.apache.tuweni.concurrent.coroutines.asyncCompletion
@@ -27,7 +26,7 @@
 /**
  * A key-value store.
  */
-interface KeyValueStore : Closeable, CoroutineScope {
+interface KeyValueStore<K, V> : Closeable, CoroutineScope {
 
   /**
    * Retrieves data from the store.
@@ -35,7 +34,7 @@
    * @param key The key for the content.
    * @return The stored data, or null if no data was stored under the specified key.
    */
-  suspend fun get(key: Bytes): Bytes?
+  suspend fun get(key: K): V?
 
   /**
    * Retrieves data from the store.
@@ -44,7 +43,7 @@
    * @return An [AsyncResult] that will complete with the stored content,
    *         or an empty optional if no content was available.
    */
-  fun getAsync(key: Bytes): AsyncResult<Bytes?> = asyncResult { get(key) }
+  fun getAsync(key: K): AsyncResult<V?> = asyncResult { get(key) }
 
   /**
    * Puts data into the store.
@@ -52,7 +51,7 @@
    * @param key The key to associate with the data, for use when retrieving.
    * @param value The data to store.
    */
-  suspend fun put(key: Bytes, value: Bytes)
+  suspend fun put(key: K, value: V)
 
   /**
    * Puts data into the store.
@@ -64,19 +63,19 @@
    * @param value The data to store.
    * @return An [AsyncCompletion] that will complete when the content is stored.
    */
-  fun putAsync(key: Bytes, value: Bytes): AsyncCompletion = asyncCompletion { put(key, value) }
+  fun putAsync(key: K, value: V): AsyncCompletion = asyncCompletion { put(key, value) }
 
   /**
    * Provides an iterator over the keys of the store.
    *
    * @return An [Iterable] allowing to iterate over the set of keys.
    */
-  suspend fun keys(): Iterable<Bytes>
+  suspend fun keys(): Iterable<K>
 
   /**
    * Provides an iterator over the keys of the store.
    *
    * @return An [Iterable] allowing to iterate over the set of keys.
    */
-  fun keysAsync(): AsyncResult<Iterable<Bytes>> = asyncResult { keys() }
+  fun keysAsync(): AsyncResult<Iterable<K>> = asyncResult { keys() }
 }
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/LevelDBKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/LevelDBKeyValueStore.kt
index 21ba81d..f0e8985 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/LevelDBKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/LevelDBKeyValueStore.kt
@@ -25,49 +25,91 @@
 import java.io.IOException
 import java.nio.file.Files
 import java.nio.file.Path
+import java.util.function.Function
 import kotlin.coroutines.CoroutineContext
 
 /**
  * A key-value store backed by LevelDB.
  *
  * @param dbPath The path to the levelDB database.
+ * @param keySerializer the serializer of key objects to bytes
+ * @param valueSerializer the serializer of value objects to bytes
+ * @param keyDeserializer the deserializer of keys from bytes
+ * @param valueDeserializer the deserializer of values from bytes
  * @param options Options for the levelDB database.
  * @param coroutineContext The co-routine context for blocking tasks.
  * @return A key-value store.
  * @throws IOException If an I/O error occurs.
  * @constructor Open a LevelDB-backed key-value store.
  */
-class LevelDBKeyValueStore
+class LevelDBKeyValueStore<K, V>
 @Throws(IOException::class)
 constructor(
   dbPath: Path,
+  private val keySerializer: (K) -> Bytes,
+  private val valueSerializer: (V) -> Bytes,
+  private val keyDeserializer: (Bytes) -> K,
+  private val valueDeserializer: (Bytes) -> V,
   options: Options = Options().createIfMissing(true).cacheSize((100 * 1048576).toLong()),
   override val coroutineContext: CoroutineContext = Dispatchers.IO
-) : KeyValueStore {
+) : KeyValueStore<K, V> {
 
   companion object {
+
     /**
      * Open a LevelDB-backed key-value store.
      *
      * @param dbPath The path to the levelDB database.
+     * @param keySerializer the serializer of key objects to bytes
+     * @param valueSerializer the serializer of value objects to bytes
+     * @param keyDeserializer the deserializer of keys from bytes
+     * @param valueDeserializer the deserializer of values from bytes
      * @return A key-value store.
      * @throws IOException If an I/O error occurs.
      */
     @JvmStatic
     @Throws(IOException::class)
-    fun open(dbPath: Path) = LevelDBKeyValueStore(dbPath)
+    fun <K, V> open(
+      dbPath: Path,
+      keySerializer: Function<K, Bytes>,
+      valueSerializer: Function<V, Bytes>,
+      keyDeserializer: Function<Bytes, K>,
+      valueDeserializer: Function<Bytes, V>
+    ): LevelDBKeyValueStore<K, V> =
+      LevelDBKeyValueStore(dbPath,
+        keySerializer::apply,
+        valueSerializer::apply,
+        keyDeserializer::apply,
+        valueDeserializer::apply)
 
     /**
      * Open a LevelDB-backed key-value store.
      *
      * @param dbPath The path to the levelDB database.
+     * @param keySerializer the serializer of key objects to bytes
+     * @param valueSerializer the serializer of value objects to bytes
+     * @param keyDeserializer the deserializer of keys from bytes
+     * @param valueDeserializer the deserializer of values from bytes
      * @param options Options for the levelDB database.
      * @return A key-value store.
      * @throws IOException If an I/O error occurs.
      */
     @JvmStatic
     @Throws(IOException::class)
-    fun open(dbPath: Path, options: Options) = LevelDBKeyValueStore(dbPath, options)
+    fun <K, V> open(
+      dbPath: Path,
+      keySerializer: Function<K, Bytes>,
+      valueSerializer: Function<V, Bytes>,
+      keyDeserializer: Function<Bytes, K>,
+      valueDeserializer: Function<Bytes, V>,
+      options: Options
+    ) =
+      LevelDBKeyValueStore(dbPath,
+        keySerializer::apply,
+        valueSerializer::apply,
+        keyDeserializer::apply,
+        valueDeserializer::apply,
+        options)
   }
 
   private val db: DB
@@ -77,27 +119,28 @@
     db = JniDBFactory.factory.open(dbPath.toFile(), options)
   }
 
-  override suspend fun get(key: Bytes): Bytes? {
-    val rawValue = db[key.toArrayUnsafe()]
+  override suspend fun get(key: K): V? {
+    val rawValue = db[keySerializer(key).toArrayUnsafe()]
     return if (rawValue == null) {
       null
     } else {
-      Bytes.wrap(rawValue)
+      valueDeserializer(Bytes.wrap(rawValue))
     }
   }
 
-  override suspend fun put(key: Bytes, value: Bytes) = db.put(key.toArrayUnsafe(), value.toArrayUnsafe())
+  override suspend fun put(key: K, value: V) = db.put(keySerializer(key).toArrayUnsafe(),
+    valueSerializer(value).toArrayUnsafe())
 
-  private class BytesIterator(val iter: DBIterator) : Iterator<Bytes> {
+  private class KIterator<K>(val iter: DBIterator, val keyDeserializer: (Bytes) -> K) : Iterator<K> {
     override fun hasNext(): Boolean = iter.hasNext()
 
-    override fun next(): Bytes = Bytes.wrap(iter.next().key)
+    override fun next(): K = keyDeserializer(Bytes.wrap(iter.next().key))
   }
 
-  override suspend fun keys(): Iterable<Bytes> {
+  override suspend fun keys(): Iterable<K> {
     val iter = db.iterator()
     iter.seekToFirst()
-    return Iterable { BytesIterator(iter) }
+    return Iterable { KIterator(iter, keyDeserializer) }
   }
 
   /**
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/MapDBKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/MapDBKeyValueStore.kt
index f5bf128..190108f 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/MapDBKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/MapDBKeyValueStore.kt
@@ -32,29 +32,48 @@
  * A key-value store backed by a MapDB instance.
  *
  * @param dbPath The path to the MapDB database.
+ * @param keySerializer the serializer of key objects to bytes
+ * @param valueSerializer the serializer of value objects to bytes
+ * @param keyDeserializer the deserializer of keys from bytes
+ * @param valueDeserializer the deserializer of values from bytes
  * @param coroutineContext The co-routine context for blocking tasks.
  * @return A key-value store.
  * @throws IOException If an I/O error occurs.
  * @constructor Open a MapDB-backed key-value store.
  */
-class MapDBKeyValueStore
+class MapDBKeyValueStore<K, V>
 @Throws(IOException::class)
 constructor(
   dbPath: Path,
+  private val keySerializer: (K) -> Bytes,
+  private val valueSerializer: (V) -> Bytes,
+  private val keyDeserializer: (Bytes) -> K,
+  private val valueDeserializer: (Bytes?) -> V?,
   override val coroutineContext: CoroutineContext = Dispatchers.IO
-) : KeyValueStore {
+) : KeyValueStore<K, V> {
 
   companion object {
     /**
      * Open a MapDB-backed key-value store.
      *
      * @param dbPath The path to the MapDB database.
+     * @param keySerializer the serializer of key objects to bytes
+     * @param valueSerializer the serializer of value objects to bytes
+     * @param keyDeserializer the deserializer of keys from bytes
+     * @param valueDeserializer the deserializer of values from bytes
      * @return A key-value store.
      * @throws IOException If an I/O error occurs.
      */
     @JvmStatic
     @Throws(IOException::class)
-    fun open(dbPath: Path) = MapDBKeyValueStore(dbPath)
+    fun <K, V> open(
+      dbPath: Path,
+      keySerializer: (K) -> Bytes,
+      valueSerializer: (V) -> Bytes,
+      keyDeserializer: (Bytes) -> K,
+      valueDeserializer: (Bytes?) -> V?
+    ) =
+      MapDBKeyValueStore<K, V>(dbPath, keySerializer, valueSerializer, keyDeserializer, valueDeserializer)
   }
 
   private val db: DB
@@ -70,14 +89,14 @@
     ).createOrOpen()
   }
 
-  override suspend fun get(key: Bytes): Bytes? = storageData[key]
+  override suspend fun get(key: K): V? = valueDeserializer(storageData[keySerializer(key)])
 
-  override suspend fun put(key: Bytes, value: Bytes) {
-    storageData[key] = value
+  override suspend fun put(key: K, value: V) {
+    storageData[keySerializer(key)] = valueSerializer(value)
     db.commit()
   }
 
-  override suspend fun keys(): Iterable<Bytes> = storageData.keys
+  override suspend fun keys(): Iterable<K> = storageData.keys.map(keyDeserializer)
 
   /**
    * Closes the underlying MapDB instance.
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/MapKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/MapKeyValueStore.kt
index bc15460..5eaca56 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/MapKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/MapKeyValueStore.kt
@@ -17,7 +17,6 @@
 package org.apache.tuweni.kv
 
 import kotlinx.coroutines.Dispatchers
-import org.apache.tuweni.bytes.Bytes
 import kotlin.coroutines.CoroutineContext
 
 /**
@@ -27,11 +26,11 @@
  * @return A key-value store.
  * @constructor Open an in-memory key-value store.
  */
-class MapKeyValueStore
+class MapKeyValueStore<K, V>
 constructor(
-  private val map: MutableMap<Bytes, Bytes> = HashMap(),
+  private val map: MutableMap<K, V> = HashMap(),
   override val coroutineContext: CoroutineContext = Dispatchers.IO
-) : KeyValueStore {
+) : KeyValueStore<K, V> {
 
   companion object {
     /**
@@ -42,7 +41,7 @@
      * @return A key-value store.
      */
     @JvmStatic
-    fun open(): MapKeyValueStore = MapKeyValueStore()
+    fun <K, V> open() = MapKeyValueStore<K, V>()
 
     /**
      * Open an in-memory key-value store.
@@ -51,16 +50,16 @@
      * @return A key-value store.
      */
     @JvmStatic
-    fun open(map: MutableMap<Bytes, Bytes>) = MapKeyValueStore(map)
+    fun <K, V> open(map: MutableMap<K, V>) = MapKeyValueStore(map)
   }
 
-  override suspend fun get(key: Bytes): Bytes? = map[key]
+  override suspend fun get(key: K): V? = map[key]
 
-  override suspend fun put(key: Bytes, value: Bytes) {
+  override suspend fun put(key: K, value: V) {
     map[key] = value
   }
 
-  override suspend fun keys(): Iterable<Bytes> = map.keys
+  override suspend fun keys(): Iterable<K> = map.keys
 
   /**
    * Has no effect in this KeyValueStore implementation.
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/RedisKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/RedisKeyValueStore.kt
index 8e8717c..f22f0cd 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/RedisKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/RedisKeyValueStore.kt
@@ -24,58 +24,131 @@
 import kotlinx.coroutines.Dispatchers
 import kotlinx.coroutines.future.await
 import org.apache.tuweni.bytes.Bytes
+import org.checkerframework.checker.units.qual.K
 import java.net.InetAddress
 import java.util.concurrent.CompletionStage
+import java.util.function.Function
 import kotlin.coroutines.CoroutineContext
 
 /**
  * A key-value store backed by Redis.
  *
  * @param uri The uri to the Redis store.
+ * @param keySerializer the serializer of key objects to bytes
+ * @param valueSerializer the serializer of value objects to bytes
+ * @param keyDeserializer the deserializer of keys from bytes
+ * @param valueDeserializer the deserializer of values from bytes
+ * @param coroutineContext the co-routine context in which this store executes
  * @constructor Open a Redis-backed key-value store.
  */
-class RedisKeyValueStore(
+class RedisKeyValueStore<K, V>(
   uri: String,
+  private val keySerializer: (K) -> Bytes,
+  private val valueSerializer: (V) -> Bytes,
+  private val keyDeserializer: (Bytes) -> K,
+  private val valueDeserializer: (Bytes) -> V,
   override val coroutineContext: CoroutineContext = Dispatchers.IO
-) : KeyValueStore {
+) : KeyValueStore<K, V> {
 
   companion object {
     /**
      * Open a Redis-backed key-value store.
      *
      * @param uri The uri to the Redis store.
+     * @param keySerializer the serializer of key objects to bytes
+     * @param valueSerializer the serializer of value objects to bytes
+     * @param keyDeserializer the deserializer of keys from bytes
+     * @param valueDeserializer the deserializer of values from bytes
      * @return A key-value store.
      */
     @JvmStatic
-    fun open(uri: String) = RedisKeyValueStore(uri)
+    fun <K, V> open(
+      uri: String,
+      keySerializer: Function<K, Bytes>,
+      valueSerializer: Function<V, Bytes>,
+      keyDeserializer: Function<Bytes, K>,
+      valueDeserializer: Function<Bytes, V>
+    ) = RedisKeyValueStore(uri,
+      keySerializer::apply,
+      valueSerializer::apply,
+      keyDeserializer::apply,
+      valueDeserializer::apply)
 
     /**
      * Open a Redis-backed key-value store.
      *
      * @param port The port for the Redis store.
+     * @param keySerializer the serializer of key objects to bytes
+     * @param valueSerializer the serializer of value objects to bytes
+     * @param keyDeserializer the deserializer of keys from bytes
+     * @param valueDeserializer the deserializer of values from bytes
      * @return A key-value store.
      */
     @JvmStatic
-    fun open(port: Int) = RedisKeyValueStore(port)
+    fun <K, V> open(
+      port: Int,
+      keySerializer: Function<K, Bytes>,
+      valueSerializer: Function<V, Bytes>,
+      keyDeserializer: Function<Bytes, K>,
+      valueDeserializer: Function<Bytes, V>
+    ) =
+      RedisKeyValueStore(port = port,
+        keySerializer = keySerializer::apply,
+        valueSerializer = valueSerializer::apply,
+        keyDeserializer = keyDeserializer::apply,
+        valueDeserializer = valueDeserializer::apply)
 
     /**
      * Open a Redis-backed key-value store.
      *
      * @param address The address for the Redis store.
+     * @param keySerializer the serializer of key objects to bytes
+     * @param valueSerializer the serializer of value objects to bytes
+     * @param keyDeserializer the deserializer of keys from bytes
+     * @param valueDeserializer the deserializer of values from bytes
      * @return A key-value store.
      */
     @JvmStatic
-    fun open(address: InetAddress) = RedisKeyValueStore(6379, address)
+    fun <K, V> open(
+      address: InetAddress,
+      keySerializer: Function<K, Bytes>,
+      valueSerializer: Function<V, Bytes>,
+      keyDeserializer: Function<Bytes, K>,
+      valueDeserializer: Function<Bytes, V>
+    ) =
+      RedisKeyValueStore(6379,
+        address,
+        keySerializer::apply,
+        valueSerializer::apply,
+        keyDeserializer::apply,
+        valueDeserializer::apply)
 
     /**
      * Open a Redis-backed key-value store.
      *
      * @param port The port for the Redis store.
      * @param address The address for the Redis store.
+     * @param keySerializer the serializer of key objects to bytes
+     * @param valueSerializer the serializer of value objects to bytes
+     * @param keyDeserializer the deserializer of keys from bytes
+     * @param valueDeserializer the deserializer of values from bytes
      * @return A key-value store.
      */
     @JvmStatic
-    fun open(port: Int, address: InetAddress) = RedisKeyValueStore(port, address)
+    fun <K, V> open(
+      port: Int,
+      address: InetAddress,
+      keySerializer: Function<K, Bytes>,
+      valueSerializer: Function<V, Bytes>,
+      keyDeserializer: Function<Bytes, K>,
+      valueDeserializer: Function<Bytes, V>
+    ) =
+      RedisKeyValueStore(port,
+        address,
+        keySerializer::apply,
+        valueSerializer::apply,
+        keyDeserializer::apply,
+        valueDeserializer::apply)
 
     /**
      * A [RedisCodec] for working with Bytes classes.
@@ -94,12 +167,24 @@
    *
    * @param port The port for the Redis store.
    * @param address The address for the Redis store.
+   * @param keySerializer the serializer of key objects to bytes
+   * @param valueSerializer the serializer of value objects to bytes
+   * @param keyDeserializer the deserializer of keys from bytes
+   * @param valueDeserializer the deserializer of values from bytes
    */
   @JvmOverloads
   constructor(
     port: Int = 6379,
-    address: InetAddress = InetAddress.getLoopbackAddress()
-  ) : this(RedisURI.create(address.hostAddress, port).toURI().toString())
+    address: InetAddress = InetAddress.getLoopbackAddress(),
+    keySerializer: (K) -> Bytes,
+    valueSerializer: (V) -> Bytes,
+    keyDeserializer: (Bytes) -> K,
+    valueDeserializer: (Bytes) -> V
+  ) : this(RedisURI.create(address.hostAddress, port).toURI().toString(),
+    keySerializer,
+    valueSerializer,
+    keyDeserializer,
+    valueDeserializer)
 
   init {
     val redisClient = RedisClient.create(uri)
@@ -107,14 +192,20 @@
     asyncCommands = conn.async()
   }
 
-  override suspend fun get(key: Bytes): Bytes? = asyncCommands.get(key).await()
+  override suspend fun get(key: K): V? = asyncCommands.get(keySerializer(key)).thenApply {
+    if (it == null) {
+      null
+    } else {
+      valueDeserializer(it)
+    }
+  }.await()
 
-  override suspend fun put(key: Bytes, value: Bytes) {
-    val future: CompletionStage<String> = asyncCommands.set(key, value)
+  override suspend fun put(key: K, value: V) {
+    val future: CompletionStage<String> = asyncCommands.set(keySerializer(key), valueSerializer(value))
     future.await()
   }
 
-  override suspend fun keys(): Iterable<Bytes> = asyncCommands.keys(Bytes.EMPTY).await()
+  override suspend fun keys(): Iterable<K> = asyncCommands.keys(Bytes.EMPTY).await().map(keyDeserializer)
 
   override fun close() {
     conn.close()
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/RocksDBKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/RocksDBKeyValueStore.kt
index 7911803..30e8dad 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/RocksDBKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/RocksDBKeyValueStore.kt
@@ -25,6 +25,7 @@
 import java.nio.file.Files
 import java.nio.file.Path
 import java.util.concurrent.atomic.AtomicBoolean
+import java.util.function.Function
 import kotlin.coroutines.CoroutineContext
 
 /**
@@ -37,37 +38,72 @@
  * @throws IOException If an I/O error occurs.
  * @constructor Open a RocksDB-backed key-value store.
  */
-class RocksDBKeyValueStore
+class RocksDBKeyValueStore<K, V>
 @Throws(IOException::class)
 constructor(
   dbPath: Path,
+  private val keySerializer: (K) -> Bytes,
+  private val valueSerializer: (V) -> Bytes,
+  private val keyDeserializer: (Bytes) -> K,
+  private val valueDeserializer: (Bytes) -> V,
   options: Options = Options().setCreateIfMissing(true).setWriteBufferSize(268435456).setMaxOpenFiles(-1),
   override val coroutineContext: CoroutineContext = Dispatchers.IO
-) : KeyValueStore {
+) : KeyValueStore<K, V> {
 
   companion object {
     /**
      * Open a RocksDB-backed key-value store.
      *
      * @param dbPath The path to the RocksDB database.
+     * @param keySerializer the serializer of key objects to bytes
+     * @param valueSerializer the serializer of value objects to bytes
+     * @param keyDeserializer the deserializer of keys from bytes
+     * @param valueDeserializer the deserializer of values from bytes
      * @return A key-value store.
      * @throws IOException If an I/O error occurs.
      */
     @JvmStatic
     @Throws(IOException::class)
-    fun open(dbPath: Path) = RocksDBKeyValueStore(dbPath)
+    fun <K, V> open(
+      dbPath: Path,
+      keySerializer: Function<K, Bytes>,
+      valueSerializer: Function<V, Bytes>,
+      keyDeserializer: Function<Bytes, K>,
+      valueDeserializer: Function<Bytes, V>
+    ) = RocksDBKeyValueStore(dbPath,
+      keySerializer::apply,
+      valueSerializer::apply,
+      keyDeserializer::apply,
+      valueDeserializer::apply)
 
     /**
      * Open a RocksDB-backed key-value store.
      *
      * @param dbPath The path to the RocksDB database.
+     * @param keySerializer the serializer of key objects to bytes
+     * @param valueSerializer the serializer of value objects to bytes
+     * @param keyDeserializer the deserializer of keys from bytes
+     * @param valueDeserializer the deserializer of values from bytes
      * @param options Options for the RocksDB database.
      * @return A key-value store.
      * @throws IOException If an I/O error occurs.
      */
     @JvmStatic
     @Throws(IOException::class)
-    fun open(dbPath: Path, options: Options) = RocksDBKeyValueStore(dbPath, options)
+    fun <K, V> open(
+      dbPath: Path,
+      keySerializer: Function<K, Bytes>,
+      valueSerializer: Function<V, Bytes>,
+      keyDeserializer: Function<Bytes, K>,
+      valueDeserializer: Function<Bytes, V>,
+      options: Options
+    ) =
+      RocksDBKeyValueStore(dbPath,
+        keySerializer::apply,
+        valueSerializer::apply,
+        keyDeserializer::apply,
+        valueDeserializer::apply,
+        options)
   }
 
   private val db: RocksDB
@@ -79,43 +115,43 @@
     db = RocksDB.open(options, dbPath.toAbsolutePath().toString())
   }
 
-  override suspend fun get(key: Bytes): Bytes? {
+  override suspend fun get(key: K): V? {
     if (closed.get()) {
       throw IllegalStateException("Closed DB")
     }
-    val rawValue = db[key.toArrayUnsafe()]
+    val rawValue = db[keySerializer(key).toArrayUnsafe()]
     return if (rawValue == null) {
       null
     } else {
-      Bytes.wrap(rawValue)
+      valueDeserializer(Bytes.wrap(rawValue))
     }
   }
 
-  override suspend fun put(key: Bytes, value: Bytes) {
+  override suspend fun put(key: K, value: V) {
     if (closed.get()) {
       throw IllegalStateException("Closed DB")
     }
-    db.put(key.toArrayUnsafe(), value.toArrayUnsafe())
+    db.put(keySerializer(key).toArrayUnsafe(), valueSerializer(value).toArrayUnsafe())
   }
 
-  private class BytesIterator(val rIterator: RocksIterator) : Iterator<Bytes> {
+  private class BytesIterator<K>(val rIterator: RocksIterator, val keyDeserializer: (Bytes) -> K) : Iterator<K> {
 
     override fun hasNext(): Boolean = rIterator.isValid
 
-    override fun next(): Bytes {
-      val key = Bytes.wrap(rIterator.key())
+    override fun next(): K {
+      val key = rIterator.key()
       rIterator.next()
-      return key
+      return keyDeserializer(Bytes.wrap(key))
     }
   }
 
-  override suspend fun keys(): Iterable<Bytes> {
+  override suspend fun keys(): Iterable<K> {
     if (closed.get()) {
       throw IllegalStateException("Closed DB")
     }
     val iter = db.newIterator()
     iter.seekToFirst()
-    return Iterable { BytesIterator(iter) }
+    return Iterable { BytesIterator(iter, keyDeserializer) }
   }
 
   /**
diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/SQLKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/SQLKeyValueStore.kt
index aa07336..46c6e93 100644
--- a/kv/src/main/kotlin/org/apache/tuweni/kv/SQLKeyValueStore.kt
+++ b/kv/src/main/kotlin/org/apache/tuweni/kv/SQLKeyValueStore.kt
@@ -38,27 +38,45 @@
  * @throws IOException If an I/O error occurs.
  * @constructor Open a relational database backed key-value store.
  */
-class SQLKeyValueStore
+class SQLKeyValueStore<K, V>
 @Throws(IOException::class)
 constructor(
   jdbcurl: String,
   val tableName: String = "store",
   val keyColumn: String = "key",
   val valueColumn: String = "value",
+  private val keySerializer: (K) -> Bytes,
+  private val valueSerializer: (V) -> Bytes,
+  private val keyDeserializer: (Bytes) -> K,
+  private val valueDeserializer: (Bytes?) -> V?,
   override val coroutineContext: CoroutineContext = Dispatchers.IO
-) : KeyValueStore {
+) : KeyValueStore<K, V> {
 
   companion object {
     /**
      * Open a relational database backed key-value store.
      *
      * @param jdbcUrl The JDBC url to connect to the database.
+     * @param keySerializer the serializer of key objects to bytes
+     * @param valueSerializer the serializer of value objects to bytes
+     * @param keyDeserializer the deserializer of keys from bytes
+     * @param valueDeserializer the deserializer of values from bytes
      * @return A key-value store.
      * @throws IOException If an I/O error occurs.
      */
     @JvmStatic
     @Throws(IOException::class)
-    fun open(jdbcUrl: String) = SQLKeyValueStore(jdbcUrl)
+    fun <K, V> open(
+      jdbcUrl: String,
+      keySerializer: (K) -> Bytes,
+      valueSerializer: (V) -> Bytes,
+      keyDeserializer: (Bytes) -> K,
+      valueDeserializer: (Bytes?) -> V?
+    ) = SQLKeyValueStore<K, V>(jdbcurl = jdbcUrl,
+      keySerializer = keySerializer,
+      valueSerializer = valueSerializer,
+    keyDeserializer = keyDeserializer,
+      valueDeserializer = valueDeserializer)
 
     /**
      * Open a relational database backed key-value store.
@@ -67,13 +85,33 @@
      * @param tableName the name of the table to use for storage.
      * @param keyColumn the key column of the store.
      * @param valueColumn the value column of the store.
+     * @param keySerializer the serializer of key objects to bytes
+     * @param valueSerializer the serializer of value objects to bytes
+     * @param keyDeserializer the deserializer of keys from bytes
+     * @param valueDeserializer the deserializer of values from bytes
      * @return A key-value store.
      * @throws IOException If an I/O error occurs.
      */
     @JvmStatic
     @Throws(IOException::class)
-    fun open(jdbcUrl: String, tableName: String, keyColumn: String, valueColumn: String) =
-      SQLKeyValueStore(jdbcUrl, tableName, keyColumn, valueColumn)
+    fun <K, V> open(
+      jdbcUrl: String,
+      tableName: String,
+      keyColumn: String,
+      valueColumn: String,
+      keySerializer: (K) -> Bytes,
+      valueSerializer: (V) -> Bytes,
+      keyDeserializer: (Bytes) -> K,
+      valueDeserializer: (Bytes?) -> V?
+    ) =
+      SQLKeyValueStore<K, V>(jdbcUrl,
+        tableName,
+        keyColumn,
+        valueColumn,
+        keySerializer,
+        valueSerializer,
+        keyDeserializer,
+        valueDeserializer)
   }
 
   private val connectionPool: BoneCP
@@ -85,34 +123,34 @@
     connectionPool = BoneCP(config)
   }
 
-  override suspend fun get(key: Bytes): Bytes? {
+  override suspend fun get(key: K): V? {
       connectionPool.asyncConnection.await().use {
         val stmt = it.prepareStatement("SELECT $valueColumn FROM $tableName WHERE $keyColumn = ?")
-        stmt.setBytes(1, key.toArrayUnsafe())
+        stmt.setBytes(1, keySerializer(key).toArrayUnsafe())
         stmt.execute()
 
         val rs = stmt.resultSet
 
         return if (rs.next()) {
-          Bytes.wrap(rs.getBytes(1))
+          valueDeserializer(Bytes.wrap(rs.getBytes(1)))
         } else {
           null
         }
       }
   }
 
-  override suspend fun put(key: Bytes, value: Bytes) {
+  override suspend fun put(key: K, value: V) {
     connectionPool.asyncConnection.await().use {
         val stmt = it.prepareStatement("INSERT INTO $tableName($keyColumn, $valueColumn) VALUES(?,?)")
-        stmt.setBytes(1, key.toArrayUnsafe())
-        stmt.setBytes(2, value.toArrayUnsafe())
+        stmt.setBytes(1, keySerializer(key).toArrayUnsafe())
+        stmt.setBytes(2, valueSerializer(value).toArrayUnsafe())
         it.autoCommit = false
         try {
           stmt.execute()
         } catch (e: SQLException) {
           val updateStmt = it.prepareStatement("UPDATE $tableName SET $valueColumn=? WHERE $keyColumn=?")
-          updateStmt.setBytes(1, value.toArrayUnsafe())
-          updateStmt.setBytes(2, key.toArrayUnsafe())
+          updateStmt.setBytes(1, valueSerializer(value).toArrayUnsafe())
+          updateStmt.setBytes(2, keySerializer(key).toArrayUnsafe())
           updateStmt.execute()
         }
         it.commit()
@@ -120,24 +158,24 @@
       }
   }
 
-  private class SQLIterator(val resultSet: ResultSet) : Iterator<Bytes> {
+  private class SQLIterator<K>(val resultSet: ResultSet, val keyDeserializer: (Bytes) -> K) : Iterator<K> {
 
     private var next = resultSet.next()
 
     override fun hasNext(): Boolean = next
 
-    override fun next(): Bytes {
-      val key = Bytes.wrap(resultSet.getBytes(1))
+    override fun next(): K {
+      val key = keyDeserializer(Bytes.wrap(resultSet.getBytes(1)))
       next = resultSet.next()
       return key
     }
   }
 
-  override suspend fun keys(): Iterable<Bytes> {
+  override suspend fun keys(): Iterable<K> {
     connectionPool.asyncConnection.await().use {
       val stmt = it.prepareStatement("SELECT $keyColumn FROM $tableName")
       stmt.execute()
-      return Iterable { SQLIterator(stmt.resultSet) }
+      return Iterable { SQLIterator(stmt.resultSet, keyDeserializer) }
     }
   }
 
diff --git a/kv/src/test/java/org/apache/tuweni/kv/KeyValueStoreTest.java b/kv/src/test/java/org/apache/tuweni/kv/KeyValueStoreTest.java
index b5d8c2a..19e0f37 100644
--- a/kv/src/test/java/org/apache/tuweni/kv/KeyValueStoreTest.java
+++ b/kv/src/test/java/org/apache/tuweni/kv/KeyValueStoreTest.java
@@ -27,6 +27,7 @@
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Function;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -34,10 +35,12 @@
 @ExtendWith(TempDirectoryExtension.class)
 class KeyValueStoreTest {
 
+  private static final Function<Bytes, Bytes> bytesIdentityFn = Function.identity();
+
   @Test
   void testPutAndGet() throws Exception {
     Map<Bytes, Bytes> map = new HashMap<>();
-    KeyValueStore store = MapKeyValueStore.open(map);
+    KeyValueStore<Bytes, Bytes> store = MapKeyValueStore.open(map);
     AsyncCompletion completion = store.putAsync(Bytes.of(123), Bytes.of(10, 12, 13));
     completion.join();
     Bytes value = store.getAsync(Bytes.of(123)).get();
@@ -49,14 +52,14 @@
   @Test
   void testNoValue() throws Exception {
     Map<Bytes, Bytes> map = new HashMap<>();
-    KeyValueStore store = MapKeyValueStore.open(map);
+    KeyValueStore<Bytes, Bytes> store = MapKeyValueStore.open(map);
     assertNull(store.getAsync(Bytes.of(123)).get());
   }
 
   @Test
   void testKeys() throws Exception {
     Map<Bytes, Bytes> map = new HashMap<>();
-    KeyValueStore store = MapKeyValueStore.open(map);
+    KeyValueStore<Bytes, Bytes> store = MapKeyValueStore.open(map);
     AsyncCompletion completion = store.putAsync(Bytes.of(123), Bytes.of(10, 12, 13));
     completion.join();
     Set<Bytes> keys = new HashSet<>();
@@ -66,7 +69,12 @@
 
   @Test
   void testLevelDBWithoutOptions(@TempDirectory Path tempDirectory) throws Exception {
-    try (LevelDBKeyValueStore leveldb = LevelDBKeyValueStore.open(tempDirectory.resolve("foo").resolve("bar"))) {
+    try (LevelDBKeyValueStore<Bytes, Bytes> leveldb = LevelDBKeyValueStore.open(
+        tempDirectory.resolve("foo").resolve("bar"),
+        bytesIdentityFn,
+        bytesIdentityFn,
+        bytesIdentityFn,
+        bytesIdentityFn)) {
       AsyncCompletion completion = leveldb.putAsync(Bytes.of(123), Bytes.of(10, 12, 13));
       completion.join();
       Bytes value = leveldb.getAsync(Bytes.of(123)).get();
@@ -77,7 +85,12 @@
 
   @Test
   void testRocksDBWithoutOptions(@TempDirectory Path tempDirectory) throws Exception {
-    try (RocksDBKeyValueStore rocksdb = RocksDBKeyValueStore.open(tempDirectory.resolve("foo").resolve("bar"))) {
+    try (RocksDBKeyValueStore<Bytes, Bytes> rocksdb = RocksDBKeyValueStore.open(
+        tempDirectory.resolve("foo").resolve("bar"),
+        bytesIdentityFn,
+        bytesIdentityFn,
+        bytesIdentityFn,
+        bytesIdentityFn)) {
       AsyncCompletion completion = rocksdb.putAsync(Bytes.of(123), Bytes.of(10, 12, 13));
       completion.join();
       Bytes value = rocksdb.getAsync(Bytes.of(123)).get();
diff --git a/kv/src/test/java/org/apache/tuweni/kv/RedisKeyValueStoreTest.java b/kv/src/test/java/org/apache/tuweni/kv/RedisKeyValueStoreTest.java
index c218a65..db44ae1 100644
--- a/kv/src/test/java/org/apache/tuweni/kv/RedisKeyValueStoreTest.java
+++ b/kv/src/test/java/org/apache/tuweni/kv/RedisKeyValueStoreTest.java
@@ -22,6 +22,7 @@
 import org.apache.tuweni.junit.RedisServerExtension;
 
 import java.net.InetAddress;
+import java.util.function.Function;
 
 import io.lettuce.core.RedisClient;
 import io.lettuce.core.RedisURI;
@@ -34,7 +35,8 @@
 
   @Test
   void testPutAndGet(@RedisPort Integer redisPort) throws Exception {
-    KeyValueStore store = RedisKeyValueStore.open(redisPort);
+    KeyValueStore<Bytes, Bytes> store = RedisKeyValueStore
+        .open(redisPort, Function.identity(), Function.identity(), Function.identity(), Function.identity());
     AsyncCompletion completion = store.putAsync(Bytes.of(123), Bytes.of(10, 12, 13));
     completion.join();
     Bytes value = store.getAsync(Bytes.of(123)).get();
@@ -49,13 +51,24 @@
 
   @Test
   void testNoValue(@RedisPort Integer redisPort) throws Exception {
-    KeyValueStore store = RedisKeyValueStore.open(redisPort, InetAddress.getLoopbackAddress());
+    KeyValueStore<Bytes, Bytes> store = RedisKeyValueStore.open(
+        redisPort,
+        InetAddress.getLoopbackAddress(),
+        Function.identity(),
+        Function.identity(),
+        Function.identity(),
+        Function.identity());
     assertNull(store.getAsync(Bytes.of(124)).get());
   }
 
   @Test
   void testRedisCloseable(@RedisPort Integer redisPort) throws Exception {
-    try (RedisKeyValueStore redis = RedisKeyValueStore.open("redis://127.0.0.1:" + redisPort)) {
+    try (RedisKeyValueStore<Bytes, Bytes> redis = RedisKeyValueStore.open(
+        "redis://127.0.0.1:" + redisPort,
+        Function.identity(),
+        Function.identity(),
+        Function.identity(),
+        Function.identity())) {
       AsyncCompletion completion = redis.putAsync(Bytes.of(125), Bytes.of(10, 12, 13));
       completion.join();
       Bytes value = redis.getAsync(Bytes.of(125)).get();
diff --git a/kv/src/test/kotlin/org/apache/tuweni/kv/KeyValueStoreSpec.kt b/kv/src/test/kotlin/org/apache/tuweni/kv/KeyValueStoreSpec.kt
index ae8c62b..94f4ffb 100644
--- a/kv/src/test/kotlin/org/apache/tuweni/kv/KeyValueStoreSpec.kt
+++ b/kv/src/test/kotlin/org/apache/tuweni/kv/KeyValueStoreSpec.kt
@@ -110,7 +110,7 @@
       runBlocking {
         kv.put(bar, foo)
         kv.put(foo, bar)
-          val keys = (kv.keys().map { it })
+        val keys = (kv.keys().map { it })
         keys.should.contain(bar)
         keys.should.contain(foo)
       }
@@ -120,7 +120,12 @@
 
 object MapDBKeyValueStoreSpec : Spek({
   val testDir = Files.createTempDirectory("data")
-  val kv = MapDBKeyValueStore(testDir.resolve("data.db"))
+  val kv = MapDBKeyValueStore(
+    testDir.resolve("data.db"),
+    keySerializer = { it },
+    valueSerializer = { it },
+    keyDeserializer = { it },
+    valueDeserializer = { it })
 
   describe("a MapDB-backed key value store") {
 
@@ -141,12 +146,19 @@
       runBlocking {
         kv.put(foobar, foo)
         kv.put(foo, bar)
-        kv.keys().should.equal(setOf(foobar, foo))
+        val keys = kv.keys().map { it }
+        keys.should.contain(foo)
+        keys.should.contain(foobar)
       }
     }
 
     it("should not allow usage after the DB is closed") {
-      val kv2 = MapDBKeyValueStore(testDir.resolve("data2.db"))
+      val kv2 = MapDBKeyValueStore(
+        testDir.resolve("data2.db"),
+        keySerializer = { it },
+        valueSerializer = { it },
+        keyDeserializer = { it },
+        valueDeserializer = { it })
       kv2.close()
       runBlocking {
         var caught = false
@@ -168,7 +180,12 @@
 
 object LevelDBKeyValueStoreSpec : Spek({
   val path = Files.createTempDirectory("leveldb")
-  val kv = LevelDBKeyValueStore(path)
+  val kv = LevelDBKeyValueStore(
+    path,
+    keySerializer = { it },
+    valueSerializer = { it },
+    keyDeserializer = { it },
+    valueDeserializer = { it })
   afterGroup {
     kv.close()
     MoreFiles.deleteRecursively(path, RecursiveDeleteOption.ALLOW_INSECURE)
@@ -197,7 +214,12 @@
     }
 
     it("should not allow usage after the DB is closed") {
-      val kv2 = LevelDBKeyValueStore(path.resolve("subdb"))
+      val kv2 = LevelDBKeyValueStore(
+        path.resolve("subdb"),
+        keySerializer = { it },
+        valueSerializer = { it },
+        keyDeserializer = { it },
+        valueDeserializer = { it })
       kv2.close()
       runBlocking {
         var caught = false
@@ -214,7 +236,12 @@
 
 object RocksDBKeyValueStoreSpec : Spek({
   val path = Files.createTempDirectory("rocksdb")
-  val kv = RocksDBKeyValueStore(path)
+  val kv = RocksDBKeyValueStore(
+    path,
+    keySerializer = { it },
+    valueSerializer = { it },
+    keyDeserializer = { it },
+    valueDeserializer = { it })
   afterGroup {
     kv.close()
     MoreFiles.deleteRecursively(path, RecursiveDeleteOption.ALLOW_INSECURE)
@@ -245,7 +272,12 @@
     }
 
     it("should not allow usage after the DB is closed") {
-      val kv2 = RocksDBKeyValueStore(path.resolve("subdb"))
+      val kv2 = RocksDBKeyValueStore(
+        path.resolve("subdb"),
+        keySerializer = { it },
+        valueSerializer = { it },
+        keyDeserializer = { it },
+        valueDeserializer = { it })
       kv2.close()
       runBlocking {
         var caught = false
@@ -269,8 +301,21 @@
     st.executeUpdate("create table store(key binary, value binary, primary key(key))")
     st.executeUpdate("create table store2(id binary, val binary, primary key(id))")
   }
-  val kv = SQLKeyValueStore(jdbcUrl)
-  val otherkv = SQLKeyValueStore.open(jdbcUrl, "store2", "id", "val")
+  val kv = SQLKeyValueStore(
+    jdbcUrl,
+    keySerializer = { it },
+    valueSerializer = { it },
+    keyDeserializer = { it },
+    valueDeserializer = { it })
+  val otherkv = SQLKeyValueStore.open(
+    jdbcUrl,
+    "store2",
+    "id",
+    "val",
+    keySerializer = { it },
+    valueSerializer = { it },
+    keyDeserializer = { it },
+    valueDeserializer = { it })
   afterGroup {
     kv.close()
     otherkv.close()
@@ -316,7 +361,12 @@
     }
 
     it("should not allow usage after the DB is closed") {
-      val kv2 = SQLKeyValueStore("jdbc:h2:mem:testdb")
+      val kv2 = SQLKeyValueStore(
+        "jdbc:h2:mem:testdb",
+        keySerializer = { it },
+        valueSerializer = { it },
+        keyDeserializer = { it },
+        valueDeserializer = { it })
       kv2.close()
       runBlocking {
         var caught = false