[FLINK-21960] Rename typename.namespace -> targetFunctions:
diff --git a/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/resources/remote-module/module.yaml b/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/resources/remote-module/module.yaml
index 47f1764..3ef6ea6 100644
--- a/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/resources/remote-module/module.yaml
+++ b/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/resources/remote-module/module.yaml
@@ -24,8 +24,7 @@
meta:
kind: http
spec:
- typename:
- namespace: org.apache.flink.statefun.e2e.remote
+ functions: org.apache.flink.statefun.e2e.remote/*
urlPathTemplate: http://remote-function:8000/service
maxNumBatchRequests: 10000
ingresses:
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionEndpointJsonEntity.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionEndpointJsonEntity.java
index e88f439..74500cd 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionEndpointJsonEntity.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionEndpointJsonEntity.java
@@ -36,6 +36,7 @@
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.FunctionTypeNamespaceMatcher;
import org.apache.flink.statefun.sdk.StatefulFunctionProvider;
+import org.apache.flink.statefun.sdk.TypeName;
import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
import org.apache.flink.util.TimeUtils;
@@ -48,7 +49,8 @@
}
private static final class SpecPointers {
- private static final JsonPointer TYPENAME = JsonPointer.compile("/endpoint/spec/typename");
+ private static final JsonPointer TARGET_FUNCTIONS =
+ JsonPointer.compile("/endpoint/spec/functions");
private static final JsonPointer URL_PATH_TEMPLATE =
JsonPointer.compile("/endpoint/spec/urlPathTemplate");
private static final JsonPointer TIMEOUTS = JsonPointer.compile("/endpoint/spec/timeouts");
@@ -56,11 +58,6 @@
JsonPointer.compile("/endpoint/spec/maxNumBatchRequests");
}
- private static final class TypenamePointers {
- private static final JsonPointer NAMESPACE = JsonPointer.compile("/namespace");
- private static final JsonPointer FUNCTION_NAME = JsonPointer.compile("/type");
- }
-
private static final class TimeoutPointers {
private static final JsonPointer CALL = JsonPointer.compile("/call");
private static final JsonPointer CONNECT = JsonPointer.compile("/connect");
@@ -155,13 +152,26 @@
}
private static FunctionEndpointSpec.Target target(JsonNode functionEndpointSpecNode) {
- JsonNode targetNode = functionEndpointSpecNode.at(SpecPointers.TYPENAME);
- String namespace = Selectors.textAt(targetNode, TypenamePointers.NAMESPACE);
- Optional<String> functionName =
- Selectors.optionalTextAt(targetNode, TypenamePointers.FUNCTION_NAME);
- return (functionName.isPresent())
- ? FunctionEndpointSpec.Target.functionType(new FunctionType(namespace, functionName.get()))
- : FunctionEndpointSpec.Target.namespace(namespace);
+ String targetTypeNameStr =
+ Selectors.textAt(functionEndpointSpecNode, SpecPointers.TARGET_FUNCTIONS);
+ TypeName targetTypeName = TypeName.parseFrom(targetTypeNameStr);
+ if (targetTypeName.namespace().contains("*")) {
+ throw new IllegalArgumentException(
+ "Invalid syntax for "
+ + SpecPointers.TARGET_FUNCTIONS
+ + ". Only <namespace>/<name> or <namespace>/* are supported.");
+ }
+ if (targetTypeName.name().equals("*")) {
+ return FunctionEndpointSpec.Target.namespace(targetTypeName.namespace());
+ }
+ if (targetTypeName.name().contains("*")) {
+ throw new IllegalArgumentException(
+ "Invalid syntax for "
+ + SpecPointers.TARGET_FUNCTIONS
+ + ". Only <namespace>/<name> or <namespace>/* are supported.");
+ }
+ FunctionType functionType = new FunctionType(targetTypeName.namespace(), targetTypeName.name());
+ return FunctionEndpointSpec.Target.functionType(functionType);
}
private static FunctionEndpointSpec.UrlPathTemplate urlPathTemplate(
@@ -198,7 +208,7 @@
@SuppressWarnings("unchecked")
private static <K, NV extends FunctionEndpointSpec> Map<K, NV> castValues(
Map<K, FunctionEndpointSpec> toCast) {
- return new HashMap(toCast);
+ return (Map<K, NV>) new HashMap<>(toCast);
}
private static Map<String, FunctionEndpointSpec> namespaceAsKey(
diff --git a/statefun-flink/statefun-flink-core/src/test/resources/module-v3_0/module.yaml b/statefun-flink/statefun-flink-core/src/test/resources/module-v3_0/module.yaml
index 6e645a3..c44cecd 100644
--- a/statefun-flink/statefun-flink-core/src/test/resources/module-v3_0/module.yaml
+++ b/statefun-flink/statefun-flink-core/src/test/resources/module-v3_0/module.yaml
@@ -24,8 +24,7 @@
meta:
kind: http
spec:
- typename:
- namespace: com.foo.bar
+ functions: com.foo.bar/*
urlPathTemplate: http://bar.foo.com:8080/functions/{typename.function}
timeouts:
call: 1minutes
@@ -37,17 +36,13 @@
meta:
kind: http
spec:
- typename:
- namespace: com.foo.bar
- type: specific_function
+ functions: com.foo.bar/specific_function
urlPathTemplate: http://bar.foo.com:8080/functions/abc
- endpoint:
meta:
kind: http
spec:
- typename:
- namespace: com.other.namespace
- type: hello
+ functions: com.other.namespace/hello
urlPathTemplate: http://namespace.other.com:8080/hello
routers:
- router: