find node + nodes message handler
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/PeerRoutingTable.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/PeerRoutingTable.kt
index a6dc884..86511e6 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/PeerRoutingTable.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/PeerRoutingTable.kt
@@ -21,6 +21,7 @@
 import org.apache.tuweni.crypto.Hash.keccak256
 import org.apache.tuweni.crypto.SECP256K1
 import org.apache.tuweni.kademlia.KademliaRoutingTable
+import org.apache.tuweni.kademlia.xorDist
 
 /**
  * A routing table for ÐΞVp2p peers.
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/UdpConnector.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/UdpConnector.kt
index 9770885..ad86da5 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/UdpConnector.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/UdpConnector.kt
@@ -18,6 +18,7 @@
 
 import org.apache.tuweni.bytes.Bytes
 import org.apache.tuweni.crypto.SECP256K1
+import org.apache.tuweni.devp2p.v5.dht.RoutingTable
 import org.apache.tuweni.devp2p.v5.packet.UdpMessage
 import org.apache.tuweni.devp2p.v5.misc.HandshakeInitParameters
 import java.net.InetSocketAddress
@@ -97,4 +98,6 @@
    * @return node's ENR
    */
   fun getEnr(): Bytes
+
+  fun getNodesTable(): RoutingTable
 }
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/dht/RoutingTable.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/dht/RoutingTable.kt
new file mode 100644
index 0000000..40cb81e
--- /dev/null
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/dht/RoutingTable.kt
@@ -0,0 +1,35 @@
+package org.apache.tuweni.devp2p.v5.dht
+
+import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.crypto.Hash
+import org.apache.tuweni.kademlia.KademliaRoutingTable
+import org.apache.tuweni.kademlia.xorDist
+
+class RoutingTable(
+  selfEnr: Bytes
+) {
+
+  private val selfNodeId = key(selfEnr)
+  private val nodeIdCalculation: (Bytes) -> ByteArray = { enr -> key(enr) }
+  private val table = KademliaRoutingTable(
+    selfId = selfNodeId,
+    k = BUCKET_SIZE,
+    nodeId = nodeIdCalculation,
+    distanceToSelf = { key(it) xorDist selfNodeId })
+
+
+  fun add(enr: Bytes): Bytes? = table.add(enr)
+
+  fun nearest(nodeId: Bytes, limit: Int): List<Bytes> = table.nearest(nodeId.toArray(), limit)
+
+  fun evict(enr: Bytes): Boolean = table.evict(enr)
+
+  fun nodesOfDistance(distance: Int): List<Bytes> = table.peersOfDistance(distance)
+
+  private fun key(enr: Bytes): ByteArray = Hash.sha2_256(enr).toArray()
+
+  companion object {
+    private const val BUCKET_SIZE: Int = 16
+  }
+
+}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultUdpConnector.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultUdpConnector.kt
index ebb4eb8..fc70c27 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultUdpConnector.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultUdpConnector.kt
@@ -26,6 +26,9 @@
 import org.apache.tuweni.devp2p.v5.MessageHandler
 import org.apache.tuweni.devp2p.v5.PacketCodec
 import org.apache.tuweni.devp2p.v5.UdpConnector
+import org.apache.tuweni.devp2p.v5.dht.RoutingTable
+import org.apache.tuweni.devp2p.v5.internal.handler.FindNodeMessageHandler
+import org.apache.tuweni.devp2p.v5.internal.handler.NodesMessageHandler
 import org.apache.tuweni.devp2p.v5.internal.handler.RandomMessageHandler
 import org.apache.tuweni.devp2p.v5.internal.handler.WhoAreYouMessageHandler
 import org.apache.tuweni.devp2p.v5.packet.FindNodeMessage
@@ -33,6 +36,7 @@
 import org.apache.tuweni.devp2p.v5.packet.UdpMessage
 import org.apache.tuweni.devp2p.v5.packet.WhoAreYouMessage
 import org.apache.tuweni.devp2p.v5.misc.HandshakeInitParameters
+import org.apache.tuweni.devp2p.v5.packet.NodesMessage
 import org.apache.tuweni.net.coroutines.CoroutineDatagramChannel
 import java.net.InetSocketAddress
 import java.nio.ByteBuffer
