Merge pull request #39 from YouJustDontKnow/v5-udp-transport
Discv5 kademlia
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/ENRStorage.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/ENRStorage.kt
new file mode 100644
index 0000000..3388444
--- /dev/null
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/ENRStorage.kt
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.v5
+
+import org.apache.tuweni.bytes.Bytes
+
+/**
+ * Storage of node records, written with set and queried by node identifier with find
+ */
+interface ENRStorage {
+
+ /**
+ * Persist node record in storage
+ *
+ * @param enr node record to store
+ */
+ fun set(enr: Bytes)
+
+ /**
+ * Find node record in storage
+ *
+ * @param nodeId node identifier
+ *
+ * @return node record, or null (presumably when no record is stored for the identifier - confirm in implementations)
+ */
+ fun find(nodeId: Bytes): Bytes?
+}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/MessageObserver.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/MessageObserver.kt
new file mode 100644
index 0000000..843378b
--- /dev/null
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/MessageObserver.kt
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.v5
+
+import org.apache.tuweni.devp2p.v5.packet.UdpMessage
+
+/**
+ * UDP message listener used to observe processed messages, generally for test purposes
+ */
+interface MessageObserver {
+
+ /**
+ * Perform message observation
+ *
+ * @param message incoming processed message
+ */
+ fun observe(message: UdpMessage)
+}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/NodeDiscoveryService.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/NodeDiscoveryService.kt
index 9a0fcc0..b146209 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/NodeDiscoveryService.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/NodeDiscoveryService.kt
@@ -22,10 +22,12 @@
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.crypto.Hash
import org.apache.tuweni.crypto.SECP256K1
import org.apache.tuweni.devp2p.EthereumNodeRecord
import org.apache.tuweni.devp2p.v5.internal.DefaultUdpConnector
import org.apache.tuweni.devp2p.v5.packet.RandomMessage
+import org.apache.tuweni.devp2p.v5.storage.DefaultENRStorage
import org.apache.tuweni.io.Base64URLSafe
import java.net.InetSocketAddress
import java.time.Instant
@@ -46,7 +48,6 @@
* Executes service shut down
*/
fun terminate(await: Boolean = false)
-
}
internal class DefaultNodeDiscoveryService(
@@ -63,7 +64,8 @@
null,
bindAddress.port
),
- private val connector: UdpConnector = DefaultUdpConnector(bindAddress, keyPair, selfENR),
+ private val enrStorage: ENRStorage = DefaultENRStorage(),
+ private val connector: UdpConnector = DefaultUdpConnector(bindAddress, keyPair, selfENR, enrStorage),
override val coroutineContext: CoroutineContext = Dispatchers.Default
) : NodeDiscoveryService, CoroutineScope {
@@ -91,8 +93,10 @@
val randomMessage = RandomMessage()
val address = InetSocketAddress(enr.ip(), enr.udp())
- connector.addPendingNodeId(address, rlpENR)
- connector.send(address, randomMessage, rlpENR)
+ val destNodeId = Hash.sha2_256(rlpENR)
+ enrStorage.set(rlpENR)
+ connector.getNodesTable().add(rlpENR)
+ connector.send(address, randomMessage, destNodeId)
}
}
}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/PacketCodec.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/PacketCodec.kt
index eec5073..23a2cf3 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/PacketCodec.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/PacketCodec.kt
@@ -19,6 +19,7 @@
import org.apache.tuweni.bytes.Bytes
import org.apache.tuweni.devp2p.v5.packet.UdpMessage
import org.apache.tuweni.devp2p.v5.misc.DecodeResult
+import org.apache.tuweni.devp2p.v5.misc.EncodeResult
import org.apache.tuweni.devp2p.v5.misc.HandshakeInitParameters
/**
@@ -43,7 +44,7 @@
*
* @return encoded message
*/
- fun encode(message: UdpMessage, destNodeId: Bytes, handshakeParams: HandshakeInitParameters? = null): Bytes
+ fun encode(message: UdpMessage, destNodeId: Bytes, handshakeParams: HandshakeInitParameters? = null): EncodeResult
/**
* Decodes message, decrypting it's body
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/UdpConnector.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/UdpConnector.kt
index 9770885..8fef7cf 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/UdpConnector.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/UdpConnector.kt
@@ -18,8 +18,11 @@
import org.apache.tuweni.bytes.Bytes
import org.apache.tuweni.crypto.SECP256K1
+import org.apache.tuweni.devp2p.EthereumNodeRecord
+import org.apache.tuweni.devp2p.v5.storage.RoutingTable
import org.apache.tuweni.devp2p.v5.packet.UdpMessage
import org.apache.tuweni.devp2p.v5.misc.HandshakeInitParameters
+import org.apache.tuweni.devp2p.v5.misc.TrackingMessage
import java.net.InetSocketAddress
/**
@@ -68,23 +71,6 @@
fun started(): Boolean
/**
- * Add node identifier which awaits for authentication
- *
- * @param address socket address
- * @param nodeId node identifier
- */
- fun addPendingNodeId(address: InetSocketAddress, nodeId: Bytes)
-
- /**
- * Get node identifier which awaits for authentication
- *
- * @param address socket address
- *
- * @return node identifier
- */
- fun getPendingNodeIdByAddress(address: InetSocketAddress): Bytes
-
- /**
* Provides node's key pair
*
* @return node's key pair
@@ -92,9 +78,62 @@
fun getNodeKeyPair(): SECP256K1.KeyPair
/**
+ * Provides node's ENR in RLP encoded representation
+ *
+ * @return node's RLP encoded ENR
+ */
+ fun getEnrBytes(): Bytes
+
+ /**
* Provides node's ENR
*
* @return node's ENR
*/
- fun getEnr(): Bytes
+ fun getEnr(): EthereumNodeRecord
+
+ /**
+ * Attach observer for listening processed messages
+ *
+ * @param observer observer instance that will be notified of processed messages
+ */
+ fun attachObserver(observer: MessageObserver)
+
+ /**
+ * Remove a previously attached observer of processed messages
+ *
+ * @param observer observer for removal
+ */
+ fun detachObserver(observer: MessageObserver)
+
+ /**
+ * Get kademlia routing table
+ *
+ * @return kademlia table
+ */
+ fun getNodesTable(): RoutingTable
+
+ /**
+ * Retrieve ENR of the pinged node from which a pong is awaited
+ *
+ * @param nodeId node identifier
+ *
+ * @return node record
+ */
+ fun getAwaitingPongRecord(nodeId: Bytes): Bytes?
+
+ /**
+ * Retrieve last sent message, in case it was unauthorized and the node can resend it with an authentication header
+ *
+ * @param authTag message's authentication tag
+ *
+ * @return message, including node identifier
+ */
+ fun getPendingMessage(authTag: Bytes): TrackingMessage
+
+ /**
+ * Provides enr storage of known nodes
+ *
+ * @return nodes storage
+ */
+ fun getNodeRecords(): ENRStorage
}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/encrypt/AES128GCM.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/encrypt/AES128GCM.kt
index f1a22ae..7033ffb 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/encrypt/AES128GCM.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/encrypt/AES128GCM.kt
@@ -31,7 +31,6 @@
private const val CIPHER_NAME: String = "AES/GCM/NoPadding"
private const val KEY_SIZE: Int = 128
-
/**
* AES128GCM encryption function
*
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/encrypt/SessionKeyGenerator.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/encrypt/SessionKeyGenerator.kt
index cb21279..0858bdd 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/encrypt/SessionKeyGenerator.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/encrypt/SessionKeyGenerator.kt
@@ -27,7 +27,7 @@
*/
object SessionKeyGenerator {
- private const val DERIVED_KEY_SIZE: Int = 16
+ const val DERIVED_KEY_SIZE: Int = 16
private val INFO_PREFIX = Bytes.wrap("discovery v5 key agreement".toByteArray())
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultAuthenticationProvider.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultAuthenticationProvider.kt
index f03b63b..79783c3 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultAuthenticationProvider.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultAuthenticationProvider.kt
@@ -21,9 +21,9 @@
import org.apache.tuweni.bytes.Bytes
import org.apache.tuweni.crypto.Hash
import org.apache.tuweni.crypto.SECP256K1
-import org.apache.tuweni.devp2p.ENR_REQUEST_RETRY_DELAY_MS
import org.apache.tuweni.devp2p.EthereumNodeRecord
import org.apache.tuweni.devp2p.v5.AuthenticationProvider
+import org.apache.tuweni.devp2p.v5.storage.RoutingTable
import org.apache.tuweni.devp2p.v5.encrypt.AES128GCM
import org.apache.tuweni.devp2p.v5.encrypt.SessionKeyGenerator
import org.apache.tuweni.devp2p.v5.misc.AuthHeader
@@ -34,15 +34,16 @@
class DefaultAuthenticationProvider(
private val keyPair: SECP256K1.KeyPair,
- private val enr: Bytes
+ private val routingTable: RoutingTable
) : AuthenticationProvider {
private val sessionKeys: Cache<String, SessionKey> = CacheBuilder
.newBuilder()
- .expireAfterWrite(ENR_REQUEST_RETRY_DELAY_MS, TimeUnit.MILLISECONDS)
+ .expireAfterWrite(SESSION_KEY_EXPIRATION, TimeUnit.MINUTES)
.build()
- private val nodeId: Bytes = Hash.sha2_256(enr)
+ private val nodeId: Bytes = Hash.sha2_256(routingTable.getSelfEnr())
+ @Synchronized
override fun authenticate(handshakeParams: HandshakeInitParameters): AuthHeader {
// Generate ephemeral key pair
val ephemeralKeyPair = SECP256K1.KeyPair.random()
@@ -57,21 +58,30 @@
// Derive keys
val sessionKey = SessionKeyGenerator.generate(nodeId, destNodeId, secret, handshakeParams.idNonce)
- setSessionKey(destNodeId.toHexString(), sessionKey)
+ sessionKeys.put(destNodeId.toHexString(), sessionKey)
val signature = sign(keyPair, handshakeParams)
- return generateAuthHeader(enr, signature, handshakeParams, sessionKey.authRespKey, ephemeralKeyPair.publicKey())
+ return generateAuthHeader(
+ routingTable.getSelfEnr(),
+ signature,
+ handshakeParams,
+ sessionKey.authRespKey,
+ ephemeralKeyPair.publicKey()
+ )
}
+ @Synchronized
override fun findSessionKey(nodeId: String): SessionKey? {
return sessionKeys.getIfPresent(nodeId)
}
+ @Synchronized
override fun setSessionKey(nodeId: String, sessionKey: SessionKey) {
sessionKeys.put(nodeId, sessionKey)
}
+ @Synchronized
override fun finalizeHandshake(senderNodeId: Bytes, authHeader: AuthHeader) {
val ephemeralPublicKey = SECP256K1.PublicKey.fromBytes(authHeader.ephemeralPublicKey)
val secret = SECP256K1.calculateKeyAgreement(keyPair.secretKey(), ephemeralPublicKey)
@@ -89,7 +99,8 @@
if (!signatureVerified) {
throw IllegalArgumentException("Signature is not verified")
}
- setSessionKey(senderNodeId.toHexString(), sessionKey)
+ sessionKeys.put(senderNodeId.toHexString(), sessionKey)
+ routingTable.add(enrRLP)
}
}
@@ -125,6 +136,8 @@
}
companion object {
+ private const val SESSION_KEY_EXPIRATION: Long = 5
+
private const val ZERO_NONCE_SIZE: Int = 12
private const val VERSION: Int = 5
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultPacketCodec.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultPacketCodec.kt
index 6dbab0f..51374c1 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultPacketCodec.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultPacketCodec.kt
@@ -21,6 +21,7 @@
import org.apache.tuweni.crypto.SECP256K1
import org.apache.tuweni.devp2p.v5.AuthenticationProvider
import org.apache.tuweni.devp2p.v5.PacketCodec
+import org.apache.tuweni.devp2p.v5.storage.RoutingTable
import org.apache.tuweni.devp2p.v5.encrypt.AES128GCM
import org.apache.tuweni.devp2p.v5.packet.FindNodeMessage
import org.apache.tuweni.devp2p.v5.packet.RandomMessage
@@ -28,49 +29,63 @@
import org.apache.tuweni.devp2p.v5.packet.WhoAreYouMessage
import org.apache.tuweni.devp2p.v5.misc.AuthHeader
import org.apache.tuweni.devp2p.v5.misc.DecodeResult
+import org.apache.tuweni.devp2p.v5.misc.EncodeResult
import org.apache.tuweni.devp2p.v5.misc.HandshakeInitParameters
+import org.apache.tuweni.devp2p.v5.packet.NodesMessage
+import org.apache.tuweni.devp2p.v5.packet.PingMessage
+import org.apache.tuweni.devp2p.v5.packet.PongMessage
+import org.apache.tuweni.devp2p.v5.packet.RegConfirmationMessage
+import org.apache.tuweni.devp2p.v5.packet.RegTopicMessage
+import org.apache.tuweni.devp2p.v5.packet.ReqTicketMessage
+import org.apache.tuweni.devp2p.v5.packet.TicketMessage
+import org.apache.tuweni.devp2p.v5.packet.TopicQueryMessage
import org.apache.tuweni.rlp.RLP
import org.apache.tuweni.rlp.RLPReader
import kotlin.IllegalArgumentException
class DefaultPacketCodec(
private val keyPair: SECP256K1.KeyPair,
- private val enr: Bytes,
- private val nodeId: Bytes = Hash.sha2_256(enr),
- private val authenticationProvider: AuthenticationProvider = DefaultAuthenticationProvider(keyPair, enr)
+ private val routingTable: RoutingTable,
+ private val nodeId: Bytes = Hash.sha2_256(routingTable.getSelfEnr()),
+ private val authenticationProvider: AuthenticationProvider = DefaultAuthenticationProvider(keyPair, routingTable)
) : PacketCodec {
- override fun encode(message: UdpMessage, destNodeId: Bytes, handshakeParams: HandshakeInitParameters?): Bytes {
+ // Encodes message for destNodeId. The result pairs the packet bytes with the value used to track the packet:
+ // the magic for a WhoAreYouMessage, otherwise the auth tag of the encoded message.
+ override fun encode(message: UdpMessage, destNodeId: Bytes, handshakeParams: HandshakeInitParameters?): EncodeResult {
if (message is WhoAreYouMessage) {
val magic = UdpMessage.magic(nodeId)
val content = message.encode()
- return Bytes.wrap(magic, content)
+ return EncodeResult(magic, Bytes.wrap(magic, content))
}
val tag = UdpMessage.tag(nodeId, destNodeId)
if (message is RandomMessage) {
- val authTag = UdpMessage.authTag()
- val rlpAuthTag = RLP.encodeValue(authTag)
- val content = message.encode()
- return Bytes.wrap(tag, rlpAuthTag, content)
+ return encodeRandomMessage(tag, message)
}
- val authHeader = handshakeParams?.let { authenticationProvider.authenticate(handshakeParams) }
+ val sessionKey = authenticationProvider.findSessionKey(destNodeId.toHexString())
+ val authHeader = handshakeParams?.let {
+ if (null == sessionKey) { // authenticate only when no session key is known yet
+ authenticationProvider.authenticate(handshakeParams)
+ } else null
+ }
val initiatorKey = authenticationProvider.findSessionKey(destNodeId.toHexString())?.initiatorKey
- ?: throw IllegalArgumentException() // TODO handle
+ ?: return encodeRandomMessage(tag, RandomMessage()) // still no session key: send a random message instead
val messagePlain = Bytes.wrap(message.getMessageType(), message.encode())
return if (null != authHeader) {
val encodedHeader = authHeader.asRlp()
val authTag = authHeader.authTag
val encryptionMeta = Bytes.wrap(tag, encodedHeader)
val encryptionResult = AES128GCM.encrypt(initiatorKey, authTag, messagePlain, encryptionMeta)
- Bytes.wrap(tag, encodedHeader, encryptionResult)
+ EncodeResult(authTag, Bytes.wrap(tag, encodedHeader, encryptionResult))
} else {
val authTag = UdpMessage.authTag()
val authTagHeader = RLP.encodeValue(authTag)
val encryptionResult = AES128GCM.encrypt(initiatorKey, authTag, messagePlain, tag)
- Bytes.wrap(tag, authTagHeader, encryptionResult)
+ EncodeResult(authTag, Bytes.wrap(tag, authTagHeader, encryptionResult))
}
}
@@ -85,6 +100,7 @@
private fun read(tag: Bytes, senderNodeId: Bytes, contentWithHeader: Bytes, reader: RLPReader): UdpMessage {
// Distinguish auth header or auth tag
var authHeader: AuthHeader? = null
+ var authTag: Bytes = Bytes.EMPTY
if (reader.nextIsList()) {
if (WHO_ARE_YOU_MESSAGE_LENGTH == contentWithHeader.size()) {
return WhoAreYouMessage.create(contentWithHeader)
@@ -99,14 +115,14 @@
}
authenticationProvider.finalizeHandshake(senderNodeId, authHeader)
} else {
- reader.readValue()
+ authTag = reader.readValue()
}
val encryptedContent = contentWithHeader.slice(reader.position())
// Decrypt
val decryptionKey = authenticationProvider.findSessionKey(senderNodeId.toHexString())?.initiatorKey
- ?: return RandomMessage(encryptedContent)
+ ?: return RandomMessage.create(authTag, encryptedContent)
val decryptMetadata = authHeader?.let { Bytes.wrap(tag, authHeader.asRlp()) } ?: tag
val decryptedContent = AES128GCM.decrypt(encryptedContent, decryptionKey, decryptMetadata)
val messageType = decryptedContent.slice(0, Byte.SIZE_BYTES)
@@ -114,11 +130,25 @@
// Retrieve result
return when (messageType.toInt()) {
+ 1 -> PingMessage.create(message)
+ 2 -> PongMessage.create(message)
3 -> FindNodeMessage.create(message)
+ 4 -> NodesMessage.create(message)
+ 5 -> ReqTicketMessage.create(message)
+ 6 -> TicketMessage.create(message)
+ 7 -> RegTopicMessage.create(message)
+ 8 -> RegConfirmationMessage.create(message)
+ 9 -> TopicQueryMessage.create(message)
else -> throw IllegalArgumentException("Unknown message retrieved")
}
}
+ private fun encodeRandomMessage(tag: Bytes, message: RandomMessage): EncodeResult {
+ val rlpAuthTag = RLP.encodeValue(message.authTag)
+ val content = message.encode()
+ return EncodeResult(message.authTag, Bytes.wrap(tag, rlpAuthTag, content))
+ }
+
companion object {
private const val WHO_ARE_YOU_MESSAGE_LENGTH = 48
}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultUdpConnector.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultUdpConnector.kt
index ebb4eb8..9a517a0 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultUdpConnector.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultUdpConnector.kt
@@ -16,16 +16,28 @@
*/
package org.apache.tuweni.devp2p.v5.internal
+import com.google.common.cache.Cache
+import com.google.common.cache.CacheBuilder
+import com.google.common.cache.RemovalCause
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
+import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import org.apache.tuweni.bytes.Bytes
import org.apache.tuweni.crypto.Hash
import org.apache.tuweni.crypto.SECP256K1
+import org.apache.tuweni.devp2p.EthereumNodeRecord
+import org.apache.tuweni.devp2p.v5.ENRStorage
import org.apache.tuweni.devp2p.v5.MessageHandler
+import org.apache.tuweni.devp2p.v5.MessageObserver
import org.apache.tuweni.devp2p.v5.PacketCodec
import org.apache.tuweni.devp2p.v5.UdpConnector
+import org.apache.tuweni.devp2p.v5.storage.RoutingTable
+import org.apache.tuweni.devp2p.v5.internal.handler.FindNodeMessageHandler
+import org.apache.tuweni.devp2p.v5.internal.handler.NodesMessageHandler
+import org.apache.tuweni.devp2p.v5.internal.handler.PingMessageHandler
+import org.apache.tuweni.devp2p.v5.internal.handler.PongMessageHandler
import org.apache.tuweni.devp2p.v5.internal.handler.RandomMessageHandler
import org.apache.tuweni.devp2p.v5.internal.handler.WhoAreYouMessageHandler
import org.apache.tuweni.devp2p.v5.packet.FindNodeMessage
@@ -33,9 +45,15 @@
import org.apache.tuweni.devp2p.v5.packet.UdpMessage
import org.apache.tuweni.devp2p.v5.packet.WhoAreYouMessage
import org.apache.tuweni.devp2p.v5.misc.HandshakeInitParameters
+import org.apache.tuweni.devp2p.v5.misc.TrackingMessage
+import org.apache.tuweni.devp2p.v5.packet.NodesMessage
+import org.apache.tuweni.devp2p.v5.packet.PingMessage
+import org.apache.tuweni.devp2p.v5.packet.PongMessage
+import org.apache.tuweni.devp2p.v5.storage.DefaultENRStorage
import org.apache.tuweni.net.coroutines.CoroutineDatagramChannel
import java.net.InetSocketAddress
import java.nio.ByteBuffer
+import java.time.Duration
import java.util.logging.Logger
import kotlin.coroutines.CoroutineContext
@@ -43,38 +61,64 @@
private val bindAddress: InetSocketAddress,
private val keyPair: SECP256K1.KeyPair,
private val selfEnr: Bytes,
- private val nodeId: Bytes = Hash.sha2_256(selfEnr),
+ private val enrStorage: ENRStorage = DefaultENRStorage(),
private val receiveChannel: CoroutineDatagramChannel = CoroutineDatagramChannel.open(),
- private val sendChannel: CoroutineDatagramChannel = CoroutineDatagramChannel.open(),
- private val packetCodec: PacketCodec = DefaultPacketCodec(keyPair, selfEnr),
+ private val nodesTable: RoutingTable = RoutingTable(selfEnr),
+ private val packetCodec: PacketCodec = DefaultPacketCodec(keyPair, nodesTable),
+ private val selfNodeRecord: EthereumNodeRecord = EthereumNodeRecord.fromRLP(selfEnr),
+ private val messageListeners: MutableList<MessageObserver> = mutableListOf(),
override val coroutineContext: CoroutineContext = Dispatchers.IO
) : UdpConnector, CoroutineScope {
- private val log: Logger = Logger.getLogger(this.javaClass.simpleName)
+ private val log: Logger = Logger.getLogger(DefaultUdpConnector::class.java.simpleName)
private val randomMessageHandler: MessageHandler<RandomMessage> = RandomMessageHandler()
- private val whoAreYouMessageHandler: MessageHandler<WhoAreYouMessage> = WhoAreYouMessageHandler(nodeId)
+ private val whoAreYouMessageHandler: MessageHandler<WhoAreYouMessage> = WhoAreYouMessageHandler()
+ private val findNodeMessageHandler: MessageHandler<FindNodeMessage> = FindNodeMessageHandler()
+ private val nodesMessageHandler: MessageHandler<NodesMessage> = NodesMessageHandler()
+ private val pingMessageHandler: MessageHandler<PingMessage> = PingMessageHandler()
+ private val pongMessageHandler: MessageHandler<PongMessage> = PongMessageHandler()
- private val authenticatingPeers: MutableMap<InetSocketAddress, Bytes> = mutableMapOf()
+ private val askedNodes: MutableList<Bytes> = mutableListOf()
+ private val pendingMessages: Cache<String, TrackingMessage> = CacheBuilder.newBuilder()
+ .expireAfterWrite(Duration.ofMillis(REQUEST_TIMEOUT))
+ .build()
+ private val pings: Cache<String, Bytes> = CacheBuilder.newBuilder()
+ .expireAfterWrite(Duration.ofMillis(REQUEST_TIMEOUT + PING_TIMEOUT))
+ .removalListener<String, Bytes> {
+ if (RemovalCause.EXPIRED == it.cause) {
+ getNodesTable().evict(it.value)
+ }
+ }.build()
+
+ private lateinit var refreshJob: Job
private lateinit var receiveJob: Job
+ private lateinit var lookupJob: Job
+
+ override fun available(): Boolean = receiveChannel.isOpen
+
+ override fun started(): Boolean = ::receiveJob.isInitialized && available()
+
+ override fun getEnrBytes(): Bytes = selfEnr
+
+ override fun getEnr(): EthereumNodeRecord = selfNodeRecord
+
+ override fun getNodeRecords(): ENRStorage = enrStorage
+
+ override fun getNodesTable(): RoutingTable = nodesTable
+
+ override fun getNodeKeyPair(): SECP256K1.KeyPair = keyPair
+
+ override fun getPendingMessage(authTag: Bytes): TrackingMessage = pendingMessages.getIfPresent(authTag.toHexString())
+ ?: throw IllegalArgumentException("Pending message not found")
override fun start() {
receiveChannel.bind(bindAddress)
- receiveJob = launch {
- val datagram = ByteBuffer.allocate(UdpMessage.MAX_UDP_MESSAGE_SIZE)
- while (receiveChannel.isOpen) {
- datagram.clear()
- val address = receiveChannel.receive(datagram) as InetSocketAddress
- datagram.flip()
- try {
- processDatagram(datagram, address)
- } catch (ex: Exception) {
- log.warning(ex.message)
- }
- }
- }
+ receiveJob = receiveDatagram()
+ lookupJob = lookupNodes()
+ refreshJob = refreshNodesTable()
}
override fun send(
@@ -84,45 +128,122 @@
handshakeParams: HandshakeInitParameters?
) {
launch {
- val buffer = packetCodec.encode(message, destNodeId, handshakeParams)
- sendChannel.send(ByteBuffer.wrap(buffer.toArray()), address)
+ val encodeResult = packetCodec.encode(message, destNodeId, handshakeParams)
+ pendingMessages.put(encodeResult.authTag.toHexString(), TrackingMessage(message, destNodeId))
+ receiveChannel.send(ByteBuffer.wrap(encodeResult.content.toArray()), address)
}
}
override fun terminate() {
- receiveJob.cancel()
receiveChannel.close()
- sendChannel.close()
+
+ refreshJob.cancel()
+ lookupJob.cancel()
+ receiveJob.cancel()
}
- override fun available(): Boolean = receiveChannel.isOpen
-
- override fun started(): Boolean = ::receiveJob.isInitialized && available()
-
- override fun getEnr(): Bytes = selfEnr
-
- override fun addPendingNodeId(address: InetSocketAddress, nodeId: Bytes) {
- authenticatingPeers[address] = nodeId
+ override fun attachObserver(observer: MessageObserver) {
+ messageListeners.add(observer)
}
- override fun getNodeKeyPair(): SECP256K1.KeyPair = keyPair
+ override fun detachObserver(observer: MessageObserver) {
+ messageListeners.remove(observer)
+ }
- override fun getPendingNodeIdByAddress(address: InetSocketAddress): Bytes {
- val result = authenticatingPeers[address]
- ?: throw IllegalArgumentException("Authenticated peer not found with address ${address.hostName}:${address.port}")
- authenticatingPeers.remove(address)
+ override fun getAwaitingPongRecord(nodeId: Bytes): Bytes? {
+ val nodeIdHex = nodeId.toHexString()
+ val result = pings.getIfPresent(nodeIdHex)
+ pings.invalidate(nodeIdHex)
return result
}
+ // Lookup nodes
+ private fun lookupNodes() = launch {
+ while (true) {
+ val nearestNodes = getNodesTable().nearest(selfEnr)
+ if (REQUIRED_LOOKUP_NODES > nearestNodes.size) {
+ lookupInternal(nearestNodes)
+ } else {
+ askedNodes.clear()
+ }
+ delay(LOOKUP_REFRESH_RATE)
+ }
+ }
+
+ private fun lookupInternal(nearest: List<Bytes>) {
+ val nonAskedNodes = nearest - askedNodes
+ val targetNode = if (nonAskedNodes.isNotEmpty()) nonAskedNodes.random() else Bytes.random(32)
+ val distance = getNodesTable().distanceToSelf(targetNode)
+ for (target in nearest.take(LOOKUP_MAX_REQUESTED_NODES)) {
+ val enr = EthereumNodeRecord.fromRLP(target)
+ val message = FindNodeMessage(distance = distance)
+ val address = InetSocketAddress(enr.ip(), enr.udp())
+ send(address, message, Hash.sha2_256(target))
+ askedNodes.add(target)
+ }
+ }
+
+ // Process packets
+ private fun receiveDatagram() = launch {
+ while (receiveChannel.isOpen) {
+ val datagram = ByteBuffer.allocate(UdpMessage.MAX_UDP_MESSAGE_SIZE)
+ val address = receiveChannel.receive(datagram) as InetSocketAddress
+ datagram.flip()
+ launch {
+ try {
+ processDatagram(datagram, address)
+ } catch (ex: Exception) {
+ log.warning("${ex.message}")
+ }
+ }
+ }
+ }
+
private fun processDatagram(datagram: ByteBuffer, address: InetSocketAddress) {
+ if (datagram.limit() > UdpMessage.MAX_UDP_MESSAGE_SIZE) {
+ return
+ }
val messageBytes = Bytes.wrapByteBuffer(datagram)
val decodeResult = packetCodec.decode(messageBytes)
val message = decodeResult.message
when (message) {
is RandomMessage -> randomMessageHandler.handle(message, address, decodeResult.srcNodeId, this)
is WhoAreYouMessage -> whoAreYouMessageHandler.handle(message, address, decodeResult.srcNodeId, this)
- is FindNodeMessage -> { } //TODO: response with NODES message
+ is FindNodeMessage -> findNodeMessageHandler.handle(message, address, decodeResult.srcNodeId, this)
+ is NodesMessage -> nodesMessageHandler.handle(message, address, decodeResult.srcNodeId, this)
+ is PingMessage -> pingMessageHandler.handle(message, address, decodeResult.srcNodeId, this)
+ is PongMessage -> pongMessageHandler.handle(message, address, decodeResult.srcNodeId, this)
else -> throw IllegalArgumentException("Unexpected message has been received - ${message::class.java.simpleName}")
}
+ messageListeners.forEach { it.observe(message) }
+ }
+
+ // Ping nodes
+ private fun refreshNodesTable(): Job = launch {
+ while (true) {
+ if (!getNodesTable().isEmpty()) {
+ val enrBytes = getNodesTable().random()
+ val nodeId = Hash.sha2_256(enrBytes)
+ if (null == pings.getIfPresent(nodeId.toHexString())) {
+ val enr = EthereumNodeRecord.fromRLP(enrBytes)
+ val address = InetSocketAddress(enr.ip(), enr.udp())
+ val message = PingMessage(enrSeq = enr.seq)
+
+ send(address, message, nodeId)
+ pings.put(nodeId.toHexString(), enrBytes)
+ }
+ }
+ delay(TABLE_REFRESH_RATE)
+ }
+ }
+
+ companion object {
+ private const val REQUIRED_LOOKUP_NODES: Int = 16
+ private const val LOOKUP_MAX_REQUESTED_NODES: Int = 3
+
+ private const val LOOKUP_REFRESH_RATE: Long = 3000
+ private const val TABLE_REFRESH_RATE: Long = 1000
+ private const val REQUEST_TIMEOUT: Long = 1000
+ private const val PING_TIMEOUT: Long = 500
}
}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/FindNodeMessageHandler.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/FindNodeMessageHandler.kt
new file mode 100644
index 0000000..4e7c354
--- /dev/null
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/FindNodeMessageHandler.kt
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.v5.internal.handler
+
+import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.devp2p.v5.MessageHandler
+import org.apache.tuweni.devp2p.v5.UdpConnector
+import org.apache.tuweni.devp2p.v5.packet.FindNodeMessage
+import org.apache.tuweni.devp2p.v5.packet.NodesMessage
+import java.net.InetSocketAddress
+
+class FindNodeMessageHandler : MessageHandler<FindNodeMessage> { // answers a FindNodeMessage with NodesMessage responses
+
+ override fun handle(message: FindNodeMessage, address: InetSocketAddress, srcNodeId: Bytes, connector: UdpConnector) {
+ if (0 == message.distance) { // distance 0: reply with this node's own record only
+ val response = NodesMessage(message.requestId, 1, listOf(connector.getEnrBytes()))
+ connector.send(address, response, srcNodeId)
+ return
+ }
+
+ val nodes = connector.getNodesTable().nodesOfDistance(message.distance)
+
+ nodes.chunked(MAX_NODES_IN_RESPONSE).forEach { // one response per chunk; nothing is sent when nodes is empty
+ val response = NodesMessage(message.requestId, nodes.size, it) // NOTE(review): passes nodes.size, not the chunk count - confirm what NodesMessage expects
+ connector.send(address, response, srcNodeId)
+ }
+ }
+
+ companion object {
+ private const val MAX_NODES_IN_RESPONSE: Int = 4 // maximum number of records per NodesMessage
+ }
+}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/NodesMessageHandler.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/NodesMessageHandler.kt
new file mode 100644
index 0000000..d8b33dd
--- /dev/null
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/NodesMessageHandler.kt
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.v5.internal.handler
+
+import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.devp2p.EthereumNodeRecord
+import org.apache.tuweni.devp2p.v5.MessageHandler
+import org.apache.tuweni.devp2p.v5.UdpConnector
+import org.apache.tuweni.devp2p.v5.packet.NodesMessage
+import java.net.InetSocketAddress
+
+/**
+ * Handles incoming NODES messages by storing every received node record.
+ */
+class NodesMessageHandler : MessageHandler<NodesMessage> {
+
+  /**
+   * Parses each record of the message, then puts it into the connector's record storage and nodes table.
+   *
+   * @param message received NODES message
+   * @param address sender address (not used here)
+   * @param srcNodeId sender node identifier (not used here)
+   * @param connector connector providing the record storage and the nodes table
+   */
+  override fun handle(message: NodesMessage, address: InetSocketAddress, srcNodeId: Bytes, connector: UdpConnector) {
+    for (record in message.nodeRecords) {
+      // The parsed record is discarded; presumably this call only rejects malformed records - TODO confirm
+      EthereumNodeRecord.fromRLP(record)
+      connector.getNodeRecords().set(record)
+      connector.getNodesTable().add(record)
+    }
+  }
+}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/PingMessageHandler.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/PingMessageHandler.kt
new file mode 100644
index 0000000..1abaf82
--- /dev/null
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/PingMessageHandler.kt
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.v5.internal.handler
+
+import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.devp2p.v5.MessageHandler
+import org.apache.tuweni.devp2p.v5.UdpConnector
+import org.apache.tuweni.devp2p.v5.packet.PingMessage
+import org.apache.tuweni.devp2p.v5.packet.PongMessage
+import java.net.InetSocketAddress
+
+/**
+ * Handles incoming PING messages by replying to the sender with a PONG message.
+ */
+class PingMessageHandler : MessageHandler<PingMessage> {
+
+  /**
+   * Builds a PONG from the request id of the ping, the sequence number of the connector's record
+   * and the IP address and port the ping was received from, then sends it back to that address.
+   *
+   * @param message received PING message
+   * @param address sender address, echoed in the PONG and used as its destination
+   * @param srcNodeId node identifier passed to the connector together with the response
+   * @param connector connector used to read the local record and send the response
+   */
+  override fun handle(message: PingMessage, address: InetSocketAddress, srcNodeId: Bytes, connector: UdpConnector) {
+    val localSeq = connector.getEnr().seq
+    val pong = PongMessage(message.requestId, localSeq, address.address, address.port)
+    connector.send(address, pong, srcNodeId)
+  }
+}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/PongMessageHandler.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/PongMessageHandler.kt
new file mode 100644
index 0000000..fc8c973
--- /dev/null
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/PongMessageHandler.kt
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.v5.internal.handler
+
+import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.devp2p.EthereumNodeRecord
+import org.apache.tuweni.devp2p.v5.MessageHandler
+import org.apache.tuweni.devp2p.v5.UdpConnector
+import org.apache.tuweni.devp2p.v5.packet.FindNodeMessage
+import org.apache.tuweni.devp2p.v5.packet.PongMessage
+import java.net.InetSocketAddress
+
+/**
+ * Handles incoming PONG messages and requests a record again when its sequence number differs.
+ */
+class PongMessageHandler : MessageHandler<PongMessage> {
+
+  /**
+   * Looks up the record the connector is awaiting a PONG for; does nothing when there is none.
+   * When the sequence number of that record differs from the one carried by the PONG, a FINDNODE
+   * message reusing the PONG request id is sent to the sender.
+   *
+   * @param message received PONG message
+   * @param address sender address, used as destination of the FINDNODE request
+   * @param srcNodeId sender node identifier, used for the lookup and passed on when sending
+   * @param connector connector used for the lookup and for sending
+   */
+  override fun handle(message: PongMessage, address: InetSocketAddress, srcNodeId: Bytes, connector: UdpConnector) {
+    val awaitedRecord = connector.getAwaitingPongRecord(srcNodeId)
+    if (awaitedRecord == null) {
+      return
+    }
+
+    val knownSeq = EthereumNodeRecord.fromRLP(awaitedRecord).seq
+    if (knownSeq == message.enrSeq) {
+      return
+    }
+
+    connector.send(address, FindNodeMessage(message.requestId), srcNodeId)
+  }
+}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/RandomMessageHandler.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/RandomMessageHandler.kt
index 477710a..60f965d 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/RandomMessageHandler.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/RandomMessageHandler.kt
@@ -26,8 +26,7 @@
class RandomMessageHandler : MessageHandler<RandomMessage> {
override fun handle(message: RandomMessage, address: InetSocketAddress, srcNodeId: Bytes, connector: UdpConnector) {
- connector.addPendingNodeId(address, srcNodeId)
- val response = WhoAreYouMessage()
+ val response = WhoAreYouMessage(message.authTag)
connector.send(address, response, srcNodeId)
}
}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/WhoAreYouMessageHandler.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/WhoAreYouMessageHandler.kt
index 754ceae..62025f3 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/WhoAreYouMessageHandler.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/WhoAreYouMessageHandler.kt
@@ -17,17 +17,16 @@
package org.apache.tuweni.devp2p.v5.internal.handler
import org.apache.tuweni.bytes.Bytes
-import org.apache.tuweni.crypto.Hash
import org.apache.tuweni.devp2p.v5.MessageHandler
import org.apache.tuweni.devp2p.v5.UdpConnector
import org.apache.tuweni.devp2p.v5.packet.FindNodeMessage
import org.apache.tuweni.devp2p.v5.packet.WhoAreYouMessage
import org.apache.tuweni.devp2p.v5.misc.HandshakeInitParameters
+import org.apache.tuweni.devp2p.v5.packet.RandomMessage
+import java.lang.IllegalArgumentException
import java.net.InetSocketAddress
-class WhoAreYouMessageHandler(
- private val nodeId: Bytes
-) : MessageHandler<WhoAreYouMessage> {
+class WhoAreYouMessageHandler : MessageHandler<WhoAreYouMessage> {
override fun handle(
message: WhoAreYouMessage,
@@ -36,11 +35,12 @@
connector: UdpConnector
) {
// Retrieve enr
- val destRlp = connector.getPendingNodeIdByAddress(address)
- val handshakeParams = HandshakeInitParameters(message.idNonce, message.authTag, destRlp)
- val destNodeId = Hash.sha2_256(destRlp)
+ val trackingMessage = connector.getPendingMessage(message.authTag)
+ val rlpEnr = connector.getNodeRecords().find(trackingMessage.nodeId)
+ ?: throw IllegalArgumentException("Unable to find node enr by id ${trackingMessage.nodeId}")
+ val handshakeParams = HandshakeInitParameters(message.idNonce, message.authTag, rlpEnr)
- val findNodeMessage = FindNodeMessage()
- connector.send(address, findNodeMessage, destNodeId, handshakeParams)
+ val response = if (trackingMessage.message is RandomMessage) FindNodeMessage() else trackingMessage.message
+ connector.send(address, response, trackingMessage.nodeId, handshakeParams)
}
}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/misc/EncodeResult.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/misc/EncodeResult.kt
new file mode 100644
index 0000000..5a482ea
--- /dev/null
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/misc/EncodeResult.kt
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.v5.misc
+
+import org.apache.tuweni.bytes.Bytes
+
+/**
+ * Immutable holder pairing an authentication tag with content bytes.
+ *
+ * NOTE(review): presumably the output of packet encoding - confirm against the codec that creates it.
+ *
+ * @property authTag authentication tag bytes
+ * @property content content bytes
+ */
+class EncodeResult(
+  val authTag: Bytes,
+  val content: Bytes
+)
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/misc/TrackingMessage.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/misc/TrackingMessage.kt
new file mode 100644
index 0000000..c0d724b
--- /dev/null
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/misc/TrackingMessage.kt
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.v5.misc
+
+import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.devp2p.v5.packet.UdpMessage
+
+/**
+ * Immutable holder pairing a message with a node identifier.
+ *
+ * NOTE(review): presumably a sent message kept together with its destination node id until the
+ * peer answers - confirm against the connector that creates it.
+ *
+ * @property message tracked message
+ * @property nodeId node identifier associated with the message
+ */
+class TrackingMessage(
+  val message: UdpMessage,
+  val nodeId: Bytes
+)
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/packet/FindNodeMessage.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/packet/FindNodeMessage.kt
index b39ae46..777d389 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/packet/FindNodeMessage.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/packet/FindNodeMessage.kt
@@ -21,7 +21,7 @@
class FindNodeMessage(
val requestId: Bytes = UdpMessage.requestId(),
- val distance: Long = 0
+ val distance: Int = 0
) : UdpMessage() {
private val encodedMessageType: Bytes = Bytes.fromHexString("0x03")
@@ -29,7 +29,7 @@
override fun encode(): Bytes {
return RLP.encodeList { writer ->
writer.writeValue(requestId)
- writer.writeLong(distance)
+ writer.writeInt(distance)
}
}
@@ -39,7 +39,7 @@
fun create(content: Bytes): FindNodeMessage {
return RLP.decodeList(content) { reader ->
val requestId = reader.readValue()
- val distance = reader.readLong()
+ val distance = reader.readInt()
return@decodeList FindNodeMessage(requestId, distance)
}
}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/packet/RandomMessage.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/packet/RandomMessage.kt
index e43ded7..faaf210 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/packet/RandomMessage.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/packet/RandomMessage.kt
@@ -19,6 +19,7 @@
import org.apache.tuweni.bytes.Bytes
class RandomMessage(
+ val authTag: Bytes = authTag(),
val data: Bytes = randomData()
) : UdpMessage() {
@@ -27,8 +28,8 @@
}
companion object {
- fun create(content: Bytes): RandomMessage {
- return RandomMessage(content)
+ fun create(authTag: Bytes, content: Bytes = randomData()): RandomMessage {
+ return RandomMessage(authTag, content)
}
}
}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/storage/DefaultENRStorage.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/storage/DefaultENRStorage.kt
new file mode 100644
index 0000000..26ff813
--- /dev/null
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/storage/DefaultENRStorage.kt
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.v5.storage
+
+import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.crypto.Hash
+import org.apache.tuweni.devp2p.v5.ENRStorage
+import java.util.concurrent.ConcurrentHashMap
+
+/**
+ * [ENRStorage] backed by a [ConcurrentHashMap].
+ *
+ * Records are keyed by the hex string of the SHA2-256 hash of the record bytes.
+ */
+class DefaultENRStorage : ENRStorage {
+
+  private val records: MutableMap<String, Bytes> = ConcurrentHashMap()
+
+  /**
+   * Stores the record under the hex string of its SHA2-256 hash, replacing any entry with that key.
+   *
+   * @param enr node record bytes
+   */
+  override fun set(enr: Bytes) {
+    val recordKey = Hash.sha2_256(enr).toHexString()
+    records[recordKey] = enr
+  }
+
+  /**
+   * Looks up a record by the hex string of the given node identifier.
+   *
+   * @param nodeId node identifier
+   *
+   * @return stored record bytes or null when nothing is stored under that identifier
+   */
+  override fun find(nodeId: Bytes): Bytes? {
+    return records[nodeId.toHexString()]
+  }
+}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/storage/RoutingTable.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/storage/RoutingTable.kt
new file mode 100644
index 0000000..6777272
--- /dev/null
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/storage/RoutingTable.kt
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.v5.storage
+
+import com.google.common.math.IntMath
+import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.crypto.Hash
+import org.apache.tuweni.kademlia.KademliaRoutingTable
+import org.apache.tuweni.kademlia.xorDist
+import java.math.RoundingMode
+
+/**
+ * Wrapper around [KademliaRoutingTable] that stores node records as [Bytes].
+ *
+ * The table key of a record is the SHA2-256 hash of its bytes.
+ *
+ * @param selfEnr record bytes of the local node; this record is never added to the table
+ */
+class RoutingTable(
+  private val selfEnr: Bytes
+) {
+
+  private val selfNodeId = key(selfEnr)
+  private val recordKey: (Bytes) -> ByteArray = { record -> key(record) }
+
+  // NOTE(review): the distance is log2 (rounded down) of the xorDist result. IntMath.log2 rejects
+  // non-positive input, so this presumably relies on never being asked for the local record - confirm.
+  private val table = KademliaRoutingTable(
+    selfId = selfNodeId,
+    k = BUCKET_SIZE,
+    nodeId = recordKey,
+    distanceToSelf = { record ->
+      IntMath.log2(key(record) xorDist selfNodeId, RoundingMode.FLOOR)
+    })
+
+  /** Size reported by the underlying table. */
+  val size: Int
+    get() = table.size
+
+  /** Returns the record bytes of the local node. */
+  fun getSelfEnr(): Bytes = selfEnr
+
+  /** Adds the record to the table unless it equals the local record. */
+  fun add(enr: Bytes) {
+    if (enr == selfEnr) {
+      return
+    }
+    table.add(enr)
+  }
+
+  /** Asks the table for up to [limit] records nearest to the SHA2-256 hash of [targetId]. */
+  fun nearest(targetId: Bytes, limit: Int = BUCKET_SIZE): List<Bytes> {
+    val targetKey = key(targetId)
+    return table.nearest(targetKey, limit)
+  }
+
+  /** Delegates to the table's log distance calculation for the given value. */
+  fun distanceToSelf(targetId: Bytes): Int = table.logDistToSelf(targetId)
+
+  /** Delegates eviction of the record to the table and returns its result. */
+  fun evict(enr: Bytes): Boolean = table.evict(enr)
+
+  /** Returns a record picked by the table's random selection. */
+  fun random(): Bytes = table.getRandom()
+
+  /** Returns true when the table reports itself empty. */
+  fun isEmpty(): Boolean = table.isEmpty()
+
+  /** Returns the records the table holds at the given distance. */
+  fun nodesOfDistance(distance: Int): List<Bytes> = table.peersOfDistance(distance)
+
+  /** Clears the underlying table. */
+  fun clear() = table.clear()
+
+  // Table key of a value: its SHA2-256 hash as a byte array
+  private fun key(enr: Bytes): ByteArray = Hash.sha2_256(enr).toArray()
+
+  companion object {
+    // Passed to the table as `k` and used as default limit of [nearest]
+    private const val BUCKET_SIZE: Int = 16
+  }
+}
diff --git a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/AbstractIntegrationTest.kt b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/AbstractIntegrationTest.kt
new file mode 100644
index 0000000..8344c3a
--- /dev/null
+++ b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/AbstractIntegrationTest.kt
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.v5
+
+import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.runBlocking
+import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.crypto.Hash
+import org.apache.tuweni.crypto.SECP256K1
+import org.apache.tuweni.devp2p.EthereumNodeRecord
+import org.apache.tuweni.devp2p.v5.internal.DefaultAuthenticationProvider
+import org.apache.tuweni.devp2p.v5.internal.DefaultPacketCodec
+import org.apache.tuweni.devp2p.v5.internal.DefaultUdpConnector
+import org.apache.tuweni.devp2p.v5.packet.RandomMessage
+import org.apache.tuweni.devp2p.v5.packet.UdpMessage
+import org.apache.tuweni.devp2p.v5.packet.WhoAreYouMessage
+import org.apache.tuweni.devp2p.v5.storage.DefaultENRStorage
+import org.apache.tuweni.devp2p.v5.storage.RoutingTable
+import org.apache.tuweni.junit.BouncyCastleExtension
+import org.junit.jupiter.api.extension.ExtendWith
+import java.net.InetAddress
+import java.net.InetSocketAddress
+
+/**
+ * Base class of discovery v5 integration tests: starts nodes and exchanges messages between them.
+ */
+@ExtendWith(BouncyCastleExtension::class)
+abstract class AbstractIntegrationTest {
+
+  /**
+   * Builds the components of a node, starts its discovery service and returns all of them as a [TestNode].
+   *
+   * Every component may be replaced by the caller; the defaults wire the components to each other.
+   *
+   * @param port UDP port of the node
+   * @param bootList bootstrap records handed to the discovery service
+   * @param enrStorage node record storage shared by the connector and the service
+   * @param keyPair key pair of the node
+   * @param enr record bytes of the node
+   * @param routingTable routing table of the node
+   * @param address socket address of the node
+   * @param authenticationProvider authentication provider of the node
+   * @param packetCodec packet codec of the node
+   * @param connector UDP connector of the node
+   * @param service discovery service, started by this method
+   *
+   * @return started node with all of its components
+   */
+  protected fun createNode(
+    port: Int = 9090,
+    bootList: List<String> = emptyList(),
+    enrStorage: ENRStorage = DefaultENRStorage(),
+    keyPair: SECP256K1.KeyPair = SECP256K1.KeyPair.random(),
+    enr: Bytes = EthereumNodeRecord.toRLP(keyPair, ip = InetAddress.getLocalHost(), udp = port),
+    routingTable: RoutingTable = RoutingTable(enr),
+    address: InetSocketAddress = InetSocketAddress(InetAddress.getLocalHost(), port),
+    authenticationProvider: AuthenticationProvider = DefaultAuthenticationProvider(keyPair, routingTable),
+    packetCodec: PacketCodec = DefaultPacketCodec(
+      keyPair,
+      routingTable,
+      authenticationProvider = authenticationProvider
+    ),
+    connector: UdpConnector = DefaultUdpConnector(
+      address,
+      keyPair,
+      enr,
+      enrStorage,
+      nodesTable = routingTable,
+      packetCodec = packetCodec
+    ),
+    service: NodeDiscoveryService =
+      DefaultNodeDiscoveryService(
+        keyPair,
+        port,
+        enrStorage = enrStorage,
+        bootstrapENRList = bootList,
+        connector = connector
+      )
+  ): TestNode {
+    service.start()
+    return TestNode(
+      bootList,
+      port,
+      enrStorage,
+      keyPair,
+      enr,
+      address,
+      routingTable,
+      authenticationProvider,
+      packetCodec,
+      connector,
+      service
+    )
+  }
+
+  /**
+   * Sends a random message from the initiator to the recipient and waits until the recipient holds
+   * a session key for the initiator.
+   *
+   * @param initiator node that sends the random message
+   * @param recipient node that is expected to create the session key
+   * @param timeoutMillis maximum time to wait for the session key, in milliseconds
+   *
+   * @return true when the session key appeared in time, false when the wait timed out
+   */
+  protected fun handshake(initiator: TestNode, recipient: TestNode, timeoutMillis: Long = 10_000): Boolean {
+    initiator.enrStorage.set(recipient.enr)
+    initiator.routingTable.add(recipient.enr)
+    val message = RandomMessage()
+    initiator.connector.send(recipient.address, message, recipient.nodeId)
+
+    val initiatorId = initiator.nodeId.toHexString()
+    val deadline = System.nanoTime() + timeoutMillis * 1_000_000
+    while (true) {
+      if (null != recipient.authenticationProvider.findSessionKey(initiatorId)) {
+        return true
+      }
+      // Give up instead of hanging the whole test run when the handshake never completes
+      if (System.nanoTime() - deadline >= 0) {
+        return false
+      }
+      // Poll with a short pause rather than spinning on a core
+      Thread.sleep(10)
+    }
+  }
+
+  /**
+   * Sends a message from the initiator to the recipient.
+   *
+   * @param initiator sending node
+   * @param recipient receiving node
+   * @param message message to send
+   *
+   * @throws IllegalArgumentException when the message is a [RandomMessage] or a [WhoAreYouMessage]
+   */
+  protected fun send(initiator: TestNode, recipient: TestNode, message: UdpMessage) {
+    if (message is RandomMessage || message is WhoAreYouMessage) {
+      throw IllegalArgumentException("Can't send handshake initiation message")
+    }
+    initiator.connector.send(recipient.address, message, recipient.nodeId)
+  }
+
+  /**
+   * Sends a message and blocks until the initiator's connector observes a message of type [T].
+   *
+   * @param initiator sending node, whose connector is observed for the answer
+   * @param recipient receiving node
+   * @param message message to send
+   *
+   * @return first observed message of type [T]
+   */
+  protected inline fun <reified T : UdpMessage> sendAndAwait(
+    initiator: TestNode,
+    recipient: TestNode,
+    message: UdpMessage
+  ): T {
+    val listener = object : MessageObserver {
+      // Buffered: an answer observed before receive() starts waiting must not be dropped by offer()
+      val result: Channel<T> = Channel(Channel.UNLIMITED)
+
+      override fun observe(message: UdpMessage) {
+        if (message is T) {
+          result.offer(message)
+        }
+      }
+    }
+
+    return runBlocking {
+      initiator.connector.attachObserver(listener)
+      try {
+        send(initiator, recipient, message)
+        listener.result.receive()
+      } finally {
+        // Detach even when sending or receiving fails
+        initiator.connector.detachObserver(listener)
+      }
+    }
+  }
+}
+
+/**
+ * Holder of all components of a node created for a test.
+ *
+ * @property bootList bootstrap records the node was created with
+ * @property port UDP port of the node
+ * @property enrStorage node record storage of the node
+ * @property keyPair key pair of the node
+ * @property enr record bytes of the node
+ * @property address socket address of the node
+ * @property routingTable routing table of the node
+ * @property authenticationProvider authentication provider of the node
+ * @property packetCodec packet codec of the node
+ * @property connector UDP connector of the node
+ * @property service discovery service of the node
+ * @property nodeId node identifier; defaults to the SHA2-256 hash of [enr]
+ */
+class TestNode(
+  val bootList: List<String>,
+  val port: Int,
+  val enrStorage: ENRStorage,
+  val keyPair: SECP256K1.KeyPair,
+  val enr: Bytes,
+  val address: InetSocketAddress,
+  val routingTable: RoutingTable,
+  val authenticationProvider: AuthenticationProvider,
+  val packetCodec: PacketCodec,
+  val connector: UdpConnector,
+  val service: NodeDiscoveryService,
+  val nodeId: Bytes = Hash.sha2_256(enr)
+)
diff --git a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/DefaultNodeDiscoveryServiceTest.kt b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/DefaultNodeDiscoveryServiceTest.kt
index a9f64e5..2b7d8a7 100644
--- a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/DefaultNodeDiscoveryServiceTest.kt
+++ b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/DefaultNodeDiscoveryServiceTest.kt
@@ -64,7 +64,7 @@
bootstrapENRList,
enrSeq,
selfENR,
- connector
+ connector = connector
)
@Test
@@ -81,7 +81,7 @@
val receivedBytes = Bytes.wrapByteBuffer(buffer)
val content = receivedBytes.slice(45)
- val message = RandomMessage.create(content)
+ val message = RandomMessage.create(UdpMessage.authTag(), content)
assert(message.data.size() == UdpMessage.RANDOM_DATA_LENGTH)
}
diff --git a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/HandshakeIntegrationTest.kt b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/HandshakeIntegrationTest.kt
deleted file mode 100644
index 46b0d47..0000000
--- a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/HandshakeIntegrationTest.kt
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tuweni.devp2p.v5
-
-import kotlinx.coroutines.runBlocking
-import org.apache.tuweni.bytes.Bytes
-import org.apache.tuweni.crypto.Hash
-import org.apache.tuweni.crypto.SECP256K1
-import org.apache.tuweni.devp2p.EthereumNodeRecord
-import org.apache.tuweni.devp2p.v5.internal.DefaultAuthenticationProvider
-import org.apache.tuweni.devp2p.v5.internal.DefaultPacketCodec
-import org.apache.tuweni.devp2p.v5.internal.DefaultUdpConnector
-import org.apache.tuweni.devp2p.v5.packet.FindNodeMessage
-import org.apache.tuweni.devp2p.v5.packet.RandomMessage
-import org.apache.tuweni.devp2p.v5.packet.UdpMessage
-import org.apache.tuweni.devp2p.v5.packet.WhoAreYouMessage
-import org.apache.tuweni.io.Base64URLSafe
-import org.apache.tuweni.junit.BouncyCastleExtension
-import org.apache.tuweni.net.coroutines.CoroutineDatagramChannel
-import org.junit.jupiter.api.AfterEach
-import org.junit.jupiter.api.BeforeEach
-import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.extension.ExtendWith
-import java.net.InetAddress
-import java.net.InetSocketAddress
-import java.nio.ByteBuffer
-
-@ExtendWith(BouncyCastleExtension::class)
-class HandshakeIntegrationTest {
-
- private val keyPair: SECP256K1.KeyPair = SECP256K1.KeyPair.random()
- private val enr: Bytes = EthereumNodeRecord.toRLP(keyPair, ip = InetAddress.getLocalHost(), udp = SERVICE_PORT)
- private val address: InetSocketAddress = InetSocketAddress(InetAddress.getLocalHost(), SERVICE_PORT)
- private val connector: UdpConnector = DefaultUdpConnector(address, keyPair, enr)
-
- private val clientKeyPair = SECP256K1.KeyPair.random()
- private val clientEnr = EthereumNodeRecord.toRLP(clientKeyPair, ip = InetAddress.getLocalHost(), udp = CLIENT_PORT)
- private val authProvider = DefaultAuthenticationProvider(clientKeyPair, clientEnr)
- private val clientCodec = DefaultPacketCodec(clientKeyPair, clientEnr, authenticationProvider = authProvider)
- private val socket = CoroutineDatagramChannel.open()
-
- private val clientNodeId: Bytes = Hash.sha2_256(clientEnr)
-
- private val bootList = listOf("enr:${Base64URLSafe.encode(clientEnr)}")
- private val service: NodeDiscoveryService =
- DefaultNodeDiscoveryService(keyPair, SERVICE_PORT, bootstrapENRList = bootList, connector = connector)
-
- @BeforeEach
- fun init() {
- socket.bind(InetSocketAddress(9091))
- service.start()
- }
-
- @Test
- fun discv5HandshakeTest() {
- runBlocking {
- val buffer = ByteBuffer.allocate(UdpMessage.MAX_UDP_MESSAGE_SIZE)
- socket.receive(buffer)
- buffer.flip()
-
- var content = Bytes.wrapByteBuffer(buffer)
- var decodingResult = clientCodec.decode(content)
- assert(decodingResult.message is RandomMessage)
- buffer.clear()
-
- sendWhoAreYou()
-
- socket.receive(buffer)
- buffer.flip()
- content = Bytes.wrapByteBuffer(buffer)
- decodingResult = clientCodec.decode(content)
- assert(decodingResult.message is FindNodeMessage)
- assert(null != authProvider.findSessionKey(Hash.sha2_256(enr).toHexString()))
-
- val message = decodingResult.message as FindNodeMessage
-
- assert(message.distance == 0L)
- assert(message.requestId.size() == UdpMessage.REQUEST_ID_LENGTH)
- }
- }
-
- @AfterEach
- fun tearDown() {
- service.terminate(true)
- socket.close()
- }
-
- private suspend fun sendWhoAreYou() {
- val message = WhoAreYouMessage()
- val bytes = clientCodec.encode(message, clientNodeId)
- val buffer = ByteBuffer.wrap(bytes.toArray())
- socket.send(buffer, address)
- }
-
- companion object {
- private const val SERVICE_PORT: Int = 9090
- private const val CLIENT_PORT: Int = 9091
- }
-}
diff --git a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/IntegrationTest.kt b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/IntegrationTest.kt
new file mode 100644
index 0000000..47c82c4
--- /dev/null
+++ b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/IntegrationTest.kt
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.v5
+
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.runBlocking
+import org.apache.tuweni.devp2p.v5.packet.PingMessage
+import org.apache.tuweni.devp2p.v5.packet.PongMessage
+import org.junit.jupiter.api.Disabled
+import org.junit.jupiter.api.Test
+
+/**
+ * Integration tests running several discovery nodes on local UDP ports.
+ *
+ * Every test terminates the nodes it started in a finally block, so a failed assertion does not
+ * leave services running for the tests that follow.
+ */
+class IntegrationTest : AbstractIntegrationTest() {
+
+  /** Two nodes complete a handshake. */
+  @Test
+  fun testHandshake() {
+    val node1 = createNode(9090)
+    val node2 = createNode(9091)
+
+    try {
+      val result = handshake(node1, node2)
+
+      assert(result)
+    } finally {
+      node1.service.terminate(true)
+      node2.service.terminate(true)
+    }
+  }
+
+  /** A ping sent after the handshake is answered by a pong carrying the port of the sender. */
+  @Test
+  fun testPing() {
+    val node1 = createNode(9090)
+    val node2 = createNode(9091)
+
+    try {
+      handshake(node1, node2)
+      val pong = sendAndAwait<PongMessage>(node1, node2, PingMessage())
+
+      assert(node1.port == pong.recipientPort)
+    } finally {
+      node1.service.terminate(true)
+      node2.service.terminate(true)
+    }
+  }
+
+  /** The routing table of a node becomes empty within five seconds after its only peer is terminated. */
+  @Test
+  fun testTableMaintenance() {
+    val node1 = createNode(9090)
+    val node2 = createNode(9091)
+    // node2 is terminated in the middle of the test and must not be terminated a second time
+    var node2Running = true
+
+    try {
+      handshake(node1, node2)
+      runBlocking {
+        assert(!node1.routingTable.isEmpty())
+
+        node2Running = false
+        node2.service.terminate(true)
+
+        delay(5000)
+
+        assert(node1.routingTable.isEmpty())
+      }
+    } finally {
+      if (node2Running) {
+        node2.service.terminate(true)
+      }
+      node1.service.terminate(true)
+    }
+  }
+
+  /**
+   * A target node connected to three peers of a larger network is expected to learn at least
+   * eight nodes. The peers form two handshake chains: node1..node8 and node9..node17.
+   */
+  @Test
+  @Disabled
+  fun testNetworkLookup() {
+    val started = mutableListOf<TestNode>()
+
+    try {
+      val targetNode = createNode(9090).also { node -> started += node }
+
+      // peers[0] listens on 9091 (node1), peers[16] on 9107 (node17)
+      val peers = (9091..9107).map { port -> createNode(port).also { node -> started += node } }
+
+      for (index in 0 until peers.size - 1) {
+        // node8 and node9 are not connected to each other
+        if (index != 7) {
+          handshake(peers[index], peers[index + 1])
+        }
+      }
+
+      handshake(targetNode, peers[0])
+      handshake(targetNode, peers[3])
+      handshake(targetNode, peers[6])
+
+      var size = targetNode.routingTable.size
+      while (size < 8) {
+        val newSize = targetNode.routingTable.size
+        if (size < newSize) {
+          size = newSize
+          println(size)
+        }
+      }
+    } finally {
+      // Covers every node that was actually started, including after a partial start-up
+      started.asReversed().forEach { node -> node.service.terminate(true) }
+    }
+  }
+}
diff --git a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultAuthenticationProviderTest.kt b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultAuthenticationProviderTest.kt
index cebe167..4b6d188 100644
--- a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultAuthenticationProviderTest.kt
+++ b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultAuthenticationProviderTest.kt
@@ -20,8 +20,8 @@
import org.apache.tuweni.crypto.Hash
import org.apache.tuweni.crypto.SECP256K1
import org.apache.tuweni.devp2p.EthereumNodeRecord
+import org.apache.tuweni.devp2p.v5.storage.RoutingTable
import org.apache.tuweni.devp2p.v5.misc.HandshakeInitParameters
-import org.apache.tuweni.devp2p.v5.misc.SessionKey
import org.apache.tuweni.junit.BouncyCastleExtension
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtendWith
@@ -32,7 +32,8 @@
private val providerKeyPair: SECP256K1.KeyPair = SECP256K1.KeyPair.random()
private val providerEnr: Bytes = EthereumNodeRecord.toRLP(providerKeyPair, ip = InetAddress.getLocalHost())
- private val authenticationProvider = DefaultAuthenticationProvider(providerKeyPair, providerEnr)
+ private val routingTable: RoutingTable = RoutingTable(providerEnr)
+ private val authenticationProvider = DefaultAuthenticationProvider(providerKeyPair, routingTable)
@Test
fun authenticateReturnsValidAuthHeader() {
@@ -60,10 +61,11 @@
val nonce = Bytes.fromHexString("0x012715E4EFA2464F51BE49BBC40836E5816B3552249F8AC00AD1BBDB559E44E9")
val authTag = Bytes.fromHexString("0x39BBC27C8CFA3735DF436AC6")
val destEnr = EthereumNodeRecord.toRLP(keyPair, ip = InetAddress.getLocalHost())
+ val clientRoutingTable = RoutingTable(destEnr)
val params = HandshakeInitParameters(nonce, authTag, providerEnr)
val destNodeId = Hash.sha2_256(destEnr)
- val clientAuthProvider = DefaultAuthenticationProvider(keyPair, destEnr)
+ val clientAuthProvider = DefaultAuthenticationProvider(keyPair, clientRoutingTable)
val authHeader = clientAuthProvider.authenticate(params)
@@ -78,17 +80,4 @@
assert(result == null)
}
-
- @Test
- fun setSessionKeyPersistsSessionKeyIfExists() {
- val nodeId = Bytes.random(32).toHexString()
- val bytes = Bytes.random(32)
- val sessionKey = SessionKey(bytes, bytes, bytes)
-
- authenticationProvider.setSessionKey(nodeId, sessionKey)
-
- val result = authenticationProvider.findSessionKey(nodeId)
-
- assert(result != null)
- }
}
diff --git a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultPacketCodecTest.kt b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultPacketCodecTest.kt
index 85a08dd..e04771f 100644
--- a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultPacketCodecTest.kt
+++ b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultPacketCodecTest.kt
@@ -22,6 +22,7 @@
import org.apache.tuweni.devp2p.EthereumNodeRecord
import org.apache.tuweni.devp2p.v5.AuthenticationProvider
import org.apache.tuweni.devp2p.v5.PacketCodec
+import org.apache.tuweni.devp2p.v5.storage.RoutingTable
import org.apache.tuweni.devp2p.v5.encrypt.AES128GCM
import org.apache.tuweni.devp2p.v5.misc.SessionKey
import org.apache.tuweni.devp2p.v5.packet.FindNodeMessage
@@ -30,7 +31,6 @@
import org.apache.tuweni.devp2p.v5.packet.WhoAreYouMessage
import org.apache.tuweni.junit.BouncyCastleExtension
import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.assertThrows
import org.junit.jupiter.api.extension.ExtendWith
import java.net.InetAddress
@@ -40,20 +40,21 @@
private val keyPair: SECP256K1.KeyPair = SECP256K1.KeyPair.random()
private val enr: Bytes = EthereumNodeRecord.toRLP(keyPair, ip = InetAddress.getLocalHost())
private val nodeId: Bytes = Hash.sha2_256(enr)
- private val authenticationProvider: AuthenticationProvider = DefaultAuthenticationProvider(keyPair, enr)
+ private val routingTable: RoutingTable = RoutingTable(enr)
+ private val authenticationProvider: AuthenticationProvider = DefaultAuthenticationProvider(keyPair, routingTable)
- private val codec: PacketCodec = DefaultPacketCodec(keyPair, enr, nodeId, authenticationProvider)
+ private val codec: PacketCodec = DefaultPacketCodec(keyPair, routingTable, nodeId, authenticationProvider)
private val destNodeId: Bytes = Bytes.random(32)
@Test
- fun encodePerformsValidEncodingOfRandomMessasge() {
+ fun encodePerformsValidEncodingOfRandomMessage() {
val message = RandomMessage()
val encodedResult = codec.encode(message, destNodeId)
- val encodedContent = encodedResult.slice(45)
- val result = RandomMessage.create(encodedContent)
+ val encodedContent = encodedResult.content.slice(45)
+ val result = RandomMessage.create(UdpMessage.authTag(), encodedContent)
assert(result.data == message.data)
}
@@ -64,7 +65,7 @@
val encodedResult = codec.encode(message, destNodeId)
- val encodedContent = encodedResult.slice(32)
+ val encodedContent = encodedResult.content.slice(32)
val result = WhoAreYouMessage.create(encodedContent)
assert(result.idNonce == message.idNonce)
@@ -83,8 +84,8 @@
val encodedResult = codec.encode(message, destNodeId)
- val tag = encodedResult.slice(0, UdpMessage.TAG_LENGTH)
- val encryptedContent = encodedResult.slice(45)
+ val tag = encodedResult.content.slice(0, UdpMessage.TAG_LENGTH)
+ val encryptedContent = encodedResult.content.slice(45)
val content = AES128GCM.decrypt(encryptedContent, sessionKey.initiatorKey, tag).slice(1)
val result = FindNodeMessage.create(content)
@@ -93,21 +94,12 @@
}
@Test
- fun encodeFailsIfSessionKeyIsNotExists() {
- val message = FindNodeMessage()
-
- assertThrows<IllegalArgumentException> {
- codec.encode(message, destNodeId)
- }
- }
-
- @Test
fun decodePerformsValidDecodingOfRandomMessasge() {
val message = RandomMessage()
val encodedResult = codec.encode(message, destNodeId)
- val result = codec.decode(encodedResult).message as? RandomMessage
+ val result = codec.decode(encodedResult.content).message as? RandomMessage
assert(null != result)
assert(result!!.data == message.data)
@@ -119,7 +111,7 @@
val encodedResult = codec.encode(message, destNodeId)
- val result = codec.decode(encodedResult).message as? WhoAreYouMessage
+ val result = codec.decode(encodedResult.content).message as? WhoAreYouMessage
assert(null != result)
assert(result!!.idNonce == message.idNonce)
@@ -139,7 +131,7 @@
val encodedResult = codec.encode(message, nodeId)
- val result = codec.decode(encodedResult).message as? FindNodeMessage
+ val result = codec.decode(encodedResult.content).message as? FindNodeMessage
assert(result!!.requestId == message.requestId)
assert(result.distance == message.distance)
diff --git a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultUdpConnectorTest.kt b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultUdpConnectorTest.kt
index a345208..076deb4 100644
--- a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultUdpConnectorTest.kt
+++ b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultUdpConnectorTest.kt
@@ -16,13 +16,18 @@
*/
package org.apache.tuweni.devp2p.v5.internal
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking
import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.crypto.Hash
import org.apache.tuweni.crypto.SECP256K1
import org.apache.tuweni.devp2p.EthereumNodeRecord
+import org.apache.tuweni.devp2p.v5.MessageObserver
import org.apache.tuweni.devp2p.v5.UdpConnector
import org.apache.tuweni.devp2p.v5.packet.RandomMessage
import org.apache.tuweni.devp2p.v5.packet.UdpMessage
+import org.apache.tuweni.devp2p.v5.storage.RoutingTable
import org.apache.tuweni.junit.BouncyCastleExtension
import org.apache.tuweni.net.coroutines.CoroutineDatagramChannel
import org.junit.jupiter.api.AfterEach
@@ -37,18 +42,17 @@
class DefaultUdpConnectorTest {
private val keyPair: SECP256K1.KeyPair = SECP256K1.KeyPair.random()
- private val nodeId: Bytes = Bytes.fromHexString("0x98EB6D611291FA21F6169BFF382B9369C33D997FE4DC93410987E27796360640")
private val address: InetSocketAddress = InetSocketAddress(9090)
private val selfEnr: Bytes = EthereumNodeRecord.toRLP(keyPair, ip = address.address)
private val data: Bytes = UdpMessage.randomData()
- private val message: RandomMessage = RandomMessage(data)
+ private val message: RandomMessage = RandomMessage(UdpMessage.authTag(), data)
- private var connector: UdpConnector = DefaultUdpConnector(address, keyPair, selfEnr, nodeId)
+ private var connector: UdpConnector = DefaultUdpConnector(address, keyPair, selfEnr)
@BeforeEach
fun setUp() {
- connector = DefaultUdpConnector(address, keyPair, selfEnr, nodeId)
+ connector = DefaultUdpConnector(address, keyPair, selfEnr)
}
@AfterEach
@@ -93,10 +97,66 @@
buffer.flip()
val messageContent = Bytes.wrapByteBuffer(buffer).slice(45)
- val message = RandomMessage.create(messageContent)
+ val message = RandomMessage.create(UdpMessage.authTag(), messageContent)
assert(message.data == data)
}
socketChannel.close()
}
+
+  /**
+   * Sends an encoded [RandomMessage] to the started connector over UDP and checks that an
+   * attached observer is handed a message carrying the same data.
+   */
+  @Test
+  @UseExperimental(ExperimentalCoroutinesApi::class)
+  fun attachObserverRegistersListener() {
+    val observer = object : MessageObserver {
+      // Buffered so that offer() keeps the message even when no receiver is suspended yet;
+      // a rendezvous channel would drop it and leave receive() below waiting forever.
+      val result: Channel<RandomMessage> = Channel(Channel.UNLIMITED)
+      override fun observe(message: UdpMessage) {
+        if (message is RandomMessage) {
+          result.offer(message)
+        }
+      }
+    }
+    connector.attachObserver(observer)
+    connector.start()
+
+    // Nothing has been sent yet, so nothing may have been observed.
+    assert(observer.result.isEmpty)
+
+    val codec = DefaultPacketCodec(SECP256K1.KeyPair.random(), RoutingTable(Bytes.random(32)))
+    val socketChannel = CoroutineDatagramChannel.open()
+
+    runBlocking {
+      val message = RandomMessage()
+      val encodedRandomMessage = codec.encode(message, Hash.sha2_256(connector.getEnrBytes()))
+      val buffer = ByteBuffer.wrap(encodedRandomMessage.content.toArray())
+      socketChannel.send(buffer, InetSocketAddress(InetAddress.getLocalHost(), 9090))
+      val expectedResult = observer.result.receive()
+      assert(expectedResult.data == message.data)
+    }
+    // Release the sending socket, as the other UDP test in this class does.
+    socketChannel.close()
+  }
+
+  /**
+   * Attaches and then detaches an observer, sends an encoded [RandomMessage] to the started
+   * connector over UDP and checks that the detached observer received nothing.
+   */
+  @Test
+  @UseExperimental(ExperimentalCoroutinesApi::class)
+  fun detachObserverRemovesListener() {
+    val observer = object : MessageObserver {
+      // Buffered so that a message handed to a still-registered observer is retained and is
+      // visible through isEmpty. With a rendezvous channel offer() drops the message when
+      // nobody is receiving, so the final assertion could never fail.
+      val result: Channel<RandomMessage> = Channel(Channel.UNLIMITED)
+      override fun observe(message: UdpMessage) {
+        if (message is RandomMessage) {
+          result.offer(message)
+        }
+      }
+    }
+    connector.attachObserver(observer)
+    connector.detachObserver(observer)
+    connector.start()
+
+    val codec = DefaultPacketCodec(SECP256K1.KeyPair.random(), RoutingTable(Bytes.random(32)))
+    val socketChannel = CoroutineDatagramChannel.open()
+
+    runBlocking {
+      val message = RandomMessage()
+      val encodedRandomMessage = codec.encode(message, Hash.sha2_256(connector.getEnrBytes()))
+      val buffer = ByteBuffer.wrap(encodedRandomMessage.content.toArray())
+      socketChannel.send(buffer, InetSocketAddress(InetAddress.getLocalHost(), 9090))
+      // Leave a bounded window for the connector to process the datagram before checking,
+      // otherwise the assertion runs before any delivery could have happened.
+      kotlinx.coroutines.delay(500)
+      assert(observer.result.isEmpty)
+    }
+    // Release the sending socket, as the other UDP test in this class does.
+    socketChannel.close()
+  }
}
diff --git a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/packet/FindNodeMessageTest.kt b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/packet/FindNodeMessageTest.kt
index 22b323c..551bd27 100644
--- a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/packet/FindNodeMessageTest.kt
+++ b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/packet/FindNodeMessageTest.kt
@@ -34,7 +34,7 @@
val decodingResult = FindNodeMessage.create(encodingResult)
assert(decodingResult.requestId == requestId)
- assert(decodingResult.distance == 0L)
+ assert(decodingResult.distance == 0)
}
@Test
diff --git a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/packet/RandomMessageTest.kt b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/packet/RandomMessageTest.kt
index 2060816..ac125b9 100644
--- a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/packet/RandomMessageTest.kt
+++ b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/packet/RandomMessageTest.kt
@@ -27,12 +27,12 @@
"0xB53CCF732982B8E950836D1E02898C8B38CFDBFDF86BC65C8826506B454E14618EA73612A0F5582C130FF666"
val data = Bytes.fromHexString(expectedEncodingResult)
- val message = RandomMessage(data)
+ val message = RandomMessage(UdpMessage.authTag(), data)
val encodingResult = message.encode()
assert(encodingResult.toHexString() == expectedEncodingResult)
- val decodingResult = RandomMessage.create(encodingResult)
+ val decodingResult = RandomMessage.create(UdpMessage.authTag(), encodingResult)
assert(decodingResult.data == data)
}
diff --git a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/storage/EnrStorageTest.kt b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/storage/EnrStorageTest.kt
new file mode 100644
index 0000000..4485c82
--- /dev/null
+++ b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/storage/EnrStorageTest.kt
@@ -0,0 +1,28 @@
+package org.apache.tuweni.devp2p.v5.storage
+
+import org.apache.tuweni.crypto.Hash
+import org.apache.tuweni.crypto.SECP256K1
+import org.apache.tuweni.devp2p.EthereumNodeRecord
+import org.apache.tuweni.devp2p.v5.ENRStorage
+import org.apache.tuweni.junit.BouncyCastleExtension
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.extension.ExtendWith
+import java.net.InetAddress
+
+/**
+ * Tests for [DefaultENRStorage] accessed through the [ENRStorage] interface.
+ */
+@ExtendWith(BouncyCastleExtension::class)
+class EnrStorageTest {
+
+  private val storage: ENRStorage = DefaultENRStorage()
+
+  /**
+   * Stores a freshly generated node record and checks that looking it up by the SHA-256 hash
+   * of the record returns that same record.
+   */
+  @Test
+  fun setPersistsAndFindRetrievesNodeRecord() {
+    val enr = EthereumNodeRecord.toRLP(SECP256K1.KeyPair.random(), ip = InetAddress.getLocalHost())
+
+    storage.set(enr)
+
+    val nodeId = Hash.sha2_256(enr)
+    val result = storage.find(nodeId)
+
+    // Without this check the test would pass even if nothing had been stored.
+    assert(result == enr)
+  }
+}
diff --git a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/storage/RoutingTableTest.kt b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/storage/RoutingTableTest.kt
new file mode 100644
index 0000000..fbf2ede
--- /dev/null
+++ b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/storage/RoutingTableTest.kt
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.v5.storage
+
+import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.crypto.SECP256K1
+import org.apache.tuweni.devp2p.EthereumNodeRecord
+import org.apache.tuweni.junit.BouncyCastleExtension
+import org.junit.jupiter.api.AfterEach
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.extension.ExtendWith
+import java.net.InetAddress
+
+/**
+ * Exercises adding and evicting records in a [RoutingTable] and reading back its owner's record.
+ */
+@ExtendWith(BouncyCastleExtension::class)
+class RoutingTableTest {
+
+  // Key pair and record of the node that owns the table under test
+  private val keyPair: SECP256K1.KeyPair = SECP256K1.KeyPair.random()
+  private val enr: Bytes = EthereumNodeRecord.toRLP(keyPair, ip = InetAddress.getLocalHost())
+  private val routingTable: RoutingTable = RoutingTable(enr)
+
+  // Record of a second, separately generated node that the tests insert into the table
+  private val peerKeyPair = SECP256K1.KeyPair.random()
+  private val peerEnr = EthereumNodeRecord.toRLP(peerKeyPair, ip = InetAddress.getLocalHost())
+
+  /** Empties the table after every test. */
+  @AfterEach
+  fun tearDown() {
+    routingTable.clear()
+  }
+
+  /** After a record is added the table must no longer report itself empty. */
+  @Test
+  fun addCreatesRecordInBucket() {
+    routingTable.add(peerEnr)
+
+    val emptyAfterAdd = routingTable.isEmpty()
+    assert(!emptyAfterAdd)
+  }
+
+  /** Evicting the only record that was added must leave the table empty again. */
+  @Test
+  fun evictRemovesRecord() {
+    routingTable.add(peerEnr)
+    val emptyAfterAdd = routingTable.isEmpty()
+    assert(!emptyAfterAdd)
+
+    routingTable.evict(peerEnr)
+    val emptyAfterEvict = routingTable.isEmpty()
+    assert(emptyAfterEvict)
+  }
+
+  /** The table must hand back the record it was constructed with. */
+  @Test
+  fun getSelfEnrGivesTableOwnerEnr() {
+    assert(routingTable.getSelfEnr() == enr)
+  }
+}
diff --git a/kademlia/src/main/kotlin/org/apache/tuweni/kademlia/KademliaRoutingTable.kt b/kademlia/src/main/kotlin/org/apache/tuweni/kademlia/KademliaRoutingTable.kt
index 2299bb1..bf34fad 100644
--- a/kademlia/src/main/kotlin/org/apache/tuweni/kademlia/KademliaRoutingTable.kt
+++ b/kademlia/src/main/kotlin/org/apache/tuweni/kademlia/KademliaRoutingTable.kt
@@ -94,7 +94,8 @@
private val selfId: ByteArray,
k: Int,
maxReplacements: Int = k,
- private val nodeId: (T) -> ByteArray
+ private val nodeId: (T) -> ByteArray,
+ private val distanceToSelf: (T) -> Int = { nodeId(it) xorDist selfId }
) : Set<T> {
companion object {
@@ -108,8 +109,13 @@
* @return A new routing table
*/
@JvmStatic
- fun <T> create(selfId: ByteArray, k: Int, nodeId: Function<T, ByteArray>): KademliaRoutingTable<T> =
- KademliaRoutingTable(selfId, k, nodeId = nodeId::apply)
+ fun <T> create(
+ selfId: ByteArray,
+ k: Int,
+ nodeId: Function<T, ByteArray>,
+ distanceToSelf: Function<T, Int>
+ ): KademliaRoutingTable<T> =
+ KademliaRoutingTable(selfId, k, nodeId = nodeId::apply, distanceToSelf = distanceToSelf::apply)
/**
* Create a new routing table.
@@ -126,8 +132,9 @@
selfId: ByteArray,
k: Int,
maxReplacements: Int,
- nodeId: Function<T, ByteArray>
- ): KademliaRoutingTable<T> = KademliaRoutingTable(selfId, k, maxReplacements, nodeId::apply)
+ nodeId: Function<T, ByteArray>,
+ distanceToSelf: Function<T, Int>
+ ): KademliaRoutingTable<T> = KademliaRoutingTable(selfId, k, maxReplacements, nodeId::apply, distanceToSelf::apply)
}
init {
@@ -209,6 +216,16 @@
buckets.forEach { bucket -> bucket.clear() }
}
+  /**
+   * Lists the peers held in the bucket at index [value].
+   *
+   * Buckets are indexed by the value [logDistToSelf] yields for a node. An index that does not
+   * correspond to any bucket (negative or past the last bucket) yields an empty list instead of
+   * an index exception, so a caller passing an unchecked distance does not fail.
+   *
+   * @param value index of the bucket to read
+   * @return a list copy of that bucket's peers, or an empty list when no bucket exists at [value]
+   */
+  fun peersOfDistance(value: Int): List<T> {
+    return buckets.getOrNull(value)?.toList().orEmpty()
+  }
+
+  /**
+   * Picks a peer by choosing one of the non-empty buckets at random and then a random entry
+   * inside that bucket.
+   *
+   * @return the selected peer
+   * @throws NoSuchElementException when every bucket is empty, as raised by the standard
+   *   library `random()` on an empty collection
+   */
+  fun getRandom(): T {
+    val populatedBuckets = buckets.filterNot { it.isEmpty() }
+    val chosenBucket = populatedBuckets.random()
+    return chosenBucket.random()
+  }
+
+  /**
+   * Returns the distance between [node] and this table's owner.
+   *
+   * The value is obtained through `distanceCache`, which is given a loader that calls the
+   * `distanceToSelf` function supplied to the constructor (presumably only invoked when the
+   * cache has no entry for [node] - confirm against the cache type).
+   *
+   * @param node the node to measure
+   * @return the distance reported for [node]
+   */
+  fun logDistToSelf(node: T): Int {
+    return distanceCache.get(node) { distanceToSelf(node) }
+  }
+
private fun idForNode(node: T): ByteArray {
val id = nodeId(node)
require(id.size == selfId.size) { "id obtained for node is not the correct length" }
@@ -218,8 +235,6 @@
private fun bucketFor(node: T) = buckets[logDistToSelf(node)]
- private fun logDistToSelf(node: T): Int = distanceCache.get(node) { idForNode(node) xorDist selfId }
-
private class Bucket<E> private constructor(
// ordered with most recent first
private val entries: MutableList<E>,
diff --git a/kademlia/src/test/kotlin/org/apache/tuweni/kademlia/KademliaRoutingTableTest.kt b/kademlia/src/test/kotlin/org/apache/tuweni/kademlia/KademliaRoutingTableTest.kt
index 4d5f10b..b50c4d8 100644
--- a/kademlia/src/test/kotlin/org/apache/tuweni/kademlia/KademliaRoutingTableTest.kt
+++ b/kademlia/src/test/kotlin/org/apache/tuweni/kademlia/KademliaRoutingTableTest.kt
@@ -47,14 +47,14 @@
@Test
fun shouldBeEmptyOnConstruction() {
- val table = KademliaRoutingTable<Node>(shortId, 16) { n -> n.nodeId }
+ val table = KademliaRoutingTable<Node>(shortId, 16, nodeId = { n -> n.nodeId })
assertTrue(table.isEmpty())
assertEquals(0, table.size)
}
@Test
fun shouldAddToEmptyTable() {
- val table = KademliaRoutingTable<Node>(shortId, 16) { n -> n.nodeId }
+ val table = KademliaRoutingTable<Node>(shortId, 16, nodeId = { n -> n.nodeId })
val node = Node(0x01)
assertNull(table.add(node))
assertTrue(table.contains(node))
@@ -63,7 +63,7 @@
@Test
fun shouldProposeEvictionIfBucketIsFull() {
- val table = KademliaRoutingTable<Node>(shortId, 2) { n -> n.nodeId }
+ val table = KademliaRoutingTable<Node>(shortId, 2, nodeId = { n -> n.nodeId })
val node1 = Node(0x04)
assertNull(table.add(node1))
val node2 = Node(0x05)
@@ -82,7 +82,7 @@
@Test
fun shouldReplaceEvictedNode() {
- val table = KademliaRoutingTable<Node>(shortId, 2) { n -> n.nodeId }
+ val table = KademliaRoutingTable<Node>(shortId, 2, nodeId = { n -> n.nodeId })
val node1 = Node(0x04)
val node2 = Node(0x05)
table.add(node1)
@@ -100,7 +100,7 @@
@Test
fun shouldReturnNearestOrderedNodesUpToLimit() {
- val table = KademliaRoutingTable<Node>(shortId, 16) { n -> n.nodeId }
+ val table = KademliaRoutingTable<Node>(shortId, 16, nodeId = { n -> n.nodeId })
table.add(Node(0x05))
table.add(Node(0x04))
val node3 = Node(0x03)
@@ -119,7 +119,7 @@
@Test
fun shouldReturnAllNodesWhenTableIsSmallerThanLimit() {
- val table = KademliaRoutingTable<Node>(shortId, 16) { n -> n.nodeId }
+ val table = KademliaRoutingTable<Node>(shortId, 16, nodeId = { n -> n.nodeId })
table.add(Node(0x05))
table.add(Node(0x04))
table.add(Node(0x03))
@@ -134,7 +134,7 @@
@Test
fun shouldClearAllNodes() {
- val table = KademliaRoutingTable<Node>(shortId, 16) { n -> n.nodeId }
+ val table = KademliaRoutingTable<Node>(shortId, 16, nodeId = { n -> n.nodeId })
table.add(Node(0x05))
table.add(Node(0x04))
table.add(Node(0x03))