save
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/UdpConnector.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/UdpConnector.kt
index c1c5e82..2daf1c2 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/UdpConnector.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/UdpConnector.kt
@@ -84,7 +84,7 @@
    *
    * @return node identifier
    */
-  fun getPendingNodeIdByAddress(address: InetSocketAddress): Bytes
+  fun findPendingNodeId(address: InetSocketAddress): Bytes
 
   /**
    * Provides node's key pair
@@ -110,4 +110,6 @@
   fun getNodesTable(): RoutingTable
 
   fun getAwaitingPongRecord(nodeId: Bytes): Bytes?
+
+  fun findPendingMessage(nodeId: Bytes): UdpMessage?
 }
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/dht/RoutingTable.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/dht/RoutingTable.kt
index 289be97..937e0c5 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/dht/RoutingTable.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/dht/RoutingTable.kt
@@ -16,10 +16,12 @@
  */
 package org.apache.tuweni.devp2p.v5.dht
 
+import com.google.common.math.IntMath
 import org.apache.tuweni.bytes.Bytes
 import org.apache.tuweni.crypto.Hash
 import org.apache.tuweni.kademlia.KademliaRoutingTable
 import org.apache.tuweni.kademlia.xorDist
+import java.math.RoundingMode
 
 class RoutingTable(
   private val selfEnr: Bytes
@@ -32,12 +34,19 @@
     selfId = selfNodeId,
     k = BUCKET_SIZE,
     nodeId = nodeIdCalculation,
-    distanceToSelf = { key(it) xorDist selfNodeId })
+    distanceToSelf = { IntMath.log2(key(it) xorDist selfNodeId, RoundingMode.FLOOR) })
+
+  val size: Int
+    get() = table.size
 
   fun getSelfEnr(): Bytes = selfEnr
 
   fun add(enr: Bytes): Bytes? = table.add(enr)
 
+  fun nearest(targetId: Bytes, limit: Int = BUCKET_SIZE): List<Bytes> = table.nearest(key(targetId), limit)
+
+  fun distanceToSelf(targetId: Bytes): Int = table.logDistToSelf(targetId)
+
   fun evict(enr: Bytes): Boolean = table.evict(enr)
 
   fun random(): Bytes = table.getRandom()
@@ -53,4 +62,5 @@
   companion object {
     private const val BUCKET_SIZE: Int = 16
   }
+
 }
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultPacketCodec.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultPacketCodec.kt
index 3df99b7..d4ce433 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultPacketCodec.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultPacketCodec.kt
@@ -40,6 +40,7 @@
 import org.apache.tuweni.devp2p.v5.packet.TopicQueryMessage
 import org.apache.tuweni.rlp.RLP
 import org.apache.tuweni.rlp.RLPReader
+import java.util.concurrent.ConcurrentHashMap
 import kotlin.IllegalArgumentException
 
 class DefaultPacketCodec(
@@ -115,7 +116,7 @@
 
     // Decrypt
     val decryptionKey = authenticationProvider.findSessionKey(senderNodeId.toHexString())?.initiatorKey
-      ?: return RandomMessage(encryptedContent)
+      ?: return RandomMessage.create(encryptedContent)
     val decryptMetadata = authHeader?.let { Bytes.wrap(tag, authHeader.asRlp()) } ?: tag
     val decryptedContent = AES128GCM.decrypt(encryptedContent, decryptionKey, decryptMetadata)
     val messageType = decryptedContent.slice(0, Byte.SIZE_BYTES)
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultUdpConnector.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultUdpConnector.kt
index 5c32f20..4942f11 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultUdpConnector.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultUdpConnector.kt
@@ -49,6 +49,7 @@
 import java.net.InetSocketAddress
 import java.nio.ByteBuffer
 import java.time.Duration
+import java.util.concurrent.ConcurrentHashMap
 import java.util.logging.Logger
 import kotlin.coroutines.CoroutineContext
 
@@ -75,6 +76,10 @@
   private val pingMessageHandler: MessageHandler<PingMessage> = PingMessageHandler()
   private val pongMessageHandler: MessageHandler<PongMessage> = PongMessageHandler()
 
+  private val pendingMessages: MutableMap<String, UdpMessage> = ConcurrentHashMap()
+
+  private val askedNodes: MutableList<Bytes> = mutableListOf()
+
   private val pings: Cache<String, Bytes> = CacheBuilder.newBuilder()
     .expireAfterWrite(Duration.ofMillis(PING_TIMEOUT))
     .removalListener<String, Bytes> {
@@ -83,24 +88,14 @@
 
   private lateinit var refreshJob: Job
   private lateinit var receiveJob: Job
+  private lateinit var lookupJob: Job
+
 
   override fun start() {
     receiveChannel.bind(bindAddress)
 
-    receiveJob = launch {
-      val datagram = ByteBuffer.allocate(UdpMessage.MAX_UDP_MESSAGE_SIZE)
-      while (receiveChannel.isOpen) {
-        datagram.clear()
-        val address = receiveChannel.receive(datagram) as InetSocketAddress
-        datagram.flip()
-        try {
-          processDatagram(datagram, address)
-        } catch (ex: Exception) {
-          log.warning(ex.message)
-        }
-      }
-    }
-
+    receiveJob = receiveDatagram()
+    lookupJob = lookupNodes()
     refreshJob = refreshNodesTable()
   }
 
@@ -112,6 +107,7 @@
   ) {
     launch {
       val buffer = packetCodec.encode(message, destNodeId, handshakeParams)
+      pendingMessages[destNodeId.toHexString()] = message
       sendChannel.send(ByteBuffer.wrap(buffer.toArray()), address)
     }
   }
@@ -120,8 +116,9 @@
     receiveChannel.close()
     sendChannel.close()
 
-    receiveJob.cancel()
     refreshJob.cancel()
+    lookupJob.cancel()
+    receiveJob.cancel()
   }
 
   override fun available(): Boolean = receiveChannel.isOpen
