Merge branch 'v5-udp-transport' into v5-topics

# Conflicts:
#	devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/UdpConnector.kt
#	devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultUdpConnector.kt
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/ENRStorage.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/ENRStorage.kt
new file mode 100644
index 0000000..8224a27
--- /dev/null
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/ENRStorage.kt
@@ -0,0 +1,11 @@
+package org.apache.tuweni.devp2p.v5
+
+import org.apache.tuweni.bytes.Bytes
+
+interface ENRStorage {
+
+  fun set(enr: Bytes)
+
+  fun find(nodeId: Bytes): Bytes?
+
+}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/MessageListener.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/MessageListener.kt
new file mode 100644
index 0000000..2d926ca
--- /dev/null
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/MessageListener.kt
@@ -0,0 +1,9 @@
+package org.apache.tuweni.devp2p.v5
+
+import org.apache.tuweni.devp2p.v5.packet.UdpMessage
+
+interface MessageObserver {
+
+  fun observe(message: UdpMessage)
+
+}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/NodeDiscoveryService.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/NodeDiscoveryService.kt
index ee4db65..cbe8a3d 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/NodeDiscoveryService.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/NodeDiscoveryService.kt
@@ -22,10 +22,14 @@
 import kotlinx.coroutines.launch
 import kotlinx.coroutines.runBlocking
 import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.crypto.Hash
 import org.apache.tuweni.crypto.SECP256K1
 import org.apache.tuweni.devp2p.EthereumNodeRecord
 import org.apache.tuweni.devp2p.v5.internal.DefaultUdpConnector
+import org.apache.tuweni.devp2p.v5.packet.FindNodeMessage
+import org.apache.tuweni.devp2p.v5.packet.PingMessage
 import org.apache.tuweni.devp2p.v5.packet.RandomMessage
+import org.apache.tuweni.devp2p.v5.storage.DefaultENRStorage
 import org.apache.tuweni.io.Base64URLSafe
 import java.net.InetSocketAddress
 import java.time.Instant
@@ -62,7 +66,8 @@
     null,
     bindAddress.port
   ),
-  private val connector: UdpConnector = DefaultUdpConnector(bindAddress, keyPair, selfENR),
+  private val enrStorage: ENRStorage = DefaultENRStorage(),
+  private val connector: UdpConnector = DefaultUdpConnector(bindAddress, keyPair, selfENR, enrStorage),
   override val coroutineContext: CoroutineContext = Dispatchers.Default
 ) : NodeDiscoveryService, CoroutineScope {
 
@@ -90,9 +95,10 @@
         val randomMessage = RandomMessage()
         val address = InetSocketAddress(enr.ip(), enr.udp())
 
+        val destNodeId = Hash.sha2_256(rlpENR)
+        enrStorage.set(rlpENR)
         connector.getNodesTable().add(rlpENR)
-        connector.addPendingNodeId(address, rlpENR)
-        connector.send(address, randomMessage, rlpENR)
+        connector.send(address, randomMessage, destNodeId)
       }
     }
   }
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/PacketCodec.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/PacketCodec.kt
index eec5073..23a2cf3 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/PacketCodec.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/PacketCodec.kt
@@ -19,6 +19,7 @@
 import org.apache.tuweni.bytes.Bytes
 import org.apache.tuweni.devp2p.v5.packet.UdpMessage
 import org.apache.tuweni.devp2p.v5.misc.DecodeResult
+import org.apache.tuweni.devp2p.v5.misc.EncodeResult
 import org.apache.tuweni.devp2p.v5.misc.HandshakeInitParameters
 
 /**
@@ -43,7 +44,7 @@
    *
    * @return encoded message
    */
-  fun encode(message: UdpMessage, destNodeId: Bytes, handshakeParams: HandshakeInitParameters? = null): Bytes
+  fun encode(message: UdpMessage, destNodeId: Bytes, handshakeParams: HandshakeInitParameters? = null): EncodeResult
 
   /**
    * Decodes message, decrypting it's body
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/UdpConnector.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/UdpConnector.kt
index 948903d..72900f0 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/UdpConnector.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/UdpConnector.kt
@@ -19,9 +19,10 @@
 import org.apache.tuweni.bytes.Bytes
 import org.apache.tuweni.crypto.SECP256K1
 import org.apache.tuweni.devp2p.EthereumNodeRecord
-import org.apache.tuweni.devp2p.v5.dht.RoutingTable
+import org.apache.tuweni.devp2p.v5.storage.RoutingTable
 import org.apache.tuweni.devp2p.v5.packet.UdpMessage
 import org.apache.tuweni.devp2p.v5.misc.HandshakeInitParameters
+import org.apache.tuweni.devp2p.v5.misc.TrackingMessage
 import org.apache.tuweni.devp2p.v5.topic.TicketHolder
 import org.apache.tuweni.devp2p.v5.topic.TopicRegistrar
 import org.apache.tuweni.devp2p.v5.topic.TopicTable
@@ -87,7 +88,7 @@
    *
    * @return node identifier
    */
-  fun getPendingNodeIdByAddress(address: InetSocketAddress): Bytes
+  fun findPendingNodeId(address: InetSocketAddress): Bytes
 
   /**
    * Provides node's key pair
@@ -110,6 +111,10 @@
    */
   fun getEnr(): EthereumNodeRecord
 
