blob: 59bf16e1ef90ceac9a9c8bcd0b43839876157de0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tuweni.devp2p.v5.topic
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import org.apache.tuweni.bytes.Bytes
import org.apache.tuweni.crypto.Hash
import org.apache.tuweni.devp2p.EthereumNodeRecord
import org.apache.tuweni.devp2p.v5.internal.DefaultUdpConnector
import org.apache.tuweni.devp2p.v5.packet.RegTopicMessage
import org.apache.tuweni.devp2p.v5.packet.UdpMessage
import java.net.InetSocketAddress
import kotlin.coroutines.CoroutineContext
class TopicRegistrar(
override val coroutineContext: CoroutineContext = Dispatchers.IO,
private val connector: DefaultUdpConnector
) : CoroutineScope {
fun delayRegTopic(requestId: Bytes, topic: Bytes, waitTime: Long) {
launch {
delay(waitTime)
val ticket = connector.getTicketHolder().get(requestId)
sendRegTopic(topic, ticket, requestId)
}
}
fun registerTopic(topic: Bytes, withDelay: Boolean = false) {
launch {
if (withDelay) {
delay(SEND_REGTOPIC_DELAY_MS)
}
sendRegTopic(topic)
}
}
private fun sendRegTopic(topic: Bytes, ticket: Bytes = Bytes.EMPTY, requestId: Bytes = UdpMessage.requestId()) {
val nodeEnr = connector.getEnrBytes()
val message = RegTopicMessage(requestId, nodeEnr, topic, ticket)
val distance = 1 //TODO: use ticket radius when specification will be created
val receivers = connector.getNodesTable().nodesOfDistance(distance)
receivers.forEach { rlp ->
val receiver = EthereumNodeRecord.fromRLP(rlp)
val address = InetSocketAddress(receiver.ip(), receiver.udp())
val nodeId = Hash.sha2_256(rlp)
connector.send(address, message, nodeId)
}
}
companion object {
private const val SEND_REGTOPIC_DELAY_MS = 15 * 60 * 1000L // 15 min
}
}