[FLINK-20335] [core] Remove support for module YAML versions 1.0 / 2.0
This closes #184.
diff --git a/statefun-examples/statefun-flink-datastream-example/src/main/java/org/apache/flink/statefun/examples/datastream/Example.java b/statefun-examples/statefun-flink-datastream-example/src/main/java/org/apache/flink/statefun/examples/datastream/Example.java
index 76ffde5..aa825b3 100644
--- a/statefun-examples/statefun-flink-datastream-example/src/main/java/org/apache/flink/statefun/examples/datastream/Example.java
+++ b/statefun-examples/statefun-flink-datastream-example/src/main/java/org/apache/flink/statefun/examples/datastream/Example.java
@@ -79,7 +79,6 @@
.withRequestReplyRemoteFunction(
requestReplyFunctionBuilder(
REMOTE_GREET, URI.create("http://localhost:5000/statefun"))
- .withPersistedState("seen_count")
.withMaxRequestDuration(Duration.ofSeconds(15))
.withMaxNumBatchRequests(500))
.withEgressId(GREETINGS)
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionEndpointSpec.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionEndpointSpec.java
index d5731d1..85df004 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionEndpointSpec.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionEndpointSpec.java
@@ -17,14 +17,14 @@
*/
package org.apache.flink.statefun.flink.core.httpfn;
+import java.io.Serializable;
import java.time.Duration;
import java.util.Objects;
import org.apache.flink.statefun.flink.core.jsonmodule.FunctionEndpointSpec;
-import org.apache.flink.statefun.sdk.FunctionType;
-import org.apache.flink.statefun.sdk.FunctionTypeNamespaceMatcher;
-import org.apache.flink.types.Either;
-public final class HttpFunctionEndpointSpec implements FunctionEndpointSpec {
+public final class HttpFunctionEndpointSpec implements FunctionEndpointSpec, Serializable {
+
+ private static final long serialVersionUID = 1;
private static final Duration DEFAULT_HTTP_TIMEOUT = Duration.ofMinutes(1);
private static final Duration DEFAULT_HTTP_CONNECT_TIMEOUT = Duration.ofSeconds(10);
@@ -32,7 +32,7 @@
private static final Duration DEFAULT_HTTP_WRITE_TIMEOUT = Duration.ofSeconds(10);
private static final Integer DEFAULT_MAX_NUM_BATCH_REQUESTS = 1000;
- private final Either<FunctionType, FunctionTypeNamespaceMatcher> target;
+ private final Target target;
private final UrlPathTemplate urlPathTemplate;
private final Duration maxRequestDuration;
@@ -41,13 +41,12 @@
private final Duration writeTimeout;
private final int maxNumBatchRequests;
- public static Builder builder(
- Either<FunctionType, FunctionTypeNamespaceMatcher> target, UrlPathTemplate urlPathTemplate) {
+ public static Builder builder(Target target, UrlPathTemplate urlPathTemplate) {
return new Builder(target, urlPathTemplate);
}
private HttpFunctionEndpointSpec(
- Either<FunctionType, FunctionTypeNamespaceMatcher> target,
+ Target target,
UrlPathTemplate urlPathTemplate,
Duration maxRequestDuration,
Duration connectTimeout,
@@ -64,7 +63,7 @@
}
@Override
- public Either<FunctionType, FunctionTypeNamespaceMatcher> target() {
+ public Target target() {
return target;
}
@@ -100,7 +99,7 @@
public static final class Builder {
- private final Either<FunctionType, FunctionTypeNamespaceMatcher> target;
+ private final Target target;
private final UrlPathTemplate urlPathTemplate;
private Duration maxRequestDuration = DEFAULT_HTTP_TIMEOUT;
@@ -109,9 +108,7 @@
private Duration writeTimeout = DEFAULT_HTTP_WRITE_TIMEOUT;
private int maxNumBatchRequests = DEFAULT_MAX_NUM_BATCH_REQUESTS;
- private Builder(
- Either<FunctionType, FunctionTypeNamespaceMatcher> target,
- UrlPathTemplate urlPathTemplate) {
+ private Builder(Target target, UrlPathTemplate urlPathTemplate) {
this.target = Objects.requireNonNull(target);
this.urlPathTemplate = Objects.requireNonNull(urlPathTemplate);
}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
index 67c2c68..5445500 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
@@ -15,53 +15,64 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.flink.statefun.flink.core.httpfn;
import static org.apache.flink.statefun.flink.core.httpfn.OkHttpUnixSocketBridge.configureUnixDomainSocket;
+import java.net.URI;
import java.util.Map;
+import java.util.Objects;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
import org.apache.flink.statefun.flink.core.common.ManagingResources;
-import org.apache.flink.statefun.flink.core.reqreply.PersistedRemoteFunctionValues;
import org.apache.flink.statefun.flink.core.reqreply.RequestReplyClient;
import org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction;
import org.apache.flink.statefun.sdk.FunctionType;
+import org.apache.flink.statefun.sdk.StatefulFunction;
import org.apache.flink.statefun.sdk.StatefulFunctionProvider;
@NotThreadSafe
-public class HttpFunctionProvider implements StatefulFunctionProvider, ManagingResources {
- private final Map<FunctionType, HttpFunctionSpec> supportedTypes;
+public final class HttpFunctionProvider implements StatefulFunctionProvider, ManagingResources {
+
+ private final Map<FunctionType, HttpFunctionEndpointSpec> specificTypeEndpointSpecs;
+ private final Map<String, HttpFunctionEndpointSpec> perNamespaceEndpointSpecs;
/** lazily initialized by {code buildHttpClient} */
@Nullable private OkHttpClient sharedClient;
private volatile boolean shutdown;
- public HttpFunctionProvider(Map<FunctionType, HttpFunctionSpec> supportedTypes) {
- this.supportedTypes = supportedTypes;
+ public HttpFunctionProvider(
+ Map<FunctionType, HttpFunctionEndpointSpec> specificTypeEndpointSpecs,
+ Map<String, HttpFunctionEndpointSpec> perNamespaceEndpointSpecs) {
+ this.specificTypeEndpointSpecs = Objects.requireNonNull(specificTypeEndpointSpecs);
+ this.perNamespaceEndpointSpecs = Objects.requireNonNull(perNamespaceEndpointSpecs);
}
@Override
- public RequestReplyFunction functionOfType(FunctionType type) {
- HttpFunctionSpec spec = supportedTypes.get(type);
- if (spec == null) {
- throw new IllegalArgumentException("Unsupported type " + type);
- }
+ public StatefulFunction functionOfType(FunctionType functionType) {
+ final HttpFunctionEndpointSpec endpointsSpec = getEndpointsSpecOrThrow(functionType);
return new RequestReplyFunction(
- new PersistedRemoteFunctionValues(spec.states()),
- spec.maxNumBatchRequests(),
- buildHttpClient(spec));
+ endpointsSpec.maxNumBatchRequests(), buildHttpClient(endpointsSpec, functionType));
}
- public HttpFunctionSpec getFunctionSpec(FunctionType type) {
- return supportedTypes.get(type);
+ private HttpFunctionEndpointSpec getEndpointsSpecOrThrow(FunctionType functionType) {
+ HttpFunctionEndpointSpec endpointSpec = specificTypeEndpointSpecs.get(functionType);
+ if (endpointSpec != null) {
+ return endpointSpec;
+ }
+ endpointSpec = perNamespaceEndpointSpecs.get(functionType.namespace());
+ if (endpointSpec != null) {
+ return endpointSpec;
+ }
+
+ throw new IllegalStateException("Unknown type: " + functionType);
}
- private RequestReplyClient buildHttpClient(HttpFunctionSpec spec) {
+ private RequestReplyClient buildHttpClient(
+ HttpFunctionEndpointSpec spec, FunctionType functionType) {
if (sharedClient == null) {
sharedClient = OkHttpUtils.newClient();
}
@@ -71,9 +82,11 @@
clientBuilder.readTimeout(spec.readTimeout());
clientBuilder.writeTimeout(spec.writeTimeout());
+ URI endpointUrl = spec.urlPathTemplate().apply(functionType);
+
final HttpUrl url;
- if (UnixDomainHttpEndpoint.validate(spec.endpoint())) {
- UnixDomainHttpEndpoint endpoint = UnixDomainHttpEndpoint.parseFrom(spec.endpoint());
+ if (UnixDomainHttpEndpoint.validate(endpointUrl)) {
+ UnixDomainHttpEndpoint endpoint = UnixDomainHttpEndpoint.parseFrom(endpointUrl);
url =
new HttpUrl.Builder()
@@ -84,7 +97,7 @@
configureUnixDomainSocket(clientBuilder, endpoint.unixDomainFile);
} else {
- url = HttpUrl.get(spec.endpoint());
+ url = HttpUrl.get(endpointUrl);
}
return new HttpRequestReplyClient(url, clientBuilder.build(), () -> shutdown);
}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java
deleted file mode 100644
index 42a7abb..0000000
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.statefun.flink.core.httpfn;
-
-import java.io.Serializable;
-import java.net.URI;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-import org.apache.flink.statefun.flink.core.jsonmodule.FunctionSpec;
-import org.apache.flink.statefun.sdk.FunctionType;
-
-public final class HttpFunctionSpec implements FunctionSpec, Serializable {
-
- private static final long serialVersionUID = 1;
-
- private static final Duration DEFAULT_HTTP_TIMEOUT = Duration.ofMinutes(1);
- private static final Duration DEFAULT_HTTP_CONNECT_TIMEOUT = Duration.ofSeconds(10);
- private static final Duration DEFAULT_HTTP_READ_TIMEOUT = Duration.ofSeconds(10);
- private static final Duration DEFAULT_HTTP_WRITE_TIMEOUT = Duration.ofSeconds(10);
- private static final Integer DEFAULT_MAX_NUM_BATCH_REQUESTS = 1000;
-
- private final FunctionType functionType;
- private final URI endpoint;
- private final List<StateSpec> states;
- private final Duration maxRequestDuration;
- private final Duration connectTimeout;
- private final Duration readTimeout;
- private final Duration writeTimeout;
- private final int maxNumBatchRequests;
-
- private HttpFunctionSpec(
- FunctionType functionType,
- URI endpoint,
- List<StateSpec> states,
- Duration maxRequestDuration,
- Duration connectTimeout,
- Duration readTimeout,
- Duration writeTimeout,
- int maxNumBatchRequests) {
- this.functionType = Objects.requireNonNull(functionType);
- this.endpoint = Objects.requireNonNull(endpoint);
- this.states = Objects.requireNonNull(states);
- this.maxRequestDuration = Objects.requireNonNull(maxRequestDuration);
- this.connectTimeout = Objects.requireNonNull(connectTimeout);
- this.readTimeout = Objects.requireNonNull(readTimeout);
- this.writeTimeout = Objects.requireNonNull(writeTimeout);
- this.maxNumBatchRequests = maxNumBatchRequests;
- }
-
- public static Builder builder(FunctionType functionType, URI endpoint) {
- return new Builder(functionType, endpoint);
- }
-
- @Override
- public FunctionType functionType() {
- return functionType;
- }
-
- @Override
- public Kind kind() {
- return Kind.HTTP;
- }
-
- public URI endpoint() {
- return endpoint;
- }
-
- public List<StateSpec> states() {
- return states;
- }
-
- public Duration maxRequestDuration() {
- return maxRequestDuration;
- }
-
- public Duration connectTimeout() {
- return connectTimeout;
- }
-
- public Duration readTimeout() {
- return readTimeout;
- }
-
- public Duration writeTimeout() {
- return writeTimeout;
- }
-
- public int maxNumBatchRequests() {
- return maxNumBatchRequests;
- }
-
- public static final class Builder {
-
- private final FunctionType functionType;
- private final URI endpoint;
-
- private final List<StateSpec> states = new ArrayList<>();
- private Duration maxRequestDuration = DEFAULT_HTTP_TIMEOUT;
- private Duration connectTimeout = DEFAULT_HTTP_CONNECT_TIMEOUT;
- private Duration readTimeout = DEFAULT_HTTP_READ_TIMEOUT;
- private Duration writeTimeout = DEFAULT_HTTP_WRITE_TIMEOUT;
- private int maxNumBatchRequests = DEFAULT_MAX_NUM_BATCH_REQUESTS;
-
- private Builder(FunctionType functionType, URI endpoint) {
- this.functionType = Objects.requireNonNull(functionType);
- this.endpoint = Objects.requireNonNull(endpoint);
- }
-
- public Builder withState(StateSpec stateSpec) {
- this.states.add(stateSpec);
- return this;
- }
-
- public Builder withMaxRequestDuration(Duration duration) {
- this.maxRequestDuration = requireNonZeroDuration(duration);
- return this;
- }
-
- public Builder withConnectTimeoutDuration(Duration duration) {
- this.connectTimeout = requireNonZeroDuration(duration);
- return this;
- }
-
- public Builder withReadTimeoutDuration(Duration duration) {
- this.readTimeout = requireNonZeroDuration(duration);
- return this;
- }
-
- public Builder withWriteTimeoutDuration(Duration duration) {
- this.writeTimeout = requireNonZeroDuration(duration);
- return this;
- }
-
- public Builder withMaxNumBatchRequests(int maxNumBatchRequests) {
- this.maxNumBatchRequests = maxNumBatchRequests;
- return this;
- }
-
- public HttpFunctionSpec build() {
- validateTimeouts();
-
- return new HttpFunctionSpec(
- functionType,
- endpoint,
- states,
- maxRequestDuration,
- connectTimeout,
- readTimeout,
- writeTimeout,
- maxNumBatchRequests);
- }
-
- private Duration requireNonZeroDuration(Duration duration) {
- Objects.requireNonNull(duration);
- if (duration.equals(Duration.ZERO)) {
- throw new IllegalArgumentException("Timeout durations must be larger than 0.");
- }
-
- return duration;
- }
-
- private void validateTimeouts() {
- if (connectTimeout.compareTo(maxRequestDuration) > 0) {
- throw new IllegalArgumentException(
- "Connect timeout cannot be larger than request timeout.");
- }
-
- if (readTimeout.compareTo(maxRequestDuration) > 0) {
- throw new IllegalArgumentException("Read timeout cannot be larger than request timeout.");
- }
-
- if (writeTimeout.compareTo(maxRequestDuration) > 0) {
- throw new IllegalArgumentException("Write timeout cannot be larger than request timeout.");
- }
- }
- }
-}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/StateSpec.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/StateSpec.java
deleted file mode 100644
index 8bb3c84..0000000
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/StateSpec.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.statefun.flink.core.httpfn;
-
-import java.io.Serializable;
-import java.util.Objects;
-import org.apache.flink.statefun.sdk.state.Expiration;
-
-public final class StateSpec implements Serializable {
-
- private static final long serialVersionUID = 1;
-
- private final String name;
- private final Expiration ttlExpiration;
-
- public StateSpec(String name) {
- this(name, Expiration.none());
- }
-
- public StateSpec(String name, Expiration ttlExpiration) {
- this.name = Objects.requireNonNull(name);
- this.ttlExpiration = Objects.requireNonNull(ttlExpiration);
- }
-
- public String name() {
- return name;
- }
-
- public Expiration ttlExpiration() {
- return ttlExpiration;
- }
-}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/TemplatedHttpFunctionProvider.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/TemplatedHttpFunctionProvider.java
deleted file mode 100644
index ae4d7f4..0000000
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/TemplatedHttpFunctionProvider.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.statefun.flink.core.httpfn;
-
-import static org.apache.flink.statefun.flink.core.httpfn.OkHttpUnixSocketBridge.configureUnixDomainSocket;
-
-import java.net.URI;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Objects;
-import javax.annotation.Nullable;
-import javax.annotation.concurrent.NotThreadSafe;
-import okhttp3.HttpUrl;
-import okhttp3.OkHttpClient;
-import org.apache.flink.statefun.flink.core.common.ManagingResources;
-import org.apache.flink.statefun.flink.core.reqreply.PersistedRemoteFunctionValues;
-import org.apache.flink.statefun.flink.core.reqreply.RequestReplyClient;
-import org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction;
-import org.apache.flink.statefun.sdk.FunctionType;
-import org.apache.flink.statefun.sdk.StatefulFunction;
-import org.apache.flink.statefun.sdk.StatefulFunctionProvider;
-
-@NotThreadSafe
-public final class TemplatedHttpFunctionProvider
- implements StatefulFunctionProvider, ManagingResources {
-
- private final Map<FunctionType, HttpFunctionEndpointSpec> specificTypeEndpointSpecs;
- private final Map<String, HttpFunctionEndpointSpec> perNamespaceEndpointSpecs;
-
- /** lazily initialized by {code buildHttpClient} */
- @Nullable private OkHttpClient sharedClient;
-
- private volatile boolean shutdown;
-
- public TemplatedHttpFunctionProvider(
- Map<FunctionType, HttpFunctionEndpointSpec> specificTypeEndpointSpecs,
- Map<String, HttpFunctionEndpointSpec> perNamespaceEndpointSpecs) {
- this.specificTypeEndpointSpecs = Objects.requireNonNull(specificTypeEndpointSpecs);
- this.perNamespaceEndpointSpecs = Objects.requireNonNull(perNamespaceEndpointSpecs);
- }
-
- @Override
- public StatefulFunction functionOfType(FunctionType functionType) {
- final HttpFunctionEndpointSpec endpointsSpec = getEndpointsSpecOrThrow(functionType);
- return new RequestReplyFunction(
- new PersistedRemoteFunctionValues(Collections.emptyList()),
- endpointsSpec.maxNumBatchRequests(),
- buildHttpClient(endpointsSpec, functionType));
- }
-
- private HttpFunctionEndpointSpec getEndpointsSpecOrThrow(FunctionType functionType) {
- HttpFunctionEndpointSpec endpointSpec = specificTypeEndpointSpecs.get(functionType);
- if (endpointSpec != null) {
- return endpointSpec;
- }
- endpointSpec = perNamespaceEndpointSpecs.get(functionType.namespace());
- if (endpointSpec != null) {
- return endpointSpec;
- }
-
- throw new IllegalStateException("Unknown type: " + functionType);
- }
-
- private RequestReplyClient buildHttpClient(
- HttpFunctionEndpointSpec spec, FunctionType functionType) {
- if (sharedClient == null) {
- sharedClient = OkHttpUtils.newClient();
- }
- OkHttpClient.Builder clientBuilder = sharedClient.newBuilder();
- clientBuilder.callTimeout(spec.maxRequestDuration());
- clientBuilder.connectTimeout(spec.connectTimeout());
- clientBuilder.readTimeout(spec.readTimeout());
- clientBuilder.writeTimeout(spec.writeTimeout());
-
- URI endpointUrl = spec.urlPathTemplate().apply(functionType);
-
- final HttpUrl url;
- if (UnixDomainHttpEndpoint.validate(endpointUrl)) {
- UnixDomainHttpEndpoint endpoint = UnixDomainHttpEndpoint.parseFrom(endpointUrl);
-
- url =
- new HttpUrl.Builder()
- .scheme("http")
- .host("unused")
- .addPathSegment(endpoint.pathSegment)
- .build();
-
- configureUnixDomainSocket(clientBuilder, endpoint.unixDomainFile);
- } else {
- url = HttpUrl.get(endpointUrl);
- }
- return new HttpRequestReplyClient(url, clientBuilder.build(), () -> shutdown);
- }
-
- @Override
- public void shutdown() {
- shutdown = true;
- OkHttpUtils.closeSilently(sharedClient);
- }
-}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FormatVersion.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FormatVersion.java
index 87f4ec4..580e6c7 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FormatVersion.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FormatVersion.java
@@ -19,8 +19,17 @@
package org.apache.flink.statefun.flink.core.jsonmodule;
enum FormatVersion {
+ // ============================================================
+ // EOL versions
+ // ============================================================
+
v1_0("1.0"),
v2_0("2.0"),
+
+ // ============================================================
+ // Supported versions
+ // ============================================================
+
v3_0("3.0");
private String versionStr;
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionEndpointJsonEntity.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionEndpointJsonEntity.java
index fac1bbe..e88f439 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionEndpointJsonEntity.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionEndpointJsonEntity.java
@@ -32,12 +32,11 @@
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.statefun.flink.common.json.Selectors;
import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionEndpointSpec;
-import org.apache.flink.statefun.flink.core.httpfn.TemplatedHttpFunctionProvider;
+import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionProvider;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.FunctionTypeNamespaceMatcher;
import org.apache.flink.statefun.sdk.StatefulFunctionProvider;
import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
-import org.apache.flink.types.Either;
import org.apache.flink.util.TimeUtils;
public final class FunctionEndpointJsonEntity implements JsonEntity {
@@ -82,17 +81,18 @@
for (Map.Entry<FunctionEndpointSpec.Kind, List<FunctionEndpointSpec>> entry :
parseFunctionEndpointSpecs(functionEndpointsSpecNodes).entrySet()) {
final Map<FunctionType, FunctionEndpointSpec> specificTypeEndpointSpecs = new HashMap<>();
- final Map<String, FunctionEndpointSpec> perNamespaceEndpointSpecs = new HashMap<>();
+ final Map<FunctionTypeNamespaceMatcher, FunctionEndpointSpec> perNamespaceEndpointSpecs =
+ new HashMap<>();
entry
.getValue()
.forEach(
spec -> {
- Either<FunctionType, FunctionTypeNamespaceMatcher> target = spec.target();
- if (target.isLeft()) {
- specificTypeEndpointSpecs.put(target.left(), spec);
+ FunctionEndpointSpec.Target target = spec.target();
+ if (target.isSpecificFunctionType()) {
+ specificTypeEndpointSpecs.put(target.asSpecificFunctionType(), spec);
} else {
- perNamespaceEndpointSpecs.put(target.right().targetNamespace(), spec);
+ perNamespaceEndpointSpecs.put(target.asNamespace(), spec);
}
});
@@ -103,10 +103,7 @@
.forEach(specificType -> binder.bindFunctionProvider(specificType, provider));
perNamespaceEndpointSpecs
.keySet()
- .forEach(
- namespace ->
- binder.bindFunctionProvider(
- FunctionTypeNamespaceMatcher.targetNamespace(namespace), provider));
+ .forEach(namespace -> binder.bindFunctionProvider(namespace, provider));
}
}
@@ -157,15 +154,14 @@
return FunctionEndpointSpec.Kind.valueOf(endpointKind.toUpperCase(Locale.getDefault()));
}
- private static Either<FunctionType, FunctionTypeNamespaceMatcher> target(
- JsonNode functionEndpointSpecNode) {
+ private static FunctionEndpointSpec.Target target(JsonNode functionEndpointSpecNode) {
JsonNode targetNode = functionEndpointSpecNode.at(SpecPointers.TYPENAME);
String namespace = Selectors.textAt(targetNode, TypenamePointers.NAMESPACE);
Optional<String> functionName =
Selectors.optionalTextAt(targetNode, TypenamePointers.FUNCTION_NAME);
return (functionName.isPresent())
- ? Either.Left(new FunctionType(namespace, functionName.get()))
- : Either.Right(FunctionTypeNamespaceMatcher.targetNamespace(namespace));
+ ? FunctionEndpointSpec.Target.functionType(new FunctionType(namespace, functionName.get()))
+ : FunctionEndpointSpec.Target.namespace(namespace);
}
private static FunctionEndpointSpec.UrlPathTemplate urlPathTemplate(
@@ -186,11 +182,12 @@
private static StatefulFunctionProvider functionProvider(
FunctionEndpointSpec.Kind kind,
Map<FunctionType, FunctionEndpointSpec> specificTypeEndpointSpecs,
- Map<String, FunctionEndpointSpec> perNamespaceEndpointSpecs) {
+ Map<FunctionTypeNamespaceMatcher, FunctionEndpointSpec> perNamespaceEndpointSpecs) {
switch (kind) {
case HTTP:
- return new TemplatedHttpFunctionProvider(
- castValues(specificTypeEndpointSpecs), castValues(perNamespaceEndpointSpecs));
+ return new HttpFunctionProvider(
+ castValues(specificTypeEndpointSpecs),
+ castValues(namespaceAsKey(perNamespaceEndpointSpecs)));
case GRPC:
throw new UnsupportedOperationException("GRPC endpoints are not supported yet.");
default:
@@ -203,4 +200,13 @@
Map<K, FunctionEndpointSpec> toCast) {
return new HashMap(toCast);
}
+
+ private static Map<String, FunctionEndpointSpec> namespaceAsKey(
+ Map<FunctionTypeNamespaceMatcher, FunctionEndpointSpec> perNamespaceEndpointSpecs) {
+ final Map<String, FunctionEndpointSpec> converted =
+ new HashMap<>(perNamespaceEndpointSpecs.size());
+ perNamespaceEndpointSpecs.forEach(
+ (namespaceMatcher, spec) -> converted.put(namespaceMatcher.targetNamespace(), spec));
+ return converted;
+ }
}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionEndpointSpec.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionEndpointSpec.java
index 940db87..9af6345 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionEndpointSpec.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionEndpointSpec.java
@@ -17,15 +17,15 @@
*/
package org.apache.flink.statefun.flink.core.jsonmodule;
+import java.io.Serializable;
import java.net.URI;
import java.util.Objects;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.FunctionTypeNamespaceMatcher;
-import org.apache.flink.types.Either;
public interface FunctionEndpointSpec {
- Either<FunctionType, FunctionTypeNamespaceMatcher> target();
+ Target target();
Kind kind();
@@ -36,7 +36,72 @@
GRPC
}
- class UrlPathTemplate {
+ abstract class Target implements Serializable {
+
+ public static Target namespace(String namespace) {
+ return new NamespaceTarget(FunctionTypeNamespaceMatcher.targetNamespace(namespace));
+ }
+
+ public static Target functionType(FunctionType functionType) {
+ return new FunctionTypeTarget(functionType);
+ }
+
+ public boolean isSpecificFunctionType() {
+ return this.getClass() == FunctionTypeTarget.class;
+ }
+
+ public boolean isNamespace() {
+ return this.getClass() == NamespaceTarget.class;
+ }
+
+ public abstract FunctionTypeNamespaceMatcher asNamespace();
+
+ public abstract FunctionType asSpecificFunctionType();
+
+ private static class NamespaceTarget extends Target {
+ private static final long serialVersionUID = 1;
+
+ private final FunctionTypeNamespaceMatcher namespaceMatcher;
+
+ private NamespaceTarget(FunctionTypeNamespaceMatcher namespaceMatcher) {
+ this.namespaceMatcher = Objects.requireNonNull(namespaceMatcher);
+ }
+
+ @Override
+ public FunctionTypeNamespaceMatcher asNamespace() {
+ return namespaceMatcher;
+ }
+
+ @Override
+ public FunctionType asSpecificFunctionType() {
+ throw new IllegalStateException("This target is not a specific function type");
+ }
+ }
+
+ private static class FunctionTypeTarget extends Target {
+ private static final long serialVersionUID = 1;
+
+ private final FunctionType functionType;
+
+ private FunctionTypeTarget(FunctionType functionType) {
+ this.functionType = Objects.requireNonNull(functionType);
+ }
+
+ @Override
+ public FunctionTypeNamespaceMatcher asNamespace() {
+ throw new IllegalStateException("This target is not a namespace.");
+ }
+
+ @Override
+ public FunctionType asSpecificFunctionType() {
+ return functionType;
+ }
+ }
+ }
+
+ class UrlPathTemplate implements Serializable {
+ private static final long serialVersionUID = 1;
+
private static final String FUNCTION_NAME_HOLDER = "{typename.function}";
private final String template;
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java
deleted file mode 100644
index 2b9c866..0000000
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java
+++ /dev/null
@@ -1,265 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.statefun.flink.core.jsonmodule;
-
-import static java.util.stream.Collectors.groupingBy;
-import static java.util.stream.Collectors.toMap;
-import static org.apache.flink.statefun.flink.core.common.Maps.transformValues;
-import static org.apache.flink.statefun.flink.core.jsonmodule.FunctionSpec.Kind;
-
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Optional;
-import java.util.OptionalInt;
-import java.util.Set;
-import java.util.function.Function;
-import java.util.stream.Collector;
-import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
-import javax.annotation.Nullable;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonPointer;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
-import org.apache.flink.statefun.flink.common.json.NamespaceNamePair;
-import org.apache.flink.statefun.flink.common.json.Selectors;
-import org.apache.flink.statefun.flink.core.grpcfn.GrpcFunctionProvider;
-import org.apache.flink.statefun.flink.core.grpcfn.GrpcFunctionSpec;
-import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionProvider;
-import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionSpec;
-import org.apache.flink.statefun.flink.core.httpfn.StateSpec;
-import org.apache.flink.statefun.sdk.FunctionType;
-import org.apache.flink.statefun.sdk.StatefulFunctionProvider;
-import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule.Binder;
-import org.apache.flink.statefun.sdk.state.Expiration;
-import org.apache.flink.util.TimeUtils;
-
-final class FunctionJsonEntity implements JsonEntity {
-
- private static final JsonPointer FUNCTION_SPECS_POINTER = JsonPointer.compile("/functions");
-
- private static final class MetaPointers {
- private static final JsonPointer KIND = JsonPointer.compile("/function/meta/kind");
- private static final JsonPointer TYPE = JsonPointer.compile("/function/meta/type");
- }
-
- private static final class SpecPointers {
- private static final JsonPointer HOSTNAME = JsonPointer.compile("/function/spec/host");
- private static final JsonPointer ENDPOINT = JsonPointer.compile("/function/spec/endpoint");
- private static final JsonPointer PORT = JsonPointer.compile("/function/spec/port");
- private static final JsonPointer STATES = JsonPointer.compile("/function/spec/states");
-
- private static final JsonPointer TIMEOUT = JsonPointer.compile("/function/spec/timeout");
- private static final JsonPointer CONNECT_TIMEOUT =
- JsonPointer.compile("/function/spec/connectTimeout");
- private static final JsonPointer READ_TIMEOUT =
- JsonPointer.compile("/function/spec/readTimeout");
- private static final JsonPointer WRITE_TIMEOUT =
- JsonPointer.compile("/function/spec/writeTimeout");
-
- private static final JsonPointer MAX_NUM_BATCH_REQUESTS =
- JsonPointer.compile("/function/spec/maxNumBatchRequests");
- }
-
- private static final class StateSpecPointers {
- private static final JsonPointer NAME = JsonPointer.compile("/name");
- private static final JsonPointer EXPIRE_DURATION = JsonPointer.compile("/expireAfter");
- private static final JsonPointer EXPIRE_MODE = JsonPointer.compile("/expireMode");
- }
-
- @Override
- public void bind(Binder binder, JsonNode moduleSpecRootNode, FormatVersion formatVersion) {
- final Iterable<? extends JsonNode> functionSpecNodes = functionSpecNodes(moduleSpecRootNode);
-
- for (Map.Entry<Kind, Map<FunctionType, FunctionSpec>> entry :
- parse(functionSpecNodes, formatVersion).entrySet()) {
- StatefulFunctionProvider provider = functionProvider(entry.getKey(), entry.getValue());
- Set<FunctionType> functionTypes = entry.getValue().keySet();
- for (FunctionType type : functionTypes) {
- binder.bindFunctionProvider(type, provider);
- }
- }
- }
-
- private Map<Kind, Map<FunctionType, FunctionSpec>> parse(
- Iterable<? extends JsonNode> functionSpecNodes, FormatVersion formatVersion) {
- return StreamSupport.stream(functionSpecNodes.spliterator(), false)
- .map(functionSpecNode -> parseFunctionSpec(functionSpecNode, formatVersion))
- .collect(groupingBy(FunctionSpec::kind, groupByFunctionType()));
- }
-
- private static Iterable<? extends JsonNode> functionSpecNodes(JsonNode moduleSpecRootNode) {
- return Selectors.listAt(moduleSpecRootNode, FUNCTION_SPECS_POINTER);
- }
-
- private static FunctionSpec parseFunctionSpec(
- JsonNode functionNode, FormatVersion formatVersion) {
- String functionKind = Selectors.textAt(functionNode, MetaPointers.KIND);
- FunctionSpec.Kind kind =
- FunctionSpec.Kind.valueOf(functionKind.toUpperCase(Locale.getDefault()));
- FunctionType functionType = functionType(functionNode);
- switch (kind) {
- case HTTP:
- final HttpFunctionSpec.Builder specBuilder =
- HttpFunctionSpec.builder(functionType, functionUri(functionNode));
-
- final Function<JsonNode, List<StateSpec>> stateSpecParser =
- functionStateParserOf(formatVersion);
- for (StateSpec state : stateSpecParser.apply(functionNode)) {
- specBuilder.withState(state);
- }
- optionalMaxNumBatchRequests(functionNode).ifPresent(specBuilder::withMaxNumBatchRequests);
- optionalTimeoutDuration(functionNode, SpecPointers.TIMEOUT)
- .ifPresent(specBuilder::withMaxRequestDuration);
- optionalTimeoutDuration(functionNode, SpecPointers.CONNECT_TIMEOUT)
- .ifPresent(specBuilder::withConnectTimeoutDuration);
- optionalTimeoutDuration(functionNode, SpecPointers.READ_TIMEOUT)
- .ifPresent(specBuilder::withReadTimeoutDuration);
- optionalTimeoutDuration(functionNode, SpecPointers.WRITE_TIMEOUT)
- .ifPresent(specBuilder::withWriteTimeoutDuration);
-
- return specBuilder.build();
- case GRPC:
- return new GrpcFunctionSpec(functionType, functionAddress(functionNode));
- default:
- throw new IllegalArgumentException("Unrecognized function kind " + functionKind);
- }
- }
-
- private static Function<JsonNode, List<StateSpec>> functionStateParserOf(
- FormatVersion formatVersion) {
- switch (formatVersion) {
- case v1_0:
- return FunctionJsonEntity::functionStateSpecParserV1;
- case v2_0:
- return FunctionJsonEntity::functionStateSpecParserV2;
- default:
- throw new IllegalStateException("Unrecognized format version: " + formatVersion);
- }
- }
-
- private static List<StateSpec> functionStateSpecParserV1(JsonNode functionNode) {
- final List<String> stateNames = Selectors.textListAt(functionNode, SpecPointers.STATES);
- return stateNames.stream().map(StateSpec::new).collect(Collectors.toList());
- }
-
- private static List<StateSpec> functionStateSpecParserV2(JsonNode functionNode) {
- final Iterable<? extends JsonNode> stateSpecNodes =
- Selectors.listAt(functionNode, SpecPointers.STATES);
- final List<StateSpec> stateSpecs = new ArrayList<>();
-
- stateSpecNodes.forEach(
- stateSpecNode -> {
- final String name = Selectors.textAt(stateSpecNode, StateSpecPointers.NAME);
- final Expiration expiration = stateTtlExpiration(stateSpecNode);
- stateSpecs.add(new StateSpec(name, expiration));
- });
- return stateSpecs;
- }
-
- private static OptionalInt optionalMaxNumBatchRequests(JsonNode functionNode) {
- return Selectors.optionalIntegerAt(functionNode, SpecPointers.MAX_NUM_BATCH_REQUESTS);
- }
-
- private static Optional<Duration> optionalTimeoutDuration(
- JsonNode functionNode, JsonPointer timeoutPointer) {
- return Selectors.optionalTextAt(functionNode, timeoutPointer).map(TimeUtils::parseDuration);
- }
-
- private static Expiration stateTtlExpiration(JsonNode stateSpecNode) {
- final Optional<Duration> duration =
- Selectors.optionalTextAt(stateSpecNode, StateSpecPointers.EXPIRE_DURATION)
- .map(TimeUtils::parseDuration);
-
- if (!duration.isPresent()) {
- return Expiration.none();
- }
-
- final Optional<String> mode =
- Selectors.optionalTextAt(stateSpecNode, StateSpecPointers.EXPIRE_MODE);
- if (!mode.isPresent()) {
- return Expiration.expireAfterReadingOrWriting(duration.get());
- }
-
- switch (mode.get()) {
- case "after-invoke":
- return Expiration.expireAfterReadingOrWriting(duration.get());
- case "after-write":
- return Expiration.expireAfterWriting(duration.get());
- default:
- throw new IllegalArgumentException(
- "Invalid state ttl expire mode; must be one of [after-invoke, after-write].");
- }
- }
-
- private static FunctionType functionType(JsonNode functionNode) {
- String namespaceName = Selectors.textAt(functionNode, MetaPointers.TYPE);
- NamespaceNamePair nn = NamespaceNamePair.from(namespaceName);
- return new FunctionType(nn.namespace(), nn.name());
- }
-
- private static InetSocketAddress functionAddress(JsonNode functionNode) {
- String host = Selectors.textAt(functionNode, SpecPointers.HOSTNAME);
- int port = Selectors.integerAt(functionNode, SpecPointers.PORT);
- return new InetSocketAddress(host, port);
- }
-
- private static URI functionUri(JsonNode functionNode) {
- String uri = Selectors.textAt(functionNode, SpecPointers.ENDPOINT);
- URI typedUri = URI.create(uri);
- @Nullable String scheme = typedUri.getScheme();
- if (scheme == null) {
- throw new IllegalArgumentException(
- "Missing scheme in function endpoint "
- + uri
- + "; an http or https scheme must be provided.");
- }
- if (scheme.equalsIgnoreCase("http")
- || scheme.equalsIgnoreCase("https")
- || scheme.equalsIgnoreCase("http+unix")
- || scheme.equalsIgnoreCase("https+unix")) {
- return typedUri;
- }
- throw new IllegalArgumentException(
- "Missing scheme in function endpoint "
- + uri
- + "; an http or https or http+unix or https+unix scheme must be provided.");
- }
-
- private static Collector<FunctionSpec, ?, Map<FunctionType, FunctionSpec>> groupByFunctionType() {
- return toMap(FunctionSpec::functionType, Function.identity());
- }
-
- private static StatefulFunctionProvider functionProvider(
- Kind kind, Map<FunctionType, FunctionSpec> definedFunctions) {
- switch (kind) {
- case HTTP:
- return new HttpFunctionProvider(
- transformValues(definedFunctions, HttpFunctionSpec.class::cast));
- case GRPC:
- return new GrpcFunctionProvider(
- transformValues(definedFunctions, GrpcFunctionSpec.class::cast));
- default:
- throw new IllegalStateException("Unexpected value: " + kind);
- }
- }
-}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java
index 658bb9b..eb26c44 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java
@@ -32,13 +32,6 @@
/** Entities in the JSON moduleSpecNode that should be parsed and bound to the module. */
private static final List<JsonEntity> ENTITIES =
Arrays.asList(
- new FunctionJsonEntity(),
- new IngressJsonEntity(),
- new RouterJsonEntity(),
- new EgressJsonEntity());
-
- private static final List<JsonEntity> V3_ENTITIES =
- Arrays.asList(
new FunctionEndpointJsonEntity(),
new IngressJsonEntity(),
new RouterJsonEntity(),
@@ -56,11 +49,7 @@
public void configure(Map<String, String> conf, Binder binder) {
try {
- if (formatVersion == FormatVersion.v3_0) {
- V3_ENTITIES.forEach(jsonEntity -> jsonEntity.bind(binder, moduleSpecNode, formatVersion));
- } else {
- ENTITIES.forEach(jsonEntity -> jsonEntity.bind(binder, moduleSpecNode, formatVersion));
- }
+ ENTITIES.forEach(jsonEntity -> jsonEntity.bind(binder, moduleSpecNode, formatVersion));
} catch (Throwable t) {
throw new ModuleConfigurationException(
format("Error while parsing module at %s", moduleUrl), t);
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonServiceLoader.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonServiceLoader.java
index a507540..d5c7e77 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonServiceLoader.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonServiceLoader.java
@@ -97,8 +97,15 @@
}
private static FormatVersion requireValidFormatVersion(JsonNode root) {
- final String formatVersion = Selectors.textAt(root, FORMAT_VERSION);
- return FormatVersion.fromString(formatVersion);
+ final String formatVersionStr = Selectors.textAt(root, FORMAT_VERSION);
+ final FormatVersion formatVersion = FormatVersion.fromString(formatVersionStr);
+ if (formatVersion.compareTo(FormatVersion.v3_0) < 0) {
+ throw new IllegalArgumentException(
+ "Only format versions higher than or equal to 3.0 is supported. Was version "
+ + formatVersion
+ + ".");
+ }
+ return formatVersion;
}
@VisibleForTesting
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValues.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValues.java
index 857bcb6..6553a1a 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValues.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValues.java
@@ -23,8 +23,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
-import org.apache.flink.statefun.flink.core.httpfn.StateSpec;
import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.ExpirationSpec;
import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.PersistedValueMutation;
import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.PersistedValueSpec;
@@ -39,20 +37,7 @@
@Persisted private final PersistedStateRegistry stateRegistry = new PersistedStateRegistry();
- private final Map<String, PersistedValue<byte[]>> managedStates;
-
- /**
- * @deprecated {@link PersistedRemoteFunctionValues} should no longer be instantiated with eagerly
- * declared state specs. State can now be dynamically registered with {@link
- * #registerStates(List)}. This constructor will be removed once old module specification
- * formats, which supports eager state declarations, are removed.
- */
- @Deprecated
- public PersistedRemoteFunctionValues(List<StateSpec> stateSpecs) {
- Objects.requireNonNull(stateSpecs);
- this.managedStates = new HashMap<>(stateSpecs.size());
- stateSpecs.forEach(this::createAndRegisterEagerValueState);
- }
+ private final Map<String, PersistedValue<byte[]>> managedStates = new HashMap<>();
void attachStateValues(InvocationBatchRequest.Builder batchBuilder) {
for (Map.Entry<String, PersistedValue<byte[]>> managedStateEntry : managedStates.entrySet()) {
@@ -134,15 +119,6 @@
}
}
- private void createAndRegisterEagerValueState(StateSpec stateSpec) {
- final String stateName = stateSpec.name();
-
- final PersistedValue<byte[]> stateValue =
- PersistedValue.of(stateName, byte[].class, stateSpec.ttlExpiration());
- stateRegistry.registerValue(stateValue);
- managedStates.put(stateName, stateValue);
- }
-
private PersistedValue<byte[]> getStateHandleOrThrow(String stateName) {
final PersistedValue<byte[]> handle = managedStates.get(stateName);
if (handle == null) {
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java
index 01ee950..b2054f2 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java
@@ -25,6 +25,7 @@
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.statefun.flink.core.backpressure.InternalContext;
import org.apache.flink.statefun.flink.core.metrics.RemoteInvocationMetrics;
import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction;
@@ -70,13 +71,16 @@
@Persisted private final PersistedRemoteFunctionValues managedStates;
- public RequestReplyFunction(
- PersistedRemoteFunctionValues managedStates,
- int maxNumBatchRequests,
- RequestReplyClient client) {
- this.managedStates = Objects.requireNonNull(managedStates);
- this.client = Objects.requireNonNull(client);
+ public RequestReplyFunction(int maxNumBatchRequests, RequestReplyClient client) {
+ this(new PersistedRemoteFunctionValues(), maxNumBatchRequests, client);
+ }
+
+ @VisibleForTesting
+ RequestReplyFunction(
+ PersistedRemoteFunctionValues states, int maxNumBatchRequests, RequestReplyClient client) {
+ this.managedStates = Objects.requireNonNull(states);
this.maxNumBatchRequests = maxNumBatchRequests;
+ this.client = Objects.requireNonNull(client);
}
@Override
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleTest.java
index 3001684..92014a9 100644
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleTest.java
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleTest.java
@@ -17,14 +17,16 @@
*/
package org.apache.flink.statefun.flink.core.jsonmodule;
-import static org.hamcrest.Matchers.*;
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.hasKey;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertThat;
import com.google.protobuf.Any;
import com.google.protobuf.Message;
import java.net.URL;
-import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverse;
@@ -35,24 +37,10 @@
import org.apache.flink.statefun.sdk.io.IngressIdentifier;
import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-@RunWith(Parameterized.class)
public class JsonModuleTest {
- @Parameterized.Parameters(name = "Format version = {0}, module path = \"{1}\"")
- public static Collection<?> modules() {
- return Arrays.asList(
- new Object[] {FormatVersion.v1_0, "module-v1_0/module.yaml"},
- new Object[] {FormatVersion.v2_0, "module-v2_0/module.yaml"});
- }
-
- private final String modulePath;
-
- public JsonModuleTest(FormatVersion ignored, String modulePath) {
- this.modulePath = modulePath;
- }
+ private static final String modulePath = "module-v3_0/module.yaml";
@Test
public void exampleUsage() {
@@ -71,9 +59,10 @@
assertThat(
universe.functions(),
allOf(
- hasKey(new FunctionType("com.example", "hello")),
- hasKey(new FunctionType("com.foo", "world")),
- hasKey(new FunctionType("com.bar", "world"))));
+ hasKey(new FunctionType("com.foo.bar", "specific_function")),
+ hasKey(new FunctionType("com.other.namespace", "hello"))));
+
+ assertThat(universe.namespaceFunctions(), hasKey("com.foo.bar"));
}
@Test
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleV3Test.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleV3Test.java
deleted file mode 100644
index fcbb885..0000000
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleV3Test.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.statefun.flink.core.jsonmodule;
-
-import static org.hamcrest.Matchers.allOf;
-import static org.hamcrest.Matchers.hasKey;
-import static org.hamcrest.Matchers.not;
-import static org.hamcrest.Matchers.notNullValue;
-import static org.hamcrest.Matchers.nullValue;
-import static org.junit.Assert.assertThat;
-
-import com.google.protobuf.Any;
-import com.google.protobuf.Message;
-import java.net.URL;
-import java.util.Collections;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverse;
-import org.apache.flink.statefun.flink.core.message.MessageFactoryKey;
-import org.apache.flink.statefun.flink.core.message.MessageFactoryType;
-import org.apache.flink.statefun.sdk.FunctionType;
-import org.apache.flink.statefun.sdk.io.EgressIdentifier;
-import org.apache.flink.statefun.sdk.io.IngressIdentifier;
-import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
-import org.junit.Test;
-
-public class JsonModuleV3Test {
-
- private static final String modulePath = "module-v3_0/module.yaml";
-
- @Test
- public void exampleUsage() {
- StatefulFunctionModule module = fromPath(modulePath);
-
- assertThat(module, notNullValue());
- }
-
- @Test
- public void testFunctions() {
- StatefulFunctionModule module = fromPath(modulePath);
-
- StatefulFunctionsUniverse universe = emptyUniverse();
- module.configure(Collections.emptyMap(), universe);
-
- assertThat(
- universe.functions(),
- allOf(
- hasKey(new FunctionType("com.foo.bar", "specific_function")),
- hasKey(new FunctionType("com.other.namespace", "hello"))));
-
- assertThat(universe.namespaceFunctions(), hasKey("com.foo.bar"));
- }
-
- @Test
- public void testRouters() {
- StatefulFunctionModule module = fromPath(modulePath);
-
- StatefulFunctionsUniverse universe = emptyUniverse();
- module.configure(Collections.emptyMap(), universe);
-
- assertThat(
- universe.routers(),
- hasKey(new IngressIdentifier<>(Message.class, "com.mycomp.igal", "names")));
- }
-
- @Test
- public void testIngresses() {
- StatefulFunctionModule module = fromPath(modulePath);
-
- StatefulFunctionsUniverse universe = emptyUniverse();
- module.configure(Collections.emptyMap(), universe);
-
- assertThat(
- universe.ingress(),
- hasKey(new IngressIdentifier<>(Message.class, "com.mycomp.igal", "names")));
- }
-
- @Test
- public void testEgresses() {
- StatefulFunctionModule module = fromPath(modulePath);
-
- StatefulFunctionsUniverse universe = emptyUniverse();
- module.configure(Collections.emptyMap(), universe);
-
- assertThat(
- universe.egress(), hasKey(new EgressIdentifier<>("com.mycomp.foo", "bar", Any.class)));
- }
-
- private static StatefulFunctionModule fromPath(String path) {
- URL moduleUrl = JsonModuleTest.class.getClassLoader().getResource(path);
- assertThat(moduleUrl, not(nullValue()));
- ObjectMapper mapper = JsonServiceLoader.mapper();
- return JsonServiceLoader.fromUrl(mapper, moduleUrl);
- }
-
- private static StatefulFunctionsUniverse emptyUniverse() {
- return new StatefulFunctionsUniverse(
- MessageFactoryKey.forType(MessageFactoryType.WITH_PROTOBUF_PAYLOADS, null));
- }
-}
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValuesTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValuesTest.java
index f633fc2..48dadc1 100644
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValuesTest.java
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValuesTest.java
@@ -35,8 +35,7 @@
@Test
public void exampleUsage() {
- final PersistedRemoteFunctionValues values =
- new PersistedRemoteFunctionValues(Collections.emptyList());
+ final PersistedRemoteFunctionValues values = new PersistedRemoteFunctionValues();
// --- register persisted states
values.registerStates(
@@ -63,8 +62,7 @@
@Test
public void zeroRegisteredStates() {
- final PersistedRemoteFunctionValues values =
- new PersistedRemoteFunctionValues(Collections.emptyList());
+ final PersistedRemoteFunctionValues values = new PersistedRemoteFunctionValues();
final InvocationBatchRequest.Builder builder = InvocationBatchRequest.newBuilder();
values.attachStateValues(builder);
@@ -74,8 +72,7 @@
@Test(expected = IllegalStateException.class)
public void updatingNonRegisteredStateShouldThrow() {
- final PersistedRemoteFunctionValues values =
- new PersistedRemoteFunctionValues(Collections.emptyList());
+ final PersistedRemoteFunctionValues values = new PersistedRemoteFunctionValues();
values.updateStateValues(
Collections.singletonList(
@@ -85,8 +82,7 @@
@Test
public void registeredStateWithEmptyValueShouldBeAttached() {
- final PersistedRemoteFunctionValues values =
- new PersistedRemoteFunctionValues(Collections.emptyList());
+ final PersistedRemoteFunctionValues values = new PersistedRemoteFunctionValues();
values.registerStates(Collections.singletonList(protocolPersistedValueSpec("state")));
@@ -99,8 +95,7 @@
@Test
public void registeredStateWithDeletedValueShouldBeAttached() {
- final PersistedRemoteFunctionValues values =
- new PersistedRemoteFunctionValues(Collections.emptyList());
+ final PersistedRemoteFunctionValues values = new PersistedRemoteFunctionValues();
values.registerStates(Collections.singletonList(protocolPersistedValueSpec("state")));
@@ -120,8 +115,7 @@
@Test
public void duplicateRegistrationsHasNoEffect() {
- final PersistedRemoteFunctionValues values =
- new PersistedRemoteFunctionValues(Collections.emptyList());
+ final PersistedRemoteFunctionValues values = new PersistedRemoteFunctionValues();
values.registerStates(Collections.singletonList(protocolPersistedValueSpec("state")));
values.updateStateValues(
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java
index c4eb85a..d545281 100644
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java
@@ -40,7 +40,6 @@
import java.util.stream.Collectors;
import org.apache.flink.statefun.flink.core.TestUtils;
import org.apache.flink.statefun.flink.core.backpressure.InternalContext;
-import org.apache.flink.statefun.flink.core.httpfn.StateSpec;
import org.apache.flink.statefun.flink.core.metrics.FunctionTypeMetrics;
import org.apache.flink.statefun.flink.core.metrics.RemoteInvocationMetrics;
import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction;
@@ -66,11 +65,9 @@
private final FakeClient client = new FakeClient();
private final FakeContext context = new FakeContext();
- private final PersistedRemoteFunctionValues states =
- new PersistedRemoteFunctionValues(Collections.singletonList(new StateSpec("session")));
private final RequestReplyFunction functionUnderTest =
- new RequestReplyFunction(states, 10, client);
+ new RequestReplyFunction(testInitialRegisteredState("session"), 10, client);
@Test
public void example() {
@@ -116,7 +113,7 @@
@Test
public void reachingABatchLimitTriggersBackpressure() {
- RequestReplyFunction functionUnderTest = new RequestReplyFunction(states, 2, client);
+ RequestReplyFunction functionUnderTest = new RequestReplyFunction(2, client);
// send one message
functionUnderTest.invoke(context, Any.getDefaultInstance());
@@ -132,7 +129,7 @@
@Test
public void returnedMessageReleaseBackpressure() {
- RequestReplyFunction functionUnderTest = new RequestReplyFunction(states, 2, client);
+ RequestReplyFunction functionUnderTest = new RequestReplyFunction(2, client);
// the following invocations should cause backpressure
functionUnderTest.invoke(context, Any.getDefaultInstance());
@@ -273,6 +270,15 @@
assertThat(context.functionTypeMetrics().numBacklog, is(0));
}
+ private static PersistedRemoteFunctionValues testInitialRegisteredState(
+ String existingStateName) {
+ final PersistedRemoteFunctionValues states = new PersistedRemoteFunctionValues();
+ states.registerStates(
+ Collections.singletonList(
+ PersistedValueSpec.newBuilder().setStateName(existingStateName).build()));
+ return states;
+ }
+
private static AsyncOperationResult<Object, FromFunction> successfulAsyncOperation() {
return new AsyncOperationResult<>(
new Object(), Status.SUCCESS, FromFunction.getDefaultInstance(), null);
diff --git a/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java
index 58c382c..d24636b 100644
--- a/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java
+++ b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java
@@ -21,10 +21,10 @@
import java.net.URI;
import java.time.Duration;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionSpec;
-import org.apache.flink.statefun.flink.core.httpfn.StateSpec;
+import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionEndpointSpec;
+import org.apache.flink.statefun.flink.core.jsonmodule.FunctionEndpointSpec.Target;
+import org.apache.flink.statefun.flink.core.jsonmodule.FunctionEndpointSpec.UrlPathTemplate;
import org.apache.flink.statefun.sdk.FunctionType;
-import org.apache.flink.statefun.sdk.state.Expiration;
/** A Builder for RequestReply remote function type. */
public class RequestReplyFunctionBuilder {
@@ -41,33 +41,12 @@
return new RequestReplyFunctionBuilder(functionType, endpoint);
}
- private final HttpFunctionSpec.Builder builder;
+ private final HttpFunctionEndpointSpec.Builder builder;
private RequestReplyFunctionBuilder(FunctionType functionType, URI endpoint) {
- this.builder = HttpFunctionSpec.builder(functionType, endpoint);
- }
-
- /**
- * Declares a remote function state.
- *
- * @param name the name of the state to be used remotely.
- * @return this builder.
- */
- public RequestReplyFunctionBuilder withPersistedState(String name) {
- builder.withState(new StateSpec(name, Expiration.none()));
- return this;
- }
-
- /**
- * Declares a remote function state, with expiration.
- *
- * @param name the name of the state to be used remotely.
- * @param ttlExpiration the expiration mode for which this state might be deleted.
- * @return this builder.
- */
- public RequestReplyFunctionBuilder withExpiringState(String name, Expiration ttlExpiration) {
- builder.withState(new StateSpec(name, ttlExpiration));
- return this;
+ this.builder =
+ HttpFunctionEndpointSpec.builder(
+ Target.functionType(functionType), new UrlPathTemplate(endpoint.toASCIIString()));
}
/**
@@ -127,7 +106,7 @@
}
@Internal
- HttpFunctionSpec spec() {
+ HttpFunctionEndpointSpec spec() {
return builder.build();
}
}
diff --git a/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/SerializableHttpFunctionProvider.java b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/SerializableHttpFunctionProvider.java
index 28ac564..eed278c 100644
--- a/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/SerializableHttpFunctionProvider.java
+++ b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/SerializableHttpFunctionProvider.java
@@ -18,13 +18,14 @@
package org.apache.flink.statefun.flink.datastream;
+import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionEndpointSpec;
import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionProvider;
-import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionSpec;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.StatefulFunction;
@@ -34,17 +35,17 @@
private static final long serialVersionUID = 1;
- private final Map<FunctionType, HttpFunctionSpec> supportedTypes;
+ private final Map<FunctionType, HttpFunctionEndpointSpec> supportedTypes;
private transient @Nullable HttpFunctionProvider delegate;
- SerializableHttpFunctionProvider(Map<FunctionType, HttpFunctionSpec> supportedTypes) {
+ SerializableHttpFunctionProvider(Map<FunctionType, HttpFunctionEndpointSpec> supportedTypes) {
this.supportedTypes = Objects.requireNonNull(supportedTypes);
}
@Override
public StatefulFunction functionOfType(FunctionType type) {
if (delegate == null) {
- delegate = new HttpFunctionProvider(supportedTypes);
+ delegate = new HttpFunctionProvider(supportedTypes, Collections.emptyMap());
}
return delegate.functionOfType(type);
}
diff --git a/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/StatefulFunctionDataStreamBuilder.java b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/StatefulFunctionDataStreamBuilder.java
index 39bc028..58dbb92 100644
--- a/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/StatefulFunctionDataStreamBuilder.java
+++ b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/StatefulFunctionDataStreamBuilder.java
@@ -29,7 +29,7 @@
import org.apache.flink.shaded.guava18.com.google.common.base.Optional;
import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig;
import org.apache.flink.statefun.flink.core.feedback.FeedbackKey;
-import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionSpec;
+import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionEndpointSpec;
import org.apache.flink.statefun.flink.core.message.Message;
import org.apache.flink.statefun.flink.core.message.RoutableMessage;
import org.apache.flink.statefun.flink.core.translation.EmbeddedTranslator;
@@ -60,7 +60,7 @@
private final List<DataStream<RoutableMessage>> definedIngresses = new ArrayList<>();
private final Map<FunctionType, SerializableStatefulFunctionProvider> functionProviders =
new HashMap<>();
- private final Map<FunctionType, HttpFunctionSpec> requestReplyFunctions = new HashMap<>();
+ private final Map<FunctionType, HttpFunctionEndpointSpec> requestReplyFunctions = new HashMap<>();
private final Set<EgressIdentifier<?>> egressesIds = new LinkedHashSet<>();
@Nullable private StatefulFunctionsConfig config;
@@ -102,8 +102,8 @@
public StatefulFunctionDataStreamBuilder withRequestReplyRemoteFunction(
RequestReplyFunctionBuilder builder) {
Objects.requireNonNull(builder);
- HttpFunctionSpec spec = builder.spec();
- putAndThrowIfPresent(requestReplyFunctions, spec.functionType(), spec);
+ HttpFunctionEndpointSpec spec = builder.spec();
+ putAndThrowIfPresent(requestReplyFunctions, spec.target().asSpecificFunctionType(), spec);
return this;
}
diff --git a/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/FunctionTypeNamespaceMatcher.java b/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/FunctionTypeNamespaceMatcher.java
index d4faf27..ea3cded 100644
--- a/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/FunctionTypeNamespaceMatcher.java
+++ b/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/FunctionTypeNamespaceMatcher.java
@@ -18,9 +18,12 @@
package org.apache.flink.statefun.sdk;
+import java.io.Serializable;
import java.util.Objects;
-public final class FunctionTypeNamespaceMatcher {
+public final class FunctionTypeNamespaceMatcher implements Serializable {
+
+ private static final long serialVersionUID = 1;
private final String targetNamespace;