+  fun attachObserver(observer: MessageObserver)
+
+  fun detachObserver(observer: MessageObserver)
+
   fun getNodesTable(): RoutingTable
 
   /**
@@ -128,6 +133,10 @@
 
   fun getAwaitingPongRecord(nodeId: Bytes): Bytes?
 
+  fun getPendingMessage(authTag: Bytes): TrackingMessage
+
+  fun getNodeRecords(): ENRStorage
+
   fun getTopicRegistrar(): TopicRegistrar
 
   fun getSessionInitiatorKey(nodeId: Bytes): Bytes
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/encrypt/SessionKeyGenerator.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/encrypt/SessionKeyGenerator.kt
index cb21279..0858bdd 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/encrypt/SessionKeyGenerator.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/encrypt/SessionKeyGenerator.kt
@@ -27,7 +27,7 @@
  */
 object SessionKeyGenerator {
 
-  private const val DERIVED_KEY_SIZE: Int = 16
+  const val DERIVED_KEY_SIZE: Int = 16
 
   private val INFO_PREFIX = Bytes.wrap("discovery v5 key agreement".toByteArray())
 
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultAuthenticationProvider.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultAuthenticationProvider.kt
index 7c397ca..68f48e6 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultAuthenticationProvider.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultAuthenticationProvider.kt
@@ -24,7 +24,7 @@
 import org.apache.tuweni.devp2p.ENR_REQUEST_RETRY_DELAY_MS
 import org.apache.tuweni.devp2p.EthereumNodeRecord
 import org.apache.tuweni.devp2p.v5.AuthenticationProvider
-import org.apache.tuweni.devp2p.v5.dht.RoutingTable
+import org.apache.tuweni.devp2p.v5.storage.RoutingTable
 import org.apache.tuweni.devp2p.v5.encrypt.AES128GCM
 import org.apache.tuweni.devp2p.v5.encrypt.SessionKeyGenerator
 import org.apache.tuweni.devp2p.v5.misc.AuthHeader
@@ -40,7 +40,7 @@
 
   private val sessionKeys: Cache<String, SessionKey> = CacheBuilder
     .newBuilder()
-    .expireAfterWrite(ENR_REQUEST_RETRY_DELAY_MS, TimeUnit.MILLISECONDS)
+    .expireAfterWrite(SESSION_KEY_EXPIRATION, TimeUnit.MINUTES)
     .build()
   private val nodeId: Bytes = Hash.sha2_256(routingTable.getSelfEnr())
 
@@ -133,6 +133,8 @@
   }
 
   companion object {
+    private const val SESSION_KEY_EXPIRATION: Long = 5
+
     private const val ZERO_NONCE_SIZE: Int = 12
     private const val VERSION: Int = 5
 
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultPacketCodec.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultPacketCodec.kt
index 4c0080d..6216138 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultPacketCodec.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultPacketCodec.kt
@@ -21,14 +21,16 @@
 import org.apache.tuweni.crypto.SECP256K1
 import org.apache.tuweni.devp2p.v5.AuthenticationProvider
 import org.apache.tuweni.devp2p.v5.PacketCodec
-import org.apache.tuweni.devp2p.v5.dht.RoutingTable
+import org.apache.tuweni.devp2p.v5.storage.RoutingTable
 import org.apache.tuweni.devp2p.v5.encrypt.AES128GCM
+import org.apache.tuweni.devp2p.v5.encrypt.SessionKeyGenerator
 import org.apache.tuweni.devp2p.v5.packet.FindNodeMessage
 import org.apache.tuweni.devp2p.v5.packet.RandomMessage
 import org.apache.tuweni.devp2p.v5.packet.UdpMessage
 import org.apache.tuweni.devp2p.v5.packet.WhoAreYouMessage
 import org.apache.tuweni.devp2p.v5.misc.AuthHeader
 import org.apache.tuweni.devp2p.v5.misc.DecodeResult
+import org.apache.tuweni.devp2p.v5.misc.EncodeResult
 import org.apache.tuweni.devp2p.v5.misc.HandshakeInitParameters
 import org.apache.tuweni.devp2p.v5.packet.NodesMessage
 import org.apache.tuweni.devp2p.v5.packet.PingMessage
@@ -48,37 +50,36 @@
   private val authenticationProvider: AuthenticationProvider = DefaultAuthenticationProvider(keyPair, routingTable)
 ) : PacketCodec {
 
-  override fun encode(message: UdpMessage, destNodeId: Bytes, handshakeParams: HandshakeInitParameters?): Bytes {
+  override fun encode(message: UdpMessage, destNodeId: Bytes, handshakeParams: HandshakeInitParameters?): EncodeResult {
     if (message is WhoAreYouMessage) {
       val magic = UdpMessage.magic(nodeId)
       val content = message.encode()
-      return Bytes.wrap(magic, content)
+      return EncodeResult(magic, Bytes.wrap(magic, content))
     }
 
     val tag = UdpMessage.tag(nodeId, destNodeId)
     if (message is RandomMessage) {
-      val authTag = UdpMessage.authTag()
-      val rlpAuthTag = RLP.encodeValue(authTag)
+      val rlpAuthTag = RLP.encodeValue(message.authTag)
       val content = message.encode()
-      return Bytes.wrap(tag, rlpAuthTag, content)
+      return EncodeResult(message.authTag, Bytes.wrap(tag, rlpAuthTag, content))
     }
 
     val authHeader = handshakeParams?.let { authenticationProvider.authenticate(handshakeParams) }
 
     val initiatorKey = authenticationProvider.findSessionKey(destNodeId.toHexString())?.initiatorKey
-      ?: throw IllegalArgumentException() // TODO handle
+      ?: Bytes.random(SessionKeyGenerator.DERIVED_KEY_SIZE) // encrypt with random key, to initiate handshake
     val messagePlain = Bytes.wrap(message.getMessageType(), message.encode())
     return if (null != authHeader) {
       val encodedHeader = authHeader.asRlp()
       val authTag = authHeader.authTag
       val encryptionMeta = Bytes.wrap(tag, encodedHeader)
       val encryptionResult = AES128GCM.encrypt(initiatorKey, authTag, messagePlain, encryptionMeta)
-      Bytes.wrap(tag, encodedHeader, encryptionResult)
+      EncodeResult(authTag, Bytes.wrap(tag, encodedHeader, encryptionResult))
     } else {
       val authTag = UdpMessage.authTag()
       val authTagHeader = RLP.encodeValue(authTag)
       val encryptionResult = AES128GCM.encrypt(initiatorKey, authTag, messagePlain, tag)
-      Bytes.wrap(tag, authTagHeader, encryptionResult)
+      EncodeResult(authTag, Bytes.wrap(tag, authTagHeader, encryptionResult))
     }
   }
 
@@ -93,6 +94,7 @@
   private fun read(tag: Bytes, senderNodeId: Bytes, contentWithHeader: Bytes, reader: RLPReader): UdpMessage {
     // Distinguish auth header or auth tag
     var authHeader: AuthHeader? = null
+    var authTag: Bytes = Bytes.EMPTY
     if (reader.nextIsList()) {
       if (WHO_ARE_YOU_MESSAGE_LENGTH == contentWithHeader.size()) {
         return WhoAreYouMessage.create(contentWithHeader)
@@ -107,14 +109,14 @@
       }
       authenticationProvider.finalizeHandshake(senderNodeId, authHeader)
     } else {
-      reader.readValue()
+      authTag = reader.readValue()
     }
 
     val encryptedContent = contentWithHeader.slice(reader.position())
 
     // Decrypt
     val decryptionKey = authenticationProvider.findSessionKey(senderNodeId.toHexString())?.initiatorKey
-      ?: return RandomMessage(encryptedContent)
+      ?: return RandomMessage.create(authTag, encryptedContent)
     val decryptMetadata = authHeader?.let { Bytes.wrap(tag, authHeader.asRlp()) } ?: tag
     val decryptedContent = AES128GCM.decrypt(encryptedContent, decryptionKey, decryptMetadata)
     val messageType = decryptedContent.slice(0, Byte.SIZE_BYTES)
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultUdpConnector.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultUdpConnector.kt
index 6de199e..6816324 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultUdpConnector.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultUdpConnector.kt
@@ -27,11 +27,13 @@
 import org.apache.tuweni.crypto.Hash
 import org.apache.tuweni.crypto.SECP256K1
 import org.apache.tuweni.devp2p.EthereumNodeRecord
+import org.apache.tuweni.devp2p.v5.ENRStorage
 import org.apache.tuweni.devp2p.v5.AuthenticationProvider
 import org.apache.tuweni.devp2p.v5.MessageHandler
+import org.apache.tuweni.devp2p.v5.MessageObserver
 import org.apache.tuweni.devp2p.v5.PacketCodec
 import org.apache.tuweni.devp2p.v5.UdpConnector
-import org.apache.tuweni.devp2p.v5.dht.RoutingTable
+import org.apache.tuweni.devp2p.v5.storage.RoutingTable
 import org.apache.tuweni.devp2p.v5.internal.handler.FindNodeMessageHandler
 import org.apache.tuweni.devp2p.v5.internal.handler.NodesMessageHandler
 import org.apache.tuweni.devp2p.v5.internal.handler.PingMessageHandler
@@ -44,6 +46,11 @@
 import org.apache.tuweni.devp2p.v5.internal.handler.WhoAreYouMessageHandler
 import org.apache.tuweni.devp2p.v5.misc.HandshakeInitParameters
 import org.apache.tuweni.devp2p.v5.packet.FindNodeMessage
+import org.apache.tuweni.devp2p.v5.packet.RandomMessage
+import org.apache.tuweni.devp2p.v5.packet.UdpMessage
+import org.apache.tuweni.devp2p.v5.packet.WhoAreYouMessage
+import org.apache.tuweni.devp2p.v5.misc.HandshakeInitParameters
+import org.apache.tuweni.devp2p.v5.misc.TrackingMessage
 import org.apache.tuweni.devp2p.v5.packet.NodesMessage
 import org.apache.tuweni.devp2p.v5.packet.PingMessage
 import org.apache.tuweni.devp2p.v5.packet.PongMessage
@@ -57,6 +64,7 @@
 import org.apache.tuweni.devp2p.v5.topic.TicketHolder
 import org.apache.tuweni.devp2p.v5.topic.TopicRegistrar
 import org.apache.tuweni.devp2p.v5.topic.TopicTable
+import org.apache.tuweni.devp2p.v5.storage.DefaultENRStorage
 import org.apache.tuweni.net.coroutines.CoroutineDatagramChannel
 import java.net.InetSocketAddress
 import java.nio.ByteBuffer
@@ -68,9 +76,8 @@
   private val bindAddress: InetSocketAddress,
   private val keyPair: SECP256K1.KeyPair,
   private val selfEnr: Bytes,
-  private val nodeId: Bytes = Hash.sha2_256(selfEnr),
+  private val enrStorage: ENRStorage = DefaultENRStorage(),
   private val receiveChannel: CoroutineDatagramChannel = CoroutineDatagramChannel.open(),
-  private val sendChannel: CoroutineDatagramChannel = CoroutineDatagramChannel.open(),
   private val nodesTable: RoutingTable = RoutingTable(selfEnr),
   private val topicTable: TopicTable = TopicTable(),
   private val ticketHolder: TicketHolder = TicketHolder(),
@@ -78,13 +85,14 @@
   private val packetCodec: PacketCodec = DefaultPacketCodec(keyPair, nodesTable, nodeId, authenticationProvider),
   private val authenticatingPeers: MutableMap<InetSocketAddress, Bytes> = mutableMapOf(),
   private val selfNodeRecord: EthereumNodeRecord = EthereumNodeRecord.fromRLP(selfEnr),
+  private val messageListeners: MutableList<MessageObserver> = mutableListOf(),
   override val coroutineContext: CoroutineContext = Dispatchers.IO
 ) : UdpConnector, CoroutineScope {
 
   private val log: Logger = Logger.getLogger(this.javaClass.simpleName)
 
   private val randomMessageHandler: MessageHandler<RandomMessage> = RandomMessageHandler()
-  private val whoAreYouMessageHandler: MessageHandler<WhoAreYouMessage> = WhoAreYouMessageHandler(nodeId)
+  private val whoAreYouMessageHandler: MessageHandler<WhoAreYouMessage> = WhoAreYouMessageHandler()
   private val findNodeMessageHandler: MessageHandler<FindNodeMessage> = FindNodeMessageHandler()
   private val nodesMessageHandler: MessageHandler<NodesMessage> = NodesMessageHandler()
   private val pingMessageHandler: MessageHandler<PingMessage> = PingMessageHandler()
@@ -96,32 +104,44 @@
 
   private val topicRegistrar = TopicRegistrar(coroutineContext, this)
 
+  private val askedNodes: MutableList<Bytes> = mutableListOf()
+
+  private val pendingMessages: Cache<String, TrackingMessage> = CacheBuilder.newBuilder()
+    .expireAfterWrite(Duration.ofMillis(REQUEST_TIMEOUT))
+    .build()
   private val pings: Cache<String, Bytes> = CacheBuilder.newBuilder()
-    .expireAfterWrite(Duration.ofMillis(PING_TIMEOUT))
+    .expireAfterWrite(Duration.ofMillis(REQUEST_TIMEOUT))
     .removalListener<String, Bytes> {
       getNodesTable().evict(it.value)
     }.build()
 
   private lateinit var refreshJob: Job
   private lateinit var receiveJob: Job
+  private lateinit var lookupJob: Job
+
+  override fun available(): Boolean = receiveChannel.isOpen
+
+  override fun started(): Boolean = ::receiveJob.isInitialized && available()
+
+  override fun getEnrBytes(): Bytes = selfEnr
+
+  override fun getEnr(): EthereumNodeRecord = selfNodeRecord
+
+  override fun getNodeRecords(): ENRStorage = enrStorage
+
+  override fun getNodesTable(): RoutingTable = nodesTable
+
+  override fun getNodeKeyPair(): SECP256K1.KeyPair = keyPair
+
+  override fun getPendingMessage(authTag: Bytes): TrackingMessage = pendingMessages.getIfPresent(authTag.toHexString())
+    ?: throw IllegalArgumentException("Pending message not found")
+
 
   override fun start() {
     receiveChannel.bind(bindAddress)
 
-    receiveJob = launch {
-      val datagram = ByteBuffer.allocate(UdpMessage.MAX_UDP_MESSAGE_SIZE)
-      while (receiveChannel.isOpen) {
-        datagram.clear()
-        val address = receiveChannel.receive(datagram) as InetSocketAddress
-        datagram.flip()
-        try {
-          processDatagram(datagram, address)
-        } catch (ex: Exception) {
-          log.warning(ex.message)
-        }
-      }
-    }
-
+    receiveJob = receiveDatagram()
+    lookupJob = lookupNodes()
     refreshJob = refreshNodesTable()
   }
 
@@ -132,42 +152,39 @@
     handshakeParams: HandshakeInitParameters?
   ) {
     launch {
-      val buffer = packetCodec.encode(message, destNodeId, handshakeParams)
-      sendChannel.send(ByteBuffer.wrap(buffer.toArray()), address)
+      val encodeResult = packetCodec.encode(message, destNodeId, handshakeParams)
+      pendingMessages.put(encodeResult.authTag.toHexString(), TrackingMessage(message, destNodeId))
+      receiveChannel.send(ByteBuffer.wrap(encodeResult.content.toArray()), address)
     }
   }
 
   override fun terminate() {
     receiveChannel.close()
-    sendChannel.close()
 
-    receiveJob.cancel()
     refreshJob.cancel()
+    lookupJob.cancel()
+    receiveJob.cancel()
   }
 
-  override fun available(): Boolean = receiveChannel.isOpen
+  override fun attachObserver(observer: MessageObserver) {
+    messageListeners.add(observer)
+  }
 
-  override fun started(): Boolean = ::receiveJob.isInitialized && available()
-
-  override fun getEnrBytes(): Bytes = selfEnr
-
-  override fun getEnr(): EthereumNodeRecord = selfNodeRecord
+  override fun detachObserver(observer: MessageObserver) {
+    messageListeners.remove(observer)
+  }
 
   override fun addPendingNodeId(address: InetSocketAddress, nodeId: Bytes) {
     authenticatingPeers[address] = nodeId
   }
 
-  override fun getNodeKeyPair(): SECP256K1.KeyPair = keyPair
-
-  override fun getPendingNodeIdByAddress(address: InetSocketAddress): Bytes {
+  override fun findPendingNodeId(address: InetSocketAddress): Bytes {
     val result = authenticatingPeers[address]
       ?: throw IllegalArgumentException("Authenticated peer not found with address ${address.hostName}:${address.port}")
     authenticatingPeers.remove(address)
     return result
   }
 
-  override fun getNodesTable(): RoutingTable = nodesTable
-
   override fun getTopicTable(): TopicTable = topicTable
 
   override fun getTicketHolder(): TicketHolder = ticketHolder
@@ -186,6 +203,45 @@
       ?: throw IllegalArgumentException("Session key not found.")
   }
 
+  // Lookup nodes
+  private fun lookupNodes() = launch {
+    while (true) {
+      val nearestNodes = getNodesTable().nearest(selfEnr)
+      if (16 > nearestNodes.size) {
+        lookupInternal(nearestNodes)
+      } else {
+        askedNodes.clear()
+      }
+      delay(LOOKUP_REFRESH_RATE)
+    }
+  }
+
+  private fun lookupInternal(nearest: List<Bytes>) {
+    val targetNode = if (nearest.isNotEmpty()) nearest.random() else Bytes.random(32)
+    val distance = getNodesTable().distanceToSelf(targetNode)
+    for (target in nearest.take(3)) {
+      val enr = EthereumNodeRecord.fromRLP(target)
+      val message = FindNodeMessage(distance = distance)
+      val address = InetSocketAddress(enr.ip(), enr.udp())
+      send(address, message, Hash.sha2_256(target))
+    }
+  }
+
+  // Process packets
+  private fun receiveDatagram() = launch {
+    val datagram = ByteBuffer.allocate(UdpMessage.MAX_UDP_MESSAGE_SIZE)
+    while (receiveChannel.isOpen) {
+      datagram.clear()
+      val address = receiveChannel.receive(datagram) as InetSocketAddress
+      datagram.flip()
+      try {
+        processDatagram(datagram, address)
+      } catch (ex: Exception) {
+        log.warning(ex.message)
+      }
+    }
+  }
+
   private fun processDatagram(datagram: ByteBuffer, address: InetSocketAddress) {
     val messageBytes = Bytes.wrapByteBuffer(datagram)
     val decodeResult = packetCodec.decode(messageBytes)
@@ -203,8 +259,10 @@
       is TopicQueryMessage -> topicQueryMessageHandler.handle(message, address, decodeResult.srcNodeId, this)
       else -> throw IllegalArgumentException("Unexpected message has been received - ${message::class.java.simpleName}")
     }
+    messageListeners.forEach { it.observe(message) }
   }
 
+  // Ping nodes
   private fun refreshNodesTable(): Job = launch {
     while (true) {
       if (!getNodesTable().isEmpty()) {
@@ -217,12 +275,13 @@
         send(address, message, nodeId)
         pings.put(nodeId.toHexString(), enrBytes)
       }
-      delay(REFRESH_RATE)
+      delay(TABLE_REFRESH_RATE)
     }
   }
 
   companion object {
-    private const val REFRESH_RATE: Long = 1000
-    private const val PING_TIMEOUT: Long = 20000
+    private const val LOOKUP_REFRESH_RATE: Long = 3000
+    private const val TABLE_REFRESH_RATE: Long = 1000
+    private const val REQUEST_TIMEOUT: Long = 1000
   }
 }
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/NodesMessageHandler.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/NodesMessageHandler.kt
index 60b731d..d8b33dd 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/NodesMessageHandler.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/NodesMessageHandler.kt
@@ -28,6 +28,7 @@
   override fun handle(message: NodesMessage, address: InetSocketAddress, srcNodeId: Bytes, connector: UdpConnector) {
     message.nodeRecords.forEach {
       EthereumNodeRecord.fromRLP(it)
+      connector.getNodeRecords().set(it)
       connector.getNodesTable().add(it)
     }
   }
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/RandomMessageHandler.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/RandomMessageHandler.kt
index 477710a..60f965d 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/RandomMessageHandler.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/RandomMessageHandler.kt
@@ -26,8 +26,7 @@
 class RandomMessageHandler : MessageHandler<RandomMessage> {
 
   override fun handle(message: RandomMessage, address: InetSocketAddress, srcNodeId: Bytes, connector: UdpConnector) {
-    connector.addPendingNodeId(address, srcNodeId)
-    val response = WhoAreYouMessage()
+    val response = WhoAreYouMessage(message.authTag)
     connector.send(address, response, srcNodeId)
   }
 }
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/WhoAreYouMessageHandler.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/WhoAreYouMessageHandler.kt
index 754ceae..62025f3 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/WhoAreYouMessageHandler.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/WhoAreYouMessageHandler.kt
@@ -17,17 +17,16 @@
 package org.apache.tuweni.devp2p.v5.internal.handler
 
 import org.apache.tuweni.bytes.Bytes
