Merge branch 'v5-udp-transport' into v5-topics

# Conflicts:
#	devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/UdpConnector.kt
#	devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultUdpConnector.kt
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/UdpConnector.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/UdpConnector.kt
index 47b3caf..72900f0 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/UdpConnector.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/UdpConnector.kt
@@ -23,6 +23,9 @@
 import org.apache.tuweni.devp2p.v5.packet.UdpMessage
 import org.apache.tuweni.devp2p.v5.misc.HandshakeInitParameters
 import org.apache.tuweni.devp2p.v5.misc.TrackingMessage
+import org.apache.tuweni.devp2p.v5.topic.TicketHolder
+import org.apache.tuweni.devp2p.v5.topic.TopicRegistrar
+import org.apache.tuweni.devp2p.v5.topic.TopicTable
 import java.net.InetSocketAddress
 
 /**
@@ -114,9 +117,27 @@
 
   fun getNodesTable(): RoutingTable
 
+  /**
+   * Provides node's topic table
+   *
+   * @return node's topic table
+   */
+  fun getTopicTable(): TopicTable
+
+  /**
+   * Provides node's ticket holder
+   *
+   * @return node's ticket holder
+   */
+  fun getTicketHolder(): TicketHolder
+
   fun getAwaitingPongRecord(nodeId: Bytes): Bytes?
 
   fun getPendingMessage(authTag: Bytes): TrackingMessage
 
   fun getNodeRecords(): ENRStorage
+
+  fun getTopicRegistrar(): TopicRegistrar
+
+  fun getSessionInitiatorKey(nodeId: Bytes): Bytes
 }
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultPacketCodec.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultPacketCodec.kt
index 9f45a04..6216138 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultPacketCodec.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultPacketCodec.kt
@@ -37,7 +37,6 @@
 import org.apache.tuweni.devp2p.v5.packet.PongMessage
 import org.apache.tuweni.devp2p.v5.packet.RegConfirmationMessage
 import org.apache.tuweni.devp2p.v5.packet.RegTopicMessage
-import org.apache.tuweni.devp2p.v5.packet.ReqTicketMessage
 import org.apache.tuweni.devp2p.v5.packet.TicketMessage
 import org.apache.tuweni.devp2p.v5.packet.TopicQueryMessage
 import org.apache.tuweni.rlp.RLP
@@ -129,11 +128,10 @@
       2 -> PongMessage.create(message)
       3 -> FindNodeMessage.create(message)
       4 -> NodesMessage.create(message)
-      5 -> ReqTicketMessage.create(message)
+      5 -> RegTopicMessage.create(message)
       6 -> TicketMessage.create(message)
-      7 -> RegTopicMessage.create(message)
-      8 -> RegConfirmationMessage.create(message)
-      9 -> TopicQueryMessage.create(message)
+      7 -> RegConfirmationMessage.create(message)
+      8 -> TopicQueryMessage.create(message)
       else -> throw IllegalArgumentException("Unknown message retrieved")
     }
   }
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultUdpConnector.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultUdpConnector.kt
index c0a6368..6816324 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultUdpConnector.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultUdpConnector.kt
@@ -28,6 +28,7 @@
 import org.apache.tuweni.crypto.SECP256K1
 import org.apache.tuweni.devp2p.EthereumNodeRecord
 import org.apache.tuweni.devp2p.v5.ENRStorage
+import org.apache.tuweni.devp2p.v5.AuthenticationProvider
 import org.apache.tuweni.devp2p.v5.MessageHandler
 import org.apache.tuweni.devp2p.v5.MessageObserver
 import org.apache.tuweni.devp2p.v5.PacketCodec
@@ -38,7 +39,12 @@
 import org.apache.tuweni.devp2p.v5.internal.handler.PingMessageHandler
 import org.apache.tuweni.devp2p.v5.internal.handler.PongMessageHandler
 import org.apache.tuweni.devp2p.v5.internal.handler.RandomMessageHandler
+import org.apache.tuweni.devp2p.v5.internal.handler.RegConfirmationMessageHandler
+import org.apache.tuweni.devp2p.v5.internal.handler.RegTopicMessageHandler
+import org.apache.tuweni.devp2p.v5.internal.handler.TicketMessageHandler
+import org.apache.tuweni.devp2p.v5.internal.handler.TopicQueryMessageHandler
 import org.apache.tuweni.devp2p.v5.internal.handler.WhoAreYouMessageHandler
+import org.apache.tuweni.devp2p.v5.misc.HandshakeInitParameters
 import org.apache.tuweni.devp2p.v5.packet.FindNodeMessage
 import org.apache.tuweni.devp2p.v5.packet.RandomMessage
 import org.apache.tuweni.devp2p.v5.packet.UdpMessage
