blob: 51fd69c899b2206962b3ec177750cf213ee2d15c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tuweni.kademlia
import com.google.common.cache.Cache
import com.google.common.cache.CacheBuilder
import java.util.function.Function
/**
* Determine the XOR-distance between this and an equal-length byte array.
*
* @param other the byte-array to calculate the distance to
* @return the distance as an integer
* @throws IllegalArgumentException if [other] is not the same length as this
*/
infix fun ByteArray.xorDist(other: ByteArray): Int {
require(size == other.size) { "arrays are of different lengths" }
var distance = size * 8
for (i in indices) {
val xor = (this[i].toInt() xor other[i].toInt()) and 0xff
if (xor == 0) {
distance -= 8
} else {
distance -= (Integer.numberOfLeadingZeros(xor) - 24)
break
}
}
return distance
}
/**
* Compare two equal-length byte arrays for their XOR-distance to this.
*
* @param a the first byte array
* @param b the second byte array
* @return -1 if [a] is closer, +1 if [b] is closer, or 0 if they are the same distance to this
* @throws IllegalArgumentException if [a] or [b] are not the same length as this
*/
fun ByteArray.xorDistCmp(a: ByteArray, b: ByteArray): Int {
require(size == a.size && size == b.size) { "arrays are of different lengths" }
for (i in indices) {
val distA = (this[i].toInt() xor a[i].toInt()) and 0xff
val distB = (this[i].toInt() xor b[i].toInt()) and 0xff
if (distA > distB) {
return 1
} else if (distA < distB) {
return -1
}
}
return 0
}
/**
* Insert an element into a mutable list, based on a comparison function.
*
* @param element the element to insert
* @param comparison the comparison function
*/
fun <E> MutableList<E>.orderedInsert(element: E, comparison: (E, E) -> Int) {
var i = this.binarySearch { e -> comparison(e, element) }
if (i < 0) {
i = -i - 1
}
this.add(i, element)
}
/**
* A Kademlia Routing Table
*
* @constructor Create a new routing table.
* @param selfId the ID of the local node
* @param k the size of each bucket (k value)
* @param maxReplacements the maximum number of replacements to cache in each bucket
* @param nodeId a function for obtaining the id of a network node
* @param <K> the network node type
*
*/
class KademliaRoutingTable<T>(
private val selfId: ByteArray,
k: Int,
maxReplacements: Int = k,
private val nodeId: (T) -> ByteArray,
private val distanceToSelf: (T) -> Int = { nodeId(it) xorDist selfId }
) : Set<T> {
companion object {
/**
* Create a new routing table.
*
* @param selfId the ID of the local node
* @param k the size of each bucket (k value)
* @param nodeId a function for obtaining the id of a network node
* @param <K> the network node type
* @return A new routing table
*/
@JvmStatic
fun <T> create(
selfId: ByteArray,
k: Int,
nodeId: Function<T, ByteArray>,
distanceToSelf: Function<T, Int>
): KademliaRoutingTable<T> =
KademliaRoutingTable(selfId, k, nodeId = nodeId::apply, distanceToSelf = distanceToSelf::apply)
/**
* Create a new routing table.
*
* @param selfId the ID of the local node
* @param k the size of each bucket (k value)
* @param maxReplacements the maximum number of replacements to cache in each bucket
* @param nodeId a function for obtaining the id of a network node
* @param <K> the network node type
* @return A new routing table
*/
@JvmStatic
fun <T> create(
selfId: ByteArray,
k: Int,
maxReplacements: Int,
nodeId: Function<T, ByteArray>,
distanceToSelf: Function<T, Int>
): KademliaRoutingTable<T> = KademliaRoutingTable(selfId, k, maxReplacements, nodeId::apply, distanceToSelf::apply)
}
init {
require(selfId.isNotEmpty()) { "self id must not be empty" }
require(k > 0) { "k value must be positive" }
}
private val idBitSize = selfId.size * 8
private val buckets: Array<Bucket<T>> = Array(idBitSize + 1) { Bucket<T>(k, maxReplacements) }
private val distanceCache: Cache<T, Int> =
CacheBuilder.newBuilder().maximumSize((1L + idBitSize) * k).weakKeys().build()
override val size: Int
get() = buckets.fold(0) { acc, bucket -> acc + bucket.size }
override fun isEmpty(): Boolean = buckets.find { bucket -> !bucket.isEmpty() } == null
override fun iterator(): Iterator<T> = buckets.asSequence().flatMap { bucket -> bucket.asSequence() }.iterator()
override fun contains(element: T): Boolean = bucketFor(element).contains(element)
override fun containsAll(elements: Collection<T>): Boolean {
val peers = elements.toMutableSet()
buckets.forEach { bucket ->
peers.removeAll(peers.filter { peer -> bucket.contains(peer) })
if (peers.isEmpty()) {
return true
}
}
return false
}
/**
* Return the nearest nodes to a target id, in order from closest to furthest.
*
* The sort order is the log distance from the target id to the node ids.
*
* @param targetId the id to find nodes nearest to
* @param limit the maximum number of nodes to return
* @return a list of nodes from the routing table
*/
fun nearest(targetId: ByteArray, limit: Int): List<T> {
val results = mutableListOf<T>()
for (bucket in buckets) {
for (node in bucket) {
val nodeId = idForNode(node)
results.orderedInsert(node) { a, _ -> targetId.xorDistCmp(idForNode(a), nodeId) }
if (results.size > limit) {
results.removeAt(results.lastIndex)
}
}
}
return results
}
/**
* Add a node to the table.
*
* @param node the node to add
* @return `null` if the node was successfully added to the table (or already in the table). Otherwise, a node
* will be returned that is a suitable candidate for eviction, and the provided node will be stored in
* the replacements list.
*/
fun add(node: T): T? = bucketFor(node).add(node)
/**
* Remove a node from the table, potentially adding an alternative from the replacement cache.
*
* @param node the node to evict
* @param `true` if the node was removed
*/
fun evict(node: T): Boolean = bucketFor(node).evict(node)
/**
* Clear all nodes (and replacements) from the table.
*/
fun clear() {
buckets.forEach { bucket -> bucket.clear() }
}
fun peersOfDistance(value: Int): List<T> {
return buckets[value].toList()
}
fun getRandom(): T {
return buckets.filter { !it.isEmpty() }.random().random()
}
fun logDistToSelf(node: T): Int = distanceCache.get(node) { distanceToSelf.invoke(node) }
private fun idForNode(node: T): ByteArray {
val id = nodeId(node)
require(id.size == selfId.size) { "id obtained for node is not the correct length" }
require(!id.contentEquals(selfId)) { "id obtained for node is the same as self" }
return id
}
private fun bucketFor(node: T) = buckets[logDistToSelf(node)]
private class Bucket<E> private constructor(
// ordered with most recent first
private val entries: MutableList<E>,
private val k: Int,
private val maxReplacements: Int
) : List<E> by entries {
constructor(k: Int, maxReplacements: Int) : this(mutableListOf(), k, maxReplacements)
// ordered with most recent last
private val replacementCache = mutableListOf<E>()
init {
require(k > 0) { "k value must be positive" }
}
@Synchronized
fun add(node: E): E? {
// remove from the replacement cache, if present
replacementCache.remove(node)
// check if the list contains this node, and move to the front if so
for (i in entries.indices) {
if (entries[i] == node) {
// already in table, so move to front
entries.removeAt(i)
entries.add(0, node)
return null
}
}
assert(entries.size <= k)
if (entries.size == k) {
// bucket is full, so add to the replacement cache
assert(replacementCache.size <= maxReplacements)
if (replacementCache.size == maxReplacements) {
replacementCache.removeAt(0)
}
replacementCache.add(node)
return entries.last()
}
// add entry to the front of the bucket
entries.add(0, node)
return null
}
// remove and replace from replacement cache
@Synchronized
fun evict(node: E): Boolean {
if (!entries.remove(node)) {
return false
}
if (!replacementCache.isEmpty()) {
val replacement = replacementCache.removeAt(replacementCache.lastIndex)
entries.add(0, replacement)
}
return true
}
@Synchronized
fun clear() {
entries.clear()
replacementCache.clear()
}
}
}