[FLINK-20335] [core] Remove support for module YAML versions 1.0 / 2.0

This closes #184.
diff --git a/statefun-examples/statefun-flink-datastream-example/src/main/java/org/apache/flink/statefun/examples/datastream/Example.java b/statefun-examples/statefun-flink-datastream-example/src/main/java/org/apache/flink/statefun/examples/datastream/Example.java
index 76ffde5..aa825b3 100644
--- a/statefun-examples/statefun-flink-datastream-example/src/main/java/org/apache/flink/statefun/examples/datastream/Example.java
+++ b/statefun-examples/statefun-flink-datastream-example/src/main/java/org/apache/flink/statefun/examples/datastream/Example.java
@@ -79,7 +79,6 @@
             .withRequestReplyRemoteFunction(
                 requestReplyFunctionBuilder(
                         REMOTE_GREET, URI.create("http://localhost:5000/statefun"))
-                    .withPersistedState("seen_count")
                     .withMaxRequestDuration(Duration.ofSeconds(15))
                     .withMaxNumBatchRequests(500))
             .withEgressId(GREETINGS)
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionEndpointSpec.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionEndpointSpec.java
index d5731d1..85df004 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionEndpointSpec.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionEndpointSpec.java
@@ -17,14 +17,14 @@
  */
 package org.apache.flink.statefun.flink.core.httpfn;
 
+import java.io.Serializable;
 import java.time.Duration;
 import java.util.Objects;
 import org.apache.flink.statefun.flink.core.jsonmodule.FunctionEndpointSpec;
-import org.apache.flink.statefun.sdk.FunctionType;
-import org.apache.flink.statefun.sdk.FunctionTypeNamespaceMatcher;
-import org.apache.flink.types.Either;
 
