Add a proxy subprotocol
diff --git a/devp2p-proxy/build.gradle b/devp2p-proxy/build.gradle
new file mode 100644
index 0000000..f8e9919
--- /dev/null
+++ b/devp2p-proxy/build.gradle
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+description = 'Ethereum ÐΞVp2p Proxy subprotocol implementation.'
+
+dependencies {
+  implementation project(':bytes')
+  implementation project(':eth-repository')
+  implementation project(':concurrent')
+  implementation project(':concurrent-coroutines')
+  implementation project(':rlp')
+  implementation project(':rlpx')
+  implementation 'io.vertx:vertx-core'
+  implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core'
+  implementation 'org.slf4j:slf4j-api'
+
+  compileOnly 'org.bouncycastle:bcprov-jdk15on'
+
+  testImplementation project(':crypto')
+  testImplementation project(':junit')
+  testImplementation 'org.bouncycastle:bcprov-jdk15on'
+  testImplementation 'org.junit.jupiter:junit-jupiter-api'
+  testImplementation 'org.junit.jupiter:junit-jupiter-params'
+  testImplementation 'org.mockito:mockito-junit-jupiter'
+  testImplementation 'com.nhaarman.mockitokotlin2:mockito-kotlin'
+
+  testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine'
+  testRuntimeOnly 'ch.qos.logback:logback-classic'
+}
diff --git a/devp2p-proxy/src/integrationTest/kotlin/org/apache/tuweni/devp2p/proxy/SendDataToAnotherNodeTest.kt b/devp2p-proxy/src/integrationTest/kotlin/org/apache/tuweni/devp2p/proxy/SendDataToAnotherNodeTest.kt
new file mode 100644
index 0000000..0522264
--- /dev/null
+++ b/devp2p-proxy/src/integrationTest/kotlin/org/apache/tuweni/devp2p/proxy/SendDataToAnotherNodeTest.kt
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.proxy
+
+import io.vertx.core.Vertx
+import kotlinx.coroutines.runBlocking
+import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.concurrent.AsyncCompletion
+import org.apache.tuweni.concurrent.coroutines.await
+import org.apache.tuweni.crypto.SECP256K1
+import org.apache.tuweni.junit.BouncyCastleExtension
+import org.apache.tuweni.junit.VertxExtension
+import org.apache.tuweni.junit.VertxInstance
+import org.apache.tuweni.rlpx.vertx.VertxRLPxService
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.extension.ExtendWith
+import java.net.InetSocketAddress
+
+@ExtendWith(VertxExtension::class, BouncyCastleExtension::class)
+class SendDataToAnotherNodeTest {
+
+  @Test
+  fun testConnectTwoProxyNodes(@VertxInstance vertx: Vertx) = runBlocking {
+    val service = VertxRLPxService(
+      vertx,
+      0,
+      "127.0.0.1",
+      0,
+      SECP256K1.KeyPair.random(),
+      listOf(
+        ProxySubprotocol()
+      ),
+      "Tuweni Experiment 0.1"
+    )
+
+    val service2kp = SECP256K1.KeyPair.random()
+    val service2 = VertxRLPxService(
+      vertx,
+      0,
+      "127.0.0.1",
+      0,
+      service2kp,
+      listOf(ProxySubprotocol()),
+      "Tuweni Experiment 0.1"
+    )
+    val recorder = RecordingClientHandler()
+    AsyncCompletion.allOf(service.start(), service2.start()).await()
+    val client = service.getClient(ProxySubprotocol.ID) as ProxyClient
+    client.registeredSites["datasink"] = recorder
+    service.connectTo(service2kp.publicKey(), InetSocketAddress("127.0.0.1", service2.actualPort())).await()
+
+    val client2 = service2.getClient(ProxySubprotocol.ID) as ProxyClient
+    client2.request("datasink", Bytes.wrap("Hello world".toByteArray()))
+    client2.request("datasink", Bytes.wrap("foo".toByteArray()))
+    client2.request("datasink", Bytes.wrap("foobar".toByteArray()))
+
+    assertEquals(recorder.messages[0], "Hello world")
+    assertEquals(recorder.messages[1], "foo")
+    assertEquals(recorder.messages[2], "foobar")
+  }
+}
+
+class RecordingClientHandler : ClientHandler {
+
+  val messages = mutableListOf<String>()
+
+  override suspend fun handleRequest(message: Bytes): Bytes {
+    messages.add(String(message.toArrayUnsafe()))
+    return Bytes.wrap("OK".toByteArray())
+  }
+}
diff --git a/devp2p-proxy/src/main/kotlin/org/apache/tuweni/devp2p/proxy/ProxyClient.kt b/devp2p-proxy/src/main/kotlin/org/apache/tuweni/devp2p/proxy/ProxyClient.kt
new file mode 100644
index 0000000..23bccc9
--- /dev/null
+++ b/devp2p-proxy/src/main/kotlin/org/apache/tuweni/devp2p/proxy/ProxyClient.kt
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.proxy
+
+import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.concurrent.AsyncResult
+import org.apache.tuweni.concurrent.coroutines.await
+import org.apache.tuweni.devp2p.proxy.ProxySubprotocol.Companion.ID
+import org.apache.tuweni.rlpx.SubprotocolService
+import org.apache.tuweni.rlpx.wire.SubProtocolClient
+import org.apache.tuweni.rlpx.wire.WireConnection
+import java.lang.RuntimeException
+import java.util.UUID
+
+interface ClientHandler {
+  suspend fun handleRequest(message: Bytes): Bytes
+}
+
+class ProxyError(message: String?) : RuntimeException(message)
+
+class ProxyClient(private val service: SubprotocolService) : SubProtocolClient {
+
+  suspend fun request(site: String, message: Bytes): Bytes {
+    val messageId = UUID.randomUUID().toString()
+    var selectedConn: WireConnection? = null
+    for (conn in service.repository().asIterable()) {
+      val peerInfo = proxyPeerRepository.peers[conn.uri()]
+      if (peerInfo?.sites?.contains(site) == true) {
+        selectedConn = conn
+        break
+      }
+    }
+    if (selectedConn == null) {
+      throw ProxyError("No peer with site $site available")
+    }
+    val result = AsyncResult.incomplete<ResponseMessage>()
+    proxyPeerRepository.peers[selectedConn.uri()]?.pendingResponses?.put(messageId, result)
+    service.send(ID, REQUEST, selectedConn, RequestMessage(messageId, site, message).toRLP())
+    val response = result.await()
+    if (!response.success) {
+      throw ProxyError(String(response.message.toArrayUnsafe()))
+    }
+    return response.message
+  }
+
+  val registeredSites = mutableMapOf<String, ClientHandler>()
+  internal val proxyPeerRepository = ProxyPeerRepository()
+}
diff --git a/devp2p-proxy/src/main/kotlin/org/apache/tuweni/devp2p/proxy/ProxyHandler.kt b/devp2p-proxy/src/main/kotlin/org/apache/tuweni/devp2p/proxy/ProxyHandler.kt
new file mode 100644
index 0000000..c98f899
--- /dev/null
+++ b/devp2p-proxy/src/main/kotlin/org/apache/tuweni/devp2p/proxy/ProxyHandler.kt
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.proxy
+
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.Dispatchers
+import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.concurrent.AsyncCompletion
+import org.apache.tuweni.concurrent.CompletableAsyncCompletion
+import org.apache.tuweni.concurrent.CompletableAsyncResult
+import org.apache.tuweni.concurrent.coroutines.asyncCompletion
+import org.apache.tuweni.devp2p.proxy.ProxySubprotocol.Companion.ID
+import org.apache.tuweni.rlp.RLP
+import org.apache.tuweni.rlpx.SubprotocolService
+import org.apache.tuweni.rlpx.wire.DisconnectReason
+import org.apache.tuweni.rlpx.wire.SubProtocolHandler
+import org.apache.tuweni.rlpx.wire.WireConnection
+import org.slf4j.LoggerFactory
+import java.util.WeakHashMap
+import kotlin.coroutines.CoroutineContext
+
+const val STATUS = 0
+const val REQUEST = 1
+const val RESPONSE = 2
+
+class ProxyHandler(
+  override val coroutineContext: CoroutineContext = Dispatchers.Default,
+  val service: SubprotocolService,
+  val client: ProxyClient
+) : CoroutineScope, SubProtocolHandler {
+
+  companion object {
+    val logger = LoggerFactory.getLogger(ProxyHandler::class.java)
+  }
+
+  private val pendingStatus = WeakHashMap<String, PeerInfo>()
+
+  override fun handle(connection: WireConnection, messageType: Int, message: Bytes) = asyncCompletion {
+    when (messageType) {
+      STATUS -> handleStatus(message, connection)
+      REQUEST -> handleRequest(message, connection)
+      RESPONSE -> handleResponse(message, connection)
+      else -> {
+        logger.warn("Unknown message type {}", messageType)
+        service.disconnect(connection, DisconnectReason.SUBPROTOCOL_REASON)
+      }
+    }
+  }
+
+  private fun handleResponse(message: Bytes, connection: WireConnection) {
+    val response = ResponseMessage.decode(message)
+    client.proxyPeerRepository.peers[connection.uri()]?.pendingResponses?.remove(response.id)?.complete(response)
+  }
+
+  private suspend fun handleRequest(message: Bytes, connection: WireConnection) {
+    val request = RequestMessage.decode(message)
+
+    val siteClient = client.registeredSites[request.site]
+    if (siteClient == null) {
+      val response =
+        ResponseMessage(request.id, request.site, false, Bytes.wrap("No site available ${request.site}".toByteArray()))
+      service.send(ID, RESPONSE, connection, response.toRLP())
+    } else {
+      try {
+        val responseContent = siteClient.handleRequest(request.message)
+        val response = ResponseMessage(request.id, request.site, true, responseContent)
+        service.send(ID, RESPONSE, connection, response.toRLP())
+      } catch (t: Throwable) {
+        val response =
+          ResponseMessage(request.id, request.site, false, Bytes.wrap("Internal server error".toByteArray()))
+        service.send(ID, RESPONSE, connection, response.toRLP())
+      }
+    }
+  }
+
+  private fun handleStatus(message: Bytes, connection: WireConnection) {
+    val status = StatusMessage.decode(message)
+    var peer = pendingStatus.remove(connection.uri())
+    if (peer == null) {
+      service.send(ID, STATUS, connection, StatusMessage(client.registeredSites.keys.toList()).toRLP())
+      peer = PeerInfo()
+    }
+    client.proxyPeerRepository.addPeer(connection.uri(), peer)
+    peer.connect(status.sites)
+  }
+
+  override fun handleNewPeerConnection(connection: WireConnection): AsyncCompletion {
+    val newPeer = PeerInfo()
+    pendingStatus[connection.uri()] = newPeer
+    service.send(ID, STATUS, connection, StatusMessage(client.registeredSites.keys.toList()).toRLP())
+
+    return newPeer.ready
+  }
+
+  override fun stop(): AsyncCompletion = AsyncCompletion.COMPLETED
+}
+
+internal class PeerInfo {
+
+  var sites: List<String>? = null
+  val ready: CompletableAsyncCompletion = AsyncCompletion.incomplete()
+  val pendingResponses = WeakHashMap<String, CompletableAsyncResult<ResponseMessage>>()
+
+  fun connect(sites: List<String>) {
+    this.sites = sites
+    ready.complete()
+  }
+
+  fun cancel() {
+    ready.cancel()
+  }
+}
+
+data class StatusMessage(val sites: List<String>) {
+
+  companion object {
+    fun decode(message: Bytes): StatusMessage = RLP.decodeList(message) {
+      val list = mutableListOf<String>()
+      while (!it.isComplete) {
+        list.add(it.readString())
+      }
+      StatusMessage(list)
+    }
+  }
+
+  fun toRLP(): Bytes = RLP.encodeList {
+    for (site in sites) {
+      it.writeString(site)
+    }
+  }
+}
+
+data class RequestMessage(val id: String, val site: String, val message: Bytes) {
+
+  companion object {
+    fun decode(message: Bytes): RequestMessage = RLP.decodeList(message) {
+      val id = it.readString()
+      val site = it.readString()
+      val request = it.readValue()
+      RequestMessage(id, site, request)
+    }
+  }
+
+  fun toRLP(): Bytes = RLP.encodeList {
+    it.writeString(id)
+    it.writeString(site)
+    it.writeValue(message)
+  }
+}
+
+data class ResponseMessage(val id: String, val site: String, val success: Boolean, val message: Bytes) {
+
+  companion object {
+    fun decode(message: Bytes): ResponseMessage = RLP.decodeList(message) {
+      val id = it.readString()
+      val site = it.readString()
+      val success = it.readByte() == 1.toByte()
+      val response = it.readValue()
+      ResponseMessage(id, site, success, response)
+    }
+  }
+
+  fun toRLP(): Bytes = RLP.encodeList {
+    it.writeString(id)
+    it.writeString(site)
+    it.writeByte(if (success) 1.toByte() else 0.toByte())
+    it.writeValue(message)
+  }
+}
diff --git a/devp2p-proxy/src/main/kotlin/org/apache/tuweni/devp2p/proxy/ProxyPeerRepository.kt b/devp2p-proxy/src/main/kotlin/org/apache/tuweni/devp2p/proxy/ProxyPeerRepository.kt
new file mode 100644
index 0000000..58945fe
--- /dev/null
+++ b/devp2p-proxy/src/main/kotlin/org/apache/tuweni/devp2p/proxy/ProxyPeerRepository.kt
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.proxy
+
+internal class ProxyPeerRepository {
+
+  val peers = mutableMapOf<String, PeerInfo>()
+
+  fun addPeer(id: String, peer: PeerInfo) {
+    peers[id] = peer
+  }
+}
diff --git a/devp2p-proxy/src/main/kotlin/org/apache/tuweni/devp2p/proxy/ProxySubprotocol.kt b/devp2p-proxy/src/main/kotlin/org/apache/tuweni/devp2p/proxy/ProxySubprotocol.kt
new file mode 100644
index 0000000..12f210e
--- /dev/null
+++ b/devp2p-proxy/src/main/kotlin/org/apache/tuweni/devp2p/proxy/ProxySubprotocol.kt
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.proxy
+
+import org.apache.tuweni.rlpx.RLPxService
+import org.apache.tuweni.rlpx.wire.SubProtocol
+import org.apache.tuweni.rlpx.wire.SubProtocolClient
+import org.apache.tuweni.rlpx.wire.SubProtocolIdentifier
+
+class ProxySubprotocol() : SubProtocol {
+
+  companion object {
+    val ID = SubProtocolIdentifier.of("proxy", 1)
+  }
+
+  override fun id() = ID
+
+  override fun supports(subProtocolIdentifier: SubProtocolIdentifier): Boolean = subProtocolIdentifier.equals(ID)
+
+  override fun versionRange(version: Int): Int = 3
+
+  override fun createHandler(service: RLPxService, client: SubProtocolClient) =
+    ProxyHandler(service = service, client = client as ProxyClient)
+
+  override fun createClient(service: RLPxService) = ProxyClient(service)
+}
diff --git a/devp2p-proxy/src/test/kotlin/org/apache/tuweni/devp2p/proxy/MessagesTest.kt b/devp2p-proxy/src/test/kotlin/org/apache/tuweni/devp2p/proxy/MessagesTest.kt
new file mode 100644
index 0000000..2f489d9
--- /dev/null
+++ b/devp2p-proxy/src/test/kotlin/org/apache/tuweni/devp2p/proxy/MessagesTest.kt
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.proxy
+
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+
+class MessagesTest {
+
+  @Test
+  fun testStatusRoundtrip() {
+    val status = StatusMessage(listOf("foo", "bar"))
+    val recreated = StatusMessage.decode(status.toRLP())
+    assertEquals(status, recreated)
+  }
+}
diff --git a/devp2p-proxy/src/test/kotlin/org/apache/tuweni/devp2p/proxy/ProxyTest.kt b/devp2p-proxy/src/test/kotlin/org/apache/tuweni/devp2p/proxy/ProxyTest.kt
new file mode 100644
index 0000000..27bbac0
--- /dev/null
+++ b/devp2p-proxy/src/test/kotlin/org/apache/tuweni/devp2p/proxy/ProxyTest.kt
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p.proxy
+
+import com.nhaarman.mockitokotlin2.mock
+import kotlinx.coroutines.runBlocking
+import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.rlpx.MemoryWireConnectionsRepository
+import org.apache.tuweni.rlpx.SubprotocolService
+import org.apache.tuweni.rlpx.WireConnectionRepository
+import org.apache.tuweni.rlpx.wire.DisconnectReason
+import org.apache.tuweni.rlpx.wire.SubProtocolIdentifier
+import org.apache.tuweni.rlpx.wire.WireConnection
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+import org.mockito.Mockito.`when`
+import org.mockito.Mockito.mock
+
+internal class SubProtocolServiceImpl() : SubprotocolService {
+
+  var handler: ProxyHandler? = null
+  private val connectionRepository = MemoryWireConnectionsRepository()
+
+  override fun send(
+    subProtocolIdentifier: SubProtocolIdentifier,
+    messageType: Int,
+    connection: WireConnection,
+    message: Bytes
+  ) {
+    handler?.handle(connection, messageType, message)
+  }
+
+  override fun disconnect(connection: WireConnection, reason: DisconnectReason) {
+    TODO("Not yet implemented")
+  }
+
+  override fun repository(): WireConnectionRepository = connectionRepository
+}
+
+class SimpleClientHandler() : ClientHandler {
+  override suspend fun handleRequest(message: Bytes): Bytes {
+    return Bytes.wrap(" World".toByteArray())
+  }
+}
+
+class ProxyTest {
+
+  @Test
+  fun testTwoProxies() = runBlocking {
+    val subProtocolService = SubProtocolServiceImpl()
+    val subProtocolService2 = SubProtocolServiceImpl()
+    val proxyClient = ProxyClient(subProtocolService)
+    val proxyClient2 = ProxyClient(subProtocolService2)
+
+    val handler = ProxyHandler(service = subProtocolService, client = proxyClient)
+    val handler2 = ProxyHandler(service = subProtocolService2, client = proxyClient2)
+    subProtocolService.handler = handler2
+    subProtocolService2.handler = handler
+
+    val fooPeerInfo = PeerInfo()
+    fooPeerInfo.sites = listOf("web")
+    proxyClient2.proxyPeerRepository.addPeer("foo", fooPeerInfo)
+    proxyClient.registeredSites["web"] = SimpleClientHandler()
+
+    val wireConnection = mock<WireConnection>()
+    `when`(wireConnection.uri()).thenReturn("foo")
+
+    subProtocolService2.repository().add(wireConnection)
+    val response = proxyClient2.request("web", Bytes.wrap("Hello".toByteArray()))
+    assertEquals(" World", String(response.toArrayUnsafe()))
+  }
+}
diff --git a/rlpx/src/main/java/org/apache/tuweni/rlpx/RLPxService.java b/rlpx/src/main/java/org/apache/tuweni/rlpx/RLPxService.java
index 245b056..92fc21c 100644
--- a/rlpx/src/main/java/org/apache/tuweni/rlpx/RLPxService.java
+++ b/rlpx/src/main/java/org/apache/tuweni/rlpx/RLPxService.java
@@ -12,11 +12,9 @@
  */
 package org.apache.tuweni.rlpx;
 
