save
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/UdpConnector.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/UdpConnector.kt
index c1c5e82..2daf1c2 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/UdpConnector.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/UdpConnector.kt
@@ -84,7 +84,7 @@
*
* @return node identifier
*/
- fun getPendingNodeIdByAddress(address: InetSocketAddress): Bytes
+ fun findPendingNodeId(address: InetSocketAddress): Bytes
/**
* Provides node's key pair
@@ -110,4 +110,6 @@
fun getNodesTable(): RoutingTable
fun getAwaitingPongRecord(nodeId: Bytes): Bytes?
+
+ fun findPendingMessage(nodeId: Bytes): UdpMessage?
}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/dht/RoutingTable.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/dht/RoutingTable.kt
index 289be97..937e0c5 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/dht/RoutingTable.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/dht/RoutingTable.kt
@@ -16,10 +16,12 @@
*/
package org.apache.tuweni.devp2p.v5.dht
+import com.google.common.math.IntMath
import org.apache.tuweni.bytes.Bytes
import org.apache.tuweni.crypto.Hash
import org.apache.tuweni.kademlia.KademliaRoutingTable
import org.apache.tuweni.kademlia.xorDist
+import java.math.RoundingMode
class RoutingTable(
private val selfEnr: Bytes
@@ -32,12 +34,19 @@
selfId = selfNodeId,
k = BUCKET_SIZE,
nodeId = nodeIdCalculation,
- distanceToSelf = { key(it) xorDist selfNodeId })
+ distanceToSelf = { IntMath.log2(key(it) xorDist selfNodeId, RoundingMode.FLOOR) })
+
+ val size: Int
+ get() = table.size
fun getSelfEnr(): Bytes = selfEnr
fun add(enr: Bytes): Bytes? = table.add(enr)
+ fun nearest(targetId: Bytes, limit: Int = BUCKET_SIZE): List<Bytes> = table.nearest(key(targetId), limit)
+
+ fun distanceToSelf(targetId: Bytes): Int = table.logDistToSelf(targetId)
+
fun evict(enr: Bytes): Boolean = table.evict(enr)
fun random(): Bytes = table.getRandom()
@@ -53,4 +62,5 @@
companion object {
private const val BUCKET_SIZE: Int = 16
}
+
}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultPacketCodec.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultPacketCodec.kt
index 3df99b7..d4ce433 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultPacketCodec.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultPacketCodec.kt
@@ -40,6 +40,7 @@
import org.apache.tuweni.devp2p.v5.packet.TopicQueryMessage
import org.apache.tuweni.rlp.RLP
import org.apache.tuweni.rlp.RLPReader
+import java.util.concurrent.ConcurrentHashMap
import kotlin.IllegalArgumentException
class DefaultPacketCodec(
@@ -115,7 +116,7 @@
// Decrypt
val decryptionKey = authenticationProvider.findSessionKey(senderNodeId.toHexString())?.initiatorKey
- ?: return RandomMessage(encryptedContent)
+ ?: return RandomMessage.create(encryptedContent)
val decryptMetadata = authHeader?.let { Bytes.wrap(tag, authHeader.asRlp()) } ?: tag
val decryptedContent = AES128GCM.decrypt(encryptedContent, decryptionKey, decryptMetadata)
val messageType = decryptedContent.slice(0, Byte.SIZE_BYTES)
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultUdpConnector.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultUdpConnector.kt
index 5c32f20..4942f11 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultUdpConnector.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultUdpConnector.kt
@@ -49,6 +49,7 @@
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.time.Duration
+import java.util.concurrent.ConcurrentHashMap
import java.util.logging.Logger
import kotlin.coroutines.CoroutineContext
@@ -75,6 +76,10 @@
private val pingMessageHandler: MessageHandler<PingMessage> = PingMessageHandler()
private val pongMessageHandler: MessageHandler<PongMessage> = PongMessageHandler()
+ private val pendingMessages: MutableMap<String, UdpMessage> = ConcurrentHashMap()
+
+ private val askedNodes: MutableList<Bytes> = mutableListOf()
+
private val pings: Cache<String, Bytes> = CacheBuilder.newBuilder()
.expireAfterWrite(Duration.ofMillis(PING_TIMEOUT))
.removalListener<String, Bytes> {
@@ -83,24 +88,14 @@
private lateinit var refreshJob: Job
private lateinit var receiveJob: Job
+ private lateinit var lookupJob: Job
+
override fun start() {
receiveChannel.bind(bindAddress)
- receiveJob = launch {
- val datagram = ByteBuffer.allocate(UdpMessage.MAX_UDP_MESSAGE_SIZE)
- while (receiveChannel.isOpen) {
- datagram.clear()
- val address = receiveChannel.receive(datagram) as InetSocketAddress
- datagram.flip()
- try {
- processDatagram(datagram, address)
- } catch (ex: Exception) {
- log.warning(ex.message)
- }
- }
- }
-
+ receiveJob = receiveDatagram()
+ lookupJob = lookupNodes()
refreshJob = refreshNodesTable()
}
@@ -112,6 +107,7 @@
) {
launch {
val buffer = packetCodec.encode(message, destNodeId, handshakeParams)
+ pendingMessages[destNodeId.toHexString()] = message
sendChannel.send(ByteBuffer.wrap(buffer.toArray()), address)
}
}
@@ -120,8 +116,9 @@
receiveChannel.close()
sendChannel.close()
- receiveJob.cancel()
refreshJob.cancel()
+ lookupJob.cancel()
+ receiveJob.cancel()
}
override fun available(): Boolean = receiveChannel.isOpen
@@ -136,9 +133,13 @@
authenticatingPeers[address] = nodeId
}
+ override fun findPendingMessage(nodeId: Bytes): UdpMessage? {
+ return pendingMessages[nodeId.toHexString()]
+ }
+
override fun getNodeKeyPair(): SECP256K1.KeyPair = keyPair
- override fun getPendingNodeIdByAddress(address: InetSocketAddress): Bytes {
+ override fun findPendingNodeId(address: InetSocketAddress): Bytes {
val result = authenticatingPeers[address]
?: throw IllegalArgumentException("Authenticated peer not found with address ${address.hostName}:${address.port}")
authenticatingPeers.remove(address)
@@ -154,6 +155,43 @@
return result
}
+ private fun lookupNodes() = launch {
+ while (true) {
+ val nearestNodes = getNodesTable().nearest(selfEnr)
+ if (16 > nearestNodes.size) {
+ lookupInternal(nearestNodes)
+ } else {
+ askedNodes.clear()
+ }
+ delay(LOOKUP_REFRESH_RATE)
+ }
+ }
+
+ private fun lookupInternal(nearest: List<Bytes>) {
+ val targetNode = if (nearest.isNotEmpty()) nearest.random() else selfEnr
+ val distance = getNodesTable().distanceToSelf(targetNode)
+ for(target in nearest.take(3)) {
+ val enr = EthereumNodeRecord.fromRLP(target)
+ val message = FindNodeMessage(distance = distance)
+ val address = InetSocketAddress(enr.ip(), enr.udp())
+ send(address, message, Hash.sha2_256(target))
+ }
+ }
+
+ private fun receiveDatagram() = launch {
+ val datagram = ByteBuffer.allocate(UdpMessage.MAX_UDP_MESSAGE_SIZE)
+ while (receiveChannel.isOpen) {
+ datagram.clear()
+ val address = receiveChannel.receive(datagram) as InetSocketAddress
+ datagram.flip()
+ try {
+ processDatagram(datagram, address)
+ } catch (ex: Exception) {
+ log.warning(ex.message)
+ }
+ }
+ }
+
private fun processDatagram(datagram: ByteBuffer, address: InetSocketAddress) {
val messageBytes = Bytes.wrapByteBuffer(datagram)
val decodeResult = packetCodec.decode(messageBytes)
@@ -167,6 +205,7 @@
is PongMessage -> pongMessageHandler.handle(message, address, decodeResult.srcNodeId, this)
else -> throw IllegalArgumentException("Unexpected message has been received - ${message::class.java.simpleName}")
}
+ pendingMessages.remove(decodeResult.srcNodeId.toHexString())
}
private fun refreshNodesTable(): Job = launch {
@@ -181,12 +220,13 @@
send(address, message, nodeId)
pings.put(nodeId.toHexString(), enrBytes)
}
- delay(REFRESH_RATE)
+ delay(TABLE_REFRESH_RATE)
}
}
companion object {
- private const val REFRESH_RATE: Long = 1000
+ private const val LOOKUP_REFRESH_RATE: Long = 3000
+ private const val TABLE_REFRESH_RATE: Long = 1000
private const val PING_TIMEOUT: Long = 20000
}
}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/WhoAreYouMessageHandler.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/WhoAreYouMessageHandler.kt
index 754ceae..422a21f 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/WhoAreYouMessageHandler.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/WhoAreYouMessageHandler.kt
@@ -36,11 +36,11 @@
connector: UdpConnector
) {
// Retrieve enr
- val destRlp = connector.getPendingNodeIdByAddress(address)
+ val destRlp = connector.findPendingNodeId(address)
val handshakeParams = HandshakeInitParameters(message.idNonce, message.authTag, destRlp)
val destNodeId = Hash.sha2_256(destRlp)
- val findNodeMessage = FindNodeMessage()
- connector.send(address, findNodeMessage, destNodeId, handshakeParams)
+ val response = connector.findPendingMessage(srcNodeId) ?: FindNodeMessage()
+ connector.send(address, response, destNodeId, handshakeParams)
}
}