find node + nodes message handler
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/PeerRoutingTable.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/PeerRoutingTable.kt
index a6dc884..86511e6 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/PeerRoutingTable.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/PeerRoutingTable.kt
@@ -21,6 +21,7 @@
import org.apache.tuweni.crypto.Hash.keccak256
import org.apache.tuweni.crypto.SECP256K1
import org.apache.tuweni.kademlia.KademliaRoutingTable
+import org.apache.tuweni.kademlia.xorDist
/**
* A routing table for ÐΞVp2p peers.
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/UdpConnector.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/UdpConnector.kt
index 9770885..ad86da5 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/UdpConnector.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/UdpConnector.kt
@@ -18,6 +18,7 @@
import org.apache.tuweni.bytes.Bytes
import org.apache.tuweni.crypto.SECP256K1
+import org.apache.tuweni.devp2p.v5.dht.RoutingTable
import org.apache.tuweni.devp2p.v5.packet.UdpMessage
import org.apache.tuweni.devp2p.v5.misc.HandshakeInitParameters
import java.net.InetSocketAddress
@@ -97,4 +98,6 @@
* @return node's ENR
*/
fun getEnr(): Bytes
+
+ fun getNodesTable(): RoutingTable
}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/dht/RoutingTable.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/dht/RoutingTable.kt
new file mode 100644
index 0000000..40cb81e
--- /dev/null
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/dht/RoutingTable.kt
@@ -0,0 +1,35 @@
+package org.apache.tuweni.devp2p.v5.dht
+
+import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.crypto.Hash
+import org.apache.tuweni.kademlia.KademliaRoutingTable
+import org.apache.tuweni.kademlia.xorDist
+
+class RoutingTable(
+ selfEnr: Bytes
+) {
+
+ private val selfNodeId = key(selfEnr)
+ private val nodeIdCalculation: (Bytes) -> ByteArray = { enr -> key(enr) }
+ private val table = KademliaRoutingTable(
+ selfId = selfNodeId,
+ k = BUCKET_SIZE,
+ nodeId = nodeIdCalculation,
+ distanceToSelf = { key(it) xorDist selfNodeId })
+
+
+ fun add(enr: Bytes): Bytes? = table.add(enr)
+
+ fun nearest(nodeId: Bytes, limit: Int): List<Bytes> = table.nearest(nodeId.toArray(), limit)
+
+ fun evict(enr: Bytes): Boolean = table.evict(enr)
+
+ fun nodesOfDistance(distance: Int): List<Bytes> = table.peersOfDistance(distance)
+
+ private fun key(enr: Bytes): ByteArray = Hash.sha2_256(enr).toArray()
+
+ companion object {
+ private const val BUCKET_SIZE: Int = 16
+ }
+
+}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultUdpConnector.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultUdpConnector.kt
index ebb4eb8..fc70c27 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultUdpConnector.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultUdpConnector.kt
@@ -26,6 +26,9 @@
import org.apache.tuweni.devp2p.v5.MessageHandler
import org.apache.tuweni.devp2p.v5.PacketCodec
import org.apache.tuweni.devp2p.v5.UdpConnector
+import org.apache.tuweni.devp2p.v5.dht.RoutingTable
+import org.apache.tuweni.devp2p.v5.internal.handler.FindNodeMessageHandler
+import org.apache.tuweni.devp2p.v5.internal.handler.NodesMessageHandler
import org.apache.tuweni.devp2p.v5.internal.handler.RandomMessageHandler
import org.apache.tuweni.devp2p.v5.internal.handler.WhoAreYouMessageHandler
import org.apache.tuweni.devp2p.v5.packet.FindNodeMessage
@@ -33,6 +36,7 @@
import org.apache.tuweni.devp2p.v5.packet.UdpMessage
import org.apache.tuweni.devp2p.v5.packet.WhoAreYouMessage
import org.apache.tuweni.devp2p.v5.misc.HandshakeInitParameters
+import org.apache.tuweni.devp2p.v5.packet.NodesMessage
import org.apache.tuweni.net.coroutines.CoroutineDatagramChannel
import java.net.InetSocketAddress
import java.nio.ByteBuffer
@@ -54,9 +58,13 @@
private val randomMessageHandler: MessageHandler<RandomMessage> = RandomMessageHandler()
private val whoAreYouMessageHandler: MessageHandler<WhoAreYouMessage> = WhoAreYouMessageHandler(nodeId)
+ private val findNodeMessageHandler: MessageHandler<FindNodeMessage> = FindNodeMessageHandler()
+ private val nodesMessageHandler: MessageHandler<NodesMessage> = NodesMessageHandler()
private val authenticatingPeers: MutableMap<InetSocketAddress, Bytes> = mutableMapOf()
+ private val nodesTable: RoutingTable = RoutingTable(selfEnr)
+
private lateinit var receiveJob: Job
override fun start() {
@@ -114,6 +122,8 @@
return result
}
+ override fun getNodesTable(): RoutingTable = nodesTable
+
private fun processDatagram(datagram: ByteBuffer, address: InetSocketAddress) {
val messageBytes = Bytes.wrapByteBuffer(datagram)
val decodeResult = packetCodec.decode(messageBytes)
@@ -121,7 +131,8 @@
when (message) {
is RandomMessage -> randomMessageHandler.handle(message, address, decodeResult.srcNodeId, this)
is WhoAreYouMessage -> whoAreYouMessageHandler.handle(message, address, decodeResult.srcNodeId, this)
- is FindNodeMessage -> { } //TODO: response with NODES message
+ is FindNodeMessage -> findNodeMessageHandler.handle(message, address, decodeResult.srcNodeId, this)
+ is NodesMessage -> nodesMessageHandler.handle(message, address, decodeResult.srcNodeId, this)
else -> throw IllegalArgumentException("Unexpected message has been received - ${message::class.java.simpleName}")
}
}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/FindNodeMessageHandler.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/FindNodeMessageHandler.kt
new file mode 100644
index 0000000..47575d0
--- /dev/null
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/FindNodeMessageHandler.kt
@@ -0,0 +1,27 @@
+package org.apache.tuweni.devp2p.v5.internal.handler
+
+import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.devp2p.v5.MessageHandler
+import org.apache.tuweni.devp2p.v5.UdpConnector
+import org.apache.tuweni.devp2p.v5.packet.FindNodeMessage
+import org.apache.tuweni.devp2p.v5.packet.NodesMessage
+import java.net.InetSocketAddress
+
+class FindNodeMessageHandler: MessageHandler<FindNodeMessage> {
+
+ override fun handle(message: FindNodeMessage, address: InetSocketAddress, srcNodeId: Bytes, connector: UdpConnector) {
+ val nodes = connector.getNodesTable().nodesOfDistance(message.distance)
+
+ var caret = 0
+ while (caret < nodes.size) {
+ val response = NodesMessage(message.requestId, nodes.size, nodes.subList(caret, caret + MAX_NODES_IN_RESPONSE))
+ connector.send(address, response, srcNodeId)
+ caret += MAX_NODES_IN_RESPONSE
+ }
+ }
+
+ companion object {
+ private const val MAX_NODES_IN_RESPONSE: Int = 16
+ }
+
+}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/NodesMessageHandler.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/NodesMessageHandler.kt
new file mode 100644
index 0000000..00e2fbc
--- /dev/null
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/NodesMessageHandler.kt
@@ -0,0 +1,19 @@
+package org.apache.tuweni.devp2p.v5.internal.handler
+
+import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.devp2p.EthereumNodeRecord
+import org.apache.tuweni.devp2p.v5.MessageHandler
+import org.apache.tuweni.devp2p.v5.UdpConnector
+import org.apache.tuweni.devp2p.v5.packet.NodesMessage
+import java.net.InetSocketAddress
+
+class NodesMessageHandler: MessageHandler<NodesMessage> {
+
+ override fun handle(message: NodesMessage, address: InetSocketAddress, srcNodeId: Bytes, connector: UdpConnector) {
+ message.nodeRecords.forEach {
+ EthereumNodeRecord.fromRLP(it)
+ connector.getNodesTable().add(it)
+ }
+ }
+
+}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/packet/FindNodeMessage.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/packet/FindNodeMessage.kt
index b39ae46..777d389 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/packet/FindNodeMessage.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/packet/FindNodeMessage.kt
@@ -21,7 +21,7 @@
class FindNodeMessage(
val requestId: Bytes = UdpMessage.requestId(),
- val distance: Long = 0
+ val distance: Int = 0
) : UdpMessage() {
private val encodedMessageType: Bytes = Bytes.fromHexString("0x03")
@@ -29,7 +29,7 @@
override fun encode(): Bytes {
return RLP.encodeList { writer ->
writer.writeValue(requestId)
- writer.writeLong(distance)
+ writer.writeInt(distance)
}
}
@@ -39,7 +39,7 @@
fun create(content: Bytes): FindNodeMessage {
return RLP.decodeList(content) { reader ->
val requestId = reader.readValue()
- val distance = reader.readLong()
+ val distance = reader.readInt()
return@decodeList FindNodeMessage(requestId, distance)
}
}
diff --git a/kademlia/src/main/kotlin/org/apache/tuweni/kademlia/KademliaRoutingTable.kt b/kademlia/src/main/kotlin/org/apache/tuweni/kademlia/KademliaRoutingTable.kt
index 2299bb1..2f7e6ef 100644
--- a/kademlia/src/main/kotlin/org/apache/tuweni/kademlia/KademliaRoutingTable.kt
+++ b/kademlia/src/main/kotlin/org/apache/tuweni/kademlia/KademliaRoutingTable.kt
@@ -94,7 +94,8 @@
private val selfId: ByteArray,
k: Int,
maxReplacements: Int = k,
- private val nodeId: (T) -> ByteArray
+ private val nodeId: (T) -> ByteArray,
+ private val distanceToSelf: (T) -> Int = { nodeId(it) xorDist selfId }
) : Set<T> {
companion object {
@@ -108,8 +109,13 @@
* @return A new routing table
*/
@JvmStatic
- fun <T> create(selfId: ByteArray, k: Int, nodeId: Function<T, ByteArray>): KademliaRoutingTable<T> =
- KademliaRoutingTable(selfId, k, nodeId = nodeId::apply)
+ fun <T> create(
+ selfId: ByteArray,
+ k: Int,
+ nodeId: Function<T, ByteArray>,
+ distanceToSelf: Function<T, Int>
+ ): KademliaRoutingTable<T> =
+ KademliaRoutingTable(selfId, k, nodeId = nodeId::apply, distanceToSelf = distanceToSelf::apply)
/**
* Create a new routing table.
@@ -126,8 +132,9 @@
selfId: ByteArray,
k: Int,
maxReplacements: Int,
- nodeId: Function<T, ByteArray>
- ): KademliaRoutingTable<T> = KademliaRoutingTable(selfId, k, maxReplacements, nodeId::apply)
+ nodeId: Function<T, ByteArray>,
+ distanceToSelf: Function<T, Int>
+ ): KademliaRoutingTable<T> = KademliaRoutingTable(selfId, k, maxReplacements, nodeId::apply, distanceToSelf::apply)
}
init {
@@ -209,6 +216,10 @@
buckets.forEach { bucket -> bucket.clear() }
}
+ fun peersOfDistance(value: Int): List<T> {
+ return buckets[value].toList()
+ }
+
private fun idForNode(node: T): ByteArray {
val id = nodeId(node)
require(id.size == selfId.size) { "id obtained for node is not the correct length" }
@@ -218,7 +229,7 @@
private fun bucketFor(node: T) = buckets[logDistToSelf(node)]
- private fun logDistToSelf(node: T): Int = distanceCache.get(node) { idForNode(node) xorDist selfId }
+ private fun logDistToSelf(node: T): Int = distanceCache.get(node) { distanceToSelf.invoke(node) }
private class Bucket<E> private constructor(
// ordered with most recent first