-import org.apache.tuweni.bytes.Bytes;
 import org.apache.tuweni.concurrent.AsyncCompletion;
 import org.apache.tuweni.concurrent.AsyncResult;
 import org.apache.tuweni.crypto.SECP256K1;
-import org.apache.tuweni.rlpx.wire.DisconnectReason;
 import org.apache.tuweni.rlpx.wire.SubProtocolClient;
 import org.apache.tuweni.rlpx.wire.SubProtocolIdentifier;
 import org.apache.tuweni.rlpx.wire.WireConnection;
@@ -26,7 +24,7 @@
 /**
  * Service allowing connections to remote peers over RLPx connections.
  */
-public interface RLPxService {
+public interface RLPxService extends SubprotocolService {
 
   /**
    * Connects to a remote peer.
@@ -83,31 +81,6 @@
   AsyncCompletion stop();
 
   /**
-   * Sends a wire message to a peer.
-   *
-   * @param subProtocolIdentifier the identifier of the subprotocol this message is part of
-   * @param messageType the type of the message according to the subprotocol
-   * @param connection the connection.
-   * @param message the message, addressed to a connection.
-   */
-  void send(SubProtocolIdentifier subProtocolIdentifier, int messageType, WireConnection connection, Bytes message);
-
-  /**
-   * Sends a message to the peer explaining that we are about to disconnect.
-   *
-   * @param connection the connection to target
-   * @param reason the reason for disconnection
-   */
-  void disconnect(WireConnection connection, DisconnectReason reason);
-
-  /**
-   * Gets the wire connections repository associated with this service.
-   *
-   * @return the repository of wire connections associated with this service.
-   */
-  WireConnectionRepository repository();
-
-  /**
    * Gets a subprotocol client associated with the given subprotocol.
    * 
    * @param subProtocolIdentifier the subprotocol identifier
diff --git a/rlpx/src/main/java/org/apache/tuweni/rlpx/SubprotocolService.java b/rlpx/src/main/java/org/apache/tuweni/rlpx/SubprotocolService.java
new file mode 100644
index 0000000..6c0db7b
--- /dev/null
+++ b/rlpx/src/main/java/org/apache/tuweni/rlpx/SubprotocolService.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.tuweni.rlpx;
+
+import org.apache.tuweni.bytes.Bytes;
+import org.apache.tuweni.rlpx.wire.DisconnectReason;
+import org.apache.tuweni.rlpx.wire.SubProtocolIdentifier;
+import org.apache.tuweni.rlpx.wire.WireConnection;
+
+public interface SubprotocolService {
+  /**
+   * Sends a wire message to a peer.
+   *
+   * @param subProtocolIdentifier the identifier of the subprotocol this message is part of
+   * @param messageType the type of the message according to the subprotocol
+   * @param connection the connection.
+   * @param message the message, addressed to a connection.
+   */
+  void send(SubProtocolIdentifier subProtocolIdentifier, int messageType, WireConnection connection, Bytes message);
+
+  /**
+   * Sends a message to the peer explaining that we are about to disconnect.
+   *
+   * @param connection the connection to target
+   * @param reason the reason for disconnection
+   */
+  void disconnect(WireConnection connection, DisconnectReason reason);
+
+  /**
+   * Gets the wire connections repository associated with this service.
+   *
+   * @return the repository of wire connections associated with this service.
+   */
+  WireConnectionRepository repository();
+}
diff --git a/settings.gradle b/settings.gradle
index 18afa97..fbd5657 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -18,6 +18,7 @@
 include 'crypto'
 include 'devp2p'
 include 'devp2p-eth'
+include 'devp2p-proxy'
 include 'dist'
 include 'dns-discovery'
 include 'eth'