-import org.apache.tuweni.crypto.Hash
 import org.apache.tuweni.devp2p.v5.MessageHandler
 import org.apache.tuweni.devp2p.v5.UdpConnector
 import org.apache.tuweni.devp2p.v5.packet.FindNodeMessage
 import org.apache.tuweni.devp2p.v5.packet.WhoAreYouMessage
 import org.apache.tuweni.devp2p.v5.misc.HandshakeInitParameters
+import org.apache.tuweni.devp2p.v5.packet.RandomMessage
+import java.lang.IllegalArgumentException
 import java.net.InetSocketAddress
 
-class WhoAreYouMessageHandler(
-  private val nodeId: Bytes
-) : MessageHandler<WhoAreYouMessage> {
+class WhoAreYouMessageHandler : MessageHandler<WhoAreYouMessage> {
 
   override fun handle(
     message: WhoAreYouMessage,
@@ -36,11 +35,12 @@
     connector: UdpConnector
   ) {
     // Retrieve enr
-    val destRlp = connector.getPendingNodeIdByAddress(address)
-    val handshakeParams = HandshakeInitParameters(message.idNonce, message.authTag, destRlp)
-    val destNodeId = Hash.sha2_256(destRlp)
+    val trackingMessage = connector.getPendingMessage(message.authTag)
+    val rlpEnr = connector.getNodeRecords().find(trackingMessage.nodeId)
+      ?: throw IllegalArgumentException("Unable to find node enr by id ${trackingMessage.nodeId}")
+    val handshakeParams = HandshakeInitParameters(message.idNonce, message.authTag, rlpEnr)
 
-    val findNodeMessage = FindNodeMessage()
-    connector.send(address, findNodeMessage, destNodeId, handshakeParams)
+    val response = if (trackingMessage.message is RandomMessage) FindNodeMessage() else trackingMessage.message
+    connector.send(address, response, trackingMessage.nodeId, handshakeParams)
   }
 }
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/misc/EncodeResult.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/misc/EncodeResult.kt
new file mode 100644
index 0000000..5100329
--- /dev/null
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/misc/EncodeResult.kt
@@ -0,0 +1,8 @@
+package org.apache.tuweni.devp2p.v5.misc
+
+import org.apache.tuweni.bytes.Bytes
+
+class EncodeResult(
+  val authTag: Bytes,
+  val content: Bytes
+)
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/misc/TrackingMessage.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/misc/TrackingMessage.kt
new file mode 100644
index 0000000..66987c1
--- /dev/null
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/misc/TrackingMessage.kt
@@ -0,0 +1,9 @@
+package org.apache.tuweni.devp2p.v5.misc
+
+import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.devp2p.v5.packet.UdpMessage
+
+class TrackingMessage(
+  val message: UdpMessage,
+  val nodeId: Bytes
+)
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/packet/RandomMessage.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/packet/RandomMessage.kt
index e43ded7..99dfb51 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/packet/RandomMessage.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/packet/RandomMessage.kt
@@ -19,6 +19,7 @@
 import org.apache.tuweni.bytes.Bytes
 
 class RandomMessage(
+  val authTag: Bytes = authTag(),
   val data: Bytes = randomData()
 ) : UdpMessage() {
 
@@ -27,8 +28,8 @@
   }
 
   companion object {
-    fun create(content: Bytes): RandomMessage {
-      return RandomMessage(content)
+    fun create(authTag: Bytes, content: Bytes): RandomMessage {
+      return RandomMessage(authTag, content)
     }
   }
 }
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/storage/DefaultENRStorage.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/storage/DefaultENRStorage.kt
new file mode 100644
index 0000000..80e9d5b
--- /dev/null
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/storage/DefaultENRStorage.kt
@@ -0,0 +1,19 @@
+package org.apache.tuweni.devp2p.v5.storage
+
+import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.crypto.Hash
+import org.apache.tuweni.devp2p.v5.ENRStorage
+import java.util.concurrent.ConcurrentHashMap
+
+class DefaultENRStorage: ENRStorage {
+
+  private val storage: MutableMap<String, Bytes> = ConcurrentHashMap()
+
+  override fun find(nodeId: Bytes): Bytes? = storage[nodeId.toHexString()]
+
+  override fun set(enr: Bytes) {
+    val nodeId = Hash.sha2_256(enr)
+    storage[nodeId.toHexString()] = enr
+  }
+
+}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/dht/RoutingTable.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/storage/RoutingTable.kt
similarity index 80%
rename from devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/dht/RoutingTable.kt
rename to devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/storage/RoutingTable.kt
index 289be97..cd48508 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/dht/RoutingTable.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/storage/RoutingTable.kt
@@ -14,12 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.tuweni.devp2p.v5.dht
+package org.apache.tuweni.devp2p.v5.storage
 
+import com.google.common.math.IntMath
 import org.apache.tuweni.bytes.Bytes
 import org.apache.tuweni.crypto.Hash
 import org.apache.tuweni.kademlia.KademliaRoutingTable
 import org.apache.tuweni.kademlia.xorDist
+import java.math.RoundingMode
 
 class RoutingTable(
   private val selfEnr: Bytes
@@ -32,12 +34,19 @@
     selfId = selfNodeId,
     k = BUCKET_SIZE,
     nodeId = nodeIdCalculation,
-    distanceToSelf = { key(it) xorDist selfNodeId })
+    distanceToSelf = { IntMath.log2(key(it) xorDist selfNodeId, RoundingMode.FLOOR) })
+
+  val size: Int
+    get() = table.size
 
   fun getSelfEnr(): Bytes = selfEnr
 
   fun add(enr: Bytes): Bytes? = table.add(enr)
 
