[FLINK-23711] Add an asynchronous HTTP transport
This commits adds a new transport based on a shaded version of Netty.
This transport is currently opt-in, and can be enabled by adding to
the endpoint definition:
...
transport:
type: io.statefun.transports.v1/async
diff --git a/statefun-flink/statefun-flink-core/pom.xml b/statefun-flink/statefun-flink-core/pom.xml
index 50e1028..7d5db8e 100644
--- a/statefun-flink/statefun-flink-core/pom.xml
+++ b/statefun-flink/statefun-flink-core/pom.xml
@@ -32,6 +32,7 @@
<properties>
<okhttp.version>3.14.6</okhttp.version>
<additional-sources.dir>target/additional-sources</additional-sources.dir>
+ <flink-shaded-netty.version>4.1.49.Final-13.0</flink-shaded-netty.version>
</properties>
<dependencies>
@@ -91,6 +92,12 @@
<version>${unixsocket.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-shaded-netty</artifactId>
+ <version>${flink-shaded-netty.version}</version>
+ </dependency>
+
<!-- tests -->
<dependency>
<groupId>junit</groupId>
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/ChannelAttributes.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/ChannelAttributes.java
new file mode 100644
index 0000000..8333a29
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/ChannelAttributes.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core.nettyclient;
+
+import org.apache.flink.shaded.netty4.io.netty.util.AttributeKey;
+
+final class ChannelAttributes {
+
+ private ChannelAttributes() {}
+
+ static final AttributeKey<Boolean> EXPIRED =
+ AttributeKey.valueOf("org.apache.flink.statefun.flink.core.nettyclient.ExpiredKey");
+ static final AttributeKey<Boolean> ACQUIRED =
+ AttributeKey.valueOf("org.apache.flink.statefun.flink.core.nettyclient.AcquiredKey");
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/Endpoint.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/Endpoint.java
new file mode 100644
index 0000000..4fb5c07
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/Endpoint.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core.nettyclient;
+
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+final class Endpoint {
+ private final String queryPath;
+ private final InetSocketAddress serviceAddress;
+ private final boolean useTls;
+
+ Endpoint(URI endpointUrl) {
+ requireValidEndpointUri(endpointUrl);
+ this.useTls = endpointUrl.getScheme().equalsIgnoreCase("https");
+ this.queryPath = Endpoint.computeQueryPath(endpointUrl);
+ this.serviceAddress =
+ InetSocketAddress.createUnresolved(endpointUrl.getHost(), endpointPort(endpointUrl));
+ }
+
+ public String queryPath() {
+ return queryPath;
+ }
+
+ public InetSocketAddress serviceAddress() {
+ return serviceAddress;
+ }
+
+ public boolean useTls() {
+ return useTls;
+ }
+
+ private static int endpointPort(URI endpoint) {
+ int port = endpoint.getPort();
+ if (port > 0) {
+ return port;
+ }
+ if (endpoint.getScheme().equalsIgnoreCase("https")) {
+ return 443;
+ }
+ return 80;
+ }
+
+ private static String computeQueryPath(URI endpoint) {
+ String uri = endpoint.getPath();
+ if (uri == null || uri.isEmpty()) {
+ uri = "/";
+ }
+ String query = endpoint.getQuery();
+ if (query != null) {
+ uri += "?" + query;
+ }
+ String fragment = endpoint.getFragment();
+ if (fragment != null) {
+ uri += "#" + fragment;
+ }
+ return uri;
+ }
+
+ @SuppressWarnings("ResultOfMethodCallIgnored")
+ private static void requireValidEndpointUri(URI endpointUrl) {
+ try {
+ endpointUrl.parseServerAuthority();
+ } catch (URISyntaxException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/HttpConnectionPoolHandler.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/HttpConnectionPoolHandler.java
new file mode 100644
index 0000000..1532789
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/HttpConnectionPoolHandler.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core.nettyclient;
+
+import static java.lang.Boolean.FALSE;
+import static java.lang.Boolean.TRUE;
+
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelDuplexHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslCloseCompletionEvent;
+import org.apache.flink.shaded.netty4.io.netty.util.concurrent.ScheduledFuture;
+
+/**
+ * An Handler that we add to the channel pipeline that makes sure that the channel is: 1) does not
+ * stick a round for a long time (if {@code connectionTtlMs > 0}. 2) if this channel uses TLS, and a
+ * {@code SslCloseCompletionEvent} event recieved, this channel will be closed.
+ */
+final class HttpConnectionPoolHandler extends ChannelDuplexHandler {
+ private final long ttlMs;
+ @Nullable private ScheduledFuture<?> timer;
+
+ HttpConnectionPoolHandler(long connectionTtlMs) {
+ this.ttlMs = connectionTtlMs;
+ }
+
+ public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
+ this.initialize(ctx);
+ super.channelRegistered(ctx);
+ }
+
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
+ initialize(ctx);
+ super.channelActive(ctx);
+ }
+
+ public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+ initialize(ctx);
+ super.handlerAdded(ctx);
+ }
+
+ public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
+ destroy();
+ super.handlerRemoved(ctx);
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ destroy();
+ super.channelInactive(ctx);
+ }
+
+ private void initialize(ChannelHandlerContext ctx) {
+ if (ttlMs <= 0) {
+ return;
+ }
+ if (timer != null) {
+ return;
+ }
+ long channelTimeToLive = ttlMs + positiveRandomJitterMillis();
+ timer =
+ ctx.channel()
+ .eventLoop()
+ .schedule(() -> tryExpire(ctx, false), channelTimeToLive, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
+ if (!(evt instanceof SslCloseCompletionEvent)) {
+ return;
+ }
+ tryExpire(ctx, true);
+ }
+
+ private void destroy() {
+ if (timer != null) {
+ timer.cancel(false);
+ timer = null;
+ }
+ }
+
+ private void tryExpire(ChannelHandlerContext ctx, boolean shouldCancelTimer) {
+ if (shouldCancelTimer && timer != null) {
+ timer.cancel(false);
+ }
+ timer = null;
+ Channel channel = ctx.channel();
+ channel.attr(ChannelAttributes.EXPIRED).set(TRUE);
+ if (channel.isActive() && channel.attr(ChannelAttributes.ACQUIRED).get() == FALSE) {
+ // this channel is sitting all idly in the connection pool, unsuspecting of whats to come.
+ // we close it, but leave it in the pool, as the pool doesn't offer
+ // an API to remove an arbitrary connection. Eventually an health check will detect this, and
+ // remove it.
+ channel.close();
+ }
+ }
+
+ /** Compute a random delay between 1 and 3 seconds. */
+ private static int positiveRandomJitterMillis() {
+ return ThreadLocalRandom.current().nextInt(1_000, 3_000);
+ }
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/HttpConnectionPoolManager.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/HttpConnectionPoolManager.java
new file mode 100644
index 0000000..d3d02a8
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/HttpConnectionPoolManager.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core.nettyclient;
+
+import java.util.Objects;
+import javax.annotation.Nullable;
+import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPipeline;
+import org.apache.flink.shaded.netty4.io.netty.channel.pool.ChannelPoolHandler;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpClientCodec;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpContentDecompressor;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectAggregator;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslContext;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
+
+final class HttpConnectionPoolManager implements ChannelPoolHandler {
+ private final NettyRequestReplySpec spec;
+ private final SslContext sslContext;
+ private final String peerHost;
+ private final int peerPort;
+
+ public HttpConnectionPoolManager(
+ @Nullable SslContext sslContext, NettyRequestReplySpec spec, String peerHost, int peerPort) {
+ this.spec = Objects.requireNonNull(spec);
+ this.peerHost = Objects.requireNonNull(peerHost);
+ this.sslContext = sslContext;
+ this.peerPort = peerPort;
+ }
+
+ @Override
+ public void channelAcquired(Channel channel) {
+ channel.attr(ChannelAttributes.ACQUIRED).set(Boolean.TRUE);
+ }
+
+ @Override
+ public void channelReleased(Channel channel) {
+ channel.attr(ChannelAttributes.ACQUIRED).set(Boolean.FALSE);
+ NettyRequestReplyHandler handler = channel.pipeline().get(NettyRequestReplyHandler.class);
+ handler.onReleaseToPool();
+ }
+
+ @Override
+ public void channelCreated(Channel channel) {
+ ChannelPipeline p = channel.pipeline();
+ if (sslContext != null) {
+ SslHandler sslHandler = sslContext.newHandler(channel.alloc(), peerHost, peerPort);
+ p.addLast(sslHandler);
+ }
+ p.addLast(new HttpClientCodec());
+ p.addLast(new HttpContentDecompressor(true));
+ p.addLast(new HttpObjectAggregator(spec.maxRequestOrResponseSizeInBytes, true));
+ p.addLast(new NettyRequestReplyHandler());
+
+ long channelTimeToLiveMillis = spec.pooledConnectionTTL.toMillis();
+ p.addLast(new HttpConnectionPoolHandler(channelTimeToLiveMillis));
+ }
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyClient.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyClient.java
new file mode 100644
index 0000000..3cfa652
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyClient.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core.nettyclient;
+
+import static org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
+
+import java.io.Closeable;
+import java.net.URI;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import javax.annotation.Nullable;
+import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
+import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption;
+import org.apache.flink.shaded.netty4.io.netty.channel.EventLoop;
+import org.apache.flink.shaded.netty4.io.netty.channel.pool.ChannelHealthChecker;
+import org.apache.flink.shaded.netty4.io.netty.channel.pool.ChannelPoolHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.pool.FixedChannelPool;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.ReadOnlyHttpHeaders;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslContext;
+import org.apache.flink.shaded.netty4.io.netty.util.concurrent.ScheduledFuture;
+import org.apache.flink.statefun.flink.core.metrics.RemoteInvocationMetrics;
+import org.apache.flink.statefun.flink.core.reqreply.RequestReplyClient;
+import org.apache.flink.statefun.flink.core.reqreply.ToFunctionRequestSummary;
+import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction;
+import org.apache.flink.statefun.sdk.reqreply.generated.ToFunction;
+
+final class NettyClient implements RequestReplyClient, NettyClientService {
+ private final NettySharedResources shared;
+ private final FixedChannelPool pool;
+ private final Endpoint endpoint;
+ private final ReadOnlyHttpHeaders headers;
+ private final long totalRequestBudgetInNanos;
+ private final EventLoop eventLoop;
+
+ public static NettyClient from(
+ NettySharedResources shared, NettyRequestReplySpec spec, URI endpointUrl) {
+ Endpoint endpoint = new Endpoint(endpointUrl);
+ long totalRequestBudgetInNanos = spec.callTimeout.toNanos();
+ ReadOnlyHttpHeaders headers = NettyHeaders.defaultHeadersFor(endpoint.serviceAddress());
+ // prepare a customized bootstrap for this specific spec.
+ // this bootstrap reuses the select loop and io threads as other endpoints.
+ Bootstrap bootstrap = shared.bootstrap().clone();
+ bootstrap.option(CONNECT_TIMEOUT_MILLIS, (int) spec.connectTimeout.toMillis());
+ bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
+ bootstrap.remoteAddress(endpoint.serviceAddress());
+ // setup tls
+ @Nullable final SslContext sslContext;
+ if (endpoint.useTls()) {
+ sslContext = shared.sslContext();
+ } else {
+ sslContext = null;
+ }
+ // setup a channel pool handler
+ ChannelPoolHandler poolHandler =
+ new HttpConnectionPoolManager(
+ sslContext,
+ spec,
+ endpoint.serviceAddress().getHostString(),
+ endpoint.serviceAddress().getPort());
+ // setup a fixed capacity channel pool
+ FixedChannelPool pool =
+ new FixedChannelPool(
+ bootstrap,
+ poolHandler,
+ ChannelHealthChecker.ACTIVE,
+ FixedChannelPool.AcquireTimeoutAction.FAIL,
+ spec.connectTimeout.toMillis(),
+ spec.connectionPoolMaxSize,
+ 2147483647,
+ true,
+ true);
+ shared.registerClosable(pool::closeAsync);
+ // use a dedicated, event loop to execute timers and tasks. An event loop is backed by a single
+ // thread.
+ EventLoop eventLoop = bootstrap.config().group().next();
+ return new NettyClient(shared, eventLoop, pool, endpoint, headers, totalRequestBudgetInNanos);
+ }
+
+ private NettyClient(
+ NettySharedResources shared,
+ EventLoop anEventLoop,
+ FixedChannelPool pool,
+ Endpoint endpoint,
+ ReadOnlyHttpHeaders defaultHttpHeaders,
+ long totalRequestBudgetInNanos) {
+ this.shared = Objects.requireNonNull(shared);
+ this.eventLoop = Objects.requireNonNull(anEventLoop);
+ this.pool = Objects.requireNonNull(pool);
+ this.endpoint = Objects.requireNonNull(endpoint);
+ this.headers = Objects.requireNonNull(defaultHttpHeaders);
+ this.totalRequestBudgetInNanos = totalRequestBudgetInNanos;
+ }
+
+ @Override
+ public CompletableFuture<FromFunction> call(
+ ToFunctionRequestSummary requestSummary,
+ RemoteInvocationMetrics metrics,
+ ToFunction toFunction) {
+ NettyRequest request = new NettyRequest(this, metrics, requestSummary, toFunction);
+ return request.start();
+ }
+
+ // -------------------------------------------------------------------------------------
+ // The following methods are used by NettyRequest during the various attempts
+ // -------------------------------------------------------------------------------------
+
+ @Override
+ public void acquireChannel(BiConsumer<Channel, Throwable> consumer) {
+ pool.acquire()
+ .addListener(
+ future -> {
+ Throwable cause = future.cause();
+ if (cause != null) {
+ consumer.accept(null, cause);
+ } else {
+ Channel ch = (Channel) future.getNow();
+ consumer.accept(ch, null);
+ }
+ });
+ }
+
+ @Override
+ public void releaseChannel(Channel channel) {
+ EventLoop chEventLoop = channel.eventLoop();
+ if (chEventLoop.inEventLoop()) {
+ releaseChannel0(channel);
+ } else {
+ chEventLoop.execute(() -> releaseChannel0(channel));
+ }
+ }
+
+ @Override
+ public String queryPath() {
+ return endpoint.queryPath();
+ }
+
+ @Override
+ public ReadOnlyHttpHeaders headers() {
+ return headers;
+ }
+
+ @Override
+ public long totalRequestBudgetInNanos() {
+ return totalRequestBudgetInNanos;
+ }
+
+ @Override
+ public Closeable newTimeout(Runnable client, long delayInNanos) {
+ ScheduledFuture<?> future = eventLoop.schedule(client, delayInNanos, TimeUnit.NANOSECONDS);
+ return () -> future.cancel(false);
+ }
+
+ @Override
+ public void runOnEventLoop(Runnable task) {
+ Objects.requireNonNull(task);
+ if (eventLoop.inEventLoop()) {
+ task.run();
+ } else {
+ eventLoop.execute(task);
+ }
+ }
+
+ @Override
+ public boolean isShutdown() {
+ return shared.isShutdown();
+ }
+
+ @Override
+ public long systemNanoTime() {
+ return System.nanoTime();
+ }
+
+ @Override
+ public <T> void writeAndFlush(T what, Channel where, BiConsumer<Void, Throwable> andThen) {
+ where
+ .writeAndFlush(what)
+ .addListener(
+ future -> {
+ Throwable cause = future.cause();
+ andThen.accept(null, cause);
+ });
+ }
+
+ private void releaseChannel0(Channel channel) {
+ if (!channel.isActive()) {
+ // We still need to return this channel to the pool, because the connection pool
+ // keeps track of the number of acquired channel counts, however the pool will first consult
+ // the health
+ // check, and then kick that connection away.
+ pool.release(channel);
+ return;
+ }
+ if (channel.attr(ChannelAttributes.EXPIRED).get() != Boolean.TRUE) {
+ pool.release(channel);
+ return;
+ }
+ channel.close().addListener(ignored -> pool.release(channel));
+ }
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyClientService.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyClientService.java
new file mode 100644
index 0000000..8038567
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyClientService.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core.nettyclient;
+
+import java.io.Closeable;
+import java.util.function.BiConsumer;
+import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.ReadOnlyHttpHeaders;
+
+interface NettyClientService {
+
+ void acquireChannel(BiConsumer<Channel, Throwable> consumer);
+
+ void releaseChannel(Channel channel);
+
+ String queryPath();
+
+ ReadOnlyHttpHeaders headers();
+
+ long totalRequestBudgetInNanos();
+
+ Closeable newTimeout(Runnable client, long delayInNanos);
+
+ void runOnEventLoop(Runnable task);
+
+ boolean isShutdown();
+
+ long systemNanoTime();
+
+ <T> void writeAndFlush(T what, Channel ch, BiConsumer<Void, Throwable> listener);
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyHeaders.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyHeaders.java
new file mode 100644
index 0000000..1cd8eed
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyHeaders.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core.nettyclient;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaderNames;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaderValues;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.ReadOnlyHttpHeaders;
+import org.apache.flink.shaded.netty4.io.netty.util.AsciiString;
+
+final class NettyHeaders {
+ private static final AsciiString USER_AGENT = AsciiString.cached("statefun");
+
+ static final ReadOnlyHttpHeaders EMPTY = new ReadOnlyHttpHeaders(false);
+
+ static ReadOnlyHttpHeaders defaultHeadersFor(InetSocketAddress service) {
+ final AsciiString serviceHost;
+ if (service.getPort() == 443 || service.getPort() == 80) {
+ // we omit well known ports from the hostname header, as it is not common
+ // to include them.
+ serviceHost = AsciiString.cached(service.getHostString());
+ } else {
+ serviceHost = AsciiString.cached(service.getHostString() + ":" + service.getPort());
+ }
+ List<AsciiString> headers = new ArrayList<>();
+
+ headers.add(HttpHeaderNames.CONTENT_TYPE);
+ headers.add(HttpHeaderValues.APPLICATION_OCTET_STREAM);
+
+ headers.add(HttpHeaderNames.ACCEPT);
+ headers.add(HttpHeaderValues.APPLICATION_OCTET_STREAM);
+
+ headers.add(HttpHeaderNames.ACCEPT_ENCODING);
+ headers.add(HttpHeaderValues.GZIP_DEFLATE);
+
+ headers.add(HttpHeaderNames.CONNECTION);
+ headers.add(HttpHeaderValues.KEEP_ALIVE);
+
+ headers.add(HttpHeaderNames.USER_AGENT);
+ headers.add(USER_AGENT);
+
+ headers.add(HttpHeaderNames.HOST);
+ headers.add(serviceHost);
+
+ headers.add(HttpHeaderNames.CONTENT_LENGTH);
+ headers.add(AsciiString.cached("0"));
+
+ AsciiString[] kvPairs = headers.toArray(new AsciiString[0]);
+ return new ReadOnlyHttpHeaders(false, kvPairs);
+ }
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyProtobuf.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyProtobuf.java
new file mode 100644
index 0000000..0d314c1
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyProtobuf.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core.nettyclient;
+
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.CodedOutputStream;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
+import com.google.protobuf.Parser;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.util.function.IntFunction;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufOutputStream;
+import org.apache.flink.util.Preconditions;
+
+final class NettyProtobuf {
+
+ public static <M extends Message> ByteBuf serializeProtobuf(
+ IntFunction<ByteBuf> allocator, M message) {
+ final int requiredSize = message.getSerializedSize();
+ final ByteBuf buf = allocator.apply(requiredSize);
+ try {
+ if (buf.nioBufferCount() == 1) {
+ zeroCopySerialize(message, requiredSize, buf);
+ } else {
+ serializeOutputStream(message, buf);
+ }
+ return buf;
+ } catch (IOException e) {
+ buf.release();
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ public static <M extends Message> M deserializeProtobuf(ByteBuf buf, Parser<M> parser) {
+ try {
+ if (buf.nioBufferCount() == 1) {
+ return zeroCopyDeserialize(buf, parser);
+ } else {
+ return deserializeInputStream(buf, parser);
+ }
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ private static <M extends Message> void zeroCopySerialize(M message, int len, ByteBuf buf)
+ throws IOException {
+ Preconditions.checkState(len <= buf.writableBytes());
+ final int originalWriterIndex = buf.writerIndex();
+ ByteBuffer nioBuf = buf.nioBuffer(originalWriterIndex, len);
+ CodedOutputStream out = CodedOutputStream.newInstance(nioBuf);
+ message.writeTo(out);
+ out.flush();
+ buf.writerIndex(originalWriterIndex + len);
+ }
+
+ private static <M extends Message> void serializeOutputStream(M message, ByteBuf buf)
+ throws IOException {
+ message.writeTo(new ByteBufOutputStream(buf));
+ }
+
+ private static <M extends Message> M zeroCopyDeserialize(ByteBuf buf, Parser<M> parser)
+ throws InvalidProtocolBufferException {
+ final int messageLength = buf.readableBytes();
+ final int originalReaderIndex = buf.readerIndex();
+ ByteBuffer nioBuffer = buf.nioBuffer(originalReaderIndex, messageLength);
+ CodedInputStream in = CodedInputStream.newInstance(nioBuffer);
+ M message = parser.parseFrom(in);
+ buf.readerIndex(originalReaderIndex + messageLength);
+ return message;
+ }
+
+ private static <M extends Message> M deserializeInputStream(ByteBuf buf, Parser<M> parser)
+ throws InvalidProtocolBufferException {
+ return parser.parseFrom(new ByteBufInputStream(buf));
+ }
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyRequest.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyRequest.java
new file mode 100644
index 0000000..9e74c76
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyRequest.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core.nettyclient;
+
+import java.io.Closeable;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import javax.annotation.Nullable;
+import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.ReadOnlyHttpHeaders;
+import org.apache.flink.statefun.flink.core.metrics.RemoteInvocationMetrics;
+import org.apache.flink.statefun.flink.core.nettyclient.exceptions.RequestTimeoutException;
+import org.apache.flink.statefun.flink.core.nettyclient.exceptions.ShutdownException;
+import org.apache.flink.statefun.flink.core.reqreply.ToFunctionRequestSummary;
+import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction;
+import org.apache.flink.statefun.sdk.reqreply.generated.ToFunction;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class NettyRequest {
+ private static final Logger LOG = LoggerFactory.getLogger(NettyRequest.class);
+
+ private static final AtomicReferenceFieldUpdater<NettyRequest, Channel> ATTEMPT_CHANNEL_CAS =
+ AtomicReferenceFieldUpdater.newUpdater(NettyRequest.class, Channel.class, "attemptChannel");
+
+ // immutable setup
+ private final NettyClientService client;
+
+ // request specific immutable input
+ private final RemoteInvocationMetrics metrics;
+ private final ToFunctionRequestSummary reqSummary;
+ private final ToFunction toFunction;
+ private final long requestCreatedNanos;
+
+ // holder of the result
+ private final CompletableFuture<FromFunction> result = new CompletableFuture<>();
+
+ // request runtime
+ private long attemptStartedNanos;
+ private int numberOfAttempts;
+ @Nullable private Closeable retryTask;
+ @Nullable private volatile Channel attemptChannel;
+
+ @OnFlinkThread
+ NettyRequest(
+ NettyClientService client,
+ RemoteInvocationMetrics metrics,
+ ToFunctionRequestSummary requestSummary,
+ ToFunction toFunction) {
+ this.client = Objects.requireNonNull(client);
+ this.reqSummary = Objects.requireNonNull(requestSummary);
+ this.metrics = Objects.requireNonNull(metrics);
+ this.toFunction = Objects.requireNonNull(toFunction);
+ this.requestCreatedNanos = client.systemNanoTime();
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Actions
+ // --------------------------------------------------------------------------------------------
+
+ @OnFlinkThread
+ CompletableFuture<FromFunction> start() {
+ client.runOnEventLoop(this::startAttempt);
+ return result;
+ }
+
+ @OnChannelThread
+ void complete(FromFunction fromFn) {
+ try {
+ onAttemptCompleted();
+ } catch (Throwable t) {
+ LOG.warn("Attempt cleanup failed", t);
+ }
+ onFinalCompleted(fromFn, null);
+ }
+
+ @OnClientThread
+ @OnChannelThread
+ void completeAttemptExceptionally(Throwable cause) {
+ try {
+ onAttemptCompleted();
+ } catch (Throwable t) {
+ LOG.warn("Attempt cleanup failed", t);
+ }
+ try {
+ onAttemptCompletedExceptionally(cause);
+ } catch (Throwable t) {
+ onFinalCompleted(null, t);
+ }
+ }
+
+ @OnClientThread
+ private void startAttempt() {
+ try {
+ attemptStartedNanos = client.systemNanoTime();
+ client.acquireChannel(this::onChannelAcquisitionComplete);
+ } catch (Throwable throwable) {
+ completeAttemptExceptionally(throwable);
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Events
+ // --------------------------------------------------------------------------------------------
+
+ @OnChannelThread
+ private void onChannelAcquisitionComplete(Channel ch, Throwable cause) {
+ if (cause != null) {
+ completeAttemptExceptionally(cause);
+ return;
+ }
+ if (!ATTEMPT_CHANNEL_CAS.compareAndSet(this, null, ch)) {
+ // strange. I'm trying to acquire a channel, while still holding a channel.
+ // this should never happen, and it is a bug.
+ // lets abort.
+ LOG.warn(
+ "BUG: Trying to acquire a new Netty channel, while still holding an existing one. "
+ + "Failing this request, but continuing processing others.");
+ onFinalCompleted(
+ null,
+ new IllegalStateException(
+ "Unexpected request state, failing this request, but will try others."));
+ return;
+ }
+ // introduce the request to the pipeline.
+ // see ya' at the handler :)
+ client.writeAndFlush(this, ch, this::onFirstWriteCompleted);
+ }
+
+ @OnChannelThread
+ private void onFirstWriteCompleted(Void ignored, Throwable cause) {
+ if (cause != null) {
+ completeAttemptExceptionally(cause);
+ }
+ }
+
+ @OnClientThread
+ @OnChannelThread
+ private void onAttemptCompleted() {
+ // 1. release a channel if we have one. The cas here is not strictly needed,
+ // and it is here to be on the safe side.
+ Channel ch = ATTEMPT_CHANNEL_CAS.getAndSet(this, null);
+ if (ch != null) {
+ client.releaseChannel(ch);
+ }
+ final long nanoElapsed = client.systemNanoTime() - attemptStartedNanos;
+ final long millisElapsed = TimeUnit.NANOSECONDS.toMillis(nanoElapsed);
+ attemptStartedNanos = 0;
+ metrics.remoteInvocationLatency(millisElapsed);
+ IOUtils.closeQuietly(retryTask);
+ retryTask = null;
+ numberOfAttempts++;
+ }
+
+ @OnClientThread
+ @OnChannelThread
+ private void onAttemptCompletedExceptionally(Throwable cause) throws Throwable {
+ metrics.remoteInvocationFailures();
+ LOG.warn(
+ "Exception caught while trying to deliver a message: (attempt #"
+ + (numberOfAttempts - 1)
+ + ")"
+ + reqSummary,
+ cause);
+ if (client.isShutdown()) {
+ throw ShutdownException.INSTANCE;
+ }
+ final long delayUntilNextAttempt = delayUntilNextAttempt();
+ if (delayUntilNextAttempt < 0) {
+ throw RequestTimeoutException.INSTANCE;
+ }
+ analyzeCausalChain(cause);
+ LOG.info(
+ "Retry #"
+ + numberOfAttempts
+ + " "
+ + reqSummary
+ + " ,About to sleep for "
+ + TimeUnit.NANOSECONDS.toMillis(delayUntilNextAttempt));
+
+ // better luck next time!
+ Preconditions.checkState(retryTask == null);
+ this.retryTask = client.newTimeout(this::onAttemptBackoffTimer, delayUntilNextAttempt);
+ }
+
+ @OnClientThread
+ private void onAttemptBackoffTimer() {
+ if (delayUntilNextAttempt() < 0) {
+ completeAttemptExceptionally(RequestTimeoutException.INSTANCE);
+ } else if (client.isShutdown()) {
+ completeAttemptExceptionally(ShutdownException.INSTANCE);
+ } else {
+ startAttempt();
+ }
+ }
+
+ @OnClientThread
+ @OnChannelThread
+ private void onFinalCompleted(FromFunction result, Throwable o) {
+ if (o != null) {
+ this.result.completeExceptionally(o);
+ } else {
+ this.result.complete(result);
+ }
+ }
+
+ // ---------------------------------------------------------------------------------
+ // Request specific getters and setters
+ // ---------------------------------------------------------------------------------
+
+ CompletableFuture<FromFunction> result() {
+ return result;
+ }
+
+ long remainingRequestBudgetNanos() {
+ final long usedRequestBudget = client.systemNanoTime() - requestCreatedNanos;
+ return client.totalRequestBudgetInNanos() - usedRequestBudget;
+ }
+
+ ToFunction toFunction() {
+ return toFunction;
+ }
+
+ String uri() {
+ return client.queryPath();
+ }
+
+ private void analyzeCausalChain(Throwable cause) throws Throwable {
+ while (cause != null) {
+ if (!isRetryable(cause)) {
+ throw cause;
+ }
+ cause = cause.getCause();
+ }
+ }
+
+ private boolean isRetryable(Throwable exception) {
+ return !(exception instanceof ShutdownException)
+ && !(exception instanceof RequestTimeoutException);
+ }
+
+ private long delayUntilNextAttempt() {
+ final long remainingRequestBudget = remainingRequestBudgetNanos();
+ if (remainingRequestBudget
+ <= 1_000 * 1_000) { // if we are left with less than a millisecond, don't retry
+ return -1;
+ }
+ // start with 2 milliseconds.
+ final long delay = (2 * 1_000 * 1_000) * (1L << numberOfAttempts);
+ return Math.min(delay, remainingRequestBudget);
+ }
+
+ public ReadOnlyHttpHeaders headers() {
+ return client.headers();
+ }
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyRequestReplyClientFactory.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyRequestReplyClientFactory.java
new file mode 100644
index 0000000..fc7252c
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyRequestReplyClientFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core.nettyclient;
+
+import java.net.URI;
+import javax.annotation.Nullable;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.statefun.flink.common.json.StateFunObjectMapper;
+import org.apache.flink.statefun.flink.core.reqreply.RequestReplyClient;
+import org.apache.flink.statefun.flink.core.reqreply.RequestReplyClientFactory;
+
+public final class NettyRequestReplyClientFactory implements RequestReplyClientFactory {
+
+ @Nullable private transient NettySharedResources sharedNettyResources;
+
+ @Override
+ public RequestReplyClient createTransportClient(ObjectNode transportProperties, URI endpointUrl) {
+ NettySharedResources resources = this.sharedNettyResources;
+ if (resources == null) {
+ this.sharedNettyResources = (resources = new NettySharedResources());
+ }
+ NettyRequestReplySpec clientSpec = parseTransportSpec(transportProperties);
+ return NettyClient.from(resources, clientSpec, endpointUrl);
+ }
+
+ @Override
+ public void cleanup() {
+ NettySharedResources resources = this.sharedNettyResources;
+ this.sharedNettyResources = null;
+ if (resources != null) {
+ resources.shutdownGracefully();
+ }
+ }
+
+ private static NettyRequestReplySpec parseTransportSpec(ObjectNode transportProperties) {
+ try {
+ return OBJ_MAPPER.treeToValue(transportProperties, NettyRequestReplySpec.class);
+ } catch (JsonProcessingException e) {
+ throw new IllegalStateException("Unable to parse Netty transport spec.", e);
+ }
+ }
+
+ private static final ObjectMapper OBJ_MAPPER = StateFunObjectMapper.create();
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyRequestReplyHandler.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyRequestReplyHandler.java
new file mode 100644
index 0000000..43a5dc9
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyRequestReplyHandler.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core.nettyclient;
+
+import static org.apache.flink.statefun.flink.core.nettyclient.NettyProtobuf.serializeProtobuf;
+import static org.apache.flink.util.Preconditions.checkState;
+
+import java.util.concurrent.ThreadLocalRandom;
+import javax.annotation.Nullable;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelDuplexHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPromise;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultHttpHeaders;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaderNames;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaderValues;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
+import org.apache.flink.shaded.netty4.io.netty.util.ReferenceCountUtil;
+import org.apache.flink.statefun.flink.core.nettyclient.exceptions.DisconnectedException;
+import org.apache.flink.statefun.flink.core.nettyclient.exceptions.WrongHttpResponse;
+import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction;
+
+public final class NettyRequestReplyHandler extends ChannelDuplexHandler {
+
+ private final NettyRequestTimeoutTask requestDurationTracker = new NettyRequestTimeoutTask(this);
+
+ // it is set on write.
+ @Nullable private NettyRequest inflightRequest;
+
+ // cache the request headers. profiling shows that creating request headers takes around 6% of
+ // allocations, so it is very beneficial to cache and reuse the headers.
+ @Nullable private DefaultHttpHeaders cachedHeaders;
+
+ // ---------------------------------------------------------------------------------------------------------
+ // Netty API
+ // ---------------------------------------------------------------------------------------------------------
+
+ @Override
+ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
+ throws Exception {
+ if (!(msg instanceof NettyRequest)) {
+ super.write(ctx, msg, promise);
+ return;
+ }
+ final NettyRequest request = (NettyRequest) msg;
+ if (inflightRequest != null) {
+ // this is a BUG: sending new request while an old request is in progress.
+ // we fail both of these requests.
+ IllegalStateException cause =
+ new IllegalStateException("A Channel has not finished the previous request.");
+ request.completeAttemptExceptionally(cause);
+ exceptionCaught(ctx, cause);
+ return;
+ }
+ this.inflightRequest = request;
+ // a new NettyRequestReply was introduced into the pipeline.
+ // we remember that request and forward an HTTP request on its behalf upstream.
+ // from now on, every exception thrown during the processing of this pipeline, either during the
+ // following section or
+ // during read(), will be caught and delivered to the @inFlightRequest via #exceptionCaught().
+ ByteBuf content = null;
+ try {
+ content = serializeProtobuf(ctx.channel().alloc()::buffer, request.toFunction());
+ writeHttpRequest(ctx, content, request);
+ scheduleRequestTimeout(ctx, request.remainingRequestBudgetNanos());
+ } catch (Throwable t) {
+ ReferenceCountUtil.safeRelease(content);
+ exceptionCaught(ctx, t);
+ }
+ }
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object message) {
+ final FullHttpResponse response =
+ (message instanceof FullHttpResponse) ? (FullHttpResponse) message : null;
+ try {
+ readHttpMessage(response);
+ } catch (Throwable t) {
+ exceptionCaught(ctx, t);
+ } finally {
+ ReferenceCountUtil.release(response);
+ }
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ requestDurationTracker.cancel();
+ if (!ctx.channel().isActive()) {
+ tryComplete(null, cause);
+ } else {
+ ctx.channel().close().addListener(ignored -> tryComplete(null, cause));
+ }
+ }
+
+ // ---------------------------------------------------------------------------------------------------------
+ // HTTP Request Response
+ // ---------------------------------------------------------------------------------------------------------
+
+ private void writeHttpRequest(ChannelHandlerContext ctx, ByteBuf bodyBuf, NettyRequest req) {
+ DefaultFullHttpRequest http =
+ new DefaultFullHttpRequest(
+ HttpVersion.HTTP_1_1,
+ HttpMethod.POST,
+ req.uri(),
+ bodyBuf,
+ headers(req, bodyBuf),
+ NettyHeaders.EMPTY);
+
+ ctx.writeAndFlush(http);
+ }
+
+ private DefaultHttpHeaders headers(NettyRequest req, ByteBuf bodyBuf) {
+ final DefaultHttpHeaders headers;
+ if (cachedHeaders != null) {
+ headers = cachedHeaders;
+ } else {
+ headers = new DefaultHttpHeaders(false);
+ headers.add(req.headers());
+ this.cachedHeaders = headers;
+ }
+ headers.remove(HttpHeaderNames.CONTENT_LENGTH);
+ headers.add(HttpHeaderNames.CONTENT_LENGTH, bodyBuf.readableBytes());
+ return headers;
+ }
+
+ private void readHttpMessage(FullHttpResponse response) {
+ NettyRequest current = inflightRequest;
+ checkState(current != null, "A read without a request");
+
+ requestDurationTracker.cancel();
+
+ checkState(response != null, "Unexpected message type");
+ validateFullHttpResponse(response);
+ FromFunction fromFn =
+ NettyProtobuf.deserializeProtobuf(response.content(), FromFunction.parser());
+
+ tryComplete(fromFn, null);
+ }
+
+ public void onReleaseToPool() {
+ requestDurationTracker.cancel();
+ inflightRequest = null;
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ requestDurationTracker.cancel();
+ tryComplete(null, DisconnectedException.INSTANCE);
+ super.channelInactive(ctx);
+ }
+
+ @Override
+ public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
+ requestDurationTracker.cancel();
+ super.channelUnregistered(ctx);
+ }
+
+ private void validateFullHttpResponse(FullHttpResponse response) {
+ //
+ // check the return code
+ //
+ final int code = response.status().code();
+ if (code < 200 || code >= 300) {
+ String message =
+ "Unexpected response code " + code + " (" + response.status().reasonPhrase() + ") ";
+ throw new WrongHttpResponse(message);
+ }
+ //
+ // check for the correct content type
+ //
+ final boolean correctContentType =
+ response
+ .headers()
+ .containsValue(
+ HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_OCTET_STREAM, true);
+
+ if (!correctContentType) {
+ String gotContentType = response.headers().get(HttpHeaderNames.CONTENT_TYPE);
+ throw new IllegalStateException("Unexpected content type " + gotContentType);
+ }
+ //
+ // a present HTTP body is expected.
+ //
+ checkState(response.content() != null, "Unexpected empty HTTP response (no body)");
+ }
+
+ private void scheduleRequestTimeout(
+ ChannelHandlerContext ctx, final long remainingRequestBudgetNanos) {
+ // compute the minimum request duration with an additional random jitter. The jitter is
+ // uniformly distributed in the range
+ // of [7ms, 13ms).
+ long minRequestDurationJitteredNanos =
+ ThreadLocalRandom.current().nextLong(7_000_000, 13_000_000);
+ long remainingRequestBudget =
+ Math.max(minRequestDurationJitteredNanos, remainingRequestBudgetNanos);
+ requestDurationTracker.schedule(ctx, remainingRequestBudget);
+ }
+
+ private void tryComplete(FromFunction response, Throwable cause) {
+ final NettyRequest current = inflightRequest;
+ if (current == null) {
+ return;
+ }
+ inflightRequest = null;
+ if (cause != null) {
+ current.completeAttemptExceptionally(cause);
+ } else {
+ current.complete(response);
+ }
+ }
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyRequestReplySpec.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyRequestReplySpec.java
new file mode 100644
index 0000000..8df823d
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyRequestReplySpec.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core.nettyclient;
+
+import java.time.Duration;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+public final class NettyRequestReplySpec {
+
+ @JsonProperty("call")
+ public Duration callTimeout = Duration.ofMinutes(2);
+
+ @JsonProperty("connect")
+ public Duration connectTimeout = Duration.ofSeconds(20);
+
+ @JsonProperty("pool_ttl")
+ public Duration pooledConnectionTTL = Duration.ofSeconds(15);
+
+ @JsonProperty("pool_size")
+ public int connectionPoolMaxSize = 1024;
+
+ @JsonProperty("payload_max_bytes")
+ public int maxRequestOrResponseSizeInBytes = 32 * 1048576;
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyRequestTimeoutTask.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyRequestTimeoutTask.java
new file mode 100644
index 0000000..8a29a3a
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyRequestTimeoutTask.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core.nettyclient;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.util.concurrent.ScheduledFuture;
+import org.apache.flink.statefun.flink.core.nettyclient.exceptions.RequestTimeoutException;
+
+final class NettyRequestTimeoutTask implements Runnable {
+ private final NettyRequestReplyHandler handler;
+ @Nullable private ScheduledFuture<?> future;
+ @Nullable private ChannelHandlerContext ctx;
+
+ public NettyRequestTimeoutTask(NettyRequestReplyHandler handler) {
+ this.handler = Objects.requireNonNull(handler);
+ }
+
+ void schedule(ChannelHandlerContext ctx, long remainingRequestBudget) {
+ this.ctx = Objects.requireNonNull(ctx);
+ this.future = ctx.executor().schedule(this, remainingRequestBudget, TimeUnit.NANOSECONDS);
+ }
+
+ void cancel() {
+ if (future != null) {
+ future.cancel(false);
+ future = null;
+ }
+ ctx = null;
+ }
+
+ @Override
+ public void run() {
+ checkState(ctx != null);
+ checkState(future != null);
+ handler.exceptionCaught(ctx, RequestTimeoutException.INSTANCE);
+ }
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettySharedResources.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettySharedResources.java
new file mode 100644
index 0000000..22390c6
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettySharedResources.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core.nettyclient;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.annotation.Nullable;
+import javax.net.ssl.SSLException;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
+import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
+import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
+import org.apache.flink.shaded.netty4.io.netty.channel.epoll.Epoll;
+import org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoopGroup;
+import org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollSocketChannel;
+import org.apache.flink.shaded.netty4.io.netty.channel.kqueue.KQueue;
+import org.apache.flink.shaded.netty4.io.netty.channel.kqueue.KQueueEventLoopGroup;
+import org.apache.flink.shaded.netty4.io.netty.channel.kqueue.KQueueSocketChannel;
+import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslContext;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslContextBuilder;
+import org.apache.flink.util.IOUtils;
+
+final class NettySharedResources {
+ private final AtomicBoolean shutdown = new AtomicBoolean();
+ private final Bootstrap bootstrap;
+ @Nullable private SslContext sslContext;
+
+ private final CloseableRegistry mangedResources = new CloseableRegistry();
+
+ public NettySharedResources() {
+ // TODO: configure DNS resolving
+ final EventLoopGroup workerGroup;
+ final Class<? extends Channel> channelClass;
+ if (Epoll.isAvailable()) {
+ workerGroup = new EpollEventLoopGroup(demonThreadFactory("netty-http-worker"));
+ channelClass = EpollSocketChannel.class;
+ } else if (KQueue.isAvailable()) {
+ workerGroup = new KQueueEventLoopGroup(demonThreadFactory("http-netty-worker"));
+ channelClass = KQueueSocketChannel.class;
+ } else {
+ workerGroup = new NioEventLoopGroup(demonThreadFactory("netty-http-client"));
+ channelClass = NioSocketChannel.class;
+ }
+ registerClosable(workerGroup::shutdownGracefully);
+
+ Bootstrap bootstrap = new Bootstrap();
+ bootstrap.group(workerGroup);
+ bootstrap.channel(channelClass);
+
+ this.bootstrap = bootstrap;
+ }
+
+ public Bootstrap bootstrap() {
+ return bootstrap;
+ }
+
+ public SslContext sslContext() {
+ SslContext sslCtx = sslContext;
+ if (sslCtx != null) {
+ return sslCtx;
+ }
+ try {
+ sslCtx = SslContextBuilder.forClient().build();
+ this.sslContext = sslCtx;
+ return sslCtx;
+ } catch (SSLException e) {
+ throw new IllegalStateException("Failed to initialize an SSL provider", e);
+ }
+ }
+
+ public void registerClosable(Closeable closeable) {
+ try {
+ mangedResources.registerCloseable(closeable);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ public boolean isShutdown() {
+ return shutdown.get();
+ }
+
+ public void shutdownGracefully() {
+ if (shutdown.compareAndSet(false, true)) {
+ IOUtils.closeQuietly(mangedResources);
+ }
+ }
+
+ private static ThreadFactory demonThreadFactory(String name) {
+ return runnable -> {
+ Thread t = new Thread(runnable);
+ t.setDaemon(true);
+ t.setName(name);
+ return t;
+ };
+ }
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyTransportModule.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyTransportModule.java
new file mode 100644
index 0000000..9e58046
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyTransportModule.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.nettyclient;
+
+import com.google.auto.service.AutoService;
+import java.util.Map;
+import org.apache.flink.statefun.extensions.ExtensionModule;
+import org.apache.flink.statefun.sdk.TypeName;
+
+@AutoService(ExtensionModule.class)
+public class NettyTransportModule implements ExtensionModule {
+
+ public static final TypeName NETTY_TRANSPORT =
+ TypeName.parseFrom("io.statefun.transports.v1/async");
+
+ @Override
+ public void configure(Map<String, String> globalConfigurations, Binder binder) {
+ binder.bindExtension(NETTY_TRANSPORT, new NettyRequestReplyClientFactory());
+ }
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/OnChannelThread.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/OnChannelThread.java
new file mode 100644
index 0000000..9939433
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/OnChannelThread.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core.nettyclient;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * A Thread that is assigned to a specific Netty Channel. In Netty's programming model, each channel
+ * is assigned to a specific thread for the lifetime of the channel, and all Channel releated events
+ * are dispatched on this thread.
+ */
+@Documented
+@Target(ElementType.METHOD)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface OnChannelThread {}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/OnClientThread.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/OnClientThread.java
new file mode 100644
index 0000000..bbd7fec
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/OnClientThread.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core.nettyclient;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * A specific Thread that is bound to a {@link NettyClient} (see {@link NettyClient#eventLoop}).
+ * This thread is assigned to a specific NettyClient and never changes.
+ */
+@Documented
+@Target(ElementType.METHOD)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface OnClientThread {}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/OnFlinkThread.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/OnFlinkThread.java
new file mode 100644
index 0000000..35192a8
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/OnFlinkThread.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core.nettyclient;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/** A Thread that executes Flink's operator. */
+@Documented
+@Target({ElementType.METHOD, ElementType.CONSTRUCTOR})
+@Retention(RetentionPolicy.RUNTIME)
+public @interface OnFlinkThread {}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/exceptions/DisconnectedException.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/exceptions/DisconnectedException.java
new file mode 100644
index 0000000..f349097
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/exceptions/DisconnectedException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core.nettyclient.exceptions;
+
+import java.io.IOException;
+
+public final class DisconnectedException extends IOException {
+ public static final DisconnectedException INSTANCE = new DisconnectedException();
+
+ private DisconnectedException() {
+ super("Disconnected");
+ setStackTrace(new StackTraceElement[0]);
+ }
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/exceptions/NoMoreRoutesException.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/exceptions/NoMoreRoutesException.java
new file mode 100644
index 0000000..a73d52d
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/exceptions/NoMoreRoutesException.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core.nettyclient.exceptions;
+
+public class NoMoreRoutesException extends RuntimeException {
+ public NoMoreRoutesException(String message) {
+ super(message);
+ }
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/exceptions/RequestTimeoutException.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/exceptions/RequestTimeoutException.java
new file mode 100644
index 0000000..becf70d
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/exceptions/RequestTimeoutException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core.nettyclient.exceptions;
+
+import java.util.concurrent.TimeoutException;
+
+public final class RequestTimeoutException extends TimeoutException {
+ public static final RequestTimeoutException INSTANCE = new RequestTimeoutException();
+
+ public RequestTimeoutException() {
+ setStackTrace(new StackTraceElement[] {});
+ }
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/exceptions/ShutdownException.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/exceptions/ShutdownException.java
new file mode 100644
index 0000000..0bb5b78
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/exceptions/ShutdownException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core.nettyclient.exceptions;
+
+public final class ShutdownException extends RuntimeException {
+
+ public static final ShutdownException INSTANCE = new ShutdownException();
+
+ public ShutdownException() {
+ super("Shutdown");
+ setStackTrace(new StackTraceElement[] {});
+ }
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/exceptions/WrongHttpResponse.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/exceptions/WrongHttpResponse.java
new file mode 100644
index 0000000..d352d2a
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/exceptions/WrongHttpResponse.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core.nettyclient.exceptions;
+
+public final class WrongHttpResponse extends RuntimeException {
+
+ public WrongHttpResponse(String message) {
+ super(message);
+ }
+}
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/nettyclient/EndpointTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/nettyclient/EndpointTest.java
new file mode 100644
index 0000000..2cf1134
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/nettyclient/EndpointTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core.nettyclient;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+import java.net.InetSocketAddress;
+import java.net.URI;
+import org.junit.Test;
+
+public class EndpointTest {
+
+ @Test
+ public void exampleUsage() {
+ Endpoint endpoint = new Endpoint(URI.create("https://api.gateway.com:1234/statefun?xyz=5678"));
+
+ assertThat(endpoint.useTls(), is(true));
+ assertThat(endpoint.serviceAddress().getHostString(), is("api.gateway.com"));
+ assertThat(endpoint.serviceAddress().getPort(), is(1234));
+ assertThat(endpoint.queryPath(), is("/statefun?xyz=5678"));
+ }
+
+ @Test
+ public void anotherExample() {
+ Endpoint endpoint = new Endpoint(URI.create("https://greeter-svc/statefun"));
+
+ assertThat(endpoint.useTls(), is(true));
+ assertThat(endpoint.queryPath(), is("/statefun"));
+
+ InetSocketAddress serviceAddress = endpoint.serviceAddress();
+ assertThat(serviceAddress.getHostString(), is("greeter-svc"));
+ assertThat(serviceAddress.getPort(), is(443));
+ }
+
+ @Test
+ public void emptyQueryPathIsASingleSlash() {
+ Endpoint endpoint = new Endpoint(URI.create("http://greeter-svc"));
+
+ assertThat(endpoint.queryPath(), is("/"));
+ }
+
+ @Test
+ public void dontUseTls() {
+ Endpoint endpoint = new Endpoint(URI.create("http://api.gateway.com:1234/statefun?xyz=5678"));
+
+ assertThat(endpoint.useTls(), is(false));
+ }
+
+ @Test
+ public void useTls() {
+ Endpoint endpoint = new Endpoint(URI.create("https://foobar.net"));
+
+ assertThat(endpoint.useTls(), is(true));
+ }
+}
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/nettyclient/NettyProtobufTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/nettyclient/NettyProtobufTest.java
new file mode 100644
index 0000000..fc0e3ec
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/nettyclient/NettyProtobufTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core.nettyclient;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.IntFunction;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
+import org.apache.flink.statefun.sdk.reqreply.generated.Address;
+import org.junit.After;
+import org.junit.Test;
+
+public class NettyProtobufTest {
+
+ @After
+ public void tearDown() {
+ ALLOCATOR.close();
+ }
+
+ private final AutoReleasingAllocator ALLOCATOR = new AutoReleasingAllocator();
+
+ @Test
+ public void roundTrip() {
+ char[] chars = new char[1024 * 1024];
+ Arrays.fill(chars, 'x');
+ String pad = new String(chars);
+
+ for (int i = 0; i < 100; i++) {
+ int size = ThreadLocalRandom.current().nextInt(1, pad.length());
+ Address original =
+ Address.newBuilder()
+ .setNamespace("namespace")
+ .setType("type")
+ .setId(pad.substring(0, size))
+ .build();
+
+ Address actual = serdeRoundTrip(ALLOCATOR, original);
+
+ assertThat(actual, is(original));
+ }
+ }
+
+ @Test
+ public void heapBufferRoundTrip() {
+ char[] chars = new char[1024 * 1024];
+ Arrays.fill(chars, 'x');
+ String pad = new String(chars);
+
+ IntFunction<ByteBuf> heapAllocator = ByteBufAllocator.DEFAULT::heapBuffer;
+
+ for (int i = 0; i < 100; i++) {
+ int size = ThreadLocalRandom.current().nextInt(1, pad.length());
+ Address original =
+ Address.newBuilder()
+ .setNamespace("namespace")
+ .setType("type")
+ .setId(pad.substring(0, size))
+ .build();
+
+ Address actual = serdeRoundTrip(heapAllocator, original);
+ assertThat(actual, is(original));
+ }
+ }
+
+ private Address serdeRoundTrip(IntFunction<ByteBuf> allocator, Address original) {
+ ByteBuf buf = NettyProtobuf.serializeProtobuf(allocator, original);
+ Address got = NettyProtobuf.deserializeProtobuf(buf, Address.parser());
+ buf.release();
+ return got;
+ }
+
+ private static final class AutoReleasingAllocator implements IntFunction<ByteBuf>, AutoCloseable {
+ private final ArrayDeque<ByteBuf> allocatedDuringATest = new ArrayDeque<>();
+
+ @Override
+ public ByteBuf apply(int value) {
+ ByteBuf buf = ByteBufAllocator.DEFAULT.directBuffer(value);
+ allocatedDuringATest.addLast(buf);
+ return buf;
+ }
+
+ @Override
+ public void close() {
+ for (ByteBuf buf : allocatedDuringATest) {
+ int refCount = buf.refCnt();
+ if (refCount > 0) {
+ buf.release(refCount);
+ }
+ }
+ }
+ }
+}
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/nettyclient/NettyRequestTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/nettyclient/NettyRequestTest.java
new file mode 100644
index 0000000..29c8f4c
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/nettyclient/NettyRequestTest.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core.nettyclient;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import java.io.Closeable;
+import java.net.SocketAddress;
+import java.time.Duration;
+import java.util.ArrayDeque;
+import java.util.IdentityHashMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel;
+import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelConfig;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelMetadata;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOutboundBuffer;
+import org.apache.flink.shaded.netty4.io.netty.channel.EventLoop;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.ReadOnlyHttpHeaders;
+import org.apache.flink.statefun.flink.core.metrics.RemoteInvocationMetrics;
+import org.apache.flink.statefun.flink.core.nettyclient.exceptions.DisconnectedException;
+import org.apache.flink.statefun.flink.core.nettyclient.exceptions.ShutdownException;
+import org.apache.flink.statefun.flink.core.reqreply.ToFunctionRequestSummary;
+import org.apache.flink.statefun.sdk.Address;
+import org.apache.flink.statefun.sdk.FunctionType;
+import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction;
+import org.apache.flink.statefun.sdk.reqreply.generated.ToFunction;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class NettyRequestTest {
+
+ private final FakeMetrics FAKE_METRICS = new FakeMetrics();
+
+ @Test
+ public void successfulSanity() {
+ FakeClient fakeClient = new FakeClient();
+ NettyRequest request =
+ new NettyRequest(fakeClient, FAKE_METRICS, FAKE_SUMMARY, ToFunction.getDefaultInstance());
+
+ request.start();
+ request.complete(FromFunction.getDefaultInstance());
+
+ assertThat(request.result().join(), is(FromFunction.getDefaultInstance()));
+ }
+
+ @Test
+ public void unSuccessfulSanity() {
+ FakeClient fakeClient = new FakeClient();
+ NettyRequest request =
+ new NettyRequest(fakeClient, FAKE_METRICS, FAKE_SUMMARY, ToFunction.getDefaultInstance());
+
+ request.start();
+ request.completeAttemptExceptionally(ShutdownException.INSTANCE);
+
+ assertThat(request.result().isCompletedExceptionally(), is(true));
+ }
+
+ @Test
+ public void canNotAcquireChannel() {
+ // a client that never returns a channel.
+ class alwaysFailingToAcquireChannel extends FakeClient {
+ @Override
+ public void acquireChannel(BiConsumer<Channel, Throwable> consumer) {
+ consumer.accept(null, new IllegalStateException("no channel for you"));
+ }
+ }
+
+ NettyRequest request =
+ new NettyRequest(
+ new alwaysFailingToAcquireChannel(),
+ FAKE_METRICS,
+ FAKE_SUMMARY,
+ ToFunction.getDefaultInstance());
+
+ CompletableFuture<FromFunction> result = request.start();
+
+ assertThat(result.isCompletedExceptionally(), is(true));
+ }
+
+ @Test
+ public void acquiredChannelShouldBeReleased() {
+ FakeClient fakeClient = new FakeClient();
+ NettyRequest request =
+ new NettyRequest(fakeClient, FAKE_METRICS, FAKE_SUMMARY, ToFunction.getDefaultInstance());
+
+ request.start();
+
+ assertEquals(1, fakeClient.LIVE_CHANNELS.size());
+ request.completeAttemptExceptionally(ShutdownException.INSTANCE);
+ assertEquals(0, fakeClient.LIVE_CHANNELS.size());
+ }
+
+ @Test
+ public void failingWriteShouldFailTheRequest() {
+ // the following is a client that allows acquiring a channel
+ class client extends FakeClient {
+ @Override
+ public <T> void writeAndFlush(T what, Channel ch, BiConsumer<Void, Throwable> andThen) {
+ andThen.accept(null, new IllegalStateException("can't write."));
+ }
+ }
+
+ client fakeClient = new client();
+ NettyRequest request =
+ new NettyRequest(fakeClient, FAKE_METRICS, FAKE_SUMMARY, ToFunction.getDefaultInstance());
+
+ request.start();
+
+ Assert.assertTrue(request.result().isCompletedExceptionally());
+ }
+
+ @Test
+ public void testRemainBudget() {
+ FakeClient fakeClient = new FakeClient();
+ fakeClient.REQUEST_BUDGET = Duration.ofMillis(20).toNanos();
+
+ NettyRequest request =
+ new NettyRequest(fakeClient, FAKE_METRICS, FAKE_SUMMARY, ToFunction.getDefaultInstance());
+
+ request.start();
+ // move the clock 5ms forward
+ fakeClient.NOW += Duration.ofMillis(5).toNanos();
+ // fail the request
+ request.completeAttemptExceptionally(DisconnectedException.INSTANCE);
+
+ Assert.assertFalse(request.result().isDone());
+ assertEquals(Duration.ofMillis(15).toNanos(), request.remainingRequestBudgetNanos());
+ }
+
+ @Test
+ public void testRetries() {
+ FakeClient fakeClient = new FakeClient();
+ fakeClient.REQUEST_BUDGET = Duration.ofMillis(20).toNanos();
+
+ NettyRequest request =
+ new NettyRequest(fakeClient, FAKE_METRICS, FAKE_SUMMARY, ToFunction.getDefaultInstance());
+
+ request.start();
+
+ for (int i = 0; i < 20; i++) {
+ request.completeAttemptExceptionally(DisconnectedException.INSTANCE);
+ if (request.result().isCompletedExceptionally()) {
+ return;
+ }
+ fakeClient.NOW += 1_000_000; // + 1ms
+ fakeClient.TIMEOUTS.pop().run();
+ }
+
+ throw new AssertionError();
+ }
+
+ // ---------------------------------------------------------------------------------------------------------
+ // Test collaborators
+ // ---------------------------------------------------------------------------------------------------------
+
+ @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection", "FieldCanBeLocal", "FieldMayBeFinal"})
+ private static class FakeClient implements NettyClientService {
+ // test knobs
+
+ private long NOW = 0;
+ private long REQUEST_BUDGET = 0;
+ final ArrayDeque<Runnable> TIMEOUTS = new ArrayDeque<>();
+ final IdentityHashMap<FakeChannel, Boolean> LIVE_CHANNELS = new IdentityHashMap<>();
+
+ @Override
+ public void acquireChannel(BiConsumer<Channel, Throwable> consumer) {
+ FakeChannel ch = new FakeChannel();
+ LIVE_CHANNELS.put(ch, Boolean.TRUE);
+ consumer.accept(ch, null);
+ }
+
+ @SuppressWarnings("SuspiciousMethodCalls")
+ @Override
+ public void releaseChannel(Channel channel) {
+ Boolean existed = LIVE_CHANNELS.remove(channel);
+ if (existed == null) {
+ throw new AssertionError("Trying to release a non allocated channel");
+ }
+ }
+
+ @Override
+ public String queryPath() {
+ return "/";
+ }
+
+ @Override
+ public ReadOnlyHttpHeaders headers() {
+ return new ReadOnlyHttpHeaders(false);
+ }
+
+ @Override
+ public long totalRequestBudgetInNanos() {
+ return REQUEST_BUDGET;
+ }
+
+ @Override
+ public Closeable newTimeout(Runnable client, long delayInNanos) {
+ TIMEOUTS.add(client);
+ return () -> {};
+ }
+
+ @Override
+ public void runOnEventLoop(Runnable task) {
+ task.run();
+ }
+
+ @Override
+ public boolean isShutdown() {
+ return false;
+ }
+
+ @Override
+ public long systemNanoTime() {
+ return NOW;
+ }
+
+ @Override
+ public <T> void writeAndFlush(T what, Channel ch, BiConsumer<Void, Throwable> andThen) {}
+ }
+
+ private static final class FakeMetrics implements RemoteInvocationMetrics {
+ int failures;
+
+ @Override
+ public void remoteInvocationFailures() {
+ failures++;
+ }
+
+ @Override
+ public void remoteInvocationLatency(long elapsed) {}
+ }
+
+ private static final ToFunctionRequestSummary FAKE_SUMMARY =
+ new ToFunctionRequestSummary(new Address(new FunctionType("a", "b"), "c"), 50, 3, 1);
+
+ public static class FakeChannel extends AbstractChannel {
+
+ public FakeChannel() {
+ super(null);
+ }
+
+ @Override
+ protected AbstractUnsafe newUnsafe() {
+ return null;
+ }
+
+ @Override
+ protected boolean isCompatible(EventLoop eventLoop) {
+ return false;
+ }
+
+ @Override
+ protected SocketAddress localAddress0() {
+ return null;
+ }
+
+ @Override
+ protected SocketAddress remoteAddress0() {
+ return null;
+ }
+
+ @Override
+ protected void doBind(SocketAddress socketAddress) {}
+
+ @Override
+ protected void doDisconnect() {}
+
+ @Override
+ protected void doClose() {}
+
+ @Override
+ protected void doBeginRead() {}
+
+ @Override
+ protected void doWrite(ChannelOutboundBuffer channelOutboundBuffer) {}
+
+ @Override
+ public ChannelConfig config() {
+ return null;
+ }
+
+ @Override
+ public boolean isOpen() {
+ return false;
+ }
+
+ @Override
+ public boolean isActive() {
+ return false;
+ }
+
+ @Override
+ public ChannelMetadata metadata() {
+ return null;
+ }
+ }
+}