[FLINK-23296] [core] Introduce RequestReplyClientFactory extension

* Rename default HTTP request-reply client
* Implement default HTTP client factory
* Wire in RequestReplyClientFactory into HttpFunctionEndpointProvider
* Pass in ExtensionResolver when binding JsonEntities
* Bump YAML format version
* Implement v3.1 YAML format with transport spec parsing
* Implement context-safe decorator for RequestReplyClient
* Ensure correct classloader when using tranport clients and factories
* Move Classloader handling responsibility to factory implementations
* Add TransportClientsModule

This closes #243.
diff --git a/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/json/Selectors.java b/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/json/Selectors.java
index a5b9d2c..f746a7f 100644
--- a/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/json/Selectors.java
+++ b/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/json/Selectors.java
@@ -23,10 +23,22 @@
 import java.util.stream.StreamSupport;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonPointer;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.flink.util.TimeUtils;
 
 public final class Selectors {
 
+  public static Optional<ObjectNode> optionalObjectAt(JsonNode node, JsonPointer pointer) {
+    node = node.at(pointer);
+    if (node.isMissingNode()) {
+      return Optional.empty();
+    }
+    if (!node.isObject()) {
+      throw new WrongTypeException(pointer, "not an object");
+    }
+    return Optional.of((ObjectNode) node);
+  }
+
   public static String textAt(JsonNode node, JsonPointer pointer) {
     node = dereference(node, pointer);
     if (!node.isTextual()) {
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsUniverse.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsUniverse.java
index 9c39977..4fffffc 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsUniverse.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsUniverse.java
@@ -24,6 +24,7 @@
 import java.util.Objects;
 import javax.annotation.Nullable;
 import org.apache.flink.statefun.flink.core.message.MessageFactoryKey;
+import org.apache.flink.statefun.flink.core.spi.ExtensionResolver;
 import org.apache.flink.statefun.flink.core.types.StaticallyRegisteredTypes;
 import org.apache.flink.statefun.flink.io.spi.FlinkIoModule;
 import org.apache.flink.statefun.flink.io.spi.SinkProvider;
@@ -33,15 +34,20 @@
 import org.apache.flink.statefun.sdk.FunctionTypeNamespaceMatcher;
 import org.apache.flink.statefun.sdk.IngressType;
 import org.apache.flink.statefun.sdk.StatefulFunctionProvider;
+import org.apache.flink.statefun.sdk.TypeName;
 import org.apache.flink.statefun.sdk.io.EgressIdentifier;
 import org.apache.flink.statefun.sdk.io.EgressSpec;
 import org.apache.flink.statefun.sdk.io.IngressIdentifier;
 import org.apache.flink.statefun.sdk.io.IngressSpec;
 import org.apache.flink.statefun.sdk.io.Router;
+import org.apache.flink.statefun.sdk.spi.ExtensionModule;
 import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
 
 public final class StatefulFunctionsUniverse
-    implements StatefulFunctionModule.Binder, FlinkIoModule.Binder {
+    implements StatefulFunctionModule.Binder,
+        FlinkIoModule.Binder,
+        ExtensionModule.Binder,
+        ExtensionResolver {
 
   private final Map<IngressIdentifier<?>, IngressSpec<?>> ingress = new HashMap<>();
   private final Map<EgressIdentifier<?>, EgressSpec<?>> egress = new HashMap<>();
@@ -51,6 +57,7 @@
   private final Map<String, StatefulFunctionProvider> namespaceFunctionProviders = new HashMap<>();
   private final Map<IngressType, SourceProvider> sources = new HashMap<>();
   private final Map<EgressType, SinkProvider> sinks = new HashMap<>();
+  private final Map<TypeName, Object> extensions = new HashMap<>();
 
   private final StaticallyRegisteredTypes types;
   private final MessageFactoryKey messageFactoryKey;
@@ -112,6 +119,30 @@
     putAndThrowIfPresent(sinks, type, provider);
   }
 
+  @Override
+  public <T> void bindExtension(TypeName typeName, T extension) {
+    putAndThrowIfPresent(extensions, typeName, extension);
+  }
+
+  @Override
+  public <T> T resolveExtension(TypeName typeName, Class<T> extensionClass) {
+    final Object rawTypedExtension = extensions.get(typeName);
+    if (rawTypedExtension == null) {
+      throw new IllegalStateException("An extension with type " + typeName + " does not exist.");
+    }
+
+    if (rawTypedExtension.getClass().isAssignableFrom(extensionClass)) {
+      throw new IllegalStateException(
+          "Unexpected class for extension "
+              + typeName
+              + "; expected "
+              + extensionClass
+              + ", but was "
+              + rawTypedExtension.getClass());
+    }
+    return extensionClass.cast(rawTypedExtension);
+  }
+
   public Map<IngressIdentifier<?>, IngressSpec<?>> ingress() {
     return ingress;
   }
@@ -140,6 +171,10 @@
     return sinks;
   }
 
+  public Map<TypeName, Object> getExtensions() {
+    return extensions;
+  }
+
   public StaticallyRegisteredTypes types() {
     return types;
   }
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpRequestReplyClient.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClient.java
similarity index 93%
rename from statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpRequestReplyClient.java
rename to statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClient.java
index fd7e5fb..f2cf795 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpRequestReplyClient.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClient.java
@@ -39,14 +39,14 @@
 import org.apache.flink.statefun.sdk.reqreply.generated.ToFunction;
 import org.apache.flink.util.IOUtils;
 
-final class HttpRequestReplyClient implements RequestReplyClient {
+final class DefaultHttpRequestReplyClient implements RequestReplyClient {
   private static final MediaType MEDIA_TYPE_BINARY = MediaType.parse("application/octet-stream");
 
   private final HttpUrl url;
   private final OkHttpClient client;
   private final BooleanSupplier isShutdown;
 
-  HttpRequestReplyClient(HttpUrl url, OkHttpClient client, BooleanSupplier isShutdown) {
+  DefaultHttpRequestReplyClient(HttpUrl url, OkHttpClient client, BooleanSupplier isShutdown) {
     this.url = Objects.requireNonNull(url);
     this.client = Objects.requireNonNull(client);
     this.isShutdown = Objects.requireNonNull(isShutdown);
@@ -67,7 +67,7 @@
     RetryingCallback callback =
         new RetryingCallback(requestSummary, metrics, newCall.timeout(), isShutdown);
     callback.attachToCall(newCall);
-    return callback.future().thenApply(HttpRequestReplyClient::parseResponse);
+    return callback.future().thenApply(DefaultHttpRequestReplyClient::parseResponse);
   }
 
   private static FromFunction parseResponse(Response response) {
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClientFactory.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClientFactory.java
new file mode 100644
index 0000000..fd7fcc2
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClientFactory.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.httpfn;
+
+import static org.apache.flink.statefun.flink.core.httpfn.OkHttpUnixSocketBridge.configureUnixDomainSocket;
+
+import java.net.URI;
+import javax.annotation.Nullable;
+import okhttp3.HttpUrl;
+import okhttp3.OkHttpClient;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.statefun.flink.common.SetContextClassLoader;
+import org.apache.flink.statefun.flink.core.reqreply.ClassLoaderSafeRequestReplyClient;
+import org.apache.flink.statefun.flink.core.reqreply.RequestReplyClient;
+import org.apache.flink.statefun.flink.core.reqreply.RequestReplyClientFactory;
+
+public final class DefaultHttpRequestReplyClientFactory implements RequestReplyClientFactory {
+
+  /** Unknown fields in client properties are silently ignored. */
+  private static final ObjectMapper OBJ_MAPPER =
+      new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+
+  /** lazily initialized by {@link #createTransportClient} */
+  @Nullable private OkHttpClient sharedClient;
+
+  private volatile boolean shutdown;
+
+  @Override
+  public RequestReplyClient createTransportClient(ObjectNode transportProperties, URI endpointUrl) {
+    final DefaultHttpRequestReplyClient client = createClient(transportProperties, endpointUrl);
+
+    if (Thread.currentThread().getContextClassLoader() == getClass().getClassLoader()) {
+      return client;
+    } else {
+      return new ClassLoaderSafeRequestReplyClient(client);
+    }
+  }
+
+  @Override
+  public void cleanup() {
+    if (!shutdown) {
+      shutdown = true;
+      OkHttpUtils.closeSilently(sharedClient);
+    }
+  }
+
+  private DefaultHttpRequestReplyClient createClient(
+      ObjectNode transportProperties, URI endpointUrl) {
+    try (SetContextClassLoader ignored = new SetContextClassLoader(this)) {
+      if (sharedClient == null) {
+        sharedClient = OkHttpUtils.newClient();
+      }
+      final OkHttpClient.Builder clientBuilder = sharedClient.newBuilder();
+
+      final DefaultHttpRequestReplyClientSpec transportClientSpec =
+          parseTransportProperties(transportProperties);
+
+      clientBuilder.callTimeout(transportClientSpec.getTimeouts().getCallTimeout());
+      clientBuilder.connectTimeout(transportClientSpec.getTimeouts().getConnectTimeout());
+      clientBuilder.readTimeout(transportClientSpec.getTimeouts().getReadTimeout());
+      clientBuilder.writeTimeout(transportClientSpec.getTimeouts().getWriteTimeout());
+
+      HttpUrl url;
+      if (UnixDomainHttpEndpoint.validate(endpointUrl)) {
+        UnixDomainHttpEndpoint endpoint = UnixDomainHttpEndpoint.parseFrom(endpointUrl);
+
+        url =
+            new HttpUrl.Builder()
+                .scheme("http")
+                .host("unused")
+                .addPathSegment(endpoint.pathSegment)
+                .build();
+
+        configureUnixDomainSocket(clientBuilder, endpoint.unixDomainFile);
+      } else {
+        url = HttpUrl.get(endpointUrl);
+      }
+
+      return new DefaultHttpRequestReplyClient(url, clientBuilder.build(), () -> shutdown);
+    }
+  }
+
+  private static DefaultHttpRequestReplyClientSpec parseTransportProperties(
+      ObjectNode transportClientProperties) {
+    try {
+      return OBJ_MAPPER.treeToValue(
+          transportClientProperties, DefaultHttpRequestReplyClientSpec.class);
+    } catch (Exception e) {
+      throw new RuntimeException(
+          "Unable to parse transport client properties when creating client: ", e);
+    }
+  }
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClientSpec.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClientSpec.java
new file mode 100644
index 0000000..4639fc2
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClientSpec.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.httpfn;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Objects;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSetter;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonDeserializer;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.flink.util.TimeUtils;
+
+public final class DefaultHttpRequestReplyClientSpec {
+
+  @JsonProperty("timeouts")
+  private Timeouts timeouts = new Timeouts();
+
+  @JsonSetter("timeouts")
+  public void setTimeouts(Timeouts timeouts) {
+    validateTimeouts(
+        timeouts.callTimeout, timeouts.connectTimeout, timeouts.readTimeout, timeouts.writeTimeout);
+    this.timeouts = timeouts;
+  }
+
+  public Timeouts getTimeouts() {
+    return timeouts;
+  }
+
+  private static void validateTimeouts(
+      Duration callTimeout, Duration connectTimeout, Duration readTimeout, Duration writeTimeout) {
+
+    if (connectTimeout.compareTo(callTimeout) > 0) {
+      throw new IllegalArgumentException("Connect timeout cannot be larger than request timeout.");
+    }
+
+    if (readTimeout.compareTo(callTimeout) > 0) {
+      throw new IllegalArgumentException("Read timeout cannot be larger than request timeout.");
+    }
+
+    if (writeTimeout.compareTo(callTimeout) > 0) {
+      throw new IllegalArgumentException("Write timeout cannot be larger than request timeout.");
+    }
+  }
+
+  public static final class Timeouts {
+
+    private static final Duration DEFAULT_HTTP_TIMEOUT = Duration.ofMinutes(1);
+    private static final Duration DEFAULT_HTTP_CONNECT_TIMEOUT = Duration.ofSeconds(10);
+    private static final Duration DEFAULT_HTTP_READ_TIMEOUT = Duration.ofSeconds(10);
+    private static final Duration DEFAULT_HTTP_WRITE_TIMEOUT = Duration.ofSeconds(10);
+
+    private Duration callTimeout = DEFAULT_HTTP_TIMEOUT;
+    private Duration connectTimeout = DEFAULT_HTTP_CONNECT_TIMEOUT;
+    private Duration readTimeout = DEFAULT_HTTP_READ_TIMEOUT;
+    private Duration writeTimeout = DEFAULT_HTTP_WRITE_TIMEOUT;
+
+    @JsonSetter("call")
+    @JsonDeserialize(using = DurationJsonDeserialize.class)
+    public void setCallTimeout(Duration callTimeout) {
+      this.callTimeout = requireNonZeroDuration(callTimeout);
+    }
+
+    @JsonSetter("connect")
+    @JsonDeserialize(using = DurationJsonDeserialize.class)
+    public void setConnectTimeout(Duration connectTimeout) {
+      this.connectTimeout = requireNonZeroDuration(connectTimeout);
+    }
+
+    @JsonSetter("read")
+    @JsonDeserialize(using = DurationJsonDeserialize.class)
+    public void setReadTimeout(Duration readTimeout) {
+      this.readTimeout = requireNonZeroDuration(readTimeout);
+    }
+
+    @JsonSetter("write")
+    @JsonDeserialize(using = DurationJsonDeserialize.class)
+    public void setWriteTimeout(Duration writeTimeout) {
+      this.writeTimeout = requireNonZeroDuration(writeTimeout);
+    }
+
+    public Duration getCallTimeout() {
+      return callTimeout;
+    }
+
+    public Duration getConnectTimeout() {
+      return connectTimeout;
+    }
+
+    public Duration getReadTimeout() {
+      return readTimeout;
+    }
+
+    public Duration getWriteTimeout() {
+      return writeTimeout;
+    }
+
+    private static Duration requireNonZeroDuration(Duration duration) {
+      Objects.requireNonNull(duration);
+      if (duration.equals(Duration.ZERO)) {
+        throw new IllegalArgumentException("Timeout durations must be larger than 0.");
+      }
+
+      return duration;
+    }
+  }
+
+  private static final class DurationJsonDeserialize extends JsonDeserializer<Duration> {
+    @Override
+    public Duration deserialize(
+        JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException {
+      return TimeUtils.parseDuration(jsonParser.getText());
+    }
+  }
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionEndpointSpec.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionEndpointSpec.java
index 85df004..428e474 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionEndpointSpec.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionEndpointSpec.java
@@ -17,30 +17,36 @@
  */
 package org.apache.flink.statefun.flink.core.httpfn;
 
+import static org.apache.flink.statefun.flink.core.httpfn.TransportClientConstants.OKHTTP_CLIENT_FACTORY_TYPE;
+
 import java.io.Serializable;
-import java.time.Duration;
 import java.util.Objects;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.flink.statefun.flink.core.jsonmodule.FunctionEndpointSpec;
+import org.apache.flink.statefun.sdk.TypeName;
 
 public final class HttpFunctionEndpointSpec implements FunctionEndpointSpec, Serializable {
 
   private static final long serialVersionUID = 1;
 
-  private static final Duration DEFAULT_HTTP_TIMEOUT = Duration.ofMinutes(1);
-  private static final Duration DEFAULT_HTTP_CONNECT_TIMEOUT = Duration.ofSeconds(10);
-  private static final Duration DEFAULT_HTTP_READ_TIMEOUT = Duration.ofSeconds(10);
-  private static final Duration DEFAULT_HTTP_WRITE_TIMEOUT = Duration.ofSeconds(10);
   private static final Integer DEFAULT_MAX_NUM_BATCH_REQUESTS = 1000;
 
+  // ============================================================
+  //  Request-Reply invocation protocol configurations
+  // ============================================================
+
   private final Target target;
   private final UrlPathTemplate urlPathTemplate;
-
-  private final Duration maxRequestDuration;
-  private final Duration connectTimeout;
-  private final Duration readTimeout;
-  private final Duration writeTimeout;
   private final int maxNumBatchRequests;
 
+  // ============================================================
+  //  HTTP transport related properties
+  // ============================================================
+
+  private final TypeName transportClientFactoryType;
+  private final ObjectNode transportClientProps;
+
   public static Builder builder(Target target, UrlPathTemplate urlPathTemplate) {
     return new Builder(target, urlPathTemplate);
   }
@@ -48,18 +54,14 @@
   private HttpFunctionEndpointSpec(
       Target target,
       UrlPathTemplate urlPathTemplate,
-      Duration maxRequestDuration,
-      Duration connectTimeout,
-      Duration readTimeout,
-      Duration writeTimeout,
-      int maxNumBatchRequests) {
+      int maxNumBatchRequests,
+      TypeName transportClientFactoryType,
+      ObjectNode transportClientProps) {
     this.target = target;
     this.urlPathTemplate = urlPathTemplate;
-    this.maxRequestDuration = maxRequestDuration;
-    this.connectTimeout = connectTimeout;
-    this.readTimeout = readTimeout;
-    this.writeTimeout = writeTimeout;
     this.maxNumBatchRequests = maxNumBatchRequests;
+    this.transportClientFactoryType = transportClientFactoryType;
+    this.transportClientProps = transportClientProps;
   }
 
   @Override
@@ -77,102 +79,55 @@
     return urlPathTemplate;
   }
 
-  public Duration maxRequestDuration() {
-    return maxRequestDuration;
-  }
-
-  public Duration connectTimeout() {
-    return connectTimeout;
-  }
-
-  public Duration readTimeout() {
-    return readTimeout;
-  }
-
-  public Duration writeTimeout() {
-    return writeTimeout;
-  }
-
   public int maxNumBatchRequests() {
     return maxNumBatchRequests;
   }
 
+  public TypeName transportClientFactoryType() {
+    return transportClientFactoryType;
+  }
+
+  public ObjectNode transportClientProperties() {
+    return transportClientProps;
+  }
+
   public static final class Builder {
 
     private final Target target;
     private final UrlPathTemplate urlPathTemplate;
-
-    private Duration maxRequestDuration = DEFAULT_HTTP_TIMEOUT;
-    private Duration connectTimeout = DEFAULT_HTTP_CONNECT_TIMEOUT;
-    private Duration readTimeout = DEFAULT_HTTP_READ_TIMEOUT;
-    private Duration writeTimeout = DEFAULT_HTTP_WRITE_TIMEOUT;
     private int maxNumBatchRequests = DEFAULT_MAX_NUM_BATCH_REQUESTS;
 
+    private TypeName transportClientFactoryType = OKHTTP_CLIENT_FACTORY_TYPE;
+    private ObjectNode transportClientProperties = new ObjectMapper().createObjectNode();
+
     private Builder(Target target, UrlPathTemplate urlPathTemplate) {
       this.target = Objects.requireNonNull(target);
       this.urlPathTemplate = Objects.requireNonNull(urlPathTemplate);
     }
 
-    public Builder withMaxRequestDuration(Duration duration) {
-      this.maxRequestDuration = requireNonZeroDuration(duration);
-      return this;
-    }
-
-    public Builder withConnectTimeoutDuration(Duration duration) {
-      this.connectTimeout = requireNonZeroDuration(duration);
-      return this;
-    }
-
-    public Builder withReadTimeoutDuration(Duration duration) {
-      this.readTimeout = requireNonZeroDuration(duration);
-      return this;
-    }
-
-    public Builder withWriteTimeoutDuration(Duration duration) {
-      this.writeTimeout = requireNonZeroDuration(duration);
-      return this;
-    }
-
     public Builder withMaxNumBatchRequests(int maxNumBatchRequests) {
       this.maxNumBatchRequests = maxNumBatchRequests;
       return this;
     }
 
+    public Builder withTransportClientFactoryType(TypeName transportClientFactoryType) {
+      this.transportClientFactoryType = Objects.requireNonNull(transportClientFactoryType);
+      return this;
+    }
+
+    public Builder withTransportClientProperties(ObjectNode transportClientProperties) {
+      this.transportClientProperties = Objects.requireNonNull(transportClientProperties);
+      return this;
+    }
+
     public HttpFunctionEndpointSpec build() {
-      validateTimeouts();
 
       return new HttpFunctionEndpointSpec(
           target,
           urlPathTemplate,
-          maxRequestDuration,
-          connectTimeout,
-          readTimeout,
-          writeTimeout,
-          maxNumBatchRequests);
-    }
-
-    private Duration requireNonZeroDuration(Duration duration) {
-      Objects.requireNonNull(duration);
-      if (duration.equals(Duration.ZERO)) {
-        throw new IllegalArgumentException("Timeout durations must be larger than 0.");
-      }
-
-      return duration;
-    }
-
-    private void validateTimeouts() {
-      if (connectTimeout.compareTo(maxRequestDuration) > 0) {
-        throw new IllegalArgumentException(
-            "Connect timeout cannot be larger than request timeout.");
-      }
-
-      if (readTimeout.compareTo(maxRequestDuration) > 0) {
-        throw new IllegalArgumentException("Read timeout cannot be larger than request timeout.");
-      }
-
-      if (writeTimeout.compareTo(maxRequestDuration) > 0) {
-        throw new IllegalArgumentException("Write timeout cannot be larger than request timeout.");
-      }
+          maxNumBatchRequests,
+          transportClientFactoryType,
+          transportClientProperties);
     }
   }
 }
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
index 5445500..2bcb5dd 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
@@ -17,21 +17,20 @@
  */
 package org.apache.flink.statefun.flink.core.httpfn;
 
-import static org.apache.flink.statefun.flink.core.httpfn.OkHttpUnixSocketBridge.configureUnixDomainSocket;
-
 import java.net.URI;
 import java.util.Map;
 import java.util.Objects;
-import javax.annotation.Nullable;
 import javax.annotation.concurrent.NotThreadSafe;
-import okhttp3.HttpUrl;
-import okhttp3.OkHttpClient;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.flink.statefun.flink.core.common.ManagingResources;
 import org.apache.flink.statefun.flink.core.reqreply.RequestReplyClient;
+import org.apache.flink.statefun.flink.core.reqreply.RequestReplyClientFactory;
 import org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction;
+import org.apache.flink.statefun.flink.core.spi.ExtensionResolver;
 import org.apache.flink.statefun.sdk.FunctionType;
 import org.apache.flink.statefun.sdk.StatefulFunction;
 import org.apache.flink.statefun.sdk.StatefulFunctionProvider;
+import org.apache.flink.statefun.sdk.TypeName;
 
 @NotThreadSafe
 public final class HttpFunctionProvider implements StatefulFunctionProvider, ManagingResources {
@@ -39,23 +38,25 @@
   private final Map<FunctionType, HttpFunctionEndpointSpec> specificTypeEndpointSpecs;
   private final Map<String, HttpFunctionEndpointSpec> perNamespaceEndpointSpecs;
 
-  /** lazily initialized by {code buildHttpClient} */
-  @Nullable private OkHttpClient sharedClient;
-
-  private volatile boolean shutdown;
+  private final ExtensionResolver extensionResolver;
 
   public HttpFunctionProvider(
       Map<FunctionType, HttpFunctionEndpointSpec> specificTypeEndpointSpecs,
-      Map<String, HttpFunctionEndpointSpec> perNamespaceEndpointSpecs) {
+      Map<String, HttpFunctionEndpointSpec> perNamespaceEndpointSpecs,
+      ExtensionResolver extensionResolver) {
     this.specificTypeEndpointSpecs = Objects.requireNonNull(specificTypeEndpointSpecs);
     this.perNamespaceEndpointSpecs = Objects.requireNonNull(perNamespaceEndpointSpecs);
+    this.extensionResolver = Objects.requireNonNull(extensionResolver);
   }
 
   @Override
   public StatefulFunction functionOfType(FunctionType functionType) {
     final HttpFunctionEndpointSpec endpointsSpec = getEndpointsSpecOrThrow(functionType);
+    final URI endpointUrl = endpointsSpec.urlPathTemplate().apply(functionType);
+
     return new RequestReplyFunction(
-        endpointsSpec.maxNumBatchRequests(), buildHttpClient(endpointsSpec, functionType));
+        endpointsSpec.maxNumBatchRequests(),
+        buildTransportClientFromSpec(endpointUrl, endpointsSpec));
   }
 
   private HttpFunctionEndpointSpec getEndpointsSpecOrThrow(FunctionType functionType) {
@@ -71,40 +72,19 @@
     throw new IllegalStateException("Unknown type: " + functionType);
   }
 
-  private RequestReplyClient buildHttpClient(
-      HttpFunctionEndpointSpec spec, FunctionType functionType) {
-    if (sharedClient == null) {
-      sharedClient = OkHttpUtils.newClient();
-    }
-    OkHttpClient.Builder clientBuilder = sharedClient.newBuilder();
-    clientBuilder.callTimeout(spec.maxRequestDuration());
-    clientBuilder.connectTimeout(spec.connectTimeout());
-    clientBuilder.readTimeout(spec.readTimeout());
-    clientBuilder.writeTimeout(spec.writeTimeout());
+  private RequestReplyClient buildTransportClientFromSpec(
+      URI endpointUrl, HttpFunctionEndpointSpec endpointsSpec) {
+    final TypeName factoryType = endpointsSpec.transportClientFactoryType();
+    final ObjectNode properties = endpointsSpec.transportClientProperties();
 
-    URI endpointUrl = spec.urlPathTemplate().apply(functionType);
-
-    final HttpUrl url;
-    if (UnixDomainHttpEndpoint.validate(endpointUrl)) {
-      UnixDomainHttpEndpoint endpoint = UnixDomainHttpEndpoint.parseFrom(endpointUrl);
-
-      url =
-          new HttpUrl.Builder()
-              .scheme("http")
-              .host("unused")
-              .addPathSegment(endpoint.pathSegment)
-              .build();
-
-      configureUnixDomainSocket(clientBuilder, endpoint.unixDomainFile);
-    } else {
-      url = HttpUrl.get(endpointUrl);
-    }
-    return new HttpRequestReplyClient(url, clientBuilder.build(), () -> shutdown);
+    final RequestReplyClientFactory factory =
+        extensionResolver.resolveExtension(factoryType, RequestReplyClientFactory.class);
+    return factory.createTransportClient(properties, endpointUrl);
   }
 
   @Override
   public void shutdown() {
-    shutdown = true;
-    OkHttpUtils.closeSilently(sharedClient);
+    // TODO all RequestReplyClientFactory's need to be shutdown.
+    // TODO This should probably happen in StatefulFunctionsUniverse.
   }
 }
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/TransportClientConstants.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/TransportClientConstants.java
new file mode 100644
index 0000000..d5f8a35
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/TransportClientConstants.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.httpfn;
+
+import org.apache.flink.statefun.sdk.TypeName;
+
+public final class TransportClientConstants {
+
+  public static final TypeName OKHTTP_CLIENT_FACTORY_TYPE =
+      TypeName.parseFrom("io.statefun.transports/okhttp");
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/TransportClientsModule.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/TransportClientsModule.java
new file mode 100644
index 0000000..1bad8db
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/TransportClientsModule.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.httpfn;
+
+import com.google.auto.service.AutoService;
+import java.util.Map;
+import org.apache.flink.statefun.sdk.spi.ExtensionModule;
+
+@AutoService(ExtensionModule.class)
+public class TransportClientsModule implements ExtensionModule {
+  @Override
+  public void configure(Map<String, String> globalConfigurations, Binder binder) {
+    binder.bindExtension(
+        TransportClientConstants.OKHTTP_CLIENT_FACTORY_TYPE,
+        new DefaultHttpRequestReplyClientFactory());
+  }
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/EgressJsonEntity.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/EgressJsonEntity.java
index d3040b7..e9b8ef7 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/EgressJsonEntity.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/EgressJsonEntity.java
@@ -22,6 +22,7 @@
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.statefun.flink.common.json.NamespaceNamePair;
 import org.apache.flink.statefun.flink.common.json.Selectors;
+import org.apache.flink.statefun.flink.core.spi.ExtensionResolver;
 import org.apache.flink.statefun.flink.io.spi.JsonEgressSpec;
 import org.apache.flink.statefun.sdk.EgressType;
 import org.apache.flink.statefun.sdk.io.EgressIdentifier;
@@ -38,7 +39,11 @@
   }
 
   @Override
-  public void bind(Binder binder, JsonNode moduleSpecRootNode, FormatVersion formatVersion) {
+  public void bind(
+      Binder binder,
+      ExtensionResolver extensionResolver,
+      JsonNode moduleSpecRootNode,
+      FormatVersion formatVersion) {
     final Iterable<? extends JsonNode> egressNodes =
         Selectors.listAt(moduleSpecRootNode, EGRESS_SPECS_POINTER);
 
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FormatVersion.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FormatVersion.java
index 580e6c7..0165560 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FormatVersion.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FormatVersion.java
@@ -30,7 +30,8 @@
   //  Supported versions
   // ============================================================
 
-  v3_0("3.0");
+  v3_0("3.0"),
+  v3_1("3.1");
 
   private String versionStr;
 
@@ -51,6 +52,8 @@
         return v2_0;
       case "3.0":
         return v3_0;
+      case "3.1":
+        return v3_1;
       default:
         throw new IllegalArgumentException("Unrecognized format version: " + versionStr);
     }
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionEndpointJsonEntity.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionEndpointJsonEntity.java
index 74500cd..337aa4e 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionEndpointJsonEntity.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionEndpointJsonEntity.java
@@ -30,9 +30,12 @@
 import java.util.stream.StreamSupport;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonPointer;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.flink.statefun.flink.common.json.Selectors;
 import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionEndpointSpec;
 import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionProvider;
+import org.apache.flink.statefun.flink.core.spi.ExtensionResolver;
 import org.apache.flink.statefun.sdk.FunctionType;
 import org.apache.flink.statefun.sdk.FunctionTypeNamespaceMatcher;
 import org.apache.flink.statefun.sdk.StatefulFunctionProvider;
@@ -53,22 +56,25 @@
         JsonPointer.compile("/endpoint/spec/functions");
     private static final JsonPointer URL_PATH_TEMPLATE =
         JsonPointer.compile("/endpoint/spec/urlPathTemplate");
-    private static final JsonPointer TIMEOUTS = JsonPointer.compile("/endpoint/spec/timeouts");
     private static final JsonPointer MAX_NUM_BATCH_REQUESTS =
         JsonPointer.compile("/endpoint/spec/maxNumBatchRequests");
+    private static final JsonPointer TRANSPORT = JsonPointer.compile("/endpoint/spec/transport");
+
+    @Deprecated
+    private static final JsonPointer TIMEOUTS = JsonPointer.compile("/endpoint/spec/timeouts");
   }
 
-  private static final class TimeoutPointers {
-    private static final JsonPointer CALL = JsonPointer.compile("/call");
-    private static final JsonPointer CONNECT = JsonPointer.compile("/connect");
-    private static final JsonPointer READ = JsonPointer.compile("/read");
-    private static final JsonPointer WRITE = JsonPointer.compile("/write");
+  private static final class TransportPointers {
+    private static final JsonPointer CLIENT_FACTORY_TYPE = JsonPointer.compile("/type");
   }
 
   @Override
   public void bind(
-      StatefulFunctionModule.Binder binder, JsonNode moduleSpecNode, FormatVersion formatVersion) {
-    if (formatVersion != FormatVersion.v3_0) {
+      StatefulFunctionModule.Binder binder,
+      ExtensionResolver extensionResolver,
+      JsonNode moduleSpecNode,
+      FormatVersion formatVersion) {
+    if (formatVersion.compareTo(FormatVersion.v3_0) < 0) {
       throw new IllegalArgumentException("endpoints is only supported with format version 3.0.");
     }
 
@@ -76,7 +82,7 @@
         functionEndpointSpecNodes(moduleSpecNode);
 
     for (Map.Entry<FunctionEndpointSpec.Kind, List<FunctionEndpointSpec>> entry :
-        parseFunctionEndpointSpecs(functionEndpointsSpecNodes).entrySet()) {
+        parseFunctionEndpointSpecs(functionEndpointsSpecNodes, formatVersion).entrySet()) {
       final Map<FunctionType, FunctionEndpointSpec> specificTypeEndpointSpecs = new HashMap<>();
       final Map<FunctionTypeNamespaceMatcher, FunctionEndpointSpec> perNamespaceEndpointSpecs =
           new HashMap<>();
@@ -94,7 +100,11 @@
               });
 
       StatefulFunctionProvider provider =
-          functionProvider(entry.getKey(), specificTypeEndpointSpecs, perNamespaceEndpointSpecs);
+          functionProvider(
+              entry.getKey(),
+              specificTypeEndpointSpecs,
+              perNamespaceEndpointSpecs,
+              extensionResolver);
       specificTypeEndpointSpecs
           .keySet()
           .forEach(specificType -> binder.bindFunctionProvider(specificType, provider));
@@ -110,14 +120,15 @@
   }
 
   private static Map<FunctionEndpointSpec.Kind, List<FunctionEndpointSpec>>
-      parseFunctionEndpointSpecs(Iterable<? extends JsonNode> functionEndpointsSpecNodes) {
+      parseFunctionEndpointSpecs(
+          Iterable<? extends JsonNode> functionEndpointsSpecNodes, FormatVersion version) {
     return StreamSupport.stream(functionEndpointsSpecNodes.spliterator(), false)
-        .map(FunctionEndpointJsonEntity::parseFunctionEndpointsSpec)
+        .map(node -> FunctionEndpointJsonEntity.parseFunctionEndpointsSpec(node, version))
         .collect(groupingBy(FunctionEndpointSpec::kind, toList()));
   }
 
   private static FunctionEndpointSpec parseFunctionEndpointsSpec(
-      JsonNode functionEndpointSpecNode) {
+      JsonNode functionEndpointSpecNode, FormatVersion version) {
     FunctionEndpointSpec.Kind kind = endpointKind(functionEndpointSpecNode);
 
     switch (kind) {
@@ -126,17 +137,24 @@
             HttpFunctionEndpointSpec.builder(
                 target(functionEndpointSpecNode), urlPathTemplate(functionEndpointSpecNode));
 
-        JsonNode timeoutsNode = functionEndpointSpecNode.at(SpecPointers.TIMEOUTS);
         optionalMaxNumBatchRequests(functionEndpointSpecNode)
             .ifPresent(specBuilder::withMaxNumBatchRequests);
-        optionalTimeoutDuration(timeoutsNode, TimeoutPointers.CALL)
-            .ifPresent(specBuilder::withMaxRequestDuration);
-        optionalTimeoutDuration(timeoutsNode, TimeoutPointers.CONNECT)
-            .ifPresent(specBuilder::withConnectTimeoutDuration);
-        optionalTimeoutDuration(timeoutsNode, TimeoutPointers.READ)
-            .ifPresent(specBuilder::withReadTimeoutDuration);
-        optionalTimeoutDuration(timeoutsNode, TimeoutPointers.WRITE)
-            .ifPresent(specBuilder::withWriteTimeoutDuration);
+
+        switch (version) {
+          case v3_1:
+            final Optional<ObjectNode> transportSpec =
+                Selectors.optionalObjectAt(functionEndpointSpecNode, SpecPointers.TRANSPORT);
+            transportSpec.ifPresent(spec -> configureHttpTransport(specBuilder, spec));
+            break;
+          case v3_0:
+            final Optional<ObjectNode> deprecatedTimeoutsSpec =
+                Selectors.optionalObjectAt(functionEndpointSpecNode, SpecPointers.TIMEOUTS);
+            deprecatedTimeoutsSpec.ifPresent(
+                spec -> configureDeprecatedHttpTimeoutsSpec(specBuilder, spec));
+            break;
+          default:
+            throw new IllegalStateException("Unsupported format version: " + version);
+        }
 
         return specBuilder.build();
       case GRPC:
@@ -146,6 +164,34 @@
     }
   }
 
+  private static void configureHttpTransport(
+      HttpFunctionEndpointSpec.Builder endpointSpecBuilder, ObjectNode transportSpecNode) {
+    final Optional<TypeName> transportClientFactoryType =
+        Selectors.optionalTextAt(transportSpecNode, TransportPointers.CLIENT_FACTORY_TYPE)
+            .map(TypeName::parseFrom);
+    transportClientFactoryType.ifPresent(endpointSpecBuilder::withTransportClientFactoryType);
+
+    // pass the transport spec node as is as the client properties
+    endpointSpecBuilder.withTransportClientProperties(transportSpecNode);
+  }
+
+  private static void configureDeprecatedHttpTimeoutsSpec(
+      HttpFunctionEndpointSpec.Builder endpointSpecBuilder, ObjectNode deprecatedHttpTimeoutsSpec) {
+    final ObjectNode reconstructedTimeoutsSpec =
+        reconstructTimeoutsSpecNode(deprecatedHttpTimeoutsSpec);
+    endpointSpecBuilder.withTransportClientProperties(reconstructedTimeoutsSpec);
+  }
+
+  private static ObjectNode reconstructTimeoutsSpecNode(ObjectNode deprecatedHttpTimeoutsSpec) {
+    try {
+      return (ObjectNode)
+          new ObjectMapper()
+              .readTree("{\"timeouts\":" + deprecatedHttpTimeoutsSpec.toString() + "}");
+    } catch (Exception e) {
+      throw new RuntimeException("Unable to reconstruct deprecated timeouts spec node.");
+    }
+  }
+
   private static FunctionEndpointSpec.Kind endpointKind(JsonNode functionEndpointSpecNode) {
     String endpointKind = Selectors.textAt(functionEndpointSpecNode, MetaPointers.KIND);
     return FunctionEndpointSpec.Kind.valueOf(endpointKind.toUpperCase(Locale.getDefault()));
@@ -192,12 +238,14 @@
   private static StatefulFunctionProvider functionProvider(
       FunctionEndpointSpec.Kind kind,
       Map<FunctionType, FunctionEndpointSpec> specificTypeEndpointSpecs,
-      Map<FunctionTypeNamespaceMatcher, FunctionEndpointSpec> perNamespaceEndpointSpecs) {
+      Map<FunctionTypeNamespaceMatcher, FunctionEndpointSpec> perNamespaceEndpointSpecs,
+      ExtensionResolver extensionResolver) {
     switch (kind) {
       case HTTP:
         return new HttpFunctionProvider(
             castValues(specificTypeEndpointSpecs),
-            castValues(namespaceAsKey(perNamespaceEndpointSpecs)));
+            castValues(namespaceAsKey(perNamespaceEndpointSpecs)),
+            extensionResolver);
       case GRPC:
         throw new UnsupportedOperationException("GRPC endpoints are not supported yet.");
       default:
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/IngressJsonEntity.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/IngressJsonEntity.java
index 83c5d00..ad1fb35 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/IngressJsonEntity.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/IngressJsonEntity.java
@@ -24,6 +24,7 @@
 import org.apache.flink.statefun.flink.common.json.NamespaceNamePair;
 import org.apache.flink.statefun.flink.common.json.Selectors;
 import org.apache.flink.statefun.flink.core.protorouter.AutoRoutableProtobufRouter;
+import org.apache.flink.statefun.flink.core.spi.ExtensionResolver;
 import org.apache.flink.statefun.flink.io.kafka.ProtobufKafkaIngressTypes;
 import org.apache.flink.statefun.flink.io.kinesis.PolyglotKinesisIOTypes;
 import org.apache.flink.statefun.flink.io.spi.JsonIngressSpec;
@@ -41,7 +42,11 @@
   }
 
   @Override
-  public void bind(Binder binder, JsonNode moduleSpecRootNode, FormatVersion formatVersion) {
+  public void bind(
+      Binder binder,
+      ExtensionResolver extensionResolver,
+      JsonNode moduleSpecRootNode,
+      FormatVersion formatVersion) {
     final Iterable<? extends JsonNode> ingressNodes =
         Selectors.listAt(moduleSpecRootNode, INGRESS_SPECS_POINTER);
 
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonEntity.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonEntity.java
index 9d40307..f1afd79 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonEntity.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonEntity.java
@@ -19,6 +19,7 @@
 package org.apache.flink.statefun.flink.core.jsonmodule;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.statefun.flink.core.spi.ExtensionResolver;
 import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule.Binder;
 
 /**
@@ -35,5 +36,9 @@
    * @param moduleSpecNode the root module spec node.
    * @param formatVersion the format version of the module spec.
    */
-  void bind(Binder binder, JsonNode moduleSpecNode, FormatVersion formatVersion);
+  void bind(
+      Binder binder,
+      ExtensionResolver extensionResolver,
+      JsonNode moduleSpecNode,
+      FormatVersion formatVersion);
 }
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java
index eb26c44..d38ab6d 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java
@@ -25,6 +25,7 @@
 import java.util.Map;
 import java.util.Objects;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.statefun.flink.core.spi.ExtensionResolver;
 import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
 
 final class JsonModule implements StatefulFunctionModule {
@@ -49,10 +50,17 @@
 
   public void configure(Map<String, String> conf, Binder binder) {
     try {
-      ENTITIES.forEach(jsonEntity -> jsonEntity.bind(binder, moduleSpecNode, formatVersion));
+      ENTITIES.forEach(
+          jsonEntity ->
+              jsonEntity.bind(binder, getExtensionResolver(binder), moduleSpecNode, formatVersion));
     } catch (Throwable t) {
       throw new ModuleConfigurationException(
           format("Error while parsing module at %s", moduleUrl), t);
     }
   }
+
+  // TODO expose ExtensionResolver properly once we have more usages
+  private static ExtensionResolver getExtensionResolver(Binder binder) {
+    return (ExtensionResolver) binder;
+  }
 }
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/RouterJsonEntity.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/RouterJsonEntity.java
index 30c3ff2..66d99a2 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/RouterJsonEntity.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/RouterJsonEntity.java
@@ -30,6 +30,7 @@
 import org.apache.flink.statefun.flink.common.json.Selectors;
 import org.apache.flink.statefun.flink.common.protobuf.ProtobufDescriptorMap;
 import org.apache.flink.statefun.flink.core.protorouter.ProtobufRouter;
+import org.apache.flink.statefun.flink.core.spi.ExtensionResolver;
 import org.apache.flink.statefun.sdk.io.IngressIdentifier;
 import org.apache.flink.statefun.sdk.io.Router;
 import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule.Binder;
@@ -50,7 +51,11 @@
   }
 
   @Override
-  public void bind(Binder binder, JsonNode moduleSpecRootNode, FormatVersion formatVersion) {
+  public void bind(
+      Binder binder,
+      ExtensionResolver extensionResolver,
+      JsonNode moduleSpecRootNode,
+      FormatVersion formatVersion) {
     final Iterable<? extends JsonNode> routerNodes =
         Selectors.listAt(moduleSpecRootNode, ROUTER_SPECS_POINTER);
 
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/ClassLoaderSafeRequestReplyClient.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/ClassLoaderSafeRequestReplyClient.java
new file mode 100644
index 0000000..cee0338
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/ClassLoaderSafeRequestReplyClient.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.reqreply;
+
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.statefun.flink.core.metrics.RemoteInvocationMetrics;
+import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction;
+import org.apache.flink.statefun.sdk.reqreply.generated.ToFunction;
+
+/**
+ * Decorator for a {@link RequestReplyClient} that makes sure we always use the correct classloader.
+ * This is required since client implementation may be user provided.
+ */
+public final class ClassLoaderSafeRequestReplyClient implements RequestReplyClient {
+
+  private final ClassLoader delegateClassLoader;
+  private final RequestReplyClient delegate;
+
+  public ClassLoaderSafeRequestReplyClient(RequestReplyClient delegate) {
+    this.delegate = Objects.requireNonNull(delegate);
+    this.delegateClassLoader = delegate.getClass().getClassLoader();
+  }
+
+  @Override
+  public CompletableFuture<FromFunction> call(
+      ToFunctionRequestSummary requestSummary,
+      RemoteInvocationMetrics metrics,
+      ToFunction toFunction) {
+    final ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
+
+    try {
+      Thread.currentThread().setContextClassLoader(delegateClassLoader);
+      return delegate.call(requestSummary, metrics, toFunction);
+    } finally {
+      Thread.currentThread().setContextClassLoader(originalClassLoader);
+    }
+  }
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyClient.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyClient.java
index 19fddee..7a423ff 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyClient.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyClient.java
@@ -19,10 +19,12 @@
 package org.apache.flink.statefun.flink.core.reqreply;
 
 import java.util.concurrent.CompletableFuture;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.statefun.flink.core.metrics.RemoteInvocationMetrics;
 import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction;
 import org.apache.flink.statefun.sdk.reqreply.generated.ToFunction;
 
+@PublicEvolving
 public interface RequestReplyClient {
 
   CompletableFuture<FromFunction> call(
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyClientFactory.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyClientFactory.java
new file mode 100644
index 0000000..865e246
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyClientFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.reqreply;
+
+import java.net.URI;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+
+@PublicEvolving
+public interface RequestReplyClientFactory {
+  RequestReplyClient createTransportClient(ObjectNode transportProperties, URI endpointUrl);
+
+  void cleanup();
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/spi/ExtensionResolver.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/spi/ExtensionResolver.java
new file mode 100644
index 0000000..c7a44f9
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/spi/ExtensionResolver.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.spi;
+
+import org.apache.flink.statefun.sdk.TypeName;
+import org.apache.flink.statefun.sdk.spi.ExtensionModule;
+
+/**
+ * Resolves a bound extension (bound by {@link ExtensionModule}s) given specified {@link TypeName}s.
+ */
+public interface ExtensionResolver {
+  <T> T resolveExtension(TypeName typeName, Class<T> extensionClass);
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/spi/Modules.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/spi/Modules.java
index 6ca5d24..26bd524 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/spi/Modules.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/spi/Modules.java
@@ -24,14 +24,19 @@
 import org.apache.flink.statefun.flink.core.jsonmodule.JsonServiceLoader;
 import org.apache.flink.statefun.flink.core.message.MessageFactoryKey;
 import org.apache.flink.statefun.flink.io.spi.FlinkIoModule;
+import org.apache.flink.statefun.sdk.spi.ExtensionModule;
 import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
 
 public final class Modules {
+  private final List<ExtensionModule> extensionModules;
   private final List<FlinkIoModule> ioModules;
   private final List<StatefulFunctionModule> statefulFunctionModules;
 
   private Modules(
-      List<FlinkIoModule> ioModules, List<StatefulFunctionModule> statefulFunctionModules) {
+      List<ExtensionModule> extensionModules,
+      List<FlinkIoModule> ioModules,
+      List<StatefulFunctionModule> statefulFunctionModules) {
+    this.extensionModules = extensionModules;
     this.ioModules = ioModules;
     this.statefulFunctionModules = statefulFunctionModules;
   }
@@ -39,7 +44,11 @@
   public static Modules loadFromClassPath() {
     List<StatefulFunctionModule> statefulFunctionModules = new ArrayList<>();
     List<FlinkIoModule> ioModules = new ArrayList<>();
+    List<ExtensionModule> extensionModules = new ArrayList<>();
 
+    for (ExtensionModule extensionModule : ServiceLoader.load(ExtensionModule.class)) {
+      extensionModules.add(extensionModule);
+    }
     for (StatefulFunctionModule provider : ServiceLoader.load(StatefulFunctionModule.class)) {
       statefulFunctionModules.add(provider);
     }
@@ -49,7 +58,7 @@
     for (FlinkIoModule provider : ServiceLoader.load(FlinkIoModule.class)) {
       ioModules.add(provider);
     }
-    return new Modules(ioModules, statefulFunctionModules);
+    return new Modules(extensionModules, ioModules, statefulFunctionModules);
   }
 
   public StatefulFunctionsUniverse createStatefulFunctionsUniverse(
@@ -60,6 +69,13 @@
 
     final Map<String, String> globalConfiguration = configuration.getGlobalConfigurations();
 
+    // it is important to bind and configure the extension modules first, since
+    // other modules (IO and functions) may use extensions already.
+    for (ExtensionModule module : extensionModules) {
+      try (SetContextClassLoader ignored = new SetContextClassLoader(module)) {
+        module.configure(globalConfiguration, universe);
+      }
+    }
     for (FlinkIoModule module : ioModules) {
       try (SetContextClassLoader ignored = new SetContextClassLoader(module)) {
         module.configure(globalConfiguration, universe);
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/StatefulFunctionsUniverseTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/StatefulFunctionsUniverseTest.java
new file mode 100644
index 0000000..859e1b5
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/StatefulFunctionsUniverseTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core;
+
+import static org.hamcrest.core.IsSame.sameInstance;
+import static org.junit.Assert.assertThat;
+
+import org.apache.flink.statefun.flink.core.message.MessageFactoryKey;
+import org.apache.flink.statefun.flink.core.message.MessageFactoryType;
+import org.apache.flink.statefun.sdk.TypeName;
+import org.junit.Test;
+
+public class StatefulFunctionsUniverseTest {
+
+  @Test
+  public void testExtensions() {
+    final StatefulFunctionsUniverse universe = emptyUniverse();
+
+    final ExtensionImpl extension = new ExtensionImpl();
+    universe.bindExtension(TypeName.parseFrom("test.namespace/test.name"), extension);
+
+    assertThat(
+        extension,
+        sameInstance(
+            universe.resolveExtension(
+                TypeName.parseFrom("test.namespace/test.name"), BaseExtension.class)));
+  }
+
+  private static StatefulFunctionsUniverse emptyUniverse() {
+    return new StatefulFunctionsUniverse(
+        MessageFactoryKey.forType(MessageFactoryType.WITH_PROTOBUF_PAYLOADS, null));
+  }
+
+  private interface BaseExtension {}
+
+  private static class ExtensionImpl implements BaseExtension {}
+}
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleTest.java
index cc928b0..cba075f 100644
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleTest.java
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleTest.java
@@ -25,22 +25,43 @@
 import static org.junit.Assert.assertThat;
 
 import com.google.protobuf.Message;
+import java.net.URI;
 import java.net.URL;
-import java.util.Collections;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverse;
 import org.apache.flink.statefun.flink.core.message.MessageFactoryKey;
 import org.apache.flink.statefun.flink.core.message.MessageFactoryType;
+import org.apache.flink.statefun.flink.core.reqreply.RequestReplyClient;
+import org.apache.flink.statefun.flink.core.reqreply.RequestReplyClientFactory;
 import org.apache.flink.statefun.sdk.FunctionType;
+import org.apache.flink.statefun.sdk.TypeName;
 import org.apache.flink.statefun.sdk.io.EgressIdentifier;
 import org.apache.flink.statefun.sdk.io.IngressIdentifier;
 import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
+import org.apache.flink.statefun.sdk.spi.ExtensionModule;
 import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+@RunWith(Parameterized.class)
 public class JsonModuleTest {
 
-  private static final String modulePath = "module-v3_0/module.yaml";
+  private final String modulePath;
+
+  @Parameterized.Parameters(name = "{0}")
+  public static Collection<String> modulePaths() {
+    return Arrays.asList("module-v3_0/module.yaml", "module-v3_1/module.yaml");
+  }
+
+  public JsonModuleTest(String modulePath) {
+    this.modulePath = modulePath;
+  }
 
   @Test
   public void exampleUsage() {
@@ -52,9 +73,11 @@
   @Test
   public void testFunctions() {
     StatefulFunctionModule module = fromPath(modulePath);
+    ExtensionModule extensionModule =
+        transportClientExtensions(TypeName.parseFrom("my.custom/http.transport.type"));
 
     StatefulFunctionsUniverse universe = emptyUniverse();
-    module.configure(Collections.emptyMap(), universe);
+    setupUniverse(universe, module, extensionModule);
 
     assertThat(
         universe.functions(),
@@ -68,9 +91,11 @@
   @Test
   public void testRouters() {
     StatefulFunctionModule module = fromPath(modulePath);
+    ExtensionModule extensionModule =
+        transportClientExtensions(TypeName.parseFrom("my.custom/http.transport.type"));
 
     StatefulFunctionsUniverse universe = emptyUniverse();
-    module.configure(Collections.emptyMap(), universe);
+    setupUniverse(universe, module, extensionModule);
 
     assertThat(
         universe.routers(),
@@ -80,9 +105,11 @@
   @Test
   public void testIngresses() {
     StatefulFunctionModule module = fromPath(modulePath);
+    ExtensionModule extensionModule =
+        transportClientExtensions(TypeName.parseFrom("my.custom/http.transport.type"));
 
     StatefulFunctionsUniverse universe = emptyUniverse();
-    module.configure(Collections.emptyMap(), universe);
+    setupUniverse(universe, module, extensionModule);
 
     assertThat(
         universe.ingress(),
@@ -92,9 +119,11 @@
   @Test
   public void testEgresses() {
     StatefulFunctionModule module = fromPath(modulePath);
+    ExtensionModule extensionModule =
+        transportClientExtensions(TypeName.parseFrom("my.custom/http.transport.type"));
 
     StatefulFunctionsUniverse universe = emptyUniverse();
-    module.configure(Collections.emptyMap(), universe);
+    setupUniverse(universe, module, extensionModule);
 
     assertThat(
         universe.egress(),
@@ -108,8 +137,48 @@
     return JsonServiceLoader.fromUrl(mapper, moduleUrl);
   }
 
+  private static ExtensionModule transportClientExtensions(TypeName type) {
+    return new TransportClientBindingModule(type);
+  }
+
   private static StatefulFunctionsUniverse emptyUniverse() {
     return new StatefulFunctionsUniverse(
         MessageFactoryKey.forType(MessageFactoryType.WITH_PROTOBUF_PAYLOADS, null));
   }
+
+  private static void setupUniverse(
+      StatefulFunctionsUniverse universe,
+      StatefulFunctionModule functionModule,
+      ExtensionModule extensionModule) {
+    final Map<String, String> globalConfig = new HashMap<>();
+    extensionModule.configure(globalConfig, universe);
+    functionModule.configure(globalConfig, universe);
+  }
+
+  private static class TransportClientBindingModule implements ExtensionModule {
+
+    private final TypeName transportClientType;
+
+    TransportClientBindingModule(TypeName transportClientType) {
+      this.transportClientType = transportClientType;
+    }
+
+    @Override
+    public void configure(Map<String, String> globalConfigurations, Binder binder) {
+      binder.bindExtension(transportClientType, new TestRequestReplyClientFactory());
+    }
+  }
+
+  private static class TestRequestReplyClientFactory implements RequestReplyClientFactory {
+    @Override
+    public RequestReplyClient createTransportClient(
+        ObjectNode transportProperties, URI endpointUrl) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void cleanup() {
+      throw new UnsupportedOperationException();
+    }
+  }
 }
diff --git a/statefun-flink/statefun-flink-core/src/test/resources/module-v3_1/module.yaml b/statefun-flink/statefun-flink-core/src/test/resources/module-v3_1/module.yaml
new file mode 100644
index 0000000..8b595d4
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/test/resources/module-v3_1/module.yaml
@@ -0,0 +1,93 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: "3.1"
+
+module:
+  meta:
+    type: remote
+  spec:
+    endpoints:
+      - endpoint:
+          meta:
+            kind: http
+          spec:
+            functions: com.foo.bar/*
+            urlPathTemplate: http://bar.foo.com:8080/functions/{function.name}
+            maxNumBatchRequests: 10000
+            transport:
+              timeouts:
+                call: 1minutes
+                connect: 10seconds
+                read: 10second
+                write: 10seconds
+      - endpoint:
+          meta:
+            kind: http
+          spec:
+            functions: com.foo.bar.2/*
+            urlPathTemplate: http://2.bar.foo.com:8080/functions/{function.name}
+            transport:
+              type: my.custom/http.transport.type
+              property1: value1
+              property2:
+                - k1: v1
+                - k2: v2
+      - endpoint:
+          meta:
+            kind: http
+          spec:
+            functions: com.foo.bar/specific_function
+            urlPathTemplate: http://bar.foo.com:8080/functions/abc
+      - endpoint:
+          meta:
+            kind: http
+          spec:
+            functions: com.other.namespace/hello
+            urlPathTemplate: http://namespace.other.com:8080/hello
+    routers:
+      - router:
+          meta:
+            type: org.apache.flink.statefun.sdk/protobuf-router
+          spec:
+            ingress: com.mycomp.igal/names
+            target: "com.example/hello/{{$.name}}"
+            messageType: org.apache.flink.test.SimpleMessage
+            descriptorSet: classpath:test.desc
+    ingresses:
+      - ingress:
+          meta:
+            type: statefun.kafka.io/protobuf-ingress
+            id: com.mycomp.igal/names
+          spec:
+            address: kafka-broker:9092
+            topics:
+              - names
+            properties:
+              - consumer.group: greeter
+            messageType: org.apache.flink.test.SimpleMessage
+            descriptorSet: classpath:test.desc
+    egresses:
+      - egress:
+          meta:
+            type: io.statefun.kafka/egress
+            id: com.mycomp.foo/bar
+          spec:
+            address: kafka-broker:9092
+            deliverySemantic:
+              type: exactly-once
+              transactionTimeoutMillis: 100000
+            properties:
+              - foo.config: bar
\ No newline at end of file
diff --git a/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java
index d24636b..8cc2ec0 100644
--- a/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java
+++ b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java
@@ -21,7 +21,11 @@
 import java.net.URI;
 import java.time.Duration;
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.statefun.flink.core.httpfn.DefaultHttpRequestReplyClientSpec;
 import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionEndpointSpec;
+import org.apache.flink.statefun.flink.core.httpfn.TransportClientConstants;
 import org.apache.flink.statefun.flink.core.jsonmodule.FunctionEndpointSpec.Target;
 import org.apache.flink.statefun.flink.core.jsonmodule.FunctionEndpointSpec.UrlPathTemplate;
 import org.apache.flink.statefun.sdk.FunctionType;
@@ -29,6 +33,9 @@
 /** A Builder for RequestReply remote function type. */
 public class RequestReplyFunctionBuilder {
 
+  private final DefaultHttpRequestReplyClientSpec.Timeouts transportClientTimeoutsSpec =
+      new DefaultHttpRequestReplyClientSpec.Timeouts();
+
   /**
    * Create a new builder for a remote function with a given type and an endpoint.
    *
@@ -57,7 +64,7 @@
    * @return this builder.
    */
   public RequestReplyFunctionBuilder withMaxRequestDuration(Duration duration) {
-    builder.withMaxRequestDuration(duration);
+    transportClientTimeoutsSpec.setCallTimeout(duration);
     return this;
   }
 
@@ -68,7 +75,7 @@
    * @return this builder.
    */
   public RequestReplyFunctionBuilder withConnectTimeout(Duration duration) {
-    builder.withConnectTimeoutDuration(duration);
+    transportClientTimeoutsSpec.setConnectTimeout(duration);
     return this;
   }
 
@@ -79,7 +86,7 @@
    * @return this builder.
    */
   public RequestReplyFunctionBuilder withReadTimeout(Duration duration) {
-    builder.withReadTimeoutDuration(duration);
+    transportClientTimeoutsSpec.setReadTimeout(duration);
     return this;
   }
 
@@ -90,7 +97,7 @@
    * @return this builder.
    */
   public RequestReplyFunctionBuilder withWriteTimeout(Duration duration) {
-    builder.withWriteTimeoutDuration(duration);
+    transportClientTimeoutsSpec.setWriteTimeout(duration);
     return this;
   }
 
@@ -107,6 +114,18 @@
 
   @Internal
   HttpFunctionEndpointSpec spec() {
+    builder.withTransportClientFactoryType(TransportClientConstants.OKHTTP_CLIENT_FACTORY_TYPE);
+    builder.withTransportClientProperties(
+        transportClientPropertiesAsObjectNode(transportClientTimeoutsSpec));
     return builder.build();
   }
+
+  private static ObjectNode transportClientPropertiesAsObjectNode(
+      DefaultHttpRequestReplyClientSpec.Timeouts transportClientTimeoutsSpec) {
+    final DefaultHttpRequestReplyClientSpec transportClientSpecPojo =
+        new DefaultHttpRequestReplyClientSpec();
+    transportClientSpecPojo.setTimeouts(transportClientTimeoutsSpec);
+
+    return new ObjectMapper().valueToTree(transportClientSpecPojo);
+  }
 }
diff --git a/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/SerializableHttpFunctionProvider.java b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/SerializableHttpFunctionProvider.java
index eed278c..38477d5 100644
--- a/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/SerializableHttpFunctionProvider.java
+++ b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/SerializableHttpFunctionProvider.java
@@ -24,10 +24,15 @@
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.statefun.flink.core.httpfn.DefaultHttpRequestReplyClientFactory;
 import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionEndpointSpec;
 import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionProvider;
+import org.apache.flink.statefun.flink.core.httpfn.TransportClientConstants;
+import org.apache.flink.statefun.flink.core.reqreply.RequestReplyClientFactory;
+import org.apache.flink.statefun.flink.core.spi.ExtensionResolver;
 import org.apache.flink.statefun.sdk.FunctionType;
 import org.apache.flink.statefun.sdk.StatefulFunction;
+import org.apache.flink.statefun.sdk.TypeName;
 
 @NotThreadSafe
 @Internal
@@ -45,8 +50,26 @@
   @Override
   public StatefulFunction functionOfType(FunctionType type) {
     if (delegate == null) {
-      delegate = new HttpFunctionProvider(supportedTypes, Collections.emptyMap());
+      delegate =
+          new HttpFunctionProvider(
+              supportedTypes, Collections.emptyMap(), new OkHttpTransportClientExtensionResolver());
     }
     return delegate.functionOfType(type);
   }
+
+  private static class OkHttpTransportClientExtensionResolver implements ExtensionResolver {
+
+    private final DefaultHttpRequestReplyClientFactory defaultTransportClientFactory =
+        new DefaultHttpRequestReplyClientFactory();
+
+    @Override
+    public <T> T resolveExtension(TypeName typeName, Class<T> extensionClass) {
+      // the DataStream bridge SDK only supports using the default OkHttp request-reply client
+      if (!typeName.equals(TransportClientConstants.OKHTTP_CLIENT_FACTORY_TYPE)
+          || extensionClass != RequestReplyClientFactory.class) {
+        throw new IllegalStateException("The DataStream SDK does not support extensions.");
+      }
+      return (T) defaultTransportClientFactory;
+    }
+  }
 }
diff --git a/statefun-sdk-embedded/src/main/java/org/apache/flink/statefun/sdk/spi/ExtensionModule.java b/statefun-sdk-embedded/src/main/java/org/apache/flink/statefun/sdk/spi/ExtensionModule.java
new file mode 100644
index 0000000..5f84ffd
--- /dev/null
+++ b/statefun-sdk-embedded/src/main/java/org/apache/flink/statefun/sdk/spi/ExtensionModule.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.sdk.spi;
+
+import java.util.Map;
+import org.apache.flink.statefun.sdk.TypeName;
+
+/**
+ * A module that binds multiple extension objects to the Stateful Functions application. Each
+ * extension is uniquely identified by a {@link TypeName}.
+ */
+public interface ExtensionModule {
+
+  /**
+   * This method binds multiple extension objects to the Stateful Functions application.
+   *
+   * @param globalConfigurations global configuration of the Stateful Functions application.
+   * @param binder binder for binding extensions.
+   */
+  void configure(Map<String, String> globalConfigurations, Binder binder);
+
+  interface Binder {
+    <T> void bindExtension(TypeName typeName, T extension);
+  }
+}