+  fun nearest(targetId: Bytes, limit: Int = BUCKET_SIZE): List<Bytes> = table.nearest(key(targetId), limit)
+
+  fun distanceToSelf(targetId: Bytes): Int = table.logDistToSelf(targetId)
+
   fun evict(enr: Bytes): Boolean = table.evict(enr)
 
   fun random(): Bytes = table.getRandom()
@@ -53,4 +62,5 @@
   companion object {
     private const val BUCKET_SIZE: Int = 16
   }
+
 }
diff --git a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/AbstractIntegrationTest.kt b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/AbstractIntegrationTest.kt
new file mode 100644
index 0000000..f503787
--- /dev/null
+++ b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/AbstractIntegrationTest.kt
@@ -0,0 +1,125 @@
+package org.apache.tuweni.devp2p.v5
+
+import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.runBlocking
+import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.crypto.Hash
+import org.apache.tuweni.crypto.SECP256K1
+import org.apache.tuweni.devp2p.EthereumNodeRecord
+import org.apache.tuweni.devp2p.v5.internal.DefaultAuthenticationProvider
+import org.apache.tuweni.devp2p.v5.internal.DefaultPacketCodec
+import org.apache.tuweni.devp2p.v5.internal.DefaultUdpConnector
+import org.apache.tuweni.devp2p.v5.packet.RandomMessage
+import org.apache.tuweni.devp2p.v5.packet.UdpMessage
+import org.apache.tuweni.devp2p.v5.packet.WhoAreYouMessage
+import org.apache.tuweni.devp2p.v5.storage.DefaultENRStorage
+import org.apache.tuweni.devp2p.v5.storage.RoutingTable
+import org.apache.tuweni.junit.BouncyCastleExtension
+import org.junit.jupiter.api.extension.ExtendWith
+import java.net.InetAddress
+import java.net.InetSocketAddress
+
+@ExtendWith(BouncyCastleExtension::class)
+abstract class AbstractIntegrationTest {
+
+  protected fun createNode(
+    port: Int = 9090,
+    bootList: List<String> = emptyList(),
+    enrStorage: ENRStorage = DefaultENRStorage(),
+    keyPair: SECP256K1.KeyPair = SECP256K1.KeyPair.random(),
+    enr: Bytes = EthereumNodeRecord.toRLP(keyPair, ip = InetAddress.getLocalHost(), udp = port),
+    routingTable: RoutingTable = RoutingTable(enr),
+    address: InetSocketAddress = InetSocketAddress(InetAddress.getLocalHost(), port),
+    authenticationProvider: AuthenticationProvider = DefaultAuthenticationProvider(keyPair, routingTable),
+    packetCodec: PacketCodec = DefaultPacketCodec(
+      keyPair,
+      routingTable,
+      authenticationProvider = authenticationProvider
+    ),
+    connector: UdpConnector = DefaultUdpConnector(
+      address,
+      keyPair,
+      enr,
+      enrStorage,
+      nodesTable = routingTable,
+      packetCodec = packetCodec
+    ),
+    service: NodeDiscoveryService =
+      DefaultNodeDiscoveryService(
+        keyPair,
+        port,
+        enrStorage = enrStorage,
+        bootstrapENRList = bootList,
+        connector = connector
+      )
+  ): TestNode {
+    service.start()
+    return TestNode(
+      bootList,
+      port,
+      enrStorage,
+      keyPair,
+      enr,
+      address,
+      routingTable,
+      authenticationProvider,
+      packetCodec,
+      connector,
+      service
+    )
+  }
+
+  protected fun handshake(initiator: TestNode, recipient: TestNode): Boolean {
+    initiator.enrStorage.set(recipient.enr)
+    initiator.routingTable.add(recipient.enr)
+    val message = RandomMessage()
+    initiator.connector.send(recipient.address, message, recipient.nodeId)
+    while (true) {
+      if (null != recipient.authenticationProvider.findSessionKey(initiator.nodeId.toHexString())) {
+        return true
+      }
+    }
+  }
+
+  protected fun send(initiator: TestNode, recipient: TestNode, message: UdpMessage) {
+    if (message is RandomMessage || message is WhoAreYouMessage) {
+      throw IllegalArgumentException("Can't send handshake initiation message")
+    }
+    initiator.connector.send(recipient.address, message, recipient.nodeId)
+  }
+
+  protected inline fun <reified T: UdpMessage>sendAndAwait(initiator: TestNode, recipient: TestNode, message: UdpMessage): T {
+    val listener = object : MessageObserver {
+      var result: Channel<T> = Channel()
+
+      override fun observe(message: UdpMessage) {
+        if (message is T) {
+          result.offer(message)
+        }
+      }
+    }
+
+    return runBlocking {
+      initiator.connector.attachObserver(listener)
+      send(initiator, recipient, message)
+      val result = listener.result.receive()
+      initiator.connector.detachObserver(listener)
+      result
+    }
+  }
+}
+
+class TestNode(
+  val bootList: List<String>,
+  val port: Int,
+  val enrStorage: ENRStorage,
+  val keyPair: SECP256K1.KeyPair,
+  val enr: Bytes,
+  val address: InetSocketAddress,
+  val routingTable: RoutingTable,
+  val authenticationProvider: AuthenticationProvider,
+  val packetCodec: PacketCodec,
+  val connector: UdpConnector,
+  val service: NodeDiscoveryService,
+  val nodeId: Bytes = Hash.sha2_256(enr)
+)
diff --git a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/DefaultNodeDiscoveryServiceTest.kt b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/DefaultNodeDiscoveryServiceTest.kt
index a9f64e5..2b7d8a7 100644
--- a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/DefaultNodeDiscoveryServiceTest.kt
+++ b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/DefaultNodeDiscoveryServiceTest.kt
@@ -64,7 +64,7 @@
       bootstrapENRList,
       enrSeq,
       selfENR,
