Merge pull request #39 from YouJustDontKnow/v5-udp-transport

Discv5 kademlia
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/ENRStorage.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/ENRStorage.kt
new file mode 100644
index 0000000..3388444
--- /dev/null
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/ENRStorage.kt
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.v5
+
+import org.apache.tuweni.bytes.Bytes
+
+/**
+ * In-memory storage of node records
+ */
+interface ENRStorage {
+
+  /**
+   * Persist node record into storage
+   *
+   * @param enr node record
+   */
+  fun set(enr: Bytes)
+
+  /**
+   * Find node record in storage
+   *
+   * @param nodeId node identifier
+   *
+   * @return node record
+   */
+  fun find(nodeId: Bytes): Bytes?
+}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/MessageObserver.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/MessageObserver.kt
new file mode 100644
index 0000000..843378b
--- /dev/null
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/MessageObserver.kt
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.v5
+
+import org.apache.tuweni.devp2p.v5.packet.UdpMessage
+
+/**
+ * UDP message listener for observing processed messages, generally for test purposes
+ */
+interface MessageObserver {
+
+  /**
+   * Perform message observation
+   *
+   * @param message incoming processed message
+   */
+  fun observe(message: UdpMessage)
+}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/NodeDiscoveryService.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/NodeDiscoveryService.kt
index 9a0fcc0..b146209 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/NodeDiscoveryService.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/NodeDiscoveryService.kt
@@ -22,10 +22,12 @@
 import kotlinx.coroutines.launch
 import kotlinx.coroutines.runBlocking
 import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.crypto.Hash
 import org.apache.tuweni.crypto.SECP256K1
 import org.apache.tuweni.devp2p.EthereumNodeRecord
 import org.apache.tuweni.devp2p.v5.internal.DefaultUdpConnector
 import org.apache.tuweni.devp2p.v5.packet.RandomMessage
+import org.apache.tuweni.devp2p.v5.storage.DefaultENRStorage
 import org.apache.tuweni.io.Base64URLSafe
 import java.net.InetSocketAddress
 import java.time.Instant
@@ -46,7 +48,6 @@
    * Executes service shut down
    */
   fun terminate(await: Boolean = false)
-
 }
 
 internal class DefaultNodeDiscoveryService(
@@ -63,7 +64,8 @@
     null,
     bindAddress.port
   ),
-  private val connector: UdpConnector = DefaultUdpConnector(bindAddress, keyPair, selfENR),
+  private val enrStorage: ENRStorage = DefaultENRStorage(),
+  private val connector: UdpConnector = DefaultUdpConnector(bindAddress, keyPair, selfENR, enrStorage),
   override val coroutineContext: CoroutineContext = Dispatchers.Default
 ) : NodeDiscoveryService, CoroutineScope {
 
@@ -91,8 +93,10 @@
         val randomMessage = RandomMessage()
         val address = InetSocketAddress(enr.ip(), enr.udp())
 
-        connector.addPendingNodeId(address, rlpENR)
-        connector.send(address, randomMessage, rlpENR)
+        val destNodeId = Hash.sha2_256(rlpENR)
+        enrStorage.set(rlpENR)
+        connector.getNodesTable().add(rlpENR)
+        connector.send(address, randomMessage, destNodeId)
       }
     }
   }
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/PacketCodec.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/PacketCodec.kt
index eec5073..23a2cf3 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/PacketCodec.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/PacketCodec.kt
@@ -19,6 +19,7 @@
 import org.apache.tuweni.bytes.Bytes
 import org.apache.tuweni.devp2p.v5.packet.UdpMessage
 import org.apache.tuweni.devp2p.v5.misc.DecodeResult
+import org.apache.tuweni.devp2p.v5.misc.EncodeResult
 import org.apache.tuweni.devp2p.v5.misc.HandshakeInitParameters
 
 /**
@@ -43,7 +44,7 @@
    *
    * @return encoded message
    */
-  fun encode(message: UdpMessage, destNodeId: Bytes, handshakeParams: HandshakeInitParameters? = null): Bytes
+  fun encode(message: UdpMessage, destNodeId: Bytes, handshakeParams: HandshakeInitParameters? = null): EncodeResult
 
   /**
    * Decodes message, decrypting it's body
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/UdpConnector.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/UdpConnector.kt
index 9770885..8fef7cf 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/UdpConnector.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/UdpConnector.kt
@@ -18,8 +18,11 @@
 
 import org.apache.tuweni.bytes.Bytes
 import org.apache.tuweni.crypto.SECP256K1
+import org.apache.tuweni.devp2p.EthereumNodeRecord
+import org.apache.tuweni.devp2p.v5.storage.RoutingTable
 import org.apache.tuweni.devp2p.v5.packet.UdpMessage
 import org.apache.tuweni.devp2p.v5.misc.HandshakeInitParameters
+import org.apache.tuweni.devp2p.v5.misc.TrackingMessage
 import java.net.InetSocketAddress
 
 /**
@@ -68,23 +71,6 @@
   fun started(): Boolean
 
   /**
-   * Add node identifier which awaits for authentication
-   *
-   * @param address socket address
-   * @param nodeId node identifier
-   */
-  fun addPendingNodeId(address: InetSocketAddress, nodeId: Bytes)
-
-  /**
-   * Get node identifier which awaits for authentication
-   *
-   * @param address socket address
-   *
-   * @return node identifier
-   */
-  fun getPendingNodeIdByAddress(address: InetSocketAddress): Bytes
-
-  /**
    * Provides node's key pair
    *
    * @return node's key pair
@@ -92,9 +78,62 @@
   fun getNodeKeyPair(): SECP256K1.KeyPair
 
   /**
+   * Provides node's ENR in RLP encoded representation
+   *
+   * @return node's RLP encoded ENR
+   */
+  fun getEnrBytes(): Bytes
+
+  /**
    * Provides node's ENR
    *
    * @return node's ENR
    */
-  fun getEnr(): Bytes
+  fun getEnr(): EthereumNodeRecord
+
+  /**
+   * Attach an observer that listens for processed messages
+   *
+   * @param observer instance performing the observation
+   */
+  fun attachObserver(observer: MessageObserver)
+
+  /**
+   * Remove an observer that listens for processed messages
+   *
+   * @param observer observer for removal
+   */
+  fun detachObserver(observer: MessageObserver)
+
+  /**
+   * Get kademlia routing table
+   *
+   * @return kademlia table
+   */
+  fun getNodesTable(): RoutingTable
+
+  /**
+   * Retrieve ENR of a pinged node that is awaiting a pong response
+   *
+   * @param nodeId node identifier
+   *
+   * @return node record
+   */
+  fun getAwaitingPongRecord(nodeId: Bytes): Bytes?
+
+  /**
+   * Retrieve the last sent message, in case it was unauthorized and the node can resend it with an authentication header
+   *
+   * @param authTag message's authentication tag
+   *
+   * @return message, including node identifier
+   */
+  fun getPendingMessage(authTag: Bytes): TrackingMessage
+
+  /**
+   * Provides enr storage of known nodes
+   *
+   * @return nodes storage
+   */
+  fun getNodeRecords(): ENRStorage
 }
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/encrypt/AES128GCM.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/encrypt/AES128GCM.kt
index f1a22ae..7033ffb 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/encrypt/AES128GCM.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/encrypt/AES128GCM.kt
@@ -31,7 +31,6 @@
   private const val CIPHER_NAME: String = "AES/GCM/NoPadding"
   private const val KEY_SIZE: Int = 128
 