@@ -48,13 +54,23 @@
 import org.apache.tuweni.devp2p.v5.packet.NodesMessage
 import org.apache.tuweni.devp2p.v5.packet.PingMessage
 import org.apache.tuweni.devp2p.v5.packet.PongMessage
+import org.apache.tuweni.devp2p.v5.packet.RandomMessage
+import org.apache.tuweni.devp2p.v5.packet.RegConfirmationMessage
+import org.apache.tuweni.devp2p.v5.packet.RegTopicMessage
+import org.apache.tuweni.devp2p.v5.packet.TicketMessage
+import org.apache.tuweni.devp2p.v5.packet.TopicQueryMessage
+import org.apache.tuweni.devp2p.v5.packet.UdpMessage
+import org.apache.tuweni.devp2p.v5.packet.WhoAreYouMessage
+import org.apache.tuweni.devp2p.v5.topic.TicketHolder
+import org.apache.tuweni.devp2p.v5.topic.TopicRegistrar
+import org.apache.tuweni.devp2p.v5.topic.TopicTable
 import org.apache.tuweni.devp2p.v5.storage.DefaultENRStorage
 import org.apache.tuweni.net.coroutines.CoroutineDatagramChannel
 import java.net.InetSocketAddress
 import java.nio.ByteBuffer
-import java.time.Duration
 import java.util.logging.Logger
 import kotlin.coroutines.CoroutineContext
