blob: ebb4eb8363ba7e4a924d105f4e1303f5257b0861 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tuweni.devp2p.v5.internal
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.launch
import org.apache.tuweni.bytes.Bytes
import org.apache.tuweni.crypto.Hash
import org.apache.tuweni.crypto.SECP256K1
import org.apache.tuweni.devp2p.v5.MessageHandler
import org.apache.tuweni.devp2p.v5.PacketCodec
import org.apache.tuweni.devp2p.v5.UdpConnector
import org.apache.tuweni.devp2p.v5.internal.handler.RandomMessageHandler
import org.apache.tuweni.devp2p.v5.internal.handler.WhoAreYouMessageHandler
import org.apache.tuweni.devp2p.v5.packet.FindNodeMessage
import org.apache.tuweni.devp2p.v5.packet.RandomMessage
import org.apache.tuweni.devp2p.v5.packet.UdpMessage
import org.apache.tuweni.devp2p.v5.packet.WhoAreYouMessage
import org.apache.tuweni.devp2p.v5.misc.HandshakeInitParameters
import org.apache.tuweni.net.coroutines.CoroutineDatagramChannel
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.util.logging.Logger
import kotlin.coroutines.CoroutineContext
class DefaultUdpConnector(
private val bindAddress: InetSocketAddress,
private val keyPair: SECP256K1.KeyPair,
private val selfEnr: Bytes,
private val nodeId: Bytes = Hash.sha2_256(selfEnr),
private val receiveChannel: CoroutineDatagramChannel = CoroutineDatagramChannel.open(),
private val sendChannel: CoroutineDatagramChannel = CoroutineDatagramChannel.open(),
private val packetCodec: PacketCodec = DefaultPacketCodec(keyPair, selfEnr),
override val coroutineContext: CoroutineContext = Dispatchers.IO
) : UdpConnector, CoroutineScope {
private val log: Logger = Logger.getLogger(this.javaClass.simpleName)
private val randomMessageHandler: MessageHandler<RandomMessage> = RandomMessageHandler()
private val whoAreYouMessageHandler: MessageHandler<WhoAreYouMessage> = WhoAreYouMessageHandler(nodeId)
private val authenticatingPeers: MutableMap<InetSocketAddress, Bytes> = mutableMapOf()
private lateinit var receiveJob: Job
override fun start() {
receiveChannel.bind(bindAddress)
receiveJob = launch {
val datagram = ByteBuffer.allocate(UdpMessage.MAX_UDP_MESSAGE_SIZE)
while (receiveChannel.isOpen) {
datagram.clear()
val address = receiveChannel.receive(datagram) as InetSocketAddress
datagram.flip()
try {
processDatagram(datagram, address)
} catch (ex: Exception) {
log.warning(ex.message)
}
}
}
}
override fun send(
address: InetSocketAddress,
message: UdpMessage,
destNodeId: Bytes,
handshakeParams: HandshakeInitParameters?
) {
launch {
val buffer = packetCodec.encode(message, destNodeId, handshakeParams)
sendChannel.send(ByteBuffer.wrap(buffer.toArray()), address)
}
}
override fun terminate() {
receiveJob.cancel()
receiveChannel.close()
sendChannel.close()
}
override fun available(): Boolean = receiveChannel.isOpen
override fun started(): Boolean = ::receiveJob.isInitialized && available()
override fun getEnr(): Bytes = selfEnr
override fun addPendingNodeId(address: InetSocketAddress, nodeId: Bytes) {
authenticatingPeers[address] = nodeId
}
override fun getNodeKeyPair(): SECP256K1.KeyPair = keyPair
override fun getPendingNodeIdByAddress(address: InetSocketAddress): Bytes {
val result = authenticatingPeers[address]
?: throw IllegalArgumentException("Authenticated peer not found with address ${address.hostName}:${address.port}")
authenticatingPeers.remove(address)
return result
}
private fun processDatagram(datagram: ByteBuffer, address: InetSocketAddress) {
val messageBytes = Bytes.wrapByteBuffer(datagram)
val decodeResult = packetCodec.decode(messageBytes)
val message = decodeResult.message
when (message) {
is RandomMessage -> randomMessageHandler.handle(message, address, decodeResult.srcNodeId, this)
is WhoAreYouMessage -> whoAreYouMessageHandler.handle(message, address, decodeResult.srcNodeId, this)
is FindNodeMessage -> { } //TODO: response with NODES message
else -> throw IllegalArgumentException("Unexpected message has been received - ${message::class.java.simpleName}")
}
}
}