[FLINK-23711] Add an asynchronous HTTP transport

This commits adds a new transport based on a shaded version of Netty.
This transport is currently opt-in, and can be enabled by adding to
the endpoint definition:
...
transport:
  type: io.statefun.transports.v1/async
diff --git a/statefun-flink/statefun-flink-core/pom.xml b/statefun-flink/statefun-flink-core/pom.xml
index 50e1028..7d5db8e 100644
--- a/statefun-flink/statefun-flink-core/pom.xml
+++ b/statefun-flink/statefun-flink-core/pom.xml
@@ -32,6 +32,7 @@
     <properties>
         <okhttp.version>3.14.6</okhttp.version>
         <additional-sources.dir>target/additional-sources</additional-sources.dir>
+        <flink-shaded-netty.version>4.1.49.Final-13.0</flink-shaded-netty.version>
     </properties>
 
     <dependencies>
@@ -91,6 +92,12 @@
             <version>${unixsocket.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-shaded-netty</artifactId>
+            <version>${flink-shaded-netty.version}</version>
+        </dependency>
+
         <!-- tests -->
         <dependency>
             <groupId>junit</groupId>
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/ChannelAttributes.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/ChannelAttributes.java
new file mode 100644
index 0000000..8333a29
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/ChannelAttributes.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core.nettyclient;
+
+import org.apache.flink.shaded.netty4.io.netty.util.AttributeKey;
+
+final class ChannelAttributes {
+
+  private ChannelAttributes() {}
+
+  static final AttributeKey<Boolean> EXPIRED =
+      AttributeKey.valueOf("org.apache.flink.statefun.flink.core.nettyclient.ExpiredKey");
+  static final AttributeKey<Boolean> ACQUIRED =
+      AttributeKey.valueOf("org.apache.flink.statefun.flink.core.nettyclient.AcquiredKey");
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/Endpoint.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/Endpoint.java
new file mode 100644
index 0000000..4fb5c07
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/Endpoint.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core.nettyclient;
+
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+final class Endpoint {
+  private final String queryPath;
+  private final InetSocketAddress serviceAddress;
+  private final boolean useTls;
+
+  Endpoint(URI endpointUrl) {
+    requireValidEndpointUri(endpointUrl);
+    this.useTls = endpointUrl.getScheme().equalsIgnoreCase("https");
+    this.queryPath = Endpoint.computeQueryPath(endpointUrl);
+    this.serviceAddress =
+        InetSocketAddress.createUnresolved(endpointUrl.getHost(), endpointPort(endpointUrl));
+  }
+
+  public String queryPath() {
+    return queryPath;
+  }
+
+  public InetSocketAddress serviceAddress() {
+    return serviceAddress;
+  }
+
+  public boolean useTls() {
+    return useTls;
+  }
+
+  private static int endpointPort(URI endpoint) {
+    int port = endpoint.getPort();
+    if (port > 0) {
+      return port;
+    }
+    if (endpoint.getScheme().equalsIgnoreCase("https")) {
+      return 443;
+    }
+    return 80;
+  }
+
+  private static String computeQueryPath(URI endpoint) {
+    String uri = endpoint.getPath();
+    if (uri == null || uri.isEmpty()) {
+      uri = "/";
+    }
+    String query = endpoint.getQuery();
+    if (query != null) {
+      uri += "?" + query;
+    }
+    String fragment = endpoint.getFragment();
+    if (fragment != null) {
+      uri += "#" + fragment;
+    }
+    return uri;
+  }
+
+  @SuppressWarnings("ResultOfMethodCallIgnored")
+  private static void requireValidEndpointUri(URI endpointUrl) {
+    try {
+      endpointUrl.parseServerAuthority();
+    } catch (URISyntaxException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/HttpConnectionPoolHandler.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/HttpConnectionPoolHandler.java
new file mode 100644
index 0000000..1532789
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/HttpConnectionPoolHandler.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core.nettyclient;
+
+import static java.lang.Boolean.FALSE;
+import static java.lang.Boolean.TRUE;
+
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelDuplexHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslCloseCompletionEvent;
+import org.apache.flink.shaded.netty4.io.netty.util.concurrent.ScheduledFuture;
+
+/**
+ * An Handler that we add to the channel pipeline that makes sure that the channel is: 1) does not
+ * stick a round for a long time (if {@code connectionTtlMs > 0}. 2) if this channel uses TLS, and a
+ * {@code SslCloseCompletionEvent} event recieved, this channel will be closed.
+ */
+final class HttpConnectionPoolHandler extends ChannelDuplexHandler {
+  private final long ttlMs;
+  @Nullable private ScheduledFuture<?> timer;
+
+  HttpConnectionPoolHandler(long connectionTtlMs) {
+    this.ttlMs = connectionTtlMs;
+  }
+
+  public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
+    this.initialize(ctx);
+    super.channelRegistered(ctx);
+  }
+
+  public void channelActive(ChannelHandlerContext ctx) throws Exception {
+    initialize(ctx);
+    super.channelActive(ctx);
+  }
+
+  public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+    initialize(ctx);
+    super.handlerAdded(ctx);
+  }
+
+  public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
+    destroy();
+    super.handlerRemoved(ctx);
+  }
+
+  @Override
+  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+    destroy();
+    super.channelInactive(ctx);
+  }
+
+  private void initialize(ChannelHandlerContext ctx) {
+    if (ttlMs <= 0) {
+      return;
+    }
+    if (timer != null) {
+      return;
+    }
+    long channelTimeToLive = ttlMs + positiveRandomJitterMillis();
+    timer =
+        ctx.channel()
+            .eventLoop()
+            .schedule(() -> tryExpire(ctx, false), channelTimeToLive, TimeUnit.MILLISECONDS);
+  }
+
+  @Override
+  public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
+    if (!(evt instanceof SslCloseCompletionEvent)) {
+      return;
+    }
+    tryExpire(ctx, true);
+  }
+
+  private void destroy() {
+    if (timer != null) {
+      timer.cancel(false);
+      timer = null;
+    }
+  }
+
+  private void tryExpire(ChannelHandlerContext ctx, boolean shouldCancelTimer) {
+    if (shouldCancelTimer && timer != null) {
+      timer.cancel(false);
+    }
+    timer = null;
+    Channel channel = ctx.channel();
+    channel.attr(ChannelAttributes.EXPIRED).set(TRUE);
+    if (channel.isActive() && channel.attr(ChannelAttributes.ACQUIRED).get() == FALSE) {
+      // this channel is sitting all idly in the connection pool, unsuspecting of whats to come.
+      // we close it, but leave it in the pool, as the pool doesn't offer
+      // an API to remove an arbitrary connection. Eventually an health check will detect this, and
+      // remove it.
+      channel.close();
+    }
+  }
+
+  /** Compute a random delay between 1 and 3 seconds. */
+  private static int positiveRandomJitterMillis() {
+    return ThreadLocalRandom.current().nextInt(1_000, 3_000);
+  }
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/HttpConnectionPoolManager.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/HttpConnectionPoolManager.java
new file mode 100644
index 0000000..d3d02a8
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/HttpConnectionPoolManager.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core.nettyclient;
+
+import java.util.Objects;
+import javax.annotation.Nullable;
+import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPipeline;
+import org.apache.flink.shaded.netty4.io.netty.channel.pool.ChannelPoolHandler;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpClientCodec;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpContentDecompressor;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectAggregator;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslContext;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
+
+final class HttpConnectionPoolManager implements ChannelPoolHandler {
+  private final NettyRequestReplySpec spec;
+  private final SslContext sslContext;
+  private final String peerHost;
+  private final int peerPort;
+
+  public HttpConnectionPoolManager(
+      @Nullable SslContext sslContext, NettyRequestReplySpec spec, String peerHost, int peerPort) {
+    this.spec = Objects.requireNonNull(spec);
+    this.peerHost = Objects.requireNonNull(peerHost);
+    this.sslContext = sslContext;
+    this.peerPort = peerPort;
+  }
+
+  @Override
+  public void channelAcquired(Channel channel) {
+    channel.attr(ChannelAttributes.ACQUIRED).set(Boolean.TRUE);
+  }
+
+  @Override
+  public void channelReleased(Channel channel) {
+    channel.attr(ChannelAttributes.ACQUIRED).set(Boolean.FALSE);
+    NettyRequestReplyHandler handler = channel.pipeline().get(NettyRequestReplyHandler.class);
+    handler.onReleaseToPool();
+  }
+
+  @Override
+  public void channelCreated(Channel channel) {
+    ChannelPipeline p = channel.pipeline();
+    if (sslContext != null) {
+      SslHandler sslHandler = sslContext.newHandler(channel.alloc(), peerHost, peerPort);
+      p.addLast(sslHandler);
+    }
+    p.addLast(new HttpClientCodec());
+    p.addLast(new HttpContentDecompressor(true));
+    p.addLast(new HttpObjectAggregator(spec.maxRequestOrResponseSizeInBytes, true));
+    p.addLast(new NettyRequestReplyHandler());
+
+    long channelTimeToLiveMillis = spec.pooledConnectionTTL.toMillis();
+    p.addLast(new HttpConnectionPoolHandler(channelTimeToLiveMillis));
+  }
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyClient.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyClient.java
new file mode 100644
index 0000000..3cfa652
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyClient.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core.nettyclient;
+
+import static org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
+
+import java.io.Closeable;
+import java.net.URI;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import javax.annotation.Nullable;
+import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
+import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption;
+import org.apache.flink.shaded.netty4.io.netty.channel.EventLoop;
+import org.apache.flink.shaded.netty4.io.netty.channel.pool.ChannelHealthChecker;
+import org.apache.flink.shaded.netty4.io.netty.channel.pool.ChannelPoolHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.pool.FixedChannelPool;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.ReadOnlyHttpHeaders;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslContext;
+import org.apache.flink.shaded.netty4.io.netty.util.concurrent.ScheduledFuture;
+import org.apache.flink.statefun.flink.core.metrics.RemoteInvocationMetrics;
+import org.apache.flink.statefun.flink.core.reqreply.RequestReplyClient;
+import org.apache.flink.statefun.flink.core.reqreply.ToFunctionRequestSummary;
+import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction;
+import org.apache.flink.statefun.sdk.reqreply.generated.ToFunction;
+
+final class NettyClient implements RequestReplyClient, NettyClientService {
+  private final NettySharedResources shared;
+  private final FixedChannelPool pool;
+  private final Endpoint endpoint;
+  private final ReadOnlyHttpHeaders headers;
+  private final long totalRequestBudgetInNanos;
+  private final EventLoop eventLoop;
+
+  public static NettyClient from(
+      NettySharedResources shared, NettyRequestReplySpec spec, URI endpointUrl) {
+    Endpoint endpoint = new Endpoint(endpointUrl);
+    long totalRequestBudgetInNanos = spec.callTimeout.toNanos();
+    ReadOnlyHttpHeaders headers = NettyHeaders.defaultHeadersFor(endpoint.serviceAddress());
+    // prepare a customized bootstrap for this specific spec.
+    // this bootstrap reuses the select loop and io threads as other endpoints.
+    Bootstrap bootstrap = shared.bootstrap().clone();
+    bootstrap.option(CONNECT_TIMEOUT_MILLIS, (int) spec.connectTimeout.toMillis());
+    bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
+    bootstrap.remoteAddress(endpoint.serviceAddress());
+    // setup tls
+    @Nullable final SslContext sslContext;
+    if (endpoint.useTls()) {
+      sslContext = shared.sslContext();
+    } else {
+      sslContext = null;
+    }
+    // setup a channel pool handler
+    ChannelPoolHandler poolHandler =
+        new HttpConnectionPoolManager(
+            sslContext,
+            spec,
+            endpoint.serviceAddress().getHostString(),
+            endpoint.serviceAddress().getPort());
+    // setup a fixed capacity channel pool
+    FixedChannelPool pool =
+        new FixedChannelPool(
+            bootstrap,
+            poolHandler,
+            ChannelHealthChecker.ACTIVE,
+            FixedChannelPool.AcquireTimeoutAction.FAIL,
+            spec.connectTimeout.toMillis(),
+            spec.connectionPoolMaxSize,
+            2147483647,
+            true,
+            true);
+    shared.registerClosable(pool::closeAsync);
+    // use a dedicated, event loop to execute timers and tasks. An event loop is backed by a single
+    // thread.
+    EventLoop eventLoop = bootstrap.config().group().next();
+    return new NettyClient(shared, eventLoop, pool, endpoint, headers, totalRequestBudgetInNanos);
+  }
+
+  private NettyClient(
+      NettySharedResources shared,
+      EventLoop anEventLoop,
+      FixedChannelPool pool,
+      Endpoint endpoint,
+      ReadOnlyHttpHeaders defaultHttpHeaders,
+      long totalRequestBudgetInNanos) {
+    this.shared = Objects.requireNonNull(shared);
+    this.eventLoop = Objects.requireNonNull(anEventLoop);
+    this.pool = Objects.requireNonNull(pool);
+    this.endpoint = Objects.requireNonNull(endpoint);
+    this.headers = Objects.requireNonNull(defaultHttpHeaders);
+    this.totalRequestBudgetInNanos = totalRequestBudgetInNanos;
+  }
+
+  @Override
+  public CompletableFuture<FromFunction> call(
+      ToFunctionRequestSummary requestSummary,
+      RemoteInvocationMetrics metrics,
+      ToFunction toFunction) {
+    NettyRequest request = new NettyRequest(this, metrics, requestSummary, toFunction);
+    return request.start();
+  }
+
+  // -------------------------------------------------------------------------------------
+  // The following methods are used by NettyRequest during the various attempts
+  // -------------------------------------------------------------------------------------
+
+  @Override
+  public void acquireChannel(BiConsumer<Channel, Throwable> consumer) {
+    pool.acquire()
+        .addListener(
+            future -> {
+              Throwable cause = future.cause();
+              if (cause != null) {
+                consumer.accept(null, cause);
+              } else {
+                Channel ch = (Channel) future.getNow();
+                consumer.accept(ch, null);
+              }
+            });
+  }
+
+  @Override
+  public void releaseChannel(Channel channel) {
+    EventLoop chEventLoop = channel.eventLoop();
+    if (chEventLoop.inEventLoop()) {
+      releaseChannel0(channel);
+    } else {
+      chEventLoop.execute(() -> releaseChannel0(channel));
+    }
+  }
+
+  @Override
+  public String queryPath() {
+    return endpoint.queryPath();
+  }
+
+  @Override
+  public ReadOnlyHttpHeaders headers() {
+    return headers;
+  }
+
+  @Override
+  public long totalRequestBudgetInNanos() {
+    return totalRequestBudgetInNanos;
+  }
+
+  @Override
+  public Closeable newTimeout(Runnable client, long delayInNanos) {
+    ScheduledFuture<?> future = eventLoop.schedule(client, delayInNanos, TimeUnit.NANOSECONDS);
+    return () -> future.cancel(false);
+  }
+
+  @Override
+  public void runOnEventLoop(Runnable task) {
+    Objects.requireNonNull(task);
+    if (eventLoop.inEventLoop()) {
+      task.run();
+    } else {
+      eventLoop.execute(task);
+    }
+  }
+
+  @Override
+  public boolean isShutdown() {
+    return shared.isShutdown();
+  }
+
+  @Override
+  public long systemNanoTime() {
+    return System.nanoTime();
+  }
+
+  @Override
+  public <T> void writeAndFlush(T what, Channel where, BiConsumer<Void, Throwable> andThen) {
+    where
+        .writeAndFlush(what)
+        .addListener(
+            future -> {
+              Throwable cause = future.cause();
+              andThen.accept(null, cause);
+            });
+  }
+
+  private void releaseChannel0(Channel channel) {
+    if (!channel.isActive()) {
+      // We still need to return this channel to the pool, because the connection pool
+      // keeps track of the number of acquired channel counts, however the pool will first consult
+      // the health
+      // check, and then kick that connection away.
+      pool.release(channel);
+      return;
+    }
+    if (channel.attr(ChannelAttributes.EXPIRED).get() != Boolean.TRUE) {
+      pool.release(channel);
+      return;
+    }
+    channel.close().addListener(ignored -> pool.release(channel));
+  }
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyClientService.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyClientService.java
new file mode 100644
index 0000000..8038567
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyClientService.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core.nettyclient;
+
+import java.io.Closeable;
+import java.util.function.BiConsumer;
+import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.ReadOnlyHttpHeaders;
+
+interface NettyClientService {
+
+  void acquireChannel(BiConsumer<Channel, Throwable> consumer);
+
+  void releaseChannel(Channel channel);
+
+  String queryPath();
+
+  ReadOnlyHttpHeaders headers();
+
+  long totalRequestBudgetInNanos();
+
+  Closeable newTimeout(Runnable client, long delayInNanos);
+
+  void runOnEventLoop(Runnable task);
+
+  boolean isShutdown();
+
+  long systemNanoTime();
+
+  <T> void writeAndFlush(T what, Channel ch, BiConsumer<Void, Throwable> listener);
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyHeaders.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyHeaders.java
new file mode 100644
index 0000000..1cd8eed
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyHeaders.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core.nettyclient;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaderNames;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaderValues;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.ReadOnlyHttpHeaders;
+import org.apache.flink.shaded.netty4.io.netty.util.AsciiString;
+
+final class NettyHeaders {
+  private static final AsciiString USER_AGENT = AsciiString.cached("statefun");
+
+  static final ReadOnlyHttpHeaders EMPTY = new ReadOnlyHttpHeaders(false);
+
+  static ReadOnlyHttpHeaders defaultHeadersFor(InetSocketAddress service) {
+    final AsciiString serviceHost;
+    if (service.getPort() == 443 || service.getPort() == 80) {
+      // we omit well known ports from the hostname header, as it is not common
+      // to include them.
+      serviceHost = AsciiString.cached(service.getHostString());
+    } else {
+      serviceHost = AsciiString.cached(service.getHostString() + ":" + service.getPort());
+    }
+    List<AsciiString> headers = new ArrayList<>();
+
+    headers.add(HttpHeaderNames.CONTENT_TYPE);
+    headers.add(HttpHeaderValues.APPLICATION_OCTET_STREAM);
+
+    headers.add(HttpHeaderNames.ACCEPT);
+    headers.add(HttpHeaderValues.APPLICATION_OCTET_STREAM);
+
+    headers.add(HttpHeaderNames.ACCEPT_ENCODING);
+    headers.add(HttpHeaderValues.GZIP_DEFLATE);
+
+    headers.add(HttpHeaderNames.CONNECTION);
+    headers.add(HttpHeaderValues.KEEP_ALIVE);
+
+    headers.add(HttpHeaderNames.USER_AGENT);
+    headers.add(USER_AGENT);
+
+    headers.add(HttpHeaderNames.HOST);
+    headers.add(serviceHost);
+
+    headers.add(HttpHeaderNames.CONTENT_LENGTH);
+    headers.add(AsciiString.cached("0"));
+
+    AsciiString[] kvPairs = headers.toArray(new AsciiString[0]);
+    return new ReadOnlyHttpHeaders(false, kvPairs);
+  }
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyProtobuf.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyProtobuf.java
new file mode 100644
index 0000000..0d314c1
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyProtobuf.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core.nettyclient;
+
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.CodedOutputStream;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
+import com.google.protobuf.Parser;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.util.function.IntFunction;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufOutputStream;
+import org.apache.flink.util.Preconditions;
+
+final class NettyProtobuf {
+
+  public static <M extends Message> ByteBuf serializeProtobuf(
+      IntFunction<ByteBuf> allocator, M message) {
+    final int requiredSize = message.getSerializedSize();
+    final ByteBuf buf = allocator.apply(requiredSize);
+    try {
+      if (buf.nioBufferCount() == 1) {
+        zeroCopySerialize(message, requiredSize, buf);
+      } else {
+        serializeOutputStream(message, buf);
+      }
+      return buf;
+    } catch (IOException e) {
+      buf.release();
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  public static <M extends Message> M deserializeProtobuf(ByteBuf buf, Parser<M> parser) {
+    try {
+      if (buf.nioBufferCount() == 1) {
+        return zeroCopyDeserialize(buf, parser);
+      } else {
+        return deserializeInputStream(buf, parser);
+      }
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  private static <M extends Message> void zeroCopySerialize(M message, int len, ByteBuf buf)
+      throws IOException {
+    Preconditions.checkState(len <= buf.writableBytes());
+    final int originalWriterIndex = buf.writerIndex();
+    ByteBuffer nioBuf = buf.nioBuffer(originalWriterIndex, len);
+    CodedOutputStream out = CodedOutputStream.newInstance(nioBuf);
+    message.writeTo(out);
+    out.flush();
+    buf.writerIndex(originalWriterIndex + len);
+  }
+
+  private static <M extends Message> void serializeOutputStream(M message, ByteBuf buf)
+      throws IOException {
+    message.writeTo(new ByteBufOutputStream(buf));
+  }
+
+  private static <M extends Message> M zeroCopyDeserialize(ByteBuf buf, Parser<M> parser)
+      throws InvalidProtocolBufferException {
+    final int messageLength = buf.readableBytes();
+    final int originalReaderIndex = buf.readerIndex();
+    ByteBuffer nioBuffer = buf.nioBuffer(originalReaderIndex, messageLength);
+    CodedInputStream in = CodedInputStream.newInstance(nioBuffer);
+    M message = parser.parseFrom(in);
+    buf.readerIndex(originalReaderIndex + messageLength);
+    return message;
+  }
+
+  private static <M extends Message> M deserializeInputStream(ByteBuf buf, Parser<M> parser)
+      throws InvalidProtocolBufferException {
+    return parser.parseFrom(new ByteBufInputStream(buf));
+  }
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyRequest.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyRequest.java
new file mode 100644
index 0000000..9e74c76
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyRequest.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core.nettyclient;
+
+import java.io.Closeable;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import javax.annotation.Nullable;
+import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.ReadOnlyHttpHeaders;
+import org.apache.flink.statefun.flink.core.metrics.RemoteInvocationMetrics;
+import org.apache.flink.statefun.flink.core.nettyclient.exceptions.RequestTimeoutException;
+import org.apache.flink.statefun.flink.core.nettyclient.exceptions.ShutdownException;
+import org.apache.flink.statefun.flink.core.reqreply.ToFunctionRequestSummary;
+import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction;
+import org.apache.flink.statefun.sdk.reqreply.generated.ToFunction;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class NettyRequest {
+  private static final Logger LOG = LoggerFactory.getLogger(NettyRequest.class);
+
+  private static final AtomicReferenceFieldUpdater<NettyRequest, Channel> ATTEMPT_CHANNEL_CAS =
+      AtomicReferenceFieldUpdater.newUpdater(NettyRequest.class, Channel.class, "attemptChannel");
+
+  // immutable setup
+  private final NettyClientService client;
+
+  // request specific immutable input
+  private final RemoteInvocationMetrics metrics;
+  private final ToFunctionRequestSummary reqSummary;
+  private final ToFunction toFunction;
+  private final long requestCreatedNanos;
+
+  // holder of the result
+  private final CompletableFuture<FromFunction> result = new CompletableFuture<>();
+
+  // request runtime
+  private long attemptStartedNanos;
+  private int numberOfAttempts;
+  @Nullable private Closeable retryTask;
+  @Nullable private volatile Channel attemptChannel;
+
+  @OnFlinkThread
+  NettyRequest(
+      NettyClientService client,
+      RemoteInvocationMetrics metrics,
+      ToFunctionRequestSummary requestSummary,
+      ToFunction toFunction) {
+    this.client = Objects.requireNonNull(client);
+    this.reqSummary = Objects.requireNonNull(requestSummary);
+    this.metrics = Objects.requireNonNull(metrics);
+    this.toFunction = Objects.requireNonNull(toFunction);
+    this.requestCreatedNanos = client.systemNanoTime();
+  }
+
+  // --------------------------------------------------------------------------------------------
+  // Actions
+  // --------------------------------------------------------------------------------------------
+
+  @OnFlinkThread
+  CompletableFuture<FromFunction> start() {
+    client.runOnEventLoop(this::startAttempt);
+    return result;
+  }
+
+  @OnChannelThread
+  void complete(FromFunction fromFn) {
+    try {
+      onAttemptCompleted();
+    } catch (Throwable t) {
+      LOG.warn("Attempt cleanup failed", t);
+    }
+    onFinalCompleted(fromFn, null);
+  }
+
+  @OnClientThread
+  @OnChannelThread
+  void completeAttemptExceptionally(Throwable cause) {
+    try {
+      onAttemptCompleted();
+    } catch (Throwable t) {
+      LOG.warn("Attempt cleanup failed", t);
+    }
+    try {
+      onAttemptCompletedExceptionally(cause);
+    } catch (Throwable t) {
+      onFinalCompleted(null, t);
+    }
+  }
+
+  @OnClientThread
+  private void startAttempt() {
+    try {
+      attemptStartedNanos = client.systemNanoTime();
+      client.acquireChannel(this::onChannelAcquisitionComplete);
+    } catch (Throwable throwable) {
+      completeAttemptExceptionally(throwable);
+    }
+  }
+
+  // --------------------------------------------------------------------------------------------
+  // Events
+  // --------------------------------------------------------------------------------------------
+
+  @OnChannelThread
+  private void onChannelAcquisitionComplete(Channel ch, Throwable cause) {
+    if (cause != null) {
+      completeAttemptExceptionally(cause);
+      return;
+    }
+    if (!ATTEMPT_CHANNEL_CAS.compareAndSet(this, null, ch)) {
+      // strange. I'm trying to acquire a channel, while still holding a channel.
+      // this should never happen, and it is a bug.
+      // lets abort.
+      LOG.warn(
+          "BUG: Trying to acquire a new Netty channel, while still holding an existing one. "
+              + "Failing this request, but continuing processing others.");
+      onFinalCompleted(
+          null,
+          new IllegalStateException(
+              "Unexpected request state, failing this request, but will try others."));
+      return;
+    }
+    // introduce the request to the pipeline.
+    // see ya' at the handler :)
+    client.writeAndFlush(this, ch, this::onFirstWriteCompleted);
+  }
+
+  @OnChannelThread
+  private void onFirstWriteCompleted(Void ignored, Throwable cause) {
+    if (cause != null) {
+      completeAttemptExceptionally(cause);
+    }
+  }
+
+  @OnClientThread
+  @OnChannelThread
+  private void onAttemptCompleted() {
+    // 1. release a channel if we have one. The cas here is not strictly needed,
+    // and it is here to be on the safe side.
+    Channel ch = ATTEMPT_CHANNEL_CAS.getAndSet(this, null);
+    if (ch != null) {
+      client.releaseChannel(ch);
+    }
+    final long nanoElapsed = client.systemNanoTime() - attemptStartedNanos;
+    final long millisElapsed = TimeUnit.NANOSECONDS.toMillis(nanoElapsed);
+    attemptStartedNanos = 0;
+    metrics.remoteInvocationLatency(millisElapsed);
+    IOUtils.closeQuietly(retryTask);
+    retryTask = null;
+    numberOfAttempts++;
+  }
+
+  @OnClientThread
+  @OnChannelThread
+  private void onAttemptCompletedExceptionally(Throwable cause) throws Throwable {
+    metrics.remoteInvocationFailures();
+    LOG.warn(
+        "Exception caught while trying to deliver a message: (attempt #"
+            + (numberOfAttempts - 1)
+            + ")"
+            + reqSummary,
+        cause);
+    if (client.isShutdown()) {
+      throw ShutdownException.INSTANCE;
+    }
+    final long delayUntilNextAttempt = delayUntilNextAttempt();
+    if (delayUntilNextAttempt < 0) {
+      throw RequestTimeoutException.INSTANCE;
+    }
+    analyzeCausalChain(cause);
+    LOG.info(
+        "Retry #"
+            + numberOfAttempts
+            + " "
+            + reqSummary
+            + " ,About to sleep for "
+            + TimeUnit.NANOSECONDS.toMillis(delayUntilNextAttempt));
+
+    // better luck next time!
+    Preconditions.checkState(retryTask == null);
+    this.retryTask = client.newTimeout(this::onAttemptBackoffTimer, delayUntilNextAttempt);
+  }
+
+  @OnClientThread
+  private void onAttemptBackoffTimer() {
+    if (delayUntilNextAttempt() < 0) {
+      completeAttemptExceptionally(RequestTimeoutException.INSTANCE);
+    } else if (client.isShutdown()) {
+      completeAttemptExceptionally(ShutdownException.INSTANCE);
+    } else {
+      startAttempt();
+    }
+  }
+
+  @OnClientThread
+  @OnChannelThread
+  private void onFinalCompleted(FromFunction result, Throwable o) {
+    if (o != null) {
+      this.result.completeExceptionally(o);
+    } else {
+      this.result.complete(result);
+    }
+  }
+
+  // ---------------------------------------------------------------------------------
+  // Request specific getters and setters
+  // ---------------------------------------------------------------------------------
+
+  CompletableFuture<FromFunction> result() {
+    return result;
+  }
+
+  long remainingRequestBudgetNanos() {
+    final long usedRequestBudget = client.systemNanoTime() - requestCreatedNanos;
+    return client.totalRequestBudgetInNanos() - usedRequestBudget;
+  }
+
+  ToFunction toFunction() {
+    return toFunction;
+  }
+
+  String uri() {
+    return client.queryPath();
+  }
+
+  private void analyzeCausalChain(Throwable cause) throws Throwable {
+    while (cause != null) {
+      if (!isRetryable(cause)) {
+        throw cause;
+      }
+      cause = cause.getCause();
+    }
+  }
+
+  private boolean isRetryable(Throwable exception) {
+    return !(exception instanceof ShutdownException)
+        && !(exception instanceof RequestTimeoutException);
+  }
+
+  private long delayUntilNextAttempt() {
+    final long remainingRequestBudget = remainingRequestBudgetNanos();
+    if (remainingRequestBudget
+        <= 1_000 * 1_000) { // if we are left with less than a millisecond, don't retry
+      return -1;
+    }
+    // start with 2 milliseconds.
+    final long delay = (2 * 1_000 * 1_000) * (1L << numberOfAttempts);
+    return Math.min(delay, remainingRequestBudget);
+  }
+
+  public ReadOnlyHttpHeaders headers() {
+    return client.headers();
+  }
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyRequestReplyClientFactory.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyRequestReplyClientFactory.java
new file mode 100644
index 0000000..fc7252c
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyRequestReplyClientFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core.nettyclient;
+
+import java.net.URI;
+import javax.annotation.Nullable;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.statefun.flink.common.json.StateFunObjectMapper;
+import org.apache.flink.statefun.flink.core.reqreply.RequestReplyClient;
+import org.apache.flink.statefun.flink.core.reqreply.RequestReplyClientFactory;
+
+public final class NettyRequestReplyClientFactory implements RequestReplyClientFactory {
+
+  @Nullable private transient NettySharedResources sharedNettyResources;
+
+  @Override
+  public RequestReplyClient createTransportClient(ObjectNode transportProperties, URI endpointUrl) {
+    NettySharedResources resources = this.sharedNettyResources;
+    if (resources == null) {
+      this.sharedNettyResources = (resources = new NettySharedResources());
+    }
+    NettyRequestReplySpec clientSpec = parseTransportSpec(transportProperties);
+    return NettyClient.from(resources, clientSpec, endpointUrl);
+  }
+
+  @Override
+  public void cleanup() {
+    NettySharedResources resources = this.sharedNettyResources;
+    this.sharedNettyResources = null;
+    if (resources != null) {
+      resources.shutdownGracefully();
+    }
+  }
+
+  private static NettyRequestReplySpec parseTransportSpec(ObjectNode transportProperties) {
+    try {
+      return OBJ_MAPPER.treeToValue(transportProperties, NettyRequestReplySpec.class);
+    } catch (JsonProcessingException e) {
+      throw new IllegalStateException("Unable to parse Netty transport spec.", e);
+    }
+  }
+
+  private static final ObjectMapper OBJ_MAPPER = StateFunObjectMapper.create();
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyRequestReplyHandler.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyRequestReplyHandler.java
new file mode 100644
index 0000000..43a5dc9
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyRequestReplyHandler.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core.nettyclient;
+
+import static org.apache.flink.statefun.flink.core.nettyclient.NettyProtobuf.serializeProtobuf;
+import static org.apache.flink.util.Preconditions.checkState;
+
+import java.util.concurrent.ThreadLocalRandom;
+import javax.annotation.Nullable;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelDuplexHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPromise;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultHttpHeaders;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaderNames;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaderValues;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
+import org.apache.flink.shaded.netty4.io.netty.util.ReferenceCountUtil;
+import org.apache.flink.statefun.flink.core.nettyclient.exceptions.DisconnectedException;
+import org.apache.flink.statefun.flink.core.nettyclient.exceptions.WrongHttpResponse;
+import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction;
+
+public final class NettyRequestReplyHandler extends ChannelDuplexHandler {
+
+  private final NettyRequestTimeoutTask requestDurationTracker = new NettyRequestTimeoutTask(this);
+
+  // it is set on write.
+  @Nullable private NettyRequest inflightRequest;
+
+  // cache the request headers. profiling shows that creating request headers takes around 6% of
+  // allocations, so it is very beneficial to cache and reuse the headers.
+  @Nullable private DefaultHttpHeaders cachedHeaders;
+
+  // ---------------------------------------------------------------------------------------------------------
+  // Netty API
+  // ---------------------------------------------------------------------------------------------------------
+
+  @Override
+  public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
+      throws Exception {
+    if (!(msg instanceof NettyRequest)) {
+      super.write(ctx, msg, promise);
+      return;
+    }
+    final NettyRequest request = (NettyRequest) msg;
+    if (inflightRequest != null) {
+      // this is a BUG: sending new request while an old request is in progress.
+      // we fail both of these requests.
+      IllegalStateException cause =
+          new IllegalStateException("A Channel has not finished the previous request.");
+      request.completeAttemptExceptionally(cause);
+      exceptionCaught(ctx, cause);
+      return;
+    }
+    this.inflightRequest = request;
+    // a new NettyRequestReply was introduced into the pipeline.
+    // we remember that request and forward an HTTP request on its behalf upstream.
+    // from now on, every exception thrown during the processing of this pipeline, either during the
+    // following section or
+    // during read(), will be caught and delivered to the @inFlightRequest via #exceptionCaught().
+    ByteBuf content = null;
+    try {
+      content = serializeProtobuf(ctx.channel().alloc()::buffer, request.toFunction());
+      writeHttpRequest(ctx, content, request);
+      scheduleRequestTimeout(ctx, request.remainingRequestBudgetNanos());
+    } catch (Throwable t) {
+      ReferenceCountUtil.safeRelease(content);
+      exceptionCaught(ctx, t);
+    }
+  }
+
+  @Override
+  public void channelRead(ChannelHandlerContext ctx, Object message) {
+    final FullHttpResponse response =
+        (message instanceof FullHttpResponse) ? (FullHttpResponse) message : null;
+    try {
+      readHttpMessage(response);
+    } catch (Throwable t) {
+      exceptionCaught(ctx, t);
+    } finally {
+      ReferenceCountUtil.release(response);
+    }
+  }
+
+  @Override
+  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+    requestDurationTracker.cancel();
+    if (!ctx.channel().isActive()) {
+      tryComplete(null, cause);
+    } else {
+      ctx.channel().close().addListener(ignored -> tryComplete(null, cause));
+    }
+  }
+
+  // ---------------------------------------------------------------------------------------------------------
+  // HTTP Request Response
+  // ---------------------------------------------------------------------------------------------------------
+
+  private void writeHttpRequest(ChannelHandlerContext ctx, ByteBuf bodyBuf, NettyRequest req) {
+    DefaultFullHttpRequest http =
+        new DefaultFullHttpRequest(
+            HttpVersion.HTTP_1_1,
+            HttpMethod.POST,
+            req.uri(),
+            bodyBuf,
+            headers(req, bodyBuf),
+            NettyHeaders.EMPTY);
+
+    ctx.writeAndFlush(http);
+  }
+
+  private DefaultHttpHeaders headers(NettyRequest req, ByteBuf bodyBuf) {
+    final DefaultHttpHeaders headers;
+    if (cachedHeaders != null) {
+      headers = cachedHeaders;
+    } else {
+      headers = new DefaultHttpHeaders(false);
+      headers.add(req.headers());
+      this.cachedHeaders = headers;
+    }
+    headers.remove(HttpHeaderNames.CONTENT_LENGTH);
+    headers.add(HttpHeaderNames.CONTENT_LENGTH, bodyBuf.readableBytes());
+    return headers;
+  }
+
+  private void readHttpMessage(FullHttpResponse response) {
+    NettyRequest current = inflightRequest;
+    checkState(current != null, "A read without a request");
+
+    requestDurationTracker.cancel();
+
+    checkState(response != null, "Unexpected message type");
+    validateFullHttpResponse(response);
+    FromFunction fromFn =
+        NettyProtobuf.deserializeProtobuf(response.content(), FromFunction.parser());
+
+    tryComplete(fromFn, null);
+  }
+
+  public void onReleaseToPool() {
+    requestDurationTracker.cancel();
+    inflightRequest = null;
+  }
+
+  @Override
+  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+    requestDurationTracker.cancel();
+    tryComplete(null, DisconnectedException.INSTANCE);
+    super.channelInactive(ctx);
+  }
+
+  @Override
+  public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
+    requestDurationTracker.cancel();
+    super.channelUnregistered(ctx);
+  }
+
+  private void validateFullHttpResponse(FullHttpResponse response) {
+    //
+    // check the return code
+    //
+    final int code = response.status().code();
+    if (code < 200 || code >= 300) {
+      String message =
+          "Unexpected response code " + code + " (" + response.status().reasonPhrase() + ") ";
+      throw new WrongHttpResponse(message);
+    }
+    //
+    // check for the correct content type
+    //
+    final boolean correctContentType =
+        response
+            .headers()
+            .containsValue(
+                HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_OCTET_STREAM, true);
+
+    if (!correctContentType) {
+      String gotContentType = response.headers().get(HttpHeaderNames.CONTENT_TYPE);
+      throw new IllegalStateException("Unexpected content type " + gotContentType);
+    }
+    //
+    // a present HTTP body is expected.
+    //
+    checkState(response.content() != null, "Unexpected empty HTTP response (no body)");
+  }
+
+  private void scheduleRequestTimeout(
+      ChannelHandlerContext ctx, final long remainingRequestBudgetNanos) {
+    // compute the minimum request duration with an additional random jitter. The jitter is
+    // uniformly distributed in the range
+    // of [7ms, 13ms).
+    long minRequestDurationJitteredNanos =
+        ThreadLocalRandom.current().nextLong(7_000_000, 13_000_000);
+    long remainingRequestBudget =
+        Math.max(minRequestDurationJitteredNanos, remainingRequestBudgetNanos);
+    requestDurationTracker.schedule(ctx, remainingRequestBudget);
+  }
+
+  private void tryComplete(FromFunction response, Throwable cause) {
+    final NettyRequest current = inflightRequest;
+    if (current == null) {
+      return;
+    }
+    inflightRequest = null;
+    if (cause != null) {
+      current.completeAttemptExceptionally(cause);
+    } else {
+      current.complete(response);
+    }
+  }
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyRequestReplySpec.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyRequestReplySpec.java
new file mode 100644
index 0000000..8df823d
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyRequestReplySpec.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core.nettyclient;
+
+import java.time.Duration;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+public final class NettyRequestReplySpec {
+
+  @JsonProperty("call")
+  public Duration callTimeout = Duration.ofMinutes(2);
+
+  @JsonProperty("connect")
+  public Duration connectTimeout = Duration.ofSeconds(20);
+
+  @JsonProperty("pool_ttl")
+  public Duration pooledConnectionTTL = Duration.ofSeconds(15);
+
+  @JsonProperty("pool_size")
+  public int connectionPoolMaxSize = 1024;
+
+  @JsonProperty("payload_max_bytes")
+  public int maxRequestOrResponseSizeInBytes = 32 * 1048576;
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyRequestTimeoutTask.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyRequestTimeoutTask.java
new file mode 100644
index 0000000..8a29a3a
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyRequestTimeoutTask.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core.nettyclient;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.util.concurrent.ScheduledFuture;
+import org.apache.flink.statefun.flink.core.nettyclient.exceptions.RequestTimeoutException;
+
+final class NettyRequestTimeoutTask implements Runnable {
+  private final NettyRequestReplyHandler handler;
+  @Nullable private ScheduledFuture<?> future;
+  @Nullable private ChannelHandlerContext ctx;
+
+  public NettyRequestTimeoutTask(NettyRequestReplyHandler handler) {
+    this.handler = Objects.requireNonNull(handler);
+  }
+
+  void schedule(ChannelHandlerContext ctx, long remainingRequestBudget) {
+    this.ctx = Objects.requireNonNull(ctx);
+    this.future = ctx.executor().schedule(this, remainingRequestBudget, TimeUnit.NANOSECONDS);
+  }
+
+  void cancel() {
+    if (future != null) {
+      future.cancel(false);
+      future = null;
+    }
+    ctx = null;
+  }
+
+  @Override
+  public void run() {
+    checkState(ctx != null);
+    checkState(future != null);
+    handler.exceptionCaught(ctx, RequestTimeoutException.INSTANCE);
+  }
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettySharedResources.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettySharedResources.java
new file mode 100644
index 0000000..22390c6
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettySharedResources.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core.nettyclient;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.annotation.Nullable;
+import javax.net.ssl.SSLException;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
+import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
+import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
+import org.apache.flink.shaded.netty4.io.netty.channel.epoll.Epoll;
+import org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoopGroup;
+import org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollSocketChannel;
+import org.apache.flink.shaded.netty4.io.netty.channel.kqueue.KQueue;
+import org.apache.flink.shaded.netty4.io.netty.channel.kqueue.KQueueEventLoopGroup;
+import org.apache.flink.shaded.netty4.io.netty.channel.kqueue.KQueueSocketChannel;
+import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslContext;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslContextBuilder;
+import org.apache.flink.util.IOUtils;
+
+final class NettySharedResources {
+  private final AtomicBoolean shutdown = new AtomicBoolean();
+  private final Bootstrap bootstrap;
+  @Nullable private SslContext sslContext;
+
+  private final CloseableRegistry mangedResources = new CloseableRegistry();
+
+  public NettySharedResources() {
+    // TODO: configure DNS resolving
+    final EventLoopGroup workerGroup;
+    final Class<? extends Channel> channelClass;
+    if (Epoll.isAvailable()) {
+      workerGroup = new EpollEventLoopGroup(demonThreadFactory("netty-http-worker"));
+      channelClass = EpollSocketChannel.class;
+    } else if (KQueue.isAvailable()) {
+      workerGroup = new KQueueEventLoopGroup(demonThreadFactory("http-netty-worker"));
+      channelClass = KQueueSocketChannel.class;
+    } else {
+      workerGroup = new NioEventLoopGroup(demonThreadFactory("netty-http-client"));
+      channelClass = NioSocketChannel.class;
+    }
+    registerClosable(workerGroup::shutdownGracefully);
+
+    Bootstrap bootstrap = new Bootstrap();
+    bootstrap.group(workerGroup);
+    bootstrap.channel(channelClass);
+
+    this.bootstrap = bootstrap;
+  }
+
+  public Bootstrap bootstrap() {
+    return bootstrap;
+  }
+
+  public SslContext sslContext() {
+    SslContext sslCtx = sslContext;
+    if (sslCtx != null) {
+      return sslCtx;
+    }
+    try {
+      sslCtx = SslContextBuilder.forClient().build();
+      this.sslContext = sslCtx;
+      return sslCtx;
+    } catch (SSLException e) {
+      throw new IllegalStateException("Failed to initialize an SSL provider", e);
+    }
+  }
+
+  public void registerClosable(Closeable closeable) {
+    try {
+      mangedResources.registerCloseable(closeable);
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  public boolean isShutdown() {
+    return shutdown.get();
+  }
+
+  public void shutdownGracefully() {
+    if (shutdown.compareAndSet(false, true)) {
+      IOUtils.closeQuietly(mangedResources);
+    }
+  }
+
+  private static ThreadFactory demonThreadFactory(String name) {
+    return runnable -> {
+      Thread t = new Thread(runnable);
+      t.setDaemon(true);
+      t.setName(name);
+      return t;
+    };
+  }
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyTransportModule.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyTransportModule.java
new file mode 100644
index 0000000..9e58046
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyTransportModule.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.nettyclient;
+
+import com.google.auto.service.AutoService;
+import java.util.Map;
+import org.apache.flink.statefun.extensions.ExtensionModule;
+import org.apache.flink.statefun.sdk.TypeName;
+
+@AutoService(ExtensionModule.class)
+public class NettyTransportModule implements ExtensionModule {
+
+  public static final TypeName NETTY_TRANSPORT =
+      TypeName.parseFrom("io.statefun.transports.v1/async");
+
+  @Override
+  public void configure(Map<String, String> globalConfigurations, Binder binder) {
+    binder.bindExtension(NETTY_TRANSPORT, new NettyRequestReplyClientFactory());
+  }
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/OnChannelThread.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/OnChannelThread.java
new file mode 100644
index 0000000..9939433
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/OnChannelThread.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core.nettyclient;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * A Thread that is assigned to a specific Netty Channel. In Netty's programming model, each channel
+ * is assigned to a specific thread for the lifetime of the channel, and all Channel releated events
+ * are dispatched on this thread.
+ */
+@Documented
+@Target(ElementType.METHOD)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface OnChannelThread {}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/OnClientThread.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/OnClientThread.java
new file mode 100644
index 0000000..bbd7fec
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/OnClientThread.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core.nettyclient;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * A specific Thread that is bound to a {@link NettyClient} (see {@link NettyClient#eventLoop}).
+ * This thread is assigned to a specific NettyClient and never changes.
+ */
+@Documented
+@Target(ElementType.METHOD)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface OnClientThread {}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/OnFlinkThread.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/OnFlinkThread.java
new file mode 100644
index 0000000..35192a8
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/OnFlinkThread.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core.nettyclient;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/** A Thread that executes Flink's operator. */
+@Documented
+@Target({ElementType.METHOD, ElementType.CONSTRUCTOR})
+@Retention(RetentionPolicy.RUNTIME)
+public @interface OnFlinkThread {}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/exceptions/DisconnectedException.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/exceptions/DisconnectedException.java
new file mode 100644
index 0000000..f349097
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/exceptions/DisconnectedException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core.nettyclient.exceptions;
+
+import java.io.IOException;
+
+public final class DisconnectedException extends IOException {
+  public static final DisconnectedException INSTANCE = new DisconnectedException();
+
+  private DisconnectedException() {
+    super("Disconnected");
+    setStackTrace(new StackTraceElement[0]);
+  }
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/exceptions/NoMoreRoutesException.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/exceptions/NoMoreRoutesException.java
new file mode 100644
index 0000000..a73d52d
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/exceptions/NoMoreRoutesException.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core.nettyclient.exceptions;
+
+public class NoMoreRoutesException extends RuntimeException {
+  public NoMoreRoutesException(String message) {
+    super(message);
+  }
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/exceptions/RequestTimeoutException.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/exceptions/RequestTimeoutException.java
new file mode 100644
index 0000000..becf70d
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/exceptions/RequestTimeoutException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core.nettyclient.exceptions;
+
+import java.util.concurrent.TimeoutException;
+
+public final class RequestTimeoutException extends TimeoutException {
+  public static final RequestTimeoutException INSTANCE = new RequestTimeoutException();
+
+  public RequestTimeoutException() {
+    setStackTrace(new StackTraceElement[] {});
+  }
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/exceptions/ShutdownException.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/exceptions/ShutdownException.java
new file mode 100644
index 0000000..0bb5b78
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/exceptions/ShutdownException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core.nettyclient.exceptions;
+
+public final class ShutdownException extends RuntimeException {
+
+  public static final ShutdownException INSTANCE = new ShutdownException();
+
+  public ShutdownException() {
+    super("Shutdown");
+    setStackTrace(new StackTraceElement[] {});
+  }
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/exceptions/WrongHttpResponse.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/exceptions/WrongHttpResponse.java
new file mode 100644
index 0000000..d352d2a
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/exceptions/WrongHttpResponse.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core.nettyclient.exceptions;
+
+public final class WrongHttpResponse extends RuntimeException {
+
+  public WrongHttpResponse(String message) {
+    super(message);
+  }
+}
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/nettyclient/EndpointTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/nettyclient/EndpointTest.java
new file mode 100644
index 0000000..2cf1134
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/nettyclient/EndpointTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core.nettyclient;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+import java.net.InetSocketAddress;
+import java.net.URI;
+import org.junit.Test;
+
+public class EndpointTest {
+
+  @Test
+  public void exampleUsage() {
+    Endpoint endpoint = new Endpoint(URI.create("https://api.gateway.com:1234/statefun?xyz=5678"));
+
+    assertThat(endpoint.useTls(), is(true));
+    assertThat(endpoint.serviceAddress().getHostString(), is("api.gateway.com"));
+    assertThat(endpoint.serviceAddress().getPort(), is(1234));
+    assertThat(endpoint.queryPath(), is("/statefun?xyz=5678"));
+  }
+
+  @Test
+  public void anotherExample() {
+    Endpoint endpoint = new Endpoint(URI.create("https://greeter-svc/statefun"));
+
+    assertThat(endpoint.useTls(), is(true));
+    assertThat(endpoint.queryPath(), is("/statefun"));
+
+    InetSocketAddress serviceAddress = endpoint.serviceAddress();
+    assertThat(serviceAddress.getHostString(), is("greeter-svc"));
+    assertThat(serviceAddress.getPort(), is(443));
+  }
+
+  @Test
+  public void emptyQueryPathIsASingleSlash() {
+    Endpoint endpoint = new Endpoint(URI.create("http://greeter-svc"));
+
+    assertThat(endpoint.queryPath(), is("/"));
+  }
+
+  @Test
+  public void dontUseTls() {
+    Endpoint endpoint = new Endpoint(URI.create("http://api.gateway.com:1234/statefun?xyz=5678"));
+
+    assertThat(endpoint.useTls(), is(false));
+  }
+
+  @Test
+  public void useTls() {
+    Endpoint endpoint = new Endpoint(URI.create("https://foobar.net"));
+
+    assertThat(endpoint.useTls(), is(true));
+  }
+}
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/nettyclient/NettyProtobufTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/nettyclient/NettyProtobufTest.java
new file mode 100644
index 0000000..fc0e3ec
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/nettyclient/NettyProtobufTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core.nettyclient;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.IntFunction;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
+import org.apache.flink.statefun.sdk.reqreply.generated.Address;
+import org.junit.After;
+import org.junit.Test;
+
+public class NettyProtobufTest {
+
+  @After
+  public void tearDown() {
+    ALLOCATOR.close();
+  }
+
+  private final AutoReleasingAllocator ALLOCATOR = new AutoReleasingAllocator();
+
+  @Test
+  public void roundTrip() {
+    char[] chars = new char[1024 * 1024];
+    Arrays.fill(chars, 'x');
+    String pad = new String(chars);
+
+    for (int i = 0; i < 100; i++) {
+      int size = ThreadLocalRandom.current().nextInt(1, pad.length());
+      Address original =
+          Address.newBuilder()
+              .setNamespace("namespace")
+              .setType("type")
+              .setId(pad.substring(0, size))
+              .build();
+
+      Address actual = serdeRoundTrip(ALLOCATOR, original);
+
+      assertThat(actual, is(original));
+    }
+  }
+
+  @Test
+  public void heapBufferRoundTrip() {
+    char[] chars = new char[1024 * 1024];
+    Arrays.fill(chars, 'x');
+    String pad = new String(chars);
+
+    IntFunction<ByteBuf> heapAllocator = ByteBufAllocator.DEFAULT::heapBuffer;
+
+    for (int i = 0; i < 100; i++) {
+      int size = ThreadLocalRandom.current().nextInt(1, pad.length());
+      Address original =
+          Address.newBuilder()
+              .setNamespace("namespace")
+              .setType("type")
+              .setId(pad.substring(0, size))
+              .build();
+
+      Address actual = serdeRoundTrip(heapAllocator, original);
+      assertThat(actual, is(original));
+    }
+  }
+
+  private Address serdeRoundTrip(IntFunction<ByteBuf> allocator, Address original) {
+    ByteBuf buf = NettyProtobuf.serializeProtobuf(allocator, original);
+    Address got = NettyProtobuf.deserializeProtobuf(buf, Address.parser());
+    buf.release();
+    return got;
+  }
+
+  private static final class AutoReleasingAllocator implements IntFunction<ByteBuf>, AutoCloseable {
+    private final ArrayDeque<ByteBuf> allocatedDuringATest = new ArrayDeque<>();
+
+    @Override
+    public ByteBuf apply(int value) {
+      ByteBuf buf = ByteBufAllocator.DEFAULT.directBuffer(value);
+      allocatedDuringATest.addLast(buf);
+      return buf;
+    }
+
+    @Override
+    public void close() {
+      for (ByteBuf buf : allocatedDuringATest) {
+        int refCount = buf.refCnt();
+        if (refCount > 0) {
+          buf.release(refCount);
+        }
+      }
+    }
+  }
+}
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/nettyclient/NettyRequestTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/nettyclient/NettyRequestTest.java
new file mode 100644
index 0000000..29c8f4c
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/nettyclient/NettyRequestTest.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core.nettyclient;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import java.io.Closeable;
+import java.net.SocketAddress;
+import java.time.Duration;
+import java.util.ArrayDeque;
+import java.util.IdentityHashMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel;
+import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelConfig;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelMetadata;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOutboundBuffer;
+import org.apache.flink.shaded.netty4.io.netty.channel.EventLoop;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.ReadOnlyHttpHeaders;
+import org.apache.flink.statefun.flink.core.metrics.RemoteInvocationMetrics;
+import org.apache.flink.statefun.flink.core.nettyclient.exceptions.DisconnectedException;
+import org.apache.flink.statefun.flink.core.nettyclient.exceptions.ShutdownException;
+import org.apache.flink.statefun.flink.core.reqreply.ToFunctionRequestSummary;
+import org.apache.flink.statefun.sdk.Address;
+import org.apache.flink.statefun.sdk.FunctionType;
+import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction;
+import org.apache.flink.statefun.sdk.reqreply.generated.ToFunction;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class NettyRequestTest {
+
+  private final FakeMetrics FAKE_METRICS = new FakeMetrics();
+
+  @Test
+  public void successfulSanity() {
+    FakeClient fakeClient = new FakeClient();
+    NettyRequest request =
+        new NettyRequest(fakeClient, FAKE_METRICS, FAKE_SUMMARY, ToFunction.getDefaultInstance());
+
+    request.start();
+    request.complete(FromFunction.getDefaultInstance());
+
+    assertThat(request.result().join(), is(FromFunction.getDefaultInstance()));
+  }
+
+  @Test
+  public void unSuccessfulSanity() {
+    FakeClient fakeClient = new FakeClient();
+    NettyRequest request =
+        new NettyRequest(fakeClient, FAKE_METRICS, FAKE_SUMMARY, ToFunction.getDefaultInstance());
+
+    request.start();
+    request.completeAttemptExceptionally(ShutdownException.INSTANCE);
+
+    assertThat(request.result().isCompletedExceptionally(), is(true));
+  }
+
+  @Test
+  public void canNotAcquireChannel() {
+    // a client that never returns a channel.
+    class alwaysFailingToAcquireChannel extends FakeClient {
+      @Override
+      public void acquireChannel(BiConsumer<Channel, Throwable> consumer) {
+        consumer.accept(null, new IllegalStateException("no channel for you"));
+      }
+    }
+
+    NettyRequest request =
+        new NettyRequest(
+            new alwaysFailingToAcquireChannel(),
+            FAKE_METRICS,
+            FAKE_SUMMARY,
+            ToFunction.getDefaultInstance());
+
+    CompletableFuture<FromFunction> result = request.start();
+
+    assertThat(result.isCompletedExceptionally(), is(true));
+  }
+
+  @Test
+  public void acquiredChannelShouldBeReleased() {
+    FakeClient fakeClient = new FakeClient();
+    NettyRequest request =
+        new NettyRequest(fakeClient, FAKE_METRICS, FAKE_SUMMARY, ToFunction.getDefaultInstance());
+
+    request.start();
+
+    assertEquals(1, fakeClient.LIVE_CHANNELS.size());
+    request.completeAttemptExceptionally(ShutdownException.INSTANCE);
+    assertEquals(0, fakeClient.LIVE_CHANNELS.size());
+  }
+
+  @Test
+  public void failingWriteShouldFailTheRequest() {
+    // the following is a client that allows acquiring a channel
+    class client extends FakeClient {
+      @Override
+      public <T> void writeAndFlush(T what, Channel ch, BiConsumer<Void, Throwable> andThen) {
+        andThen.accept(null, new IllegalStateException("can't write."));
+      }
+    }
+
+    client fakeClient = new client();
+    NettyRequest request =
+        new NettyRequest(fakeClient, FAKE_METRICS, FAKE_SUMMARY, ToFunction.getDefaultInstance());
+
+    request.start();
+
+    Assert.assertTrue(request.result().isCompletedExceptionally());
+  }
+
+  @Test
+  public void testRemainBudget() {
+    FakeClient fakeClient = new FakeClient();
+    fakeClient.REQUEST_BUDGET = Duration.ofMillis(20).toNanos();
+
+    NettyRequest request =
+        new NettyRequest(fakeClient, FAKE_METRICS, FAKE_SUMMARY, ToFunction.getDefaultInstance());
+
+    request.start();
+    // move the clock 5ms forward
+    fakeClient.NOW += Duration.ofMillis(5).toNanos();
+    // fail the request
+    request.completeAttemptExceptionally(DisconnectedException.INSTANCE);
+
+    Assert.assertFalse(request.result().isDone());
+    assertEquals(Duration.ofMillis(15).toNanos(), request.remainingRequestBudgetNanos());
+  }
+
+  @Test
+  public void testRetries() {
+    FakeClient fakeClient = new FakeClient();
+    fakeClient.REQUEST_BUDGET = Duration.ofMillis(20).toNanos();
+
+    NettyRequest request =
+        new NettyRequest(fakeClient, FAKE_METRICS, FAKE_SUMMARY, ToFunction.getDefaultInstance());
+
+    request.start();
+
+    for (int i = 0; i < 20; i++) {
+      request.completeAttemptExceptionally(DisconnectedException.INSTANCE);
+      if (request.result().isCompletedExceptionally()) {
+        return;
+      }
+      fakeClient.NOW += 1_000_000; // + 1ms
+      fakeClient.TIMEOUTS.pop().run();
+    }
+
+    throw new AssertionError();
+  }
+
+  // ---------------------------------------------------------------------------------------------------------
+  // Test collaborators
+  // ---------------------------------------------------------------------------------------------------------
+
+  @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection", "FieldCanBeLocal", "FieldMayBeFinal"})
+  private static class FakeClient implements NettyClientService {
+    // test knobs
+
+    private long NOW = 0;
+    private long REQUEST_BUDGET = 0;
+    final ArrayDeque<Runnable> TIMEOUTS = new ArrayDeque<>();
+    final IdentityHashMap<FakeChannel, Boolean> LIVE_CHANNELS = new IdentityHashMap<>();
+
+    @Override
+    public void acquireChannel(BiConsumer<Channel, Throwable> consumer) {
+      FakeChannel ch = new FakeChannel();
+      LIVE_CHANNELS.put(ch, Boolean.TRUE);
+      consumer.accept(ch, null);
+    }
+
+    @SuppressWarnings("SuspiciousMethodCalls")
+    @Override
+    public void releaseChannel(Channel channel) {
+      Boolean existed = LIVE_CHANNELS.remove(channel);
+      if (existed == null) {
+        throw new AssertionError("Trying to release a non allocated channel");
+      }
+    }
+
+    @Override
+    public String queryPath() {
+      return "/";
+    }
+
+    @Override
+    public ReadOnlyHttpHeaders headers() {
+      return new ReadOnlyHttpHeaders(false);
+    }
+
+    @Override
+    public long totalRequestBudgetInNanos() {
+      return REQUEST_BUDGET;
+    }
+
+    @Override
+    public Closeable newTimeout(Runnable client, long delayInNanos) {
+      TIMEOUTS.add(client);
+      return () -> {};
+    }
+
+    @Override
+    public void runOnEventLoop(Runnable task) {
+      task.run();
+    }
+
+    @Override
+    public boolean isShutdown() {
+      return false;
+    }
+
+    @Override
+    public long systemNanoTime() {
+      return NOW;
+    }
+
+    @Override
+    public <T> void writeAndFlush(T what, Channel ch, BiConsumer<Void, Throwable> andThen) {}
+  }
+
+  private static final class FakeMetrics implements RemoteInvocationMetrics {
+    int failures;
+
+    @Override
+    public void remoteInvocationFailures() {
+      failures++;
+    }
+
+    @Override
+    public void remoteInvocationLatency(long elapsed) {}
+  }
+
+  private static final ToFunctionRequestSummary FAKE_SUMMARY =
+      new ToFunctionRequestSummary(new Address(new FunctionType("a", "b"), "c"), 50, 3, 1);
+
+  public static class FakeChannel extends AbstractChannel {
+
+    public FakeChannel() {
+      super(null);
+    }
+
+    @Override
+    protected AbstractUnsafe newUnsafe() {
+      return null;
+    }
+
+    @Override
+    protected boolean isCompatible(EventLoop eventLoop) {
+      return false;
+    }
+
+    @Override
+    protected SocketAddress localAddress0() {
+      return null;
+    }
+
+    @Override
+    protected SocketAddress remoteAddress0() {
+      return null;
+    }
+
+    @Override
+    protected void doBind(SocketAddress socketAddress) {}
+
+    @Override
+    protected void doDisconnect() {}
+
+    @Override
+    protected void doClose() {}
+
+    @Override
+    protected void doBeginRead() {}
+
+    @Override
+    protected void doWrite(ChannelOutboundBuffer channelOutboundBuffer) {}
+
+    @Override
+    public ChannelConfig config() {
+      return null;
+    }
+
+    @Override
+    public boolean isOpen() {
+      return false;
+    }
+
+    @Override
+    public boolean isActive() {
+      return false;
+    }
+
+    @Override
+    public ChannelMetadata metadata() {
+      return null;
+    }
+  }
+}