-
   /**
    * AES128GCM encryption function
    *
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/encrypt/SessionKeyGenerator.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/encrypt/SessionKeyGenerator.kt
index cb21279..0858bdd 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/encrypt/SessionKeyGenerator.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/encrypt/SessionKeyGenerator.kt
@@ -27,7 +27,7 @@
  */
 object SessionKeyGenerator {
 
-  private const val DERIVED_KEY_SIZE: Int = 16
+  const val DERIVED_KEY_SIZE: Int = 16
 
   private val INFO_PREFIX = Bytes.wrap("discovery v5 key agreement".toByteArray())
 
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultAuthenticationProvider.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultAuthenticationProvider.kt
index f03b63b..79783c3 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultAuthenticationProvider.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultAuthenticationProvider.kt
@@ -21,9 +21,9 @@
 import org.apache.tuweni.bytes.Bytes
 import org.apache.tuweni.crypto.Hash
 import org.apache.tuweni.crypto.SECP256K1
-import org.apache.tuweni.devp2p.ENR_REQUEST_RETRY_DELAY_MS
 import org.apache.tuweni.devp2p.EthereumNodeRecord
 import org.apache.tuweni.devp2p.v5.AuthenticationProvider
+import org.apache.tuweni.devp2p.v5.storage.RoutingTable
 import org.apache.tuweni.devp2p.v5.encrypt.AES128GCM
 import org.apache.tuweni.devp2p.v5.encrypt.SessionKeyGenerator
 import org.apache.tuweni.devp2p.v5.misc.AuthHeader
@@ -34,15 +34,16 @@
 
 class DefaultAuthenticationProvider(
   private val keyPair: SECP256K1.KeyPair,
-  private val enr: Bytes
+  private val routingTable: RoutingTable
 ) : AuthenticationProvider {
 
   private val sessionKeys: Cache<String, SessionKey> = CacheBuilder
     .newBuilder()
-    .expireAfterWrite(ENR_REQUEST_RETRY_DELAY_MS, TimeUnit.MILLISECONDS)
+    .expireAfterWrite(SESSION_KEY_EXPIRATION, TimeUnit.MINUTES)
     .build()
-  private val nodeId: Bytes = Hash.sha2_256(enr)
+  private val nodeId: Bytes = Hash.sha2_256(routingTable.getSelfEnr())
 
+  @Synchronized
   override fun authenticate(handshakeParams: HandshakeInitParameters): AuthHeader {
     // Generate ephemeral key pair
     val ephemeralKeyPair = SECP256K1.KeyPair.random()
@@ -57,21 +58,30 @@
     // Derive keys
     val sessionKey = SessionKeyGenerator.generate(nodeId, destNodeId, secret, handshakeParams.idNonce)
 
-    setSessionKey(destNodeId.toHexString(), sessionKey)
+    sessionKeys.put(destNodeId.toHexString(), sessionKey)
 
     val signature = sign(keyPair, handshakeParams)
 
-    return generateAuthHeader(enr, signature, handshakeParams, sessionKey.authRespKey, ephemeralKeyPair.publicKey())
+    return generateAuthHeader(
+      routingTable.getSelfEnr(),
+      signature,
+      handshakeParams,
+      sessionKey.authRespKey,
+      ephemeralKeyPair.publicKey()
+    )
   }
 
+  @Synchronized
   override fun findSessionKey(nodeId: String): SessionKey? {
     return sessionKeys.getIfPresent(nodeId)
   }
 
+  @Synchronized
   override fun setSessionKey(nodeId: String, sessionKey: SessionKey) {
     sessionKeys.put(nodeId, sessionKey)
   }
 
+  @Synchronized
   override fun finalizeHandshake(senderNodeId: Bytes, authHeader: AuthHeader) {
     val ephemeralPublicKey = SECP256K1.PublicKey.fromBytes(authHeader.ephemeralPublicKey)
     val secret = SECP256K1.calculateKeyAgreement(keyPair.secretKey(), ephemeralPublicKey)
@@ -89,7 +99,8 @@
       if (!signatureVerified) {
         throw IllegalArgumentException("Signature is not verified")
       }
-      setSessionKey(senderNodeId.toHexString(), sessionKey)
+      sessionKeys.put(senderNodeId.toHexString(), sessionKey)
+      routingTable.add(enrRLP)
     }
   }
 
@@ -125,6 +136,8 @@
   }
 
   companion object {
+    private const val SESSION_KEY_EXPIRATION: Long = 5
+
     private const val ZERO_NONCE_SIZE: Int = 12
     private const val VERSION: Int = 5
 
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultPacketCodec.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultPacketCodec.kt
index 6dbab0f..51374c1 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultPacketCodec.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultPacketCodec.kt
@@ -21,6 +21,7 @@
 import org.apache.tuweni.crypto.SECP256K1
 import org.apache.tuweni.devp2p.v5.AuthenticationProvider
 import org.apache.tuweni.devp2p.v5.PacketCodec
+import org.apache.tuweni.devp2p.v5.storage.RoutingTable
 import org.apache.tuweni.devp2p.v5.encrypt.AES128GCM
 import org.apache.tuweni.devp2p.v5.packet.FindNodeMessage
 import org.apache.tuweni.devp2p.v5.packet.RandomMessage
@@ -28,49 +29,63 @@
 import org.apache.tuweni.devp2p.v5.packet.WhoAreYouMessage
 import org.apache.tuweni.devp2p.v5.misc.AuthHeader
 import org.apache.tuweni.devp2p.v5.misc.DecodeResult
+import org.apache.tuweni.devp2p.v5.misc.EncodeResult
 import org.apache.tuweni.devp2p.v5.misc.HandshakeInitParameters
+import org.apache.tuweni.devp2p.v5.packet.NodesMessage
+import org.apache.tuweni.devp2p.v5.packet.PingMessage
+import org.apache.tuweni.devp2p.v5.packet.PongMessage
+import org.apache.tuweni.devp2p.v5.packet.RegConfirmationMessage
+import org.apache.tuweni.devp2p.v5.packet.RegTopicMessage
+import org.apache.tuweni.devp2p.v5.packet.ReqTicketMessage
+import org.apache.tuweni.devp2p.v5.packet.TicketMessage
+import org.apache.tuweni.devp2p.v5.packet.TopicQueryMessage
 import org.apache.tuweni.rlp.RLP
 import org.apache.tuweni.rlp.RLPReader
 import kotlin.IllegalArgumentException
 
 class DefaultPacketCodec(
   private val keyPair: SECP256K1.KeyPair,
-  private val enr: Bytes,
-  private val nodeId: Bytes = Hash.sha2_256(enr),
-  private val authenticationProvider: AuthenticationProvider = DefaultAuthenticationProvider(keyPair, enr)
+  private val routingTable: RoutingTable,
+  private val nodeId: Bytes = Hash.sha2_256(routingTable.getSelfEnr()),
+  private val authenticationProvider: AuthenticationProvider = DefaultAuthenticationProvider(keyPair, routingTable)
 ) : PacketCodec {
 
-  override fun encode(message: UdpMessage, destNodeId: Bytes, handshakeParams: HandshakeInitParameters?): Bytes {
+  override fun encode(message: UdpMessage, destNodeId: Bytes, handshakeParams: HandshakeInitParameters?): EncodeResult {
     if (message is WhoAreYouMessage) {
       val magic = UdpMessage.magic(nodeId)
       val content = message.encode()
-      return Bytes.wrap(magic, content)
+      return EncodeResult(magic, Bytes.wrap(magic, content))
     }
 
     val tag = UdpMessage.tag(nodeId, destNodeId)
     if (message is RandomMessage) {
-      val authTag = UdpMessage.authTag()
-      val rlpAuthTag = RLP.encodeValue(authTag)
-      val content = message.encode()
-      return Bytes.wrap(tag, rlpAuthTag, content)
+      return encodeRandomMessage(tag, message)
     }
 
-    val authHeader = handshakeParams?.let { authenticationProvider.authenticate(handshakeParams) }
+    val sessionKey = authenticationProvider.findSessionKey(destNodeId.toHexString())
+    val authHeader = handshakeParams?.let {
+      if (null == sessionKey) {
+        authenticationProvider.authenticate(handshakeParams)
+      } else null
+    }
 
     val initiatorKey = authenticationProvider.findSessionKey(destNodeId.toHexString())?.initiatorKey
-      ?: throw IllegalArgumentException() // TODO handle
+      ?: return encodeRandomMessage(tag, RandomMessage())
     val messagePlain = Bytes.wrap(message.getMessageType(), message.encode())
     return if (null != authHeader) {
       val encodedHeader = authHeader.asRlp()
       val authTag = authHeader.authTag
       val encryptionMeta = Bytes.wrap(tag, encodedHeader)
       val encryptionResult = AES128GCM.encrypt(initiatorKey, authTag, messagePlain, encryptionMeta)
-      Bytes.wrap(tag, encodedHeader, encryptionResult)
+      if (message is NodesMessage) {
+        println(encryptionResult)
+      }
+      EncodeResult(authTag, Bytes.wrap(tag, encodedHeader, encryptionResult))
     } else {
       val authTag = UdpMessage.authTag()
       val authTagHeader = RLP.encodeValue(authTag)
       val encryptionResult = AES128GCM.encrypt(initiatorKey, authTag, messagePlain, tag)
-      Bytes.wrap(tag, authTagHeader, encryptionResult)
+      EncodeResult(authTag, Bytes.wrap(tag, authTagHeader, encryptionResult))
     }
   }
 
@@ -85,6 +100,7 @@
   private fun read(tag: Bytes, senderNodeId: Bytes, contentWithHeader: Bytes, reader: RLPReader): UdpMessage {
     // Distinguish auth header or auth tag
     var authHeader: AuthHeader? = null
+    var authTag: Bytes = Bytes.EMPTY
     if (reader.nextIsList()) {
       if (WHO_ARE_YOU_MESSAGE_LENGTH == contentWithHeader.size()) {
         return WhoAreYouMessage.create(contentWithHeader)
@@ -99,14 +115,14 @@
       }
       authenticationProvider.finalizeHandshake(senderNodeId, authHeader)
     } else {
-      reader.readValue()
+      authTag = reader.readValue()
     }
 
     val encryptedContent = contentWithHeader.slice(reader.position())
 
     // Decrypt
     val decryptionKey = authenticationProvider.findSessionKey(senderNodeId.toHexString())?.initiatorKey
-      ?: return RandomMessage(encryptedContent)
+      ?: return RandomMessage.create(authTag, encryptedContent)
     val decryptMetadata = authHeader?.let { Bytes.wrap(tag, authHeader.asRlp()) } ?: tag
     val decryptedContent = AES128GCM.decrypt(encryptedContent, decryptionKey, decryptMetadata)
     val messageType = decryptedContent.slice(0, Byte.SIZE_BYTES)
@@ -114,11 +130,25 @@
 
     // Retrieve result
     return when (messageType.toInt()) {
+      1 -> PingMessage.create(message)
+      2 -> PongMessage.create(message)
       3 -> FindNodeMessage.create(message)
+      4 -> NodesMessage.create(message)
+      5 -> ReqTicketMessage.create(message)
+      6 -> TicketMessage.create(message)
+      7 -> RegTopicMessage.create(message)
+      8 -> RegConfirmationMessage.create(message)
+      9 -> TopicQueryMessage.create(message)
       else -> throw IllegalArgumentException("Unknown message retrieved")
     }
   }
 
+  private fun encodeRandomMessage(tag: Bytes, message: RandomMessage): EncodeResult {
+    val rlpAuthTag = RLP.encodeValue(message.authTag)
+    val content = message.encode()
+    return EncodeResult(message.authTag, Bytes.wrap(tag, rlpAuthTag, content))
+  }
+
   companion object {
     private const val WHO_ARE_YOU_MESSAGE_LENGTH = 48
   }
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultUdpConnector.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultUdpConnector.kt
index ebb4eb8..9a517a0 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultUdpConnector.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultUdpConnector.kt
@@ -16,16 +16,28 @@
  */
 package org.apache.tuweni.devp2p.v5.internal
 
+import com.google.common.cache.Cache
+import com.google.common.cache.CacheBuilder
+import com.google.common.cache.RemovalCause
 import kotlinx.coroutines.CoroutineScope
 import kotlinx.coroutines.Dispatchers
 import kotlinx.coroutines.Job
+import kotlinx.coroutines.delay
 import kotlinx.coroutines.launch
 import org.apache.tuweni.bytes.Bytes
 import org.apache.tuweni.crypto.Hash
 import org.apache.tuweni.crypto.SECP256K1
+import org.apache.tuweni.devp2p.EthereumNodeRecord
+import org.apache.tuweni.devp2p.v5.ENRStorage
 import org.apache.tuweni.devp2p.v5.MessageHandler
+import org.apache.tuweni.devp2p.v5.MessageObserver
 import org.apache.tuweni.devp2p.v5.PacketCodec
 import org.apache.tuweni.devp2p.v5.UdpConnector
+import org.apache.tuweni.devp2p.v5.storage.RoutingTable
+import org.apache.tuweni.devp2p.v5.internal.handler.FindNodeMessageHandler
+import org.apache.tuweni.devp2p.v5.internal.handler.NodesMessageHandler
+import org.apache.tuweni.devp2p.v5.internal.handler.PingMessageHandler
+import org.apache.tuweni.devp2p.v5.internal.handler.PongMessageHandler
 import org.apache.tuweni.devp2p.v5.internal.handler.RandomMessageHandler
 import org.apache.tuweni.devp2p.v5.internal.handler.WhoAreYouMessageHandler
 import org.apache.tuweni.devp2p.v5.packet.FindNodeMessage
@@ -33,9 +45,15 @@
 import org.apache.tuweni.devp2p.v5.packet.UdpMessage
 import org.apache.tuweni.devp2p.v5.packet.WhoAreYouMessage
 import org.apache.tuweni.devp2p.v5.misc.HandshakeInitParameters
+import org.apache.tuweni.devp2p.v5.misc.TrackingMessage
+import org.apache.tuweni.devp2p.v5.packet.NodesMessage
+import org.apache.tuweni.devp2p.v5.packet.PingMessage
+import org.apache.tuweni.devp2p.v5.packet.PongMessage
+import org.apache.tuweni.devp2p.v5.storage.DefaultENRStorage
 import org.apache.tuweni.net.coroutines.CoroutineDatagramChannel
 import java.net.InetSocketAddress
 import java.nio.ByteBuffer
+import java.time.Duration
 import java.util.logging.Logger
 import kotlin.coroutines.CoroutineContext
 
@@ -43,38 +61,64 @@
   private val bindAddress: InetSocketAddress,
   private val keyPair: SECP256K1.KeyPair,
   private val selfEnr: Bytes,
-  private val nodeId: Bytes = Hash.sha2_256(selfEnr),
+  private val enrStorage: ENRStorage = DefaultENRStorage(),
   private val receiveChannel: CoroutineDatagramChannel = CoroutineDatagramChannel.open(),
-  private val sendChannel: CoroutineDatagramChannel = CoroutineDatagramChannel.open(),
-  private val packetCodec: PacketCodec = DefaultPacketCodec(keyPair, selfEnr),
+  private val nodesTable: RoutingTable = RoutingTable(selfEnr),
+  private val packetCodec: PacketCodec = DefaultPacketCodec(keyPair, nodesTable),
+  private val selfNodeRecord: EthereumNodeRecord = EthereumNodeRecord.fromRLP(selfEnr),
+  private val messageListeners: MutableList<MessageObserver> = mutableListOf(),
   override val coroutineContext: CoroutineContext = Dispatchers.IO
 ) : UdpConnector, CoroutineScope {
 
-  private val log: Logger = Logger.getLogger(this.javaClass.simpleName)
+  private val log: Logger = Logger.getLogger(DefaultUdpConnector::class.java.simpleName)
 
   private val randomMessageHandler: MessageHandler<RandomMessage> = RandomMessageHandler()
-  private val whoAreYouMessageHandler: MessageHandler<WhoAreYouMessage> = WhoAreYouMessageHandler(nodeId)
+  private val whoAreYouMessageHandler: MessageHandler<WhoAreYouMessage> = WhoAreYouMessageHandler()
+  private val findNodeMessageHandler: MessageHandler<FindNodeMessage> = FindNodeMessageHandler()
+  private val nodesMessageHandler: MessageHandler<NodesMessage> = NodesMessageHandler()
+  private val pingMessageHandler: MessageHandler<PingMessage> = PingMessageHandler()
+  private val pongMessageHandler: MessageHandler<PongMessage> = PongMessageHandler()
 
-  private val authenticatingPeers: MutableMap<InetSocketAddress, Bytes> = mutableMapOf()
+  private val askedNodes: MutableList<Bytes> = mutableListOf()
 
+  private val pendingMessages: Cache<String, TrackingMessage> = CacheBuilder.newBuilder()
+    .expireAfterWrite(Duration.ofMillis(REQUEST_TIMEOUT))
+    .build()
+  private val pings: Cache<String, Bytes> = CacheBuilder.newBuilder()
+    .expireAfterWrite(Duration.ofMillis(REQUEST_TIMEOUT + PING_TIMEOUT))
+    .removalListener<String, Bytes> {
+      if (RemovalCause.EXPIRED == it.cause) {
+        getNodesTable().evict(it.value)
+      }
+    }.build()
+
+  private lateinit var refreshJob: Job
   private lateinit var receiveJob: Job
+  private lateinit var lookupJob: Job
+
+  override fun available(): Boolean = receiveChannel.isOpen
+
+  override fun started(): Boolean = ::receiveJob.isInitialized && available()
+
+  override fun getEnrBytes(): Bytes = selfEnr
+
+  override fun getEnr(): EthereumNodeRecord = selfNodeRecord
+
+  override fun getNodeRecords(): ENRStorage = enrStorage
+
+  override fun getNodesTable(): RoutingTable = nodesTable
+
+  override fun getNodeKeyPair(): SECP256K1.KeyPair = keyPair
+
+  override fun getPendingMessage(authTag: Bytes): TrackingMessage = pendingMessages.getIfPresent(authTag.toHexString())
+    ?: throw IllegalArgumentException("Pending message not found")
 
   override fun start() {
     receiveChannel.bind(bindAddress)
 
-    receiveJob = launch {
-      val datagram = ByteBuffer.allocate(UdpMessage.MAX_UDP_MESSAGE_SIZE)
-      while (receiveChannel.isOpen) {
-        datagram.clear()
-        val address = receiveChannel.receive(datagram) as InetSocketAddress
-        datagram.flip()
-        try {
-          processDatagram(datagram, address)
-        } catch (ex: Exception) {
-          log.warning(ex.message)
-        }
-      }
-    }
+    receiveJob = receiveDatagram()
+    lookupJob = lookupNodes()
+    refreshJob = refreshNodesTable()
   }
 
   override fun send(
@@ -84,45 +128,122 @@
     handshakeParams: HandshakeInitParameters?
   ) {
     launch {
-      val buffer = packetCodec.encode(message, destNodeId, handshakeParams)
-      sendChannel.send(ByteBuffer.wrap(buffer.toArray()), address)
+      val encodeResult = packetCodec.encode(message, destNodeId, handshakeParams)
+      pendingMessages.put(encodeResult.authTag.toHexString(), TrackingMessage(message, destNodeId))
+      receiveChannel.send(ByteBuffer.wrap(encodeResult.content.toArray()), address)
     }
   }
 
   override fun terminate() {
-    receiveJob.cancel()
     receiveChannel.close()
-    sendChannel.close()
+
+    refreshJob.cancel()
+    lookupJob.cancel()
+    receiveJob.cancel()
   }
 
-  override fun available(): Boolean = receiveChannel.isOpen
-
-  override fun started(): Boolean = ::receiveJob.isInitialized && available()
-
-  override fun getEnr(): Bytes = selfEnr
-
-  override fun addPendingNodeId(address: InetSocketAddress, nodeId: Bytes) {
-    authenticatingPeers[address] = nodeId
+  override fun attachObserver(observer: MessageObserver) {
+    messageListeners.add(observer)
   }
 
-  override fun getNodeKeyPair(): SECP256K1.KeyPair = keyPair
+  override fun detachObserver(observer: MessageObserver) {
+    messageListeners.remove(observer)
+  }
 
-  override fun getPendingNodeIdByAddress(address: InetSocketAddress): Bytes {
-    val result = authenticatingPeers[address]
-      ?: throw IllegalArgumentException("Authenticated peer not found with address ${address.hostName}:${address.port}")
-    authenticatingPeers.remove(address)
+  override fun getAwaitingPongRecord(nodeId: Bytes): Bytes? {
+    val nodeIdHex = nodeId.toHexString()
+    val result = pings.getIfPresent(nodeIdHex)
+    pings.invalidate(nodeIdHex)
     return result
   }
 
+  // Lookup nodes
+  private fun lookupNodes() = launch {
+    while (true) {
+      val nearestNodes = getNodesTable().nearest(selfEnr)
+      if (REQUIRED_LOOKUP_NODES > nearestNodes.size) {
+        lookupInternal(nearestNodes)
+      } else {
+        askedNodes.clear()
+      }
+      delay(LOOKUP_REFRESH_RATE)
+    }
+  }
+
+  private fun lookupInternal(nearest: List<Bytes>) {
+    val nonAskedNodes = nearest - askedNodes
+    val targetNode = if (nonAskedNodes.isNotEmpty()) nonAskedNodes.random() else Bytes.random(32)
+    val distance = getNodesTable().distanceToSelf(targetNode)
+    for (target in nearest.take(LOOKUP_MAX_REQUESTED_NODES)) {
+      val enr = EthereumNodeRecord.fromRLP(target)
+      val message = FindNodeMessage(distance = distance)
+      val address = InetSocketAddress(enr.ip(), enr.udp())
+      send(address, message, Hash.sha2_256(target))
+      askedNodes.add(target)
+    }
+  }
+
+  // Process packets
+  private fun receiveDatagram() = launch {
+    while (receiveChannel.isOpen) {
+      val datagram = ByteBuffer.allocate(UdpMessage.MAX_UDP_MESSAGE_SIZE)
+      val address = receiveChannel.receive(datagram) as InetSocketAddress
+      datagram.flip()
+      launch {
+        try {
+          processDatagram(datagram, address)
+        } catch (ex: Exception) {
+          log.warning("${ex.message}")
+        }
+      }
+    }
+  }
+
   private fun processDatagram(datagram: ByteBuffer, address: InetSocketAddress) {
+    if (datagram.limit() > UdpMessage.MAX_UDP_MESSAGE_SIZE) {
+      return
+    }
     val messageBytes = Bytes.wrapByteBuffer(datagram)
     val decodeResult = packetCodec.decode(messageBytes)
     val message = decodeResult.message
     when (message) {
       is RandomMessage -> randomMessageHandler.handle(message, address, decodeResult.srcNodeId, this)
       is WhoAreYouMessage -> whoAreYouMessageHandler.handle(message, address, decodeResult.srcNodeId, this)
-      is FindNodeMessage -> { } //TODO: response with NODES message
+      is FindNodeMessage -> findNodeMessageHandler.handle(message, address, decodeResult.srcNodeId, this)
+      is NodesMessage -> nodesMessageHandler.handle(message, address, decodeResult.srcNodeId, this)
+      is PingMessage -> pingMessageHandler.handle(message, address, decodeResult.srcNodeId, this)
+      is PongMessage -> pongMessageHandler.handle(message, address, decodeResult.srcNodeId, this)
       else -> throw IllegalArgumentException("Unexpected message has been received - ${message::class.java.simpleName}")
     }
+    messageListeners.forEach { it.observe(message) }
+  }
+
+  // Ping nodes
+  private fun refreshNodesTable(): Job = launch {
+    while (true) {
+      if (!getNodesTable().isEmpty()) {
+        val enrBytes = getNodesTable().random()
+        val nodeId = Hash.sha2_256(enrBytes)
+        if (null == pings.getIfPresent(nodeId.toHexString())) {
+          val enr = EthereumNodeRecord.fromRLP(enrBytes)
+          val address = InetSocketAddress(enr.ip(), enr.udp())
+          val message = PingMessage(enrSeq = enr.seq)
+
+          send(address, message, nodeId)
+          pings.put(nodeId.toHexString(), enrBytes)
+        }
+      }
+      delay(TABLE_REFRESH_RATE)
+    }
+  }
+
+  companion object {
+    private const val REQUIRED_LOOKUP_NODES: Int = 16
+    private const val LOOKUP_MAX_REQUESTED_NODES: Int = 3
+
+    private const val LOOKUP_REFRESH_RATE: Long = 3000
+    private const val TABLE_REFRESH_RATE: Long = 1000
+    private const val REQUEST_TIMEOUT: Long = 1000
+    private const val PING_TIMEOUT: Long = 500
   }
 }
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/FindNodeMessageHandler.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/FindNodeMessageHandler.kt
new file mode 100644
index 0000000..4e7c354
--- /dev/null
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/FindNodeMessageHandler.kt
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.v5.internal.handler
+
+import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.devp2p.v5.MessageHandler
+import org.apache.tuweni.devp2p.v5.UdpConnector
+import org.apache.tuweni.devp2p.v5.packet.FindNodeMessage
+import org.apache.tuweni.devp2p.v5.packet.NodesMessage
+import java.net.InetSocketAddress
+
+/**
+ * Answers FINDNODE requests with one or more NODES responses.
+ */
+class FindNodeMessageHandler : MessageHandler<FindNodeMessage> {
+
+  /**
+   * Replies with the local node record when the requested distance is zero, otherwise with the routing
+   * table records at the requested distance, split across NODES packets of limited size.
+   *
+   * @param message received FINDNODE request
+   * @param address socket address the responses are sent to
+   * @param srcNodeId node identifier passed to the connector together with each response
+   * @param connector connector providing the local record, the routing table and the send operation
+   */
+  override fun handle(message: FindNodeMessage, address: InetSocketAddress, srcNodeId: Bytes, connector: UdpConnector) {
+    if (message.distance == 0) {
+      // Distance zero is answered with a single packet carrying only the local node record.
+      val response = NodesMessage(message.requestId, 1, listOf(connector.getEnrBytes()))
+      connector.send(address, response, srcNodeId)
+      return
+    }
+
+    val nodes = connector.getNodesTable().nodesOfDistance(message.distance)
+    val chunks = nodes.chunked(MAX_NODES_IN_RESPONSE)
+
+    // The discv5 NODES `total` field counts response packets, not records (the distance-zero branch
+    // above passes 1 for its single packet), so every packet announces the number of chunks.
+    chunks.forEach { chunk ->
+      val response = NodesMessage(message.requestId, chunks.size, chunk)
+      connector.send(address, response, srcNodeId)
+    }
+  }
+
+  companion object {
+    // Upper bound of node records packed into a single NODES packet.
+    private const val MAX_NODES_IN_RESPONSE: Int = 4
+  }
+}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/NodesMessageHandler.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/NodesMessageHandler.kt
new file mode 100644
index 0000000..d8b33dd
--- /dev/null
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/NodesMessageHandler.kt
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.v5.internal.handler
+
+import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.devp2p.EthereumNodeRecord
+import org.apache.tuweni.devp2p.v5.MessageHandler
+import org.apache.tuweni.devp2p.v5.UdpConnector
+import org.apache.tuweni.devp2p.v5.packet.NodesMessage
+import java.net.InetSocketAddress
+
+/**
+ * Stores the node records delivered by a NODES response.
+ */
+class NodesMessageHandler : MessageHandler<NodesMessage> {
+
+  /**
+   * Parses every record of [message] and adds it to the connector's record storage and routing table.
+   */
+  override fun handle(message: NodesMessage, address: InetSocketAddress, srcNodeId: Bytes, connector: UdpConnector) {
+    for (record in message.nodeRecords) {
+      // The parsed record is discarded; presumably the call only validates the RLP - confirm.
+      EthereumNodeRecord.fromRLP(record)
+      connector.getNodeRecords().set(record)
+      connector.getNodesTable().add(record)
+    }
+  }
+}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/PingMessageHandler.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/PingMessageHandler.kt
new file mode 100644
index 0000000..1abaf82
--- /dev/null
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/PingMessageHandler.kt
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.v5.internal.handler
+
+import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.devp2p.v5.MessageHandler
+import org.apache.tuweni.devp2p.v5.UdpConnector
+import org.apache.tuweni.devp2p.v5.packet.PingMessage
+import org.apache.tuweni.devp2p.v5.packet.PongMessage
+import java.net.InetSocketAddress
+
+/**
+ * Answers PING requests with a PONG response.
+ */
+class PingMessageHandler : MessageHandler<PingMessage> {
+
+  /**
+   * Sends back a PONG carrying the request id of [message], the sequence number of the local node record
+   * and the IP address and port taken from [address].
+   */
+  override fun handle(message: PingMessage, address: InetSocketAddress, srcNodeId: Bytes, connector: UdpConnector) {
+    val localEnrSeq = connector.getEnr().seq
+    val senderIp = address.address
+    val senderPort = address.port
+
+    connector.send(address, PongMessage(message.requestId, localEnrSeq, senderIp, senderPort), srcNodeId)
+  }
+}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/PongMessageHandler.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/PongMessageHandler.kt
new file mode 100644
index 0000000..fc8c973
--- /dev/null
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/PongMessageHandler.kt
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.v5.internal.handler
+
+import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.devp2p.EthereumNodeRecord
+import org.apache.tuweni.devp2p.v5.MessageHandler
+import org.apache.tuweni.devp2p.v5.UdpConnector
+import org.apache.tuweni.devp2p.v5.packet.FindNodeMessage
+import org.apache.tuweni.devp2p.v5.packet.PongMessage
+import java.net.InetSocketAddress
+
+/**
+ * Processes PONG responses to previously sent PING requests.
+ */
+class PongMessageHandler : MessageHandler<PongMessage> {
+
+  /**
+   * Compares the sequence number reported by [message] with the record the connector holds as awaiting a
+   * PONG from [srcNodeId], and sends a FINDNODE request to [address] when they differ.
+   * Does nothing if no record is awaiting a PONG from that node.
+   */
+  override fun handle(message: PongMessage, address: InetSocketAddress, srcNodeId: Bytes, connector: UdpConnector) {
+    val pendingRecord = connector.getAwaitingPongRecord(srcNodeId)
+    if (pendingRecord == null) {
+      return
+    }
+
+    val knownSeq = EthereumNodeRecord.fromRLP(pendingRecord).seq
+    if (knownSeq == message.enrSeq) {
+      return
+    }
+
+    // The request id of the PONG is reused for the follow-up FINDNODE request.
+    connector.send(address, FindNodeMessage(message.requestId), srcNodeId)
+  }
+}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/RandomMessageHandler.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/RandomMessageHandler.kt
index 477710a..60f965d 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/RandomMessageHandler.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/RandomMessageHandler.kt
@@ -26,8 +26,7 @@
 class RandomMessageHandler : MessageHandler<RandomMessage> {
 
   override fun handle(message: RandomMessage, address: InetSocketAddress, srcNodeId: Bytes, connector: UdpConnector) {
-    connector.addPendingNodeId(address, srcNodeId)
-    val response = WhoAreYouMessage()
+    val response = WhoAreYouMessage(message.authTag)
     connector.send(address, response, srcNodeId)
   }
 }
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/WhoAreYouMessageHandler.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/WhoAreYouMessageHandler.kt
index 754ceae..62025f3 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/WhoAreYouMessageHandler.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/WhoAreYouMessageHandler.kt
@@ -17,17 +17,16 @@
 package org.apache.tuweni.devp2p.v5.internal.handler
 
 import org.apache.tuweni.bytes.Bytes
-import org.apache.tuweni.crypto.Hash
 import org.apache.tuweni.devp2p.v5.MessageHandler
 import org.apache.tuweni.devp2p.v5.UdpConnector
 import org.apache.tuweni.devp2p.v5.packet.FindNodeMessage
 import org.apache.tuweni.devp2p.v5.packet.WhoAreYouMessage
 import org.apache.tuweni.devp2p.v5.misc.HandshakeInitParameters
+import org.apache.tuweni.devp2p.v5.packet.RandomMessage
+import java.lang.IllegalArgumentException
 import java.net.InetSocketAddress
 
-class WhoAreYouMessageHandler(
-  private val nodeId: Bytes
-) : MessageHandler<WhoAreYouMessage> {
+class WhoAreYouMessageHandler : MessageHandler<WhoAreYouMessage> {
 
   override fun handle(
     message: WhoAreYouMessage,
@@ -36,11 +35,12 @@
     connector: UdpConnector
   ) {
     // Retrieve enr
-    val destRlp = connector.getPendingNodeIdByAddress(address)
-    val handshakeParams = HandshakeInitParameters(message.idNonce, message.authTag, destRlp)
-    val destNodeId = Hash.sha2_256(destRlp)
+    val trackingMessage = connector.getPendingMessage(message.authTag)
+    val rlpEnr = connector.getNodeRecords().find(trackingMessage.nodeId)
+      ?: throw IllegalArgumentException("Unable to find node enr by id ${trackingMessage.nodeId}")
+    val handshakeParams = HandshakeInitParameters(message.idNonce, message.authTag, rlpEnr)
 
-    val findNodeMessage = FindNodeMessage()
-    connector.send(address, findNodeMessage, destNodeId, handshakeParams)
+    val response = if (trackingMessage.message is RandomMessage) FindNodeMessage() else trackingMessage.message
+    connector.send(address, response, trackingMessage.nodeId, handshakeParams)
   }
 }
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/misc/EncodeResult.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/misc/EncodeResult.kt
new file mode 100644
index 0000000..5a482ea
--- /dev/null
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/misc/EncodeResult.kt
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.v5.misc
+
+import org.apache.tuweni.bytes.Bytes
+
+/**
+ * Holder pairing an authentication tag with encoded content.
+ *
+ * NOTE(review): presumably produced by the packet codec when encoding an outgoing message - confirm.
+ *
+ * @param authTag authentication tag
+ * @param content encoded bytes
+ */
+class EncodeResult(
+  val authTag: Bytes,
+  val content: Bytes
+)
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/misc/TrackingMessage.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/misc/TrackingMessage.kt
new file mode 100644
index 0000000..c0d724b
--- /dev/null
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/misc/TrackingMessage.kt
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.v5.misc
+
+import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.devp2p.v5.packet.UdpMessage
+
+/**
+ * Pairs a message with the identifier of the node it relates to.
+ *
+ * @param message tracked message
+ * @param nodeId node identifier associated with the message
+ */
+class TrackingMessage(
+  val message: UdpMessage,
+  val nodeId: Bytes
+)
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/packet/FindNodeMessage.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/packet/FindNodeMessage.kt
index b39ae46..777d389 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/packet/FindNodeMessage.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/packet/FindNodeMessage.kt
@@ -21,7 +21,7 @@
 
 class FindNodeMessage(
   val requestId: Bytes = UdpMessage.requestId(),
-  val distance: Long = 0
+  val distance: Int = 0
 ) : UdpMessage() {
 
   private val encodedMessageType: Bytes = Bytes.fromHexString("0x03")
@@ -29,7 +29,7 @@
   override fun encode(): Bytes {
     return RLP.encodeList { writer ->
       writer.writeValue(requestId)
-      writer.writeLong(distance)
+      writer.writeInt(distance)
     }
   }
 
@@ -39,7 +39,7 @@
     fun create(content: Bytes): FindNodeMessage {
       return RLP.decodeList(content) { reader ->
         val requestId = reader.readValue()
-        val distance = reader.readLong()
+        val distance = reader.readInt()
         return@decodeList FindNodeMessage(requestId, distance)
       }
     }
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/packet/RandomMessage.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/packet/RandomMessage.kt
index e43ded7..faaf210 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/packet/RandomMessage.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/packet/RandomMessage.kt
@@ -19,6 +19,7 @@
 import org.apache.tuweni.bytes.Bytes
 
 class RandomMessage(
+  val authTag: Bytes = authTag(),
   val data: Bytes = randomData()
 ) : UdpMessage() {
 
@@ -27,8 +28,8 @@
   }
 
   companion object {
-    fun create(content: Bytes): RandomMessage {
-      return RandomMessage(content)
+    fun create(authTag: Bytes, content: Bytes = randomData()): RandomMessage {
+      return RandomMessage(authTag, content)
     }
   }
 }
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/storage/DefaultENRStorage.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/storage/DefaultENRStorage.kt
new file mode 100644
index 0000000..26ff813
--- /dev/null
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/storage/DefaultENRStorage.kt
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.v5.storage
+
+import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.crypto.Hash
+import org.apache.tuweni.devp2p.v5.ENRStorage
+import java.util.concurrent.ConcurrentHashMap
+
+/**
+ * [ENRStorage] backed by a [ConcurrentHashMap] keyed by the hex string of the node identifier.
+ */
+class DefaultENRStorage : ENRStorage {
+
+  private val storage: MutableMap<String, Bytes> = ConcurrentHashMap()
+
+  /**
+   * Looks up the record stored under [nodeId], or returns null when there is none.
+   */
+  override fun find(nodeId: Bytes): Bytes? {
+    return storage[keyOf(nodeId)]
+  }
+
+  /**
+   * Stores [enr] under the `Hash.sha2_256` digest of its bytes, replacing any record already stored there.
+   */
+  override fun set(enr: Bytes) {
+    storage[keyOf(Hash.sha2_256(enr))] = enr
+  }
+
+  private fun keyOf(nodeId: Bytes): String = nodeId.toHexString()
+}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/storage/RoutingTable.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/storage/RoutingTable.kt
new file mode 100644
index 0000000..6777272
--- /dev/null
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/storage/RoutingTable.kt
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.v5.storage
+
+import com.google.common.math.IntMath
+import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.crypto.Hash
+import org.apache.tuweni.kademlia.KademliaRoutingTable
+import org.apache.tuweni.kademlia.xorDist
+import java.math.RoundingMode
+
+/**
+ * Kademlia routing table of encoded node records, keyed by the `Hash.sha2_256` digest of each record.
+ *
+ * @param selfEnr encoded record of the local node; [add] never inserts it into the table
+ */
+class RoutingTable(
+  private val selfEnr: Bytes
+) {
+
+  private val selfNodeId = key(selfEnr)
+  private val nodeIdCalculation: (Bytes) -> ByteArray = { enr -> key(enr) }
+
+  private val table = KademliaRoutingTable(
+    selfId = selfNodeId,
+    k = BUCKET_SIZE,
+    nodeId = nodeIdCalculation,
+    distanceToSelf = {
+      val xorResult = key(it) xorDist selfNodeId
+      // IntMath.log2 rejects non-positive input, which occurs when the record hashes to the local
+      // node id (e.g. the local record itself), so report distance zero in that case.
+      if (xorResult > 0) IntMath.log2(xorResult, RoundingMode.FLOOR) else 0
+    })
+
+  /** Number of records currently reported by the underlying table. */
+  val size: Int
+    get() = table.size
+
+  /** Returns the record of the local node. */
+  fun getSelfEnr(): Bytes = selfEnr
+
+  /** Adds [enr] to the table unless it equals the local node record. */
+  fun add(enr: Bytes) {
+    if (enr != selfEnr) {
+      table.add(enr)
+    }
+  }
+
+  /** Returns up to [limit] records nearest to the `Hash.sha2_256` digest of [targetId]. */
+  fun nearest(targetId: Bytes, limit: Int = BUCKET_SIZE): List<Bytes> = table.nearest(key(targetId), limit)
+
+  /** Returns the distance between [targetId] and the local node as computed by the underlying table. */
+  fun distanceToSelf(targetId: Bytes): Int = table.logDistToSelf(targetId)
+
+  /** Evicts [enr] from the underlying table and returns that table's result. */
+  fun evict(enr: Bytes): Boolean = table.evict(enr)
+
+  /** Returns a record picked by the underlying table's `getRandom`. */
+  fun random(): Bytes = table.getRandom()
+
+  /** Returns true when the underlying table reports itself empty. */
+  fun isEmpty(): Boolean = table.isEmpty()
+
+  /** Returns the records the underlying table holds at the given [distance]. */
+  fun nodesOfDistance(distance: Int): List<Bytes> = table.peersOfDistance(distance)
+
+  /** Clears the underlying table. */
+  fun clear() = table.clear()
+
+  private fun key(enr: Bytes): ByteArray = Hash.sha2_256(enr).toArray()
+
+  companion object {
+    private const val BUCKET_SIZE: Int = 16
+  }
+}
diff --git a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/AbstractIntegrationTest.kt b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/AbstractIntegrationTest.kt
new file mode 100644
index 0000000..8344c3a
--- /dev/null
+++ b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/AbstractIntegrationTest.kt
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.v5
+
+import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.runBlocking
+import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.crypto.Hash
+import org.apache.tuweni.crypto.SECP256K1
+import org.apache.tuweni.devp2p.EthereumNodeRecord
+import org.apache.tuweni.devp2p.v5.internal.DefaultAuthenticationProvider
+import org.apache.tuweni.devp2p.v5.internal.DefaultPacketCodec
+import org.apache.tuweni.devp2p.v5.internal.DefaultUdpConnector
+import org.apache.tuweni.devp2p.v5.packet.RandomMessage
+import org.apache.tuweni.devp2p.v5.packet.UdpMessage
+import org.apache.tuweni.devp2p.v5.packet.WhoAreYouMessage
+import org.apache.tuweni.devp2p.v5.storage.DefaultENRStorage
+import org.apache.tuweni.devp2p.v5.storage.RoutingTable
+import org.apache.tuweni.junit.BouncyCastleExtension
+import org.junit.jupiter.api.extension.ExtendWith
+import java.net.InetAddress
+import java.net.InetSocketAddress
+
+/**
+ * Base class for discovery v5 integration tests; provides helpers to start nodes and exchange messages.
+ */
+@ExtendWith(BouncyCastleExtension::class)
+abstract class AbstractIntegrationTest {
+
+  /**
+   * Builds a node from the given (or default) components, starts its discovery service and returns
+   * all components wrapped in a [TestNode].
+   */
+  protected fun createNode(
+    port: Int = 9090,
+    bootList: List<String> = emptyList(),
+    enrStorage: ENRStorage = DefaultENRStorage(),
+    keyPair: SECP256K1.KeyPair = SECP256K1.KeyPair.random(),
+    enr: Bytes = EthereumNodeRecord.toRLP(keyPair, ip = InetAddress.getLocalHost(), udp = port),
+    routingTable: RoutingTable = RoutingTable(enr),
+    address: InetSocketAddress = InetSocketAddress(InetAddress.getLocalHost(), port),
+    authenticationProvider: AuthenticationProvider = DefaultAuthenticationProvider(keyPair, routingTable),
+    packetCodec: PacketCodec = DefaultPacketCodec(
+      keyPair,
+      routingTable,
+      authenticationProvider = authenticationProvider
+    ),
+    connector: UdpConnector = DefaultUdpConnector(
+      address,
+      keyPair,
+      enr,
+      enrStorage,
+      nodesTable = routingTable,
+      packetCodec = packetCodec
+    ),
+    service: NodeDiscoveryService =
+      DefaultNodeDiscoveryService(
+        keyPair,
+        port,
+        enrStorage = enrStorage,
+        bootstrapENRList = bootList,
+        connector = connector
+      )
+  ): TestNode {
+    service.start()
+    return TestNode(
+      bootList,
+      port,
+      enrStorage,
+      keyPair,
+      enr,
+      address,
+      routingTable,
+      authenticationProvider,
+      packetCodec,
+      connector,
+      service
+    )
+  }
+
+  /**
+   * Registers [recipient] with [initiator], sends it a random message to trigger the handshake and polls
+   * until [recipient] holds a session key for [initiator].
+   *
+   * @param timeoutMillis maximum time to wait for the session key
+   * @return true once the session key is found, false if [timeoutMillis] elapsed first
+   */
+  protected fun handshake(initiator: TestNode, recipient: TestNode, timeoutMillis: Long = 5000): Boolean {
+    initiator.enrStorage.set(recipient.enr)
+    initiator.routingTable.add(recipient.enr)
+    val message = RandomMessage()
+    initiator.connector.send(recipient.address, message, recipient.nodeId)
+
+    // Poll with a deadline and a short sleep instead of spinning forever, so a failed handshake makes
+    // the test fail rather than hang while burning a CPU core.
+    val initiatorId = initiator.nodeId.toHexString()
+    val deadline = System.currentTimeMillis() + timeoutMillis
+    while (System.currentTimeMillis() < deadline) {
+      if (null != recipient.authenticationProvider.findSessionKey(initiatorId)) {
+        return true
+      }
+      Thread.sleep(10)
+    }
+    return null != recipient.authenticationProvider.findSessionKey(initiatorId)
+  }
+
+  /**
+   * Sends [message] from [initiator] to [recipient].
+   *
+   * @throws IllegalArgumentException if [message] is a [RandomMessage] or a [WhoAreYouMessage]
+   */
+  protected fun send(initiator: TestNode, recipient: TestNode, message: UdpMessage) {
+    if (message is RandomMessage || message is WhoAreYouMessage) {
+      throw IllegalArgumentException("Can't send handshake initiation message")
+    }
+    initiator.connector.send(recipient.address, message, recipient.nodeId)
+  }
+
+  /**
+   * Sends [message] from [initiator] to [recipient] and blocks until the connector of [initiator] observes
+   * a message of type [T], which is returned.
+   */
+  protected inline fun <reified T : UdpMessage> sendAndAwait(
+    initiator: TestNode,
+    recipient: TestNode,
+    message: UdpMessage
+  ): T {
+    val listener = object : MessageObserver {
+      // Buffered so a reply observed before receive() suspends is kept instead of being dropped by offer().
+      val result: Channel<T> = Channel(1)
+
+      override fun observe(message: UdpMessage) {
+        if (message is T) {
+          result.offer(message)
+        }
+      }
+    }
+
+    return runBlocking {
+      initiator.connector.attachObserver(listener)
+      try {
+        send(initiator, recipient, message)
+        listener.result.receive()
+      } finally {
+        // Detach even when sending or receiving fails, so the observer does not leak into later calls.
+        initiator.connector.detachObserver(listener)
+      }
+    }
+  }
+}
+
+/**
+ * Components of a node started by [AbstractIntegrationTest.createNode].
+ */
+class TestNode(
+  val bootList: List<String>,
+  val port: Int,
+  val enrStorage: ENRStorage,
+  val keyPair: SECP256K1.KeyPair,
+  val enr: Bytes,
+  val address: InetSocketAddress,
+  val routingTable: RoutingTable,
+  val authenticationProvider: AuthenticationProvider,
+  val packetCodec: PacketCodec,
+  val connector: UdpConnector,
+  val service: NodeDiscoveryService,
+  val nodeId: Bytes = Hash.sha2_256(enr)
+)
diff --git a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/DefaultNodeDiscoveryServiceTest.kt b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/DefaultNodeDiscoveryServiceTest.kt
index a9f64e5..2b7d8a7 100644
--- a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/DefaultNodeDiscoveryServiceTest.kt
+++ b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/DefaultNodeDiscoveryServiceTest.kt
@@ -64,7 +64,7 @@
       bootstrapENRList,
       enrSeq,
       selfENR,
-      connector
+      connector = connector
     )
 
   @Test
@@ -81,7 +81,7 @@
       val receivedBytes = Bytes.wrapByteBuffer(buffer)
       val content = receivedBytes.slice(45)
 
-      val message = RandomMessage.create(content)
+      val message = RandomMessage.create(UdpMessage.authTag(), content)
       assert(message.data.size() == UdpMessage.RANDOM_DATA_LENGTH)
     }
 
diff --git a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/HandshakeIntegrationTest.kt b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/HandshakeIntegrationTest.kt
deleted file mode 100644
index 46b0d47..0000000
--- a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/HandshakeIntegrationTest.kt
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tuweni.devp2p.v5
-
-import kotlinx.coroutines.runBlocking
-import org.apache.tuweni.bytes.Bytes
-import org.apache.tuweni.crypto.Hash
-import org.apache.tuweni.crypto.SECP256K1
-import org.apache.tuweni.devp2p.EthereumNodeRecord
-import org.apache.tuweni.devp2p.v5.internal.DefaultAuthenticationProvider
-import org.apache.tuweni.devp2p.v5.internal.DefaultPacketCodec
-import org.apache.tuweni.devp2p.v5.internal.DefaultUdpConnector
-import org.apache.tuweni.devp2p.v5.packet.FindNodeMessage
-import org.apache.tuweni.devp2p.v5.packet.RandomMessage
-import org.apache.tuweni.devp2p.v5.packet.UdpMessage
-import org.apache.tuweni.devp2p.v5.packet.WhoAreYouMessage
-import org.apache.tuweni.io.Base64URLSafe
-import org.apache.tuweni.junit.BouncyCastleExtension
-import org.apache.tuweni.net.coroutines.CoroutineDatagramChannel
-import org.junit.jupiter.api.AfterEach
-import org.junit.jupiter.api.BeforeEach
-import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.extension.ExtendWith
-import java.net.InetAddress
-import java.net.InetSocketAddress
-import java.nio.ByteBuffer
-
-@ExtendWith(BouncyCastleExtension::class)
-class HandshakeIntegrationTest {
-
-  private val keyPair: SECP256K1.KeyPair = SECP256K1.KeyPair.random()
-  private val enr: Bytes = EthereumNodeRecord.toRLP(keyPair, ip = InetAddress.getLocalHost(), udp = SERVICE_PORT)
-  private val address: InetSocketAddress = InetSocketAddress(InetAddress.getLocalHost(), SERVICE_PORT)
-  private val connector: UdpConnector = DefaultUdpConnector(address, keyPair, enr)
-
-  private val clientKeyPair = SECP256K1.KeyPair.random()
-  private val clientEnr = EthereumNodeRecord.toRLP(clientKeyPair, ip = InetAddress.getLocalHost(), udp = CLIENT_PORT)
-  private val authProvider = DefaultAuthenticationProvider(clientKeyPair, clientEnr)
-  private val clientCodec = DefaultPacketCodec(clientKeyPair, clientEnr, authenticationProvider = authProvider)
-  private val socket = CoroutineDatagramChannel.open()
-
-  private val clientNodeId: Bytes = Hash.sha2_256(clientEnr)
-
-  private val bootList = listOf("enr:${Base64URLSafe.encode(clientEnr)}")
-  private val service: NodeDiscoveryService =
-    DefaultNodeDiscoveryService(keyPair, SERVICE_PORT, bootstrapENRList = bootList, connector = connector)
-
-  @BeforeEach
-  fun init() {
-    socket.bind(InetSocketAddress(9091))
-    service.start()
-  }
-
-  @Test
-  fun discv5HandshakeTest() {
-    runBlocking {
-      val buffer = ByteBuffer.allocate(UdpMessage.MAX_UDP_MESSAGE_SIZE)
-      socket.receive(buffer)
-      buffer.flip()
-
-      var content = Bytes.wrapByteBuffer(buffer)
-      var decodingResult = clientCodec.decode(content)
-      assert(decodingResult.message is RandomMessage)
-      buffer.clear()
-
-      sendWhoAreYou()
-
-      socket.receive(buffer)
-      buffer.flip()
-      content = Bytes.wrapByteBuffer(buffer)
-      decodingResult = clientCodec.decode(content)
-      assert(decodingResult.message is FindNodeMessage)
-      assert(null != authProvider.findSessionKey(Hash.sha2_256(enr).toHexString()))
-
-      val message = decodingResult.message as FindNodeMessage
-
-      assert(message.distance == 0L)
-      assert(message.requestId.size() == UdpMessage.REQUEST_ID_LENGTH)
-    }
-  }
-
-  @AfterEach
-  fun tearDown() {
-    service.terminate(true)
-    socket.close()
-  }
-
-  private suspend fun sendWhoAreYou() {
-    val message = WhoAreYouMessage()
-    val bytes = clientCodec.encode(message, clientNodeId)
-    val buffer = ByteBuffer.wrap(bytes.toArray())
-    socket.send(buffer, address)
-  }
-
-  companion object {
-    private const val SERVICE_PORT: Int = 9090
-    private const val CLIENT_PORT: Int = 9091
-  }
-}
diff --git a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/IntegrationTest.kt b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/IntegrationTest.kt
new file mode 100644
index 0000000..47c82c4
--- /dev/null
+++ b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/IntegrationTest.kt
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.v5
+
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.runBlocking
+import kotlinx.coroutines.withTimeout
+import org.apache.tuweni.devp2p.v5.packet.PingMessage
+import org.apache.tuweni.devp2p.v5.packet.PongMessage
+import org.junit.jupiter.api.Disabled
+import org.junit.jupiter.api.Test
+
+/**
+ * End-to-end tests that start discovery nodes on local ports and let them talk to each other.
+ *
+ * Each test terminates its nodes in a `finally` block, so a failing assertion does not leave
+ * services running on ports that the following tests reuse.
+ */
+class IntegrationTest : AbstractIntegrationTest() {
+
+  /** Two freshly created nodes complete a handshake. */
+  @Test
+  fun testHandshake() {
+    val node1 = createNode(9090)
+    val node2 = createNode(9091)
+    try {
+      assert(handshake(node1, node2))
+    } finally {
+      node1.service.terminate(true)
+      node2.service.terminate(true)
+    }
+  }
+
+  /** After a handshake, a ping from node1 is answered with a pong carrying node1's port. */
+  @Test
+  fun testPing() {
+    val node1 = createNode(9090)
+    val node2 = createNode(9091)
+    try {
+      assert(handshake(node1, node2))
+      val pong = sendAndAwait<PongMessage>(node1, node2, PingMessage())
+
+      assert(node1.port == pong.recipientPort)
+    } finally {
+      node1.service.terminate(true)
+      node2.service.terminate(true)
+    }
+  }
+
+  /**
+   * Once node2 has been terminated, node1's routing table is expected to be empty
+   * again after a five second wait.
+   */
+  @Test
+  fun testTableMaintenance() {
+    val node1 = createNode(9090)
+    val node2 = createNode(9091)
+    // node2 is stopped in the middle of the test; remember that so it is not terminated twice.
+    var node2Running = true
+    try {
+      assert(handshake(node1, node2))
+      runBlocking {
+        assert(!node1.routingTable.isEmpty())
+
+        node2.service.terminate(true)
+        node2Running = false
+
+        delay(5000)
+
+        assert(node1.routingTable.isEmpty())
+      }
+    } finally {
+      if (node2Running) {
+        node2.service.terminate(true)
+      }
+      node1.service.terminate(true)
+    }
+  }
+
+  /**
+   * Builds a network of seventeen nodes, connects the target node to three of them and
+   * waits until the target's routing table holds at least eight entries.
+   */
+  @Test
+  @Disabled
+  fun testNetworkLookup() {
+    val targetNode = createNode(9090)
+    // nodes[i] is "node(i + 1)" and listens on port 9091 + i.
+    val nodes = (9091..9107).map { port -> createNode(port) }
+    try {
+      // Handshakes form two chains: node1..node8 and node9..node17.
+      // NOTE(review): node8 and node9 are not linked to each other - confirm this gap is intended.
+      nodes.subList(0, 8).zipWithNext().forEach { (first, second) -> handshake(first, second) }
+      nodes.subList(8, 17).zipWithNext().forEach { (first, second) -> handshake(first, second) }
+
+      handshake(targetNode, nodes[0])
+      handshake(targetNode, nodes[3])
+      handshake(targetNode, nodes[6])
+
+      runBlocking {
+        // Poll with a pause instead of spinning, and give up after the timeout rather than hanging.
+        withTimeout(LOOKUP_TIMEOUT_MILLIS) {
+          var size = targetNode.routingTable.size
+          while (size < 8) {
+            delay(LOOKUP_POLL_INTERVAL_MILLIS)
+            val newSize = targetNode.routingTable.size
+            if (size < newSize) {
+              size = newSize
+              println(size)
+            }
+          }
+        }
+      }
+    } finally {
+      nodes.forEach { it.service.terminate(true) }
+      targetNode.service.terminate(true)
+    }
+  }
+
+  companion object {
+    private const val LOOKUP_TIMEOUT_MILLIS = 60_000L
+    private const val LOOKUP_POLL_INTERVAL_MILLIS = 100L
+  }
+}
diff --git a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultAuthenticationProviderTest.kt b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultAuthenticationProviderTest.kt
index cebe167..4b6d188 100644
--- a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultAuthenticationProviderTest.kt
+++ b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultAuthenticationProviderTest.kt
@@ -20,8 +20,8 @@
 import org.apache.tuweni.crypto.Hash
 import org.apache.tuweni.crypto.SECP256K1
 import org.apache.tuweni.devp2p.EthereumNodeRecord
+import org.apache.tuweni.devp2p.v5.storage.RoutingTable
 import org.apache.tuweni.devp2p.v5.misc.HandshakeInitParameters
-import org.apache.tuweni.devp2p.v5.misc.SessionKey
 import org.apache.tuweni.junit.BouncyCastleExtension
 import org.junit.jupiter.api.Test
 import org.junit.jupiter.api.extension.ExtendWith
@@ -32,7 +32,8 @@
 
   private val providerKeyPair: SECP256K1.KeyPair = SECP256K1.KeyPair.random()
   private val providerEnr: Bytes = EthereumNodeRecord.toRLP(providerKeyPair, ip = InetAddress.getLocalHost())
-  private val authenticationProvider = DefaultAuthenticationProvider(providerKeyPair, providerEnr)
+  private val routingTable: RoutingTable = RoutingTable(providerEnr)
+  private val authenticationProvider = DefaultAuthenticationProvider(providerKeyPair, routingTable)
 
   @Test
   fun authenticateReturnsValidAuthHeader() {
@@ -60,10 +61,11 @@
     val nonce = Bytes.fromHexString("0x012715E4EFA2464F51BE49BBC40836E5816B3552249F8AC00AD1BBDB559E44E9")
     val authTag = Bytes.fromHexString("0x39BBC27C8CFA3735DF436AC6")
     val destEnr = EthereumNodeRecord.toRLP(keyPair, ip = InetAddress.getLocalHost())
+    val clientRoutingTable = RoutingTable(destEnr)
     val params = HandshakeInitParameters(nonce, authTag, providerEnr)
     val destNodeId = Hash.sha2_256(destEnr)
 
-    val clientAuthProvider = DefaultAuthenticationProvider(keyPair, destEnr)
+    val clientAuthProvider = DefaultAuthenticationProvider(keyPair, clientRoutingTable)
 
     val authHeader = clientAuthProvider.authenticate(params)
 
@@ -78,17 +80,4 @@
 
     assert(result == null)
   }
-
-  @Test
-  fun setSessionKeyPersistsSessionKeyIfExists() {
-    val nodeId = Bytes.random(32).toHexString()
-    val bytes = Bytes.random(32)
-    val sessionKey = SessionKey(bytes, bytes, bytes)
-
-    authenticationProvider.setSessionKey(nodeId, sessionKey)
-
-    val result = authenticationProvider.findSessionKey(nodeId)
-
-    assert(result != null)
-  }
 }
diff --git a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultPacketCodecTest.kt b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultPacketCodecTest.kt
index 85a08dd..e04771f 100644
--- a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultPacketCodecTest.kt
+++ b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultPacketCodecTest.kt
@@ -22,6 +22,7 @@
 import org.apache.tuweni.devp2p.EthereumNodeRecord
 import org.apache.tuweni.devp2p.v5.AuthenticationProvider
 import org.apache.tuweni.devp2p.v5.PacketCodec
+import org.apache.tuweni.devp2p.v5.storage.RoutingTable
 import org.apache.tuweni.devp2p.v5.encrypt.AES128GCM
 import org.apache.tuweni.devp2p.v5.misc.SessionKey
 import org.apache.tuweni.devp2p.v5.packet.FindNodeMessage
@@ -30,7 +31,6 @@
 import org.apache.tuweni.devp2p.v5.packet.WhoAreYouMessage
 import org.apache.tuweni.junit.BouncyCastleExtension
 import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.assertThrows
 import org.junit.jupiter.api.extension.ExtendWith
 import java.net.InetAddress
 
@@ -40,20 +40,21 @@
   private val keyPair: SECP256K1.KeyPair = SECP256K1.KeyPair.random()
   private val enr: Bytes = EthereumNodeRecord.toRLP(keyPair, ip = InetAddress.getLocalHost())
   private val nodeId: Bytes = Hash.sha2_256(enr)
-  private val authenticationProvider: AuthenticationProvider = DefaultAuthenticationProvider(keyPair, enr)
+  private val routingTable: RoutingTable = RoutingTable(enr)
+  private val authenticationProvider: AuthenticationProvider = DefaultAuthenticationProvider(keyPair, routingTable)
 
-  private val codec: PacketCodec = DefaultPacketCodec(keyPair, enr, nodeId, authenticationProvider)
+  private val codec: PacketCodec = DefaultPacketCodec(keyPair, routingTable, nodeId, authenticationProvider)
 
   private val destNodeId: Bytes = Bytes.random(32)
 
   @Test
-  fun encodePerformsValidEncodingOfRandomMessasge() {
+  fun encodePerformsValidEncodingOfRandomMessage() {
     val message = RandomMessage()
 
     val encodedResult = codec.encode(message, destNodeId)
 
-    val encodedContent = encodedResult.slice(45)
-    val result = RandomMessage.create(encodedContent)
+    val encodedContent = encodedResult.content.slice(45)
+    val result = RandomMessage.create(UdpMessage.authTag(), encodedContent)
 
     assert(result.data == message.data)
   }
@@ -64,7 +65,7 @@
 
     val encodedResult = codec.encode(message, destNodeId)
 
-    val encodedContent = encodedResult.slice(32)
+    val encodedContent = encodedResult.content.slice(32)
     val result = WhoAreYouMessage.create(encodedContent)
 
     assert(result.idNonce == message.idNonce)
@@ -83,8 +84,8 @@
 
     val encodedResult = codec.encode(message, destNodeId)
 
-    val tag = encodedResult.slice(0, UdpMessage.TAG_LENGTH)
-    val encryptedContent = encodedResult.slice(45)
+    val tag = encodedResult.content.slice(0, UdpMessage.TAG_LENGTH)
+    val encryptedContent = encodedResult.content.slice(45)
     val content = AES128GCM.decrypt(encryptedContent, sessionKey.initiatorKey, tag).slice(1)
     val result = FindNodeMessage.create(content)
 
@@ -93,21 +94,12 @@
   }
 
   @Test
-  fun encodeFailsIfSessionKeyIsNotExists() {
-    val message = FindNodeMessage()
-
-    assertThrows<IllegalArgumentException> {
-      codec.encode(message, destNodeId)
-    }
-  }
-
-  @Test
   fun decodePerformsValidDecodingOfRandomMessasge() {
     val message = RandomMessage()
 
     val encodedResult = codec.encode(message, destNodeId)
 
-    val result = codec.decode(encodedResult).message as? RandomMessage
+    val result = codec.decode(encodedResult.content).message as? RandomMessage
 
     assert(null != result)
     assert(result!!.data == message.data)
@@ -119,7 +111,7 @@
 
     val encodedResult = codec.encode(message, destNodeId)
 
-    val result = codec.decode(encodedResult).message as? WhoAreYouMessage
+    val result = codec.decode(encodedResult.content).message as? WhoAreYouMessage
 
     assert(null != result)
     assert(result!!.idNonce == message.idNonce)
@@ -139,7 +131,7 @@
 
     val encodedResult = codec.encode(message, nodeId)
 
-    val result = codec.decode(encodedResult).message as? FindNodeMessage
+    val result = codec.decode(encodedResult.content).message as? FindNodeMessage
 
     assert(result!!.requestId == message.requestId)
     assert(result.distance == message.distance)
diff --git a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultUdpConnectorTest.kt b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultUdpConnectorTest.kt
index a345208..076deb4 100644
--- a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultUdpConnectorTest.kt
+++ b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultUdpConnectorTest.kt
@@ -16,13 +16,18 @@
  */
 package org.apache.tuweni.devp2p.v5.internal
 
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.channels.Channel
 import kotlinx.coroutines.runBlocking
 import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.crypto.Hash
 import org.apache.tuweni.crypto.SECP256K1
 import org.apache.tuweni.devp2p.EthereumNodeRecord
+import org.apache.tuweni.devp2p.v5.MessageObserver
 import org.apache.tuweni.devp2p.v5.UdpConnector
 import org.apache.tuweni.devp2p.v5.packet.RandomMessage
 import org.apache.tuweni.devp2p.v5.packet.UdpMessage
+import org.apache.tuweni.devp2p.v5.storage.RoutingTable
 import org.apache.tuweni.junit.BouncyCastleExtension
 import org.apache.tuweni.net.coroutines.CoroutineDatagramChannel
 import org.junit.jupiter.api.AfterEach
@@ -37,18 +42,17 @@
 class DefaultUdpConnectorTest {
 
   private val keyPair: SECP256K1.KeyPair = SECP256K1.KeyPair.random()
-  private val nodeId: Bytes = Bytes.fromHexString("0x98EB6D611291FA21F6169BFF382B9369C33D997FE4DC93410987E27796360640")
   private val address: InetSocketAddress = InetSocketAddress(9090)
   private val selfEnr: Bytes = EthereumNodeRecord.toRLP(keyPair, ip = address.address)
 
   private val data: Bytes = UdpMessage.randomData()
-  private val message: RandomMessage = RandomMessage(data)
+  private val message: RandomMessage = RandomMessage(UdpMessage.authTag(), data)
 
-  private var connector: UdpConnector = DefaultUdpConnector(address, keyPair, selfEnr, nodeId)
+  private var connector: UdpConnector = DefaultUdpConnector(address, keyPair, selfEnr)
 
   @BeforeEach
   fun setUp() {
-    connector = DefaultUdpConnector(address, keyPair, selfEnr, nodeId)
+    connector = DefaultUdpConnector(address, keyPair, selfEnr)
   }
 
   @AfterEach
@@ -93,10 +97,85 @@
       buffer.flip()
 
       val messageContent = Bytes.wrapByteBuffer(buffer).slice(45)
-      val message = RandomMessage.create(messageContent)
+      val message = RandomMessage.create(UdpMessage.authTag(), messageContent)
 
       assert(message.data == data)
     }
     socketChannel.close()
   }
+
+  /**
+   * An attached observer is handed a [RandomMessage] that arrives on the connector's socket.
+   */
+  @Test
+  @UseExperimental(ExperimentalCoroutinesApi::class)
+  fun attachObserverRegistersListener() {
+    val observer = object : MessageObserver {
+      // Buffered on purpose: offer() on a rendezvous channel only succeeds while a receiver is
+      // already waiting, so a message observed before receive() is reached would be dropped.
+      var result: Channel<RandomMessage> = Channel(Channel.UNLIMITED)
+      override fun observe(message: UdpMessage) {
+        if (message is RandomMessage) {
+          result.offer(message)
+        }
+      }
+    }
+    connector.attachObserver(observer)
+    connector.start()
+
+    assert(observer.result.isEmpty)
+
+    val codec = DefaultPacketCodec(SECP256K1.KeyPair.random(), RoutingTable(Bytes.random(32)))
+    val socketChannel = CoroutineDatagramChannel.open()
+
+    try {
+      runBlocking {
+        val message = RandomMessage()
+        val encodedRandomMessage = codec.encode(message, Hash.sha2_256(connector.getEnrBytes()))
+        val buffer = ByteBuffer.wrap(encodedRandomMessage.content.toArray())
+        socketChannel.send(buffer, InetSocketAddress(InetAddress.getLocalHost(), 9090))
+        val expectedResult = observer.result.receive()
+        assert(expectedResult.data == message.data)
+      }
+    } finally {
+      socketChannel.close()
+    }
+  }
+
+  /**
+   * An observer that was attached and then detached receives nothing when a [RandomMessage] arrives.
+   */
+  @Test
+  @UseExperimental(ExperimentalCoroutinesApi::class)
+  fun detachObserverRemovesListener() {
+    val observer = object : MessageObserver {
+      // Buffered so that a message wrongly delivered to this observer is kept and can be detected.
+      var result: Channel<RandomMessage> = Channel(Channel.UNLIMITED)
+      override fun observe(message: UdpMessage) {
+        if (message is RandomMessage) {
+          result.offer(message)
+        }
+      }
+    }
+    connector.attachObserver(observer)
+    connector.detachObserver(observer)
+    connector.start()
+
+    val codec = DefaultPacketCodec(SECP256K1.KeyPair.random(), RoutingTable(Bytes.random(32)))
+    val socketChannel = CoroutineDatagramChannel.open()
+
+    try {
+      runBlocking {
+        val message = RandomMessage()
+        val encodedRandomMessage = codec.encode(message, Hash.sha2_256(connector.getEnrBytes()))
+        val buffer = ByteBuffer.wrap(encodedRandomMessage.content.toArray())
+        socketChannel.send(buffer, InetSocketAddress(InetAddress.getLocalHost(), 9090))
+        // Wait briefly so the connector has a chance to process the datagram before checking.
+        val delivered = kotlinx.coroutines.withTimeoutOrNull(500L) { observer.result.receive() }
+        assert(delivered == null)
+      }
+    } finally {
+      socketChannel.close()
+    }
+  }
 }
diff --git a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/packet/FindNodeMessageTest.kt b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/packet/FindNodeMessageTest.kt
index 22b323c..551bd27 100644
--- a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/packet/FindNodeMessageTest.kt
+++ b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/packet/FindNodeMessageTest.kt
@@ -34,7 +34,7 @@
     val decodingResult = FindNodeMessage.create(encodingResult)
 
     assert(decodingResult.requestId == requestId)
-    assert(decodingResult.distance == 0L)
+    assert(decodingResult.distance == 0)
   }
 
   @Test
diff --git a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/packet/RandomMessageTest.kt b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/packet/RandomMessageTest.kt
index 2060816..ac125b9 100644
--- a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/packet/RandomMessageTest.kt
+++ b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/packet/RandomMessageTest.kt
@@ -27,12 +27,12 @@
       "0xB53CCF732982B8E950836D1E02898C8B38CFDBFDF86BC65C8826506B454E14618EA73612A0F5582C130FF666"
 
     val data = Bytes.fromHexString(expectedEncodingResult)
-    val message = RandomMessage(data)
+    val message = RandomMessage(UdpMessage.authTag(), data)
 
     val encodingResult = message.encode()
     assert(encodingResult.toHexString() == expectedEncodingResult)
 
-    val decodingResult = RandomMessage.create(encodingResult)
+    val decodingResult = RandomMessage.create(UdpMessage.authTag(), encodingResult)
 
     assert(decodingResult.data == data)
   }