@@ -54,9 +58,13 @@
 
   private val randomMessageHandler: MessageHandler<RandomMessage> = RandomMessageHandler()
   private val whoAreYouMessageHandler: MessageHandler<WhoAreYouMessage> = WhoAreYouMessageHandler(nodeId)
+  private val findNodeMessageHandler: MessageHandler<FindNodeMessage> = FindNodeMessageHandler()
+  private val nodesMessageHandler: MessageHandler<NodesMessage> = NodesMessageHandler()
 
   private val authenticatingPeers: MutableMap<InetSocketAddress, Bytes> = mutableMapOf()
 
+  private val nodesTable: RoutingTable = RoutingTable(selfEnr)
+
   private lateinit var receiveJob: Job
 
   override fun start() {
@@ -114,6 +122,8 @@
     return result
   }
 
+  override fun getNodesTable(): RoutingTable = nodesTable
+
   private fun processDatagram(datagram: ByteBuffer, address: InetSocketAddress) {
     val messageBytes = Bytes.wrapByteBuffer(datagram)
     val decodeResult = packetCodec.decode(messageBytes)
@@ -121,7 +131,8 @@
     when (message) {
       is RandomMessage -> randomMessageHandler.handle(message, address, decodeResult.srcNodeId, this)
       is WhoAreYouMessage -> whoAreYouMessageHandler.handle(message, address, decodeResult.srcNodeId, this)
-      is FindNodeMessage -> { } //TODO: response with NODES message
+      is FindNodeMessage -> findNodeMessageHandler.handle(message, address, decodeResult.srcNodeId, this)
+      is NodesMessage -> nodesMessageHandler.handle(message, address, decodeResult.srcNodeId, this)
       else -> throw IllegalArgumentException("Unexpected message has been received - ${message::class.java.simpleName}")
     }
   }
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/FindNodeMessageHandler.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/FindNodeMessageHandler.kt
new file mode 100644
index 0000000..47575d0
--- /dev/null
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/FindNodeMessageHandler.kt
@@ -0,0 +1,27 @@
+package org.apache.tuweni.devp2p.v5.internal.handler
+
+import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.devp2p.v5.MessageHandler
+import org.apache.tuweni.devp2p.v5.UdpConnector
+import org.apache.tuweni.devp2p.v5.packet.FindNodeMessage
+import org.apache.tuweni.devp2p.v5.packet.NodesMessage
+import java.net.InetSocketAddress
+
+class FindNodeMessageHandler: MessageHandler<FindNodeMessage> {
+
+  override fun handle(message: FindNodeMessage, address: InetSocketAddress, srcNodeId: Bytes, connector: UdpConnector) {
+    val nodes = connector.getNodesTable().nodesOfDistance(message.distance)
+
+    var caret = 0
+    while (caret < nodes.size) {
+      val response = NodesMessage(message.requestId, nodes.size, nodes.subList(caret, caret + MAX_NODES_IN_RESPONSE))
+      connector.send(address, response, srcNodeId)
+      caret += MAX_NODES_IN_RESPONSE
+    }
+  }
+
+  companion object {
+      private const val MAX_NODES_IN_RESPONSE: Int = 16
+  }
+
+}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/NodesMessageHandler.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/NodesMessageHandler.kt
new file mode 100644
index 0000000..00e2fbc
--- /dev/null
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/NodesMessageHandler.kt
@@ -0,0 +1,19 @@
+package org.apache.tuweni.devp2p.v5.internal.handler
+
+import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.devp2p.EthereumNodeRecord
+import org.apache.tuweni.devp2p.v5.MessageHandler
+import org.apache.tuweni.devp2p.v5.UdpConnector
+import org.apache.tuweni.devp2p.v5.packet.NodesMessage
+import java.net.InetSocketAddress
+
+class NodesMessageHandler: MessageHandler<NodesMessage> {
+
+  override fun handle(message: NodesMessage, address: InetSocketAddress, srcNodeId: Bytes, connector: UdpConnector) {
+    message.nodeRecords.forEach {
+      EthereumNodeRecord.fromRLP(it)
+      connector.getNodesTable().add(it)
+    }
+  }
+
+}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/packet/FindNodeMessage.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/packet/FindNodeMessage.kt
index b39ae46..777d389 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/packet/FindNodeMessage.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/packet/FindNodeMessage.kt
@@ -21,7 +21,7 @@
 
 class FindNodeMessage(
   val requestId: Bytes = UdpMessage.requestId(),
-  val distance: Long = 0
+  val distance: Int = 0
 ) : UdpMessage() {
 
   private val encodedMessageType: Bytes = Bytes.fromHexString("0x03")
@@ -29,7 +29,7 @@
   override fun encode(): Bytes {
     return RLP.encodeList { writer ->
       writer.writeValue(requestId)
-      writer.writeLong(distance)
+      writer.writeInt(distance)
     }
   }
 
@@ -39,7 +39,7 @@
     fun create(content: Bytes): FindNodeMessage {
       return RLP.decodeList(content) { reader ->
         val requestId = reader.readValue()
-        val distance = reader.readLong()
+        val distance = reader.readInt()
         return@decodeList FindNodeMessage(requestId, distance)
       }
     }
diff --git a/kademlia/src/main/kotlin/org/apache/tuweni/kademlia/KademliaRoutingTable.kt b/kademlia/src/main/kotlin/org/apache/tuweni/kademlia/KademliaRoutingTable.kt
index 2299bb1..2f7e6ef 100644
--- a/kademlia/src/main/kotlin/org/apache/tuweni/kademlia/KademliaRoutingTable.kt
+++ b/kademlia/src/main/kotlin/org/apache/tuweni/kademlia/KademliaRoutingTable.kt
@@ -94,7 +94,8 @@
   private val selfId: ByteArray,
   k: Int,
   maxReplacements: Int = k,
-  private val nodeId: (T) -> ByteArray
+  private val nodeId: (T) -> ByteArray,
+  private val distanceToSelf: (T) -> Int = { nodeId(it) xorDist selfId }
 ) : Set<T> {
 
   companion object {
@@ -108,8 +109,13 @@
      * @return A new routing table
      */
     @JvmStatic
-    fun <T> create(selfId: ByteArray, k: Int, nodeId: Function<T, ByteArray>): KademliaRoutingTable<T> =
-      KademliaRoutingTable(selfId, k, nodeId = nodeId::apply)
+    fun <T> create(
+      selfId: ByteArray,
+      k: Int,
+      nodeId: Function<T, ByteArray>,
+      distanceToSelf: Function<T, Int>
+    ): KademliaRoutingTable<T> =
+      KademliaRoutingTable(selfId, k, nodeId = nodeId::apply, distanceToSelf = distanceToSelf::apply)
 
     /**
      * Create a new routing table.
@@ -126,8 +132,9 @@
       selfId: ByteArray,
       k: Int,
       maxReplacements: Int,
-      nodeId: Function<T, ByteArray>
-    ): KademliaRoutingTable<T> = KademliaRoutingTable(selfId, k, maxReplacements, nodeId::apply)
+      nodeId: Function<T, ByteArray>,
+      distanceToSelf: Function<T, Int>
+    ): KademliaRoutingTable<T> = KademliaRoutingTable(selfId, k, maxReplacements, nodeId::apply, distanceToSelf::apply)
   }
 
   init {
@@ -209,6 +216,10 @@
     buckets.forEach { bucket -> bucket.clear() }
   }
 
+  fun peersOfDistance(value: Int): List<T> {
+    return buckets[value].toList()
+  }
+
   private fun idForNode(node: T): ByteArray {
     val id = nodeId(node)
     require(id.size == selfId.size) { "id obtained for node is not the correct length" }
@@ -218,7 +229,7 @@
 
   private fun bucketFor(node: T) = buckets[logDistToSelf(node)]
 
-  private fun logDistToSelf(node: T): Int = distanceCache.get(node) { idForNode(node) xorDist selfId }
+  private fun logDistToSelf(node: T): Int = distanceCache.get(node) { distanceToSelf.invoke(node) }
 
   private class Bucket<E> private constructor(
     // ordered with most recent first