Merge branch 'v5-udp-transport' into v5-topics
# Conflicts:
# devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/UdpConnector.kt
# devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultUdpConnector.kt
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/ENRStorage.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/ENRStorage.kt
new file mode 100644
index 0000000..8224a27
--- /dev/null
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/ENRStorage.kt
@@ -0,0 +1,11 @@
+package org.apache.tuweni.devp2p.v5
+
+import org.apache.tuweni.bytes.Bytes
+
+interface ENRStorage {
+
+ fun set(enr: Bytes)
+
+ fun find(nodeId: Bytes): Bytes?
+
+}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/MessageListener.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/MessageListener.kt
new file mode 100644
index 0000000..2d926ca
--- /dev/null
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/MessageListener.kt
@@ -0,0 +1,9 @@
+package org.apache.tuweni.devp2p.v5
+
+import org.apache.tuweni.devp2p.v5.packet.UdpMessage
+
+interface MessageObserver {
+
+ fun observe(message: UdpMessage)
+
+}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/NodeDiscoveryService.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/NodeDiscoveryService.kt
index ee4db65..cbe8a3d 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/NodeDiscoveryService.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/NodeDiscoveryService.kt
@@ -22,10 +22,14 @@
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.crypto.Hash
import org.apache.tuweni.crypto.SECP256K1
import org.apache.tuweni.devp2p.EthereumNodeRecord
import org.apache.tuweni.devp2p.v5.internal.DefaultUdpConnector
+import org.apache.tuweni.devp2p.v5.packet.FindNodeMessage
+import org.apache.tuweni.devp2p.v5.packet.PingMessage
import org.apache.tuweni.devp2p.v5.packet.RandomMessage
+import org.apache.tuweni.devp2p.v5.storage.DefaultENRStorage
import org.apache.tuweni.io.Base64URLSafe
import java.net.InetSocketAddress
import java.time.Instant
@@ -62,7 +66,8 @@
null,
bindAddress.port
),
- private val connector: UdpConnector = DefaultUdpConnector(bindAddress, keyPair, selfENR),
+ private val enrStorage: ENRStorage = DefaultENRStorage(),
+ private val connector: UdpConnector = DefaultUdpConnector(bindAddress, keyPair, selfENR, enrStorage),
override val coroutineContext: CoroutineContext = Dispatchers.Default
) : NodeDiscoveryService, CoroutineScope {
@@ -90,9 +95,10 @@
val randomMessage = RandomMessage()
val address = InetSocketAddress(enr.ip(), enr.udp())
+ val destNodeId = Hash.sha2_256(rlpENR)
+ enrStorage.set(rlpENR)
connector.getNodesTable().add(rlpENR)
- connector.addPendingNodeId(address, rlpENR)
- connector.send(address, randomMessage, rlpENR)
+ connector.send(address, randomMessage, destNodeId)
}
}
}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/PacketCodec.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/PacketCodec.kt
index eec5073..23a2cf3 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/PacketCodec.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/PacketCodec.kt
@@ -19,6 +19,7 @@
import org.apache.tuweni.bytes.Bytes
import org.apache.tuweni.devp2p.v5.packet.UdpMessage
import org.apache.tuweni.devp2p.v5.misc.DecodeResult
+import org.apache.tuweni.devp2p.v5.misc.EncodeResult
import org.apache.tuweni.devp2p.v5.misc.HandshakeInitParameters
/**
@@ -43,7 +44,7 @@
*
* @return encoded message
*/
- fun encode(message: UdpMessage, destNodeId: Bytes, handshakeParams: HandshakeInitParameters? = null): Bytes
+ fun encode(message: UdpMessage, destNodeId: Bytes, handshakeParams: HandshakeInitParameters? = null): EncodeResult
/**
* Decodes message, decrypting it's body
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/UdpConnector.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/UdpConnector.kt
index 948903d..72900f0 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/UdpConnector.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/UdpConnector.kt
@@ -19,9 +19,10 @@
import org.apache.tuweni.bytes.Bytes
import org.apache.tuweni.crypto.SECP256K1
import org.apache.tuweni.devp2p.EthereumNodeRecord
-import org.apache.tuweni.devp2p.v5.dht.RoutingTable
+import org.apache.tuweni.devp2p.v5.storage.RoutingTable
import org.apache.tuweni.devp2p.v5.packet.UdpMessage
import org.apache.tuweni.devp2p.v5.misc.HandshakeInitParameters
+import org.apache.tuweni.devp2p.v5.misc.TrackingMessage
import org.apache.tuweni.devp2p.v5.topic.TicketHolder
import org.apache.tuweni.devp2p.v5.topic.TopicRegistrar
import org.apache.tuweni.devp2p.v5.topic.TopicTable
@@ -87,7 +88,7 @@
*
* @return node identifier
*/
- fun getPendingNodeIdByAddress(address: InetSocketAddress): Bytes
+ fun findPendingNodeId(address: InetSocketAddress): Bytes
/**
* Provides node's key pair
@@ -110,6 +111,10 @@
*/
fun getEnr(): EthereumNodeRecord
+ fun attachObserver(observer: MessageObserver)
+
+ fun detachObserver(observer: MessageObserver)
+
fun getNodesTable(): RoutingTable
/**
@@ -128,6 +133,10 @@
fun getAwaitingPongRecord(nodeId: Bytes): Bytes?
+ fun getPendingMessage(authTag: Bytes): TrackingMessage
+
+ fun getNodeRecords(): ENRStorage
+
fun getTopicRegistrar(): TopicRegistrar
fun getSessionInitiatorKey(nodeId: Bytes): Bytes
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/encrypt/SessionKeyGenerator.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/encrypt/SessionKeyGenerator.kt
index cb21279..0858bdd 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/encrypt/SessionKeyGenerator.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/encrypt/SessionKeyGenerator.kt
@@ -27,7 +27,7 @@
*/
object SessionKeyGenerator {
- private const val DERIVED_KEY_SIZE: Int = 16
+ const val DERIVED_KEY_SIZE: Int = 16
private val INFO_PREFIX = Bytes.wrap("discovery v5 key agreement".toByteArray())
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultAuthenticationProvider.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultAuthenticationProvider.kt
index 7c397ca..68f48e6 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultAuthenticationProvider.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultAuthenticationProvider.kt
@@ -24,7 +24,7 @@
import org.apache.tuweni.devp2p.ENR_REQUEST_RETRY_DELAY_MS
import org.apache.tuweni.devp2p.EthereumNodeRecord
import org.apache.tuweni.devp2p.v5.AuthenticationProvider
-import org.apache.tuweni.devp2p.v5.dht.RoutingTable
+import org.apache.tuweni.devp2p.v5.storage.RoutingTable
import org.apache.tuweni.devp2p.v5.encrypt.AES128GCM
import org.apache.tuweni.devp2p.v5.encrypt.SessionKeyGenerator
import org.apache.tuweni.devp2p.v5.misc.AuthHeader
@@ -40,7 +40,7 @@
private val sessionKeys: Cache<String, SessionKey> = CacheBuilder
.newBuilder()
- .expireAfterWrite(ENR_REQUEST_RETRY_DELAY_MS, TimeUnit.MILLISECONDS)
+ .expireAfterWrite(SESSION_KEY_EXPIRATION, TimeUnit.MINUTES)
.build()
private val nodeId: Bytes = Hash.sha2_256(routingTable.getSelfEnr())
@@ -133,6 +133,8 @@
}
companion object {
+ private const val SESSION_KEY_EXPIRATION: Long = 5
+
private const val ZERO_NONCE_SIZE: Int = 12
private const val VERSION: Int = 5
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultPacketCodec.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultPacketCodec.kt
index 4c0080d..6216138 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultPacketCodec.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultPacketCodec.kt
@@ -21,14 +21,16 @@
import org.apache.tuweni.crypto.SECP256K1
import org.apache.tuweni.devp2p.v5.AuthenticationProvider
import org.apache.tuweni.devp2p.v5.PacketCodec
-import org.apache.tuweni.devp2p.v5.dht.RoutingTable
+import org.apache.tuweni.devp2p.v5.storage.RoutingTable
import org.apache.tuweni.devp2p.v5.encrypt.AES128GCM
+import org.apache.tuweni.devp2p.v5.encrypt.SessionKeyGenerator
import org.apache.tuweni.devp2p.v5.packet.FindNodeMessage
import org.apache.tuweni.devp2p.v5.packet.RandomMessage
import org.apache.tuweni.devp2p.v5.packet.UdpMessage
import org.apache.tuweni.devp2p.v5.packet.WhoAreYouMessage
import org.apache.tuweni.devp2p.v5.misc.AuthHeader
import org.apache.tuweni.devp2p.v5.misc.DecodeResult
+import org.apache.tuweni.devp2p.v5.misc.EncodeResult
import org.apache.tuweni.devp2p.v5.misc.HandshakeInitParameters
import org.apache.tuweni.devp2p.v5.packet.NodesMessage
import org.apache.tuweni.devp2p.v5.packet.PingMessage
@@ -48,37 +50,36 @@
private val authenticationProvider: AuthenticationProvider = DefaultAuthenticationProvider(keyPair, routingTable)
) : PacketCodec {
- override fun encode(message: UdpMessage, destNodeId: Bytes, handshakeParams: HandshakeInitParameters?): Bytes {
+ override fun encode(message: UdpMessage, destNodeId: Bytes, handshakeParams: HandshakeInitParameters?): EncodeResult {
if (message is WhoAreYouMessage) {
val magic = UdpMessage.magic(nodeId)
val content = message.encode()
- return Bytes.wrap(magic, content)
+ return EncodeResult(magic, Bytes.wrap(magic, content))
}
val tag = UdpMessage.tag(nodeId, destNodeId)
if (message is RandomMessage) {
- val authTag = UdpMessage.authTag()
- val rlpAuthTag = RLP.encodeValue(authTag)
+ val rlpAuthTag = RLP.encodeValue(message.authTag)
val content = message.encode()
- return Bytes.wrap(tag, rlpAuthTag, content)
+ return EncodeResult(message.authTag, Bytes.wrap(tag, rlpAuthTag, content))
}
val authHeader = handshakeParams?.let { authenticationProvider.authenticate(handshakeParams) }
val initiatorKey = authenticationProvider.findSessionKey(destNodeId.toHexString())?.initiatorKey
- ?: throw IllegalArgumentException() // TODO handle
+ ?: Bytes.random(SessionKeyGenerator.DERIVED_KEY_SIZE) // encrypt with random key, to initiate handshake
val messagePlain = Bytes.wrap(message.getMessageType(), message.encode())
return if (null != authHeader) {
val encodedHeader = authHeader.asRlp()
val authTag = authHeader.authTag
val encryptionMeta = Bytes.wrap(tag, encodedHeader)
val encryptionResult = AES128GCM.encrypt(initiatorKey, authTag, messagePlain, encryptionMeta)
- Bytes.wrap(tag, encodedHeader, encryptionResult)
+ EncodeResult(authTag, Bytes.wrap(tag, encodedHeader, encryptionResult))
} else {
val authTag = UdpMessage.authTag()
val authTagHeader = RLP.encodeValue(authTag)
val encryptionResult = AES128GCM.encrypt(initiatorKey, authTag, messagePlain, tag)
- Bytes.wrap(tag, authTagHeader, encryptionResult)
+ EncodeResult(authTag, Bytes.wrap(tag, authTagHeader, encryptionResult))
}
}
@@ -93,6 +94,7 @@
private fun read(tag: Bytes, senderNodeId: Bytes, contentWithHeader: Bytes, reader: RLPReader): UdpMessage {
// Distinguish auth header or auth tag
var authHeader: AuthHeader? = null
+ var authTag: Bytes = Bytes.EMPTY
if (reader.nextIsList()) {
if (WHO_ARE_YOU_MESSAGE_LENGTH == contentWithHeader.size()) {
return WhoAreYouMessage.create(contentWithHeader)
@@ -107,14 +109,14 @@
}
authenticationProvider.finalizeHandshake(senderNodeId, authHeader)
} else {
- reader.readValue()
+ authTag = reader.readValue()
}
val encryptedContent = contentWithHeader.slice(reader.position())
// Decrypt
val decryptionKey = authenticationProvider.findSessionKey(senderNodeId.toHexString())?.initiatorKey
- ?: return RandomMessage(encryptedContent)
+ ?: return RandomMessage.create(authTag, encryptedContent)
val decryptMetadata = authHeader?.let { Bytes.wrap(tag, authHeader.asRlp()) } ?: tag
val decryptedContent = AES128GCM.decrypt(encryptedContent, decryptionKey, decryptMetadata)
val messageType = decryptedContent.slice(0, Byte.SIZE_BYTES)
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultUdpConnector.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultUdpConnector.kt
index 6de199e..6816324 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultUdpConnector.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultUdpConnector.kt
@@ -27,11 +27,13 @@
import org.apache.tuweni.crypto.Hash
import org.apache.tuweni.crypto.SECP256K1
import org.apache.tuweni.devp2p.EthereumNodeRecord
+import org.apache.tuweni.devp2p.v5.ENRStorage
import org.apache.tuweni.devp2p.v5.AuthenticationProvider
import org.apache.tuweni.devp2p.v5.MessageHandler
+import org.apache.tuweni.devp2p.v5.MessageObserver
import org.apache.tuweni.devp2p.v5.PacketCodec
import org.apache.tuweni.devp2p.v5.UdpConnector
-import org.apache.tuweni.devp2p.v5.dht.RoutingTable
+import org.apache.tuweni.devp2p.v5.storage.RoutingTable
import org.apache.tuweni.devp2p.v5.internal.handler.FindNodeMessageHandler
import org.apache.tuweni.devp2p.v5.internal.handler.NodesMessageHandler
import org.apache.tuweni.devp2p.v5.internal.handler.PingMessageHandler
@@ -44,6 +46,11 @@
import org.apache.tuweni.devp2p.v5.internal.handler.WhoAreYouMessageHandler
import org.apache.tuweni.devp2p.v5.misc.HandshakeInitParameters
import org.apache.tuweni.devp2p.v5.packet.FindNodeMessage
+import org.apache.tuweni.devp2p.v5.packet.RandomMessage
+import org.apache.tuweni.devp2p.v5.packet.UdpMessage
+import org.apache.tuweni.devp2p.v5.packet.WhoAreYouMessage
+import org.apache.tuweni.devp2p.v5.misc.HandshakeInitParameters
+import org.apache.tuweni.devp2p.v5.misc.TrackingMessage
import org.apache.tuweni.devp2p.v5.packet.NodesMessage
import org.apache.tuweni.devp2p.v5.packet.PingMessage
import org.apache.tuweni.devp2p.v5.packet.PongMessage
@@ -57,6 +64,7 @@
import org.apache.tuweni.devp2p.v5.topic.TicketHolder
import org.apache.tuweni.devp2p.v5.topic.TopicRegistrar
import org.apache.tuweni.devp2p.v5.topic.TopicTable
+import org.apache.tuweni.devp2p.v5.storage.DefaultENRStorage
import org.apache.tuweni.net.coroutines.CoroutineDatagramChannel
import java.net.InetSocketAddress
import java.nio.ByteBuffer
@@ -68,9 +76,8 @@
private val bindAddress: InetSocketAddress,
private val keyPair: SECP256K1.KeyPair,
private val selfEnr: Bytes,
- private val nodeId: Bytes = Hash.sha2_256(selfEnr),
+ private val enrStorage: ENRStorage = DefaultENRStorage(),
private val receiveChannel: CoroutineDatagramChannel = CoroutineDatagramChannel.open(),
- private val sendChannel: CoroutineDatagramChannel = CoroutineDatagramChannel.open(),
private val nodesTable: RoutingTable = RoutingTable(selfEnr),
private val topicTable: TopicTable = TopicTable(),
private val ticketHolder: TicketHolder = TicketHolder(),
@@ -78,13 +85,14 @@
private val packetCodec: PacketCodec = DefaultPacketCodec(keyPair, nodesTable, nodeId, authenticationProvider),
private val authenticatingPeers: MutableMap<InetSocketAddress, Bytes> = mutableMapOf(),
private val selfNodeRecord: EthereumNodeRecord = EthereumNodeRecord.fromRLP(selfEnr),
+ private val messageListeners: MutableList<MessageObserver> = mutableListOf(),
override val coroutineContext: CoroutineContext = Dispatchers.IO
) : UdpConnector, CoroutineScope {
private val log: Logger = Logger.getLogger(this.javaClass.simpleName)
private val randomMessageHandler: MessageHandler<RandomMessage> = RandomMessageHandler()
- private val whoAreYouMessageHandler: MessageHandler<WhoAreYouMessage> = WhoAreYouMessageHandler(nodeId)
+ private val whoAreYouMessageHandler: MessageHandler<WhoAreYouMessage> = WhoAreYouMessageHandler()
private val findNodeMessageHandler: MessageHandler<FindNodeMessage> = FindNodeMessageHandler()
private val nodesMessageHandler: MessageHandler<NodesMessage> = NodesMessageHandler()
private val pingMessageHandler: MessageHandler<PingMessage> = PingMessageHandler()
@@ -96,32 +104,44 @@
private val topicRegistrar = TopicRegistrar(coroutineContext, this)
+ private val askedNodes: MutableList<Bytes> = mutableListOf()
+
+ private val pendingMessages: Cache<String, TrackingMessage> = CacheBuilder.newBuilder()
+ .expireAfterWrite(Duration.ofMillis(REQUEST_TIMEOUT))
+ .build()
private val pings: Cache<String, Bytes> = CacheBuilder.newBuilder()
- .expireAfterWrite(Duration.ofMillis(PING_TIMEOUT))
+ .expireAfterWrite(Duration.ofMillis(REQUEST_TIMEOUT))
.removalListener<String, Bytes> {
getNodesTable().evict(it.value)
}.build()
private lateinit var refreshJob: Job
private lateinit var receiveJob: Job
+ private lateinit var lookupJob: Job
+
+ override fun available(): Boolean = receiveChannel.isOpen
+
+ override fun started(): Boolean = ::receiveJob.isInitialized && available()
+
+ override fun getEnrBytes(): Bytes = selfEnr
+
+ override fun getEnr(): EthereumNodeRecord = selfNodeRecord
+
+ override fun getNodeRecords(): ENRStorage = enrStorage
+
+ override fun getNodesTable(): RoutingTable = nodesTable
+
+ override fun getNodeKeyPair(): SECP256K1.KeyPair = keyPair
+
+ override fun getPendingMessage(authTag: Bytes): TrackingMessage = pendingMessages.getIfPresent(authTag.toHexString())
+ ?: throw IllegalArgumentException("Pending message not found")
+
override fun start() {
receiveChannel.bind(bindAddress)
- receiveJob = launch {
- val datagram = ByteBuffer.allocate(UdpMessage.MAX_UDP_MESSAGE_SIZE)
- while (receiveChannel.isOpen) {
- datagram.clear()
- val address = receiveChannel.receive(datagram) as InetSocketAddress
- datagram.flip()
- try {
- processDatagram(datagram, address)
- } catch (ex: Exception) {
- log.warning(ex.message)
- }
- }
- }
-
+ receiveJob = receiveDatagram()
+ lookupJob = lookupNodes()
refreshJob = refreshNodesTable()
}
@@ -132,42 +152,39 @@
handshakeParams: HandshakeInitParameters?
) {
launch {
- val buffer = packetCodec.encode(message, destNodeId, handshakeParams)
- sendChannel.send(ByteBuffer.wrap(buffer.toArray()), address)
+ val encodeResult = packetCodec.encode(message, destNodeId, handshakeParams)
+ pendingMessages.put(encodeResult.authTag.toHexString(), TrackingMessage(message, destNodeId))
+ receiveChannel.send(ByteBuffer.wrap(encodeResult.content.toArray()), address)
}
}
override fun terminate() {
receiveChannel.close()
- sendChannel.close()
- receiveJob.cancel()
refreshJob.cancel()
+ lookupJob.cancel()
+ receiveJob.cancel()
}
- override fun available(): Boolean = receiveChannel.isOpen
+ override fun attachObserver(observer: MessageObserver) {
+ messageListeners.add(observer)
+ }
- override fun started(): Boolean = ::receiveJob.isInitialized && available()
-
- override fun getEnrBytes(): Bytes = selfEnr
-
- override fun getEnr(): EthereumNodeRecord = selfNodeRecord
+ override fun detachObserver(observer: MessageObserver) {
+ messageListeners.remove(observer)
+ }
override fun addPendingNodeId(address: InetSocketAddress, nodeId: Bytes) {
authenticatingPeers[address] = nodeId
}
- override fun getNodeKeyPair(): SECP256K1.KeyPair = keyPair
-
- override fun getPendingNodeIdByAddress(address: InetSocketAddress): Bytes {
+ override fun findPendingNodeId(address: InetSocketAddress): Bytes {
val result = authenticatingPeers[address]
?: throw IllegalArgumentException("Authenticated peer not found with address ${address.hostName}:${address.port}")
authenticatingPeers.remove(address)
return result
}
- override fun getNodesTable(): RoutingTable = nodesTable
-
override fun getTopicTable(): TopicTable = topicTable
override fun getTicketHolder(): TicketHolder = ticketHolder
@@ -186,6 +203,45 @@
?: throw IllegalArgumentException("Session key not found.")
}
+ // Lookup nodes
+ private fun lookupNodes() = launch {
+ while (true) {
+ val nearestNodes = getNodesTable().nearest(selfEnr)
+ if (16 > nearestNodes.size) {
+ lookupInternal(nearestNodes)
+ } else {
+ askedNodes.clear()
+ }
+ delay(LOOKUP_REFRESH_RATE)
+ }
+ }
+
+ private fun lookupInternal(nearest: List<Bytes>) {
+ val targetNode = if (nearest.isNotEmpty()) nearest.random() else Bytes.random(32)
+ val distance = getNodesTable().distanceToSelf(targetNode)
+ for (target in nearest.take(3)) {
+ val enr = EthereumNodeRecord.fromRLP(target)
+ val message = FindNodeMessage(distance = distance)
+ val address = InetSocketAddress(enr.ip(), enr.udp())
+ send(address, message, Hash.sha2_256(target))
+ }
+ }
+
+ // Process packets
+ private fun receiveDatagram() = launch {
+ val datagram = ByteBuffer.allocate(UdpMessage.MAX_UDP_MESSAGE_SIZE)
+ while (receiveChannel.isOpen) {
+ datagram.clear()
+ val address = receiveChannel.receive(datagram) as InetSocketAddress
+ datagram.flip()
+ try {
+ processDatagram(datagram, address)
+ } catch (ex: Exception) {
+ log.warning(ex.message)
+ }
+ }
+ }
+
private fun processDatagram(datagram: ByteBuffer, address: InetSocketAddress) {
val messageBytes = Bytes.wrapByteBuffer(datagram)
val decodeResult = packetCodec.decode(messageBytes)
@@ -203,8 +259,10 @@
is TopicQueryMessage -> topicQueryMessageHandler.handle(message, address, decodeResult.srcNodeId, this)
else -> throw IllegalArgumentException("Unexpected message has been received - ${message::class.java.simpleName}")
}
+ messageListeners.forEach { it.observe(message) }
}
+ // Ping nodes
private fun refreshNodesTable(): Job = launch {
while (true) {
if (!getNodesTable().isEmpty()) {
@@ -217,12 +275,13 @@
send(address, message, nodeId)
pings.put(nodeId.toHexString(), enrBytes)
}
- delay(REFRESH_RATE)
+ delay(TABLE_REFRESH_RATE)
}
}
companion object {
- private const val REFRESH_RATE: Long = 1000
- private const val PING_TIMEOUT: Long = 20000
+ private const val LOOKUP_REFRESH_RATE: Long = 3000
+ private const val TABLE_REFRESH_RATE: Long = 1000
+ private const val REQUEST_TIMEOUT: Long = 1000
}
}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/NodesMessageHandler.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/NodesMessageHandler.kt
index 60b731d..d8b33dd 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/NodesMessageHandler.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/NodesMessageHandler.kt
@@ -28,6 +28,7 @@
override fun handle(message: NodesMessage, address: InetSocketAddress, srcNodeId: Bytes, connector: UdpConnector) {
message.nodeRecords.forEach {
EthereumNodeRecord.fromRLP(it)
+ connector.getNodeRecords().set(it)
connector.getNodesTable().add(it)
}
}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/RandomMessageHandler.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/RandomMessageHandler.kt
index 477710a..60f965d 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/RandomMessageHandler.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/RandomMessageHandler.kt
@@ -26,8 +26,7 @@
class RandomMessageHandler : MessageHandler<RandomMessage> {
override fun handle(message: RandomMessage, address: InetSocketAddress, srcNodeId: Bytes, connector: UdpConnector) {
- connector.addPendingNodeId(address, srcNodeId)
- val response = WhoAreYouMessage()
+ val response = WhoAreYouMessage(message.authTag)
connector.send(address, response, srcNodeId)
}
}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/WhoAreYouMessageHandler.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/WhoAreYouMessageHandler.kt
index 754ceae..62025f3 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/WhoAreYouMessageHandler.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/WhoAreYouMessageHandler.kt
@@ -17,17 +17,16 @@
package org.apache.tuweni.devp2p.v5.internal.handler
import org.apache.tuweni.bytes.Bytes
-import org.apache.tuweni.crypto.Hash
import org.apache.tuweni.devp2p.v5.MessageHandler
import org.apache.tuweni.devp2p.v5.UdpConnector
import org.apache.tuweni.devp2p.v5.packet.FindNodeMessage
import org.apache.tuweni.devp2p.v5.packet.WhoAreYouMessage
import org.apache.tuweni.devp2p.v5.misc.HandshakeInitParameters
+import org.apache.tuweni.devp2p.v5.packet.RandomMessage
+import java.lang.IllegalArgumentException
import java.net.InetSocketAddress
-class WhoAreYouMessageHandler(
- private val nodeId: Bytes
-) : MessageHandler<WhoAreYouMessage> {
+class WhoAreYouMessageHandler : MessageHandler<WhoAreYouMessage> {
override fun handle(
message: WhoAreYouMessage,
@@ -36,11 +35,12 @@
connector: UdpConnector
) {
// Retrieve enr
- val destRlp = connector.getPendingNodeIdByAddress(address)
- val handshakeParams = HandshakeInitParameters(message.idNonce, message.authTag, destRlp)
- val destNodeId = Hash.sha2_256(destRlp)
+ val trackingMessage = connector.getPendingMessage(message.authTag)
+ val rlpEnr = connector.getNodeRecords().find(trackingMessage.nodeId)
+ ?: throw IllegalArgumentException("Unable to find node enr by id ${trackingMessage.nodeId}")
+ val handshakeParams = HandshakeInitParameters(message.idNonce, message.authTag, rlpEnr)
- val findNodeMessage = FindNodeMessage()
- connector.send(address, findNodeMessage, destNodeId, handshakeParams)
+ val response = if (trackingMessage.message is RandomMessage) FindNodeMessage() else trackingMessage.message
+ connector.send(address, response, trackingMessage.nodeId, handshakeParams)
}
}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/misc/EncodeResult.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/misc/EncodeResult.kt
new file mode 100644
index 0000000..5100329
--- /dev/null
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/misc/EncodeResult.kt
@@ -0,0 +1,8 @@
+package org.apache.tuweni.devp2p.v5.misc
+
+import org.apache.tuweni.bytes.Bytes
+
+class EncodeResult(
+ val authTag: Bytes,
+ val content: Bytes
+)
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/misc/TrackingMessage.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/misc/TrackingMessage.kt
new file mode 100644
index 0000000..66987c1
--- /dev/null
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/misc/TrackingMessage.kt
@@ -0,0 +1,9 @@
+package org.apache.tuweni.devp2p.v5.misc
+
+import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.devp2p.v5.packet.UdpMessage
+
+class TrackingMessage(
+ val message: UdpMessage,
+ val nodeId: Bytes
+)
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/packet/RandomMessage.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/packet/RandomMessage.kt
index e43ded7..99dfb51 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/packet/RandomMessage.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/packet/RandomMessage.kt
@@ -19,6 +19,7 @@
import org.apache.tuweni.bytes.Bytes
class RandomMessage(
+ val authTag: Bytes = authTag(),
val data: Bytes = randomData()
) : UdpMessage() {
@@ -27,8 +28,8 @@
}
companion object {
- fun create(content: Bytes): RandomMessage {
- return RandomMessage(content)
+ fun create(authTag: Bytes, content: Bytes): RandomMessage {
+ return RandomMessage(authTag, content)
}
}
}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/storage/DefaultENRStorage.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/storage/DefaultENRStorage.kt
new file mode 100644
index 0000000..80e9d5b
--- /dev/null
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/storage/DefaultENRStorage.kt
@@ -0,0 +1,19 @@
+package org.apache.tuweni.devp2p.v5.storage
+
+import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.crypto.Hash
+import org.apache.tuweni.devp2p.v5.ENRStorage
+import java.util.concurrent.ConcurrentHashMap
+
+class DefaultENRStorage: ENRStorage {
+
+ private val storage: MutableMap<String, Bytes> = ConcurrentHashMap()
+
+ override fun find(nodeId: Bytes): Bytes? = storage[nodeId.toHexString()]
+
+ override fun set(enr: Bytes) {
+ val nodeId = Hash.sha2_256(enr)
+ storage[nodeId.toHexString()] = enr
+ }
+
+}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/dht/RoutingTable.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/storage/RoutingTable.kt
similarity index 80%
rename from devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/dht/RoutingTable.kt
rename to devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/storage/RoutingTable.kt
index 289be97..cd48508 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/dht/RoutingTable.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/storage/RoutingTable.kt
@@ -14,12 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.tuweni.devp2p.v5.dht
+package org.apache.tuweni.devp2p.v5.storage
+import com.google.common.math.IntMath
import org.apache.tuweni.bytes.Bytes
import org.apache.tuweni.crypto.Hash
import org.apache.tuweni.kademlia.KademliaRoutingTable
import org.apache.tuweni.kademlia.xorDist
+import java.math.RoundingMode
class RoutingTable(
private val selfEnr: Bytes
@@ -32,12 +34,19 @@
selfId = selfNodeId,
k = BUCKET_SIZE,
nodeId = nodeIdCalculation,
- distanceToSelf = { key(it) xorDist selfNodeId })
+ distanceToSelf = { IntMath.log2(key(it) xorDist selfNodeId, RoundingMode.FLOOR) })
+
+ val size: Int
+ get() = table.size
fun getSelfEnr(): Bytes = selfEnr
fun add(enr: Bytes): Bytes? = table.add(enr)
+ fun nearest(targetId: Bytes, limit: Int = BUCKET_SIZE): List<Bytes> = table.nearest(key(targetId), limit)
+
+ fun distanceToSelf(targetId: Bytes): Int = table.logDistToSelf(targetId)
+
fun evict(enr: Bytes): Boolean = table.evict(enr)
fun random(): Bytes = table.getRandom()
@@ -53,4 +62,5 @@
companion object {
private const val BUCKET_SIZE: Int = 16
}
+
}
diff --git a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/AbstractIntegrationTest.kt b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/AbstractIntegrationTest.kt
new file mode 100644
index 0000000..f503787
--- /dev/null
+++ b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/AbstractIntegrationTest.kt
@@ -0,0 +1,125 @@
+package org.apache.tuweni.devp2p.v5
+
+import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.runBlocking
+import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.crypto.Hash
+import org.apache.tuweni.crypto.SECP256K1
+import org.apache.tuweni.devp2p.EthereumNodeRecord
+import org.apache.tuweni.devp2p.v5.internal.DefaultAuthenticationProvider
+import org.apache.tuweni.devp2p.v5.internal.DefaultPacketCodec
+import org.apache.tuweni.devp2p.v5.internal.DefaultUdpConnector
+import org.apache.tuweni.devp2p.v5.packet.RandomMessage
+import org.apache.tuweni.devp2p.v5.packet.UdpMessage
+import org.apache.tuweni.devp2p.v5.packet.WhoAreYouMessage
+import org.apache.tuweni.devp2p.v5.storage.DefaultENRStorage
+import org.apache.tuweni.devp2p.v5.storage.RoutingTable
+import org.apache.tuweni.junit.BouncyCastleExtension
+import org.junit.jupiter.api.extension.ExtendWith
+import java.net.InetAddress
+import java.net.InetSocketAddress
+
+@ExtendWith(BouncyCastleExtension::class)
+abstract class AbstractIntegrationTest {
+
+ protected fun createNode(
+ port: Int = 9090,
+ bootList: List<String> = emptyList(),
+ enrStorage: ENRStorage = DefaultENRStorage(),
+ keyPair: SECP256K1.KeyPair = SECP256K1.KeyPair.random(),
+ enr: Bytes = EthereumNodeRecord.toRLP(keyPair, ip = InetAddress.getLocalHost(), udp = port),
+ routingTable: RoutingTable = RoutingTable(enr),
+ address: InetSocketAddress = InetSocketAddress(InetAddress.getLocalHost(), port),
+ authenticationProvider: AuthenticationProvider = DefaultAuthenticationProvider(keyPair, routingTable),
+ packetCodec: PacketCodec = DefaultPacketCodec(
+ keyPair,
+ routingTable,
+ authenticationProvider = authenticationProvider
+ ),
+ connector: UdpConnector = DefaultUdpConnector(
+ address,
+ keyPair,
+ enr,
+ enrStorage,
+ nodesTable = routingTable,
+ packetCodec = packetCodec
+ ),
+ service: NodeDiscoveryService =
+ DefaultNodeDiscoveryService(
+ keyPair,
+ port,
+ enrStorage = enrStorage,
+ bootstrapENRList = bootList,
+ connector = connector
+ )
+ ): TestNode {
+ service.start()
+ return TestNode(
+ bootList,
+ port,
+ enrStorage,
+ keyPair,
+ enr,
+ address,
+ routingTable,
+ authenticationProvider,
+ packetCodec,
+ connector,
+ service
+ )
+ }
+
+ protected fun handshake(initiator: TestNode, recipient: TestNode): Boolean {
+ initiator.enrStorage.set(recipient.enr)
+ initiator.routingTable.add(recipient.enr)
+ val message = RandomMessage()
+ initiator.connector.send(recipient.address, message, recipient.nodeId)
+ while (true) {
+ if (null != recipient.authenticationProvider.findSessionKey(initiator.nodeId.toHexString())) {
+ return true
+ }
+ }
+ }
+
+ protected fun send(initiator: TestNode, recipient: TestNode, message: UdpMessage) {
+ if (message is RandomMessage || message is WhoAreYouMessage) {
+ throw IllegalArgumentException("Can't send handshake initiation message")
+ }
+ initiator.connector.send(recipient.address, message, recipient.nodeId)
+ }
+
+ protected inline fun <reified T: UdpMessage>sendAndAwait(initiator: TestNode, recipient: TestNode, message: UdpMessage): T {
+ val listener = object : MessageObserver {
+ var result: Channel<T> = Channel()
+
+ override fun observe(message: UdpMessage) {
+ if (message is T) {
+ result.offer(message)
+ }
+ }
+ }
+
+ return runBlocking {
+ initiator.connector.attachObserver(listener)
+ send(initiator, recipient, message)
+ val result = listener.result.receive()
+ initiator.connector.detachObserver(listener)
+ result
+ }
+ }
+}
+
+class TestNode(
+ val bootList: List<String>,
+ val port: Int,
+ val enrStorage: ENRStorage,
+ val keyPair: SECP256K1.KeyPair,
+ val enr: Bytes,
+ val address: InetSocketAddress,
+ val routingTable: RoutingTable,
+ val authenticationProvider: AuthenticationProvider,
+ val packetCodec: PacketCodec,
+ val connector: UdpConnector,
+ val service: NodeDiscoveryService,
+ val nodeId: Bytes = Hash.sha2_256(enr)
+)
diff --git a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/DefaultNodeDiscoveryServiceTest.kt b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/DefaultNodeDiscoveryServiceTest.kt
index a9f64e5..2b7d8a7 100644
--- a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/DefaultNodeDiscoveryServiceTest.kt
+++ b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/DefaultNodeDiscoveryServiceTest.kt
@@ -64,7 +64,7 @@
bootstrapENRList,
enrSeq,
selfENR,
- connector
+ connector = connector
)
@Test
@@ -81,7 +81,7 @@
val receivedBytes = Bytes.wrapByteBuffer(buffer)
val content = receivedBytes.slice(45)
- val message = RandomMessage.create(content)
+ val message = RandomMessage.create(UdpMessage.authTag(), content)
assert(message.data.size() == UdpMessage.RANDOM_DATA_LENGTH)
}
diff --git a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/HandshakeIntegrationTest.kt b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/HandshakeIntegrationTest.kt
deleted file mode 100644
index fa99cc3..0000000
--- a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/HandshakeIntegrationTest.kt
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tuweni.devp2p.v5
-
-import kotlinx.coroutines.runBlocking
-import org.apache.tuweni.bytes.Bytes
-import org.apache.tuweni.crypto.Hash
-import org.apache.tuweni.crypto.SECP256K1
-import org.apache.tuweni.devp2p.EthereumNodeRecord
-import org.apache.tuweni.devp2p.v5.dht.RoutingTable
-import org.apache.tuweni.devp2p.v5.internal.DefaultAuthenticationProvider
-import org.apache.tuweni.devp2p.v5.internal.DefaultPacketCodec
-import org.apache.tuweni.devp2p.v5.internal.DefaultUdpConnector
-import org.apache.tuweni.devp2p.v5.packet.FindNodeMessage
-import org.apache.tuweni.devp2p.v5.packet.RandomMessage
-import org.apache.tuweni.devp2p.v5.packet.UdpMessage
-import org.apache.tuweni.devp2p.v5.packet.WhoAreYouMessage
-import org.apache.tuweni.io.Base64URLSafe
-import org.apache.tuweni.junit.BouncyCastleExtension
-import org.apache.tuweni.net.coroutines.CoroutineDatagramChannel
-import org.junit.jupiter.api.AfterEach
-import org.junit.jupiter.api.BeforeEach
-import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.extension.ExtendWith
-import java.net.InetAddress
-import java.net.InetSocketAddress
-import java.nio.ByteBuffer
-
-@ExtendWith(BouncyCastleExtension::class)
-class HandshakeIntegrationTest {
-
- private val keyPair: SECP256K1.KeyPair = SECP256K1.KeyPair.random()
- private val enr: Bytes = EthereumNodeRecord.toRLP(keyPair, ip = InetAddress.getLocalHost(), udp = SERVICE_PORT)
- private val address: InetSocketAddress = InetSocketAddress(InetAddress.getLocalHost(), SERVICE_PORT)
- private val connector: UdpConnector = DefaultUdpConnector(address, keyPair, enr)
-
- private val clientKeyPair = SECP256K1.KeyPair.random()
- private val clientEnr = EthereumNodeRecord.toRLP(clientKeyPair, ip = InetAddress.getLocalHost(), udp = CLIENT_PORT)
- private val routingTable = RoutingTable(clientEnr)
- private val authProvider = DefaultAuthenticationProvider(clientKeyPair, routingTable)
- private val clientCodec = DefaultPacketCodec(clientKeyPair, routingTable, authenticationProvider = authProvider)
- private val socket = CoroutineDatagramChannel.open()
-
- private val clientNodeId: Bytes = Hash.sha2_256(clientEnr)
-
- private val bootList = listOf("enr:${Base64URLSafe.encode(clientEnr)}")
- private val service: NodeDiscoveryService =
- DefaultNodeDiscoveryService(keyPair, SERVICE_PORT, bootstrapENRList = bootList, connector = connector)
-
- @BeforeEach
- fun init() {
- socket.bind(InetSocketAddress(9091))
- service.start()
- }
-
- @Test
- fun discv5HandshakeTest() {
- runBlocking {
- val buffer = ByteBuffer.allocate(UdpMessage.MAX_UDP_MESSAGE_SIZE)
- socket.receive(buffer)
- buffer.flip()
-
- var content = Bytes.wrapByteBuffer(buffer)
- var decodingResult = clientCodec.decode(content)
- assert(decodingResult.message is RandomMessage)
- buffer.clear()
-
- sendWhoAreYou()
-
- socket.receive(buffer)
- buffer.flip()
- content = Bytes.wrapByteBuffer(buffer)
- decodingResult = clientCodec.decode(content)
- assert(decodingResult.message is FindNodeMessage)
- assert(null != authProvider.findSessionKey(Hash.sha2_256(enr).toHexString()))
-
- val message = decodingResult.message as FindNodeMessage
-
- assert(message.distance == 0)
- assert(message.requestId.size() == UdpMessage.REQUEST_ID_LENGTH)
- }
- }
-
- @AfterEach
- fun tearDown() {
- service.terminate(true)
- socket.close()
- }
-
- private suspend fun sendWhoAreYou() {
- val message = WhoAreYouMessage()
- val bytes = clientCodec.encode(message, clientNodeId)
- val buffer = ByteBuffer.wrap(bytes.toArray())
- socket.send(buffer, address)
- }
-
- companion object {
- private const val SERVICE_PORT: Int = 9090
- private const val CLIENT_PORT: Int = 9091
- }
-}
diff --git a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/HandshakeTest.kt b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/HandshakeTest.kt
new file mode 100644
index 0000000..c76b91c
--- /dev/null
+++ b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/HandshakeTest.kt
@@ -0,0 +1,57 @@
+package org.apache.tuweni.devp2p.v5
+
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.runBlocking
+import org.apache.tuweni.devp2p.v5.packet.PingMessage
+import org.apache.tuweni.devp2p.v5.packet.PongMessage
+import org.junit.jupiter.api.Test
+
+/**
+ * Tests that run two nodes created on ports 9090 and 9091 and exercise the handshake, a ping/pong
+ * exchange and routing table maintenance between them.
+ *
+ * Every test terminates its nodes in a `finally` block so that a failing assertion does not leave the
+ * nodes running while the following tests create nodes on the same ports.
+ */
+class HandshakeTest : AbstractIntegrationTest() {
+
+  /** The handshake between two freshly created nodes completes. */
+  @Test
+  fun testHandshake() {
+    val node1 = createNode(9090)
+    val node2 = createNode(9091)
+
+    try {
+      val result = handshake(node1, node2)
+
+      assert(result)
+    } finally {
+      node1.service.terminate(true)
+      node2.service.terminate(true)
+    }
+  }
+
+  /** After a handshake, a ping is answered with a pong whose recipient port is the sender's port. */
+  @Test
+  fun testPing() {
+    val node1 = createNode(9090)
+    val node2 = createNode(9091)
+
+    try {
+      // Evaluate the handshake outside of assert() so it runs regardless of how assertions are compiled.
+      val established = handshake(node1, node2)
+      assert(established)
+
+      val pong = sendAndAwait<PongMessage>(node1, node2, PingMessage())
+
+      assert(node1.port == pong.recipientPort)
+    } finally {
+      node1.service.terminate(true)
+      node2.service.terminate(true)
+    }
+  }
+
+  /** A terminated peer is gone from the other node's routing table after a three second wait. */
+  @Test
+  fun testTableMaintenance() {
+    val node1 = createNode(9090)
+    val node2 = createNode(9091)
+    // node2 is terminated as part of the scenario; remember that so cleanup does not terminate it twice.
+    var node2Running = true
+
+    try {
+      val established = handshake(node1, node2)
+      assert(established)
+
+      runBlocking {
+        assert(!node1.routingTable.isEmpty())
+
+        node2.service.terminate(true)
+        node2Running = false
+
+        delay(3000)
+
+        assert(node1.routingTable.isEmpty())
+      }
+    } finally {
+      if (node2Running) {
+        node2.service.terminate(true)
+      }
+      node1.service.terminate(true)
+    }
+  }
+}
diff --git a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultAuthenticationProviderTest.kt b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultAuthenticationProviderTest.kt
index 1d9bb5a..7629d80 100644
--- a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultAuthenticationProviderTest.kt
+++ b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultAuthenticationProviderTest.kt
@@ -20,7 +20,7 @@
import org.apache.tuweni.crypto.Hash
import org.apache.tuweni.crypto.SECP256K1
import org.apache.tuweni.devp2p.EthereumNodeRecord
-import org.apache.tuweni.devp2p.v5.dht.RoutingTable
+import org.apache.tuweni.devp2p.v5.storage.RoutingTable
import org.apache.tuweni.devp2p.v5.misc.HandshakeInitParameters
import org.apache.tuweni.devp2p.v5.misc.SessionKey
import org.apache.tuweni.junit.BouncyCastleExtension
diff --git a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultPacketCodecTest.kt b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultPacketCodecTest.kt
index 18a5577..50a677d 100644
--- a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultPacketCodecTest.kt
+++ b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultPacketCodecTest.kt
@@ -22,7 +22,7 @@
import org.apache.tuweni.devp2p.EthereumNodeRecord
import org.apache.tuweni.devp2p.v5.AuthenticationProvider
import org.apache.tuweni.devp2p.v5.PacketCodec
-import org.apache.tuweni.devp2p.v5.dht.RoutingTable
+import org.apache.tuweni.devp2p.v5.storage.RoutingTable
import org.apache.tuweni.devp2p.v5.encrypt.AES128GCM
import org.apache.tuweni.devp2p.v5.misc.SessionKey
import org.apache.tuweni.devp2p.v5.packet.FindNodeMessage
@@ -54,8 +54,8 @@
val encodedResult = codec.encode(message, destNodeId)
- val encodedContent = encodedResult.slice(45)
- val result = RandomMessage.create(encodedContent)
+ val encodedContent = encodedResult.content.slice(45)
+ val result = RandomMessage.create(UdpMessage.authTag(), encodedContent)
assert(result.data == message.data)
}
@@ -66,7 +66,7 @@
val encodedResult = codec.encode(message, destNodeId)
- val encodedContent = encodedResult.slice(32)
+ val encodedContent = encodedResult.content.slice(32)
val result = WhoAreYouMessage.create(encodedContent)
assert(result.idNonce == message.idNonce)
@@ -85,8 +85,8 @@
val encodedResult = codec.encode(message, destNodeId)
- val tag = encodedResult.slice(0, UdpMessage.TAG_LENGTH)
- val encryptedContent = encodedResult.slice(45)
+ val tag = encodedResult.content.slice(0, UdpMessage.TAG_LENGTH)
+ val encryptedContent = encodedResult.content.slice(45)
val content = AES128GCM.decrypt(encryptedContent, sessionKey.initiatorKey, tag).slice(1)
val result = FindNodeMessage.create(content)
@@ -95,21 +95,12 @@
}
@Test
- fun encodeFailsIfSessionKeyIsNotExists() {
- val message = FindNodeMessage()
-
- assertThrows<IllegalArgumentException> {
- codec.encode(message, destNodeId)
- }
- }
-
- @Test
fun decodePerformsValidDecodingOfRandomMessasge() {
val message = RandomMessage()
val encodedResult = codec.encode(message, destNodeId)
- val result = codec.decode(encodedResult).message as? RandomMessage
+ val result = codec.decode(encodedResult.content).message as? RandomMessage
assert(null != result)
assert(result!!.data == message.data)
@@ -121,7 +112,7 @@
val encodedResult = codec.encode(message, destNodeId)
- val result = codec.decode(encodedResult).message as? WhoAreYouMessage
+ val result = codec.decode(encodedResult.content).message as? WhoAreYouMessage
assert(null != result)
assert(result!!.idNonce == message.idNonce)
@@ -141,7 +132,7 @@
val encodedResult = codec.encode(message, nodeId)
- val result = codec.decode(encodedResult).message as? FindNodeMessage
+ val result = codec.decode(encodedResult.content).message as? FindNodeMessage
assert(result!!.requestId == message.requestId)
assert(result.distance == message.distance)
diff --git a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultUdpConnectorTest.kt b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultUdpConnectorTest.kt
index a345208..7d77329 100644
--- a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultUdpConnectorTest.kt
+++ b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultUdpConnectorTest.kt
@@ -37,18 +37,17 @@
class DefaultUdpConnectorTest {
private val keyPair: SECP256K1.KeyPair = SECP256K1.KeyPair.random()
- private val nodeId: Bytes = Bytes.fromHexString("0x98EB6D611291FA21F6169BFF382B9369C33D997FE4DC93410987E27796360640")
private val address: InetSocketAddress = InetSocketAddress(9090)
private val selfEnr: Bytes = EthereumNodeRecord.toRLP(keyPair, ip = address.address)
private val data: Bytes = UdpMessage.randomData()
- private val message: RandomMessage = RandomMessage(data)
+ private val message: RandomMessage = RandomMessage(UdpMessage.authTag(), data)
- private var connector: UdpConnector = DefaultUdpConnector(address, keyPair, selfEnr, nodeId)
+ private var connector: UdpConnector = DefaultUdpConnector(address, keyPair, selfEnr)
@BeforeEach
fun setUp() {
- connector = DefaultUdpConnector(address, keyPair, selfEnr, nodeId)
+ connector = DefaultUdpConnector(address, keyPair, selfEnr)
}
@AfterEach
@@ -93,7 +92,7 @@
buffer.flip()
val messageContent = Bytes.wrapByteBuffer(buffer).slice(45)
- val message = RandomMessage.create(messageContent)
+ val message = RandomMessage.create(UdpMessage.authTag(), messageContent)
assert(message.data == data)
}
diff --git a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/packet/RandomMessageTest.kt b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/packet/RandomMessageTest.kt
index 2060816..ac125b9 100644
--- a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/packet/RandomMessageTest.kt
+++ b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/packet/RandomMessageTest.kt
@@ -27,12 +27,12 @@
"0xB53CCF732982B8E950836D1E02898C8B38CFDBFDF86BC65C8826506B454E14618EA73612A0F5582C130FF666"
val data = Bytes.fromHexString(expectedEncodingResult)
- val message = RandomMessage(data)
+ val message = RandomMessage(UdpMessage.authTag(), data)
val encodingResult = message.encode()
assert(encodingResult.toHexString() == expectedEncodingResult)
- val decodingResult = RandomMessage.create(encodingResult)
+ val decodingResult = RandomMessage.create(UdpMessage.authTag(), encodingResult)
assert(decodingResult.data == data)
}
diff --git a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/dht/RoutingTableTest.kt b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/storage/RoutingTableTest.kt
similarity index 97%
rename from devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/dht/RoutingTableTest.kt
rename to devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/storage/RoutingTableTest.kt
index cf0a17c..fbf2ede 100644
--- a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/dht/RoutingTableTest.kt
+++ b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/storage/RoutingTableTest.kt
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.tuweni.devp2p.v5.dht
+package org.apache.tuweni.devp2p.v5.storage
import org.apache.tuweni.bytes.Bytes
import org.apache.tuweni.crypto.SECP256K1