diff --git a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/storage/EnrStorageTest.kt b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/storage/EnrStorageTest.kt
new file mode 100644
index 0000000..4485c82
--- /dev/null
+++ b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/storage/EnrStorageTest.kt
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.v5.storage
+
+import org.apache.tuweni.crypto.Hash
+import org.apache.tuweni.crypto.SECP256K1
+import org.apache.tuweni.devp2p.EthereumNodeRecord
+import org.apache.tuweni.devp2p.v5.ENRStorage
+import org.apache.tuweni.junit.BouncyCastleExtension
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.extension.ExtendWith
+import java.net.InetAddress
+
+/**
+ * Tests [DefaultENRStorage] through the [ENRStorage] interface.
+ */
+@ExtendWith(BouncyCastleExtension::class)
+class EnrStorageTest {
+
+  private val storage: ENRStorage = DefaultENRStorage()
+
+  /**
+   * A record passed to `set` is returned by `find` when queried with `Hash.sha2_256` of that record.
+   */
+  @Test
+  fun setPersistsAndFindRetrievesNodeRecord() {
+    val enr = EthereumNodeRecord.toRLP(SECP256K1.KeyPair.random(), ip = InetAddress.getLocalHost())
+
+    storage.set(enr)
+
+    val nodeId = Hash.sha2_256(enr)
+    val persistedEnr = storage.find(nodeId)
+
+    assert(persistedEnr == enr)
+  }
+}
diff --git a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/storage/RoutingTableTest.kt b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/storage/RoutingTableTest.kt
new file mode 100644
index 0000000..fbf2ede
--- /dev/null
+++ b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/storage/RoutingTableTest.kt
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.v5.storage
+
+import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.crypto.SECP256K1
+import org.apache.tuweni.devp2p.EthereumNodeRecord
+import org.apache.tuweni.junit.BouncyCastleExtension
+import org.junit.jupiter.api.AfterEach
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.extension.ExtendWith
+import java.net.InetAddress
+
+/**
+ * Exercises add, evict and self-record lookup on [RoutingTable].
+ */
+@ExtendWith(BouncyCastleExtension::class)
+class RoutingTableTest {
+
+  private val ownerKeys: SECP256K1.KeyPair = SECP256K1.KeyPair.random()
+  private val ownerEnr: Bytes = EthereumNodeRecord.toRLP(ownerKeys, ip = InetAddress.getLocalHost())
+  private val table: RoutingTable = RoutingTable(ownerEnr)
+
+  private val peerKeys = SECP256K1.KeyPair.random()
+  private val peerEnr = EthereumNodeRecord.toRLP(peerKeys, ip = InetAddress.getLocalHost())
+
+  /** Clears the table once each test has finished. */
+  @AfterEach
+  fun tearDown() {
+    table.clear()
+  }
+
+  /** Adding a record leaves the table non-empty. */
+  @Test
+  fun addCreatesRecordInBucket() {
+    table.add(peerEnr)
+
+    val populated = !table.isEmpty()
+    assert(populated)
+  }
+
+  /** Evicting the only added record leaves the table empty again. */
+  @Test
+  fun evictRemovesRecord() {
+    table.add(peerEnr)
+    val emptyAfterAdd = table.isEmpty()
+    assert(!emptyAfterAdd)
+
+    table.evict(peerEnr)
+    val emptyAfterEvict = table.isEmpty()
+    assert(emptyAfterEvict)
+  }
+
+  /** The table reports the record it was constructed with as its own. */
+  @Test
+  fun getSelfEnrGivesTableOwnerEnr() {
+    val selfEnr = table.getSelfEnr()
+
+    assert(selfEnr == ownerEnr)
+  }
+}
diff --git a/kademlia/src/main/kotlin/org/apache/tuweni/kademlia/KademliaRoutingTable.kt b/kademlia/src/main/kotlin/org/apache/tuweni/kademlia/KademliaRoutingTable.kt
index 2299bb1..bf34fad 100644
--- a/kademlia/src/main/kotlin/org/apache/tuweni/kademlia/KademliaRoutingTable.kt
+++ b/kademlia/src/main/kotlin/org/apache/tuweni/kademlia/KademliaRoutingTable.kt
@@ -94,7 +94,8 @@
   private val selfId: ByteArray,
   k: Int,
   maxReplacements: Int = k,
