[FLINK-25866][statefun] Support additional TLS configuration
diff --git a/docs/content/docs/modules/http-endpoint.md b/docs/content/docs/modules/http-endpoint.md
index 5e7e915..83ac834 100644
--- a/docs/content/docs/modules/http-endpoint.md
+++ b/docs/content/docs/modules/http-endpoint.md
@@ -191,6 +191,10 @@
     pool_ttl: 15s
     pool_size: 1024
     payload_max_bytes: 33554432
+    trust_cacerts: ~/trustedCAs.pem
+    client_cert: classpath:clientPublic.crt
+    client_key: ~/clientPrivate.key
+    client_key_password: /tmp/password.txt
 ```
 
 * `call`: total duration of a single request (including retries, and backoffs). After this duration, the call is considered failed.
@@ -198,6 +202,11 @@
 * `pool_ttl`: the amount of time a connection will live in the connection pool. Set to 0 to disable, otherwise the connection will be evicted from the pool after (approximately) that time. If a connection is evicted while it is serving a request, that connection will be only marked for eviction and will be dropped from the pool once the request returns.
 * `pool_size`: the maximum pool size.
 * `payload_max_bytes`: the maximum size for a request or response payload size. The default is set to 32MB.
+* `trust_cacerts`: Trusted public certificate authority certificates in a pem format. If none are provided, but the function uses https, the default jre truststore will be used. If you need to provide more than one CA cert, concat them with a newline in between. This can be taken from a classpath (e.g. classpath:file.pem) or a path.
+* `client_cert`: Client public certificate used for mutual tls authentication. This can be taken from a classpath (e.g. classpath:file.crt) or a path
+* `client_key`: PKCS8 client private key used for mutual tls authentication. This can be taken from a classpath (e.g. classpath:file.key) or a path
+* `client_key_password`: The location of a file containing the client key password (if required). This can be taken from a classpath (e.g. classpath:file.key) or a path
+
 
 {{< hint info >}}
 We highly recommend setting `statefun.async.max-per-task` to a much higher value (see [Configurations]({{< ref "docs/deployment/configurations">}}))
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-java/pom.xml b/statefun-e2e-tests/statefun-smoke-e2e-java/pom.xml
index b2e0590..2ad62af 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-java/pom.xml
+++ b/statefun-e2e-tests/statefun-smoke-e2e-java/pom.xml
@@ -28,6 +28,10 @@
 
     <artifactId>statefun-smoke-e2e-java</artifactId>
 
+    <properties>
+        <netty.shaded.version>4.1.65.Final-14.0</netty.shaded.version>
+    </properties>
+
     <dependencies>
         <!-- Remote Java function dependencies -->
         <dependency>
@@ -36,15 +40,16 @@
             <version>${project.version}</version>
         </dependency>
         <dependency>
-            <groupId>io.undertow</groupId>
-            <artifactId>undertow-core</artifactId>
-            <version>1.4.18.Final</version>
-        </dependency>
-        <dependency>
             <groupId>com.google.protobuf</groupId>
             <artifactId>protobuf-java</artifactId>
             <version>${protobuf.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-shaded-netty</artifactId>
+            <version>${netty.shaded.version}</version>
+            <scope>compile</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-java/src/main/java/org/apache/flink/statefun/e2e/smoke/java/CommandInterpreterAppServer.java b/statefun-e2e-tests/statefun-smoke-e2e-java/src/main/java/org/apache/flink/statefun/e2e/smoke/java/CommandInterpreterAppServer.java
index 6e512f5..14f6d31 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-java/src/main/java/org/apache/flink/statefun/e2e/smoke/java/CommandInterpreterAppServer.java
+++ b/statefun-e2e-tests/statefun-smoke-e2e-java/src/main/java/org/apache/flink/statefun/e2e/smoke/java/CommandInterpreterAppServer.java
@@ -20,33 +20,120 @@
 
 import static org.apache.flink.statefun.e2e.smoke.java.Constants.CMD_INTERPRETER_FN;
 
-import io.undertow.Undertow;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+import org.apache.flink.shaded.netty4.io.netty.channel.*;
+import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.*;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.ClientAuth;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslContext;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslContextBuilder;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslProvider;
 import org.apache.flink.statefun.sdk.java.StatefulFunctionSpec;
 import org.apache.flink.statefun.sdk.java.StatefulFunctions;
-import org.apache.flink.statefun.sdk.java.handler.RequestReplyHandler;
+import org.apache.flink.statefun.sdk.java.slice.Slice;
+import org.apache.flink.statefun.sdk.java.slice.Slices;
 
 public class CommandInterpreterAppServer {
-  public static final int PORT = 8000;
+  private static final int PORT = 8000;
+  private static final String A_SERVER_KEY_PASSWORD = "test";
+  private static final CommandInterpreter commandInterpreter = new CommandInterpreter();
+  private static final StatefulFunctionSpec FN_SPEC =
+      StatefulFunctionSpec.builder(CMD_INTERPRETER_FN)
+          .withSupplier(() -> new CommandInterpreterFn(commandInterpreter))
+          .withValueSpec(CommandInterpreterFn.STATE)
+          .build();
 
-  public static void main(String[] args) {
-    final CommandInterpreter interpreter = new CommandInterpreter();
-    final StatefulFunctionSpec FN_SPEC =
-        StatefulFunctionSpec.builder(CMD_INTERPRETER_FN)
-            .withSupplier(() -> new CommandInterpreterFn(interpreter))
-            .withValueSpec(CommandInterpreterFn.STATE)
-            .build();
-    final StatefulFunctions functions = new StatefulFunctions();
+  public static void main(String[] args) throws IOException, InterruptedException {
+    final InputStream trustCaCerts =
+        Objects.requireNonNull(
+                CommandInterpreter.class.getClassLoader().getResource("certs/a_ca.pem"))
+            .openStream();
+    final InputStream aServerCert =
+        Objects.requireNonNull(
+                CommandInterpreter.class.getClassLoader().getResource("certs/a_server.crt"))
+            .openStream();
+    final InputStream aServerKey =
+        Objects.requireNonNull(
+                CommandInterpreter.class.getClassLoader().getResource("certs/a_server.key.p8"))
+            .openStream();
+
+    ServerBootstrap httpsMutualTlsBootstrap =
+        getServerBootstrap(getChannelInitializer(trustCaCerts, aServerCert, aServerKey));
+
+    httpsMutualTlsBootstrap.bind(PORT).sync();
+  }
+
+  private static ChannelInitializer<Channel> getChannelInitializer(
+      InputStream trustInputStream, InputStream certInputStream, InputStream keyInputStream) {
+    return getTlsEnabledInitializer(
+        SslContextBuilder.forServer(certInputStream, keyInputStream, A_SERVER_KEY_PASSWORD)
+            .trustManager(trustInputStream));
+  }
+
+  private static ChannelInitializer<Channel> getTlsEnabledInitializer(
+      SslContextBuilder sslContextBuilder) {
+    return new ChannelInitializer<Channel>() {
+      @Override
+      protected void initChannel(Channel channel) throws IOException {
+        ChannelPipeline pipeline = channel.pipeline();
+        SslContext sslContext =
+            sslContextBuilder.sslProvider(SslProvider.JDK).clientAuth(ClientAuth.REQUIRE).build();
+        pipeline.addLast(sslContext.newHandler(channel.alloc()));
+        addResponseHandlerToPipeline(pipeline);
+      }
+    };
+  }
+
+  private static ServerBootstrap getServerBootstrap(ChannelInitializer<Channel> childHandler) {
+    NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
+    NioEventLoopGroup workerGroup = new NioEventLoopGroup();
+
+    return new ServerBootstrap()
+        .group(eventLoopGroup, workerGroup)
+        .channel(NioServerSocketChannel.class)
+        .childHandler(childHandler)
+        .option(ChannelOption.SO_BACKLOG, 128)
+        .childOption(ChannelOption.SO_KEEPALIVE, true);
+  }
+
+  private static void addResponseHandlerToPipeline(ChannelPipeline pipeline) {
+    pipeline.addLast(new HttpServerCodec());
+    pipeline.addLast(new HttpObjectAggregator(Integer.MAX_VALUE));
+    pipeline.addLast(getStatefunInboundHandler());
+  }
+
+  private static SimpleChannelInboundHandler<FullHttpRequest> getStatefunInboundHandler() {
+    StatefulFunctions functions = new StatefulFunctions();
     functions.withStatefulFunction(FN_SPEC);
 
-    final RequestReplyHandler requestReplyHandler = functions.requestReplyHandler();
+    return new SimpleChannelInboundHandler<FullHttpRequest>() {
+      @Override
+      protected void channelRead0(
+          ChannelHandlerContext channelHandlerContext, FullHttpRequest fullHttpRequest) {
+        CompletableFuture<Slice> res =
+            functions
+                .requestReplyHandler()
+                .handle(Slices.wrap(fullHttpRequest.content().nioBuffer()));
+        res.whenComplete(
+            (r, e) -> {
+              FullHttpResponse response =
+                  new DefaultFullHttpResponse(
+                      HttpVersion.HTTP_1_1,
+                      HttpResponseStatus.OK,
+                      Unpooled.copiedBuffer(r.toByteArray()));
+              response.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/octet-stream");
+              response.headers().set(HttpHeaderNames.CONTENT_LENGTH, r.readableBytes());
 
-    // Use the request-reply handler along with your favorite HTTP web server framework
-    // to serve the functions!
-    final Undertow httpServer =
-        Undertow.builder()
-            .addHttpListener(PORT, "0.0.0.0")
-            .setHandler(new UndertowHttpHandler(requestReplyHandler))
-            .build();
-    httpServer.start();
+              channelHandlerContext.write(response);
+              channelHandlerContext.flush();
+            });
+      }
+    };
   }
 }
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-java/src/main/java/org/apache/flink/statefun/e2e/smoke/java/UndertowHttpHandler.java b/statefun-e2e-tests/statefun-smoke-e2e-java/src/main/java/org/apache/flink/statefun/e2e/smoke/java/UndertowHttpHandler.java
deleted file mode 100644
index dbb8a79..0000000
--- a/statefun-e2e-tests/statefun-smoke-e2e-java/src/main/java/org/apache/flink/statefun/e2e/smoke/java/UndertowHttpHandler.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.statefun.e2e.smoke.java;
-
-import io.undertow.server.HttpHandler;
-import io.undertow.server.HttpServerExchange;
-import io.undertow.util.Headers;
-import java.util.Objects;
-import java.util.concurrent.CompletableFuture;
-import org.apache.flink.statefun.sdk.java.handler.RequestReplyHandler;
-import org.apache.flink.statefun.sdk.java.slice.Slice;
-import org.apache.flink.statefun.sdk.java.slice.Slices;
-
-/**
- * A simple Undertow {@link HttpHandler} that delegates requests from StateFun runtime processes to
- * a StateFun {@link RequestReplyHandler}.
- */
-final class UndertowHttpHandler implements HttpHandler {
-  private final RequestReplyHandler handler;
-
-  UndertowHttpHandler(RequestReplyHandler handler) {
-    this.handler = Objects.requireNonNull(handler);
-  }
-
-  @Override
-  public void handleRequest(HttpServerExchange exchange) {
-    exchange.getRequestReceiver().receiveFullBytes(this::onRequestBody);
-  }
-
-  private void onRequestBody(HttpServerExchange exchange, byte[] requestBytes) {
-    exchange.dispatch();
-    CompletableFuture<Slice> future = handler.handle(Slices.wrap(requestBytes));
-    future.whenComplete((response, exception) -> onComplete(exchange, response, exception));
-  }
-
-  private void onComplete(HttpServerExchange exchange, Slice responseBytes, Throwable ex) {
-    if (ex != null) {
-      ex.printStackTrace(System.out);
-      exchange.getResponseHeaders().put(Headers.STATUS, 500);
-      exchange.endExchange();
-      return;
-    }
-    exchange.getResponseHeaders().put(Headers.CONTENT_TYPE, "application/octet-stream");
-    exchange.getResponseSender().send(responseBytes.asReadOnlyByteBuffer());
-  }
-}
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-java/src/main/resources/certs/README.md b/statefun-e2e-tests/statefun-smoke-e2e-java/src/main/resources/certs/README.md
new file mode 100644
index 0000000..52b9948
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e-java/src/main/resources/certs/README.md
@@ -0,0 +1 @@
+For instructions on how these cert/key files got created, see statefun-flink/statefun-flink-core/src/test/resources/certs/README.md
\ No newline at end of file
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-java/src/main/resources/certs/a_ca.pem b/statefun-e2e-tests/statefun-smoke-e2e-java/src/main/resources/certs/a_ca.pem
new file mode 100644
index 0000000..9615486
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e-java/src/main/resources/certs/a_ca.pem
@@ -0,0 +1,21 @@
+-----BEGIN CERTIFICATE-----
+MIIDhTCCAm2gAwIBAgIUTRXcSpygsZsWmuX4QMz9ey0rPBYwDQYJKoZIhvcNAQEL
+BQAwUTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM
+GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDEKMAgGA1UEAwwBYTAgFw0yMjAyMjIx
+MzA2MDJaGA8yMTIyMDEyOTEzMDYwMlowUTELMAkGA1UEBhMCQVUxEzARBgNVBAgM
+ClNvbWUtU3RhdGUxITAfBgNVBAoMGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDEK
+MAgGA1UEAwwBYTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAKBR0E/f
+fcTFxs8Oo4ItZm9HYuPR6U52OuBDN8TdzjMfA+FKoTvqvt/RUg7y6QFCdVEFwMS1
+uyjxoMlGDX1MF9SfDTH9oZlND+DL4YatsqpEXGRfjYSheBPUWusoQ6BbXvjiUNf4
+2bEdTFHnb08pdt0ixtymPp3vLsoUWAqscqYKOj4DW0AMRq2fAMdt0NlJsvW9hUQr
+I8wvU90SJlZ/YNOMv4uae8q67YQKZziRpFaLTM5aFjXnsIYAA6Z+PByFjpLTofgc
+BNuGM5WIk7O88UdZOwd5OCZhkaXrPCkkCtyyPDO8kWiiSmdTOnQ2sjkOSKgNTXkd
++6K7REk5gZ8WVv8CAwEAAaNTMFEwHQYDVR0OBBYEFFvccwv5Cufwh+0kLG0vmQCF
+6JbLMB8GA1UdIwQYMBaAFFvccwv5Cufwh+0kLG0vmQCF6JbLMA8GA1UdEwEB/wQF
+MAMBAf8wDQYJKoZIhvcNAQELBQADggEBADfgGnHZ42Cxoe5gNoWIUdgFjBqeiWOe
+NHNDLgXkx/jcOsbAO6H3+tz3Dz8QL3JIYTrwRKv0vX+5GAkkRz2R2ZaN4xHz73Co
+SuFfIqq4MjjrymVcnNA347vk50FjOMgfHrxpS3UQeTvlb1iuA88Zk8ewzhY376+t
+MoFmT1/ocb2E7jvxR0kDNCK5XsJIGzJmCBq8nIAD6wxrPXU3HJ/GLBlL5sL5kRrN
+x7l/DnL2oqN2DuyFmf4g03+DVmuu87XrbDrGHnn57CVnUe7z4jCFE82vA4u/tppK
+VgI17uYA714s3Jw6Iw+u38cmmSb79AVG05D2b2+TsqICVet3bc9MkQ8=
+-----END CERTIFICATE-----
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-java/src/main/resources/certs/a_server.crt b/statefun-e2e-tests/statefun-smoke-e2e-java/src/main/resources/certs/a_server.crt
new file mode 100644
index 0000000..9268b38
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e-java/src/main/resources/certs/a_server.crt
@@ -0,0 +1,22 @@
+-----BEGIN CERTIFICATE-----
+MIIDjTCCAnWgAwIBAgIUOfy6Qa7zTjHWOwUTdNZrzoPVVjIwDQYJKoZIhvcNAQEL
+BQAwUTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM
+GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDEKMAgGA1UEAwwBYTAgFw0yMjAzMDIx
+MjE4MTRaGA8yMTIyMDIwNjEyMTgxNFowRTELMAkGA1UEBhMCQVUxEzARBgNVBAgM
+ClNvbWUtU3RhdGUxITAfBgNVBAoMGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDCC
+ASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBANj0NXPAatJmpQZSqdoNU/aI
+bs+b7WXHJ7if2pRU+sdbAdIqU9+eGK+9lzkyin8YExvZokDXV2cHVQCW2kO8qnZE
+pksKk9ZtHMe8RoksbRN2uNq174GXLlku52+IleJE6SiuHDWZXU0s0tZWl3LL5yei
+oKaWLrF4bKxdZqm2bnWe3VZJVgiIJOuHMMoSQPE8BYnUe6n0YjpZR7vHifBLf0wM
+w70PuZrzcADRiokqVFS5dwZYhW1xErbNA0/pYg3MsnQeNuWyJ517KkYSUxCAnb8q
+LHWGBgcqJ7CSmGGpcLDIDEXJZg4lT/SQp08n99+EoTy7rkncoG+pG5IBmyJKLtUC
+AwEAAaNnMGUwHwYDVR0jBBgwFoAUW9xzC/kK5/CH7SQsbS+ZAIXolsswCQYDVR0T
+BAIwADALBgNVHQ8EBAMCBPAwKgYDVR0RBCMwIYIJbG9jYWxob3N0ghRyZW1vdGUt
+ZnVuY3Rpb24taG9zdDANBgkqhkiG9w0BAQsFAAOCAQEAVo2WzynblomqTGqUss9S
+eCOTrKkySETvGLK/GLLKaxn1Du/JRgEpIXCo5Ri3MQ8MBtpi748kN3z6Jh7ouoeu
+tG1I/hll0naxUzBDBzx25ZxSxGCQG/eoOFgRLF5OQ9L5BKRMW1T4/XNJqLtUVoiw
+qiulkNYJs2tKD/swAkM3LwAGuXl/p9KbGoYmxozwDYrW9PohL2zS0qzBRriJQGhM
+x65SeX50KhiDXx6mXTVUfFU91lh5H6er8SOHbS103jt0B7Y3403mhDmq2NcmY2Ln
+P3d6AZlhXwzsFDxD45U4lxYxMWTy6rOZkcCqrV9LGHYbmZYQPVgmiseWWo6qjLiI
+pw==
+-----END CERTIFICATE-----
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-java/src/main/resources/certs/a_server.key.p8 b/statefun-e2e-tests/statefun-smoke-e2e-java/src/main/resources/certs/a_server.key.p8
new file mode 100644
index 0000000..00ab834
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e-java/src/main/resources/certs/a_server.key.p8
@@ -0,0 +1,29 @@
+-----BEGIN ENCRYPTED PRIVATE KEY-----
+MIIE4zAcBgoqhkiG9w0BDAEBMA4ECMk3nj4DFVXtAgIIAASCBMFgWecNkVuMByRI
+P3qAOsQxC+Tmf1eamSe4jn+WS022jUTaqO8c+T/9yyj+UJE9w1VKb1+RYAcnijAY
+JJghl82TTIZgzQlIcD1MwiV2E6Fiz/HolaCeEIOHhNEZDdI5INke92uon++TG/ue
+aW2dhpy7AFL138BUr1GfeYZgnpRWh+VuR9tgR4CRbINFqohD0nein5XUMHUZr+cY
+pFho0f6P5+AGhkNJJK7b/NMQULUimehnFU+wiKavEkCPcLQ3ZOd6WwadOLJMRpiK
+jrqP5KB6YoB/W83L9qMscZnA6gaLnfBwP42EQOtKVaPVLF3p0vb19SjQ30XNJgwG
+O5snRRuFQJ+IeWDklZMlEGcDrnZrRh6CtOtszBPxwmX2TNjh7XxP3KRUDj2a1dg0
+WHLp9osJuBkHmCa5Yb7EnX/eMWfrGRnJ8h3gIPP237rgyGkkdgC2uh4KSzZ1m0f7
+hUow3sArKzg3zdMbNIaV7IP/9GjJbSIHVSta7LinWgygHUtgApjKsGn4AMdZsTRO
+NE5cN7M4S7ersvufRF2/1EH485A5HPP4UpD4xtq6v6dEX6hcN+mBoKCwVoeClxSC
+avvWbI6z1LWg+m3vwNywFWJ07tDV+iLQdBk197xeNZ687qjgaxHEG9hgP2neJ7Jg
+lhAtyW0SbgJG/Ux5D7/wc2beQ8VhgsNUuX5XDZd09RsgGtSh28qUb2lhP6jCHH6v
+FDRkWtZDmQDwvofyY9cRWvRxph2GE8P+/KVV36xs1c/SJmaPevM0rI8aRP1ocLXo
+yx+eeQqPPg7gxSx0v9yLQ6j6pj1yhq3CPOnG2vp8MKSjc3U1sTNYGn1mc6Et5bbG
+cLiTIiJv566g2je2xub74Ylcj0dZdispp20M3AYHOLNsO/JNY5rKJgfiuJNbhbNV
+VLMIcB5yr8hhZIE+AoE9FWxFGLhoPwmGoQbvXqlvj8YvL7uqRduMxxwYSULTnINn
+ghG9Gt8E5ha6FSk+neOd2upj9kJng+RT2xlKLTdeaQ4T/w9Uq5Av9tqD9Q8xuUFQ
+Dh1weFH8HjgiJD67ozqZa0u2AbE60QXbTj8ygSOADTEdTNox97NeDRjb9hdyFCbn
+7T5ISYB3ZnughKN1CdwNAz+OEnID0ydidcDvH5DHMkh2Cey4ghMd1LdHH61cswcB
+awWjBCC8MB6tEPeWO49q0Jdq2nXeCWIBvi+0Bsde4BY0yk+S283pu78r6/NZC9y+
+fyNemrFZKmnBIwLQ5TLAxiK6fSo+fZPXRESGzsCCezLp5y1WoMmkEQpyYbw/XWZs
+9WmbldJuvNW2YHfmZ3n8ijtpmyzUlzZrXIPLB9TVqNGINj/bgSG4OjyMT0jmSQ55
+uGOceaG02fQjl1culVIe+axyox0UFQP3g/B5bnCDbOv9J8Nohw3OZo8Me6ThnwsB
+I55krAmytF6vr9FtlOqv+eF9Ahf/RRnr4IrklkgoZpcK21itk5SLPoW/hhfZnvgg
+Lraeycf28uaVknxVZ9gEVGcxa2Ldh9sF4u7+lhTZ/ax22X9QN0fl76pIY8zoRYSL
+o2maqbjH9q8bkOpX7MzXJ349k4h3QrduF+7bC//zoUTLo/rV/7b+LX2s2nWLvBhy
+T4+PpNAkuA==
+-----END ENCRYPTED PRIVATE KEY-----
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-java/src/test/java/org/apache/flink/statefun/e2e/smoke/java/SmokeVerificationJavaE2E.java b/statefun-e2e-tests/statefun-smoke-e2e-java/src/test/java/org/apache/flink/statefun/e2e/smoke/java/SmokeVerificationJavaE2E.java
index 3d34da5..5437fa0 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-java/src/test/java/org/apache/flink/statefun/e2e/smoke/java/SmokeVerificationJavaE2E.java
+++ b/statefun-e2e-tests/statefun-smoke-e2e-java/src/test/java/org/apache/flink/statefun/e2e/smoke/java/SmokeVerificationJavaE2E.java
@@ -47,6 +47,7 @@
     StatefulFunctionsAppContainers.Builder builder =
         StatefulFunctionsAppContainers.builder("flink-statefun-cluster", NUM_WORKERS)
             .withBuildContextFileFromClasspath("remote-module", "/remote-module/")
+            .withBuildContextFileFromClasspath("certs", "/certs/")
             .dependsOn(remoteFunction);
 
     SmokeRunner.run(parameters, builder);
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-java/src/test/resources/Dockerfile b/statefun-e2e-tests/statefun-smoke-e2e-java/src/test/resources/Dockerfile
index b5705c4..a629f77 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-java/src/test/resources/Dockerfile
+++ b/statefun-e2e-tests/statefun-smoke-e2e-java/src/test/resources/Dockerfile
@@ -18,4 +18,5 @@
 RUN mkdir -p /opt/statefun/modules/statefun-smoke-e2e
 COPY statefun-smoke-e2e-driver.jar /opt/statefun/modules/statefun-smoke-e2e/
 COPY remote-module/ /opt/statefun/modules/statefun-smoke-e2e/
+COPY certs/ /opt/statefun/modules/statefun-smoke-e2e/certs/
 COPY flink-conf.yaml $FLINK_HOME/conf/flink-conf.yaml
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-java/src/test/resources/certs/README.md b/statefun-e2e-tests/statefun-smoke-e2e-java/src/test/resources/certs/README.md
new file mode 100644
index 0000000..52b9948
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e-java/src/test/resources/certs/README.md
@@ -0,0 +1 @@
+For instructions on how these cert/key files got created, see statefun-flink/statefun-flink-core/src/test/resources/certs/README.md
\ No newline at end of file
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-java/src/test/resources/certs/a_ca.pem b/statefun-e2e-tests/statefun-smoke-e2e-java/src/test/resources/certs/a_ca.pem
new file mode 100644
index 0000000..9615486
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e-java/src/test/resources/certs/a_ca.pem
@@ -0,0 +1,21 @@
+-----BEGIN CERTIFICATE-----
+MIIDhTCCAm2gAwIBAgIUTRXcSpygsZsWmuX4QMz9ey0rPBYwDQYJKoZIhvcNAQEL
+BQAwUTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM
+GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDEKMAgGA1UEAwwBYTAgFw0yMjAyMjIx
+MzA2MDJaGA8yMTIyMDEyOTEzMDYwMlowUTELMAkGA1UEBhMCQVUxEzARBgNVBAgM
+ClNvbWUtU3RhdGUxITAfBgNVBAoMGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDEK
+MAgGA1UEAwwBYTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAKBR0E/f
+fcTFxs8Oo4ItZm9HYuPR6U52OuBDN8TdzjMfA+FKoTvqvt/RUg7y6QFCdVEFwMS1
+uyjxoMlGDX1MF9SfDTH9oZlND+DL4YatsqpEXGRfjYSheBPUWusoQ6BbXvjiUNf4
+2bEdTFHnb08pdt0ixtymPp3vLsoUWAqscqYKOj4DW0AMRq2fAMdt0NlJsvW9hUQr
+I8wvU90SJlZ/YNOMv4uae8q67YQKZziRpFaLTM5aFjXnsIYAA6Z+PByFjpLTofgc
+BNuGM5WIk7O88UdZOwd5OCZhkaXrPCkkCtyyPDO8kWiiSmdTOnQ2sjkOSKgNTXkd
++6K7REk5gZ8WVv8CAwEAAaNTMFEwHQYDVR0OBBYEFFvccwv5Cufwh+0kLG0vmQCF
+6JbLMB8GA1UdIwQYMBaAFFvccwv5Cufwh+0kLG0vmQCF6JbLMA8GA1UdEwEB/wQF
+MAMBAf8wDQYJKoZIhvcNAQELBQADggEBADfgGnHZ42Cxoe5gNoWIUdgFjBqeiWOe
+NHNDLgXkx/jcOsbAO6H3+tz3Dz8QL3JIYTrwRKv0vX+5GAkkRz2R2ZaN4xHz73Co
+SuFfIqq4MjjrymVcnNA347vk50FjOMgfHrxpS3UQeTvlb1iuA88Zk8ewzhY376+t
+MoFmT1/ocb2E7jvxR0kDNCK5XsJIGzJmCBq8nIAD6wxrPXU3HJ/GLBlL5sL5kRrN
+x7l/DnL2oqN2DuyFmf4g03+DVmuu87XrbDrGHnn57CVnUe7z4jCFE82vA4u/tppK
+VgI17uYA714s3Jw6Iw+u38cmmSb79AVG05D2b2+TsqICVet3bc9MkQ8=
+-----END CERTIFICATE-----
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-java/src/test/resources/certs/a_client.crt b/statefun-e2e-tests/statefun-smoke-e2e-java/src/test/resources/certs/a_client.crt
new file mode 100644
index 0000000..a67118a
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e-java/src/test/resources/certs/a_client.crt
@@ -0,0 +1,19 @@
+-----BEGIN CERTIFICATE-----
+MIIDHzCCAgcCFDn8ukGu804x1jsFE3TWa86D1VYxMA0GCSqGSIb3DQEBCwUAMFEx
+CzAJBgNVBAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEwHwYDVQQKDBhJbnRl
+cm5ldCBXaWRnaXRzIFB0eSBMdGQxCjAIBgNVBAMMAWEwIBcNMjIwMzAyMTIxODAw
+WhgPMjEyMjAyMDYxMjE4MDBaMEUxCzAJBgNVBAYTAkFVMRMwEQYDVQQIDApTb21l
+LVN0YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQwggEiMA0G
+CSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDT2eBnboaPse50z9R6IgVtl4LZU9YM
+LPYc53nlwr8LLwZ9CM5XJ1WfzOS3m9Ynp7so2d6tvOgpMhtd3cdpVqmMdsFL+r4p
+QQZY/uw5oDC7kHvJEOW9No7dWVkRTqvuoY25OKZ2ysActTOD3LVvmjSWI6OCcyhe
+hlaFYD3KtUVpav+WdTFC45fwR9oXXSEssbPbhjt/Zhm9eMbZoxuVLeiJ5QXme/2G
+qbgnD7rcSMFJf5/WqdWa/H7IOpt5nPdRQSgi7iWYsZi4ZBRZ4I+s+dGOJYN9+LA1
+FDnb3/lUQrjjrfRnz/ZY5PYpz/CwRRwiGNevvIgRNMLqh8Wnil0lDaY/AgMBAAEw
+DQYJKoZIhvcNAQELBQADggEBAJIRP/e/XfXQsQ9DVnZ5b3rCXL9fH80L2X7YGMfC
+VQf8dYt5hTO0kqh5DoqhQ7ZxYXPNn6D7Kr91gwoaPuH5EAAtX6ULcuhqS25EN01D
+ENru2osWExK2bjI76013iKclHwcTq6IYLsiOUJPb0L/K2AoYn8dXg6YtRDmK+dsa
+iztN0dDNyQiz4aLOeSP7gvOMRLl4Fkq3Rws+THIxQJwtNL7Crk0qS+/d6iRENcSv
+rulS0QeaP43HlwiCOGpl4Ta4i3RbmSXZUWYzCJAZph14MA1xFEv2RLyYHkGNwy2T
+TESdWCcogBQheaBcpoC/ZIlfl7AmNg6fmPPgi8ZCOh6ggsg=
+-----END CERTIFICATE-----
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-java/src/test/resources/certs/a_client.key.p8 b/statefun-e2e-tests/statefun-smoke-e2e-java/src/test/resources/certs/a_client.key.p8
new file mode 100644
index 0000000..c8e590c
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e-java/src/test/resources/certs/a_client.key.p8
@@ -0,0 +1,29 @@
+-----BEGIN ENCRYPTED PRIVATE KEY-----
+MIIE4zAcBgoqhkiG9w0BDAEBMA4ECNOL3dIOJsQbAgIIAASCBMG+OuJ4Esbq3PvJ
+gP4WfWxR0/vmj/H+yrvnfwegiijJKDmqDMyqW8T++7pThOxtdMFgGHkBDx4jdmUo
+weVyVtblFcVbflttntnQWJ/q9+YN++VZMNywQJFpUVS2Ga89kvdJR+obd2CWMSdB
+eRlZN9puXuSPpqgC7uCO1spw/bYojhZIE4b+B2ALSKKSSrnibXE7bNqhIRj9emCa
+nkfRq83Qbqt1RIwteLrw3BIeBMjyAqvPuavhE1LCBKTSi/X9A/NlIJWFdxEDlZND
+iEE2ls5/m2iKPpOZpQnEKB5jDmKk4DF4LWWBkq7jGSbk82Um5hlW3MiK/Sa7YVMD
+YCqlIcE+SDY6ggvN3Q7hNw2z+tHhUpeViM4V8XEKYQza4Rui3jSHlei6lUjZVp82
+XaNqW3hHoVZWuRDTdkBIoBpEWbr5sORMNagcEVy+t9xeIIvLRTj2D2GoeAaLsJrJ
+taU/29Dh2vZBh8nXenbPejkO+Phi1xsVIuLIdFN4+i6062fMsY1mlRliTHEw9CKA
+3A0g2f6t90Tgvpn8R3qc+hGZ5ueKSc19e3XpPu5jVqJS8ZAQUS7zZ0eL5O3p6mBg
+e3dPGg/5acA91Zuo9XouQ8APFYd8Fcjlwr4yp0IoSWofjWkXXFjxE5jXO6VHUwhW
+Q9Lm11+kAGo8MeMrX9pNcmHUp0ddvsRWtqZNQ3Yfu+DinidOj0MAYdVw0GM6wcSq
+MmsaLsF+7VCbV3AseJqDQRBjyeLoCCnYg71Gn1/9QSbAZKvqI3jDrPzDSNtK+WEP
+CD22WmZOaOYmI5dh4zB3nPlGmds2QKEvTXvBkWEnVkUUnoZ1glKwNWakj9C2aHyF
+PUisISxEiid8wtVXvCGdIETuhNBoddBKiMlbbQVoy3vyC4lnRsZk2E65PRSdQcnv
+wNGhfBDmuTeoCr/WBYKamhhhfAzNEPSUZSjFT7qp+AcqnaVVGaZeokr4uB3T68JE
+rHmkAmpSQHXrZnzkHayxB93A717QcckZfmeWAZwY/1+q30Kpfzz1wgHWZ8DPy4nw
+KtY6H0jlEzX41/6t9YLF9UfsL2Yu3o9F3leIvDW3efjM+olPvWYLSXlfUW5S4amv
+7yeFp/ABAt6YPkpevxgHyDQ9/imA0vjT9beHIqhJ5KZPHurV6fT/JK5SAxp3ijKx
+9dkWUxSZPZujD1fNEjwOgMiXFkkNq+Sh5OQNyOsqvTWcXW/+/e9/cVjD9u0cDrMP
+N6rrFZs5Ye3Q8YG6Kgj6vCOskDvGyXwodqTAGaO8a8qLfsv2fNLUyZyAStz9SYuS
+NUwLacD9AfSJkIlPAs9+gJbcMFPsCR40EsAZYaKlH+6GHtx+G9qW28zVIk12YHQC
+JJOS3A3rCueTls6UNkAMQRFbuw6lazDwvkwpYfpH9TuqofebQpR/uo3y1T+yI93U
+3dKD4uUbGn5S2/B7i6b3hedJcgxr7Amc3byyJPjcLUtPv5djUypVh6Q7sb9du1CO
+FVd3UnY0kz7Wkdd0BOEGviwpBK9oKi08ZXKrHJnFS1o7ypEwIQFAGLn1JUYdg7nr
+UguPaudKQ6+MF84UC1dJ8FdsRZ9vhlGfi9Ilg9KlfUwNSD+9MlS9oJDj9QSMy6lU
+dv41UJPZvg==
+-----END ENCRYPTED PRIVATE KEY-----
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-java/src/test/resources/certs/key_password.txt b/statefun-e2e-tests/statefun-smoke-e2e-java/src/test/resources/certs/key_password.txt
new file mode 100644
index 0000000..30d74d2
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e-java/src/test/resources/certs/key_password.txt
@@ -0,0 +1 @@
+test
\ No newline at end of file
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-java/src/test/resources/remote-module/module.yaml b/statefun-e2e-tests/statefun-smoke-e2e-java/src/test/resources/remote-module/module.yaml
index ab747d8..6836ae5 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-java/src/test/resources/remote-module/module.yaml
+++ b/statefun-e2e-tests/statefun-smoke-e2e-java/src/test/resources/remote-module/module.yaml
@@ -16,5 +16,11 @@
 kind: io.statefun.endpoints.v2/http
 spec:
   functions: statefun.smoke.e2e/command-interpreter-fn
-  urlPathTemplate: http://remote-function-host:8000
-  maxNumBatchRequests: 10000
\ No newline at end of file
+  urlPathTemplate: https://remote-function-host:8000
+  maxNumBatchRequests: 10000
+  transport:
+    type: io.statefun.transports.v1/async
+    trust_cacerts: file:/opt/statefun/modules/statefun-smoke-e2e/certs/a_ca.pem
+    client_cert: file:/opt/statefun/modules/statefun-smoke-e2e/certs/a_client.crt
+    client_key: file:/opt/statefun/modules/statefun-smoke-e2e/certs/a_client.key.p8
+    client_key_password: /opt/statefun/modules/statefun-smoke-e2e/certs/key_password.txt
\ No newline at end of file
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/HttpConnectionPoolManager.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/HttpConnectionPoolManager.java
index d3d02a8..f362723 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/HttpConnectionPoolManager.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/HttpConnectionPoolManager.java
@@ -18,8 +18,10 @@
 package org.apache.flink.statefun.flink.core.nettyclient;
 
 import java.util.Objects;
+import java.util.function.Supplier;
 import javax.annotation.Nullable;
 import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelDuplexHandler;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPipeline;
 import org.apache.flink.shaded.netty4.io.netty.channel.pool.ChannelPoolHandler;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpClientCodec;
@@ -33,13 +35,19 @@
   private final SslContext sslContext;
   private final String peerHost;
   private final int peerPort;
+  private final Supplier<ChannelDuplexHandler> requestReplyHandlerSupplier;
 
   public HttpConnectionPoolManager(
-      @Nullable SslContext sslContext, NettyRequestReplySpec spec, String peerHost, int peerPort) {
+      @Nullable SslContext sslContext,
+      NettyRequestReplySpec spec,
+      String peerHost,
+      int peerPort,
+      Supplier<ChannelDuplexHandler> requestReplyHandlerSupplier) {
     this.spec = Objects.requireNonNull(spec);
     this.peerHost = Objects.requireNonNull(peerHost);
     this.sslContext = sslContext;
     this.peerPort = peerPort;
+    this.requestReplyHandlerSupplier = requestReplyHandlerSupplier;
   }
 
   @Override
@@ -64,7 +72,7 @@
     p.addLast(new HttpClientCodec());
     p.addLast(new HttpContentDecompressor(true));
     p.addLast(new HttpObjectAggregator(spec.maxRequestOrResponseSizeInBytes, true));
-    p.addLast(new NettyRequestReplyHandler());
+    p.addLast(requestReplyHandlerSupplier.get());
 
     long channelTimeToLiveMillis = spec.pooledConnectionTTL.toMillis();
     p.addLast(new HttpConnectionPoolHandler(channelTimeToLiveMillis));
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyClient.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyClient.java
index e6fa908..b25b45f 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyClient.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyClient.java
@@ -20,13 +20,21 @@
 import static org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
 
 import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
 import java.net.URI;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
+import java.util.function.Supplier;
+import org.apache.commons.io.IOUtils;
 import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
 import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelDuplexHandler;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption;
 import org.apache.flink.shaded.netty4.io.netty.channel.EventLoop;
 import org.apache.flink.shaded.netty4.io.netty.channel.pool.ChannelHealthChecker;
@@ -34,12 +42,15 @@
 import org.apache.flink.shaded.netty4.io.netty.channel.pool.FixedChannelPool;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.ReadOnlyHttpHeaders;
 import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslContext;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslContextBuilder;
 import org.apache.flink.shaded.netty4.io.netty.util.concurrent.ScheduledFuture;
+import org.apache.flink.statefun.flink.common.ResourceLocator;
 import org.apache.flink.statefun.flink.core.metrics.RemoteInvocationMetrics;
 import org.apache.flink.statefun.flink.core.reqreply.RequestReplyClient;
 import org.apache.flink.statefun.flink.core.reqreply.ToFunctionRequestSummary;
 import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction;
 import org.apache.flink.statefun.sdk.reqreply.generated.ToFunction;
+import org.apache.flink.util.Preconditions;
 
 final class NettyClient implements RequestReplyClient, NettyClientService {
   private final NettySharedResources shared;
@@ -51,6 +62,14 @@
 
   public static NettyClient from(
       NettySharedResources shared, NettyRequestReplySpec spec, URI endpointUrl) {
+    return from(shared, spec, endpointUrl, NettyRequestReplyHandler::new);
+  }
+
+  static NettyClient from(
+      NettySharedResources shared,
+      NettyRequestReplySpec spec,
+      URI endpointUrl,
+      Supplier<ChannelDuplexHandler> nettyRequestReplyHandlerSupplier) {
     Endpoint endpoint = new Endpoint(endpointUrl);
     long totalRequestBudgetInNanos = spec.callTimeout.toNanos();
     ReadOnlyHttpHeaders headers = NettyHeaders.defaultHeadersFor(endpoint.serviceAddress());
@@ -61,14 +80,15 @@
     bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
     bootstrap.remoteAddress(endpoint.serviceAddress());
     // setup tls
-    final SslContext sslContext = endpoint.useTls() ? shared.sslContext() : null;
+    final SslContext sslContext = endpoint.useTls() ? getSslContext(spec) : null;
     // setup a channel pool handler
     ChannelPoolHandler poolHandler =
         new HttpConnectionPoolManager(
             sslContext,
             spec,
             endpoint.serviceAddress().getHostString(),
-            endpoint.serviceAddress().getPort());
+            endpoint.serviceAddress().getPort(),
+            nettyRequestReplyHandlerSupplier);
     // setup a fixed capacity channel pool
     FixedChannelPool pool =
         new FixedChannelPool(
@@ -197,8 +217,7 @@
     if (!channel.isActive()) {
       // We still need to return this channel to the pool, because the connection pool
       // keeps track of the number of acquired channel counts, however the pool will first consult
-      // the health
-      // check, and then kick that connection away.
+      // the health check, and then kick that connection away.
       pool.release(channel);
       return;
     }
@@ -208,4 +227,87 @@
     }
     channel.close().addListener(ignored -> pool.release(channel));
   }
+
+  private static SslContext getSslContext(NettyRequestReplySpec spec) {
+    final Optional<String> maybeTrustCaCerts = spec.getTrustedCaCerts();
+    final Optional<String> maybeClientCerts = spec.getClientCerts();
+    final Optional<String> maybeClientKey = spec.getClientKey();
+    final Optional<String> maybeKeyPassword = spec.getClientKeyPassword();
+
+    boolean onlyOneOfEitherCertOrKeyPresent =
+        maybeClientCerts.isPresent() ^ maybeClientKey.isPresent();
+    if (onlyOneOfEitherCertOrKeyPresent) {
+      throw new IllegalStateException(
+          "You need to provide both the cert and they key if you want to use mutual TLS.");
+    }
+
+    final Optional<InputStream> maybeTrustCaCertsInputStream =
+        maybeTrustCaCerts.map(
+            trustedCaCertsLocation ->
+                openStreamIfExistsOrThrow(
+                    ResourceLocator.findNamedResource(trustedCaCertsLocation)));
+
+    final Optional<InputStream> maybeCertInputStream =
+        maybeClientCerts.map(
+            clientCertLocation ->
+                openStreamIfExistsOrThrow(ResourceLocator.findNamedResource(clientCertLocation)));
+
+    final Optional<InputStream> maybeKeyInputStream =
+        maybeClientKey.map(
+            clientKeyLocation ->
+                openStreamIfExistsOrThrow(ResourceLocator.findNamedResource(clientKeyLocation)));
+
+    final SslContextBuilder sslContextBuilder = SslContextBuilder.forClient();
+    maybeTrustCaCertsInputStream.ifPresent(sslContextBuilder::trustManager);
+    maybeCertInputStream.ifPresent(
+        certInputStream -> {
+          final InputStream keyInputStream =
+              maybeKeyInputStream.orElseThrow(
+                  () -> new IllegalStateException("The key is required"));
+          if (maybeKeyPassword.isPresent()) {
+            try {
+              final String keyPasswordString =
+                  IOUtils.toString(
+                      ResourceLocator.findNamedResource(maybeKeyPassword.get()),
+                      StandardCharsets.UTF_8);
+              sslContextBuilder.keyManager(certInputStream, keyInputStream, keyPasswordString);
+            } catch (IOException e) {
+              throw new IllegalStateException(
+                  String.format(
+                      "Could not read the key password from the file %s. Examples of the correct usage: 'classpath:file.txt' or '/tmp/pass', etc.",
+                      maybeKeyPassword.get()),
+                  e);
+            }
+          } else {
+            sslContextBuilder.keyManager(certInputStream, keyInputStream);
+          }
+        });
+
+    try {
+      return sslContextBuilder.build();
+    } catch (IOException e) {
+      throw new IllegalStateException("Could not build the ssl context.", e);
+    } finally {
+      maybeTrustCaCertsInputStream.ifPresent(NettyClient::closeWithBestEffort);
+      maybeCertInputStream.ifPresent(NettyClient::closeWithBestEffort);
+      maybeKeyInputStream.ifPresent(NettyClient::closeWithBestEffort);
+    }
+  }
+
+  private static void closeWithBestEffort(InputStream inputStream) {
+    try {
+      inputStream.close();
+    } catch (IOException e) {
+      // ignore
+    }
+  }
+
+  private static InputStream openStreamIfExistsOrThrow(URL url) {
+    Preconditions.checkState(url != null, "The requested resource does not exist.");
+    try {
+      return url.openStream();
+    } catch (IOException e) {
+      throw new IllegalStateException("Could not open " + url.getPath(), e);
+    }
+  }
 }
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyRequestReplySpec.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyRequestReplySpec.java
index e4e77b9..0221c5c 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyRequestReplySpec.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyRequestReplySpec.java
@@ -35,6 +35,10 @@
   public static final String POOLED_CONNECTION_TTL_PROPERTY = "pool_ttl";
   public static final String CONNECTION_POOL_MAX_SIZE_PROPERTY = "pool_size";
   public static final String MAX_REQUEST_OR_RESPONSE_SIZE_IN_BYTES_PROPERTY = "payload_max_bytes";
+  public static final String TRUST_CA_CERTS_PROPERTY = "trust_cacerts";
+  public static final String CLIENT_CERT_PROPERTY = "client_cert";
+  public static final String CLIENT_KEY_PROPERTY = "client_key";
+  public static final String CLIENT_KEY_PASSWORD_PROPERTY = "client_key_password";
   public static final String TIMEOUTS_PROPERTY = "timeouts";
 
   // spec default values
@@ -55,6 +59,10 @@
   public final Duration pooledConnectionTTL;
   public final int connectionPoolMaxSize;
   public final int maxRequestOrResponseSizeInBytes;
+  private final String trustedCaCerts;
+  private final String clientCerts;
+  private final String clientKey;
+  private final String clientKeyPassword;
 
   public NettyRequestReplySpec(
       @JsonProperty(CALL_TIMEOUT_PROPERTY) Duration callTimeout,
@@ -63,7 +71,15 @@
       @JsonProperty(CONNECTION_POOL_MAX_SIZE_PROPERTY) Integer connectionPoolMaxSize,
       @JsonProperty(MAX_REQUEST_OR_RESPONSE_SIZE_IN_BYTES_PROPERTY)
           Integer maxRequestOrResponseSizeInBytes,
+      @JsonProperty(TRUST_CA_CERTS_PROPERTY) String trustedCaCerts,
+      @JsonProperty(CLIENT_CERT_PROPERTY) String clientCerts,
+      @JsonProperty(CLIENT_KEY_PROPERTY) String clientKey,
+      @JsonProperty(CLIENT_KEY_PASSWORD_PROPERTY) String clientKeyPassword,
       @JsonProperty(TIMEOUTS_PROPERTY) Timeouts timeouts) {
+    this.trustedCaCerts = trustedCaCerts;
+    this.clientCerts = clientCerts;
+    this.clientKey = clientKey;
+    this.clientKeyPassword = clientKeyPassword;
     this.callTimeout =
         firstPresentOrDefault(
             ofNullable(timeouts).map(Timeouts::getCallTimeout),
@@ -76,7 +92,7 @@
             ofNullable(connectTimeout),
             () -> DEFAULT_CONNECT_TIMEOUT);
     this.pooledConnectionTTL =
-        ofNullable(pooledConnectionTTL).orElseGet(() -> DEFAULT_POOLED_CONNECTION_TTL);
+        ofNullable(pooledConnectionTTL).orElse(DEFAULT_POOLED_CONNECTION_TTL);
     this.connectionPoolMaxSize =
         ofNullable(connectionPoolMaxSize).orElse(DEFAULT_CONNECTION_POOL_MAX_SIZE);
     this.maxRequestOrResponseSizeInBytes =
@@ -84,6 +100,22 @@
             .orElse(DEFAULT_MAX_REQUEST_OR_RESPONSE_SIZE_IN_BYTES);
   }
 
+  public Optional<String> getTrustedCaCerts() {
+    return Optional.ofNullable(trustedCaCerts);
+  }
+
+  public Optional<String> getClientCerts() {
+    return Optional.ofNullable(clientCerts);
+  }
+
+  public Optional<String> getClientKey() {
+    return Optional.ofNullable(clientKey);
+  }
+
+  public Optional<String> getClientKeyPassword() {
+    return Optional.ofNullable(clientKeyPassword);
+  }
+
   /**
    * This is a copy of {@linkplain
    * org.apache.flink.statefun.flink.core.httpfn.DefaultHttpRequestReplyClientSpec.Timeouts}, to
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettySharedResources.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettySharedResources.java
index 22390c6..ed68cdf 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettySharedResources.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettySharedResources.java
@@ -22,8 +22,6 @@
 import java.io.UncheckedIOException;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicBoolean;
-import javax.annotation.Nullable;
-import javax.net.ssl.SSLException;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
 import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
@@ -36,14 +34,11 @@
 import org.apache.flink.shaded.netty4.io.netty.channel.kqueue.KQueueSocketChannel;
 import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
 import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel;
-import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslContext;
-import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslContextBuilder;
 import org.apache.flink.util.IOUtils;
 
 final class NettySharedResources {
   private final AtomicBoolean shutdown = new AtomicBoolean();
   private final Bootstrap bootstrap;
-  @Nullable private SslContext sslContext;
 
   private final CloseableRegistry mangedResources = new CloseableRegistry();
 
@@ -74,20 +69,6 @@
     return bootstrap;
   }
 
-  public SslContext sslContext() {
-    SslContext sslCtx = sslContext;
-    if (sslCtx != null) {
-      return sslCtx;
-    }
-    try {
-      sslCtx = SslContextBuilder.forClient().build();
-      this.sslContext = sslCtx;
-      return sslCtx;
-    } catch (SSLException e) {
-      throw new IllegalStateException("Failed to initialize an SSL provider", e);
-    }
-  }
-
   public void registerClosable(Closeable closeable) {
     try {
       mangedResources.registerCloseable(closeable);
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/TestUtils.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/TestUtils.java
index c49270e..792160c 100644
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/TestUtils.java
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/TestUtils.java
@@ -17,6 +17,9 @@
  */
 package org.apache.flink.statefun.flink.core;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
 import org.apache.flink.statefun.flink.core.generated.EnvelopeAddress;
 import org.apache.flink.statefun.flink.core.message.MessageFactory;
 import org.apache.flink.statefun.flink.core.message.MessageFactoryKey;
@@ -35,4 +38,18 @@
   public static final Address FUNCTION_2_ADDR = new Address(FUNCTION_TYPE, "a-2");
   public static final EnvelopeAddress DUMMY_PAYLOAD =
       EnvelopeAddress.newBuilder().setNamespace("com.foo").setType("greet").setId("user-1").build();
+
+  /**
+   * Opens a stream of throws an exception. Does *not* close the stream
+   *
+   * @param url of the resource to open
+   * @return opened input stream
+   */
+  public static InputStream openStreamOrThrow(URL url) {
+    try {
+      return url.openStream();
+    } catch (IOException e) {
+      throw new IllegalStateException("Could not open " + url.getPath(), e);
+    }
+  }
 }
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/httpfn/TransportClientTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/httpfn/TransportClientTest.java
new file mode 100644
index 0000000..2b9fd30
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/httpfn/TransportClientTest.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.httpfn;
+
+import static org.apache.flink.statefun.flink.core.TestUtils.openStreamOrThrow;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.ServerSocket;
+import java.nio.charset.StandardCharsets;
+import org.apache.commons.io.IOUtils;
+import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+import org.apache.flink.shaded.netty4.io.netty.channel.*;
+import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.*;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.ClientAuth;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslContext;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslContextBuilder;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslProvider;
+import org.apache.flink.statefun.flink.common.ResourceLocator;
+import org.apache.flink.statefun.flink.core.metrics.RemoteInvocationMetrics;
+import org.apache.flink.statefun.flink.core.reqreply.ToFunctionRequestSummary;
+import org.apache.flink.statefun.sdk.Address;
+import org.apache.flink.statefun.sdk.FunctionType;
+import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction;
+import org.apache.flink.statefun.sdk.reqreply.generated.ToFunction;
+
+public abstract class TransportClientTest {
+  protected static final String A_CA_CERTS_LOCATION = "certs/a_caCerts.pem";
+  protected static final String A_SIGNED_CLIENT_CERT_LOCATION = "certs/a_client.crt";
+  protected static final String A_SIGNED_CLIENT_KEY_LOCATION = "certs/a_client.key.p8";
+  protected static final String A_SIGNED_SERVER_CERT_LOCATION = "certs/a_server.crt";
+  protected static final String A_SIGNED_SERVER_KEY_LOCATION = "certs/a_server.key.p8";
+  protected static final String B_CA_CERTS_LOCATION = "certs/b_caCerts.pem";
+  protected static final String B_SIGNED_CLIENT_CERT_LOCATION = "certs/b_client.crt";
+  protected static final String B_SIGNED_CLIENT_KEY_LOCATION = "certs/b_client.key.p8";
+  protected static final String C_SIGNED_CLIENT_CERT_LOCATION = "certs/c_client.crt";
+  protected static final String C_SIGNED_CLIENT_KEY_LOCATION = "certs/c_client.key.p8";
+  protected static final String A_SIGNED_CLIENT_KEY_PASSWORD_LOCATION = "certs/key_password.txt";
+  protected static final String A_SIGNED_SERVER_KEY_PASSWORD_LOCATION =
+      A_SIGNED_CLIENT_KEY_PASSWORD_LOCATION;
+  protected static final String B_SIGNED_CLIENT_KEY_PASSWORD_LOCATION =
+      A_SIGNED_CLIENT_KEY_PASSWORD_LOCATION;
+  protected static final String TLS_FAILURE_MESSAGE = "Unexpected TLS connection test result";
+
+  public static class FromFunctionNettyTestServer {
+    private EventLoopGroup eventLoopGroup;
+    private EventLoopGroup workerGroup;
+
+    public static FromFunction getStubFromFunction() {
+      return FromFunction.newBuilder()
+          .setInvocationResult(
+              FromFunction.InvocationResponse.newBuilder()
+                  .addOutgoingEgresses(FromFunction.EgressMessage.newBuilder()))
+          .build();
+    }
+
+    public PortInfo runAndGetPortInfo() {
+      eventLoopGroup = new NioEventLoopGroup();
+      workerGroup = new NioEventLoopGroup();
+
+      try {
+        ServerBootstrap httpBootstrap = getServerBootstrap(getChannelInitializer());
+
+        ServerBootstrap httpsMutualTlsBootstrap =
+            getServerBootstrap(
+                getChannelInitializer(
+                    openStreamOrThrow(
+                        ResourceLocator.findNamedResource("classpath:" + A_CA_CERTS_LOCATION)),
+                    openStreamOrThrow(
+                        ResourceLocator.findNamedResource(
+                            "classpath:" + A_SIGNED_SERVER_CERT_LOCATION)),
+                    openStreamOrThrow(
+                        ResourceLocator.findNamedResource(
+                            "classpath:" + A_SIGNED_SERVER_KEY_LOCATION)),
+                    openStreamOrThrow(
+                        ResourceLocator.findNamedResource(
+                            "classpath:" + A_SIGNED_SERVER_KEY_PASSWORD_LOCATION))));
+
+        ServerBootstrap httpsServerTlsBootstrap =
+            getServerBootstrap(
+                getChannelInitializer(
+                    openStreamOrThrow(
+                        ResourceLocator.findNamedResource(
+                            "classpath:" + A_SIGNED_SERVER_CERT_LOCATION)),
+                    openStreamOrThrow(
+                        ResourceLocator.findNamedResource(
+                            "classpath:" + A_SIGNED_SERVER_KEY_LOCATION)),
+                    openStreamOrThrow(
+                        ResourceLocator.findNamedResource(
+                            "classpath:" + A_SIGNED_SERVER_KEY_PASSWORD_LOCATION))));
+
+        int httpPort = randomFreePort();
+        httpBootstrap.bind(httpPort).sync();
+
+        int httpsMutualTlsPort = randomFreePort();
+        httpsMutualTlsBootstrap.bind(httpsMutualTlsPort).sync();
+
+        int httpsServerTlsOnlyPort = randomFreePort();
+        httpsServerTlsBootstrap.bind(httpsServerTlsOnlyPort).sync();
+
+        return new PortInfo(httpPort, httpsMutualTlsPort, httpsServerTlsOnlyPort);
+      } catch (Exception e) {
+        throw new IllegalStateException("Could not start a test netty server", e);
+      }
+    }
+
+    private ChannelInitializer<Channel> getChannelInitializer(
+        InputStream trustInputStream,
+        InputStream certInputStream,
+        InputStream keyInputStream,
+        InputStream keyPasswordInputStream)
+        throws IOException {
+      String keyPassword = IOUtils.toString(keyPasswordInputStream, StandardCharsets.UTF_8);
+      return getTlsEnabledInitializer(
+          SslContextBuilder.forServer(certInputStream, keyInputStream, keyPassword)
+              .trustManager(trustInputStream),
+          ClientAuth.REQUIRE);
+    }
+
+    private ChannelInitializer<Channel> getChannelInitializer(
+        InputStream certInputStream, InputStream keyInputStream, InputStream keyPasswordInputStream)
+        throws IOException {
+      String keyPassword = IOUtils.toString(keyPasswordInputStream, StandardCharsets.UTF_8);
+      return getTlsEnabledInitializer(
+          SslContextBuilder.forServer(certInputStream, keyInputStream, keyPassword),
+          ClientAuth.NONE);
+    }
+
+    private ChannelInitializer<Channel> getChannelInitializer() {
+      return new ChannelInitializer<Channel>() {
+        @Override
+        protected void initChannel(Channel channel) {
+          addStubResponseToThePipeline(channel.pipeline());
+        }
+      };
+    }
+
+    private ChannelInitializer<Channel> getTlsEnabledInitializer(
+        SslContextBuilder sslContextBuilder, ClientAuth clientAuth) {
+      return new ChannelInitializer<Channel>() {
+        @Override
+        protected void initChannel(Channel channel) throws IOException {
+          ChannelPipeline pipeline = channel.pipeline();
+          SslContext sslContext =
+              sslContextBuilder.sslProvider(SslProvider.JDK).clientAuth(clientAuth).build();
+          pipeline.addLast(sslContext.newHandler(channel.alloc()));
+          addStubResponseToThePipeline(pipeline);
+        }
+      };
+    }
+
+    public void close() throws InterruptedException {
+      eventLoopGroup.shutdownGracefully().sync();
+      workerGroup.shutdownGracefully().sync();
+    }
+
+    private ServerBootstrap getServerBootstrap(ChannelInitializer<Channel> childHandler) {
+      return new ServerBootstrap()
+          .group(eventLoopGroup, workerGroup)
+          .channel(NioServerSocketChannel.class)
+          .childHandler(childHandler)
+          .option(ChannelOption.SO_BACKLOG, 128)
+          .childOption(ChannelOption.SO_KEEPALIVE, true);
+    }
+
+    private void addStubResponseToThePipeline(ChannelPipeline pipeline) {
+      pipeline.addLast(new HttpServerCodec());
+      pipeline.addLast(new HttpObjectAggregator(Integer.MAX_VALUE));
+      pipeline.addLast(stubFromFunctionHandler());
+    }
+
+    private SimpleChannelInboundHandler<FullHttpRequest> stubFromFunctionHandler() {
+      return new SimpleChannelInboundHandler<FullHttpRequest>() {
+        @Override
+        protected void channelRead0(
+            ChannelHandlerContext channelHandlerContext, FullHttpRequest fullHttpRequest) {
+          ByteBuf content = Unpooled.copiedBuffer(getStubFromFunction().toByteArray());
+          FullHttpResponse response =
+              new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);
+          response.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/octet-stream");
+          response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());
+          channelHandlerContext.write(response);
+          channelHandlerContext.flush();
+        }
+      };
+    }
+
+    private int randomFreePort() {
+      try (ServerSocket socket = new ServerSocket(0)) {
+        return socket.getLocalPort();
+      } catch (IOException e) {
+        throw new IllegalStateException(
+            "No free ports available for the test netty service to use");
+      }
+    }
+
+    public static class PortInfo {
+      private final int httpPort;
+      private final int httpsMutualTlsRequiredPort;
+      private final int httpsServerTlsOnlyPort;
+
+      public PortInfo(int httpPort, int httpsMutualTlsRequiredPort, int httpsServerTlsOnlyPort) {
+        this.httpPort = httpPort;
+        this.httpsMutualTlsRequiredPort = httpsMutualTlsRequiredPort;
+        this.httpsServerTlsOnlyPort = httpsServerTlsOnlyPort;
+      }
+
+      public int getHttpPort() {
+        return httpPort;
+      }
+
+      public int getHttpsMutualTlsRequiredPort() {
+        return httpsMutualTlsRequiredPort;
+      }
+
+      public int getHttpsServerTlsOnlyPort() {
+        return httpsServerTlsOnlyPort;
+      }
+    }
+
+    public static ToFunctionRequestSummary getStubRequestSummary() {
+      return new ToFunctionRequestSummary(
+          new Address(new FunctionType("ns", "type"), "id"), 1, 0, 1);
+    }
+
+    public static ToFunction getEmptyToFunction() {
+      return ToFunction.newBuilder().build();
+    }
+
+    public static RemoteInvocationMetrics getFakeMetrics() {
+      return new RemoteInvocationMetrics() {
+        @Override
+        public void remoteInvocationFailures() {}
+
+        @Override
+        public void remoteInvocationLatency(long elapsed) {}
+      };
+    }
+  }
+}
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/nettyclient/NettyClientTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/nettyclient/NettyClientTest.java
new file mode 100644
index 0000000..38b4c47
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/nettyclient/NettyClientTest.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.nettyclient;
+
+import static org.apache.flink.statefun.flink.core.httpfn.TransportClientTest.FromFunctionNettyTestServer.*;
+import static org.apache.flink.statefun.flink.core.nettyclient.NettyProtobuf.serializeProtobuf;
+import static org.junit.Assert.*;
+
+import java.net.URI;
+import java.net.URL;
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import javax.net.ssl.SSLException;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelDuplexHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPromise;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.*;
+import org.apache.flink.statefun.flink.core.httpfn.TransportClientTest;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class NettyClientTest extends TransportClientTest {
+  private static FromFunctionNettyTestServer testServer;
+  private static FromFunctionNettyTestServer.PortInfo portInfo;
+
+  @BeforeClass
+  public static void beforeClass() {
+    testServer = new FromFunctionNettyTestServer();
+    portInfo = testServer.runAndGetPortInfo();
+  }
+
+  @AfterClass
+  public static void afterClass() throws Exception {
+    testServer.close();
+  }
+
+  @Test
+  public void callingTestHttpServiceShouldSucceed() throws Throwable {
+    assertTrue(
+        TLS_FAILURE_MESSAGE,
+        callUsingStubsAndCheckSuccess(
+            createNettyClient(createHttpSpec(), "http", portInfo.getHttpPort())));
+  }
+
+  @Test
+  public void callingTestHttpServiceWithTlsFromPathShouldSucceed() throws Throwable {
+    URL caCertsUrl = getClass().getClassLoader().getResource(A_CA_CERTS_LOCATION);
+    URL clientCertUrl = getClass().getClassLoader().getResource(A_SIGNED_CLIENT_CERT_LOCATION);
+    URL clientKeyUrl = getClass().getClassLoader().getResource(A_SIGNED_CLIENT_KEY_LOCATION);
+    URL clientKeyPasswordUrl =
+        getClass().getClassLoader().getResource(A_SIGNED_CLIENT_KEY_PASSWORD_LOCATION);
+    assertNotNull(caCertsUrl);
+    assertNotNull(clientCertUrl);
+    assertNotNull(clientKeyUrl);
+    assertNotNull(clientKeyPasswordUrl);
+
+    assertTrue(
+        TLS_FAILURE_MESSAGE,
+        callUsingStubsAndCheckSuccess(
+            createNettyClient(
+                createSpec(
+                    caCertsUrl.getPath(),
+                    clientCertUrl.getPath(),
+                    clientKeyUrl.getPath(),
+                    clientKeyPasswordUrl.getPath()),
+                "https",
+                portInfo.getHttpsMutualTlsRequiredPort())));
+  }
+
+  @Test
+  public void callingTestHttpServiceWithTlsFromClasspathShouldSucceed() throws Throwable {
+    assertTrue(
+        TLS_FAILURE_MESSAGE,
+        callUsingStubsAndCheckSuccess(
+            createNettyClient(
+                createSpec(
+                    "classpath:" + A_CA_CERTS_LOCATION,
+                    "classpath:" + A_SIGNED_CLIENT_CERT_LOCATION,
+                    "classpath:" + A_SIGNED_CLIENT_KEY_LOCATION,
+                    "classpath:" + A_SIGNED_CLIENT_KEY_PASSWORD_LOCATION),
+                "https",
+                portInfo.getHttpsMutualTlsRequiredPort())));
+  }
+
+  @Test
+  public void callingTestHttpServiceWithTlsUsingKeyWithoutPasswordShouldSucceed() throws Throwable {
+    assertTrue(
+        TLS_FAILURE_MESSAGE,
+        callUsingStubsAndCheckSuccess(
+            createNettyClient(
+                createSpec(
+                    "classpath:" + A_CA_CERTS_LOCATION,
+                    "classpath:" + C_SIGNED_CLIENT_CERT_LOCATION,
+                    "classpath:" + C_SIGNED_CLIENT_KEY_LOCATION,
+                    null),
+                "https",
+                portInfo.getHttpsMutualTlsRequiredPort())));
+  }
+
+  @Test
+  public void callingTestHttpServiceWithJustServerSideTlsShouldSucceed() throws Throwable {
+    assertTrue(
+        TLS_FAILURE_MESSAGE,
+        callUsingStubsAndCheckSuccess(
+            createNettyClient(
+                createSpec("classpath:" + A_CA_CERTS_LOCATION, null, null, null),
+                "https",
+                portInfo.getHttpsServerTlsOnlyPort())));
+  }
+
+  @Test(expected = SSLException.class)
+  public void callingTestHttpServiceWithUntrustedTlsClientShouldFail() throws Throwable {
+    assertFalse(
+        TLS_FAILURE_MESSAGE,
+        callUsingStubsAndCheckSuccess(
+            createNettyClient(
+                createSpec(
+                    "classpath:" + A_CA_CERTS_LOCATION,
+                    "classpath:" + B_SIGNED_CLIENT_CERT_LOCATION,
+                    "classpath:" + B_SIGNED_CLIENT_KEY_LOCATION,
+                    "classpath:" + B_SIGNED_CLIENT_KEY_PASSWORD_LOCATION),
+                "https",
+                portInfo.getHttpsMutualTlsRequiredPort())));
+  }
+
+  @Test(expected = SSLException.class)
+  public void callingAnUntrustedTestHttpServiceWithTlsClientShouldFail() throws Throwable {
+    assertFalse(
+        TLS_FAILURE_MESSAGE,
+        callUsingStubsAndCheckSuccess(
+            createNettyClient(
+                createSpec(
+                    "classpath:" + B_CA_CERTS_LOCATION,
+                    "classpath:" + A_SIGNED_CLIENT_CERT_LOCATION,
+                    "classpath:" + A_SIGNED_CLIENT_KEY_LOCATION,
+                    "classpath:" + A_SIGNED_CLIENT_KEY_PASSWORD_LOCATION),
+                "https",
+                portInfo.getHttpsMutualTlsRequiredPort())));
+  }
+
+  @Test(expected = SSLException.class)
+  public void callingTestHttpServiceWhereTlsRequiredButNoCertGivenShouldFail() throws Throwable {
+    assertFalse(
+        TLS_FAILURE_MESSAGE,
+        callUsingStubsAndCheckSuccess(
+            createNettyClient(
+                createSpec("classpath:" + A_CA_CERTS_LOCATION, null, null, null),
+                "https",
+                portInfo.getHttpsMutualTlsRequiredPort())));
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void callingTestHttpServerWithNonExistentCertsShouldFail() throws Throwable {
+    assertFalse(
+        TLS_FAILURE_MESSAGE,
+        callUsingStubsAndCheckSuccess(
+            createNettyClient(
+                createSpec("classpath:" + "DEFINITELY_NON_EXISTENT", null, null, null),
+                "https",
+                portInfo.getHttpsServerTlsOnlyPort())));
+  }
+
+  private NettyClientWithResultStatusCodeFuture createNettyClient(
+      NettyRequestReplySpec spec, String protocol, int port) {
+    CompletableFuture<Integer> statusCodeFuture = new CompletableFuture<>();
+    NettyClient nettyClient =
+        NettyClient.from(
+            new NettySharedResources(),
+            spec,
+            URI.create(String.format("%s://localhost:%s", protocol, port)),
+            () ->
+                new ChannelDuplexHandler() {
+                  @Override
+                  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
+                      throws Exception {
+                    statusCodeFuture.completeExceptionally(cause);
+                    super.exceptionCaught(ctx, cause);
+                  }
+
+                  @Override
+                  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+                    final FullHttpResponse response =
+                        (msg instanceof FullHttpResponse) ? (FullHttpResponse) msg : null;
+                    if (response != null) {
+                      statusCodeFuture.complete(response.status().code());
+                    } else {
+                      statusCodeFuture.completeExceptionally(
+                          new IllegalStateException(
+                              "the object received by the test is not a FullHttpResponse"));
+                    }
+                    super.channelRead(ctx, msg);
+                  }
+
+                  @Override
+                  public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
+                    final NettyRequest request = (NettyRequest) msg;
+                    final ByteBuf bodyBuf =
+                        serializeProtobuf(ctx.channel().alloc()::buffer, request.toFunction());
+                    DefaultFullHttpRequest http =
+                        new DefaultFullHttpRequest(
+                            HttpVersion.HTTP_1_1,
+                            HttpMethod.POST,
+                            request.uri(),
+                            bodyBuf,
+                            new DefaultHttpHeaders(),
+                            NettyHeaders.EMPTY);
+                    ctx.writeAndFlush(http);
+                  }
+                });
+
+    return new NettyClientWithResultStatusCodeFuture(nettyClient, statusCodeFuture);
+  }
+
+  private NettyRequestReplySpec createHttpSpec() {
+    return createSpec(null, null, null, null);
+  }
+
+  private NettyRequestReplySpec createSpec(
+      String trustedCaCerts, String clientCerts, String clientKey, String clientKeyPassword) {
+    return new NettyRequestReplySpec(
+        Duration.ofMinutes(1L),
+        Duration.ofMinutes(1L),
+        Duration.ofMinutes(1L),
+        1,
+        128,
+        trustedCaCerts,
+        clientCerts,
+        clientKey,
+        clientKeyPassword,
+        new NettyRequestReplySpec.Timeouts());
+  }
+
+  private Boolean callUsingStubsAndCheckSuccess(
+      NettyClientWithResultStatusCodeFuture nettyClientAndStatusCodeFuture) throws Throwable {
+    NettyRequest nettyRequest =
+        new NettyRequest(
+            nettyClientAndStatusCodeFuture.nettyClient,
+            getFakeMetrics(),
+            getStubRequestSummary(),
+            getEmptyToFunction());
+    nettyRequest.start();
+
+    try {
+      return nettyClientAndStatusCodeFuture.resultStatusCodeFuture.get(5, TimeUnit.SECONDS) == 200;
+    } catch (ExecutionException e) {
+      throw e.getCause().getCause();
+    }
+  }
+
+  private static class NettyClientWithResultStatusCodeFuture {
+    private final NettyClient nettyClient;
+    private final CompletableFuture<Integer> resultStatusCodeFuture;
+
+    public NettyClientWithResultStatusCodeFuture(
+        NettyClient nettyClient, CompletableFuture<Integer> resultStatusCodeFuture) {
+      this.nettyClient = nettyClient;
+      this.resultStatusCodeFuture = resultStatusCodeFuture;
+    }
+  }
+}
diff --git a/statefun-flink/statefun-flink-core/src/test/resources/certs/README.md b/statefun-flink/statefun-flink-core/src/test/resources/certs/README.md
new file mode 100644
index 0000000..0499cab
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/test/resources/certs/README.md
@@ -0,0 +1,176 @@
+# Testing scenario
+
+There are three certificate authorities: `a`, `b` and `c`. Each sign a client cert and a server cert.
+
+There are three sets of client cert and key files
+
+1. `a_client.crt` - signed by CA `a`, and its private key `a_client.key.p8.`, the latter of which has password of `test`
+2. `b_client.crt` - signed by CA `b`, and its private key `b_client.key.p8`, the latter of which has password of `test`
+3. `c_client.crt` - signed by CA `c`, and its private key `c_client.key.p8`, the latter of which does not require a password
+
+There is one server cert and key file
+
+1. `a_server.crt` - signed by CA `a`, and its private key `a_server.key.p8`, the latter of which has password of `test`
+
+There are two trusted CA files
+
+1. `a_caCerts.pem` - contains CAs `a` and `c`
+2. `b_caCerts.pem` contains CAs `b` and `c`
+
+A test server `a` is launched with a truststore containing CAs `a` and `c`
+
+| client | server | connection accepted |
+|--------|--------|---------------------|
+| a      | a      | yes                 |
+| b      | a      | no                  |
+| c      | a      | yes                 |
+
+# Steps for re-creating the files in this folder
+## 1. create private keys for 3 separate certificate authorities
+
+```shell 
+openssl genrsa -des3 -out a_ca.key -passout pass:test 2048
+```
+
+```shell 
+openssl genrsa -des3 -out b_ca.key -passout pass:test 2048
+```
+
+```shell 
+openssl genrsa -des3 -out c_ca.key -passout pass:test 2048
+```
+
+## 2. generate a root certificate for each of those keys
+
+```shell
+openssl req -x509 -new -nodes -key a_ca.key -sha256 -days 36500 -out a_ca.pem -passin pass:test
+```
+
+defaults used everywhere except for CommonName. Output:
+
+```text
+    Country Name (2 letter code) [AU]:
+    State or Province Name (full name) [Some-State]:
+    Locality Name (eg, city) []:
+    Organization Name (eg, company) [Internet Widgits Pty Ltd]:
+    Organizational Unit Name (eg, section) []:
+    Common Name (e.g. server FQDN or YOUR name) []:a
+    Email Address []:
+```
+
+same for b and c
+
+```shell
+openssl req -x509 -new -nodes -key b_ca.key -sha256 -days 36500 -out b_ca.pem -passin pass:test
+```
+
+```shell
+openssl req -x509 -new -nodes -key c_ca.key -sha256 -days 36500 -out c_ca.pem -passin pass:test
+```
+
+## 3. create PKCS8 client and server keys and CSRs
+
+```shell
+openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -v1 PBE-SHA1-RC4-128 -out a_client.key.p8 -passout pass:test
+openssl req -new -key a_client.key.p8 -out a_client.key.p8.csr -passin pass:test
+```
+
+again, all defaults were used, except for CommonName. Output:
+
+```text
+    You are about to be asked to enter information that will be incorporated
+    into your certificate request.
+    What you are about to enter is what is called a Distinguished Name or a DN.
+    There are quite a few fields but you can leave some blank
+    For some fields there will be a default value,
+    If you enter '.', the field will be left blank.
+    -----
+    Country Name (2 letter code) [AU]:
+    State or Province Name (full name) [Some-State]:
+    Locality Name (eg, city) []:
+    Organization Name (eg, company) [Internet Widgits Pty Ltd]:
+    Organizational Unit Name (eg, section) []:
+    Common Name (e.g. server FQDN or YOUR name) []:        
+    Email Address []:
+    
+    Please enter the following 'extra' attributes
+    to be sent with your certificate request
+    A challenge password []:
+    An optional company name []:
+```
+
+same for the other keys - the rest of clients and also servers
+
+```shell
+openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -v1 PBE-SHA1-RC4-128 -out a_server.key.p8 -passout pass:test
+openssl req -new -key a_server.key.p8 -out a_server.key.p8.csr -passin pass:test
+```
+
+```shell
+openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -v1 PBE-SHA1-RC4-128 -out b_client.key.p8 -passout pass:test
+openssl req -new -key b_client.key.p8 -out b_client.key.p8.csr -passin pass:test
+```
+
+(note no pass for c_client)
+```shell
+openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -v1 PBE-SHA1-RC4-128 -out c_client.key.p8 -nocrypt
+openssl req -new -key c_client.key.p8 -out c_client.key.p8.csr 
+```
+
+## 4. create an extension config for servers
+
+create server.ext
+
+```text
+authorityKeyIdentifier=keyid,issuer
+basicConstraints=CA:FALSE
+keyUsage = digitalSignature, nonRepudiation, keyEncipherment, dataEncipherment
+subjectAltName = @alt_names
+
+[alt_names]
+DNS.1 = localhost
+DNS.2 = remote-function-host
+```
+
+## 5. create certificates using our CSR, CA private keys, CA certificates and config files
+
+```shell
+openssl x509 -req -in a_client.key.p8.csr -passin pass:test -CA a_ca.pem -CAkey a_ca.key -CAcreateserial -out a_client.crt -days 36500 -sha256
+```
+
+output:
+
+```text
+    Signature ok
+    subject=C = AU, ST = Some-State, O = Internet Widgits Pty Ltd, CN = a_client.csr
+    Getting CA Private Key
+```
+
+same for other clients and servers (note that clients `b` and `c` are signed ba CA `b` and `c` respectively, and that the additional -extfile server.ext for
+servers)
+
+```shell
+openssl x509 -req -in b_client.key.p8.csr -passin pass:test -CA b_ca.pem -CAkey b_ca.key -CAcreateserial -out b_client.crt -days 36500 -sha256
+```
+
+```shell
+openssl x509 -req -in c_client.key.p8.csr -passin pass:test -CA c_ca.pem -CAkey c_ca.key -CAcreateserial -out c_client.crt -days 36500 -sha256
+```
+
+```shell
+openssl x509 -req -in a_server.key.p8.csr -passin pass:test -CA a_ca.pem -CAkey a_ca.key -CAcreateserial -out a_server.crt -days 36500 -sha256 -extfile server.ext
+```
+
+## 6. create final files
+
+combine CA `c` and `a` to form a_caCerts.pem
+
+```shell
+echo -e "$(cat c_ca.pem)\n$(cat a_ca.pem)" > a_caCerts.pem
+```
+
+combine CA `b` and `c` to form b_caCerts.pem
+
+```shell
+echo -e "$(cat b_ca.pem)\n$(cat c_ca.pem)" > b_caCerts.pem
+```
\ No newline at end of file
diff --git a/statefun-flink/statefun-flink-core/src/test/resources/certs/a_ca.key b/statefun-flink/statefun-flink-core/src/test/resources/certs/a_ca.key
new file mode 100644
index 0000000..b735877
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/test/resources/certs/a_ca.key
@@ -0,0 +1,30 @@
+-----BEGIN RSA PRIVATE KEY-----
+Proc-Type: 4,ENCRYPTED
+DEK-Info: DES-EDE3-CBC,DF25298A453F56CD
+
+heAH13N3mp2FaLTnXALWRa4AFDcON+uhhM8d01tDRcpoTdCkk8bRAvG7CGt1XJGA
+PfZ5e1Cyqs1rKthSywia1p9OnRdrhdTF3ElnmJa5u4/yjJWFtIpZwDcCW8FRb/Af
+C2I7ygbXaAHUZEd46+0JWhTEhh/o/7s5FU1GCIchdWsoN8XA89t2b51Czq9PvWPP
+CgsrHNIZpEobLK4kKXEAwsQXJUKJnO3pE9FIuWUze+y0iBkqhhyngbJHnXG9ADJ4
+Z1g+lZt/yC4NoCP8mlY3fXbvSWcFrJTlur5rVNiUq+6Av4//bF8Y4sW8BYITo/l4
+EdYZiqCQCT8Z6Y/M5glE7vAzo7FkUQ5IYkcSmEcuV7hjSlfx27rFzYUojuuc3dc5
+V40x6jPgEXvM80YBA+9xpPUFecrKMVFVRI9Z18DTmvuVoZvGvnSK5DamCy/0JayX
+MalR6mVLuS51N2z6JvY27DfVn3FCrG/cs8uTX7RpHEQ+gS/iwjv9Po0q/jDpWCi9
+QR8MxAGX0bB8+qNy+XGaKBcOEJ4LkppOp84ULy252uV3rFwTfBpXmozj2mCbeSxj
+i+Jx0FKH1rqJSHdxw4dOzejPuF+5HO2ZdPy0zlUtIXnULjYZLGdG87YxSZjYAGwo
+jQc2NUjR3icwhdXsg67rQDKdqf1hN8SjZwyEUA8P77n39Zx8hevt5aiX6Ts4NTHJ
+nikAvZhAMV+AROuRoxpmEEfPBJk0M9/nW9E+AdiyRjzPwKSMXNW7pLBsgoX09D7y
+9E2zjnZseXVWlSDzYfJEbXH6dzedBlIzZ7te2XMSJQNQa0A83Q5Xa9pQ9eZORZK9
+PWeMIjNsbYl+OX6/FFO3HDHcj7WHYCw02WJzttaoDSNb5mfxvUxAX0RfGJGHpgAO
+v69ZSQYUrmVNEA2dPBxbDBw8Cnkc7mf15N7ZBL6xvRse2dLaHfCb28eSjoGQQXDS
+peuaUP1v+mj1pxBTK0ekjk9Sjs1gOVXSq6s1HjJfZBTdiIXjjErCoCrbB7eL3csK
+E8usKLHt2v4eeLGtdCAuCvlMq2BEdC6dFMjxZpMyQi6r4o8Runb11L0O8YVU3u61
+6x4tVDeRUo7+2IWaQtYw4NFD9g+1XD32F6nqtOHfMAk07gu2CL5HcIl4HTVhleL7
+BtSNXRgkIqYju2OSpmMsi1CpgdHa+vVxKSnRDZa0qY5ECh3DVuwwmRtWSmunPc+Y
+7AZjCJi94BqemPA6rLM305HW1oEYKpaprg1f7LfgzaBOFR+2lpidEG3Ainl+OFPQ
+90FOLdfr24H9BptiTovsSMtA2LRaU/MMl4GXPfdSW6h6N9UjERBapTtjSbY5SX3e
+193GwjhV2ujaGno3/Rj1XGjilRF5X2Yspp882l4CYmL8gMagvQgQY9RNlJ0xA5O9
+D5l5WD2XZ768ULvM/I+YVQeDnvU6j95QkcedzGKBcV2X7TYEDfje/be4IZV23rx5
+s7vLp7CqW+7SAFb9XBF+Pli693HjQsqStcvJ8O22Kwy4y5mLMQwcrMga69WxW1jw
+XAjIRKrwC+/oIog8DrMeQpo6w98LwtjNiVFdwqxbmg0pb8iqCUKMSpJXPTf97X4d
+-----END RSA PRIVATE KEY-----
diff --git a/statefun-flink/statefun-flink-core/src/test/resources/certs/a_ca.pem b/statefun-flink/statefun-flink-core/src/test/resources/certs/a_ca.pem
new file mode 100644
index 0000000..9615486
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/test/resources/certs/a_ca.pem
@@ -0,0 +1,21 @@
+-----BEGIN CERTIFICATE-----
+MIIDhTCCAm2gAwIBAgIUTRXcSpygsZsWmuX4QMz9ey0rPBYwDQYJKoZIhvcNAQEL
+BQAwUTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM
+GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDEKMAgGA1UEAwwBYTAgFw0yMjAyMjIx
+MzA2MDJaGA8yMTIyMDEyOTEzMDYwMlowUTELMAkGA1UEBhMCQVUxEzARBgNVBAgM
+ClNvbWUtU3RhdGUxITAfBgNVBAoMGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDEK
+MAgGA1UEAwwBYTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAKBR0E/f
+fcTFxs8Oo4ItZm9HYuPR6U52OuBDN8TdzjMfA+FKoTvqvt/RUg7y6QFCdVEFwMS1
+uyjxoMlGDX1MF9SfDTH9oZlND+DL4YatsqpEXGRfjYSheBPUWusoQ6BbXvjiUNf4
+2bEdTFHnb08pdt0ixtymPp3vLsoUWAqscqYKOj4DW0AMRq2fAMdt0NlJsvW9hUQr
+I8wvU90SJlZ/YNOMv4uae8q67YQKZziRpFaLTM5aFjXnsIYAA6Z+PByFjpLTofgc
+BNuGM5WIk7O88UdZOwd5OCZhkaXrPCkkCtyyPDO8kWiiSmdTOnQ2sjkOSKgNTXkd
++6K7REk5gZ8WVv8CAwEAAaNTMFEwHQYDVR0OBBYEFFvccwv5Cufwh+0kLG0vmQCF
+6JbLMB8GA1UdIwQYMBaAFFvccwv5Cufwh+0kLG0vmQCF6JbLMA8GA1UdEwEB/wQF
+MAMBAf8wDQYJKoZIhvcNAQELBQADggEBADfgGnHZ42Cxoe5gNoWIUdgFjBqeiWOe
+NHNDLgXkx/jcOsbAO6H3+tz3Dz8QL3JIYTrwRKv0vX+5GAkkRz2R2ZaN4xHz73Co
+SuFfIqq4MjjrymVcnNA347vk50FjOMgfHrxpS3UQeTvlb1iuA88Zk8ewzhY376+t
+MoFmT1/ocb2E7jvxR0kDNCK5XsJIGzJmCBq8nIAD6wxrPXU3HJ/GLBlL5sL5kRrN
+x7l/DnL2oqN2DuyFmf4g03+DVmuu87XrbDrGHnn57CVnUe7z4jCFE82vA4u/tppK
+VgI17uYA714s3Jw6Iw+u38cmmSb79AVG05D2b2+TsqICVet3bc9MkQ8=
+-----END CERTIFICATE-----
diff --git a/statefun-flink/statefun-flink-core/src/test/resources/certs/a_caCerts.pem b/statefun-flink/statefun-flink-core/src/test/resources/certs/a_caCerts.pem
new file mode 100644
index 0000000..aa21b19
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/test/resources/certs/a_caCerts.pem
@@ -0,0 +1,42 @@
+-----BEGIN CERTIFICATE-----
+MIIDhTCCAm2gAwIBAgIUdVEDHKaXNfdh0Jgf/7x1xhEOP/owDQYJKoZIhvcNAQEL
+BQAwUTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM
+GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDEKMAgGA1UEAwwBYzAgFw0yMjAyMjIx
+MzA2MjNaGA8yMTIyMDEyOTEzMDYyM1owUTELMAkGA1UEBhMCQVUxEzARBgNVBAgM
+ClNvbWUtU3RhdGUxITAfBgNVBAoMGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDEK
+MAgGA1UEAwwBYzCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBALDfn3wo
+XxxKpIter4QsGCYI+AiFxVouJXJlswa6VnSQtFYa6I0gsqVOmCgaWNpYKTKzYRgC
+/neNwpirZLj6A8Ubq37T0cr0D3Et0axWgVkySjVEDZOT4CP5FiSVGiwIN2MexZRH
+EO4njk75m/NGXY6DMiq6qrpStjirQlET2Yd71BqLpDXnfKwN+SOUGfTjwCNkx8sQ
+Vx5ibZtk1K/XihA7FCECTQqbzSsrKv9tDEayuW61KJZTcPCrwloTlQztySHbtiPD
+iojTE8FQAu2PmpDPt0nCSsSAM1EKD6iXyXzj7zzIsKZOZoj8E2d7DVELFYOTenKU
+/dYaVuCDDPBxalsCAwEAAaNTMFEwHQYDVR0OBBYEFLU73nUEzinqIo7fdWjQd2pa
+plF6MB8GA1UdIwQYMBaAFLU73nUEzinqIo7fdWjQd2paplF6MA8GA1UdEwEB/wQF
+MAMBAf8wDQYJKoZIhvcNAQELBQADggEBACvFU80K7nkSSDxCJZDBdgreO0K/seDH
+tFw5OjCUyfTw706Pm7wUYFwQOkt/agWWYhpnxws0bZaMD7PgWDTpURWLeowxE0Th
+DImuj2KhnFWHjaziM7/aN2/NrlFzdkHGvd2TJM41jWpTmjxYb4VKv1iGu4ds0wA4
+e9IcLHD0wFc5rG7AsXWvPEQAdxvPJbM4tmYm63FaefSNXLrnLUN8Bq2IEA3/Y3gi
+gYmfblMSdT9o8WGGj95EEIrDpJEgAt/C/jO5o74O8s6xkBDBR+13JouaLyVNhXgs
+xAhHNnK5uetPUWGhK0na6WN/Lzn6153lDo0q7xo99+f6nZjNVEpE4TI=
+-----END CERTIFICATE-----
+-----BEGIN CERTIFICATE-----
+MIIDhTCCAm2gAwIBAgIUTRXcSpygsZsWmuX4QMz9ey0rPBYwDQYJKoZIhvcNAQEL
+BQAwUTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM
+GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDEKMAgGA1UEAwwBYTAgFw0yMjAyMjIx
+MzA2MDJaGA8yMTIyMDEyOTEzMDYwMlowUTELMAkGA1UEBhMCQVUxEzARBgNVBAgM
+ClNvbWUtU3RhdGUxITAfBgNVBAoMGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDEK
+MAgGA1UEAwwBYTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAKBR0E/f
+fcTFxs8Oo4ItZm9HYuPR6U52OuBDN8TdzjMfA+FKoTvqvt/RUg7y6QFCdVEFwMS1
+uyjxoMlGDX1MF9SfDTH9oZlND+DL4YatsqpEXGRfjYSheBPUWusoQ6BbXvjiUNf4
+2bEdTFHnb08pdt0ixtymPp3vLsoUWAqscqYKOj4DW0AMRq2fAMdt0NlJsvW9hUQr
+I8wvU90SJlZ/YNOMv4uae8q67YQKZziRpFaLTM5aFjXnsIYAA6Z+PByFjpLTofgc
+BNuGM5WIk7O88UdZOwd5OCZhkaXrPCkkCtyyPDO8kWiiSmdTOnQ2sjkOSKgNTXkd
++6K7REk5gZ8WVv8CAwEAAaNTMFEwHQYDVR0OBBYEFFvccwv5Cufwh+0kLG0vmQCF
+6JbLMB8GA1UdIwQYMBaAFFvccwv5Cufwh+0kLG0vmQCF6JbLMA8GA1UdEwEB/wQF
+MAMBAf8wDQYJKoZIhvcNAQELBQADggEBADfgGnHZ42Cxoe5gNoWIUdgFjBqeiWOe
+NHNDLgXkx/jcOsbAO6H3+tz3Dz8QL3JIYTrwRKv0vX+5GAkkRz2R2ZaN4xHz73Co
+SuFfIqq4MjjrymVcnNA347vk50FjOMgfHrxpS3UQeTvlb1iuA88Zk8ewzhY376+t
+MoFmT1/ocb2E7jvxR0kDNCK5XsJIGzJmCBq8nIAD6wxrPXU3HJ/GLBlL5sL5kRrN
+x7l/DnL2oqN2DuyFmf4g03+DVmuu87XrbDrGHnn57CVnUe7z4jCFE82vA4u/tppK
+VgI17uYA714s3Jw6Iw+u38cmmSb79AVG05D2b2+TsqICVet3bc9MkQ8=
+-----END CERTIFICATE-----
diff --git a/statefun-flink/statefun-flink-core/src/test/resources/certs/a_client.crt b/statefun-flink/statefun-flink-core/src/test/resources/certs/a_client.crt
new file mode 100644
index 0000000..a67118a
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/test/resources/certs/a_client.crt
@@ -0,0 +1,19 @@
+-----BEGIN CERTIFICATE-----
+MIIDHzCCAgcCFDn8ukGu804x1jsFE3TWa86D1VYxMA0GCSqGSIb3DQEBCwUAMFEx
+CzAJBgNVBAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEwHwYDVQQKDBhJbnRl
+cm5ldCBXaWRnaXRzIFB0eSBMdGQxCjAIBgNVBAMMAWEwIBcNMjIwMzAyMTIxODAw
+WhgPMjEyMjAyMDYxMjE4MDBaMEUxCzAJBgNVBAYTAkFVMRMwEQYDVQQIDApTb21l
+LVN0YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQwggEiMA0G
+CSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDT2eBnboaPse50z9R6IgVtl4LZU9YM
+LPYc53nlwr8LLwZ9CM5XJ1WfzOS3m9Ynp7so2d6tvOgpMhtd3cdpVqmMdsFL+r4p
+QQZY/uw5oDC7kHvJEOW9No7dWVkRTqvuoY25OKZ2ysActTOD3LVvmjSWI6OCcyhe
+hlaFYD3KtUVpav+WdTFC45fwR9oXXSEssbPbhjt/Zhm9eMbZoxuVLeiJ5QXme/2G
+qbgnD7rcSMFJf5/WqdWa/H7IOpt5nPdRQSgi7iWYsZi4ZBRZ4I+s+dGOJYN9+LA1
+FDnb3/lUQrjjrfRnz/ZY5PYpz/CwRRwiGNevvIgRNMLqh8Wnil0lDaY/AgMBAAEw
+DQYJKoZIhvcNAQELBQADggEBAJIRP/e/XfXQsQ9DVnZ5b3rCXL9fH80L2X7YGMfC
+VQf8dYt5hTO0kqh5DoqhQ7ZxYXPNn6D7Kr91gwoaPuH5EAAtX6ULcuhqS25EN01D
+ENru2osWExK2bjI76013iKclHwcTq6IYLsiOUJPb0L/K2AoYn8dXg6YtRDmK+dsa
+iztN0dDNyQiz4aLOeSP7gvOMRLl4Fkq3Rws+THIxQJwtNL7Crk0qS+/d6iRENcSv
+rulS0QeaP43HlwiCOGpl4Ta4i3RbmSXZUWYzCJAZph14MA1xFEv2RLyYHkGNwy2T
+TESdWCcogBQheaBcpoC/ZIlfl7AmNg6fmPPgi8ZCOh6ggsg=
+-----END CERTIFICATE-----
diff --git a/statefun-flink/statefun-flink-core/src/test/resources/certs/a_client.key.p8 b/statefun-flink/statefun-flink-core/src/test/resources/certs/a_client.key.p8
new file mode 100644
index 0000000..c8e590c
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/test/resources/certs/a_client.key.p8
@@ -0,0 +1,29 @@
+-----BEGIN ENCRYPTED PRIVATE KEY-----
+MIIE4zAcBgoqhkiG9w0BDAEBMA4ECNOL3dIOJsQbAgIIAASCBMG+OuJ4Esbq3PvJ
+gP4WfWxR0/vmj/H+yrvnfwegiijJKDmqDMyqW8T++7pThOxtdMFgGHkBDx4jdmUo
+weVyVtblFcVbflttntnQWJ/q9+YN++VZMNywQJFpUVS2Ga89kvdJR+obd2CWMSdB
+eRlZN9puXuSPpqgC7uCO1spw/bYojhZIE4b+B2ALSKKSSrnibXE7bNqhIRj9emCa
+nkfRq83Qbqt1RIwteLrw3BIeBMjyAqvPuavhE1LCBKTSi/X9A/NlIJWFdxEDlZND
+iEE2ls5/m2iKPpOZpQnEKB5jDmKk4DF4LWWBkq7jGSbk82Um5hlW3MiK/Sa7YVMD
+YCqlIcE+SDY6ggvN3Q7hNw2z+tHhUpeViM4V8XEKYQza4Rui3jSHlei6lUjZVp82
+XaNqW3hHoVZWuRDTdkBIoBpEWbr5sORMNagcEVy+t9xeIIvLRTj2D2GoeAaLsJrJ
+taU/29Dh2vZBh8nXenbPejkO+Phi1xsVIuLIdFN4+i6062fMsY1mlRliTHEw9CKA
+3A0g2f6t90Tgvpn8R3qc+hGZ5ueKSc19e3XpPu5jVqJS8ZAQUS7zZ0eL5O3p6mBg
+e3dPGg/5acA91Zuo9XouQ8APFYd8Fcjlwr4yp0IoSWofjWkXXFjxE5jXO6VHUwhW
+Q9Lm11+kAGo8MeMrX9pNcmHUp0ddvsRWtqZNQ3Yfu+DinidOj0MAYdVw0GM6wcSq
+MmsaLsF+7VCbV3AseJqDQRBjyeLoCCnYg71Gn1/9QSbAZKvqI3jDrPzDSNtK+WEP
+CD22WmZOaOYmI5dh4zB3nPlGmds2QKEvTXvBkWEnVkUUnoZ1glKwNWakj9C2aHyF
+PUisISxEiid8wtVXvCGdIETuhNBoddBKiMlbbQVoy3vyC4lnRsZk2E65PRSdQcnv
+wNGhfBDmuTeoCr/WBYKamhhhfAzNEPSUZSjFT7qp+AcqnaVVGaZeokr4uB3T68JE
+rHmkAmpSQHXrZnzkHayxB93A717QcckZfmeWAZwY/1+q30Kpfzz1wgHWZ8DPy4nw
+KtY6H0jlEzX41/6t9YLF9UfsL2Yu3o9F3leIvDW3efjM+olPvWYLSXlfUW5S4amv
+7yeFp/ABAt6YPkpevxgHyDQ9/imA0vjT9beHIqhJ5KZPHurV6fT/JK5SAxp3ijKx
+9dkWUxSZPZujD1fNEjwOgMiXFkkNq+Sh5OQNyOsqvTWcXW/+/e9/cVjD9u0cDrMP
+N6rrFZs5Ye3Q8YG6Kgj6vCOskDvGyXwodqTAGaO8a8qLfsv2fNLUyZyAStz9SYuS
+NUwLacD9AfSJkIlPAs9+gJbcMFPsCR40EsAZYaKlH+6GHtx+G9qW28zVIk12YHQC
+JJOS3A3rCueTls6UNkAMQRFbuw6lazDwvkwpYfpH9TuqofebQpR/uo3y1T+yI93U
+3dKD4uUbGn5S2/B7i6b3hedJcgxr7Amc3byyJPjcLUtPv5djUypVh6Q7sb9du1CO
+FVd3UnY0kz7Wkdd0BOEGviwpBK9oKi08ZXKrHJnFS1o7ypEwIQFAGLn1JUYdg7nr
+UguPaudKQ6+MF84UC1dJ8FdsRZ9vhlGfi9Ilg9KlfUwNSD+9MlS9oJDj9QSMy6lU
+dv41UJPZvg==
+-----END ENCRYPTED PRIVATE KEY-----
diff --git a/statefun-flink/statefun-flink-core/src/test/resources/certs/a_server.crt b/statefun-flink/statefun-flink-core/src/test/resources/certs/a_server.crt
new file mode 100644
index 0000000..9268b38
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/test/resources/certs/a_server.crt
@@ -0,0 +1,22 @@
+-----BEGIN CERTIFICATE-----
+MIIDjTCCAnWgAwIBAgIUOfy6Qa7zTjHWOwUTdNZrzoPVVjIwDQYJKoZIhvcNAQEL
+BQAwUTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM
+GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDEKMAgGA1UEAwwBYTAgFw0yMjAzMDIx
+MjE4MTRaGA8yMTIyMDIwNjEyMTgxNFowRTELMAkGA1UEBhMCQVUxEzARBgNVBAgM
+ClNvbWUtU3RhdGUxITAfBgNVBAoMGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDCC
+ASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBANj0NXPAatJmpQZSqdoNU/aI
+bs+b7WXHJ7if2pRU+sdbAdIqU9+eGK+9lzkyin8YExvZokDXV2cHVQCW2kO8qnZE
+pksKk9ZtHMe8RoksbRN2uNq174GXLlku52+IleJE6SiuHDWZXU0s0tZWl3LL5yei
+oKaWLrF4bKxdZqm2bnWe3VZJVgiIJOuHMMoSQPE8BYnUe6n0YjpZR7vHifBLf0wM
+w70PuZrzcADRiokqVFS5dwZYhW1xErbNA0/pYg3MsnQeNuWyJ517KkYSUxCAnb8q
+LHWGBgcqJ7CSmGGpcLDIDEXJZg4lT/SQp08n99+EoTy7rkncoG+pG5IBmyJKLtUC
+AwEAAaNnMGUwHwYDVR0jBBgwFoAUW9xzC/kK5/CH7SQsbS+ZAIXolsswCQYDVR0T
+BAIwADALBgNVHQ8EBAMCBPAwKgYDVR0RBCMwIYIJbG9jYWxob3N0ghRyZW1vdGUt
+ZnVuY3Rpb24taG9zdDANBgkqhkiG9w0BAQsFAAOCAQEAVo2WzynblomqTGqUss9S
+eCOTrKkySETvGLK/GLLKaxn1Du/JRgEpIXCo5Ri3MQ8MBtpi748kN3z6Jh7ouoeu
+tG1I/hll0naxUzBDBzx25ZxSxGCQG/eoOFgRLF5OQ9L5BKRMW1T4/XNJqLtUVoiw
+qiulkNYJs2tKD/swAkM3LwAGuXl/p9KbGoYmxozwDYrW9PohL2zS0qzBRriJQGhM
+x65SeX50KhiDXx6mXTVUfFU91lh5H6er8SOHbS103jt0B7Y3403mhDmq2NcmY2Ln
+P3d6AZlhXwzsFDxD45U4lxYxMWTy6rOZkcCqrV9LGHYbmZYQPVgmiseWWo6qjLiI
+pw==
+-----END CERTIFICATE-----
diff --git a/statefun-flink/statefun-flink-core/src/test/resources/certs/a_server.key.p8 b/statefun-flink/statefun-flink-core/src/test/resources/certs/a_server.key.p8
new file mode 100644
index 0000000..00ab834
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/test/resources/certs/a_server.key.p8
@@ -0,0 +1,29 @@
+-----BEGIN ENCRYPTED PRIVATE KEY-----
+MIIE4zAcBgoqhkiG9w0BDAEBMA4ECMk3nj4DFVXtAgIIAASCBMFgWecNkVuMByRI
+P3qAOsQxC+Tmf1eamSe4jn+WS022jUTaqO8c+T/9yyj+UJE9w1VKb1+RYAcnijAY
+JJghl82TTIZgzQlIcD1MwiV2E6Fiz/HolaCeEIOHhNEZDdI5INke92uon++TG/ue
+aW2dhpy7AFL138BUr1GfeYZgnpRWh+VuR9tgR4CRbINFqohD0nein5XUMHUZr+cY
+pFho0f6P5+AGhkNJJK7b/NMQULUimehnFU+wiKavEkCPcLQ3ZOd6WwadOLJMRpiK
+jrqP5KB6YoB/W83L9qMscZnA6gaLnfBwP42EQOtKVaPVLF3p0vb19SjQ30XNJgwG
+O5snRRuFQJ+IeWDklZMlEGcDrnZrRh6CtOtszBPxwmX2TNjh7XxP3KRUDj2a1dg0
+WHLp9osJuBkHmCa5Yb7EnX/eMWfrGRnJ8h3gIPP237rgyGkkdgC2uh4KSzZ1m0f7
+hUow3sArKzg3zdMbNIaV7IP/9GjJbSIHVSta7LinWgygHUtgApjKsGn4AMdZsTRO
+NE5cN7M4S7ersvufRF2/1EH485A5HPP4UpD4xtq6v6dEX6hcN+mBoKCwVoeClxSC
+avvWbI6z1LWg+m3vwNywFWJ07tDV+iLQdBk197xeNZ687qjgaxHEG9hgP2neJ7Jg
+lhAtyW0SbgJG/Ux5D7/wc2beQ8VhgsNUuX5XDZd09RsgGtSh28qUb2lhP6jCHH6v
+FDRkWtZDmQDwvofyY9cRWvRxph2GE8P+/KVV36xs1c/SJmaPevM0rI8aRP1ocLXo
+yx+eeQqPPg7gxSx0v9yLQ6j6pj1yhq3CPOnG2vp8MKSjc3U1sTNYGn1mc6Et5bbG
+cLiTIiJv566g2je2xub74Ylcj0dZdispp20M3AYHOLNsO/JNY5rKJgfiuJNbhbNV
+VLMIcB5yr8hhZIE+AoE9FWxFGLhoPwmGoQbvXqlvj8YvL7uqRduMxxwYSULTnINn
+ghG9Gt8E5ha6FSk+neOd2upj9kJng+RT2xlKLTdeaQ4T/w9Uq5Av9tqD9Q8xuUFQ
+Dh1weFH8HjgiJD67ozqZa0u2AbE60QXbTj8ygSOADTEdTNox97NeDRjb9hdyFCbn
+7T5ISYB3ZnughKN1CdwNAz+OEnID0ydidcDvH5DHMkh2Cey4ghMd1LdHH61cswcB
+awWjBCC8MB6tEPeWO49q0Jdq2nXeCWIBvi+0Bsde4BY0yk+S283pu78r6/NZC9y+
+fyNemrFZKmnBIwLQ5TLAxiK6fSo+fZPXRESGzsCCezLp5y1WoMmkEQpyYbw/XWZs
+9WmbldJuvNW2YHfmZ3n8ijtpmyzUlzZrXIPLB9TVqNGINj/bgSG4OjyMT0jmSQ55
+uGOceaG02fQjl1culVIe+axyox0UFQP3g/B5bnCDbOv9J8Nohw3OZo8Me6ThnwsB
+I55krAmytF6vr9FtlOqv+eF9Ahf/RRnr4IrklkgoZpcK21itk5SLPoW/hhfZnvgg
+Lraeycf28uaVknxVZ9gEVGcxa2Ldh9sF4u7+lhTZ/ax22X9QN0fl76pIY8zoRYSL
+o2maqbjH9q8bkOpX7MzXJ349k4h3QrduF+7bC//zoUTLo/rV/7b+LX2s2nWLvBhy
+T4+PpNAkuA==
+-----END ENCRYPTED PRIVATE KEY-----
diff --git a/statefun-flink/statefun-flink-core/src/test/resources/certs/b_ca.key b/statefun-flink/statefun-flink-core/src/test/resources/certs/b_ca.key
new file mode 100644
index 0000000..5296502
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/test/resources/certs/b_ca.key
@@ -0,0 +1,30 @@
+-----BEGIN RSA PRIVATE KEY-----
+Proc-Type: 4,ENCRYPTED
+DEK-Info: DES-EDE3-CBC,F727DE48E684F910
+
+vY/mZIbpshq5XnwaMckh7bwNJBi73Tzqklbds3BImfmE6ukDQTNtDso7dt/TGxr+
+8oDIT2WoNKpqw16t1d4NA9zcaA3FqJNqiTYng/5ZlaQlAUwHg5UMbBcJJaYT5ul4
+5rxa4huKKnWK7wxHl+w68bjlQLeo+gPpcxHNVfcqcXRvr9QXeNMipQVyFfk7Z9zV
+uOZ4cjduWTHSvEWa5TrZHEXK4pqi/hWjo0KSth6JkIXxyVDdoNTMof3aY9ZRcIJU
+tkRKrx6LEVdDYPlJiVssp1xpfVM8CY9K/44niDYlwxAChcqoBMm4dL0zYRj+qhSZ
+xhuwtSHr7aSK91kB2QDxGBbK9NAOTdyt3+WRjjJRB7Y2KLS8pTneg+HWtpS/cMIR
+ghVFVblksIiUxEYBRHll1LPyG32iW73pCbw1LD1gxdtHnYEAIAeh50YuQKslqrnO
+tkILd7EKPQU8cx/fZ2BVzfH4flXzz8WmLBXzI3c3h0JsZRK7dsWD2pEeVIry5ZRp
+Y4DIOxGbCbXR7WOno0jb/qK8CAiiv7Ms0sZ9pJTu3ep+mMm6LHDh2M6XLJdwvQKG
+jj4JK828sOzFA71NMqvRwEYZJJX7Aeq8qEjzJGicJvGXc2fY0Fcf8wSm4SflhPau
+TdzumS/Bonp1pIGCwNpstyrn5NtEC3o35VmxaXsy3fD6Pdw/KAZPjx/37Yh41v72
+tQKiIQ0yjBE1Bl9+cZrfdbrA6Um8N2eTq7eDl6SwsRN1BBMGUfsAIL61n8Qsxnip
+D3hRvLNkDYE0AOMKWws8bxjOK0YeV4al5kSz4hQPUWH8NCqfhY8nW6Auq2ydI/KA
+nnfUHifMUw6ioqueQbolVU8+U5d8HSSmTVyj/jJDb1ogn3WPMaaIyWUYgIWlfWI2
+XlJM0y4F/BmWRh+cPoo0FNDC9XtUDV6MyhDUndPQtnzVFINHU0Bnt6H+PbP78qPu
+AMJy2+sfHpyF8AArmWgH1y2lDfA01J1ut2q/t/Jx8/IqA1V9aLBdd8RT/8n3OczB
+IqWT1CFjPrm8RVOgjt8skNCaSDZ4NlgN3ES9CLEJ+qRlRjxiAvBQuusIJWMSkOXR
+p6gT63uVzqQZ2GH4s34BFGLaGe5Atf/Eq3k6ldpCmqan52MKlFnMTuJ93GbF/N1d
+u05VTA4aofcCXyU/giMDfTYHN+rf6VTDtxhCso9lNakGDE3MdaoPToNvquttxXRd
+NwcBgiOZf4DbJmL+W1UxXALwjqdDan+wpWzDKWM/pOLORQfLxdBrWHlpqsuGZqbP
+Blj8IUt9HzhRNpXs21zmHdGeA4Z3s5LOQfrHJzbMNOaTO211xbgOkVYE97Lvr6D/
+orp4FS6nJB0qvwFAvLzv5WoC1cwwl5RqYYjToJh1/UDHAPUq6XLVl9d3/dJyExeW
+0UbM86TWjenIO3/I20W+MiPjLwoC7pMgzZpE136fGRm7aopm4K5ATLoV83b988yH
+3wIybFOvwHZ3XsZJg4Av8i6+xcs77KyC1j5wPl17XkufJNrBSCsijYKVuQYC0q+o
+zlusJ3DgquoF3OqxN8s7w6FhlYp1IUdXOt6Nfv7ujxkYFQIdOCijnQ==
+-----END RSA PRIVATE KEY-----
diff --git a/statefun-flink/statefun-flink-core/src/test/resources/certs/b_ca.pem b/statefun-flink/statefun-flink-core/src/test/resources/certs/b_ca.pem
new file mode 100644
index 0000000..1ce0cdc
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/test/resources/certs/b_ca.pem
@@ -0,0 +1,21 @@
+-----BEGIN CERTIFICATE-----
+MIIDhTCCAm2gAwIBAgIUAjTsvkODIdEaOeWtEzseL0HXFukwDQYJKoZIhvcNAQEL
+BQAwUTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM
+GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDEKMAgGA1UEAwwBYjAgFw0yMjAyMjIx
+MzA2MTNaGA8yMTIyMDEyOTEzMDYxM1owUTELMAkGA1UEBhMCQVUxEzARBgNVBAgM
+ClNvbWUtU3RhdGUxITAfBgNVBAoMGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDEK
+MAgGA1UEAwwBYjCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAL96ea/D
+fGRNyInRqHoD4HpEAZRUAUZxyqi5YTBCJkoMpDJznk+aFQqHAbSJKiz6/G/HYF0A
+0fohDxONIFtV8cxtR1hPhlmo4QxRdo2H+3aJcbHvxLmLbZ7N1DhVDV+cAcUxHXSu
+uJbREKALsGWO7vIFI9GPcK6xEtfOMtBZwGJ1R/quoEjl+a7wOiZqTExKkJUaCwQS
+wUst3sfvyRsogrLPLH/M+K+u7jLaWHonkyMO7dHbWTFiGxix5EErg+LhCYt0QEgR
+ZOU7UVTirFo2W+2zLRXe/D0IbtJz3ZmS4DYVzT5hsw1l4E63BoAzMGQ9mjkTX/Mw
+gfhtMg5NjuaWN2kCAwEAAaNTMFEwHQYDVR0OBBYEFHsygq4YUtQhLlIbWc3SAbnR
+2zVjMB8GA1UdIwQYMBaAFHsygq4YUtQhLlIbWc3SAbnR2zVjMA8GA1UdEwEB/wQF
+MAMBAf8wDQYJKoZIhvcNAQELBQADggEBAC9d29dhcW6MD1lyqeyvauDVgocP4tSX
+UKujrEhYi62oZJ31oGvXfyzW3XkiVTaA07LWyaPOcZdfJkAaYW6WEdPEo4jJV7kk
+2aWwkBBXPIU9r0kaxscr93uRoJ5WdfXtixKOrpmpxsKsjIbmgfyKripHYSU6dAi8
+tn9yo8lUfy9FBxF6ZY5QVmukiqjMOHw+2Xpfu5tcrjqpauAh5Z1a0iQhmW+u8ar/
+Z6UJ2lwyFDbzinZQToM1HBpmmy3SjaUGuqjDBKJJNyQSUv8YuzLOFdLsT1cNfMnP
++hPZYtYBMPavUeGwwGwA/l5ZQBF7jZ8iI7WCdHhnfcWpDNzRm4bvVUg=
+-----END CERTIFICATE-----
diff --git a/statefun-flink/statefun-flink-core/src/test/resources/certs/b_caCerts.pem b/statefun-flink/statefun-flink-core/src/test/resources/certs/b_caCerts.pem
new file mode 100644
index 0000000..2415a2d
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/test/resources/certs/b_caCerts.pem
@@ -0,0 +1,42 @@
+-----BEGIN CERTIFICATE-----
+MIIDhTCCAm2gAwIBAgIUAjTsvkODIdEaOeWtEzseL0HXFukwDQYJKoZIhvcNAQEL
+BQAwUTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM
+GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDEKMAgGA1UEAwwBYjAgFw0yMjAyMjIx
+MzA2MTNaGA8yMTIyMDEyOTEzMDYxM1owUTELMAkGA1UEBhMCQVUxEzARBgNVBAgM
+ClNvbWUtU3RhdGUxITAfBgNVBAoMGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDEK
+MAgGA1UEAwwBYjCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAL96ea/D
+fGRNyInRqHoD4HpEAZRUAUZxyqi5YTBCJkoMpDJznk+aFQqHAbSJKiz6/G/HYF0A
+0fohDxONIFtV8cxtR1hPhlmo4QxRdo2H+3aJcbHvxLmLbZ7N1DhVDV+cAcUxHXSu
+uJbREKALsGWO7vIFI9GPcK6xEtfOMtBZwGJ1R/quoEjl+a7wOiZqTExKkJUaCwQS
+wUst3sfvyRsogrLPLH/M+K+u7jLaWHonkyMO7dHbWTFiGxix5EErg+LhCYt0QEgR
+ZOU7UVTirFo2W+2zLRXe/D0IbtJz3ZmS4DYVzT5hsw1l4E63BoAzMGQ9mjkTX/Mw
+gfhtMg5NjuaWN2kCAwEAAaNTMFEwHQYDVR0OBBYEFHsygq4YUtQhLlIbWc3SAbnR
+2zVjMB8GA1UdIwQYMBaAFHsygq4YUtQhLlIbWc3SAbnR2zVjMA8GA1UdEwEB/wQF
+MAMBAf8wDQYJKoZIhvcNAQELBQADggEBAC9d29dhcW6MD1lyqeyvauDVgocP4tSX
+UKujrEhYi62oZJ31oGvXfyzW3XkiVTaA07LWyaPOcZdfJkAaYW6WEdPEo4jJV7kk
+2aWwkBBXPIU9r0kaxscr93uRoJ5WdfXtixKOrpmpxsKsjIbmgfyKripHYSU6dAi8
+tn9yo8lUfy9FBxF6ZY5QVmukiqjMOHw+2Xpfu5tcrjqpauAh5Z1a0iQhmW+u8ar/
+Z6UJ2lwyFDbzinZQToM1HBpmmy3SjaUGuqjDBKJJNyQSUv8YuzLOFdLsT1cNfMnP
++hPZYtYBMPavUeGwwGwA/l5ZQBF7jZ8iI7WCdHhnfcWpDNzRm4bvVUg=
+-----END CERTIFICATE-----
+-----BEGIN CERTIFICATE-----
+MIIDhTCCAm2gAwIBAgIUdVEDHKaXNfdh0Jgf/7x1xhEOP/owDQYJKoZIhvcNAQEL
+BQAwUTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM
+GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDEKMAgGA1UEAwwBYzAgFw0yMjAyMjIx
+MzA2MjNaGA8yMTIyMDEyOTEzMDYyM1owUTELMAkGA1UEBhMCQVUxEzARBgNVBAgM
+ClNvbWUtU3RhdGUxITAfBgNVBAoMGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDEK
+MAgGA1UEAwwBYzCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBALDfn3wo
+XxxKpIter4QsGCYI+AiFxVouJXJlswa6VnSQtFYa6I0gsqVOmCgaWNpYKTKzYRgC
+/neNwpirZLj6A8Ubq37T0cr0D3Et0axWgVkySjVEDZOT4CP5FiSVGiwIN2MexZRH
+EO4njk75m/NGXY6DMiq6qrpStjirQlET2Yd71BqLpDXnfKwN+SOUGfTjwCNkx8sQ
+Vx5ibZtk1K/XihA7FCECTQqbzSsrKv9tDEayuW61KJZTcPCrwloTlQztySHbtiPD
+iojTE8FQAu2PmpDPt0nCSsSAM1EKD6iXyXzj7zzIsKZOZoj8E2d7DVELFYOTenKU
+/dYaVuCDDPBxalsCAwEAAaNTMFEwHQYDVR0OBBYEFLU73nUEzinqIo7fdWjQd2pa
+plF6MB8GA1UdIwQYMBaAFLU73nUEzinqIo7fdWjQd2paplF6MA8GA1UdEwEB/wQF
+MAMBAf8wDQYJKoZIhvcNAQELBQADggEBACvFU80K7nkSSDxCJZDBdgreO0K/seDH
+tFw5OjCUyfTw706Pm7wUYFwQOkt/agWWYhpnxws0bZaMD7PgWDTpURWLeowxE0Th
+DImuj2KhnFWHjaziM7/aN2/NrlFzdkHGvd2TJM41jWpTmjxYb4VKv1iGu4ds0wA4
+e9IcLHD0wFc5rG7AsXWvPEQAdxvPJbM4tmYm63FaefSNXLrnLUN8Bq2IEA3/Y3gi
+gYmfblMSdT9o8WGGj95EEIrDpJEgAt/C/jO5o74O8s6xkBDBR+13JouaLyVNhXgs
+xAhHNnK5uetPUWGhK0na6WN/Lzn6153lDo0q7xo99+f6nZjNVEpE4TI=
+-----END CERTIFICATE-----
diff --git a/statefun-flink/statefun-flink-core/src/test/resources/certs/b_client.crt b/statefun-flink/statefun-flink-core/src/test/resources/certs/b_client.crt
new file mode 100644
index 0000000..65004b2
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/test/resources/certs/b_client.crt
@@ -0,0 +1,19 @@
+-----BEGIN CERTIFICATE-----
+MIIDHzCCAgcCFBoLcsOFD9a6veyW0+4+gG8k1idPMA0GCSqGSIb3DQEBCwUAMFEx
+CzAJBgNVBAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEwHwYDVQQKDBhJbnRl
+cm5ldCBXaWRnaXRzIFB0eSBMdGQxCjAIBgNVBAMMAWIwIBcNMjIwMzAyMTIxODA1
+WhgPMjEyMjAyMDYxMjE4MDVaMEUxCzAJBgNVBAYTAkFVMRMwEQYDVQQIDApTb21l
+LVN0YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQwggEiMA0G
+CSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCm+mtGUrYPzqCiS+0iCir3yXG6KsYv
+/UMky1fU7KxmjksUEzs3GfZgi5as4mbETy2jliArXrygCRFqL3cH860IJxb0uyRk
+VBsVL5lRy8jgUfEaZ52nDlrVtwZNnnchZUMqnxM5DIEDKFj4Jzqo4kp1Ph9vIHBZ
+a8PkMOtENPz+5RPgrUreV/qH5+4A0zHiI6l88IHLFXf9udwawgHWo6YZaiOPOazb
+lGXBoWZ/nQmvYE9skO27SlwIP7rmSBA6fM4ejxcXAsWwcucAYDful6YegAEQrA6Q
+8M6Z5P6mEROPOlEkF7qNFJDdlZiDYfmBXDgyI32knLR1jMpUqAdt6t2bAgMBAAEw
+DQYJKoZIhvcNAQELBQADggEBAE3QHSHWo5AQfLzP2pWCSQvfTO2qsoizpuVKG/bJ
+XNHutI6iK2c/juo/GONfmiw1G3+OZ+EwjoU4+SVz6IvCxfeIrr4q2+YK746mOYE/
+xJfu058BMkrMeZfXXULuAHfuU9iQjEZiFrRGA5xyaSdYTsAPhtRVuiUnLKccSg37
+cGNGS8qTV+AIbsFDrysDhyQRvV2cjWaFhFRxOWJaOixtNZr5GBNK4NZSl9CPyFNj
+kzyX+1aBSpB9GQu8elh2kOWEigRtmnoBAGEvLh/wNR2IcRQR6uCxVktQXEx/uKDC
+OgK26QUKZ0JYBi5R1V8GHOdZfPBMbx1VdVkKtA/uQsbX91E=
+-----END CERTIFICATE-----
diff --git a/statefun-flink/statefun-flink-core/src/test/resources/certs/b_client.key.p8 b/statefun-flink/statefun-flink-core/src/test/resources/certs/b_client.key.p8
new file mode 100644
index 0000000..0b7a968
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/test/resources/certs/b_client.key.p8
@@ -0,0 +1,29 @@
+-----BEGIN ENCRYPTED PRIVATE KEY-----
+MIIE4jAcBgoqhkiG9w0BDAEBMA4ECMLse3faQOVSAgIIAASCBMBKFvfRapdgY4BM
+J8KyrDVmujJ+EsyFZSqz50ZVUNDGGqUPqDoTO06HsQs4UtdD84u/p4LfiwHw9IAO
+cMxNACp67jaa5zn+O8Hh4OMW+aTIv7McBx5VAJ9JD+7vmt80U6s0hs1XShwL3f83
+jNnRpS33jqQc31IuKQKA9GcaTQiG8vRvHGSYVMSnT4eIaEMJUOlGwkJgq+5MbaVk
+cRsPEfIFQyqKuzq6obtvP3BeCbUNYJm9Pw0ztdlxFx93lKJDuOnibAUyLkHuTBrB
+V2PMg9w7zbV1i68GDG16H9/u3yP4yq3kvB1D2dd4p8EWxKPetUZofxJYyxAFE30P
+cUCvHMmcJkLJPLm8wE+1hyE5I8B8Xv88HpVnI/A7+M3zVbFLbsuij2FxNju7J+Kp
+mv4kJoNPnmcmxfPWhCtEqJFpIbUMq/qsOa/X99c1kMEAYycJ9b1a0bIJNQbDw5sP
+46nkkLLqM4VpRK234ZEd5ATjfIA7YkWwNZGCHGQCpBmbz6WsD4O7wSVPmi+PXdJc
+WT1uQOuaQsFhQl1OS5YSbh9Dzuze7VurTTf/vbS1HXmAR6sTwX/xaXhmohjbdake
+SsUVGfjCJUAu8BCYD/iIB9LZ2FjcBPLK/pB4ODNH1Urhrh6QishxF698xs15mwYH
+BGfCMohYhm6Ieo++nbTCrJlEb9XBrsHo2ButB8abg4ZB9Uh0vkYwpl7eR1jLmHqn
+/nC3sa7HWVOIQalEbWQKihUCByVnPXj5SKs1L9ofBZHEjXj3XBUvyDuZ6XDf0OEI
+OAVhe6L4k+6ZRxDjJqBmjslwTiFvGPzxpS9f2XbuQr7SVP/j4g1kJal2ZtFtquqe
+7CPx/XmAKWqPWVr6H76NtdLKnzsvSrGWSh724eUW22nfwi3NijfabC4f2dGylpRz
+Fz5kEvOy7jeDxUDre3m9M/LvgIhZXPAc7V67VgULFOmKx5Jvi1bh4csmAs+mnAgQ
+MNEXT8XqMFaQALBVT+kpD65TLy8tbjRsi9r5j/pgtFqshUdEi8NAOqJLSgs5L3IT
+Py5ga0jK2awcQUmnUPFFQ+w4r18ZypMQYidlED6ZtzlrAvX+u6w3kjeshs192L+B
+Jahd8O1LEhPreRKvjP8Pjhrknu2TbOXRLBcVV1dmRA/vF7YOdqfzwmA01gqCC9nJ
+6RGb3cDPdK++Dmbmu7US2OURDCXsrg/2wH5cNMqSuAHpcZD+/TYokEaLj0uK+wrJ
+kgfaWXXC287fRVAlnTGKzklcjzkABYpEpaR7HCf3AieZz5OF9GaUdhDSklFyc+6f
+AG9ZwFsbnL07yuZsOwlKQk9G6MBRGZNJmRfm/WCULS2j1iBOTVrC2mfmuBjHJ8g0
+UzML2c5e5FZ1fTDuTf5NUJafN+x2pw9ENfrFSUZT9O5cTTpNsdKrLzEbJPA/FOnY
+Ap5HyQdd8uLvrbz0A891jnRJzbNxhQuPuj6xgH1bPVgoFv0DFQywNlK0cbMPahlX
+zp+De9mh1fRnbjbShu9ENbdFaBW3biup0turOGlZQhgjGBr6dmgCE0ROb9xWillg
+reA2VO5vHuXKzDSLXlD/aeizMNnjv0hVSDuminnqDsdbm1jb8/87SaHKJlLfxg5Y
+2zYvxC0b
+-----END ENCRYPTED PRIVATE KEY-----
diff --git a/statefun-flink/statefun-flink-core/src/test/resources/certs/c_ca.key b/statefun-flink/statefun-flink-core/src/test/resources/certs/c_ca.key
new file mode 100644
index 0000000..1af0d31
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/test/resources/certs/c_ca.key
@@ -0,0 +1,30 @@
+-----BEGIN RSA PRIVATE KEY-----
+Proc-Type: 4,ENCRYPTED
+DEK-Info: DES-EDE3-CBC,CC39A8B6BA1E34FF
+
+w/ZRutsVkmJ4Yr867GxOqbwLqiDnomNwd2Z+X58xM6rzG1KEpGgxKeL+wKjki97k
+rhhfVtie8o4KZfKg+06zamhuzw1pyeyPifSSPZ9JmduiEfZI9NiDUPNSPtMuz+sN
+dif3Mm2OriMII2iPoeydjyPHbd06rHplLi2Q4fgECsbN1WyxK7oRCA0Y9+oAov7V
+AoeE+lIRReE6swuRMsUPFGZKNsYM3L/OCQEiBq7ZovOFHaE6SmnYQMG5ibEGleyz
+LDzl8ixVS1eoEFa6CBJw8fgFpgucsS3viO4kxAO1GMzA+siqQSgVLh5wSoRv1Xrg
+C5Snyvi1aHZvcAZo5sOhaqkGpzqb7iSKkvYTPLwv4WsKmn5/H6Ut433MvlPM0HVW
+NtAPhQxH9RzrLQoLQq943+sfJPkMBcJsB9kI7LGWy9+inNQZ9lyf2jfNnZ6qj86Y
+VniJJxBcyZ+QabpCF9N6PZtG9IEdboVGjOuBB3eBFxGGaK30MlOpOhsgKdEWjdoc
+J2pvKs2Yv+zV7E8ka2LLBelP21fjsGx0YkxL/D8ibj/BBAzUkLeB9DJcdyr4k+lI
+kcDQRQ66LY4sAjd4/1libPZ6FMkSinrc2onj1xDREtQ8vbL8t80UgupzE72W67Ah
+JVWyfPgzdzQsdTsKM1HVPQZF2bxBKJq3z6EayukKyvSvJBVIxhqCbnPo6x8g9WV4
+rltfGQ3iMVXormwNztQPnPd0GjhKCjL6Qa9J1LcoK2cs4AIhevNwlXHmSEgBh5vx
+e+Z4GMxh8k0oDaD0RGnXnPBddmkHTkGaTZS/TaBshArE56YD2xcZQFgBqRgir+uO
+vOuhePjrFhjh//X6KNbarTbUgIMpoMHvgstZMZIWx93Dd11rLYHVQRZT9sVb0x4O
+mAmxu7qvJIXgiDT9PwuifAUEgo50r1ptu7KGbjxd09yEsC+WX/5AlX3XuAOyxxCr
+F9KPC4ne+AKqPtRUHMsTZBai2vumVTCoR8TLzY6KVB8AMQJ8thHkuxqo+Mtg5H7o
+K5O2TS3QqxRJF4sQq5p7iLcRkSyP94JTlbq6CI+m//y5NpRES2/zGFjbCnI3A1nJ
+w3g5pzHUjn2obkZ4wHmWhJ2mHXH85p8HASKWFGLB8iquDvzgrnHsLxIRiGAAjvMf
+QYCq159Dma2xzBbfIZNU+ZgrpVtbC0sRgNzjSK1kiXKrvnMj9GYsHqIbZkgDisaI
+j5pyBTn2AKS2cZLe8zSJs1hrRdPvy6GRBhizbhbmuRPMHj7XI5YgtBPoONJgKX+O
+79zrDjUJuJxegvMPMMtOVWkOJVAWCOuTncedg70rTphKs6oGjY1EC42Ln7ZUZrGs
+pyi4h6FrzzqfaKjyZDs/pukY7dIfimsYQPhh1uClBbXULh/lMrL24hIv0ggGNsee
+aIHBAxYzhlD08E9sLQwWDHNtWnNrV/eW+tkKk3woExXhdH1CDMSWBscPxC4zom7s
+cDLKJeXzvuPOE+5VLy40TtWBfvb2fIpWRuML0iBk+q642jMNAss+CAtFiNQEjq8Y
+hQbglegeI3T+X/AcMUVBxppGfFk0NpkXuKenG1o+DbCisAzqMz95tw==
+-----END RSA PRIVATE KEY-----
diff --git a/statefun-flink/statefun-flink-core/src/test/resources/certs/c_ca.pem b/statefun-flink/statefun-flink-core/src/test/resources/certs/c_ca.pem
new file mode 100644
index 0000000..410f92f
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/test/resources/certs/c_ca.pem
@@ -0,0 +1,21 @@
+-----BEGIN CERTIFICATE-----
+MIIDhTCCAm2gAwIBAgIUdVEDHKaXNfdh0Jgf/7x1xhEOP/owDQYJKoZIhvcNAQEL
+BQAwUTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM
+GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDEKMAgGA1UEAwwBYzAgFw0yMjAyMjIx
+MzA2MjNaGA8yMTIyMDEyOTEzMDYyM1owUTELMAkGA1UEBhMCQVUxEzARBgNVBAgM
+ClNvbWUtU3RhdGUxITAfBgNVBAoMGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDEK
+MAgGA1UEAwwBYzCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBALDfn3wo
+XxxKpIter4QsGCYI+AiFxVouJXJlswa6VnSQtFYa6I0gsqVOmCgaWNpYKTKzYRgC
+/neNwpirZLj6A8Ubq37T0cr0D3Et0axWgVkySjVEDZOT4CP5FiSVGiwIN2MexZRH
+EO4njk75m/NGXY6DMiq6qrpStjirQlET2Yd71BqLpDXnfKwN+SOUGfTjwCNkx8sQ
+Vx5ibZtk1K/XihA7FCECTQqbzSsrKv9tDEayuW61KJZTcPCrwloTlQztySHbtiPD
+iojTE8FQAu2PmpDPt0nCSsSAM1EKD6iXyXzj7zzIsKZOZoj8E2d7DVELFYOTenKU
+/dYaVuCDDPBxalsCAwEAAaNTMFEwHQYDVR0OBBYEFLU73nUEzinqIo7fdWjQd2pa
+plF6MB8GA1UdIwQYMBaAFLU73nUEzinqIo7fdWjQd2paplF6MA8GA1UdEwEB/wQF
+MAMBAf8wDQYJKoZIhvcNAQELBQADggEBACvFU80K7nkSSDxCJZDBdgreO0K/seDH
+tFw5OjCUyfTw706Pm7wUYFwQOkt/agWWYhpnxws0bZaMD7PgWDTpURWLeowxE0Th
+DImuj2KhnFWHjaziM7/aN2/NrlFzdkHGvd2TJM41jWpTmjxYb4VKv1iGu4ds0wA4
+e9IcLHD0wFc5rG7AsXWvPEQAdxvPJbM4tmYm63FaefSNXLrnLUN8Bq2IEA3/Y3gi
+gYmfblMSdT9o8WGGj95EEIrDpJEgAt/C/jO5o74O8s6xkBDBR+13JouaLyVNhXgs
+xAhHNnK5uetPUWGhK0na6WN/Lzn6153lDo0q7xo99+f6nZjNVEpE4TI=
+-----END CERTIFICATE-----
diff --git a/statefun-flink/statefun-flink-core/src/test/resources/certs/c_client.crt b/statefun-flink/statefun-flink-core/src/test/resources/certs/c_client.crt
new file mode 100644
index 0000000..147c18f
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/test/resources/certs/c_client.crt
@@ -0,0 +1,19 @@
+-----BEGIN CERTIFICATE-----
+MIIDHzCCAgcCFHypTu3QPE82c0mjOBSnVtE2QKiXMA0GCSqGSIb3DQEBCwUAMFEx
+CzAJBgNVBAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEwHwYDVQQKDBhJbnRl
+cm5ldCBXaWRnaXRzIFB0eSBMdGQxCjAIBgNVBAMMAWMwIBcNMjIwMzAyMTIxODA5
+WhgPMjEyMjAyMDYxMjE4MDlaMEUxCzAJBgNVBAYTAkFVMRMwEQYDVQQIDApTb21l
+LVN0YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQwggEiMA0G
+CSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDVTtCv5iUiKDXKQ6cUxPQyLEMFNqrj
+VKSSRD2kNXt2Zn+M1S7156pdISoA7GKo0va4eStkvtSYhn1vxLOmGuXR/g8lIeoq
+4VCj0Vc62NoTYBaAG/gdkRqspo7wQNMxcG9bFM+GFFvPGVn47GfH+AyMhSAAiZB2
+tTcviMMW6Csv3LY+0nFwvdjoADEgXu5Z+GA/qDDS3+URS8pj7Yu+KvNCVtJh+7Yx
+QQgHsG9iREZJUKGYv0ODrMMQ4Vzik1W4sVbtWZS6eBcWzxsgaOOLUi1oTGLLir9E
+I5FO/gK2SCIqHLWMGOes0kGfH0KU8oGTLXPBfJG2PEGdP2ovMiqbah27AgMBAAEw
+DQYJKoZIhvcNAQELBQADggEBACpWzlft/FD9G17rvQcurr+UerDaQUv4HDfm8pZM
+czbT3WvNrWbQK3rO2bKM3YVpM/CX0EeWNKlumMX1uTvxwE037X07eHecdcUss9Tg
+83YR2DPUKXvhyvc5UyJR4gOvXrFnV0mBYKe2goGDHyVS7u+ZNR9Zmd/EWnuTbhZv
+KwynT/fz7Osb584SWqUbw4fNcknPeuxdxq6IwcPJKCpmioxQhJBSCYsNin2fibG6
+B16B5z47172RBrrM6d1aPaxLfjEEl9Cis0hn2VnlWxvwGYnvhCmigNlN9sORgP68
+S1CpFV4G9Hchs76Gnyl8/Q9W8eB8t31ia6Hy+L0nDhOhAk0=
+-----END CERTIFICATE-----
diff --git a/statefun-flink/statefun-flink-core/src/test/resources/certs/c_client.key.p8 b/statefun-flink/statefun-flink-core/src/test/resources/certs/c_client.key.p8
new file mode 100644
index 0000000..43a930f
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/test/resources/certs/c_client.key.p8
@@ -0,0 +1,28 @@
+-----BEGIN PRIVATE KEY-----
+MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQDVTtCv5iUiKDXK
+Q6cUxPQyLEMFNqrjVKSSRD2kNXt2Zn+M1S7156pdISoA7GKo0va4eStkvtSYhn1v
+xLOmGuXR/g8lIeoq4VCj0Vc62NoTYBaAG/gdkRqspo7wQNMxcG9bFM+GFFvPGVn4
+7GfH+AyMhSAAiZB2tTcviMMW6Csv3LY+0nFwvdjoADEgXu5Z+GA/qDDS3+URS8pj
+7Yu+KvNCVtJh+7YxQQgHsG9iREZJUKGYv0ODrMMQ4Vzik1W4sVbtWZS6eBcWzxsg
+aOOLUi1oTGLLir9EI5FO/gK2SCIqHLWMGOes0kGfH0KU8oGTLXPBfJG2PEGdP2ov
+Miqbah27AgMBAAECggEBALlRHPsEvpQbTQgpmHnx1pnUXHGmGfcgrU2p3cfQ8y9d
+Q0kuWPsoxpjcXCqrM9eqWNDR7II9TGQqy8WTpeQUkD/NMok3GsMgNzfrgpZMmcGl
+hEbOhdPiIcOHfL18BD96iDQgM18wRsb5JnKdqFhRx005xyDQYuLO81/nuF4pXIPZ
+7V/jUYM2+0SYUvvKl9ik3ms7f4K3gIByFB6P2XltBbzMGrspGe/zrW7ExpvhSAWH
+Rs6kLwNH8Kjslja88EOSN7YxtL86pGxad1DvZK6NZ4IzFPujuRFixjMYiDJCjuG9
+fxaZSGPIKftfv1+Us4y70vVkk683hNtpqtObsyZ1vpkCgYEA8t8lOLijoP1IQTzK
+r/vZxGJO6GTsAcuLYIWKmrFwtT1dU0SCJhOihirCVcRKLPBssHyP+Oy2vkQ3ZqrD
+EWMUdPSUAaWSGxJTxooPUX73Q2Ji31f48EfEfluUB4BhcWb4Cb6eokOxkENyrTFh
+yPCYCIoogS4aRbK3sjoVhbkmIPUCgYEA4NaRBQT8NEJFMi0H5qQqx58F2SAK4oWX
+LACstL/+QEJks5WmZOOWppOzFxwZq/gV/xpGHZlBGLWQ2roQLizEZyt1xG19XS4r
+xrQX8hrzDBaLoc54c4hdnn7TBSg2GOEfR+tQDm9zjL9lg2cUvO1QM4QpV2Nl3OES
+gZH6NExgVe8CgYEA0lZ7b/JIlAaHayhxCZR+dtwFJrMwyiUz4jYFYg/GYtBwYMr2
+RG/A0514jLrg2qamGvrl3Vl0srd1m47MPeSnNNsUy5BPRu7KaynqNlRVZKByrKcN
+y1r8S8qYGx8ORR4+NoJF9JXX0+72BBS+cT4dDVhJ2AbLbg1nlg1D0noVEXkCgYB/
+VYV0Dkq3XgDKVe5vlPIVc95gBWGkwHvJE+i7xTA9PFzDyDEgWjRryqJWqDdxyWLL
+wxmKkczoLYwIE/At9nSU6Wwc/tVM/GUWd9BH8TNM2UfjL4fQBp7esHzaItfgZz7M
+AFyqkDwkrhDTxoFcLhqJ0oOHVmfOHDna382P6OF2SQKBgEmOyOZsRXymHsralgDJ
+zXgJDbj+waJP/x6rREI4OTJXgI4djhVYjuEJmVvDM4tu29ae2fQr11Uakbf3s75S
+IeEGf1Z5s0slz4HjOQOKIup/U1AtQGkcjGlGgLHpy1aW20Ev72/dOPYPGYSW9aZ9
+U6TNgR8CLFxC0u73wpQLNssk
+-----END PRIVATE KEY-----
diff --git a/statefun-flink/statefun-flink-core/src/test/resources/certs/key_password.txt b/statefun-flink/statefun-flink-core/src/test/resources/certs/key_password.txt
new file mode 100644
index 0000000..30d74d2
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/test/resources/certs/key_password.txt
@@ -0,0 +1 @@
+test
\ No newline at end of file
diff --git a/statefun-flink/statefun-flink-core/src/test/resources/certs/server.ext b/statefun-flink/statefun-flink-core/src/test/resources/certs/server.ext
new file mode 100644
index 0000000..54c4783
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/test/resources/certs/server.ext
@@ -0,0 +1,8 @@
+authorityKeyIdentifier=keyid,issuer
+basicConstraints=CA:FALSE
+keyUsage = digitalSignature, nonRepudiation, keyEncipherment, dataEncipherment
+subjectAltName = @alt_names
+
+[alt_names]
+DNS.1 = localhost
+DNS.2 = remote-function-host
\ No newline at end of file