blob: 593e77effd15780166e2c720be4250ec04583589 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tuweni.hobbits
import io.vertx.core.Vertx
import io.vertx.core.buffer.Buffer
import io.vertx.core.datagram.DatagramSocket
import io.vertx.core.http.HttpClient
import io.vertx.core.http.HttpMethod
import io.vertx.core.http.HttpServer
import io.vertx.core.net.NetClient
import io.vertx.core.net.NetServer
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import org.apache.tuweni.bytes.Bytes
import org.apache.tuweni.concurrent.AsyncCompletion
import org.apache.tuweni.concurrent.coroutines.await
import java.lang.IllegalArgumentException
import java.lang.IllegalStateException
import java.lang.RuntimeException
import java.net.URI
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.coroutines.CoroutineContext
/**
* Hobbits is a peer-to-peer transport stack specified at https://www.github.com/deltap2p/hobbits.
*
* This class works as a transport mechanism that can leverage a variety of network transport
* protocols, such as TCP, HTTP, UDP and Web sockets.
*
* It can be used to contact other Hobbits endpoints, or to expose endpoints to the network.
*
* @param vertx Vert.x instance
* @param incompleteMessageHandler a handler to manage incomplete or invalid messages
* @param coroutineContext the co-routine context for the transport
*/
class HobbitsTransport(
private val vertx: Vertx,
private val incompleteMessageHandler: (Bytes) -> Unit = {},
override val coroutineContext: CoroutineContext = Dispatchers.Default
) : CoroutineScope {
private val started = AtomicBoolean(false)
private val httpEndpoints = mutableMapOf<String, Endpoint>()
private val tcpEndpoints = mutableMapOf<String, Endpoint>()
private val udpEndpoints = mutableMapOf<String, Endpoint>()
private val wsEndpoints = mutableMapOf<String, Endpoint>()
private var exceptionHandler: ((Throwable) -> Unit)? = { }
private var httpClient: HttpClient? = null
private var tcpClient: NetClient? = null
private var udpClient: DatagramSocket? = null
private val httpServers = mutableMapOf<String, HttpServer>()
private val tcpServers = mutableMapOf<String, NetServer>()
private val udpServers = mutableMapOf<String, DatagramSocket>()
private val wsServers = mutableMapOf<String, HttpServer>()
/**
* Sets an exception handler that will be called whenever an exception occurs during transport.
*/
fun exceptionHandler(handler: (Throwable) -> Unit) {
exceptionHandler = handler
}
/**
* Creates a new endpoint over http.
* @param networkInterface the network interface to bind the endpoint to
* @param port the port to serve traffic from
* @param requestURI the request URI path to match
* @param tls whether the endpoint should be secured using TLS
* @param handler function called when a message is received
*/
fun createHTTPEndpoint(
id: String = "default",
networkInterface: String = "0.0.0.0",
port: Int = 9337,
requestURI: String? = null,
tls: Boolean = false,
handler: (Message) -> Unit,
portUpdateListener: (Int) -> Unit = {}
) {
checkNotStarted()
httpEndpoints[id] = Endpoint(networkInterface, port, requestURI, tls, handler, portUpdateListener)
}
/**
* Creates a new endpoint over tcp persistent connections.
* @param networkInterface the network interface to bind the endpoint to
* @param port the port to serve traffic from
* @param tls whether the endpoint should be secured using TLS
* @param handler function called when a message is received
*/
fun createTCPEndpoint(
id: String = "default",
networkInterface: String = "0.0.0.0",
port: Int = 9237,
tls: Boolean = false,
handler: (Message) -> Unit,
portUpdateListener: (Int) -> Unit = {}
) {
checkNotStarted()
tcpEndpoints[id] = Endpoint(networkInterface, port, null, tls, handler, portUpdateListener)
}
/**
* Creates a new endpoint over UDP connections.
* @param networkInterface the network interface to bind the endpoint to
* @param port the port to serve traffic from
* @param tls whether the endpoint should be secured using TLS
* @param handler function called when a message is received
*/
fun createUDPEndpoint(
id: String = "default",
networkInterface: String = "0.0.0.0",
port: Int = 9137,
handler: (Message) -> Unit
) {
checkNotStarted()
udpEndpoints[id] = Endpoint(networkInterface, port, null, false, handler)
}
/**
* Creates a new endpoint over websocket connections.
* @param networkInterface the network interface to bind the endpoint to
* @param port the port to serve traffic from
* @param requestURI the request URI path to match
* @param tls whether the endpoint should be secured using TLS
* @param handler function called when a message is received
*/
fun createWSEndpoint(
id: String = "default",
networkInterface: String = "0.0.0.0",
port: Int = 9037,
requestURI: String? = null,
tls: Boolean = false,
handler: (Message) -> Unit,
portUpdateListener: (Int) -> Unit = {}
) {
checkNotStarted()
wsEndpoints[id] = Endpoint(networkInterface, port, requestURI, tls, handler, portUpdateListener)
}
/**
* Sends a message using the transport specified.
*
*/
suspend fun sendMessage(message: Message, transport: Transport, host: String, port: Int, requestURI: String = "") {
checkStarted()
val completion = AsyncCompletion.incomplete()
when (transport) {
Transport.HTTP -> {
@Suppress("DEPRECATION")
val req = httpClient!!.request(HttpMethod.POST, port, host, requestURI)
.exceptionHandler(exceptionHandler).handler {
if (it.statusCode() == 200) {
completion.complete()
} else {
completion.completeExceptionally(RuntimeException("${it.statusCode()}"))
}
}
req.end(Buffer.buffer(message.toBytes().toArrayUnsafe()))
}
Transport.TCP -> {
tcpClient!!.connect(port, host) { res ->
if (res.failed()) {
completion.completeExceptionally(res.cause())
} else {
res.result().exceptionHandler(exceptionHandler).end(Buffer.buffer(message.toBytes().toArrayUnsafe()))
completion.complete()
}
}
}
Transport.UDP -> {
udpClient!!.send(Buffer.buffer(message.toBytes().toArrayUnsafe()), port, host) { handler ->
if (handler.failed()) {
completion.completeExceptionally(handler.cause())
} else {
completion.complete()
}
}
}
Transport.WS -> {
httpClient!!.websocket(port, host, requestURI, { handler ->
handler.exceptionHandler(exceptionHandler)
.writeBinaryMessage(Buffer.buffer(message.toBytes().toArrayUnsafe())).end()
completion.complete()
},
{ exception ->
completion.completeExceptionally(exception)
})
}
}
completion.await()
}
private fun findEndpoint(endpointId: String): Endpoint? {
val uri = URI.create(endpointId)
if (uri.scheme == null) {
if (httpEndpoints.containsKey(endpointId)) {
return httpEndpoints[endpointId]
}
if (tcpEndpoints.containsKey(endpointId)) {
return tcpEndpoints[endpointId]
}
if (udpEndpoints.containsKey(endpointId)) {
return udpEndpoints[endpointId]
}
if (wsEndpoints.containsKey(endpointId)) {
return wsEndpoints[endpointId]
}
return null
}
when (uri.scheme) {
"http" -> {
return httpEndpoints[uri.host]
}
"tcp" -> {
return tcpEndpoints[uri.host]
}
"udp" -> {
return udpEndpoints[uri.host]
}
"ws" -> {
return wsEndpoints[uri.host]
}
else -> {
throw IllegalArgumentException("Unsupported endpoint $endpointId")
}
}
}
/**
* Starts the hobbits transport.
*/
suspend fun start() {
if (started.compareAndSet(false, true)) {
httpClient = vertx.createHttpClient()
tcpClient = vertx.createNetClient()
udpClient = vertx.createDatagramSocket().exceptionHandler(exceptionHandler)
val completions = mutableListOf<AsyncCompletion>()
for ((id, endpoint) in httpEndpoints) {
val completion = AsyncCompletion.incomplete()
val httpServer = vertx.createHttpServer()
httpServers[id] = httpServer
httpServer.requestHandler {
if (endpoint.requestURI == null || it.path().startsWith(endpoint.requestURI)) {
it.bodyHandler {
val bytes = Bytes.wrapBuffer(it)
val message = Message.readMessage(bytes)
if (message == null) {
incompleteMessageHandler(bytes)
} else {
endpoint.handler(message)
}
}
it.response().statusCode = 200
it.response().end()
} else {
it.response().statusCode = 404
it.response().end()
}
}.listen(endpoint.port, endpoint.networkInterface) {
if (it.failed()) {
completion.completeExceptionally(it.cause())
} else {
if (endpoint.port == 0) {
endpoint.updatePort(it.result().actualPort())
}
completion.complete()
}
}
completions.add(completion)
}
for ((id, endpoint) in tcpEndpoints) {
val completion = AsyncCompletion.incomplete()
val tcpServer = vertx.createNetServer()
tcpServers[id] = tcpServer
tcpServer.connectHandler { connectHandler -> connectHandler.handler { buffer ->
val bytes = Bytes.wrapBuffer(buffer)
val message = Message.readMessage(bytes)
if (message == null) {
incompleteMessageHandler(bytes)
} else {
endpoint.handler(message)
}
}
}.listen(endpoint.port, endpoint.networkInterface) {
if (it.failed()) {
completion.completeExceptionally(it.cause())
} else {
if (endpoint.port == 0) {
endpoint.updatePort(it.result().actualPort())
}
completion.complete()
}
}
completions.add(completion)
}
for ((id, endpoint) in udpEndpoints) {
val completion = AsyncCompletion.incomplete()
val udpServer = vertx.createDatagramSocket()
udpServers[id] = udpServer
udpServer.handler { packet ->
val bytes = Bytes.wrapBuffer(packet.data())
val message = Message.readMessage(bytes)
if (message == null) {
incompleteMessageHandler(bytes)
} else {
endpoint.handler(message)
}
}.listen(endpoint.port, endpoint.networkInterface) {
if (it.failed()) {
completion.completeExceptionally(it.cause())
} else {
completion.complete()
}
}
completions.add(completion)
}
for ((id, endpoint) in wsEndpoints) {
val completion = AsyncCompletion.incomplete()
val httpServer = vertx.createHttpServer()
wsServers[id] = httpServer
httpServer.websocketHandler {
if (endpoint.requestURI == null || it.path().startsWith(endpoint.requestURI)) {
it.accept()
it.binaryMessageHandler { buffer ->
try {
val bytes = Bytes.wrapBuffer(buffer)
val message = Message.readMessage(bytes)
if (message == null) {
incompleteMessageHandler(bytes)
} else {
endpoint.handler(message)
}
} finally {
it.end()
}
}
} else {
it.reject()
}
}.listen(endpoint.port, endpoint.networkInterface) {
if (it.failed()) {
completion.completeExceptionally(it.cause())
} else {
if (endpoint.port == 0) {
endpoint.updatePort(it.result().actualPort())
}
completion.complete()
}
}
completions.add(completion)
}
AsyncCompletion.allOf(completions).await()
}
}
/**
* Stops the hobbits transport.
*/
fun stop() {
if (started.compareAndSet(true, false)) {
httpClient!!.close()
tcpClient!!.close()
udpClient!!.close()
for (server in httpServers.values) {
server.close()
}
for (server in tcpServers.values) {
server.close()
}
for (server in udpServers.values) {
server.close()
}
}
}
private fun checkNotStarted() {
if (started.get()) {
throw IllegalStateException("Server already started")
}
}
private fun checkStarted() {
if (!started.get()) {
throw IllegalStateException("Server not started")
}
}
}
internal data class Endpoint(
val networkInterface: String,
var port: Int,
val requestURI: String?,
val tls: Boolean,
val handler: (Message) -> Unit,
val portUpdateListener: (Int) -> Unit = {}
) {
fun updatePort(newPort: Int) {
port = newPort
portUpdateListener(newPort)
}
}
/**
* Transport types supported.
*/
enum class Transport() {
HTTP,
TCP,
UDP,
WS
}