-  private val nodeId: (T) -> ByteArray
+  private val nodeId: (T) -> ByteArray,
+  private val distanceToSelf: (T) -> Int = { nodeId(it) xorDist selfId }
 ) : Set<T> {
 
   companion object {
@@ -108,8 +109,13 @@
      * @return A new routing table
      */
     @JvmStatic
-    fun <T> create(selfId: ByteArray, k: Int, nodeId: Function<T, ByteArray>): KademliaRoutingTable<T> =
-      KademliaRoutingTable(selfId, k, nodeId = nodeId::apply)
+    fun <T> create(
+      selfId: ByteArray,
+      k: Int,
+      nodeId: Function<T, ByteArray>,
+      distanceToSelf: Function<T, Int>
+    ): KademliaRoutingTable<T> =
+      KademliaRoutingTable(selfId, k, nodeId = nodeId::apply, distanceToSelf = distanceToSelf::apply)
 
     /**
      * Create a new routing table.
@@ -126,8 +132,9 @@
       selfId: ByteArray,
       k: Int,
       maxReplacements: Int,
-      nodeId: Function<T, ByteArray>
-    ): KademliaRoutingTable<T> = KademliaRoutingTable(selfId, k, maxReplacements, nodeId::apply)
+      nodeId: Function<T, ByteArray>,
+      distanceToSelf: Function<T, Int>
+    ): KademliaRoutingTable<T> = KademliaRoutingTable(selfId, k, maxReplacements, nodeId::apply, distanceToSelf::apply)
   }
 
   init {
@@ -209,6 +216,39 @@
     buckets.forEach { bucket -> bucket.clear() }
   }
 
+  /**
+   * Returns the nodes held in the bucket at index [value].
+   *
+   * Buckets are indexed by log distance to this table's own id.
+   *
+   * @param value the log distance (bucket index) to look up
+   * @return a copy of that bucket's nodes, or an empty list if [value] is outside the bucket range
+   */
+  fun peersOfDistance(value: Int): List<T> = buckets.getOrNull(value)?.toList() ?: emptyList()
+
+  /**
+   * Picks a random non-empty bucket and returns a random node from it.
+   *
+   * @return a node currently held in the table
+   * @throws NoSuchElementException if every bucket is empty
+   */
+  fun getRandom(): T {
+    val populated = buckets.filter { !it.isEmpty() }
+    if (populated.isEmpty()) {
+      throw NoSuchElementException("Routing table is empty")
+    }
+    return populated.random().random()
+  }
+
+  /**
+   * Returns the log distance between [node] and this table's own id.
+   *
+   * The value is fetched through `distanceCache`, with the `distanceToSelf` function as the loader.
+   * NOTE(review): the private version this replaces went through `idForNode`, which checks the id
+   * length; the default `distanceToSelf` calls `nodeId` directly and skips that check - confirm.
+   */
+  fun logDistToSelf(node: T): Int = distanceCache.get(node) { distanceToSelf(node) }
+
   private fun idForNode(node: T): ByteArray {
     val id = nodeId(node)
     require(id.size == selfId.size) { "id obtained for node is not the correct length" }
@@ -218,8 +258,6 @@
 
   private fun bucketFor(node: T) = buckets[logDistToSelf(node)]
 
-  private fun logDistToSelf(node: T): Int = distanceCache.get(node) { idForNode(node) xorDist selfId }
-
   private class Bucket<E> private constructor(
     // ordered with most recent first
     private val entries: MutableList<E>,
diff --git a/kademlia/src/test/kotlin/org/apache/tuweni/kademlia/KademliaRoutingTableTest.kt b/kademlia/src/test/kotlin/org/apache/tuweni/kademlia/KademliaRoutingTableTest.kt
index 4d5f10b..b50c4d8 100644
--- a/kademlia/src/test/kotlin/org/apache/tuweni/kademlia/KademliaRoutingTableTest.kt
+++ b/kademlia/src/test/kotlin/org/apache/tuweni/kademlia/KademliaRoutingTableTest.kt
@@ -47,14 +47,14 @@
 
   @Test
   fun shouldBeEmptyOnConstruction() {
-    val table = KademliaRoutingTable<Node>(shortId, 16) { n -> n.nodeId }
+    val table = KademliaRoutingTable<Node>(shortId, 16, nodeId = { n -> n.nodeId })
     assertTrue(table.isEmpty())
     assertEquals(0, table.size)
   }
 
   @Test
   fun shouldAddToEmptyTable() {
-    val table = KademliaRoutingTable<Node>(shortId, 16) { n -> n.nodeId }
+    val table = KademliaRoutingTable<Node>(shortId, 16, nodeId = { n -> n.nodeId })
     val node = Node(0x01)
     assertNull(table.add(node))
     assertTrue(table.contains(node))
@@ -63,7 +63,7 @@
 
   @Test
   fun shouldProposeEvictionIfBucketIsFull() {
-    val table = KademliaRoutingTable<Node>(shortId, 2) { n -> n.nodeId }
+    val table = KademliaRoutingTable<Node>(shortId, 2, nodeId = { n -> n.nodeId })
     val node1 = Node(0x04)
     assertNull(table.add(node1))
     val node2 = Node(0x05)
@@ -82,7 +82,7 @@
 
   @Test
   fun shouldReplaceEvictedNode() {
-    val table = KademliaRoutingTable<Node>(shortId, 2) { n -> n.nodeId }
+    val table = KademliaRoutingTable<Node>(shortId, 2, nodeId = { n -> n.nodeId })
     val node1 = Node(0x04)
     val node2 = Node(0x05)
     table.add(node1)
@@ -100,7 +100,7 @@
 
   @Test
   fun shouldReturnNearestOrderedNodesUpToLimit() {
-    val table = KademliaRoutingTable<Node>(shortId, 16) { n -> n.nodeId }
+    val table = KademliaRoutingTable<Node>(shortId, 16, nodeId = { n -> n.nodeId })
     table.add(Node(0x05))
     table.add(Node(0x04))
     val node3 = Node(0x03)
@@ -119,7 +119,7 @@
 
   @Test
   fun shouldReturnAllNodesWhenTableIsSmallerThanLimit() {
-    val table = KademliaRoutingTable<Node>(shortId, 16) { n -> n.nodeId }
+    val table = KademliaRoutingTable<Node>(shortId, 16, nodeId = { n -> n.nodeId })
     table.add(Node(0x05))
     table.add(Node(0x04))
     table.add(Node(0x03))
@@ -134,7 +134,7 @@
 
   @Test
   fun shouldClearAllNodes() {
-    val table = KademliaRoutingTable<Node>(shortId, 16) { n -> n.nodeId }
+    val table = KademliaRoutingTable<Node>(shortId, 16, nodeId = { n -> n.nodeId })
     table.add(Node(0x05))
     table.add(Node(0x04))
     table.add(Node(0x03))