@@ -136,9 +133,13 @@
     authenticatingPeers[address] = nodeId
   }
 
+  override fun findPendingMessage(nodeId: Bytes): UdpMessage? {
+    return pendingMessages[nodeId.toHexString()]
+  }
+
   override fun getNodeKeyPair(): SECP256K1.KeyPair = keyPair
 
-  override fun getPendingNodeIdByAddress(address: InetSocketAddress): Bytes {
+  override fun findPendingNodeId(address: InetSocketAddress): Bytes {
     val result = authenticatingPeers[address]
       ?: throw IllegalArgumentException("Authenticated peer not found with address ${address.hostName}:${address.port}")
     authenticatingPeers.remove(address)
@@ -154,6 +155,43 @@
     return result
   }
 
+  private fun lookupNodes() = launch {
+    while (true) {
+      val nearestNodes = getNodesTable().nearest(selfEnr)
+      if (16 > nearestNodes.size) {
+        lookupInternal(nearestNodes)
+      } else {
+        askedNodes.clear()
+      }
+      delay(LOOKUP_REFRESH_RATE)
+    }
+  }
+
+  private fun lookupInternal(nearest: List<Bytes>) {
+    val targetNode = if (nearest.isNotEmpty()) nearest.random() else selfEnr
+    val distance = getNodesTable().distanceToSelf(targetNode)
+    for(target in nearest.take(3)) {
+      val enr = EthereumNodeRecord.fromRLP(target)
+      val message = FindNodeMessage(distance = distance)
+      val address = InetSocketAddress(enr.ip(), enr.udp())
+      send(address, message, Hash.sha2_256(target))
+    }
+  }
+
+  private fun receiveDatagram() = launch {
+    val datagram = ByteBuffer.allocate(UdpMessage.MAX_UDP_MESSAGE_SIZE)
+    while (receiveChannel.isOpen) {
+      datagram.clear()
+      val address = receiveChannel.receive(datagram) as InetSocketAddress
+      datagram.flip()
+      try {
+        processDatagram(datagram, address)
+      } catch (ex: Exception) {
+        log.warning(ex.message)
+      }
+    }
+  }
+
   private fun processDatagram(datagram: ByteBuffer, address: InetSocketAddress) {
     val messageBytes = Bytes.wrapByteBuffer(datagram)
     val decodeResult = packetCodec.decode(messageBytes)
@@ -167,6 +205,7 @@
       is PongMessage -> pongMessageHandler.handle(message, address, decodeResult.srcNodeId, this)
       else -> throw IllegalArgumentException("Unexpected message has been received - ${message::class.java.simpleName}")
     }
+    pendingMessages.remove(decodeResult.srcNodeId.toHexString())
   }
 
   private fun refreshNodesTable(): Job = launch {
@@ -181,12 +220,13 @@
         send(address, message, nodeId)
         pings.put(nodeId.toHexString(), enrBytes)
       }
-      delay(REFRESH_RATE)
+      delay(TABLE_REFRESH_RATE)
     }
   }
 
   companion object {
-    private const val REFRESH_RATE: Long = 1000
+    private const val LOOKUP_REFRESH_RATE: Long = 3000
+    private const val TABLE_REFRESH_RATE: Long = 1000
     private const val PING_TIMEOUT: Long = 20000
   }
 }
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/WhoAreYouMessageHandler.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/WhoAreYouMessageHandler.kt
index 754ceae..422a21f 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/WhoAreYouMessageHandler.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/WhoAreYouMessageHandler.kt
@@ -36,11 +36,11 @@
     connector: UdpConnector
   ) {
     // Retrieve enr
-    val destRlp = connector.getPendingNodeIdByAddress(address)
+    val destRlp = connector.findPendingNodeId(address)
     val handshakeParams = HandshakeInitParameters(message.idNonce, message.authTag, destRlp)
     val destNodeId = Hash.sha2_256(destRlp)
 
-    val findNodeMessage = FindNodeMessage()
-    connector.send(address, findNodeMessage, destNodeId, handshakeParams)
+    val response = connector.findPendingMessage(srcNodeId) ?: FindNodeMessage()
+    connector.send(address, response, destNodeId, handshakeParams)
   }
 }