-      connector
+      connector = connector
     )
 
   @Test
@@ -81,7 +81,7 @@
       val receivedBytes = Bytes.wrapByteBuffer(buffer)
       val content = receivedBytes.slice(45)
 
-      val message = RandomMessage.create(content)
+      val message = RandomMessage.create(UdpMessage.authTag(), content)
       assert(message.data.size() == UdpMessage.RANDOM_DATA_LENGTH)
     }
 
diff --git a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/HandshakeIntegrationTest.kt b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/HandshakeIntegrationTest.kt
deleted file mode 100644
index fa99cc3..0000000
--- a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/HandshakeIntegrationTest.kt
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tuweni.devp2p.v5
-
-import kotlinx.coroutines.runBlocking
-import org.apache.tuweni.bytes.Bytes
-import org.apache.tuweni.crypto.Hash
-import org.apache.tuweni.crypto.SECP256K1
-import org.apache.tuweni.devp2p.EthereumNodeRecord
-import org.apache.tuweni.devp2p.v5.dht.RoutingTable
-import org.apache.tuweni.devp2p.v5.internal.DefaultAuthenticationProvider
-import org.apache.tuweni.devp2p.v5.internal.DefaultPacketCodec
-import org.apache.tuweni.devp2p.v5.internal.DefaultUdpConnector
-import org.apache.tuweni.devp2p.v5.packet.FindNodeMessage
-import org.apache.tuweni.devp2p.v5.packet.RandomMessage
-import org.apache.tuweni.devp2p.v5.packet.UdpMessage
-import org.apache.tuweni.devp2p.v5.packet.WhoAreYouMessage
-import org.apache.tuweni.io.Base64URLSafe
-import org.apache.tuweni.junit.BouncyCastleExtension
-import org.apache.tuweni.net.coroutines.CoroutineDatagramChannel
-import org.junit.jupiter.api.AfterEach
-import org.junit.jupiter.api.BeforeEach
-import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.extension.ExtendWith
-import java.net.InetAddress
-import java.net.InetSocketAddress
-import java.nio.ByteBuffer
-
-@ExtendWith(BouncyCastleExtension::class)
-class HandshakeIntegrationTest {
-
-  private val keyPair: SECP256K1.KeyPair = SECP256K1.KeyPair.random()
-  private val enr: Bytes = EthereumNodeRecord.toRLP(keyPair, ip = InetAddress.getLocalHost(), udp = SERVICE_PORT)
-  private val address: InetSocketAddress = InetSocketAddress(InetAddress.getLocalHost(), SERVICE_PORT)
-  private val connector: UdpConnector = DefaultUdpConnector(address, keyPair, enr)
-
-  private val clientKeyPair = SECP256K1.KeyPair.random()
-  private val clientEnr = EthereumNodeRecord.toRLP(clientKeyPair, ip = InetAddress.getLocalHost(), udp = CLIENT_PORT)
-  private val routingTable = RoutingTable(clientEnr)
-  private val authProvider = DefaultAuthenticationProvider(clientKeyPair, routingTable)
-  private val clientCodec = DefaultPacketCodec(clientKeyPair, routingTable, authenticationProvider = authProvider)
-  private val socket = CoroutineDatagramChannel.open()
-
-  private val clientNodeId: Bytes = Hash.sha2_256(clientEnr)
-
-  private val bootList = listOf("enr:${Base64URLSafe.encode(clientEnr)}")
-  private val service: NodeDiscoveryService =
-    DefaultNodeDiscoveryService(keyPair, SERVICE_PORT, bootstrapENRList = bootList, connector = connector)
-
-  @BeforeEach
-  fun init() {
-    socket.bind(InetSocketAddress(9091))
-    service.start()
-  }
-
-  @Test
-  fun discv5HandshakeTest() {
-    runBlocking {
-      val buffer = ByteBuffer.allocate(UdpMessage.MAX_UDP_MESSAGE_SIZE)
-      socket.receive(buffer)
-      buffer.flip()
-
-      var content = Bytes.wrapByteBuffer(buffer)
-      var decodingResult = clientCodec.decode(content)
-      assert(decodingResult.message is RandomMessage)
-      buffer.clear()
-
-      sendWhoAreYou()
-
-      socket.receive(buffer)
-      buffer.flip()
-      content = Bytes.wrapByteBuffer(buffer)
-      decodingResult = clientCodec.decode(content)
-      assert(decodingResult.message is FindNodeMessage)
-      assert(null != authProvider.findSessionKey(Hash.sha2_256(enr).toHexString()))
-
-      val message = decodingResult.message as FindNodeMessage
-
-      assert(message.distance == 0)
-      assert(message.requestId.size() == UdpMessage.REQUEST_ID_LENGTH)
-    }
-  }
-
-  @AfterEach
-  fun tearDown() {
-    service.terminate(true)
-    socket.close()
-  }
-
-  private suspend fun sendWhoAreYou() {
-    val message = WhoAreYouMessage()
-    val bytes = clientCodec.encode(message, clientNodeId)
-    val buffer = ByteBuffer.wrap(bytes.toArray())
-    socket.send(buffer, address)
-  }
-
-  companion object {
-    private const val SERVICE_PORT: Int = 9090
-    private const val CLIENT_PORT: Int = 9091
-  }
-}
diff --git a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/HandshakeTest.kt b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/HandshakeTest.kt
new file mode 100644
index 0000000..c76b91c
--- /dev/null
+++ b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/HandshakeTest.kt
@@ -0,0 +1,57 @@
+package org.apache.tuweni.devp2p.v5
+
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.runBlocking
+import org.apache.tuweni.devp2p.v5.packet.PingMessage
+import org.apache.tuweni.devp2p.v5.packet.PongMessage
+import org.junit.jupiter.api.Test
+
+class HandshakeTest: AbstractIntegrationTest() {
+
+  @Test
+  fun testHandshake() {
+    val node1 = createNode(9090)
+    val node2 = createNode(9091)
+
+    val result = handshake(node1, node2)
+
+    assert(result)
+
+    node1.service.terminate(true)
+    node2.service.terminate(true)
+  }
+
+  @Test
+  fun testPing() {
+    val node1 = createNode(9090)
+    val node2 = createNode(9091)
+
+    handshake(node1, node2)
+    val pong = sendAndAwait<PongMessage>(node1, node2, PingMessage())
+
+    assert(node1.port == pong.recipientPort)
+
+    node1.service.terminate(true)
+    node2.service.terminate(true)
+  }
+
+  @Test
+  fun testTableMaintenance() {
+    val node1 = createNode(9090)
+    val node2 = createNode(9091)
+
+    handshake(node1, node2)
+    runBlocking {
+      assert(!node1.routingTable.isEmpty())
+
+      node2.service.terminate( true)
+
+      delay(3000)
+
+      assert(node1.routingTable.isEmpty())
+    }
+
+    node1.service.terminate(true)
+  }
+
+}
diff --git a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultAuthenticationProviderTest.kt b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultAuthenticationProviderTest.kt
index 1d9bb5a..7629d80 100644
--- a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultAuthenticationProviderTest.kt
+++ b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultAuthenticationProviderTest.kt
@@ -20,7 +20,7 @@
 import org.apache.tuweni.crypto.Hash
 import org.apache.tuweni.crypto.SECP256K1
 import org.apache.tuweni.devp2p.EthereumNodeRecord
-import org.apache.tuweni.devp2p.v5.dht.RoutingTable
+import org.apache.tuweni.devp2p.v5.storage.RoutingTable
 import org.apache.tuweni.devp2p.v5.misc.HandshakeInitParameters
 import org.apache.tuweni.devp2p.v5.misc.SessionKey
 import org.apache.tuweni.junit.BouncyCastleExtension
diff --git a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultPacketCodecTest.kt b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultPacketCodecTest.kt
index 18a5577..50a677d 100644
--- a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultPacketCodecTest.kt
+++ b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultPacketCodecTest.kt
@@ -22,7 +22,7 @@
 import org.apache.tuweni.devp2p.EthereumNodeRecord
 import org.apache.tuweni.devp2p.v5.AuthenticationProvider
 import org.apache.tuweni.devp2p.v5.PacketCodec
-import org.apache.tuweni.devp2p.v5.dht.RoutingTable
+import org.apache.tuweni.devp2p.v5.storage.RoutingTable
 import org.apache.tuweni.devp2p.v5.encrypt.AES128GCM
 import org.apache.tuweni.devp2p.v5.misc.SessionKey
 import org.apache.tuweni.devp2p.v5.packet.FindNodeMessage
@@ -54,8 +54,8 @@
 
     val encodedResult = codec.encode(message, destNodeId)
 
-    val encodedContent = encodedResult.slice(45)
-    val result = RandomMessage.create(encodedContent)
+    val encodedContent = encodedResult.content.slice(45)
+    val result = RandomMessage.create(UdpMessage.authTag(), encodedContent)
 
     assert(result.data == message.data)
   }
@@ -66,7 +66,7 @@
 
     val encodedResult = codec.encode(message, destNodeId)
 
-    val encodedContent = encodedResult.slice(32)
+    val encodedContent = encodedResult.content.slice(32)
     val result = WhoAreYouMessage.create(encodedContent)
 
     assert(result.idNonce == message.idNonce)
@@ -85,8 +85,8 @@
 
     val encodedResult = codec.encode(message, destNodeId)
 
-    val tag = encodedResult.slice(0, UdpMessage.TAG_LENGTH)
-    val encryptedContent = encodedResult.slice(45)
+    val tag = encodedResult.content.slice(0, UdpMessage.TAG_LENGTH)
+    val encryptedContent = encodedResult.content.slice(45)
     val content = AES128GCM.decrypt(encryptedContent, sessionKey.initiatorKey, tag).slice(1)
     val result = FindNodeMessage.create(content)
 
@@ -95,21 +95,12 @@
   }
 
   @Test
-  fun encodeFailsIfSessionKeyIsNotExists() {
-    val message = FindNodeMessage()
-
-    assertThrows<IllegalArgumentException> {
-      codec.encode(message, destNodeId)
-    }
-  }
-
-  @Test
   fun decodePerformsValidDecodingOfRandomMessasge() {
     val message = RandomMessage()
 
     val encodedResult = codec.encode(message, destNodeId)
 
-    val result = codec.decode(encodedResult).message as? RandomMessage
+    val result = codec.decode(encodedResult.content).message as? RandomMessage
 
     assert(null != result)
     assert(result!!.data == message.data)
@@ -121,7 +112,7 @@
 
     val encodedResult = codec.encode(message, destNodeId)
 
-    val result = codec.decode(encodedResult).message as? WhoAreYouMessage
+    val result = codec.decode(encodedResult.content).message as? WhoAreYouMessage
 
     assert(null != result)
     assert(result!!.idNonce == message.idNonce)
@@ -141,7 +132,7 @@
 
     val encodedResult = codec.encode(message, nodeId)
 
-    val result = codec.decode(encodedResult).message as? FindNodeMessage
+    val result = codec.decode(encodedResult.content).message as? FindNodeMessage
 
     assert(result!!.requestId == message.requestId)
     assert(result.distance == message.distance)
diff --git a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultUdpConnectorTest.kt b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultUdpConnectorTest.kt
index a345208..7d77329 100644
--- a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultUdpConnectorTest.kt
+++ b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultUdpConnectorTest.kt
@@ -37,18 +37,17 @@
 class DefaultUdpConnectorTest {
 
   private val keyPair: SECP256K1.KeyPair = SECP256K1.KeyPair.random()
-  private val nodeId: Bytes = Bytes.fromHexString("0x98EB6D611291FA21F6169BFF382B9369C33D997FE4DC93410987E27796360640")
   private val address: InetSocketAddress = InetSocketAddress(9090)
   private val selfEnr: Bytes = EthereumNodeRecord.toRLP(keyPair, ip = address.address)
 
   private val data: Bytes = UdpMessage.randomData()
-  private val message: RandomMessage = RandomMessage(data)
+  private val message: RandomMessage = RandomMessage(UdpMessage.authTag(), data)
 
-  private var connector: UdpConnector = DefaultUdpConnector(address, keyPair, selfEnr, nodeId)
+  private var connector: UdpConnector = DefaultUdpConnector(address, keyPair, selfEnr)
 
   @BeforeEach
   fun setUp() {
-    connector = DefaultUdpConnector(address, keyPair, selfEnr, nodeId)
+    connector = DefaultUdpConnector(address, keyPair, selfEnr)
   }
 
   @AfterEach
@@ -93,7 +92,7 @@
       buffer.flip()
 
       val messageContent = Bytes.wrapByteBuffer(buffer).slice(45)
-      val message = RandomMessage.create(messageContent)
+      val message = RandomMessage.create(UdpMessage.authTag(), messageContent)
 
       assert(message.data == data)
     }
diff --git a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/packet/RandomMessageTest.kt b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/packet/RandomMessageTest.kt
index 2060816..ac125b9 100644
--- a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/packet/RandomMessageTest.kt
+++ b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/packet/RandomMessageTest.kt
@@ -27,12 +27,12 @@
       "0xB53CCF732982B8E950836D1E02898C8B38CFDBFDF86BC65C8826506B454E14618EA73612A0F5582C130FF666"
 
     val data = Bytes.fromHexString(expectedEncodingResult)
-    val message = RandomMessage(data)
+    val message = RandomMessage(UdpMessage.authTag(), data)
 
     val encodingResult = message.encode()
     assert(encodingResult.toHexString() == expectedEncodingResult)
 
-    val decodingResult = RandomMessage.create(encodingResult)
+    val decodingResult = RandomMessage.create(UdpMessage.authTag(), encodingResult)
 
     assert(decodingResult.data == data)
   }
diff --git a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/dht/RoutingTableTest.kt b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/storage/RoutingTableTest.kt
similarity index 97%
rename from devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/dht/RoutingTableTest.kt
rename to devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/storage/RoutingTableTest.kt
index cf0a17c..fbf2ede 100644
--- a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/dht/RoutingTableTest.kt
+++ b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/storage/RoutingTableTest.kt
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.tuweni.devp2p.v5.dht
+package org.apache.tuweni.devp2p.v5.storage
 
 import org.apache.tuweni.bytes.Bytes
 import org.apache.tuweni.crypto.SECP256K1