| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.tuweni.devp2p |
| |
| import com.google.common.cache.Cache |
| import com.google.common.cache.CacheBuilder |
| import kotlinx.coroutines.CancellationException |
| import kotlinx.coroutines.CompletableDeferred |
| import kotlinx.coroutines.CoroutineExceptionHandler |
| import kotlinx.coroutines.CoroutineScope |
| import kotlinx.coroutines.Deferred |
| import kotlinx.coroutines.Dispatchers |
| import kotlinx.coroutines.ExperimentalCoroutinesApi |
| import kotlinx.coroutines.Job |
| import kotlinx.coroutines.ObsoleteCoroutinesApi |
| import kotlinx.coroutines.TimeoutCancellationException |
| import kotlinx.coroutines.async |
| import kotlinx.coroutines.awaitAll |
| import kotlinx.coroutines.channels.Channel |
| import kotlinx.coroutines.channels.ClosedReceiveChannelException |
| import kotlinx.coroutines.channels.ClosedSendChannelException |
| import kotlinx.coroutines.channels.SendChannel |
| import kotlinx.coroutines.channels.consumeEach |
| import kotlinx.coroutines.delay |
| import kotlinx.coroutines.launch |
| import kotlinx.coroutines.withTimeout |
| import kotlinx.coroutines.withTimeoutOrNull |
| import kotlinx.coroutines.yield |
| import org.apache.tuweni.bytes.Bytes |
| import org.apache.tuweni.bytes.Bytes32 |
| import org.apache.tuweni.concurrent.AsyncCompletion |
| import org.apache.tuweni.concurrent.AsyncResult |
| import org.apache.tuweni.concurrent.coroutines.CoroutineLatch |
| import org.apache.tuweni.concurrent.coroutines.asyncCompletion |
| import org.apache.tuweni.concurrent.coroutines.asyncResult |
| import org.apache.tuweni.crypto.SECP256K1 |
| import org.apache.tuweni.kademlia.orderedInsert |
| import org.apache.tuweni.kademlia.xorDistCmp |
| import org.apache.tuweni.net.coroutines.CommonCoroutineGroup |
| import org.apache.tuweni.net.coroutines.CoroutineChannelGroup |
| import org.apache.tuweni.net.coroutines.CoroutineDatagramChannel |
| import org.slf4j.LoggerFactory |
| import java.io.IOException |
| import java.net.InetAddress |
| import java.net.InetSocketAddress |
| import java.net.SocketAddress |
| import java.net.URI |
| import java.nio.ByteBuffer |
| import java.nio.channels.ClosedChannelException |
| import java.time.Instant |
| import java.util.concurrent.ConcurrentHashMap |
| import java.util.concurrent.TimeUnit |
| import java.util.concurrent.atomic.AtomicLong |
| import kotlin.coroutines.CoroutineContext |
| |
| internal const val PACKET_EXPIRATION_PERIOD_MS: Long = (20 * 1000) // 20 seconds |
| internal const val PACKET_EXPIRATION_CHECK_GRACE_MS: Long = (5 * 1000) // 5 seconds |
| internal const val PEER_VERIFICATION_TIMEOUT_MS: Long = (22 * 1000) // 22 seconds (packet expiration + a little) |
| internal const val ENR_REQUEST_TIMEOUT_MS: Long = (22 * 1000) // 22 seconds (packet expiration + a little) |
| internal const val PEER_VERIFICATION_RETRY_DELAY_MS: Long = (5 * 60 * 1000) // 5 minutes |
| internal const val ENR_REQUEST_RETRY_DELAY_MS: Long = (5 * 60 * 1000) // 5 minutes |
| internal const val BOOTSTRAP_PEER_VERIFICATION_TIMEOUT_MS: Long = (2 * 60 * 1000) // 2 minutes |
| internal const val REFRESH_INTERVAL_MS: Long = (60 * 1000) // 1 minute |
| internal const val PING_RETRIES: Int = 20 |
| internal const val RESEND_DELAY_MS: Long = 1000 // 1 second |
| internal const val RESEND_DELAY_INCREASE_MS: Long = 500 // 500 milliseconds |
| internal const val RESEND_MAX_DELAY_MS: Long = (30 * 1000) // 30 seconds |
| internal const val ENDPOINT_PROOF_LONGEVITY_MS: Long = (12 * 60 * 60 * 1000) // 12 hours |
| internal const val FIND_NODES_CACHE_EXPIRY: Long = (3 * 60 * 1000) // 3 minutes |
| internal const val FIND_NODES_QUERY_GAP_MS: Long = (30 * 1000) // 30 seconds |
| internal const val LOOKUP_RESPONSE_TIMEOUT_MS: Long = 500 // 500 milliseconds |
| |
| /** |
| * An Ethereum ÐΞVp2p discovery service. |
| * |
| * This service supports devp2p discovery v4, alongside support for EIP-868. |
| * http://eips.ethereum.org/EIPS/eip-868 |
| * |
| */ |
| interface DiscoveryService { |
| |
| companion object { |
| internal val CURRENT_TIME_SUPPLIER: () -> Long = { System.currentTimeMillis() } |
| internal val DEFAULT_BUFFER_ALLOCATOR = { ByteBuffer.allocate(Packet.MAX_SIZE) } |
| |
| /** |
| * Start the discovery service. |
| * |
| * @param keyPair the local node's keypair |
| * @param port the port to listen on (defaults to `0`, which will cause a random free port to be chosen) |
| * @param host the host name or IP address of the interface to bind to (defaults to `null`, which will cause the |
| * service to listen on all interfaces |
| * @param seq the sequence number of the Ethereum Node Record |
| * @param enrData the additional key/value pair entries to broadcast as an Ethereum Node Record (ENR). |
| * @param bootstrapURIs the URIs for bootstrap nodes |
| * @param peerRepository a [PeerRepository] for obtaining [Peer] instances |
| * @param advertiseAddress the IP address to advertise to peers, or `null` if the address of the first bound |
| * interface should be used. |
| * @param advertiseUdpPort the UDP port to advertise to peers, or `null` if the bound port should to be used. |
| * @param advertiseTcpPort the TCP port to advertise to peers, or `null` if it should be the same as the UDP port. |
| * @param routingTable a [PeerRoutingTable] which handles the ÐΞVp2p routing table |
| * @param packetFilter a filter for incoming packets |
| * @param channelGroup the [CoroutineChannelGroup] for network channels created by this service |
| * @param bufferAllocator a [ByteBuffer] allocator, which must return buffers of size 1280 bytes or larger |
| * @param timeSupplier a function supplying the current time, in milliseconds since the epoch |
| */ |
| @JvmOverloads |
| fun open( |
| keyPair: SECP256K1.KeyPair, |
| port: Int = 0, |
| host: String? = null, |
| seq: Long = Instant.now().toEpochMilli(), |
| enrData: Map<String, Bytes> = emptyMap(), |
| bootstrapURIs: List<URI> = emptyList(), |
| peerRepository: PeerRepository = EphemeralPeerRepository(), |
| advertiseAddress: InetAddress? = null, |
| advertiseUdpPort: Int? = null, |
| advertiseTcpPort: Int? = null, |
| routingTable: PeerRoutingTable = DevP2PPeerRoutingTable(keyPair.publicKey()), |
| packetFilter: ((SECP256K1.PublicKey, InetSocketAddress) -> Boolean)? = null, |
| channelGroup: CoroutineChannelGroup = CommonCoroutineGroup, |
| bufferAllocator: () -> ByteBuffer = DEFAULT_BUFFER_ALLOCATOR, |
| timeSupplier: () -> Long = CURRENT_TIME_SUPPLIER |
| ): DiscoveryService { |
| val bindAddress = if (host == null) InetSocketAddress(port) else InetSocketAddress(host, port) |
| return open( |
| keyPair, |
| bindAddress, |
| seq, |
| enrData, |
| bootstrapURIs, |
| peerRepository, |
| advertiseAddress, |
| advertiseUdpPort, |
| advertiseTcpPort, |
| routingTable, |
| packetFilter, |
| channelGroup, |
| bufferAllocator, |
| timeSupplier |
| ) |
| } |
| |
| /** |
| * Start the discovery service. |
| * |
| * @param keyPair the local node's keypair |
| * @param bindAddress the address to listen on |
| * @param seq the sequence number of the Ethereum Node Record |
| * @param enrData the additional key/value pair entries to broadcast as an Ethereum Node Record (ENR). |
| * @param bootstrapURIs the URIs for bootstrap nodes |
| * @param peerRepository a [PeerRepository] for obtaining [Peer] instances |
| * @param advertiseAddress the IP address to advertise for incoming packets |
| * @param advertiseUdpPort the UDP port to advertise to peers, or `null` if the bound port should to be used. |
| * @param advertiseTcpPort the TCP port to advertise to peers, or `null` if it should be the same as the UDP port. |
| * @param routingTable a [PeerRoutingTable] which handles the ÐΞVp2p routing table |
| * @param packetFilter a filter for incoming packets |
| * @param channelGroup the [CoroutineChannelGroup] for network channels created by this service |
| * @param bufferAllocator a [ByteBuffer] allocator, which must return buffers of size 1280 bytes or larger |
| * @param timeSupplier a function supplying the current time, in milliseconds since the epoch |
| */ |
| @JvmOverloads |
| fun open( |
| keyPair: SECP256K1.KeyPair, |
| bindAddress: InetSocketAddress, |
| seq: Long = Instant.now().toEpochMilli(), |
| enrData: Map<String, Bytes> = emptyMap(), |
| bootstrapURIs: List<URI> = emptyList(), |
| peerRepository: PeerRepository = EphemeralPeerRepository(), |
| advertiseAddress: InetAddress? = null, |
| advertiseUdpPort: Int? = null, |
| advertiseTcpPort: Int? = null, |
| routingTable: PeerRoutingTable = DevP2PPeerRoutingTable(keyPair.publicKey()), |
| packetFilter: ((SECP256K1.PublicKey, InetSocketAddress) -> Boolean)? = null, |
| channelGroup: CoroutineChannelGroup = CommonCoroutineGroup, |
| bufferAllocator: () -> ByteBuffer = DEFAULT_BUFFER_ALLOCATOR, |
| timeSupplier: () -> Long = CURRENT_TIME_SUPPLIER |
| ): DiscoveryService { |
| return CoroutineDiscoveryService( |
| keyPair, seq, enrData, bindAddress, bootstrapURIs, advertiseAddress, advertiseUdpPort, advertiseTcpPort, |
| peerRepository, routingTable, packetFilter, channelGroup, bufferAllocator, timeSupplier |
| ) |
| } |
| } |
| |
| /** |
| * `true` if the service has been shutdown |
| */ |
| val isShutdown: Boolean |
| |
| /** |
| * `true` if the service has terminated |
| */ |
| val isTerminated: Boolean |
| |
| /** |
| * the UDP port that the service is listening on |
| */ |
| val localPort: Int |
| |
| /** |
| * the node id for this node (i.e. it's public key) |
| */ |
| val nodeId: SECP256K1.PublicKey |
| |
| /** |
| * Suspend until the bootstrap peers have been reached, or failed. |
| * |
| * @return the number of bootstrap peers successfully added |
| */ |
| suspend fun awaitBootstrap(): Int |
| |
| /** |
| * Attempt to find a specific peer, or peers close to it. |
| * |
| * @param target the node-id to search for |
| * @return a list of 16 peers, ordered by their distance to the target node-id. |
| */ |
| suspend fun lookup(target: SECP256K1.PublicKey): List<Peer> |
| |
| /** |
| * Attempt to find a specific peer, or peers close to it asynchronously. |
| * |
| * @param target the node-id to search for |
| * @return a future of a list of 16 peers, ordered by their distance to the target node-id. |
| */ |
| fun lookupAsync(target: SECP256K1.PublicKey): AsyncResult<List<Peer>> |
| |
| /** |
| * Request shutdown of this service. The service will terminate at a later time (see [DiscoveryService.awaitTermination]). |
| */ |
| fun shutdown() |
| |
| /** |
| * Suspend until this service has terminated. |
| */ |
| suspend fun awaitTermination() |
| |
| /** |
| * Provide a completion that will complete when the service has terminated. |
| * |
| * @return A completion that will complete when the service has terminated. |
| */ |
| fun awaitTerminationAsync(): AsyncCompletion |
| |
| /** |
| * Shutdown this service immediately. |
| */ |
| fun shutdownNow() |
| |
| val invalidPackets: Long |
| val selfPackets: Long |
| val expiredPackets: Long |
| val filteredPackets: Long |
| val unvalidatedPeerPackets: Long |
| val unexpectedPongs: Long |
| val unexpectedNeighbors: Long |
| val unexpectedENRResponses: Long |
| } |
| |
| internal class CoroutineDiscoveryService( |
| private val keyPair: SECP256K1.KeyPair, |
| private val seq: Long = Instant.now().toEpochMilli(), |
| private val enrData: Map<String, Bytes>, |
| bindAddress: InetSocketAddress, |
| bootstrapURIs: List<URI> = emptyList(), |
| advertiseAddress: InetAddress? = null, |
| advertiseUdpPort: Int? = null, |
| advertiseTcpPort: Int? = null, |
| private val peerRepository: PeerRepository = EphemeralPeerRepository(), |
| private val routingTable: PeerRoutingTable = DevP2PPeerRoutingTable(keyPair.publicKey()), |
| private val packetFilter: ((SECP256K1.PublicKey, InetSocketAddress) -> Boolean)? = null, |
| channelGroup: CoroutineChannelGroup = CommonCoroutineGroup, |
| private val bufferAllocator: () -> ByteBuffer = DiscoveryService.DEFAULT_BUFFER_ALLOCATOR, |
| private val timeSupplier: () -> Long = DiscoveryService.CURRENT_TIME_SUPPLIER, |
| private val channel: CoroutineDatagramChannel = CoroutineDatagramChannel.open(channelGroup) |
| ) : DiscoveryService, CoroutineScope { |
| |
| companion object { |
| internal val logger = LoggerFactory.getLogger(DiscoveryService::class.java) |
| } |
| |
| private val serviceDescriptor = "ÐΞVp2p discovery " + System.identityHashCode(this) |
| private val selfEndpoint: Endpoint |
| private val enr: Bytes |
| |
| private val job = Job() |
| // override the default exception handler, which dumps to stderr |
| override val coroutineContext: CoroutineContext |
| get() = job + Dispatchers.Default + CoroutineExceptionHandler { _, _ -> } |
| |
| private val activityLatch = CoroutineLatch(1) |
| private val bootstrapperCount: Deferred<Int> |
| private val refreshLoop: Job |
| |
| override val isShutdown: Boolean |
| get() = !channel.isOpen |
| |
| override val isTerminated: Boolean |
| get() = activityLatch.isOpen |
| |
| override val localPort: Int |
| get() = channel.localPort |
| |
| override val nodeId: SECP256K1.PublicKey |
| get() = keyPair.publicKey() |
| |
| private val verifyingEndpoints: Cache<InetSocketAddress, EndpointVerification> = |
| CacheBuilder.newBuilder().expireAfterAccess(PEER_VERIFICATION_RETRY_DELAY_MS, TimeUnit.MILLISECONDS).build() |
| private val requestingENRs: Cache<InetSocketAddress, ENRRequest> = |
| CacheBuilder.newBuilder().expireAfterAccess(ENR_REQUEST_RETRY_DELAY_MS, TimeUnit.MILLISECONDS).build() |
| private val awaitingPongs = ConcurrentHashMap<Bytes32, EndpointVerification>() |
| private val awaitingENRs = ConcurrentHashMap<Bytes32, ENRRequest>() |
| private val findNodeStates: Cache<SECP256K1.PublicKey, FindNodeState> = |
| CacheBuilder.newBuilder().expireAfterAccess(FIND_NODES_CACHE_EXPIRY, TimeUnit.MILLISECONDS) |
| .removalListener<SECP256K1.PublicKey, FindNodeState> { it.value.close() } |
| .build() |
| |
| override var invalidPackets: Long by AtomicLong(0) |
| override var selfPackets: Long by AtomicLong(0) |
| override var expiredPackets: Long by AtomicLong(0) |
| override var filteredPackets: Long by AtomicLong(0) |
| override var unvalidatedPeerPackets: Long by AtomicLong(0) |
| override var unexpectedPongs: Long by AtomicLong(0) |
| override var unexpectedENRResponses: Long by AtomicLong(0) |
| override var unexpectedNeighbors: Long by AtomicLong(0) |
| |
| init { |
| channel.bind(bindAddress) |
| |
| selfEndpoint = Endpoint( |
| advertiseAddress ?: channel.getAdvertisableAddress()!!, |
| advertiseUdpPort ?: channel.localPort, |
| advertiseTcpPort |
| ) |
| |
| enr = EthereumNodeRecord.toRLP( |
| keyPair, seq, enrData, selfEndpoint.address, selfEndpoint.tcpPort, |
| selfEndpoint.udpPort |
| ) |
| |
| val bootstrapping = bootstrapURIs.map { uri -> |
| activityLatch.countUp() |
| async { |
| try { |
| bootstrapFrom(uri) |
| } finally { |
| activityLatch.countDown() |
| } |
| } |
| } |
| bootstrapperCount = async { |
| bootstrapping.awaitAll().sumBy { success -> if (success) 1 else 0 } |
| } |
| |
| refreshLoop = launch { |
| activityLatch.countUp() |
| try { |
| while (true) { |
| delay(REFRESH_INTERVAL_MS) |
| refresh() |
| } |
| } finally { |
| activityLatch.countDown() |
| } |
| } |
| |
| logger.info("{}: started, listening on {}", serviceDescriptor, channel.localAddress) |
| launch { |
| try { |
| receivePackets() |
| } finally { |
| for (pending in awaitingPongs.values) { |
| pending.complete(null) |
| } |
| awaitingPongs.clear() |
| verifyingEndpoints.invalidateAll() |
| verifyingEndpoints.cleanUp() |
| findNodeStates.invalidateAll() |
| findNodeStates.cleanUp() |
| activityLatch.countDown() |
| } |
| logger.info("{}: terminated", serviceDescriptor) |
| } |
| } |
| |
| private suspend fun bootstrapFrom(uri: URI): Boolean { |
| val (bootstrapNodeId, endpoint) = parseEnodeUri(uri) |
| val peer = peerRepository.get(bootstrapNodeId) |
| val now = timeSupplier() |
| peer.updateEndpoint(endpoint, now) |
| try { |
| val result = withTimeout(BOOTSTRAP_PEER_VERIFICATION_TIMEOUT_MS) { |
| endpointVerification(endpoint, peer).verifyWithRetries() |
| } ?: return false |
| if (result.peer != peer) { |
| logger.warn( |
| "{}: ignoring bootstrap peer {} - responding node used a different node-id", |
| serviceDescriptor, uri |
| ) |
| return false |
| } |
| logger.info("{}: verified bootstrap peer {}", serviceDescriptor, uri) |
| addToRoutingTable(peer) |
| findNodes(peer, nodeId) |
| logger.info("{}: completed bootstrapping from {}", serviceDescriptor, uri) |
| return true |
| } catch (_: TimeoutCancellationException) { |
| logger.warn("{}: timeout verifying bootstrap node {}", serviceDescriptor, uri) |
| return false |
| } |
| } |
| |
| private suspend fun receivePackets() { |
| while (channel.isOpen) { |
| val datagram = bufferAllocator() |
| val address = try { |
| channel.receive(datagram) |
| } catch (e: ClosedChannelException) { |
| break |
| } |
| datagram.flip() |
| |
| // do quick sanity checks and discard bad packets before launching a co-routine |
| if (datagram.limit() < Packet.MIN_SIZE) { |
| logger.debug("{}: ignoring under-sized packet with source {}", serviceDescriptor, address) |
| ++invalidPackets |
| continue |
| } |
| if (datagram.limit() > Packet.MAX_SIZE) { |
| logger.debug("{}: ignoring over-sized packet with source {}", serviceDescriptor, address) |
| ++invalidPackets |
| continue |
| } |
| |
| activityLatch.countUp() |
| val arrivalTime = timeSupplier() |
| val job = launch { |
| try { |
| receivePacket(datagram, address, arrivalTime) |
| } catch (e: Throwable) { |
| logger.error(serviceDescriptor + ": unexpected error during packet handling", e) |
| } |
| } |
| job.invokeOnCompletion { activityLatch.countDown() } |
| yield() |
| } |
| } |
| |
| override suspend fun awaitBootstrap(): Int = bootstrapperCount.await() |
| |
| override fun shutdown() { |
| if (channel.isOpen) { |
| logger.info("{}: shutdown", serviceDescriptor) |
| } |
| channel.close() |
| refreshLoop.cancel() |
| } |
| |
| override suspend fun awaitTermination() { |
| activityLatch.await() |
| } |
| |
| override fun awaitTerminationAsync(): AsyncCompletion = asyncCompletion { awaitTermination() } |
| |
| override fun shutdownNow() { |
| job.cancel() |
| } |
| |
| @UseExperimental(ObsoleteCoroutinesApi::class) |
| override suspend fun lookup(target: SECP256K1.PublicKey): List<Peer> { |
| val targetId = target.bytesArray() |
| val results = neighbors(target).toMutableList() |
| |
| // maybe add ourselves to the set |
| val selfPeer = peerRepository.get(nodeId) |
| results.orderedInsert(selfPeer) { a, _ -> targetId.xorDistCmp(a.nodeId.bytesArray(), nodeId.bytesArray()) } |
| results.removeAt(results.lastIndex) |
| |
| val queried = mutableSetOf(selfPeer) |
| |
| while (true) { |
| val toQuery = results.filterNot { p -> queried.contains(p) }.take(3).toList() |
| if (toQuery.isEmpty()) { |
| return results |
| } |
| val nodes = Channel<Node>(capacity = Channel.UNLIMITED) |
| toQuery.forEach { p -> findNodes(p, target, nodes) } |
| while (true) { |
| // stop if no more responses are received after the given time |
| val node = withTimeoutOrNull(LOOKUP_RESPONSE_TIMEOUT_MS) { nodes.receiveOrNull() } ?: break |
| val peer = peerRepository.get(node.nodeId) |
| if (!results.contains(peer)) { |
| results.orderedInsert(peer) { a, _ -> targetId.xorDistCmp(a.nodeId.bytesArray(), node.nodeId.bytesArray()) } |
| results.removeAt(results.lastIndex) |
| } |
| } |
| queried.addAll(toQuery) |
| nodes.close() |
| } |
| } |
| |
| override fun lookupAsync(target: SECP256K1.PublicKey) = asyncResult { lookup(target) } |
| |
| private suspend fun refresh() { |
| logger.debug("{}: table refresh triggered", serviceDescriptor) |
| // TODO: instead of a random target, choose a target to optimally fill the peer table |
| lookup(SECP256K1.KeyPair.random().publicKey()) |
| } |
| |
| private suspend fun receivePacket(datagram: ByteBuffer, address: SocketAddress, arrivalTime: Long) { |
| if (address !is InetSocketAddress) { |
| throw IOException("Datagram received from non-inet socket address: " + address.javaClass) |
| } |
| |
| val packet: Packet |
| try { |
| packet = Packet.decodeFrom(datagram) |
| } catch (e: DecodingException) { |
| logger.debug("{}: ignoring invalid packet from {}", serviceDescriptor, address) |
| ++invalidPackets |
| return |
| } |
| |
| if (packet.nodeId == nodeId) { |
| logger.debug("{}: ignoring packet from self", serviceDescriptor) |
| ++selfPackets |
| return |
| } |
| |
| if (packet.isExpired(arrivalTime - PACKET_EXPIRATION_CHECK_GRACE_MS)) { |
| logger.debug("{}: ignoring expired packet", serviceDescriptor) |
| ++expiredPackets |
| return |
| } |
| |
| if (packetFilter?.invoke(packet.nodeId, address) == false) { |
| logger.debug("{}: packet rejected by filter", serviceDescriptor) |
| ++filteredPackets |
| return |
| } |
| |
| when (packet) { |
| is PingPacket -> handlePing(packet, address, arrivalTime) |
| is PongPacket -> handlePong(packet, address, arrivalTime) |
| is FindNodePacket -> handleFindNode(packet, address, arrivalTime) |
| is NeighborsPacket -> handleNeighbors(packet, address) |
| is ENRRequestPacket -> handleENRRequest(packet, address, arrivalTime) |
| is ENRResponsePacket -> handleENRResponse(packet, address, arrivalTime) |
| }.let {} // guarantees "when" matching is exhaustive |
| } |
| |
| private suspend fun handlePing(packet: PingPacket, from: InetSocketAddress, arrivalTime: Long) { |
| // COMPATIBILITY: The ping packet should contain the canonical endpoint for the peer, yet it is often observed to |
| // be incorrect (using private-subnet addresses, wildcard addresses, etc). So instead, respond to the source |
| // address of the packet itself. |
| val fromEndpoint = Endpoint(from, packet.from.tcpPort) |
| val peer = peerRepository.get(packet.nodeId) |
| // update the endpoint if the peer does not have one that's been proven |
| var currentEndpoint = peer.updateEndpoint(fromEndpoint, arrivalTime, arrivalTime - ENDPOINT_PROOF_LONGEVITY_MS) |
| if (currentEndpoint.tcpPort != packet.from.tcpPort) { |
| currentEndpoint = peer.updateEndpoint( |
| Endpoint(currentEndpoint.address, currentEndpoint.udpPort, packet.from.tcpPort), |
| arrivalTime |
| ) |
| } |
| |
| val pong = PongPacket.create(keyPair, timeSupplier(), currentEndpoint, packet.hash, seq) |
| sendPacket(from, pong) |
| // https://github.com/ethereum/devp2p/blob/master/discv4.md#ping-packet-0x01 also suggests sending a ping |
| // packet if the peer is unknown, however sending two packets in response to a single incoming would allow a |
| // traffic amplification attack |
| } |
| |
| private suspend fun handlePong(packet: PongPacket, from: InetSocketAddress, arrivalTime: Long) { |
| val pending = awaitingPongs.remove(packet.pingHash) ?: run { |
| logger.debug("{}: received unexpected or late pong from {}", serviceDescriptor, from) |
| ++unexpectedPongs |
| return |
| } |
| |
| val sender = pending.peer |
| // COMPATIBILITY: If the node-id's don't match, the pong should probably be rejected. However, if a different |
| // peer is listening at the same address, it will respond to the ping with its node-id. Instead of rejecting, |
| // accept the pong and update the new peer record with the proven endpoint, preferring to keep its current |
| // tcpPort and otherwise keeping the tcpPort of the original peer. |
| val peer = if (sender.nodeId == packet.nodeId) sender else peerRepository.get(packet.nodeId) |
| val endpoint = if (peer.verifyEndpoint(pending.endpoint, arrivalTime)) { |
| pending.endpoint |
| } else { |
| val endpoint = peer.endpoint?.tcpPort?.let { port -> |
| Endpoint(pending.endpoint.address, pending.endpoint.udpPort, port) |
| } ?: pending.endpoint |
| peer.updateEndpoint(endpoint, arrivalTime) |
| peer.verifyEndpoint(endpoint, arrivalTime) |
| endpoint |
| } |
| |
| if (sender.nodeId == packet.nodeId) { |
| logger.debug("{}: verified peer endpoint {} (node-id: {})", serviceDescriptor, endpoint.address, peer.nodeId) |
| } else { |
| logger.debug( |
| "{}: verified peer endpoint {} (node-id: {} - changed from {})", serviceDescriptor, |
| endpoint.address, peer.nodeId, sender.nodeId |
| ) |
| } |
| |
| pending.complete(VerificationResult(peer, endpoint)) |
| |
| if (packet.enrSeq != null) { |
| if (peer.enr == null || peer.enr!!.seq < packet.enrSeq) { |
| val now = timeSupplier() |
| withTimeoutOrNull(ENR_REQUEST_TIMEOUT_MS) { enrRequest(endpoint, peer).verify(now) } |
| } |
| } |
| } |
| |
| private suspend fun handleFindNode(packet: FindNodePacket, from: InetSocketAddress, arrivalTime: Long) { |
| // if the peer has not been validated, delay sending neighbors until it is |
| val peer = peerRepository.get(packet.nodeId) |
| val (_, endpoint) = ensurePeerIsValid(peer, from, arrivalTime) ?: run { |
| logger.debug("{}: received findNode from {} which cannot be validated", serviceDescriptor, from) |
| ++unvalidatedPeerPackets |
| return |
| } |
| |
| logger.debug("{}: received findNode from {} for target-id {}", serviceDescriptor, from, packet.target) |
| val nodes = neighbors(packet.target).map { p -> p.toNode() } |
| |
| val address = endpoint.udpSocketAddress |
| NeighborsPacket.createRequired(keyPair, timeSupplier(), nodes).forEach { p -> |
| logger.debug("{}: sending {} neighbors to {}", serviceDescriptor, p.nodes.size, address) |
| sendPacket(address, p) |
| } |
| } |
| |
| private fun handleNeighbors(packet: NeighborsPacket, from: InetSocketAddress) { |
| findNodeStates.getIfPresent(packet.nodeId)?.let { state -> |
| for (node in packet.nodes) { |
| launch { |
| logger.debug("{}: received neighbour {} from {}", serviceDescriptor, node.endpoint.address, from) |
| val neighbor = peerRepository.get(node.nodeId) |
| val now = timeSupplier() |
| neighbor.updateEndpoint(node.endpoint, now, now - ENDPOINT_PROOF_LONGEVITY_MS) |
| |
| withTimeoutOrNull(PEER_VERIFICATION_TIMEOUT_MS) { |
| endpointVerification(node.endpoint, neighbor).verify(now) |
| }?.let { result -> |
| logger.debug( |
| "{}: adding {} to the routing table (node-id: {})", serviceDescriptor, |
| result.endpoint.address, result.peer.nodeId |
| ) |
| addToRoutingTable(result.peer) |
| } |
| } |
| } |
| state.receive(packet.nodes) |
| } ?: run { |
| logger.debug("{}: received unexpected or late neighbors packet from {}", serviceDescriptor, from) |
| ++unexpectedNeighbors |
| } |
| } |
| |
| private suspend fun handleENRRequest(packet: ENRRequestPacket, from: InetSocketAddress, arrivalTime: Long) { |
| val peer = peerRepository.get(packet.nodeId) |
| val (_, endpoint) = ensurePeerIsValid(peer, from, arrivalTime) ?: run { |
| logger.debug("{}: received enrRequest from {} which cannot be validated", serviceDescriptor, from) |
| ++unvalidatedPeerPackets |
| return |
| } |
| |
| logger.debug("{}: received enrRequest from {}", serviceDescriptor, from) |
| |
| val address = endpoint.udpSocketAddress |
| sendPacket(address, ENRResponsePacket.create(keyPair, timeSupplier(), packet.hash, enr)) |
| } |
| |
| private suspend fun handleENRResponse(packet: ENRResponsePacket, from: InetSocketAddress, arrivalTime: Long) { |
| val pending = awaitingENRs.remove(packet.requestHash) ?: run { |
| logger.debug("{}: received unexpected or late enr response from {}", serviceDescriptor, from) |
| ++unexpectedENRResponses |
| return |
| } |
| packet.requestHash |
| val sender = pending.peer |
| // COMPATIBILITY: If the node-id's don't match, the pong should probably be rejected. However, if a different |
| // peer is listening at the same address, it will respond to the ping with its node-id. Instead of rejecting, |
| // accept the pong and update the new peer record with the proven endpoint, preferring to keep its current |
| // tcpPort and otherwise keeping the tcpPort of the original peer. |
| val peer = if (sender.nodeId == packet.nodeId) sender else peerRepository.get(packet.nodeId) |
| |
| val enr = EthereumNodeRecord.fromRLP(packet.enr) |
| try { |
| enr.validate() |
| } catch (e: InvalidNodeRecordException) { |
| logger.debug("Invalid ENR", e) |
| return |
| } |
| |
| peer.updateENR(enr, arrivalTime) |
| |
| pending.complete(ENRResult(peer, enr)) |
| } |
| |
| private fun addToRoutingTable(peer: Peer) { |
| routingTable.add(peer)?.let { contested -> |
| launch { |
| contested.endpoint?.let { endpoint -> |
| withTimeoutOrNull(PEER_VERIFICATION_TIMEOUT_MS) { endpointVerification(endpoint, contested).verify() } |
| } ?: routingTable.evict(contested) |
| } |
| } |
| } |
| |
| private suspend fun ensurePeerIsValid( |
| peer: Peer, |
| address: InetSocketAddress, |
| arrivalTime: Long |
| ): VerificationResult? { |
| val now = timeSupplier() |
| peer.getEndpoint(now - ENDPOINT_PROOF_LONGEVITY_MS)?.let { endpoint -> |
| // already valid |
| return VerificationResult(peer, endpoint) |
| } |
| |
| val endpoint = peer.updateEndpoint( |
| Endpoint(address, peer.endpoint?.tcpPort), |
| arrivalTime, now - ENDPOINT_PROOF_LONGEVITY_MS |
| ) |
| return withTimeoutOrNull(PEER_VERIFICATION_TIMEOUT_MS) { endpointVerification(endpoint, peer).verify(now) } |
| } |
| |
| private fun endpointVerification(endpoint: Endpoint, peer: Peer) = |
| verifyingEndpoints.get(endpoint.udpSocketAddress) { EndpointVerification(endpoint, peer) } |
| |
| // a representation of the state and current action for verifying an endpoint, |
| // to avoid concurrent attempts to verify the same endpoint |
| private inner class EndpointVerification(val endpoint: Endpoint, val peer: Peer) { |
| private val deferred = CompletableDeferred<VerificationResult?>() |
| @Volatile |
| private var active: Job? = null |
| private var nextPingMs: Long = 0 |
| private var retryDelay: Long = 0 |
| |
| suspend fun verify(now: Long = timeSupplier()): VerificationResult? { |
| if (!deferred.isCompleted) { |
| // if not already actively pinging and enough time has passed since the last ping, send a single ping |
| synchronized(this) { |
| if (active?.isCompleted != false && now >= nextPingMs) { |
| nextPingMs = now + RESEND_DELAY_MS |
| launch { sendPing(now) } |
| } |
| } |
| } |
| return deferred.await() |
| } |
| |
| suspend fun verifyWithRetries(): VerificationResult? { |
| if (!deferred.isCompleted) { |
| // if not already actively pinging, start pinging with retries |
| synchronized(this) { |
| if (active?.isCompleted != false) { |
| active = launch { |
| repeat(PING_RETRIES) { |
| delay(nextPingMs - timeSupplier()) |
| nextPingMs = timeSupplier() + RESEND_DELAY_MS |
| retryDelay += RESEND_DELAY_INCREASE_MS |
| if (retryDelay > RESEND_MAX_DELAY_MS) { |
| retryDelay = RESEND_MAX_DELAY_MS |
| } |
| sendPing() |
| } |
| } |
| } |
| } |
| } |
| return deferred.await() |
| } |
| |
| private suspend fun sendPing(now: Long = timeSupplier()) { |
| val pingPacket = PingPacket.create(keyPair, now, selfEndpoint, endpoint, seq) |
| |
| // create local references to be captured in the closure, rather than the whole packet instance |
| val hash = pingPacket.hash |
| val timeout = pingPacket.expiration - now |
| |
| // very unlikely that there is another ping packet created with the same hash yet a different EndpointVerification |
| // instance, but if there is then the first will be waiting on a deferred that never completes and will |
| // eventually time out |
| if (awaitingPongs.put(hash, this) != this) { |
| launch { |
| delay(timeout) |
| awaitingPongs.remove(hash) |
| } |
| sendPacket(endpoint.udpSocketAddress, pingPacket) |
| } |
| } |
| |
| fun complete(result: VerificationResult?): Boolean { |
| active?.cancel() |
| return deferred.complete(result) |
| } |
| } |
| |
| private data class VerificationResult( |
| /** The peer that responded to the verification request. */ |
| val peer: Peer, |
| /** |
| * The endpoint that was verified. |
| * |
| * This will typically be the same as peer.endpoint, but may not be due to concurrent updates. |
| */ |
| val endpoint: Endpoint |
| ) |
| |
| private fun enrRequest(endpoint: Endpoint, peer: Peer) = |
| requestingENRs.get(endpoint.udpSocketAddress) { ENRRequest(endpoint, peer) } |
| |
| // a representation of the state and current action for querying an ENR from a peer, |
| // to avoid concurrent attempts to request the same information. |
| private inner class ENRRequest(val endpoint: Endpoint, val peer: Peer) { |
| private val deferred = CompletableDeferred<ENRResult?>() |
| @Volatile |
| private var active: Job? = null |
| private var nextENRRequest: Long = 0 |
| private var retryDelay: Long = 0 |
| |
| suspend fun verify(now: Long = timeSupplier()): ENRResult? { |
| if (!deferred.isCompleted) { |
| // if not already actively requesting and enough time has passed since the last request, send a single request |
| synchronized(this) { |
| if (active?.isCompleted != false && now >= nextENRRequest) { |
| nextENRRequest = now + RESEND_DELAY_MS |
| launch { sendENRRequest(now) } |
| } |
| } |
| } |
| return deferred.await() |
| } |
| |
| private suspend fun sendENRRequest(now: Long = timeSupplier()) { |
| val enrRequestPacket = ENRRequestPacket.create(keyPair, now) |
| |
| // create local references to be captured in the closure, rather than the whole packet instance |
| val hash = enrRequestPacket.hash |
| val timeout = enrRequestPacket.expiration - now |
| |
| // very unlikely that there is another ping packet created with the same hash yet a different ENRRequest |
| // instance, but if there is then the first will be waiting on a deferred that never completes and will |
| // eventually time out |
| if (awaitingENRs.put(hash, this) != this) { |
| launch { |
| delay(timeout) |
| awaitingENRs.remove(hash) |
| } |
| sendPacket(endpoint.udpSocketAddress, enrRequestPacket) |
| } |
| } |
| |
| fun complete(result: ENRResult?): Boolean { |
| active?.cancel() |
| return deferred.complete(result) |
| } |
| } |
| |
| private data class ENRResult( |
| val peer: Peer, |
| val enr: EthereumNodeRecord |
| ) |
| |
| @UseExperimental(ObsoleteCoroutinesApi::class) |
| private suspend fun findNodes(peer: Peer, target: SECP256K1.PublicKey) { |
| // consume all received nodes (and discard), thus suspending until completed |
| Channel<Node>(capacity = Channel.CONFLATED).also { findNodes(peer, target, it) }.consumeEach { } |
| } |
| |
| private suspend fun findNodes(peer: Peer, target: SECP256K1.PublicKey, channel: SendChannel<Node>) { |
| if (peer.nodeId == nodeId) { |
| // for queries to self, respond directly |
| neighbors(target).map { p -> channel.send(p.toNode()) } |
| return |
| } |
| findNodeStates.get(peer.nodeId) { FindNodeState(peer) }.findNodes(target, channel) |
| } |
| |
| private fun neighbors(target: SECP256K1.PublicKey) = routingTable.nearest(target, DEVP2P_BUCKET_SIZE) |
| |
| private data class FindNodeRequest(val target: SECP256K1.PublicKey, val results: SendChannel<Node>) |
| |
| private inner class FindNodeState(val peer: Peer) { |
| // the protocol doesn't have a correlation mechanism between findNode requests and the associated responses, |
| // so requests have to be queued and a delay between them used to try and determine when no more responses |
| // will be sent |
| private val targets = Channel<FindNodeRequest>(capacity = Channel.UNLIMITED) |
| private val job: Job |
| @Volatile |
| private var results: SendChannel<Node>? = null |
| @Volatile |
| private var lastReceive: Long = 0 |
| |
| init { |
| job = launch { sendLoop() } |
| } |
| |
| @UseExperimental(ExperimentalCoroutinesApi::class) |
| private suspend fun sendLoop() { |
| try { |
| while (true) { |
| val request = targets.receive() |
| if (request.results.isClosedForSend) { |
| continue |
| } |
| val endpoint = peer.endpoint |
| if (endpoint == null) { |
| request.results.close() |
| continue |
| } |
| var now = timeSupplier() |
| results = request.results |
| lastReceive = now |
| val findNodePacket = FindNodePacket.create(keyPair, now, request.target) |
| sendPacket(endpoint.udpSocketAddress, findNodePacket) |
| logger.debug("{}: sent findNode to {} for {}", serviceDescriptor, endpoint.udpSocketAddress, request.target) |
| |
| // wait for results, only moving onto the next when packets stop arriving for a reasonable period |
| do { |
| delay(lastReceive - now + FIND_NODES_QUERY_GAP_MS) |
| now = timeSupplier() |
| } while (now - lastReceive < FIND_NODES_QUERY_GAP_MS) |
| results?.close() |
| results = null |
| |
| // issue a "get" on the state cache, to indicate that this state is still in use |
| val state = findNodeStates.getIfPresent(peer.nodeId) |
| if (state != this) { |
| logger.warn("{}: findNode state for {} has been replaced") |
| close() |
| } |
| } |
| } catch (_: ClosedReceiveChannelException) { |
| // ignore |
| } catch (_: CancellationException) { |
| // ignore |
| } catch (_: ClosedChannelException) { |
| // ignore |
| } catch (e: Exception) { |
| logger.error( |
| "$serviceDescriptor: Error while sending FindNode requests for peer ${peer.nodeId}", |
| e |
| ) |
| } |
| } |
| |
| suspend fun findNodes(target: SECP256K1.PublicKey, channel: SendChannel<Node>) { |
| targets.send(FindNodeRequest(target, channel)) |
| } |
| |
| fun receive(nodes: List<Node>) { |
| results?.let { channel -> |
| lastReceive = timeSupplier() |
| try { |
| nodes.forEach { node -> channel.offer(node) } |
| } catch (_: ClosedSendChannelException) { |
| results = null |
| } |
| } |
| } |
| |
| fun close() { |
| job.cancel() |
| targets.close() |
| results?.close() |
| while (true) { |
| val request = targets.poll() ?: break |
| request.results.close() |
| } |
| } |
| } |
| |
| private suspend fun sendPacket(address: InetSocketAddress, packet: Packet) { |
| val buffer = bufferAllocator() |
| packet.encodeTo(buffer) |
| buffer.flip() |
| channel.send(buffer, address) |
| } |
| } |