Merge branch 'v5-udp-transport' into v5-topics
# Conflicts:
# devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/UdpConnector.kt
# devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultUdpConnector.kt
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/UdpConnector.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/UdpConnector.kt
index 47b3caf..72900f0 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/UdpConnector.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/UdpConnector.kt
@@ -23,6 +23,9 @@
import org.apache.tuweni.devp2p.v5.packet.UdpMessage
import org.apache.tuweni.devp2p.v5.misc.HandshakeInitParameters
import org.apache.tuweni.devp2p.v5.misc.TrackingMessage
+import org.apache.tuweni.devp2p.v5.topic.TicketHolder
+import org.apache.tuweni.devp2p.v5.topic.TopicRegistrar
+import org.apache.tuweni.devp2p.v5.topic.TopicTable
import java.net.InetSocketAddress
/**
@@ -114,9 +117,27 @@
fun getNodesTable(): RoutingTable
+ /**
+ * Provides node's topic table
+ *
+ * @return node's topic table
+ */
+ fun getTopicTable(): TopicTable
+
+ /**
+ * Provides node's ticket holder
+ *
+ * @return node's ticket holder
+ */
+ fun getTicketHolder(): TicketHolder
+
fun getAwaitingPongRecord(nodeId: Bytes): Bytes?
fun getPendingMessage(authTag: Bytes): TrackingMessage
fun getNodeRecords(): ENRStorage
+
+ fun getTopicRegistrar(): TopicRegistrar
+
+ fun getSessionInitiatorKey(nodeId: Bytes): Bytes
}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultPacketCodec.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultPacketCodec.kt
index 9f45a04..6216138 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultPacketCodec.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultPacketCodec.kt
@@ -37,7 +37,6 @@
import org.apache.tuweni.devp2p.v5.packet.PongMessage
import org.apache.tuweni.devp2p.v5.packet.RegConfirmationMessage
import org.apache.tuweni.devp2p.v5.packet.RegTopicMessage
-import org.apache.tuweni.devp2p.v5.packet.ReqTicketMessage
import org.apache.tuweni.devp2p.v5.packet.TicketMessage
import org.apache.tuweni.devp2p.v5.packet.TopicQueryMessage
import org.apache.tuweni.rlp.RLP
@@ -129,11 +128,10 @@
2 -> PongMessage.create(message)
3 -> FindNodeMessage.create(message)
4 -> NodesMessage.create(message)
- 5 -> ReqTicketMessage.create(message)
+ 5 -> RegTopicMessage.create(message)
6 -> TicketMessage.create(message)
- 7 -> RegTopicMessage.create(message)
- 8 -> RegConfirmationMessage.create(message)
- 9 -> TopicQueryMessage.create(message)
+ 7 -> RegConfirmationMessage.create(message)
+ 8 -> TopicQueryMessage.create(message)
else -> throw IllegalArgumentException("Unknown message retrieved")
}
}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultUdpConnector.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultUdpConnector.kt
index c0a6368..6816324 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultUdpConnector.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultUdpConnector.kt
@@ -28,6 +28,7 @@
import org.apache.tuweni.crypto.SECP256K1
import org.apache.tuweni.devp2p.EthereumNodeRecord
import org.apache.tuweni.devp2p.v5.ENRStorage
+import org.apache.tuweni.devp2p.v5.AuthenticationProvider
import org.apache.tuweni.devp2p.v5.MessageHandler
import org.apache.tuweni.devp2p.v5.MessageObserver
import org.apache.tuweni.devp2p.v5.PacketCodec
@@ -38,7 +39,12 @@
import org.apache.tuweni.devp2p.v5.internal.handler.PingMessageHandler
import org.apache.tuweni.devp2p.v5.internal.handler.PongMessageHandler
import org.apache.tuweni.devp2p.v5.internal.handler.RandomMessageHandler
+import org.apache.tuweni.devp2p.v5.internal.handler.RegConfirmationMessageHandler
+import org.apache.tuweni.devp2p.v5.internal.handler.RegTopicMessageHandler
+import org.apache.tuweni.devp2p.v5.internal.handler.TicketMessageHandler
+import org.apache.tuweni.devp2p.v5.internal.handler.TopicQueryMessageHandler
import org.apache.tuweni.devp2p.v5.internal.handler.WhoAreYouMessageHandler
+import org.apache.tuweni.devp2p.v5.misc.HandshakeInitParameters
import org.apache.tuweni.devp2p.v5.packet.FindNodeMessage
import org.apache.tuweni.devp2p.v5.packet.RandomMessage
import org.apache.tuweni.devp2p.v5.packet.UdpMessage
@@ -48,13 +54,23 @@
import org.apache.tuweni.devp2p.v5.packet.NodesMessage
import org.apache.tuweni.devp2p.v5.packet.PingMessage
import org.apache.tuweni.devp2p.v5.packet.PongMessage
+import org.apache.tuweni.devp2p.v5.packet.RandomMessage
+import org.apache.tuweni.devp2p.v5.packet.RegConfirmationMessage
+import org.apache.tuweni.devp2p.v5.packet.RegTopicMessage
+import org.apache.tuweni.devp2p.v5.packet.TicketMessage
+import org.apache.tuweni.devp2p.v5.packet.TopicQueryMessage
+import org.apache.tuweni.devp2p.v5.packet.UdpMessage
+import org.apache.tuweni.devp2p.v5.packet.WhoAreYouMessage
+import org.apache.tuweni.devp2p.v5.topic.TicketHolder
+import org.apache.tuweni.devp2p.v5.topic.TopicRegistrar
+import org.apache.tuweni.devp2p.v5.topic.TopicTable
import org.apache.tuweni.devp2p.v5.storage.DefaultENRStorage
import org.apache.tuweni.net.coroutines.CoroutineDatagramChannel
import java.net.InetSocketAddress
import java.nio.ByteBuffer
-import java.time.Duration
import java.util.logging.Logger
import kotlin.coroutines.CoroutineContext
+import java.time.Duration
class DefaultUdpConnector(
private val bindAddress: InetSocketAddress,
@@ -63,7 +79,10 @@
private val enrStorage: ENRStorage = DefaultENRStorage(),
private val receiveChannel: CoroutineDatagramChannel = CoroutineDatagramChannel.open(),
private val nodesTable: RoutingTable = RoutingTable(selfEnr),
- private val packetCodec: PacketCodec = DefaultPacketCodec(keyPair, nodesTable),
+ private val topicTable: TopicTable = TopicTable(),
+ private val ticketHolder: TicketHolder = TicketHolder(),
+ private val authenticationProvider: AuthenticationProvider = DefaultAuthenticationProvider(keyPair, nodesTable),
+ private val packetCodec: PacketCodec = DefaultPacketCodec(keyPair, nodesTable, nodeId, authenticationProvider),
private val authenticatingPeers: MutableMap<InetSocketAddress, Bytes> = mutableMapOf(),
private val selfNodeRecord: EthereumNodeRecord = EthereumNodeRecord.fromRLP(selfEnr),
private val messageListeners: MutableList<MessageObserver> = mutableListOf(),
@@ -78,6 +97,12 @@
private val nodesMessageHandler: MessageHandler<NodesMessage> = NodesMessageHandler()
private val pingMessageHandler: MessageHandler<PingMessage> = PingMessageHandler()
private val pongMessageHandler: MessageHandler<PongMessage> = PongMessageHandler()
+ private val regConfirmationMessageHandler: MessageHandler<RegConfirmationMessage> = RegConfirmationMessageHandler()
+ private val regTopicMessageHandler: MessageHandler<RegTopicMessage> = RegTopicMessageHandler()
+ private val ticketMessageHandler: MessageHandler<TicketMessage> = TicketMessageHandler()
+ private val topicQueryMessageHandler: MessageHandler<TopicQueryMessage> = TopicQueryMessageHandler()
+
+ private val topicRegistrar = TopicRegistrar(coroutineContext, this)
private val askedNodes: MutableList<Bytes> = mutableListOf()
@@ -109,7 +134,7 @@
override fun getNodeKeyPair(): SECP256K1.KeyPair = keyPair
override fun getPendingMessage(authTag: Bytes): TrackingMessage = pendingMessages.getIfPresent(authTag.toHexString())
- ?: throw IllegalArgumentException("Pending message not found")
+ ?: throw IllegalArgumentException("Pending message not found")
override fun start() {
@@ -160,6 +185,12 @@
return result
}
+ override fun getTopicTable(): TopicTable = topicTable
+
+ override fun getTicketHolder(): TicketHolder = ticketHolder
+
+ override fun getTopicRegistrar(): TopicRegistrar = topicRegistrar
+
override fun getAwaitingPongRecord(nodeId: Bytes): Bytes? {
val nodeIdHex = nodeId.toHexString()
val result = pings.getIfPresent(nodeIdHex)
@@ -167,6 +198,11 @@
return result
}
+ override fun getSessionInitiatorKey(nodeId: Bytes): Bytes {
+ return authenticationProvider.findSessionKey(nodeId.toHexString())?.initiatorKey
+ ?: throw IllegalArgumentException("Session key not found.")
+ }
+
// Lookup nodes
private fun lookupNodes() = launch {
while (true) {
@@ -183,7 +219,7 @@
private fun lookupInternal(nearest: List<Bytes>) {
val targetNode = if (nearest.isNotEmpty()) nearest.random() else Bytes.random(32)
val distance = getNodesTable().distanceToSelf(targetNode)
- for(target in nearest.take(3)) {
+ for (target in nearest.take(3)) {
val enr = EthereumNodeRecord.fromRLP(target)
val message = FindNodeMessage(distance = distance)
val address = InetSocketAddress(enr.ip(), enr.udp())
@@ -217,6 +253,10 @@
is NodesMessage -> nodesMessageHandler.handle(message, address, decodeResult.srcNodeId, this)
is PingMessage -> pingMessageHandler.handle(message, address, decodeResult.srcNodeId, this)
is PongMessage -> pongMessageHandler.handle(message, address, decodeResult.srcNodeId, this)
+ is RegTopicMessage -> regTopicMessageHandler.handle(message, address, decodeResult.srcNodeId, this)
+ is RegConfirmationMessage -> regConfirmationMessageHandler.handle(message, address, decodeResult.srcNodeId, this)
+ is TicketMessage -> ticketMessageHandler.handle(message, address, decodeResult.srcNodeId, this)
+ is TopicQueryMessage -> topicQueryMessageHandler.handle(message, address, decodeResult.srcNodeId, this)
else -> throw IllegalArgumentException("Unexpected message has been received - ${message::class.java.simpleName}")
}
messageListeners.forEach { it.observe(message) }
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/RegConfirmationMessageHandler.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/RegConfirmationMessageHandler.kt
new file mode 100644
index 0000000..5b38d82
--- /dev/null
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/RegConfirmationMessageHandler.kt
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.v5.internal.handler
+
+import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.devp2p.v5.MessageHandler
+import org.apache.tuweni.devp2p.v5.UdpConnector
+import org.apache.tuweni.devp2p.v5.packet.RegConfirmationMessage
+import java.net.InetSocketAddress
+
+
+class RegConfirmationMessageHandler : MessageHandler<RegConfirmationMessage> {
+
+ override fun handle(
+ message: RegConfirmationMessage,
+ address: InetSocketAddress,
+ srcNodeId: Bytes,
+ connector: UdpConnector
+ ) {
+ val ticketHolder = connector.getTicketHolder()
+ ticketHolder.removeTicket(message.requestId)
+ connector.getTopicRegistrar().registerTopic(message.topic, true)
+ }
+
+}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/RegTopicMessageHandler.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/RegTopicMessageHandler.kt
new file mode 100644
index 0000000..2365446
--- /dev/null
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/RegTopicMessageHandler.kt
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.v5.internal.handler
+
+import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.devp2p.DiscoveryService.Companion.CURRENT_TIME_SUPPLIER
+import org.apache.tuweni.devp2p.v5.MessageHandler
+import org.apache.tuweni.devp2p.v5.UdpConnector
+import org.apache.tuweni.devp2p.v5.packet.RegConfirmationMessage
+import org.apache.tuweni.devp2p.v5.packet.RegTopicMessage
+import org.apache.tuweni.devp2p.v5.packet.TicketMessage
+import org.apache.tuweni.devp2p.v5.topic.Ticket
+import org.apache.tuweni.devp2p.v5.topic.Topic
+import java.net.InetSocketAddress
+
+class RegTopicMessageHandler : MessageHandler<RegTopicMessage> {
+
+ private val now: () -> Long = CURRENT_TIME_SUPPLIER
+
+
+ override fun handle(message: RegTopicMessage, address: InetSocketAddress, srcNodeId: Bytes, connector: UdpConnector) {
+ val topic = Topic(message.topic.toHexString())
+ val key = connector.getSessionInitiatorKey(srcNodeId)
+
+ val existingTicket = if (!message.ticket.isEmpty) {
+ val ticket = Ticket.decrypt(message.ticket, key)
+ ticket.validate(srcNodeId, address.address, now(), message.topic)
+ ticket
+ } else null
+
+ // Create new ticket
+ val waitTime = connector.getTopicTable().put(topic, message.nodeRecord)
+ val cumTime = (existingTicket?.cumTime ?: waitTime) + waitTime
+ val ticket = Ticket(message.topic, srcNodeId, address.address, now(), waitTime, cumTime)
+ val encryptedTicket = ticket.encrypt(key)
+
+ // Send ticket
+ val response = TicketMessage(message.requestId, encryptedTicket, waitTime)
+ connector.send(address, response, srcNodeId)
+
+ // Send confirmation if topic was placed
+ if (waitTime == 0L) {
+ val confirmation = RegConfirmationMessage(message.requestId, message.topic)
+ connector.send(address, confirmation, srcNodeId)
+ }
+ }
+}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/TicketMessageHandler.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/TicketMessageHandler.kt
new file mode 100644
index 0000000..11d3d3f
--- /dev/null
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/TicketMessageHandler.kt
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.v5.internal.handler
+
+import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.devp2p.v5.MessageHandler
+import org.apache.tuweni.devp2p.v5.UdpConnector
+import org.apache.tuweni.devp2p.v5.packet.TicketMessage
+import org.apache.tuweni.devp2p.v5.topic.Ticket
+import java.net.InetSocketAddress
+
+class TicketMessageHandler : MessageHandler<TicketMessage> {
+
+ override fun handle(message: TicketMessage, address: InetSocketAddress, srcNodeId: Bytes, connector: UdpConnector) {
+ val ticketHolder = connector.getTicketHolder()
+ ticketHolder.putTicket(message.requestId, message.ticket)
+
+ if (message.waitTime != 0L) {
+ val key = connector.getSessionInitiatorKey(srcNodeId)
+ val ticket = Ticket.decrypt(message.ticket, key)
+ connector.getTopicRegistrar().delayRegTopic(message.requestId, ticket.topic, message.waitTime)
+ }
+ }
+}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/TopicQueryMessageHandler.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/TopicQueryMessageHandler.kt
new file mode 100644
index 0000000..7f2e432
--- /dev/null
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/TopicQueryMessageHandler.kt
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.v5.internal.handler
+
+import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.devp2p.v5.MessageHandler
+import org.apache.tuweni.devp2p.v5.UdpConnector
+import org.apache.tuweni.devp2p.v5.packet.NodesMessage
+import org.apache.tuweni.devp2p.v5.packet.TopicQueryMessage
+import org.apache.tuweni.devp2p.v5.topic.Topic
+import java.net.InetSocketAddress
+
+class TopicQueryMessageHandler : MessageHandler<TopicQueryMessage> {
+
+ override fun handle(
+ message: TopicQueryMessage,
+ address: InetSocketAddress,
+ srcNodeId: Bytes,
+ connector: UdpConnector
+ ) {
+ val topicTable = connector.getTopicTable()
+ val nodes = topicTable.getNodes(Topic(message.topic.toHexString()))
+
+ var caret = 0
+ while (caret < nodes.size) {
+ val response = NodesMessage(message.requestId, nodes.size, nodes.subList(caret, caret + MAX_NODES_IN_RESPONSE))
+ connector.send(address, response, srcNodeId)
+ caret += MAX_NODES_IN_RESPONSE
+ }
+ }
+
+ companion object {
+ private const val MAX_NODES_IN_RESPONSE: Int = 16
+ }
+}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/packet/RegConfirmationMessage.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/packet/RegConfirmationMessage.kt
index bd643dd..b22ca9a 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/packet/RegConfirmationMessage.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/packet/RegConfirmationMessage.kt
@@ -21,17 +21,17 @@
class RegConfirmationMessage(
val requestId: Bytes = UdpMessage.requestId(),
- val registered: Boolean = true
+ val topic: Bytes
) : UdpMessage() {
- private val encodedMessageType: Bytes = Bytes.fromHexString("0x08")
+ private val encodedMessageType: Bytes = Bytes.fromHexString("0x07")
override fun getMessageType(): Bytes = encodedMessageType
override fun encode(): Bytes {
return RLP.encodeList { writer ->
writer.writeValue(requestId)
- writer.writeByte(if (registered) 1 else 0)
+ writer.writeValue(topic)
}
}
@@ -39,8 +39,8 @@
fun create(content: Bytes): RegConfirmationMessage {
return RLP.decodeList(content) { reader ->
val requestId = reader.readValue()
- val registered = (reader.readByte() == 1.toByte())
- return@decodeList RegConfirmationMessage(requestId, registered)
+ val topic = reader.readValue()
+ return@decodeList RegConfirmationMessage(requestId, topic)
}
}
}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/packet/RegTopicMessage.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/packet/RegTopicMessage.kt
index afe2c23..aae05a4 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/packet/RegTopicMessage.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/packet/RegTopicMessage.kt
@@ -21,19 +21,21 @@
class RegTopicMessage(
val requestId: Bytes = UdpMessage.requestId(),
- val ticket: Bytes,
- val nodeRecord: Bytes
+ val nodeRecord: Bytes,
+ val topic: Bytes,
+ val ticket: Bytes
) : UdpMessage() {
- private val encodedMessageType: Bytes = Bytes.fromHexString("0x07")
+ private val encodedMessageType: Bytes = Bytes.fromHexString("0x05")
override fun getMessageType(): Bytes = encodedMessageType
override fun encode(): Bytes {
return RLP.encodeList { writer ->
writer.writeValue(requestId)
- writer.writeValue(ticket)
writer.writeValue(nodeRecord)
+ writer.writeValue(topic)
+ writer.writeValue(ticket)
}
}
@@ -41,9 +43,10 @@
fun create(content: Bytes): RegTopicMessage {
return RLP.decodeList(content) { reader ->
val requestId = reader.readValue()
- val ticket = reader.readValue()
val nodeRecord = reader.readValue()
- return@decodeList RegTopicMessage(requestId, ticket, nodeRecord)
+ val topic = reader.readValue()
+ val ticket = reader.readValue()
+ return@decodeList RegTopicMessage(requestId, nodeRecord, topic, ticket)
}
}
}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/packet/ReqTicketMessage.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/packet/ReqTicketMessage.kt
deleted file mode 100644
index 0381a85..0000000
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/packet/ReqTicketMessage.kt
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tuweni.devp2p.v5.packet
-
-import org.apache.tuweni.bytes.Bytes
-import org.apache.tuweni.rlp.RLP
-
-class ReqTicketMessage(
- val requestId: Bytes = UdpMessage.requestId(),
- val topic: Bytes
-) : UdpMessage() {
-
- private val encodedMessageType: Bytes = Bytes.fromHexString("0x05")
-
- override fun getMessageType(): Bytes = encodedMessageType
-
- override fun encode(): Bytes {
- return RLP.encodeList { writer ->
- writer.writeValue(requestId)
- writer.writeValue(topic)
- }
- }
-
- companion object {
- fun create(content: Bytes): ReqTicketMessage {
- return RLP.decodeList(content) { reader ->
- val requestId = reader.readValue()
- val topic = reader.readValue()
- return@decodeList ReqTicketMessage(requestId, topic)
- }
- }
- }
-}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/packet/TopicQueryMessage.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/packet/TopicQueryMessage.kt
index 14736ff..d543f01 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/packet/TopicQueryMessage.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/packet/TopicQueryMessage.kt
@@ -24,7 +24,7 @@
val topic: Bytes
) : UdpMessage() {
- private val encodedMessageType: Bytes = Bytes.fromHexString("0x09")
+ private val encodedMessageType: Bytes = Bytes.fromHexString("0x08")
override fun getMessageType(): Bytes = encodedMessageType
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/topic/Ticket.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/topic/Ticket.kt
new file mode 100644
index 0000000..8d67374
--- /dev/null
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/topic/Ticket.kt
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.v5.topic
+
+import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.devp2p.v5.encrypt.AES128GCM
+import org.apache.tuweni.rlp.RLP
+import java.net.InetAddress
+
+data class Ticket(
+ val topic: Bytes,
+ val srcNodeId: Bytes,
+ val srcIp: InetAddress,
+ val requestTime: Long,
+ val waitTime: Long,
+ val cumTime: Long
+) {
+
+ fun encode(): Bytes {
+ return RLP.encodeList { writer ->
+ writer.writeValue(topic)
+ writer.writeValue(srcNodeId)
+ writer.writeValue(Bytes.wrap(srcIp.address))
+ writer.writeLong(requestTime)
+ writer.writeLong(waitTime)
+ writer.writeLong(cumTime)
+ }
+ }
+
+ fun encrypt(key: Bytes): Bytes {
+ val ticketBytes = encode()
+ return AES128GCM.encrypt(key, Bytes.wrap(ByteArray(ZERO_NONCE_SIZE)), ticketBytes, Bytes.EMPTY)
+ }
+
+ fun validate(
+ srcNodeId: Bytes,
+ srcIp: InetAddress,
+ now: Long,
+ topic: Bytes
+ ) {
+ require(this.srcNodeId == srcNodeId) { TICKET_INVALID_MSG }
+ require(this.srcIp == srcIp) { TICKET_INVALID_MSG }
+ require(this.topic == topic) { TICKET_INVALID_MSG }
+ val windowStart = this.requestTime + this.waitTime
+ require(now >= windowStart && now <= windowStart + TIME_WINDOW_MS) { TICKET_INVALID_MSG }
+ }
+
+ companion object {
+ private const val ZERO_NONCE_SIZE: Int = 12
+ internal const val TIME_WINDOW_MS: Int = 10000 // 10 seconds
+ internal const val TICKET_INVALID_MSG = "Ticket is invalid"
+
+ fun create(content: Bytes): Ticket {
+ return RLP.decodeList(content) { reader ->
+ val topic = reader.readValue()
+ val srcNodeId = reader.readValue()
+ val srcIp = InetAddress.getByAddress(reader.readValue().toArray())
+ val requestTime = reader.readLong()
+ val waitTime = reader.readLong()
+ val cumTime = reader.readLong()
+ return@decodeList Ticket(topic, srcNodeId, srcIp, requestTime, waitTime, cumTime)
+ }
+ }
+
+ fun decrypt(encrypted: Bytes, key: Bytes): Ticket {
+ val decrypted = AES128GCM.decrypt(encrypted, key, Bytes.EMPTY)
+ return create(decrypted)
+ }
+ }
+}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/topic/TicketHolder.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/topic/TicketHolder.kt
new file mode 100644
index 0000000..7f41f08
--- /dev/null
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/topic/TicketHolder.kt
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.v5.topic
+
+import org.apache.tuweni.bytes.Bytes
+
+class TicketHolder {
+
+ private val tickets: MutableMap<Bytes, Bytes> = hashMapOf() // requestId to ticket
+
+
+ fun putTicket(requestId: Bytes, ticket: Bytes) {
+ tickets[requestId] = ticket
+ }
+
+ fun getTicket(requestId: Bytes): Bytes =
+ tickets[requestId] ?: throw IllegalArgumentException("Ticket not found.")
+
+ fun removeTicket(requestId: Bytes): Bytes? = tickets.remove(requestId)
+}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/topic/Topic.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/topic/Topic.kt
new file mode 100644
index 0000000..5403b8d
--- /dev/null
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/topic/Topic.kt
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.v5.topic
+
+data class Topic(
+ val content: String
+)
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/topic/TopicRegistrar.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/topic/TopicRegistrar.kt
new file mode 100644
index 0000000..e5bb843
--- /dev/null
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/topic/TopicRegistrar.kt
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.v5.topic
+
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
+import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.crypto.Hash
+import org.apache.tuweni.devp2p.EthereumNodeRecord
+import org.apache.tuweni.devp2p.v5.internal.DefaultUdpConnector
+import org.apache.tuweni.devp2p.v5.packet.RegTopicMessage
+import org.apache.tuweni.devp2p.v5.packet.UdpMessage
+import java.net.InetSocketAddress
+import kotlin.coroutines.CoroutineContext
+
+class TopicRegistrar(
+ override val coroutineContext: CoroutineContext = Dispatchers.IO,
+ private val connector: DefaultUdpConnector
+) : CoroutineScope {
+
+ fun delayRegTopic(requestId: Bytes, topic: Bytes, waitTime: Long) {
+ launch {
+ delay(waitTime)
+
+ val ticket = connector.getTicketHolder().getTicket(requestId)
+ sendRegTopic(topic, ticket, requestId)
+ }
+ }
+
+ fun registerTopic(topic: Bytes, withDelay: Boolean = false) {
+ launch {
+ if (withDelay) {
+ delay(SEND_REGTOPIC_DELAY_MS)
+ }
+
+ sendRegTopic(topic)
+ }
+ }
+
+ private fun sendRegTopic(topic: Bytes, ticket: Bytes = Bytes.EMPTY, requestId: Bytes = UdpMessage.requestId()) {
+ val nodeEnr = connector.getEnrBytes()
+ val message = RegTopicMessage(requestId, nodeEnr, topic, ticket)
+
+ val receivers = connector.getNodesTable().nodesOfDistance(1) //todo radius
+ receivers.forEach { rlp ->
+ val receiver = EthereumNodeRecord.fromRLP(rlp)
+ val address = InetSocketAddress(receiver.ip(), receiver.udp())
+ val nodeId = Hash.sha2_256(rlp)
+ connector.send(address, message, nodeId)
+ }
+ }
+
+ companion object {
+ private const val SEND_REGTOPIC_DELAY_MS = 15 * 60 * 1000L // 15 min
+ }
+}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/topic/TopicTable.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/topic/TopicTable.kt
new file mode 100644
index 0000000..4ce963a
--- /dev/null
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/topic/TopicTable.kt
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.v5.topic
+
+import com.google.common.cache.Cache
+import com.google.common.cache.CacheBuilder
+import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.crypto.Hash
+import org.apache.tuweni.devp2p.DiscoveryService
+import org.apache.tuweni.devp2p.EthereumNodeRecord
+import java.util.concurrent.TimeUnit
+
+class TopicTable(
+ private val tableCapacity: Int = MAX_TABLE_CAPACITY,
+ private val queueCapacity: Int = MAX_ENTRIES_PER_TOPIC
+) {
+
+ private val timeSupplier: () -> Long = DiscoveryService.CURRENT_TIME_SUPPLIER
+ private val table: HashMap<Topic, Cache<String, TargetAd>> = HashMap(tableCapacity)
+
+
+ init {
+ require(tableCapacity > 0) { "Table capacity value must be positive" }
+ require(queueCapacity > 0) { "Queue capacity value must be positive" }
+ }
+
+ fun getNodes(topic: Topic): List<Bytes> {
+ val values = table[topic]
+ return values?.let { values.asMap().values.map { it.enr } } ?: emptyList()
+ }
+
+ /**
+ * Puts a topic in a table
+ *
+ * @return wait time for registrant node (0 is topic was putted immediately, -1 in case of error)
+ */
+ @Synchronized
+ fun put(topic: Topic, enr: Bytes): Long {
+ gcTable()
+
+ val topicQueue = table[topic]
+ val nodeId = Hash.sha2_256(enr).toHexString()
+
+ if (null != topicQueue) {
+ if (topicQueue.size() < queueCapacity) {
+ topicQueue.put(nodeId, TargetAd(timeSupplier(), enr))
+ return 0 // put immediately
+ } else {
+ // Queue if full (wait time = target-ad-lifetime - oldest ad lifetime in queue)
+ return TARGET_AD_LIFETIME_MS - (timeSupplier() - topicQueue.oldest().regTime)
+ }
+ }
+
+ if (table.size < tableCapacity) {
+ table[topic] = createNewQueue().apply { put(nodeId, TargetAd(timeSupplier(), enr)) }
+ return 0 // put immediately
+ } else {
+ //table is full (wait time = target-ad-lifetime - oldest in table of youngest in queue)
+ val oldestInTable = table.entries.map { it.value.youngest().regTime }.min() ?: -1
+ return TARGET_AD_LIFETIME_MS - (timeSupplier() - oldestInTable)
+ }
+
+ }
+
+ fun contains(topic: Topic): Boolean = table.containsKey(topic)
+
+ fun isEmpty(): Boolean = table.isEmpty()
+
+ fun clear() = table.clear()
+
+ private fun createNewQueue(): Cache<String, TargetAd> {
+ return CacheBuilder.newBuilder()
+ .expireAfterWrite(TARGET_AD_LIFETIME_MS, TimeUnit.MILLISECONDS)
+ .initialCapacity(queueCapacity)
+ .build()
+ }
+
+ private fun gcTable() {
+ table.entries.removeIf { it.value.size() == 0L }
+ }
+
+ private fun Cache<String, TargetAd>.oldest(): TargetAd {
+ return asMap().values.minBy { it.regTime } ?: throw IllegalArgumentException(QUEUE_EMPTY_MSG)
+ }
+
+ private fun Cache<String, TargetAd>.youngest(): TargetAd {
+ return asMap().values.maxBy { it.regTime } ?: throw IllegalArgumentException(QUEUE_EMPTY_MSG)
+ }
+
+ companion object {
+ internal const val MAX_ENTRIES_PER_TOPIC: Int = 100
+ private const val MAX_TABLE_CAPACITY: Int = 500
+ private const val TARGET_AD_LIFETIME_MS: Long = 15 * 60 * 1000 // 15 min
+
+ private const val QUEUE_EMPTY_MSG = "Queue is empty."
+ }
+}
+
+class TargetAd(
+ val regTime: Long,
+ val enr: Bytes
+)
+
+
diff --git a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/packet/RegConfirmationMessageTest.kt b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/packet/RegConfirmationMessageTest.kt
index 65f61a6..f879ef8 100644
--- a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/packet/RegConfirmationMessageTest.kt
+++ b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/packet/RegConfirmationMessageTest.kt
@@ -24,20 +24,20 @@
@Test
fun encodeCreatesValidBytesSequence() {
val requestId = Bytes.fromHexString("0xC6E32C5E89CAA754")
- val message = RegConfirmationMessage(requestId, false)
+ val message = RegConfirmationMessage(requestId, Bytes.random(32))
val encodingResult = message.encode()
val decodingResult = RegConfirmationMessage.create(encodingResult)
assert(decodingResult.requestId == requestId)
- assert(decodingResult.registered == message.registered)
+ assert(decodingResult.topic == message.topic)
}
@Test
fun getMessageTypeHasValidIndex() {
- val message = RegConfirmationMessage()
+ val message = RegConfirmationMessage(topic = Bytes.random(32))
- assert(8 == message.getMessageType().toInt())
+ assert(7 == message.getMessageType().toInt())
}
}
diff --git a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/packet/RegTopicMessageTest.kt b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/packet/RegTopicMessageTest.kt
index c33b43b..c15f5d6 100644
--- a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/packet/RegTopicMessageTest.kt
+++ b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/packet/RegTopicMessageTest.kt
@@ -24,7 +24,7 @@
@Test
fun encodeCreatesValidBytesSequence() {
val requestId = Bytes.fromHexString("0xC6E32C5E89CAA754")
- val message = RegTopicMessage(requestId, Bytes.random(32), Bytes.random(32))
+ val message = RegTopicMessage(requestId, Bytes.random(32), Bytes.random(32), Bytes.random(16))
val encodingResult = message.encode()
@@ -37,8 +37,8 @@
@Test
fun getMessageTypeHasValidIndex() {
- val message = RegTopicMessage(ticket = Bytes.random(32), nodeRecord = Bytes.random(32))
+ val message = RegTopicMessage(ticket = Bytes.random(32), nodeRecord = Bytes.random(32), topic = Bytes.random(16))
- assert(7 == message.getMessageType().toInt())
+ assert(5 == message.getMessageType().toInt())
}
}
diff --git a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/packet/ReqTicketMessageTest.kt b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/packet/ReqTicketMessageTest.kt
deleted file mode 100644
index a43a328..0000000
--- a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/packet/ReqTicketMessageTest.kt
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tuweni.devp2p.v5.packet
-
-import org.apache.tuweni.bytes.Bytes
-import org.junit.jupiter.api.Test
-
-class ReqTicketMessageTest {
-
- @Test
- fun encodeCreatesValidBytesSequence() {
- val requestId = Bytes.fromHexString("0xC6E32C5E89CAA754")
- val message = ReqTicketMessage(requestId, Bytes.random(32))
-
- val encodingResult = message.encode()
-
- val decodingResult = ReqTicketMessage.create(encodingResult)
-
- assert(decodingResult.requestId == requestId)
- assert(decodingResult.topic == message.topic)
- }
-
- @Test
- fun getMessageTypeHasValidIndex() {
- val message = ReqTicketMessage(topic = Bytes.random(32))
-
- assert(5 == message.getMessageType().toInt())
- }
-}
diff --git a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/packet/TopicQueryMessageTest.kt b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/packet/TopicQueryMessageTest.kt
index 36d66a8..9c1c541 100644
--- a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/packet/TopicQueryMessageTest.kt
+++ b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/packet/TopicQueryMessageTest.kt
@@ -38,6 +38,6 @@
fun getMessageTypeHasValidIndex() {
val message = TopicQueryMessage(topic = Bytes.random(32))
- assert(9 == message.getMessageType().toInt())
+ assert(8 == message.getMessageType().toInt())
}
}
diff --git a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/topic/TopicTableTest.kt b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/topic/TopicTableTest.kt
new file mode 100644
index 0000000..181a08f
--- /dev/null
+++ b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/topic/TopicTableTest.kt
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.v5.topic
+
+import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.crypto.SECP256K1
+import org.apache.tuweni.devp2p.EthereumNodeRecord
+import org.apache.tuweni.junit.BouncyCastleExtension
+import org.junit.jupiter.api.AfterEach
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.extension.ExtendWith
+import java.net.InetAddress
+
+@ExtendWith(BouncyCastleExtension::class)
+class TopicTableTest {
+ private val keyPair: SECP256K1.KeyPair = SECP256K1.KeyPair.random()
+ private val enr: Bytes = EthereumNodeRecord.toRLP(keyPair, ip = InetAddress.getLocalHost())
+
+ private val topicTable = TopicTable(TABLE_CAPACITY, QUEUE_CAPACITY)
+
+ @Test
+ fun putAddNodeToEmptyQueueImmediately() {
+ val waitTime = topicTable.put(Topic("A"), enr)
+
+ assert(!topicTable.isEmpty())
+ assert(waitTime == 0L)
+ }
+
+ @Test
+ fun putAddNodeToNotEmptyQueueShouldReturnWaitingTime() {
+ val topic = Topic("A")
+ topicTable.put(topic, EthereumNodeRecord.toRLP(SECP256K1.KeyPair.random(), ip = InetAddress.getLocalHost()))
+ topicTable.put(topic, EthereumNodeRecord.toRLP(SECP256K1.KeyPair.random(), ip = InetAddress.getLocalHost()))
+
+ val waitTime = topicTable.put(topic, enr)
+
+ assert(waitTime > 0L)
+ }
+
+ @Test
+ fun putAddNodeToNotEmptyTableShouldReturnWaitingTime() {
+ topicTable.put(Topic("A"), enr)
+ topicTable.put(Topic("B"), enr)
+
+ val waitTime = topicTable.put(Topic("C"), enr)
+
+ assert(waitTime > 0L)
+ }
+
+ @Test
+ fun getNodesReturnNodesThatProvidesTopic() {
+ val topic = Topic("A")
+ topicTable.put(topic, EthereumNodeRecord.toRLP(SECP256K1.KeyPair.random(), ip = InetAddress.getLocalHost()))
+ topicTable.put(topic, EthereumNodeRecord.toRLP(SECP256K1.KeyPair.random(), ip = InetAddress.getLocalHost()))
+
+ val nodes = topicTable.getNodes(topic)
+
+ assert(nodes.isNotEmpty())
+ assert(nodes.size == 2)
+ }
+
+ @Test
+ fun contains() {
+ val topic = Topic("A")
+ topicTable.put(topic, enr)
+
+ val containsTrue = topicTable.contains(topic)
+ assert(containsTrue)
+
+ val containsFalse = topicTable.contains(Topic("B"))
+ assert(!containsFalse)
+ }
+
+ @AfterEach
+ fun tearDown() {
+ topicTable.clear()
+ }
+
+ companion object {
+ private const val TABLE_CAPACITY = 2
+ private const val QUEUE_CAPACITY = 2
+ }
+
+}