Add a proxy subprotocol
diff --git a/devp2p-proxy/build.gradle b/devp2p-proxy/build.gradle
new file mode 100644
index 0000000..f8e9919
--- /dev/null
+++ b/devp2p-proxy/build.gradle
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+description = 'Ethereum ÐΞVp2p Proxy subprotocol implementation.'
+
+dependencies {
+ implementation project(':bytes')
+ implementation project(':eth-repository')
+ implementation project(':concurrent')
+ implementation project(':concurrent-coroutines')
+ implementation project(':rlp')
+ implementation project(':rlpx')
+ implementation 'io.vertx:vertx-core'
+ implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core'
+ implementation 'org.slf4j:slf4j-api'
+
+ compileOnly 'org.bouncycastle:bcprov-jdk15on'
+
+ testImplementation project(':crypto')
+ testImplementation project(':junit')
+ testImplementation 'org.bouncycastle:bcprov-jdk15on'
+ testImplementation 'org.junit.jupiter:junit-jupiter-api'
+ testImplementation 'org.junit.jupiter:junit-jupiter-params'
+ testImplementation 'org.mockito:mockito-junit-jupiter'
+ testImplementation 'com.nhaarman.mockitokotlin2:mockito-kotlin'
+
+ testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine'
+ testRuntimeOnly 'ch.qos.logback:logback-classic'
+}
diff --git a/devp2p-proxy/src/integrationTest/kotlin/org/apache/tuweni/devp2p/proxy/SendDataToAnotherNodeTest.kt b/devp2p-proxy/src/integrationTest/kotlin/org/apache/tuweni/devp2p/proxy/SendDataToAnotherNodeTest.kt
new file mode 100644
index 0000000..0522264
--- /dev/null
+++ b/devp2p-proxy/src/integrationTest/kotlin/org/apache/tuweni/devp2p/proxy/SendDataToAnotherNodeTest.kt
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.proxy
+
+import io.vertx.core.Vertx
+import kotlinx.coroutines.runBlocking
+import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.concurrent.AsyncCompletion
+import org.apache.tuweni.concurrent.coroutines.await
+import org.apache.tuweni.crypto.SECP256K1
+import org.apache.tuweni.junit.BouncyCastleExtension
+import org.apache.tuweni.junit.VertxExtension
+import org.apache.tuweni.junit.VertxInstance
+import org.apache.tuweni.rlpx.vertx.VertxRLPxService
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.extension.ExtendWith
+import java.net.InetSocketAddress
+
+@ExtendWith(VertxExtension::class, BouncyCastleExtension::class)
+class SendDataToAnotherNodeTest {
+
+ @Test
+ fun testConnectTwoProxyNodes(@VertxInstance vertx: Vertx) = runBlocking {
+ val service = VertxRLPxService(
+ vertx,
+ 0,
+ "127.0.0.1",
+ 0,
+ SECP256K1.KeyPair.random(),
+ listOf(
+ ProxySubprotocol()
+ ),
+ "Tuweni Experiment 0.1"
+ )
+
+ val service2kp = SECP256K1.KeyPair.random()
+ val service2 = VertxRLPxService(
+ vertx,
+ 0,
+ "127.0.0.1",
+ 0,
+ service2kp,
+ listOf(ProxySubprotocol()),
+ "Tuweni Experiment 0.1"
+ )
+ val recorder = RecordingClientHandler()
+ AsyncCompletion.allOf(service.start(), service2.start()).await()
+ val client = service.getClient(ProxySubprotocol.ID) as ProxyClient
+ client.registeredSites["datasink"] = recorder
+ service.connectTo(service2kp.publicKey(), InetSocketAddress("127.0.0.1", service2.actualPort())).await()
+
+ val client2 = service2.getClient(ProxySubprotocol.ID) as ProxyClient
+ client2.request("datasink", Bytes.wrap("Hello world".toByteArray()))
+ client2.request("datasink", Bytes.wrap("foo".toByteArray()))
+ client2.request("datasink", Bytes.wrap("foobar".toByteArray()))
+
+ assertEquals(recorder.messages[0], "Hello world")
+ assertEquals(recorder.messages[1], "foo")
+ assertEquals(recorder.messages[2], "foobar")
+ }
+}
+
+class RecordingClientHandler : ClientHandler {
+
+ val messages = mutableListOf<String>()
+
+ override suspend fun handleRequest(message: Bytes): Bytes {
+ messages.add(String(message.toArrayUnsafe()))
+ return Bytes.wrap("OK".toByteArray())
+ }
+}
diff --git a/devp2p-proxy/src/main/kotlin/org/apache/tuweni/devp2p/proxy/ProxyClient.kt b/devp2p-proxy/src/main/kotlin/org/apache/tuweni/devp2p/proxy/ProxyClient.kt
new file mode 100644
index 0000000..23bccc9
--- /dev/null
+++ b/devp2p-proxy/src/main/kotlin/org/apache/tuweni/devp2p/proxy/ProxyClient.kt
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.proxy
+
+import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.concurrent.AsyncResult
+import org.apache.tuweni.concurrent.coroutines.await
+import org.apache.tuweni.devp2p.proxy.ProxySubprotocol.Companion.ID
+import org.apache.tuweni.rlpx.SubprotocolService
+import org.apache.tuweni.rlpx.wire.SubProtocolClient
+import org.apache.tuweni.rlpx.wire.WireConnection
+import java.lang.RuntimeException
+import java.util.UUID
+
+interface ClientHandler {
+ suspend fun handleRequest(message: Bytes): Bytes
+}
+
+class ProxyError(message: String?) : RuntimeException(message)
+
+class ProxyClient(private val service: SubprotocolService) : SubProtocolClient {
+
+ suspend fun request(site: String, message: Bytes): Bytes {
+ val messageId = UUID.randomUUID().toString()
+ var selectedConn: WireConnection? = null
+ for (conn in service.repository().asIterable()) {
+ val peerInfo = proxyPeerRepository.peers[conn.uri()]
+ if (peerInfo?.sites?.contains(site) == true) {
+ selectedConn = conn
+ break
+ }
+ }
+ if (selectedConn == null) {
+ throw ProxyError("No peer with site $site available")
+ }
+ val result = AsyncResult.incomplete<ResponseMessage>()
+ proxyPeerRepository.peers[selectedConn.uri()]?.pendingResponses?.put(messageId, result)
+ service.send(ID, REQUEST, selectedConn, RequestMessage(messageId, site, message).toRLP())
+ val response = result.await()
+ if (!response.success) {
+ throw ProxyError(String(response.message.toArrayUnsafe()))
+ }
+ return response.message
+ }
+
+ val registeredSites = mutableMapOf<String, ClientHandler>()
+ internal val proxyPeerRepository = ProxyPeerRepository()
+}
diff --git a/devp2p-proxy/src/main/kotlin/org/apache/tuweni/devp2p/proxy/ProxyHandler.kt b/devp2p-proxy/src/main/kotlin/org/apache/tuweni/devp2p/proxy/ProxyHandler.kt
new file mode 100644
index 0000000..c98f899
--- /dev/null
+++ b/devp2p-proxy/src/main/kotlin/org/apache/tuweni/devp2p/proxy/ProxyHandler.kt
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.proxy
+
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.Dispatchers
+import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.concurrent.AsyncCompletion
+import org.apache.tuweni.concurrent.CompletableAsyncCompletion
+import org.apache.tuweni.concurrent.CompletableAsyncResult
+import org.apache.tuweni.concurrent.coroutines.asyncCompletion
+import org.apache.tuweni.devp2p.proxy.ProxySubprotocol.Companion.ID
+import org.apache.tuweni.rlp.RLP
+import org.apache.tuweni.rlpx.SubprotocolService
+import org.apache.tuweni.rlpx.wire.DisconnectReason
+import org.apache.tuweni.rlpx.wire.SubProtocolHandler
+import org.apache.tuweni.rlpx.wire.WireConnection
+import org.slf4j.LoggerFactory
+import java.util.WeakHashMap
+import kotlin.coroutines.CoroutineContext
+
+const val STATUS = 0
+const val REQUEST = 1
+const val RESPONSE = 2
+
+class ProxyHandler(
+ override val coroutineContext: CoroutineContext = Dispatchers.Default,
+ val service: SubprotocolService,
+ val client: ProxyClient
+) : CoroutineScope, SubProtocolHandler {
+
+ companion object {
+ val logger = LoggerFactory.getLogger(ProxyHandler::class.java)
+ }
+
+ private val pendingStatus = WeakHashMap<String, PeerInfo>()
+
+ override fun handle(connection: WireConnection, messageType: Int, message: Bytes) = asyncCompletion {
+ when (messageType) {
+ STATUS -> handleStatus(message, connection)
+ REQUEST -> handleRequest(message, connection)
+ RESPONSE -> handleResponse(message, connection)
+ else -> {
+ logger.warn("Unknown message type {}", messageType)
+ service.disconnect(connection, DisconnectReason.SUBPROTOCOL_REASON)
+ }
+ }
+ }
+
+ private fun handleResponse(message: Bytes, connection: WireConnection) {
+ val response = ResponseMessage.decode(message)
+ client.proxyPeerRepository.peers[connection.uri()]?.pendingResponses?.remove(response.id)?.complete(response)
+ }
+
+ private suspend fun handleRequest(message: Bytes, connection: WireConnection) {
+ val request = RequestMessage.decode(message)
+
+ val siteClient = client.registeredSites[request.site]
+ if (siteClient == null) {
+ val response =
+ ResponseMessage(request.id, request.site, false, Bytes.wrap("No site available ${request.site}".toByteArray()))
+ service.send(ID, RESPONSE, connection, response.toRLP())
+ } else {
+ try {
+ val responseContent = siteClient.handleRequest(request.message)
+ val response = ResponseMessage(request.id, request.site, true, responseContent)
+ service.send(ID, RESPONSE, connection, response.toRLP())
+ } catch (t: Throwable) {
+ val response =
+ ResponseMessage(request.id, request.site, false, Bytes.wrap("Internal server error".toByteArray()))
+ service.send(ID, RESPONSE, connection, response.toRLP())
+ }
+ }
+ }
+
+ private fun handleStatus(message: Bytes, connection: WireConnection) {
+ val status = StatusMessage.decode(message)
+ var peer = pendingStatus.remove(connection.uri())
+ if (peer == null) {
+ service.send(ID, STATUS, connection, StatusMessage(client.registeredSites.keys.toList()).toRLP())
+ peer = PeerInfo()
+ }
+ client.proxyPeerRepository.addPeer(connection.uri(), peer)
+ peer.connect(status.sites)
+ }
+
+ override fun handleNewPeerConnection(connection: WireConnection): AsyncCompletion {
+ val newPeer = PeerInfo()
+ pendingStatus[connection.uri()] = newPeer
+ service.send(ID, STATUS, connection, StatusMessage(client.registeredSites.keys.toList()).toRLP())
+
+ return newPeer.ready
+ }
+
+ override fun stop(): AsyncCompletion = AsyncCompletion.COMPLETED
+}
+
+internal class PeerInfo {
+
+ var sites: List<String>? = null
+ val ready: CompletableAsyncCompletion = AsyncCompletion.incomplete()
+ val pendingResponses = WeakHashMap<String, CompletableAsyncResult<ResponseMessage>>()
+
+ fun connect(sites: List<String>) {
+ this.sites = sites
+ ready.complete()
+ }
+
+ fun cancel() {
+ ready.cancel()
+ }
+}
+
+data class StatusMessage(val sites: List<String>) {
+
+ companion object {
+ fun decode(message: Bytes): StatusMessage = RLP.decodeList(message) {
+ val list = mutableListOf<String>()
+ while (!it.isComplete) {
+ list.add(it.readString())
+ }
+ StatusMessage(list)
+ }
+ }
+
+ fun toRLP(): Bytes = RLP.encodeList {
+ for (site in sites) {
+ it.writeString(site)
+ }
+ }
+}
+
+data class RequestMessage(val id: String, val site: String, val message: Bytes) {
+
+ companion object {
+ fun decode(message: Bytes): RequestMessage = RLP.decodeList(message) {
+ val id = it.readString()
+ val site = it.readString()
+ val request = it.readValue()
+ RequestMessage(id, site, request)
+ }
+ }
+
+ fun toRLP(): Bytes = RLP.encodeList {
+ it.writeString(id)
+ it.writeString(site)
+ it.writeValue(message)
+ }
+}
+
+data class ResponseMessage(val id: String, val site: String, val success: Boolean, val message: Bytes) {
+
+ companion object {
+ fun decode(message: Bytes): ResponseMessage = RLP.decodeList(message) {
+ val id = it.readString()
+ val site = it.readString()
+ val success = it.readByte() == 1.toByte()
+ val response = it.readValue()
+ ResponseMessage(id, site, success, response)
+ }
+ }
+
+ fun toRLP(): Bytes = RLP.encodeList {
+ it.writeString(id)
+ it.writeString(site)
+ it.writeByte(if (success) 1.toByte() else 0.toByte())
+ it.writeValue(message)
+ }
+}
diff --git a/devp2p-proxy/src/main/kotlin/org/apache/tuweni/devp2p/proxy/ProxyPeerRepository.kt b/devp2p-proxy/src/main/kotlin/org/apache/tuweni/devp2p/proxy/ProxyPeerRepository.kt
new file mode 100644
index 0000000..58945fe
--- /dev/null
+++ b/devp2p-proxy/src/main/kotlin/org/apache/tuweni/devp2p/proxy/ProxyPeerRepository.kt
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.proxy
+
+internal class ProxyPeerRepository {
+
+ val peers = mutableMapOf<String, PeerInfo>()
+
+ fun addPeer(id: String, peer: PeerInfo) {
+ peers[id] = peer
+ }
+}
diff --git a/devp2p-proxy/src/main/kotlin/org/apache/tuweni/devp2p/proxy/ProxySubprotocol.kt b/devp2p-proxy/src/main/kotlin/org/apache/tuweni/devp2p/proxy/ProxySubprotocol.kt
new file mode 100644
index 0000000..12f210e
--- /dev/null
+++ b/devp2p-proxy/src/main/kotlin/org/apache/tuweni/devp2p/proxy/ProxySubprotocol.kt
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.proxy
+
+import org.apache.tuweni.rlpx.RLPxService
+import org.apache.tuweni.rlpx.wire.SubProtocol
+import org.apache.tuweni.rlpx.wire.SubProtocolClient
+import org.apache.tuweni.rlpx.wire.SubProtocolIdentifier
+
+class ProxySubprotocol() : SubProtocol {
+
+ companion object {
+ val ID = SubProtocolIdentifier.of("proxy", 1)
+ }
+
+ override fun id() = ID
+
+ override fun supports(subProtocolIdentifier: SubProtocolIdentifier): Boolean = subProtocolIdentifier.equals(ID)
+
+ override fun versionRange(version: Int): Int = 3
+
+ override fun createHandler(service: RLPxService, client: SubProtocolClient) =
+ ProxyHandler(service = service, client = client as ProxyClient)
+
+ override fun createClient(service: RLPxService) = ProxyClient(service)
+}
diff --git a/devp2p-proxy/src/test/kotlin/org/apache/tuweni/devp2p/proxy/MessagesTest.kt b/devp2p-proxy/src/test/kotlin/org/apache/tuweni/devp2p/proxy/MessagesTest.kt
new file mode 100644
index 0000000..2f489d9
--- /dev/null
+++ b/devp2p-proxy/src/test/kotlin/org/apache/tuweni/devp2p/proxy/MessagesTest.kt
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.proxy
+
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+
+class MessagesTest {
+
+ @Test
+ fun testStatusRoundtrip() {
+ val status = StatusMessage(listOf("foo", "bar"))
+ val recreated = StatusMessage.decode(status.toRLP())
+ assertEquals(status, recreated)
+ }
+}
diff --git a/devp2p-proxy/src/test/kotlin/org/apache/tuweni/devp2p/proxy/ProxyTest.kt b/devp2p-proxy/src/test/kotlin/org/apache/tuweni/devp2p/proxy/ProxyTest.kt
new file mode 100644
index 0000000..27bbac0
--- /dev/null
+++ b/devp2p-proxy/src/test/kotlin/org/apache/tuweni/devp2p/proxy/ProxyTest.kt
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.proxy
+
+import com.nhaarman.mockitokotlin2.mock
+import kotlinx.coroutines.runBlocking
+import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.rlpx.MemoryWireConnectionsRepository
+import org.apache.tuweni.rlpx.SubprotocolService
+import org.apache.tuweni.rlpx.WireConnectionRepository
+import org.apache.tuweni.rlpx.wire.DisconnectReason
+import org.apache.tuweni.rlpx.wire.SubProtocolIdentifier
+import org.apache.tuweni.rlpx.wire.WireConnection
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+import org.mockito.Mockito.`when`
+import org.mockito.Mockito.mock
+
+internal class SubProtocolServiceImpl() : SubprotocolService {
+
+ var handler: ProxyHandler? = null
+ private val connectionRepository = MemoryWireConnectionsRepository()
+
+ override fun send(
+ subProtocolIdentifier: SubProtocolIdentifier,
+ messageType: Int,
+ connection: WireConnection,
+ message: Bytes
+ ) {
+ handler?.handle(connection, messageType, message)
+ }
+
+ override fun disconnect(connection: WireConnection, reason: DisconnectReason) {
+ TODO("Not yet implemented")
+ }
+
+ override fun repository(): WireConnectionRepository = connectionRepository
+}
+
+class SimpleClientHandler() : ClientHandler {
+ override suspend fun handleRequest(message: Bytes): Bytes {
+ return Bytes.wrap(" World".toByteArray())
+ }
+}
+
+class ProxyTest {
+
+ @Test
+ fun testTwoProxies() = runBlocking {
+ val subProtocolService = SubProtocolServiceImpl()
+ val subProtocolService2 = SubProtocolServiceImpl()
+ val proxyClient = ProxyClient(subProtocolService)
+ val proxyClient2 = ProxyClient(subProtocolService2)
+
+ val handler = ProxyHandler(service = subProtocolService, client = proxyClient)
+ val handler2 = ProxyHandler(service = subProtocolService2, client = proxyClient2)
+ subProtocolService.handler = handler2
+ subProtocolService2.handler = handler
+
+ val fooPeerInfo = PeerInfo()
+ fooPeerInfo.sites = listOf("web")
+ proxyClient2.proxyPeerRepository.addPeer("foo", fooPeerInfo)
+ proxyClient.registeredSites["web"] = SimpleClientHandler()
+
+ val wireConnection = mock<WireConnection>()
+ `when`(wireConnection.uri()).thenReturn("foo")
+
+ subProtocolService2.repository().add(wireConnection)
+ val response = proxyClient2.request("web", Bytes.wrap("Hello".toByteArray()))
+ assertEquals(" World", String(response.toArrayUnsafe()))
+ }
+}
diff --git a/rlpx/src/main/java/org/apache/tuweni/rlpx/RLPxService.java b/rlpx/src/main/java/org/apache/tuweni/rlpx/RLPxService.java
index 245b056..92fc21c 100644
--- a/rlpx/src/main/java/org/apache/tuweni/rlpx/RLPxService.java
+++ b/rlpx/src/main/java/org/apache/tuweni/rlpx/RLPxService.java
@@ -12,11 +12,9 @@
*/
package org.apache.tuweni.rlpx;
-import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.concurrent.AsyncCompletion;
import org.apache.tuweni.concurrent.AsyncResult;
import org.apache.tuweni.crypto.SECP256K1;
-import org.apache.tuweni.rlpx.wire.DisconnectReason;
import org.apache.tuweni.rlpx.wire.SubProtocolClient;
import org.apache.tuweni.rlpx.wire.SubProtocolIdentifier;
import org.apache.tuweni.rlpx.wire.WireConnection;
@@ -26,7 +24,7 @@
/**
* Service allowing connections to remote peers over RLPx connections.
*/
-public interface RLPxService {
+public interface RLPxService extends SubprotocolService {
/**
* Connects to a remote peer.
@@ -83,31 +81,6 @@
AsyncCompletion stop();
/**
- * Sends a wire message to a peer.
- *
- * @param subProtocolIdentifier the identifier of the subprotocol this message is part of
- * @param messageType the type of the message according to the subprotocol
- * @param connection the connection.
- * @param message the message, addressed to a connection.
- */
- void send(SubProtocolIdentifier subProtocolIdentifier, int messageType, WireConnection connection, Bytes message);
-
- /**
- * Sends a message to the peer explaining that we are about to disconnect.
- *
- * @param connection the connection to target
- * @param reason the reason for disconnection
- */
- void disconnect(WireConnection connection, DisconnectReason reason);
-
- /**
- * Gets the wire connections repository associated with this service.
- *
- * @return the repository of wire connections associated with this service.
- */
- WireConnectionRepository repository();
-
- /**
* Gets a subprotocol client associated with the given subprotocol.
*
* @param subProtocolIdentifier the subprotocol identifier
diff --git a/rlpx/src/main/java/org/apache/tuweni/rlpx/SubprotocolService.java b/rlpx/src/main/java/org/apache/tuweni/rlpx/SubprotocolService.java
new file mode 100644
index 0000000..6c0db7b
--- /dev/null
+++ b/rlpx/src/main/java/org/apache/tuweni/rlpx/SubprotocolService.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.tuweni.rlpx;
+
+import org.apache.tuweni.bytes.Bytes;
+import org.apache.tuweni.rlpx.wire.DisconnectReason;
+import org.apache.tuweni.rlpx.wire.SubProtocolIdentifier;
+import org.apache.tuweni.rlpx.wire.WireConnection;
+
+public interface SubprotocolService {
+ /**
+ * Sends a wire message to a peer.
+ *
+ * @param subProtocolIdentifier the identifier of the subprotocol this message is part of
+ * @param messageType the type of the message according to the subprotocol
+ * @param connection the connection.
+ * @param message the message, addressed to a connection.
+ */
+ void send(SubProtocolIdentifier subProtocolIdentifier, int messageType, WireConnection connection, Bytes message);
+
+ /**
+ * Sends a message to the peer explaining that we are about to disconnect.
+ *
+ * @param connection the connection to target
+ * @param reason the reason for disconnection
+ */
+ void disconnect(WireConnection connection, DisconnectReason reason);
+
+ /**
+ * Gets the wire connections repository associated with this service.
+ *
+ * @return the repository of wire connections associated with this service.
+ */
+ WireConnectionRepository repository();
+}
diff --git a/settings.gradle b/settings.gradle
index 18afa97..fbd5657 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -18,6 +18,7 @@
include 'crypto'
include 'devp2p'
include 'devp2p-eth'
+include 'devp2p-proxy'
include 'dist'
include 'dns-discovery'
include 'eth'