-public final class HttpFunctionEndpointSpec implements FunctionEndpointSpec {
+public final class HttpFunctionEndpointSpec implements FunctionEndpointSpec, Serializable {
+
+  private static final long serialVersionUID = 1;
 
   private static final Duration DEFAULT_HTTP_TIMEOUT = Duration.ofMinutes(1);
   private static final Duration DEFAULT_HTTP_CONNECT_TIMEOUT = Duration.ofSeconds(10);
@@ -32,7 +32,7 @@
   private static final Duration DEFAULT_HTTP_WRITE_TIMEOUT = Duration.ofSeconds(10);
   private static final Integer DEFAULT_MAX_NUM_BATCH_REQUESTS = 1000;
 
-  private final Either<FunctionType, FunctionTypeNamespaceMatcher> target;
+  private final Target target;
   private final UrlPathTemplate urlPathTemplate;
 
   private final Duration maxRequestDuration;
@@ -41,13 +41,12 @@
   private final Duration writeTimeout;
   private final int maxNumBatchRequests;
 
-  public static Builder builder(
-      Either<FunctionType, FunctionTypeNamespaceMatcher> target, UrlPathTemplate urlPathTemplate) {
+  public static Builder builder(Target target, UrlPathTemplate urlPathTemplate) {
     return new Builder(target, urlPathTemplate);
   }
 
   private HttpFunctionEndpointSpec(
-      Either<FunctionType, FunctionTypeNamespaceMatcher> target,
+      Target target,
       UrlPathTemplate urlPathTemplate,
       Duration maxRequestDuration,
       Duration connectTimeout,
@@ -64,7 +63,7 @@
   }
 
   @Override
-  public Either<FunctionType, FunctionTypeNamespaceMatcher> target() {
+  public Target target() {
     return target;
   }
 
@@ -100,7 +99,7 @@
 
   public static final class Builder {
 
-    private final Either<FunctionType, FunctionTypeNamespaceMatcher> target;
+    private final Target target;
     private final UrlPathTemplate urlPathTemplate;
 
     private Duration maxRequestDuration = DEFAULT_HTTP_TIMEOUT;
@@ -109,9 +108,7 @@
     private Duration writeTimeout = DEFAULT_HTTP_WRITE_TIMEOUT;
     private int maxNumBatchRequests = DEFAULT_MAX_NUM_BATCH_REQUESTS;
 
-    private Builder(
-        Either<FunctionType, FunctionTypeNamespaceMatcher> target,
-        UrlPathTemplate urlPathTemplate) {
+    private Builder(Target target, UrlPathTemplate urlPathTemplate) {
       this.target = Objects.requireNonNull(target);
       this.urlPathTemplate = Objects.requireNonNull(urlPathTemplate);
     }
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
index 67c2c68..5445500 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
@@ -15,53 +15,64 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.flink.statefun.flink.core.httpfn;
 
 import static org.apache.flink.statefun.flink.core.httpfn.OkHttpUnixSocketBridge.configureUnixDomainSocket;
 
+import java.net.URI;
 import java.util.Map;
+import java.util.Objects;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.NotThreadSafe;
 import okhttp3.HttpUrl;
 import okhttp3.OkHttpClient;
 import org.apache.flink.statefun.flink.core.common.ManagingResources;
-import org.apache.flink.statefun.flink.core.reqreply.PersistedRemoteFunctionValues;
 import org.apache.flink.statefun.flink.core.reqreply.RequestReplyClient;
 import org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction;
 import org.apache.flink.statefun.sdk.FunctionType;
+import org.apache.flink.statefun.sdk.StatefulFunction;
 import org.apache.flink.statefun.sdk.StatefulFunctionProvider;
 
 @NotThreadSafe
-public class HttpFunctionProvider implements StatefulFunctionProvider, ManagingResources {
-  private final Map<FunctionType, HttpFunctionSpec> supportedTypes;
+public final class HttpFunctionProvider implements StatefulFunctionProvider, ManagingResources {
+
+  private final Map<FunctionType, HttpFunctionEndpointSpec> specificTypeEndpointSpecs;
+  private final Map<String, HttpFunctionEndpointSpec> perNamespaceEndpointSpecs;
 
   /** lazily initialized by {code buildHttpClient} */
   @Nullable private OkHttpClient sharedClient;
 
   private volatile boolean shutdown;
 
-  public HttpFunctionProvider(Map<FunctionType, HttpFunctionSpec> supportedTypes) {
-    this.supportedTypes = supportedTypes;
+  public HttpFunctionProvider(
+      Map<FunctionType, HttpFunctionEndpointSpec> specificTypeEndpointSpecs,
+      Map<String, HttpFunctionEndpointSpec> perNamespaceEndpointSpecs) {
+    this.specificTypeEndpointSpecs = Objects.requireNonNull(specificTypeEndpointSpecs);
+    this.perNamespaceEndpointSpecs = Objects.requireNonNull(perNamespaceEndpointSpecs);
   }
 
   @Override
-  public RequestReplyFunction functionOfType(FunctionType type) {
-    HttpFunctionSpec spec = supportedTypes.get(type);
-    if (spec == null) {
-      throw new IllegalArgumentException("Unsupported type " + type);
-    }
+  public StatefulFunction functionOfType(FunctionType functionType) {
+    final HttpFunctionEndpointSpec endpointsSpec = getEndpointsSpecOrThrow(functionType);
     return new RequestReplyFunction(
-        new PersistedRemoteFunctionValues(spec.states()),
-        spec.maxNumBatchRequests(),
-        buildHttpClient(spec));
+        endpointsSpec.maxNumBatchRequests(), buildHttpClient(endpointsSpec, functionType));
   }
 
-  public HttpFunctionSpec getFunctionSpec(FunctionType type) {
-    return supportedTypes.get(type);
+  private HttpFunctionEndpointSpec getEndpointsSpecOrThrow(FunctionType functionType) {
+    HttpFunctionEndpointSpec endpointSpec = specificTypeEndpointSpecs.get(functionType);
+    if (endpointSpec != null) {
+      return endpointSpec;
+    }
+    endpointSpec = perNamespaceEndpointSpecs.get(functionType.namespace());
+    if (endpointSpec != null) {
+      return endpointSpec;
+    }
+
+    throw new IllegalStateException("Unknown type: " + functionType);
   }
 
-  private RequestReplyClient buildHttpClient(HttpFunctionSpec spec) {
+  private RequestReplyClient buildHttpClient(
+      HttpFunctionEndpointSpec spec, FunctionType functionType) {
     if (sharedClient == null) {
       sharedClient = OkHttpUtils.newClient();
     }
@@ -71,9 +82,11 @@
     clientBuilder.readTimeout(spec.readTimeout());
     clientBuilder.writeTimeout(spec.writeTimeout());
 
+    URI endpointUrl = spec.urlPathTemplate().apply(functionType);
+
     final HttpUrl url;
-    if (UnixDomainHttpEndpoint.validate(spec.endpoint())) {
-      UnixDomainHttpEndpoint endpoint = UnixDomainHttpEndpoint.parseFrom(spec.endpoint());
+    if (UnixDomainHttpEndpoint.validate(endpointUrl)) {
+      UnixDomainHttpEndpoint endpoint = UnixDomainHttpEndpoint.parseFrom(endpointUrl);
 
       url =
           new HttpUrl.Builder()
@@ -84,7 +97,7 @@
 
       configureUnixDomainSocket(clientBuilder, endpoint.unixDomainFile);
     } else {
-      url = HttpUrl.get(spec.endpoint());
+      url = HttpUrl.get(endpointUrl);
     }
     return new HttpRequestReplyClient(url, clientBuilder.build(), () -> shutdown);
   }
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java
deleted file mode 100644
index 42a7abb..0000000
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.statefun.flink.core.httpfn;
-
-import java.io.Serializable;
-import java.net.URI;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-import org.apache.flink.statefun.flink.core.jsonmodule.FunctionSpec;
-import org.apache.flink.statefun.sdk.FunctionType;
-
-public final class HttpFunctionSpec implements FunctionSpec, Serializable {
-
-  private static final long serialVersionUID = 1;
-
-  private static final Duration DEFAULT_HTTP_TIMEOUT = Duration.ofMinutes(1);
-  private static final Duration DEFAULT_HTTP_CONNECT_TIMEOUT = Duration.ofSeconds(10);
-  private static final Duration DEFAULT_HTTP_READ_TIMEOUT = Duration.ofSeconds(10);
-  private static final Duration DEFAULT_HTTP_WRITE_TIMEOUT = Duration.ofSeconds(10);
-  private static final Integer DEFAULT_MAX_NUM_BATCH_REQUESTS = 1000;
-
-  private final FunctionType functionType;
-  private final URI endpoint;
-  private final List<StateSpec> states;
-  private final Duration maxRequestDuration;
-  private final Duration connectTimeout;
-  private final Duration readTimeout;
-  private final Duration writeTimeout;
-  private final int maxNumBatchRequests;
-
-  private HttpFunctionSpec(
-      FunctionType functionType,
-      URI endpoint,
-      List<StateSpec> states,
-      Duration maxRequestDuration,
-      Duration connectTimeout,
-      Duration readTimeout,
-      Duration writeTimeout,
-      int maxNumBatchRequests) {
-    this.functionType = Objects.requireNonNull(functionType);
-    this.endpoint = Objects.requireNonNull(endpoint);
-    this.states = Objects.requireNonNull(states);
-    this.maxRequestDuration = Objects.requireNonNull(maxRequestDuration);
-    this.connectTimeout = Objects.requireNonNull(connectTimeout);
-    this.readTimeout = Objects.requireNonNull(readTimeout);
-    this.writeTimeout = Objects.requireNonNull(writeTimeout);
-    this.maxNumBatchRequests = maxNumBatchRequests;
-  }
-
-  public static Builder builder(FunctionType functionType, URI endpoint) {
-    return new Builder(functionType, endpoint);
-  }
-
-  @Override
-  public FunctionType functionType() {
-    return functionType;
-  }
-
-  @Override
-  public Kind kind() {
-    return Kind.HTTP;
-  }
-
-  public URI endpoint() {
-    return endpoint;
-  }
-
-  public List<StateSpec> states() {
-    return states;
-  }
-
-  public Duration maxRequestDuration() {
-    return maxRequestDuration;
-  }
-
-  public Duration connectTimeout() {
-    return connectTimeout;
-  }
-
-  public Duration readTimeout() {
-    return readTimeout;
-  }
-
-  public Duration writeTimeout() {
-    return writeTimeout;
-  }
-
-  public int maxNumBatchRequests() {
-    return maxNumBatchRequests;
-  }
-
-  public static final class Builder {
-
-    private final FunctionType functionType;
-    private final URI endpoint;
-
-    private final List<StateSpec> states = new ArrayList<>();
-    private Duration maxRequestDuration = DEFAULT_HTTP_TIMEOUT;
-    private Duration connectTimeout = DEFAULT_HTTP_CONNECT_TIMEOUT;
-    private Duration readTimeout = DEFAULT_HTTP_READ_TIMEOUT;
-    private Duration writeTimeout = DEFAULT_HTTP_WRITE_TIMEOUT;
-    private int maxNumBatchRequests = DEFAULT_MAX_NUM_BATCH_REQUESTS;
-
-    private Builder(FunctionType functionType, URI endpoint) {
-      this.functionType = Objects.requireNonNull(functionType);
-      this.endpoint = Objects.requireNonNull(endpoint);
-    }
-
-    public Builder withState(StateSpec stateSpec) {
-      this.states.add(stateSpec);
-      return this;
-    }
-
-    public Builder withMaxRequestDuration(Duration duration) {
-      this.maxRequestDuration = requireNonZeroDuration(duration);
-      return this;
-    }
-
-    public Builder withConnectTimeoutDuration(Duration duration) {
-      this.connectTimeout = requireNonZeroDuration(duration);
-      return this;
-    }
-
-    public Builder withReadTimeoutDuration(Duration duration) {
-      this.readTimeout = requireNonZeroDuration(duration);
-      return this;
-    }
-
-    public Builder withWriteTimeoutDuration(Duration duration) {
-      this.writeTimeout = requireNonZeroDuration(duration);
-      return this;
-    }
-
-    public Builder withMaxNumBatchRequests(int maxNumBatchRequests) {
-      this.maxNumBatchRequests = maxNumBatchRequests;
-      return this;
-    }
-
-    public HttpFunctionSpec build() {
-      validateTimeouts();
-
-      return new HttpFunctionSpec(
-          functionType,
-          endpoint,
-          states,
-          maxRequestDuration,
-          connectTimeout,
-          readTimeout,
-          writeTimeout,
-          maxNumBatchRequests);
-    }
-
-    private Duration requireNonZeroDuration(Duration duration) {
-      Objects.requireNonNull(duration);
-      if (duration.equals(Duration.ZERO)) {
-        throw new IllegalArgumentException("Timeout durations must be larger than 0.");
-      }
-
-      return duration;
-    }
-
-    private void validateTimeouts() {
-      if (connectTimeout.compareTo(maxRequestDuration) > 0) {
-        throw new IllegalArgumentException(
-            "Connect timeout cannot be larger than request timeout.");
-      }
-
-      if (readTimeout.compareTo(maxRequestDuration) > 0) {
-        throw new IllegalArgumentException("Read timeout cannot be larger than request timeout.");
-      }
-
-      if (writeTimeout.compareTo(maxRequestDuration) > 0) {
-        throw new IllegalArgumentException("Write timeout cannot be larger than request timeout.");
-      }
-    }
-  }
-}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/StateSpec.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/StateSpec.java
deleted file mode 100644
index 8bb3c84..0000000
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/StateSpec.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.statefun.flink.core.httpfn;
-
-import java.io.Serializable;
-import java.util.Objects;
-import org.apache.flink.statefun.sdk.state.Expiration;
-
-public final class StateSpec implements Serializable {
-
-  private static final long serialVersionUID = 1;
-
-  private final String name;
-  private final Expiration ttlExpiration;
-
-  public StateSpec(String name) {
-    this(name, Expiration.none());
-  }
-
-  public StateSpec(String name, Expiration ttlExpiration) {
-    this.name = Objects.requireNonNull(name);
-    this.ttlExpiration = Objects.requireNonNull(ttlExpiration);
-  }
-
-  public String name() {
-    return name;
-  }
-
-  public Expiration ttlExpiration() {
-    return ttlExpiration;
-  }
-}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/TemplatedHttpFunctionProvider.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/TemplatedHttpFunctionProvider.java
deleted file mode 100644
index ae4d7f4..0000000
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/TemplatedHttpFunctionProvider.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.statefun.flink.core.httpfn;
-
-import static org.apache.flink.statefun.flink.core.httpfn.OkHttpUnixSocketBridge.configureUnixDomainSocket;
-
-import java.net.URI;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Objects;
-import javax.annotation.Nullable;
-import javax.annotation.concurrent.NotThreadSafe;
-import okhttp3.HttpUrl;
-import okhttp3.OkHttpClient;
-import org.apache.flink.statefun.flink.core.common.ManagingResources;
-import org.apache.flink.statefun.flink.core.reqreply.PersistedRemoteFunctionValues;
-import org.apache.flink.statefun.flink.core.reqreply.RequestReplyClient;
-import org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction;
-import org.apache.flink.statefun.sdk.FunctionType;
-import org.apache.flink.statefun.sdk.StatefulFunction;
-import org.apache.flink.statefun.sdk.StatefulFunctionProvider;
-
-@NotThreadSafe
-public final class TemplatedHttpFunctionProvider
-    implements StatefulFunctionProvider, ManagingResources {
-
-  private final Map<FunctionType, HttpFunctionEndpointSpec> specificTypeEndpointSpecs;
-  private final Map<String, HttpFunctionEndpointSpec> perNamespaceEndpointSpecs;
-
-  /** lazily initialized by {code buildHttpClient} */
-  @Nullable private OkHttpClient sharedClient;
-
-  private volatile boolean shutdown;
-
-  public TemplatedHttpFunctionProvider(
-      Map<FunctionType, HttpFunctionEndpointSpec> specificTypeEndpointSpecs,
-      Map<String, HttpFunctionEndpointSpec> perNamespaceEndpointSpecs) {
-    this.specificTypeEndpointSpecs = Objects.requireNonNull(specificTypeEndpointSpecs);
-    this.perNamespaceEndpointSpecs = Objects.requireNonNull(perNamespaceEndpointSpecs);
-  }
-
-  @Override
-  public StatefulFunction functionOfType(FunctionType functionType) {
-    final HttpFunctionEndpointSpec endpointsSpec = getEndpointsSpecOrThrow(functionType);
-    return new RequestReplyFunction(
-        new PersistedRemoteFunctionValues(Collections.emptyList()),
-        endpointsSpec.maxNumBatchRequests(),
-        buildHttpClient(endpointsSpec, functionType));
-  }
-
-  private HttpFunctionEndpointSpec getEndpointsSpecOrThrow(FunctionType functionType) {
-    HttpFunctionEndpointSpec endpointSpec = specificTypeEndpointSpecs.get(functionType);
-    if (endpointSpec != null) {
-      return endpointSpec;
-    }
-    endpointSpec = perNamespaceEndpointSpecs.get(functionType.namespace());
-    if (endpointSpec != null) {
-      return endpointSpec;
-    }
-
-    throw new IllegalStateException("Unknown type: " + functionType);
-  }
-
-  private RequestReplyClient buildHttpClient(
-      HttpFunctionEndpointSpec spec, FunctionType functionType) {
-    if (sharedClient == null) {
-      sharedClient = OkHttpUtils.newClient();
-    }
-    OkHttpClient.Builder clientBuilder = sharedClient.newBuilder();
-    clientBuilder.callTimeout(spec.maxRequestDuration());
-    clientBuilder.connectTimeout(spec.connectTimeout());
-    clientBuilder.readTimeout(spec.readTimeout());
-    clientBuilder.writeTimeout(spec.writeTimeout());
-
-    URI endpointUrl = spec.urlPathTemplate().apply(functionType);
-
-    final HttpUrl url;
-    if (UnixDomainHttpEndpoint.validate(endpointUrl)) {
-      UnixDomainHttpEndpoint endpoint = UnixDomainHttpEndpoint.parseFrom(endpointUrl);
-
-      url =
-          new HttpUrl.Builder()
-              .scheme("http")
-              .host("unused")
-              .addPathSegment(endpoint.pathSegment)
-              .build();
-
-      configureUnixDomainSocket(clientBuilder, endpoint.unixDomainFile);
-    } else {
-      url = HttpUrl.get(endpointUrl);
-    }
-    return new HttpRequestReplyClient(url, clientBuilder.build(), () -> shutdown);
-  }
-
-  @Override
-  public void shutdown() {
-    shutdown = true;
-    OkHttpUtils.closeSilently(sharedClient);
-  }
-}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FormatVersion.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FormatVersion.java
index 87f4ec4..580e6c7 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FormatVersion.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FormatVersion.java
@@ -19,8 +19,17 @@
 package org.apache.flink.statefun.flink.core.jsonmodule;
 
 enum FormatVersion {
+  // ============================================================
+  //  EOL versions
+  // ============================================================
+
   v1_0("1.0"),
   v2_0("2.0"),
+
+  // ============================================================
+  //  Supported versions
+  // ============================================================
+
   v3_0("3.0");
 
   private String versionStr;
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionEndpointJsonEntity.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionEndpointJsonEntity.java
index fac1bbe..e88f439 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionEndpointJsonEntity.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionEndpointJsonEntity.java
@@ -32,12 +32,11 @@
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.statefun.flink.common.json.Selectors;
 import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionEndpointSpec;
-import org.apache.flink.statefun.flink.core.httpfn.TemplatedHttpFunctionProvider;
+import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionProvider;
 import org.apache.flink.statefun.sdk.FunctionType;
 import org.apache.flink.statefun.sdk.FunctionTypeNamespaceMatcher;
 import org.apache.flink.statefun.sdk.StatefulFunctionProvider;
 import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
-import org.apache.flink.types.Either;
 import org.apache.flink.util.TimeUtils;
 
 public final class FunctionEndpointJsonEntity implements JsonEntity {
@@ -82,17 +81,18 @@
     for (Map.Entry<FunctionEndpointSpec.Kind, List<FunctionEndpointSpec>> entry :
         parseFunctionEndpointSpecs(functionEndpointsSpecNodes).entrySet()) {
       final Map<FunctionType, FunctionEndpointSpec> specificTypeEndpointSpecs = new HashMap<>();
-      final Map<String, FunctionEndpointSpec> perNamespaceEndpointSpecs = new HashMap<>();
+      final Map<FunctionTypeNamespaceMatcher, FunctionEndpointSpec> perNamespaceEndpointSpecs =
+          new HashMap<>();
 
       entry
           .getValue()
           .forEach(
               spec -> {
-                Either<FunctionType, FunctionTypeNamespaceMatcher> target = spec.target();
-                if (target.isLeft()) {
-                  specificTypeEndpointSpecs.put(target.left(), spec);
+                FunctionEndpointSpec.Target target = spec.target();
+                if (target.isSpecificFunctionType()) {
+                  specificTypeEndpointSpecs.put(target.asSpecificFunctionType(), spec);
                 } else {
-                  perNamespaceEndpointSpecs.put(target.right().targetNamespace(), spec);
+                  perNamespaceEndpointSpecs.put(target.asNamespace(), spec);
                 }
               });
 
@@ -103,10 +103,7 @@
           .forEach(specificType -> binder.bindFunctionProvider(specificType, provider));
       perNamespaceEndpointSpecs
           .keySet()
-          .forEach(
-              namespace ->
-                  binder.bindFunctionProvider(
-                      FunctionTypeNamespaceMatcher.targetNamespace(namespace), provider));
+          .forEach(namespace -> binder.bindFunctionProvider(namespace, provider));
     }
   }
 
@@ -157,15 +154,14 @@
     return FunctionEndpointSpec.Kind.valueOf(endpointKind.toUpperCase(Locale.getDefault()));
   }
 
-  private static Either<FunctionType, FunctionTypeNamespaceMatcher> target(
-      JsonNode functionEndpointSpecNode) {
+  private static FunctionEndpointSpec.Target target(JsonNode functionEndpointSpecNode) {
     JsonNode targetNode = functionEndpointSpecNode.at(SpecPointers.TYPENAME);
     String namespace = Selectors.textAt(targetNode, TypenamePointers.NAMESPACE);
     Optional<String> functionName =
         Selectors.optionalTextAt(targetNode, TypenamePointers.FUNCTION_NAME);
     return (functionName.isPresent())
-        ? Either.Left(new FunctionType(namespace, functionName.get()))
-        : Either.Right(FunctionTypeNamespaceMatcher.targetNamespace(namespace));
+        ? FunctionEndpointSpec.Target.functionType(new FunctionType(namespace, functionName.get()))
+        : FunctionEndpointSpec.Target.namespace(namespace);
   }
 
   private static FunctionEndpointSpec.UrlPathTemplate urlPathTemplate(
@@ -186,11 +182,12 @@
   private static StatefulFunctionProvider functionProvider(
       FunctionEndpointSpec.Kind kind,
       Map<FunctionType, FunctionEndpointSpec> specificTypeEndpointSpecs,
-      Map<String, FunctionEndpointSpec> perNamespaceEndpointSpecs) {
+      Map<FunctionTypeNamespaceMatcher, FunctionEndpointSpec> perNamespaceEndpointSpecs) {
     switch (kind) {
       case HTTP:
-        return new TemplatedHttpFunctionProvider(
-            castValues(specificTypeEndpointSpecs), castValues(perNamespaceEndpointSpecs));
+        return new HttpFunctionProvider(
+            castValues(specificTypeEndpointSpecs),
+            castValues(namespaceAsKey(perNamespaceEndpointSpecs)));
       case GRPC:
         throw new UnsupportedOperationException("GRPC endpoints are not supported yet.");
       default:
@@ -203,4 +200,13 @@
       Map<K, FunctionEndpointSpec> toCast) {
     return new HashMap(toCast);
   }
+
+  private static Map<String, FunctionEndpointSpec> namespaceAsKey(
+      Map<FunctionTypeNamespaceMatcher, FunctionEndpointSpec> perNamespaceEndpointSpecs) {
+    final Map<String, FunctionEndpointSpec> converted =
+        new HashMap<>(perNamespaceEndpointSpecs.size());
+    perNamespaceEndpointSpecs.forEach(
+        (namespaceMatcher, spec) -> converted.put(namespaceMatcher.targetNamespace(), spec));
+    return converted;
+  }
 }
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionEndpointSpec.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionEndpointSpec.java
index 940db87..9af6345 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionEndpointSpec.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionEndpointSpec.java
@@ -17,15 +17,15 @@
  */
 package org.apache.flink.statefun.flink.core.jsonmodule;
 
+import java.io.Serializable;
 import java.net.URI;
 import java.util.Objects;
 import org.apache.flink.statefun.sdk.FunctionType;
 import org.apache.flink.statefun.sdk.FunctionTypeNamespaceMatcher;
-import org.apache.flink.types.Either;
 
 public interface FunctionEndpointSpec {
 
-  Either<FunctionType, FunctionTypeNamespaceMatcher> target();
+  Target target();
 
   Kind kind();
 
@@ -36,7 +36,72 @@
     GRPC
   }
 
-  class UrlPathTemplate {
+  abstract class Target implements Serializable {
+
+    public static Target namespace(String namespace) {
+      return new NamespaceTarget(FunctionTypeNamespaceMatcher.targetNamespace(namespace));
+    }
+
+    public static Target functionType(FunctionType functionType) {
+      return new FunctionTypeTarget(functionType);
+    }
+
+    public boolean isSpecificFunctionType() {
+      return this.getClass() == FunctionTypeTarget.class;
+    }
+
+    public boolean isNamespace() {
+      return this.getClass() == NamespaceTarget.class;
+    }
+
+    public abstract FunctionTypeNamespaceMatcher asNamespace();
+
+    public abstract FunctionType asSpecificFunctionType();
+
+    private static class NamespaceTarget extends Target {
+      private static final long serialVersionUID = 1;
+
+      private final FunctionTypeNamespaceMatcher namespaceMatcher;
+
+      private NamespaceTarget(FunctionTypeNamespaceMatcher namespaceMatcher) {
+        this.namespaceMatcher = Objects.requireNonNull(namespaceMatcher);
+      }
+
+      @Override
+      public FunctionTypeNamespaceMatcher asNamespace() {
+        return namespaceMatcher;
+      }
+
+      @Override
+      public FunctionType asSpecificFunctionType() {
+        throw new IllegalStateException("This target is not a specific function type");
+      }
+    }
+
+    private static class FunctionTypeTarget extends Target {
+      private static final long serialVersionUID = 1;
+
+      private final FunctionType functionType;
+
+      private FunctionTypeTarget(FunctionType functionType) {
+        this.functionType = Objects.requireNonNull(functionType);
+      }
+
+      @Override
+      public FunctionTypeNamespaceMatcher asNamespace() {
+        throw new IllegalStateException("This target is not a namespace.");
+      }
+
+      @Override
+      public FunctionType asSpecificFunctionType() {
+        return functionType;
+      }
+    }
+  }
+
+  class UrlPathTemplate implements Serializable {
+    private static final long serialVersionUID = 1;
+
     private static final String FUNCTION_NAME_HOLDER = "{typename.function}";
 
     private final String template;
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java
deleted file mode 100644
index 2b9c866..0000000
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java
+++ /dev/null
@@ -1,265 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.statefun.flink.core.jsonmodule;
-
-import static java.util.stream.Collectors.groupingBy;
-import static java.util.stream.Collectors.toMap;
-import static org.apache.flink.statefun.flink.core.common.Maps.transformValues;
-import static org.apache.flink.statefun.flink.core.jsonmodule.FunctionSpec.Kind;
-
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Optional;
-import java.util.OptionalInt;
-import java.util.Set;
-import java.util.function.Function;
-import java.util.stream.Collector;
-import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
-import javax.annotation.Nullable;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonPointer;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
-import org.apache.flink.statefun.flink.common.json.NamespaceNamePair;
-import org.apache.flink.statefun.flink.common.json.Selectors;
-import org.apache.flink.statefun.flink.core.grpcfn.GrpcFunctionProvider;
-import org.apache.flink.statefun.flink.core.grpcfn.GrpcFunctionSpec;
-import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionProvider;
-import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionSpec;
-import org.apache.flink.statefun.flink.core.httpfn.StateSpec;
-import org.apache.flink.statefun.sdk.FunctionType;
-import org.apache.flink.statefun.sdk.StatefulFunctionProvider;
-import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule.Binder;
-import org.apache.flink.statefun.sdk.state.Expiration;
-import org.apache.flink.util.TimeUtils;
-
-final class FunctionJsonEntity implements JsonEntity {
-
-  private static final JsonPointer FUNCTION_SPECS_POINTER = JsonPointer.compile("/functions");
-
-  private static final class MetaPointers {
-    private static final JsonPointer KIND = JsonPointer.compile("/function/meta/kind");
-    private static final JsonPointer TYPE = JsonPointer.compile("/function/meta/type");
-  }
-
-  private static final class SpecPointers {
-    private static final JsonPointer HOSTNAME = JsonPointer.compile("/function/spec/host");
-    private static final JsonPointer ENDPOINT = JsonPointer.compile("/function/spec/endpoint");
-    private static final JsonPointer PORT = JsonPointer.compile("/function/spec/port");
-    private static final JsonPointer STATES = JsonPointer.compile("/function/spec/states");
-
-    private static final JsonPointer TIMEOUT = JsonPointer.compile("/function/spec/timeout");
-    private static final JsonPointer CONNECT_TIMEOUT =
-        JsonPointer.compile("/function/spec/connectTimeout");
-    private static final JsonPointer READ_TIMEOUT =
-        JsonPointer.compile("/function/spec/readTimeout");
-    private static final JsonPointer WRITE_TIMEOUT =
-        JsonPointer.compile("/function/spec/writeTimeout");
-
-    private static final JsonPointer MAX_NUM_BATCH_REQUESTS =
-        JsonPointer.compile("/function/spec/maxNumBatchRequests");
-  }
-
-  private static final class StateSpecPointers {
-    private static final JsonPointer NAME = JsonPointer.compile("/name");
-    private static final JsonPointer EXPIRE_DURATION = JsonPointer.compile("/expireAfter");
-    private static final JsonPointer EXPIRE_MODE = JsonPointer.compile("/expireMode");
-  }
-
-  @Override
-  public void bind(Binder binder, JsonNode moduleSpecRootNode, FormatVersion formatVersion) {
-    final Iterable<? extends JsonNode> functionSpecNodes = functionSpecNodes(moduleSpecRootNode);
-
-    for (Map.Entry<Kind, Map<FunctionType, FunctionSpec>> entry :
-        parse(functionSpecNodes, formatVersion).entrySet()) {
-      StatefulFunctionProvider provider = functionProvider(entry.getKey(), entry.getValue());
-      Set<FunctionType> functionTypes = entry.getValue().keySet();
-      for (FunctionType type : functionTypes) {
-        binder.bindFunctionProvider(type, provider);
-      }
-    }
-  }
-
-  private Map<Kind, Map<FunctionType, FunctionSpec>> parse(
-      Iterable<? extends JsonNode> functionSpecNodes, FormatVersion formatVersion) {
-    return StreamSupport.stream(functionSpecNodes.spliterator(), false)
-        .map(functionSpecNode -> parseFunctionSpec(functionSpecNode, formatVersion))
-        .collect(groupingBy(FunctionSpec::kind, groupByFunctionType()));
-  }
-
-  private static Iterable<? extends JsonNode> functionSpecNodes(JsonNode moduleSpecRootNode) {
-    return Selectors.listAt(moduleSpecRootNode, FUNCTION_SPECS_POINTER);
-  }
-
-  private static FunctionSpec parseFunctionSpec(
-      JsonNode functionNode, FormatVersion formatVersion) {
-    String functionKind = Selectors.textAt(functionNode, MetaPointers.KIND);
-    FunctionSpec.Kind kind =
-        FunctionSpec.Kind.valueOf(functionKind.toUpperCase(Locale.getDefault()));
-    FunctionType functionType = functionType(functionNode);
-    switch (kind) {
-      case HTTP:
-        final HttpFunctionSpec.Builder specBuilder =
-            HttpFunctionSpec.builder(functionType, functionUri(functionNode));
-
-        final Function<JsonNode, List<StateSpec>> stateSpecParser =
-            functionStateParserOf(formatVersion);
-        for (StateSpec state : stateSpecParser.apply(functionNode)) {
-          specBuilder.withState(state);
-        }
-        optionalMaxNumBatchRequests(functionNode).ifPresent(specBuilder::withMaxNumBatchRequests);
-        optionalTimeoutDuration(functionNode, SpecPointers.TIMEOUT)
-            .ifPresent(specBuilder::withMaxRequestDuration);
-        optionalTimeoutDuration(functionNode, SpecPointers.CONNECT_TIMEOUT)
-            .ifPresent(specBuilder::withConnectTimeoutDuration);
-        optionalTimeoutDuration(functionNode, SpecPointers.READ_TIMEOUT)
-            .ifPresent(specBuilder::withReadTimeoutDuration);
-        optionalTimeoutDuration(functionNode, SpecPointers.WRITE_TIMEOUT)
-            .ifPresent(specBuilder::withWriteTimeoutDuration);
-
-        return specBuilder.build();
-      case GRPC:
-        return new GrpcFunctionSpec(functionType, functionAddress(functionNode));
-      default:
-        throw new IllegalArgumentException("Unrecognized function kind " + functionKind);
-    }
-  }
-
-  private static Function<JsonNode, List<StateSpec>> functionStateParserOf(
-      FormatVersion formatVersion) {
-    switch (formatVersion) {
-      case v1_0:
-        return FunctionJsonEntity::functionStateSpecParserV1;
-      case v2_0:
-        return FunctionJsonEntity::functionStateSpecParserV2;
-      default:
-        throw new IllegalStateException("Unrecognized format version: " + formatVersion);
-    }
-  }
-
-  private static List<StateSpec> functionStateSpecParserV1(JsonNode functionNode) {
-    final List<String> stateNames = Selectors.textListAt(functionNode, SpecPointers.STATES);
-    return stateNames.stream().map(StateSpec::new).collect(Collectors.toList());
-  }
-
-  private static List<StateSpec> functionStateSpecParserV2(JsonNode functionNode) {
-    final Iterable<? extends JsonNode> stateSpecNodes =
-        Selectors.listAt(functionNode, SpecPointers.STATES);
-    final List<StateSpec> stateSpecs = new ArrayList<>();
-
-    stateSpecNodes.forEach(
-        stateSpecNode -> {
-          final String name = Selectors.textAt(stateSpecNode, StateSpecPointers.NAME);
-          final Expiration expiration = stateTtlExpiration(stateSpecNode);
-          stateSpecs.add(new StateSpec(name, expiration));
-        });
-    return stateSpecs;
-  }
-
-  private static OptionalInt optionalMaxNumBatchRequests(JsonNode functionNode) {
-    return Selectors.optionalIntegerAt(functionNode, SpecPointers.MAX_NUM_BATCH_REQUESTS);
-  }
-
-  private static Optional<Duration> optionalTimeoutDuration(
-      JsonNode functionNode, JsonPointer timeoutPointer) {
-    return Selectors.optionalTextAt(functionNode, timeoutPointer).map(TimeUtils::parseDuration);
-  }
-
-  private static Expiration stateTtlExpiration(JsonNode stateSpecNode) {
-    final Optional<Duration> duration =
-        Selectors.optionalTextAt(stateSpecNode, StateSpecPointers.EXPIRE_DURATION)
-            .map(TimeUtils::parseDuration);
-
-    if (!duration.isPresent()) {
-      return Expiration.none();
-    }
-
-    final Optional<String> mode =
-        Selectors.optionalTextAt(stateSpecNode, StateSpecPointers.EXPIRE_MODE);
-    if (!mode.isPresent()) {
-      return Expiration.expireAfterReadingOrWriting(duration.get());
-    }
-
-    switch (mode.get()) {
-      case "after-invoke":
-        return Expiration.expireAfterReadingOrWriting(duration.get());
-      case "after-write":
-        return Expiration.expireAfterWriting(duration.get());
-      default:
-        throw new IllegalArgumentException(
-            "Invalid state ttl expire mode; must be one of [after-invoke, after-write].");
-    }
-  }
-
-  private static FunctionType functionType(JsonNode functionNode) {
-    String namespaceName = Selectors.textAt(functionNode, MetaPointers.TYPE);
-    NamespaceNamePair nn = NamespaceNamePair.from(namespaceName);
-    return new FunctionType(nn.namespace(), nn.name());
-  }
-
-  private static InetSocketAddress functionAddress(JsonNode functionNode) {
-    String host = Selectors.textAt(functionNode, SpecPointers.HOSTNAME);
-    int port = Selectors.integerAt(functionNode, SpecPointers.PORT);
-    return new InetSocketAddress(host, port);
-  }
-
-  private static URI functionUri(JsonNode functionNode) {
-    String uri = Selectors.textAt(functionNode, SpecPointers.ENDPOINT);
-    URI typedUri = URI.create(uri);
-    @Nullable String scheme = typedUri.getScheme();
-    if (scheme == null) {
-      throw new IllegalArgumentException(
-          "Missing scheme in function endpoint "
-              + uri
-              + "; an http or https scheme must be provided.");
-    }
-    if (scheme.equalsIgnoreCase("http")
-        || scheme.equalsIgnoreCase("https")
-        || scheme.equalsIgnoreCase("http+unix")
-        || scheme.equalsIgnoreCase("https+unix")) {
-      return typedUri;
-    }
-    throw new IllegalArgumentException(
-        "Missing scheme in function endpoint "
-            + uri
-            + "; an http or https or http+unix or https+unix scheme must be provided.");
-  }
-
-  private static Collector<FunctionSpec, ?, Map<FunctionType, FunctionSpec>> groupByFunctionType() {
-    return toMap(FunctionSpec::functionType, Function.identity());
-  }
-
-  private static StatefulFunctionProvider functionProvider(
-      Kind kind, Map<FunctionType, FunctionSpec> definedFunctions) {
-    switch (kind) {
-      case HTTP:
-        return new HttpFunctionProvider(
-            transformValues(definedFunctions, HttpFunctionSpec.class::cast));
-      case GRPC:
-        return new GrpcFunctionProvider(
-            transformValues(definedFunctions, GrpcFunctionSpec.class::cast));
-      default:
-        throw new IllegalStateException("Unexpected value: " + kind);
-    }
-  }
-}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java
index 658bb9b..eb26c44 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java
@@ -32,13 +32,6 @@
   /** Entities in the JSON moduleSpecNode that should be parsed and bound to the module. */
   private static final List<JsonEntity> ENTITIES =
       Arrays.asList(
-          new FunctionJsonEntity(),
-          new IngressJsonEntity(),
-          new RouterJsonEntity(),
-          new EgressJsonEntity());
-
-  private static final List<JsonEntity> V3_ENTITIES =
-      Arrays.asList(
           new FunctionEndpointJsonEntity(),
           new IngressJsonEntity(),
           new RouterJsonEntity(),
@@ -56,11 +49,7 @@
 
   public void configure(Map<String, String> conf, Binder binder) {
     try {
-      if (formatVersion == FormatVersion.v3_0) {
-        V3_ENTITIES.forEach(jsonEntity -> jsonEntity.bind(binder, moduleSpecNode, formatVersion));
-      } else {
-        ENTITIES.forEach(jsonEntity -> jsonEntity.bind(binder, moduleSpecNode, formatVersion));
-      }
+      ENTITIES.forEach(jsonEntity -> jsonEntity.bind(binder, moduleSpecNode, formatVersion));
     } catch (Throwable t) {
       throw new ModuleConfigurationException(
           format("Error while parsing module at %s", moduleUrl), t);
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonServiceLoader.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonServiceLoader.java
index a507540..d5c7e77 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonServiceLoader.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonServiceLoader.java
@@ -97,8 +97,15 @@
   }
 
   private static FormatVersion requireValidFormatVersion(JsonNode root) {
-    final String formatVersion = Selectors.textAt(root, FORMAT_VERSION);
-    return FormatVersion.fromString(formatVersion);
+    final String formatVersionStr = Selectors.textAt(root, FORMAT_VERSION);
+    final FormatVersion formatVersion = FormatVersion.fromString(formatVersionStr);
+    if (formatVersion.compareTo(FormatVersion.v3_0) < 0) {
+      throw new IllegalArgumentException(
+          "Only format versions higher than or equal to 3.0 is supported. Was version "
+              + formatVersion
+              + ".");
+    }
+    return formatVersion;
   }
 
   @VisibleForTesting
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValues.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValues.java
index 857bcb6..6553a1a 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValues.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValues.java
@@ -23,8 +23,6 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import org.apache.flink.statefun.flink.core.httpfn.StateSpec;
 import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.ExpirationSpec;
 import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.PersistedValueMutation;
 import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.PersistedValueSpec;
@@ -39,20 +37,7 @@
 
   @Persisted private final PersistedStateRegistry stateRegistry = new PersistedStateRegistry();
 
-  private final Map<String, PersistedValue<byte[]>> managedStates;
-
-  /**
-   * @deprecated {@link PersistedRemoteFunctionValues} should no longer be instantiated with eagerly
-   *     declared state specs. State can now be dynamically registered with {@link
-   *     #registerStates(List)}. This constructor will be removed once old module specification
-   *     formats, which supports eager state declarations, are removed.
-   */
-  @Deprecated
-  public PersistedRemoteFunctionValues(List<StateSpec> stateSpecs) {
-    Objects.requireNonNull(stateSpecs);
-    this.managedStates = new HashMap<>(stateSpecs.size());
-    stateSpecs.forEach(this::createAndRegisterEagerValueState);
-  }
+  private final Map<String, PersistedValue<byte[]>> managedStates = new HashMap<>();
 
   void attachStateValues(InvocationBatchRequest.Builder batchBuilder) {
     for (Map.Entry<String, PersistedValue<byte[]>> managedStateEntry : managedStates.entrySet()) {
@@ -134,15 +119,6 @@
     }
   }
 
-  private void createAndRegisterEagerValueState(StateSpec stateSpec) {
-    final String stateName = stateSpec.name();
-
-    final PersistedValue<byte[]> stateValue =
-        PersistedValue.of(stateName, byte[].class, stateSpec.ttlExpiration());
-    stateRegistry.registerValue(stateValue);
-    managedStates.put(stateName, stateValue);
-  }
-
   private PersistedValue<byte[]> getStateHandleOrThrow(String stateName) {
     final PersistedValue<byte[]> handle = managedStates.get(stateName);
     if (handle == null) {
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java
index 01ee950..b2054f2 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java
@@ -25,6 +25,7 @@
 import java.time.Duration;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.statefun.flink.core.backpressure.InternalContext;
 import org.apache.flink.statefun.flink.core.metrics.RemoteInvocationMetrics;
 import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction;
@@ -70,13 +71,16 @@
 
   @Persisted private final PersistedRemoteFunctionValues managedStates;
 
-  public RequestReplyFunction(
-      PersistedRemoteFunctionValues managedStates,
-      int maxNumBatchRequests,
-      RequestReplyClient client) {
-    this.managedStates = Objects.requireNonNull(managedStates);
-    this.client = Objects.requireNonNull(client);
+  public RequestReplyFunction(int maxNumBatchRequests, RequestReplyClient client) {
+    this(new PersistedRemoteFunctionValues(), maxNumBatchRequests, client);
+  }
+
+  @VisibleForTesting
+  RequestReplyFunction(
+      PersistedRemoteFunctionValues states, int maxNumBatchRequests, RequestReplyClient client) {
+    this.managedStates = Objects.requireNonNull(states);
     this.maxNumBatchRequests = maxNumBatchRequests;
+    this.client = Objects.requireNonNull(client);
   }
 
   @Override
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleTest.java
index 3001684..92014a9 100644
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleTest.java
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleTest.java
@@ -17,14 +17,16 @@
  */
 package org.apache.flink.statefun.flink.core.jsonmodule;
 
-import static org.hamcrest.Matchers.*;
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.hasKey;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertThat;
 
 import com.google.protobuf.Any;
 import com.google.protobuf.Message;
 import java.net.URL;
-import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverse;
@@ -35,24 +37,10 @@
 import org.apache.flink.statefun.sdk.io.IngressIdentifier;
 import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
 
-@RunWith(Parameterized.class)
 public class JsonModuleTest {
 
-  @Parameterized.Parameters(name = "Format version = {0}, module path = \"{1}\"")
-  public static Collection<?> modules() {
-    return Arrays.asList(
-        new Object[] {FormatVersion.v1_0, "module-v1_0/module.yaml"},
-        new Object[] {FormatVersion.v2_0, "module-v2_0/module.yaml"});
-  }
-
-  private final String modulePath;
-
-  public JsonModuleTest(FormatVersion ignored, String modulePath) {
-    this.modulePath = modulePath;
-  }
+  private static final String modulePath = "module-v3_0/module.yaml";
 
   @Test
   public void exampleUsage() {
@@ -71,9 +59,10 @@
     assertThat(
         universe.functions(),
         allOf(
-            hasKey(new FunctionType("com.example", "hello")),
-            hasKey(new FunctionType("com.foo", "world")),
-            hasKey(new FunctionType("com.bar", "world"))));
+            hasKey(new FunctionType("com.foo.bar", "specific_function")),
+            hasKey(new FunctionType("com.other.namespace", "hello"))));
+
+    assertThat(universe.namespaceFunctions(), hasKey("com.foo.bar"));
   }
 
   @Test
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleV3Test.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleV3Test.java
deleted file mode 100644
index fcbb885..0000000
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleV3Test.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.statefun.flink.core.jsonmodule;
-
-import static org.hamcrest.Matchers.allOf;
-import static org.hamcrest.Matchers.hasKey;
-import static org.hamcrest.Matchers.not;
-import static org.hamcrest.Matchers.notNullValue;
-import static org.hamcrest.Matchers.nullValue;
-import static org.junit.Assert.assertThat;
-
-import com.google.protobuf.Any;
-import com.google.protobuf.Message;
-import java.net.URL;
-import java.util.Collections;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverse;
-import org.apache.flink.statefun.flink.core.message.MessageFactoryKey;
-import org.apache.flink.statefun.flink.core.message.MessageFactoryType;
-import org.apache.flink.statefun.sdk.FunctionType;
-import org.apache.flink.statefun.sdk.io.EgressIdentifier;
-import org.apache.flink.statefun.sdk.io.IngressIdentifier;
-import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
-import org.junit.Test;
-
-public class JsonModuleV3Test {
-
-  private static final String modulePath = "module-v3_0/module.yaml";
-
-  @Test
-  public void exampleUsage() {
-    StatefulFunctionModule module = fromPath(modulePath);
-
-    assertThat(module, notNullValue());
-  }
-
-  @Test
-  public void testFunctions() {
-    StatefulFunctionModule module = fromPath(modulePath);
-
-    StatefulFunctionsUniverse universe = emptyUniverse();
-    module.configure(Collections.emptyMap(), universe);
-
-    assertThat(
-        universe.functions(),
-        allOf(
-            hasKey(new FunctionType("com.foo.bar", "specific_function")),
-            hasKey(new FunctionType("com.other.namespace", "hello"))));
-
-    assertThat(universe.namespaceFunctions(), hasKey("com.foo.bar"));
-  }
-
-  @Test
-  public void testRouters() {
-    StatefulFunctionModule module = fromPath(modulePath);
-
-    StatefulFunctionsUniverse universe = emptyUniverse();
-    module.configure(Collections.emptyMap(), universe);
-
-    assertThat(
-        universe.routers(),
-        hasKey(new IngressIdentifier<>(Message.class, "com.mycomp.igal", "names")));
-  }
-
-  @Test
-  public void testIngresses() {
-    StatefulFunctionModule module = fromPath(modulePath);
-
-    StatefulFunctionsUniverse universe = emptyUniverse();
-    module.configure(Collections.emptyMap(), universe);
-
-    assertThat(
-        universe.ingress(),
-        hasKey(new IngressIdentifier<>(Message.class, "com.mycomp.igal", "names")));
-  }
-
-  @Test
-  public void testEgresses() {
-    StatefulFunctionModule module = fromPath(modulePath);
-
-    StatefulFunctionsUniverse universe = emptyUniverse();
-    module.configure(Collections.emptyMap(), universe);
-
-    assertThat(
-        universe.egress(), hasKey(new EgressIdentifier<>("com.mycomp.foo", "bar", Any.class)));
-  }
-
-  private static StatefulFunctionModule fromPath(String path) {
-    URL moduleUrl = JsonModuleTest.class.getClassLoader().getResource(path);
-    assertThat(moduleUrl, not(nullValue()));
-    ObjectMapper mapper = JsonServiceLoader.mapper();
-    return JsonServiceLoader.fromUrl(mapper, moduleUrl);
-  }
-
-  private static StatefulFunctionsUniverse emptyUniverse() {
-    return new StatefulFunctionsUniverse(
-        MessageFactoryKey.forType(MessageFactoryType.WITH_PROTOBUF_PAYLOADS, null));
-  }
-}
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValuesTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValuesTest.java
index f633fc2..48dadc1 100644
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValuesTest.java
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValuesTest.java
@@ -35,8 +35,7 @@
 
   @Test
   public void exampleUsage() {
-    final PersistedRemoteFunctionValues values =
-        new PersistedRemoteFunctionValues(Collections.emptyList());
+    final PersistedRemoteFunctionValues values = new PersistedRemoteFunctionValues();
 
     // --- register persisted states
     values.registerStates(
@@ -63,8 +62,7 @@
 
   @Test
   public void zeroRegisteredStates() {
-    final PersistedRemoteFunctionValues values =
-        new PersistedRemoteFunctionValues(Collections.emptyList());
+    final PersistedRemoteFunctionValues values = new PersistedRemoteFunctionValues();
 
     final InvocationBatchRequest.Builder builder = InvocationBatchRequest.newBuilder();
     values.attachStateValues(builder);
@@ -74,8 +72,7 @@
 
   @Test(expected = IllegalStateException.class)
   public void updatingNonRegisteredStateShouldThrow() {
-    final PersistedRemoteFunctionValues values =
-        new PersistedRemoteFunctionValues(Collections.emptyList());
+    final PersistedRemoteFunctionValues values = new PersistedRemoteFunctionValues();
 
     values.updateStateValues(
         Collections.singletonList(
@@ -85,8 +82,7 @@
 
   @Test
   public void registeredStateWithEmptyValueShouldBeAttached() {
-    final PersistedRemoteFunctionValues values =
-        new PersistedRemoteFunctionValues(Collections.emptyList());
+    final PersistedRemoteFunctionValues values = new PersistedRemoteFunctionValues();
 
     values.registerStates(Collections.singletonList(protocolPersistedValueSpec("state")));
 
@@ -99,8 +95,7 @@
 
   @Test
   public void registeredStateWithDeletedValueShouldBeAttached() {
-    final PersistedRemoteFunctionValues values =
-        new PersistedRemoteFunctionValues(Collections.emptyList());
+    final PersistedRemoteFunctionValues values = new PersistedRemoteFunctionValues();
 
     values.registerStates(Collections.singletonList(protocolPersistedValueSpec("state")));
 
@@ -120,8 +115,7 @@
 
   @Test
   public void duplicateRegistrationsHasNoEffect() {
-    final PersistedRemoteFunctionValues values =
-        new PersistedRemoteFunctionValues(Collections.emptyList());
+    final PersistedRemoteFunctionValues values = new PersistedRemoteFunctionValues();
 
     values.registerStates(Collections.singletonList(protocolPersistedValueSpec("state")));
     values.updateStateValues(
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java
index c4eb85a..d545281 100644
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java
@@ -40,7 +40,6 @@
 import java.util.stream.Collectors;
 import org.apache.flink.statefun.flink.core.TestUtils;
 import org.apache.flink.statefun.flink.core.backpressure.InternalContext;
-import org.apache.flink.statefun.flink.core.httpfn.StateSpec;
 import org.apache.flink.statefun.flink.core.metrics.FunctionTypeMetrics;
 import org.apache.flink.statefun.flink.core.metrics.RemoteInvocationMetrics;
 import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction;
@@ -66,11 +65,9 @@
 
   private final FakeClient client = new FakeClient();
   private final FakeContext context = new FakeContext();
-  private final PersistedRemoteFunctionValues states =
-      new PersistedRemoteFunctionValues(Collections.singletonList(new StateSpec("session")));
 
   private final RequestReplyFunction functionUnderTest =
-      new RequestReplyFunction(states, 10, client);
+      new RequestReplyFunction(testInitialRegisteredState("session"), 10, client);
 
   @Test
   public void example() {
@@ -116,7 +113,7 @@
 
   @Test
   public void reachingABatchLimitTriggersBackpressure() {
-    RequestReplyFunction functionUnderTest = new RequestReplyFunction(states, 2, client);
+    RequestReplyFunction functionUnderTest = new RequestReplyFunction(2, client);
 
     // send one message
     functionUnderTest.invoke(context, Any.getDefaultInstance());
@@ -132,7 +129,7 @@
 
   @Test
   public void returnedMessageReleaseBackpressure() {
-    RequestReplyFunction functionUnderTest = new RequestReplyFunction(states, 2, client);
+    RequestReplyFunction functionUnderTest = new RequestReplyFunction(2, client);
 
     // the following invocations should cause backpressure
     functionUnderTest.invoke(context, Any.getDefaultInstance());
@@ -273,6 +270,15 @@
     assertThat(context.functionTypeMetrics().numBacklog, is(0));
   }
 
+  private static PersistedRemoteFunctionValues testInitialRegisteredState(
+      String existingStateName) {
+    final PersistedRemoteFunctionValues states = new PersistedRemoteFunctionValues();
+    states.registerStates(
+        Collections.singletonList(
+            PersistedValueSpec.newBuilder().setStateName(existingStateName).build()));
+    return states;
+  }
+
   private static AsyncOperationResult<Object, FromFunction> successfulAsyncOperation() {
     return new AsyncOperationResult<>(
         new Object(), Status.SUCCESS, FromFunction.getDefaultInstance(), null);
diff --git a/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java
index 58c382c..d24636b 100644
--- a/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java
+++ b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java
@@ -21,10 +21,10 @@
 import java.net.URI;
 import java.time.Duration;
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionSpec;
-import org.apache.flink.statefun.flink.core.httpfn.StateSpec;
+import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionEndpointSpec;
+import org.apache.flink.statefun.flink.core.jsonmodule.FunctionEndpointSpec.Target;
+import org.apache.flink.statefun.flink.core.jsonmodule.FunctionEndpointSpec.UrlPathTemplate;
 import org.apache.flink.statefun.sdk.FunctionType;
-import org.apache.flink.statefun.sdk.state.Expiration;
 
 /** A Builder for RequestReply remote function type. */
 public class RequestReplyFunctionBuilder {
@@ -41,33 +41,12 @@
     return new RequestReplyFunctionBuilder(functionType, endpoint);
   }
 
-  private final HttpFunctionSpec.Builder builder;
+  private final HttpFunctionEndpointSpec.Builder builder;
 
   private RequestReplyFunctionBuilder(FunctionType functionType, URI endpoint) {
-    this.builder = HttpFunctionSpec.builder(functionType, endpoint);
-  }
-
-  /**
-   * Declares a remote function state.
-   *
-   * @param name the name of the state to be used remotely.
-   * @return this builder.
-   */
-  public RequestReplyFunctionBuilder withPersistedState(String name) {
-    builder.withState(new StateSpec(name, Expiration.none()));
-    return this;
-  }
-
-  /**
-   * Declares a remote function state, with expiration.
-   *
-   * @param name the name of the state to be used remotely.
-   * @param ttlExpiration the expiration mode for which this state might be deleted.
-   * @return this builder.
-   */
-  public RequestReplyFunctionBuilder withExpiringState(String name, Expiration ttlExpiration) {
-    builder.withState(new StateSpec(name, ttlExpiration));
-    return this;
+    this.builder =
+        HttpFunctionEndpointSpec.builder(
+            Target.functionType(functionType), new UrlPathTemplate(endpoint.toASCIIString()));
   }
 
   /**
@@ -127,7 +106,7 @@
   }
 
   @Internal
-  HttpFunctionSpec spec() {
+  HttpFunctionEndpointSpec spec() {
     return builder.build();
   }
 }
diff --git a/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/SerializableHttpFunctionProvider.java b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/SerializableHttpFunctionProvider.java
index 28ac564..eed278c 100644
--- a/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/SerializableHttpFunctionProvider.java
+++ b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/SerializableHttpFunctionProvider.java
@@ -18,13 +18,14 @@
 
 package org.apache.flink.statefun.flink.datastream;
 
+import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionEndpointSpec;
 import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionProvider;
-import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionSpec;
 import org.apache.flink.statefun.sdk.FunctionType;
 import org.apache.flink.statefun.sdk.StatefulFunction;
 
@@ -34,17 +35,17 @@
 
   private static final long serialVersionUID = 1;
 
-  private final Map<FunctionType, HttpFunctionSpec> supportedTypes;
+  private final Map<FunctionType, HttpFunctionEndpointSpec> supportedTypes;
   private transient @Nullable HttpFunctionProvider delegate;
 
-  SerializableHttpFunctionProvider(Map<FunctionType, HttpFunctionSpec> supportedTypes) {
+  SerializableHttpFunctionProvider(Map<FunctionType, HttpFunctionEndpointSpec> supportedTypes) {
     this.supportedTypes = Objects.requireNonNull(supportedTypes);
   }
 
   @Override
   public StatefulFunction functionOfType(FunctionType type) {
     if (delegate == null) {
-      delegate = new HttpFunctionProvider(supportedTypes);
+      delegate = new HttpFunctionProvider(supportedTypes, Collections.emptyMap());
     }
     return delegate.functionOfType(type);
   }
diff --git a/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/StatefulFunctionDataStreamBuilder.java b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/StatefulFunctionDataStreamBuilder.java
index 39bc028..58dbb92 100644
--- a/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/StatefulFunctionDataStreamBuilder.java
+++ b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/StatefulFunctionDataStreamBuilder.java
@@ -29,7 +29,7 @@
 import org.apache.flink.shaded.guava18.com.google.common.base.Optional;
 import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig;
 import org.apache.flink.statefun.flink.core.feedback.FeedbackKey;
-import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionSpec;
+import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionEndpointSpec;
 import org.apache.flink.statefun.flink.core.message.Message;
 import org.apache.flink.statefun.flink.core.message.RoutableMessage;
 import org.apache.flink.statefun.flink.core.translation.EmbeddedTranslator;
@@ -60,7 +60,7 @@
   private final List<DataStream<RoutableMessage>> definedIngresses = new ArrayList<>();
   private final Map<FunctionType, SerializableStatefulFunctionProvider> functionProviders =
       new HashMap<>();
-  private final Map<FunctionType, HttpFunctionSpec> requestReplyFunctions = new HashMap<>();
+  private final Map<FunctionType, HttpFunctionEndpointSpec> requestReplyFunctions = new HashMap<>();
   private final Set<EgressIdentifier<?>> egressesIds = new LinkedHashSet<>();
 
   @Nullable private StatefulFunctionsConfig config;
@@ -102,8 +102,8 @@
   public StatefulFunctionDataStreamBuilder withRequestReplyRemoteFunction(
       RequestReplyFunctionBuilder builder) {
     Objects.requireNonNull(builder);
-    HttpFunctionSpec spec = builder.spec();
-    putAndThrowIfPresent(requestReplyFunctions, spec.functionType(), spec);
+    HttpFunctionEndpointSpec spec = builder.spec();
+    putAndThrowIfPresent(requestReplyFunctions, spec.target().asSpecificFunctionType(), spec);
     return this;
   }
 
diff --git a/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/FunctionTypeNamespaceMatcher.java b/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/FunctionTypeNamespaceMatcher.java
index d4faf27..ea3cded 100644
--- a/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/FunctionTypeNamespaceMatcher.java
+++ b/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/FunctionTypeNamespaceMatcher.java
@@ -18,9 +18,12 @@
 
 package org.apache.flink.statefun.sdk;
 
+import java.io.Serializable;
 import java.util.Objects;
 
-public final class FunctionTypeNamespaceMatcher {
+public final class FunctionTypeNamespaceMatcher implements Serializable {
+
+  private static final long serialVersionUID = 1;
 
   private final String targetNamespace;