+import java.time.Duration
 
 class DefaultUdpConnector(
   private val bindAddress: InetSocketAddress,
@@ -63,7 +79,10 @@
   private val enrStorage: ENRStorage = DefaultENRStorage(),
   private val receiveChannel: CoroutineDatagramChannel = CoroutineDatagramChannel.open(),
   private val nodesTable: RoutingTable = RoutingTable(selfEnr),
-  private val packetCodec: PacketCodec = DefaultPacketCodec(keyPair, nodesTable),
+  private val topicTable: TopicTable = TopicTable(),
+  private val ticketHolder: TicketHolder = TicketHolder(),
+  private val authenticationProvider: AuthenticationProvider = DefaultAuthenticationProvider(keyPair, nodesTable),
+  private val packetCodec: PacketCodec = DefaultPacketCodec(keyPair, nodesTable, nodeId, authenticationProvider),
   private val authenticatingPeers: MutableMap<InetSocketAddress, Bytes> = mutableMapOf(),
   private val selfNodeRecord: EthereumNodeRecord = EthereumNodeRecord.fromRLP(selfEnr),
   private val messageListeners: MutableList<MessageObserver> = mutableListOf(),
@@ -78,6 +97,12 @@
   private val nodesMessageHandler: MessageHandler<NodesMessage> = NodesMessageHandler()
   private val pingMessageHandler: MessageHandler<PingMessage> = PingMessageHandler()
   private val pongMessageHandler: MessageHandler<PongMessage> = PongMessageHandler()
+  private val regConfirmationMessageHandler: MessageHandler<RegConfirmationMessage> = RegConfirmationMessageHandler()
+  private val regTopicMessageHandler: MessageHandler<RegTopicMessage> = RegTopicMessageHandler()
+  private val ticketMessageHandler: MessageHandler<TicketMessage> = TicketMessageHandler()
+  private val topicQueryMessageHandler: MessageHandler<TopicQueryMessage> = TopicQueryMessageHandler()
+
+  private val topicRegistrar = TopicRegistrar(coroutineContext, this)
 
   private val askedNodes: MutableList<Bytes> = mutableListOf()
 
@@ -109,7 +134,7 @@
   override fun getNodeKeyPair(): SECP256K1.KeyPair = keyPair
 
   override fun getPendingMessage(authTag: Bytes): TrackingMessage = pendingMessages.getIfPresent(authTag.toHexString())
-      ?: throw IllegalArgumentException("Pending message not found")
+    ?: throw IllegalArgumentException("Pending message not found")
 
 
   override fun start() {
@@ -160,6 +185,12 @@
     return result
   }
 
+  override fun getTopicTable(): TopicTable = topicTable
+
+  override fun getTicketHolder(): TicketHolder = ticketHolder
+
+  override fun getTopicRegistrar(): TopicRegistrar = topicRegistrar
+
   override fun getAwaitingPongRecord(nodeId: Bytes): Bytes? {
     val nodeIdHex = nodeId.toHexString()
     val result = pings.getIfPresent(nodeIdHex)
@@ -167,6 +198,11 @@
     return result
   }
 
+  override fun getSessionInitiatorKey(nodeId: Bytes): Bytes {
+    return authenticationProvider.findSessionKey(nodeId.toHexString())?.initiatorKey
+      ?: throw IllegalArgumentException("Session key not found.")
+  }
+
   // Lookup nodes
   private fun lookupNodes() = launch {
     while (true) {
@@ -183,7 +219,7 @@
   private fun lookupInternal(nearest: List<Bytes>) {
     val targetNode = if (nearest.isNotEmpty()) nearest.random() else Bytes.random(32)
     val distance = getNodesTable().distanceToSelf(targetNode)
-    for(target in nearest.take(3)) {
+    for (target in nearest.take(3)) {
       val enr = EthereumNodeRecord.fromRLP(target)
       val message = FindNodeMessage(distance = distance)
       val address = InetSocketAddress(enr.ip(), enr.udp())
@@ -217,6 +253,10 @@
       is NodesMessage -> nodesMessageHandler.handle(message, address, decodeResult.srcNodeId, this)
       is PingMessage -> pingMessageHandler.handle(message, address, decodeResult.srcNodeId, this)
       is PongMessage -> pongMessageHandler.handle(message, address, decodeResult.srcNodeId, this)
+      is RegTopicMessage -> regTopicMessageHandler.handle(message, address, decodeResult.srcNodeId, this)
+      is RegConfirmationMessage -> regConfirmationMessageHandler.handle(message, address, decodeResult.srcNodeId, this)
+      is TicketMessage -> ticketMessageHandler.handle(message, address, decodeResult.srcNodeId, this)
+      is TopicQueryMessage -> topicQueryMessageHandler.handle(message, address, decodeResult.srcNodeId, this)
       else -> throw IllegalArgumentException("Unexpected message has been received - ${message::class.java.simpleName}")
     }
     messageListeners.forEach { it.observe(message) }
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/RegConfirmationMessageHandler.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/RegConfirmationMessageHandler.kt
new file mode 100644
index 0000000..5b38d82
--- /dev/null
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/RegConfirmationMessageHandler.kt
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.v5.internal.handler
+
+import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.devp2p.v5.MessageHandler
+import org.apache.tuweni.devp2p.v5.UdpConnector
+import org.apache.tuweni.devp2p.v5.packet.RegConfirmationMessage
+import java.net.InetSocketAddress
+
+
+class RegConfirmationMessageHandler : MessageHandler<RegConfirmationMessage> {
+
+  override fun handle(
+    message: RegConfirmationMessage,
+    address: InetSocketAddress,
+    srcNodeId: Bytes,
+    connector: UdpConnector
+  ) {
+    val ticketHolder = connector.getTicketHolder()
+    ticketHolder.removeTicket(message.requestId)
+    connector.getTopicRegistrar().registerTopic(message.topic, true)
+  }
+
+}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/RegTopicMessageHandler.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/RegTopicMessageHandler.kt
new file mode 100644
index 0000000..2365446
--- /dev/null
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/RegTopicMessageHandler.kt
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.v5.internal.handler
+
+import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.devp2p.DiscoveryService.Companion.CURRENT_TIME_SUPPLIER
+import org.apache.tuweni.devp2p.v5.MessageHandler
+import org.apache.tuweni.devp2p.v5.UdpConnector
+import org.apache.tuweni.devp2p.v5.packet.RegConfirmationMessage
+import org.apache.tuweni.devp2p.v5.packet.RegTopicMessage
+import org.apache.tuweni.devp2p.v5.packet.TicketMessage
+import org.apache.tuweni.devp2p.v5.topic.Ticket
+import org.apache.tuweni.devp2p.v5.topic.Topic
+import java.net.InetSocketAddress
+
+class RegTopicMessageHandler : MessageHandler<RegTopicMessage> {
+
+  private val now: () -> Long = CURRENT_TIME_SUPPLIER
+
+
+  override fun handle(message: RegTopicMessage, address: InetSocketAddress, srcNodeId: Bytes, connector: UdpConnector) {
+    val topic = Topic(message.topic.toHexString())
+    val key = connector.getSessionInitiatorKey(srcNodeId)
+
+    val existingTicket = if (!message.ticket.isEmpty) {
+      val ticket = Ticket.decrypt(message.ticket, key)
+      ticket.validate(srcNodeId, address.address, now(), message.topic)
+      ticket
+    } else null
+
+    // Create new ticket
+    val waitTime = connector.getTopicTable().put(topic, message.nodeRecord)
+    val cumTime = (existingTicket?.cumTime ?: waitTime) + waitTime
+    val ticket = Ticket(message.topic, srcNodeId, address.address, now(), waitTime, cumTime)
+    val encryptedTicket = ticket.encrypt(key)
+
+    // Send ticket
+    val response = TicketMessage(message.requestId, encryptedTicket, waitTime)
+    connector.send(address, response, srcNodeId)
+
+    // Send confirmation if topic was placed
+    if (waitTime == 0L) {
+      val confirmation = RegConfirmationMessage(message.requestId, message.topic)
+      connector.send(address, confirmation, srcNodeId)
+    }
+  }
+}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/TicketMessageHandler.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/TicketMessageHandler.kt
new file mode 100644
index 0000000..11d3d3f
--- /dev/null
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/TicketMessageHandler.kt
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.v5.internal.handler
+
+import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.devp2p.v5.MessageHandler
+import org.apache.tuweni.devp2p.v5.UdpConnector
+import org.apache.tuweni.devp2p.v5.packet.TicketMessage
+import org.apache.tuweni.devp2p.v5.topic.Ticket
+import java.net.InetSocketAddress
+
+class TicketMessageHandler : MessageHandler<TicketMessage> {
+
+  override fun handle(message: TicketMessage, address: InetSocketAddress, srcNodeId: Bytes, connector: UdpConnector) {
+    val ticketHolder = connector.getTicketHolder()
+    ticketHolder.putTicket(message.requestId, message.ticket)
+
+    if (message.waitTime != 0L) {
+      val key = connector.getSessionInitiatorKey(srcNodeId)
+      val ticket = Ticket.decrypt(message.ticket, key)
+      connector.getTopicRegistrar().delayRegTopic(message.requestId, ticket.topic, message.waitTime)
+    }
+  }
+}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/TopicQueryMessageHandler.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/TopicQueryMessageHandler.kt
new file mode 100644
index 0000000..7f2e432
--- /dev/null
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/internal/handler/TopicQueryMessageHandler.kt
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.v5.internal.handler
+
+import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.devp2p.v5.MessageHandler
+import org.apache.tuweni.devp2p.v5.UdpConnector
+import org.apache.tuweni.devp2p.v5.packet.NodesMessage
+import org.apache.tuweni.devp2p.v5.packet.TopicQueryMessage
+import org.apache.tuweni.devp2p.v5.topic.Topic
+import java.net.InetSocketAddress
+
+class TopicQueryMessageHandler : MessageHandler<TopicQueryMessage> {
+
+  override fun handle(
+    message: TopicQueryMessage,
+    address: InetSocketAddress,
+    srcNodeId: Bytes,
+    connector: UdpConnector
+  ) {
+    val topicTable = connector.getTopicTable()
+    val nodes = topicTable.getNodes(Topic(message.topic.toHexString()))
+
+    var caret = 0
+    while (caret < nodes.size) {
+      val response = NodesMessage(message.requestId, nodes.size, nodes.subList(caret, caret + MAX_NODES_IN_RESPONSE))
+      connector.send(address, response, srcNodeId)
+      caret += MAX_NODES_IN_RESPONSE
+    }
+  }
+
+  companion object {
+    private const val MAX_NODES_IN_RESPONSE: Int = 16
+  }
+}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/packet/RegConfirmationMessage.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/packet/RegConfirmationMessage.kt
index bd643dd..b22ca9a 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/packet/RegConfirmationMessage.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/packet/RegConfirmationMessage.kt
@@ -21,17 +21,17 @@
 
 class RegConfirmationMessage(
   val requestId: Bytes = UdpMessage.requestId(),
-  val registered: Boolean = true
+  val topic: Bytes
 ) : UdpMessage() {
 
-  private val encodedMessageType: Bytes = Bytes.fromHexString("0x08")
+  private val encodedMessageType: Bytes = Bytes.fromHexString("0x07")
 
   override fun getMessageType(): Bytes = encodedMessageType
 
   override fun encode(): Bytes {
     return RLP.encodeList { writer ->
       writer.writeValue(requestId)
-      writer.writeByte(if (registered) 1 else 0)
+      writer.writeValue(topic)
     }
   }
 
@@ -39,8 +39,8 @@
     fun create(content: Bytes): RegConfirmationMessage {
       return RLP.decodeList(content) { reader ->
         val requestId = reader.readValue()
-        val registered = (reader.readByte() == 1.toByte())
-        return@decodeList RegConfirmationMessage(requestId, registered)
+        val topic = reader.readValue()
+        return@decodeList RegConfirmationMessage(requestId, topic)
       }
     }
   }
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/packet/RegTopicMessage.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/packet/RegTopicMessage.kt
index afe2c23..aae05a4 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/packet/RegTopicMessage.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/packet/RegTopicMessage.kt
@@ -21,19 +21,21 @@
 
 class RegTopicMessage(
   val requestId: Bytes = UdpMessage.requestId(),
-  val ticket: Bytes,
-  val nodeRecord: Bytes
+  val nodeRecord: Bytes,
+  val topic: Bytes,
+  val ticket: Bytes
 ) : UdpMessage() {
 
-  private val encodedMessageType: Bytes = Bytes.fromHexString("0x07")
+  private val encodedMessageType: Bytes = Bytes.fromHexString("0x05")
 
   override fun getMessageType(): Bytes = encodedMessageType
 
   override fun encode(): Bytes {
     return RLP.encodeList { writer ->
       writer.writeValue(requestId)
-      writer.writeValue(ticket)
       writer.writeValue(nodeRecord)
+      writer.writeValue(topic)
+      writer.writeValue(ticket)
     }
   }
 
@@ -41,9 +43,10 @@
     fun create(content: Bytes): RegTopicMessage {
       return RLP.decodeList(content) { reader ->
         val requestId = reader.readValue()
-        val ticket = reader.readValue()
         val nodeRecord = reader.readValue()
-        return@decodeList RegTopicMessage(requestId, ticket, nodeRecord)
+        val topic = reader.readValue()
+        val ticket = reader.readValue()
+        return@decodeList RegTopicMessage(requestId, nodeRecord, topic, ticket)
       }
     }
   }
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/packet/ReqTicketMessage.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/packet/ReqTicketMessage.kt
deleted file mode 100644
index 0381a85..0000000
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/packet/ReqTicketMessage.kt
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tuweni.devp2p.v5.packet
-
-import org.apache.tuweni.bytes.Bytes
-import org.apache.tuweni.rlp.RLP
-
-class ReqTicketMessage(
-  val requestId: Bytes = UdpMessage.requestId(),
-  val topic: Bytes
-) : UdpMessage() {
-
-  private val encodedMessageType: Bytes = Bytes.fromHexString("0x05")
-
-  override fun getMessageType(): Bytes = encodedMessageType
-
-  override fun encode(): Bytes {
-    return RLP.encodeList { writer ->
-      writer.writeValue(requestId)
-      writer.writeValue(topic)
-    }
-  }
-
-  companion object {
-    fun create(content: Bytes): ReqTicketMessage {
-      return RLP.decodeList(content) { reader ->
-        val requestId = reader.readValue()
-        val topic = reader.readValue()
-        return@decodeList ReqTicketMessage(requestId, topic)
-      }
-    }
-  }
-}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/packet/TopicQueryMessage.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/packet/TopicQueryMessage.kt
index 14736ff..d543f01 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/packet/TopicQueryMessage.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/packet/TopicQueryMessage.kt
@@ -24,7 +24,7 @@
   val topic: Bytes
 ) : UdpMessage() {
 
-  private val encodedMessageType: Bytes = Bytes.fromHexString("0x09")
+  private val encodedMessageType: Bytes = Bytes.fromHexString("0x08")
 
   override fun getMessageType(): Bytes = encodedMessageType
 
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/topic/Ticket.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/topic/Ticket.kt
new file mode 100644
index 0000000..8d67374
--- /dev/null
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/topic/Ticket.kt
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.v5.topic
+
+import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.devp2p.v5.encrypt.AES128GCM
+import org.apache.tuweni.rlp.RLP
+import java.net.InetAddress
+
+data class Ticket(
+  val topic: Bytes,
+  val srcNodeId: Bytes,
+  val srcIp: InetAddress,
+  val requestTime: Long,
+  val waitTime: Long,
+  val cumTime: Long
+) {
+
+  fun encode(): Bytes {
+    return RLP.encodeList { writer ->
+      writer.writeValue(topic)
+      writer.writeValue(srcNodeId)
+      writer.writeValue(Bytes.wrap(srcIp.address))
+      writer.writeLong(requestTime)
+      writer.writeLong(waitTime)
+      writer.writeLong(cumTime)
+    }
+  }
+
+  fun encrypt(key: Bytes): Bytes {
+    val ticketBytes = encode()
+    return AES128GCM.encrypt(key, Bytes.wrap(ByteArray(ZERO_NONCE_SIZE)), ticketBytes, Bytes.EMPTY)
+  }
+
+  fun validate(
+    srcNodeId: Bytes,
+    srcIp: InetAddress,
+    now: Long,
+    topic: Bytes
+  ) {
+    require(this.srcNodeId == srcNodeId) { TICKET_INVALID_MSG }
+    require(this.srcIp == srcIp) { TICKET_INVALID_MSG }
+    require(this.topic == topic) { TICKET_INVALID_MSG }
+    val windowStart = this.requestTime + this.waitTime
+    require(now >= windowStart && now <= windowStart + TIME_WINDOW_MS) { TICKET_INVALID_MSG }
+  }
+
+  companion object {
+    private const val ZERO_NONCE_SIZE: Int = 12
+    internal const val TIME_WINDOW_MS: Int = 10000 // 10 seconds
+    internal const val TICKET_INVALID_MSG = "Ticket is invalid"
+
+    fun create(content: Bytes): Ticket {
+      return RLP.decodeList(content) { reader ->
+        val topic = reader.readValue()
+        val srcNodeId = reader.readValue()
+        val srcIp = InetAddress.getByAddress(reader.readValue().toArray())
+        val requestTime = reader.readLong()
+        val waitTime = reader.readLong()
+        val cumTime = reader.readLong()
+        return@decodeList Ticket(topic, srcNodeId, srcIp, requestTime, waitTime, cumTime)
+      }
+    }
+
+    fun decrypt(encrypted: Bytes, key: Bytes): Ticket {
+      val decrypted = AES128GCM.decrypt(encrypted, key, Bytes.EMPTY)
+      return create(decrypted)
+    }
+  }
+}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/topic/TicketHolder.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/topic/TicketHolder.kt
new file mode 100644
index 0000000..7f41f08
--- /dev/null
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/topic/TicketHolder.kt
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.v5.topic
+
+import org.apache.tuweni.bytes.Bytes
+
+class TicketHolder {
+
+  private val tickets: MutableMap<Bytes, Bytes> = hashMapOf() // requestId to ticket
+
+
+  fun putTicket(requestId: Bytes, ticket: Bytes) {
+    tickets[requestId] = ticket
+  }
+
+  fun getTicket(requestId: Bytes): Bytes =
+    tickets[requestId] ?: throw IllegalArgumentException("Ticket not found.")
+
+  fun removeTicket(requestId: Bytes): Bytes? = tickets.remove(requestId)
+}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/topic/Topic.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/topic/Topic.kt
new file mode 100644
index 0000000..5403b8d
--- /dev/null
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/topic/Topic.kt
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.v5.topic
+
+data class Topic(
+  val content: String
+)
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/topic/TopicRegistrar.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/topic/TopicRegistrar.kt
new file mode 100644
index 0000000..e5bb843
--- /dev/null
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/topic/TopicRegistrar.kt
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.v5.topic
+
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
+import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.crypto.Hash
+import org.apache.tuweni.devp2p.EthereumNodeRecord
+import org.apache.tuweni.devp2p.v5.internal.DefaultUdpConnector
+import org.apache.tuweni.devp2p.v5.packet.RegTopicMessage
+import org.apache.tuweni.devp2p.v5.packet.UdpMessage
+import java.net.InetSocketAddress
+import kotlin.coroutines.CoroutineContext
+
+class TopicRegistrar(
+  override val coroutineContext: CoroutineContext = Dispatchers.IO,
+  private val connector: DefaultUdpConnector
+) : CoroutineScope {
+
+  fun delayRegTopic(requestId: Bytes, topic: Bytes, waitTime: Long) {
+    launch {
+      delay(waitTime)
+
+      val ticket = connector.getTicketHolder().getTicket(requestId)
+      sendRegTopic(topic, ticket, requestId)
+    }
+  }
+
+  fun registerTopic(topic: Bytes, withDelay: Boolean = false) {
+    launch {
+      if (withDelay) {
+        delay(SEND_REGTOPIC_DELAY_MS)
+      }
+
+      sendRegTopic(topic)
+    }
+  }
+
+  private fun sendRegTopic(topic: Bytes, ticket: Bytes = Bytes.EMPTY, requestId: Bytes = UdpMessage.requestId()) {
+    val nodeEnr = connector.getEnrBytes()
+    val message = RegTopicMessage(requestId, nodeEnr, topic, ticket)
+
+    val receivers = connector.getNodesTable().nodesOfDistance(1) //todo radius
+    receivers.forEach { rlp ->
+      val receiver = EthereumNodeRecord.fromRLP(rlp)
+      val address = InetSocketAddress(receiver.ip(), receiver.udp())
+      val nodeId = Hash.sha2_256(rlp)
+      connector.send(address, message, nodeId)
+    }
+  }
+
+  companion object {
+    private const val SEND_REGTOPIC_DELAY_MS = 15 * 60 * 1000L // 15 min
+  }
+}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/topic/TopicTable.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/topic/TopicTable.kt
new file mode 100644
index 0000000..4ce963a
--- /dev/null
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/topic/TopicTable.kt
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.v5.topic
+
+import com.google.common.cache.Cache
+import com.google.common.cache.CacheBuilder
+import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.crypto.Hash
+import org.apache.tuweni.devp2p.DiscoveryService
+import org.apache.tuweni.devp2p.EthereumNodeRecord
+import java.util.concurrent.TimeUnit
+
+class TopicTable(
+  private val tableCapacity: Int = MAX_TABLE_CAPACITY,
+  private val queueCapacity: Int = MAX_ENTRIES_PER_TOPIC
+) {
+
+  private val timeSupplier: () -> Long = DiscoveryService.CURRENT_TIME_SUPPLIER
+  private val table: HashMap<Topic, Cache<String, TargetAd>> = HashMap(tableCapacity)
+
+
+  init {
+    require(tableCapacity > 0) { "Table capacity value must be positive" }
+    require(queueCapacity > 0) { "Queue capacity value must be positive" }
+  }
+
+  fun getNodes(topic: Topic): List<Bytes> {
+    val values = table[topic]
+    return values?.let { values.asMap().values.map { it.enr } } ?: emptyList()
+  }
+
+  /**
+   * Puts a topic in a table
+   *
+   * @return wait time for registrant node (0 is topic was putted immediately, -1 in case of error)
+   */
+  @Synchronized
+  fun put(topic: Topic, enr: Bytes): Long {
+    gcTable()
+
+    val topicQueue = table[topic]
+    val nodeId = Hash.sha2_256(enr).toHexString()
+
+    if (null != topicQueue) {
+      if (topicQueue.size() < queueCapacity) {
+        topicQueue.put(nodeId, TargetAd(timeSupplier(), enr))
+        return 0 // put immediately
+      } else {
+        // Queue if full (wait time = target-ad-lifetime - oldest ad lifetime in queue)
+        return TARGET_AD_LIFETIME_MS - (timeSupplier() - topicQueue.oldest().regTime)
+      }
+    }
+
+    if (table.size < tableCapacity) {
+      table[topic] = createNewQueue().apply { put(nodeId, TargetAd(timeSupplier(), enr)) }
+      return 0 // put immediately
+    } else {
+      //table is full (wait time = target-ad-lifetime - oldest in table of youngest in queue)
+      val oldestInTable = table.entries.map { it.value.youngest().regTime }.min() ?: -1
+      return TARGET_AD_LIFETIME_MS - (timeSupplier() - oldestInTable)
+    }
+
+  }
+
+  fun contains(topic: Topic): Boolean = table.containsKey(topic)
+
+  fun isEmpty(): Boolean = table.isEmpty()
+
+  fun clear() = table.clear()
+
+  private fun createNewQueue(): Cache<String, TargetAd> {
+    return CacheBuilder.newBuilder()
+      .expireAfterWrite(TARGET_AD_LIFETIME_MS, TimeUnit.MILLISECONDS)
+      .initialCapacity(queueCapacity)
+      .build()
+  }
+
+  private fun gcTable() {
+    table.entries.removeIf { it.value.size() == 0L }
+  }
+
+  private fun Cache<String, TargetAd>.oldest(): TargetAd {
+    return asMap().values.minBy { it.regTime } ?: throw IllegalArgumentException(QUEUE_EMPTY_MSG)
+  }
+
+  private fun Cache<String, TargetAd>.youngest(): TargetAd {
+    return asMap().values.maxBy { it.regTime } ?: throw IllegalArgumentException(QUEUE_EMPTY_MSG)
+  }
+
+  companion object {
+    internal const val MAX_ENTRIES_PER_TOPIC: Int = 100
+    private const val MAX_TABLE_CAPACITY: Int = 500
+    private const val TARGET_AD_LIFETIME_MS: Long = 15 * 60 * 1000 // 15 min
+
+    private const val QUEUE_EMPTY_MSG = "Queue is empty."
+  }
+}
+
+class TargetAd(
+  val regTime: Long,
+  val enr: Bytes
+)
+
+
diff --git a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/packet/RegConfirmationMessageTest.kt b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/packet/RegConfirmationMessageTest.kt
index 65f61a6..f879ef8 100644
--- a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/packet/RegConfirmationMessageTest.kt
+++ b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/packet/RegConfirmationMessageTest.kt
@@ -24,20 +24,20 @@
   @Test
   fun encodeCreatesValidBytesSequence() {
     val requestId = Bytes.fromHexString("0xC6E32C5E89CAA754")
-    val message = RegConfirmationMessage(requestId, false)
+    val message = RegConfirmationMessage(requestId, Bytes.random(32))
 
     val encodingResult = message.encode()
 
     val decodingResult = RegConfirmationMessage.create(encodingResult)
 
     assert(decodingResult.requestId == requestId)
-    assert(decodingResult.registered == message.registered)
+    assert(decodingResult.topic == message.topic)
   }
 
   @Test
   fun getMessageTypeHasValidIndex() {
-    val message = RegConfirmationMessage()
+    val message = RegConfirmationMessage(topic = Bytes.random(32))
 
-    assert(8 == message.getMessageType().toInt())
+    assert(7 == message.getMessageType().toInt())
   }
 }
diff --git a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/packet/RegTopicMessageTest.kt b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/packet/RegTopicMessageTest.kt
index c33b43b..c15f5d6 100644
--- a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/packet/RegTopicMessageTest.kt
+++ b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/packet/RegTopicMessageTest.kt
@@ -24,7 +24,7 @@
   @Test
   fun encodeCreatesValidBytesSequence() {
     val requestId = Bytes.fromHexString("0xC6E32C5E89CAA754")
-    val message = RegTopicMessage(requestId, Bytes.random(32), Bytes.random(32))
+    val message = RegTopicMessage(requestId, Bytes.random(32), Bytes.random(32), Bytes.random(16))
 
     val encodingResult = message.encode()
 
@@ -37,8 +37,8 @@
 
   @Test
   fun getMessageTypeHasValidIndex() {
-    val message = RegTopicMessage(ticket = Bytes.random(32), nodeRecord = Bytes.random(32))
+    val message = RegTopicMessage(ticket = Bytes.random(32), nodeRecord = Bytes.random(32), topic = Bytes.random(16))
 
-    assert(7 == message.getMessageType().toInt())
+    assert(5 == message.getMessageType().toInt())
   }
 }
diff --git a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/packet/ReqTicketMessageTest.kt b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/packet/ReqTicketMessageTest.kt
deleted file mode 100644
index a43a328..0000000
--- a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/packet/ReqTicketMessageTest.kt
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tuweni.devp2p.v5.packet
-
-import org.apache.tuweni.bytes.Bytes
-import org.junit.jupiter.api.Test
-
-class ReqTicketMessageTest {
-
-  @Test
-  fun encodeCreatesValidBytesSequence() {
-    val requestId = Bytes.fromHexString("0xC6E32C5E89CAA754")
-    val message = ReqTicketMessage(requestId, Bytes.random(32))
-
-    val encodingResult = message.encode()
-
-    val decodingResult = ReqTicketMessage.create(encodingResult)
-
-    assert(decodingResult.requestId == requestId)
-    assert(decodingResult.topic == message.topic)
-  }
-
-  @Test
-  fun getMessageTypeHasValidIndex() {
-    val message = ReqTicketMessage(topic = Bytes.random(32))
-
-    assert(5 == message.getMessageType().toInt())
-  }
-}
diff --git a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/packet/TopicQueryMessageTest.kt b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/packet/TopicQueryMessageTest.kt
index 36d66a8..9c1c541 100644
--- a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/packet/TopicQueryMessageTest.kt
+++ b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/packet/TopicQueryMessageTest.kt
@@ -38,6 +38,6 @@
   fun getMessageTypeHasValidIndex() {
     val message = TopicQueryMessage(topic = Bytes.random(32))
 
-    assert(9 == message.getMessageType().toInt())
+    assert(8 == message.getMessageType().toInt())
   }
 }
diff --git a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/topic/TopicTableTest.kt b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/topic/TopicTableTest.kt
new file mode 100644
index 0000000..181a08f
--- /dev/null
+++ b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/topic/TopicTableTest.kt
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.v5.topic
+
+import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.crypto.SECP256K1
+import org.apache.tuweni.devp2p.EthereumNodeRecord
+import org.apache.tuweni.junit.BouncyCastleExtension
+import org.junit.jupiter.api.AfterEach
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.extension.ExtendWith
+import java.net.InetAddress
+
+@ExtendWith(BouncyCastleExtension::class)
+class TopicTableTest {
+  private val keyPair: SECP256K1.KeyPair = SECP256K1.KeyPair.random()
+  private val enr: Bytes = EthereumNodeRecord.toRLP(keyPair, ip = InetAddress.getLocalHost())
+
+  private val topicTable = TopicTable(TABLE_CAPACITY, QUEUE_CAPACITY)
+
+  @Test
+  fun putAddNodeToEmptyQueueImmediately() {
+    val waitTime = topicTable.put(Topic("A"), enr)
+
+    assert(!topicTable.isEmpty())
+    assert(waitTime == 0L)
+  }
+
+  @Test
+  fun putAddNodeToNotEmptyQueueShouldReturnWaitingTime() {
+    val topic = Topic("A")
+    topicTable.put(topic, EthereumNodeRecord.toRLP(SECP256K1.KeyPair.random(), ip = InetAddress.getLocalHost()))
+    topicTable.put(topic, EthereumNodeRecord.toRLP(SECP256K1.KeyPair.random(), ip = InetAddress.getLocalHost()))
+
+    val waitTime = topicTable.put(topic, enr)
+
+    assert(waitTime > 0L)
+  }
+
+  @Test
+  fun putAddNodeToNotEmptyTableShouldReturnWaitingTime() {
+    topicTable.put(Topic("A"), enr)
+    topicTable.put(Topic("B"), enr)
+
+    val waitTime = topicTable.put(Topic("C"), enr)
+
+    assert(waitTime > 0L)
+  }
+
+  @Test
+  fun getNodesReturnNodesThatProvidesTopic() {
+    val topic = Topic("A")
+    topicTable.put(topic, EthereumNodeRecord.toRLP(SECP256K1.KeyPair.random(), ip = InetAddress.getLocalHost()))
+    topicTable.put(topic, EthereumNodeRecord.toRLP(SECP256K1.KeyPair.random(), ip = InetAddress.getLocalHost()))
+
+    val nodes = topicTable.getNodes(topic)
+
+    assert(nodes.isNotEmpty())
+    assert(nodes.size == 2)
+  }
+
+  @Test
+  fun contains() {
+    val topic = Topic("A")
+    topicTable.put(topic, enr)
+
+    val containsTrue = topicTable.contains(topic)
+    assert(containsTrue)
+
+    val containsFalse = topicTable.contains(Topic("B"))
+    assert(!containsFalse)
+  }
+
+  @AfterEach
+  fun tearDown() {
+    topicTable.clear()
+  }
+
+  companion object {
+    private const val TABLE_CAPACITY = 2
+    private const val QUEUE_CAPACITY = 2
+  }
+
+}