blob: 74500cdc1933ee9b7fd5eadbe6f61ac622b94066 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.statefun.flink.core.jsonmodule;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.toList;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.stream.StreamSupport;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonPointer;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.statefun.flink.common.json.Selectors;
import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionEndpointSpec;
import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionProvider;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.FunctionTypeNamespaceMatcher;
import org.apache.flink.statefun.sdk.StatefulFunctionProvider;
import org.apache.flink.statefun.sdk.TypeName;
import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
import org.apache.flink.util.TimeUtils;
public final class FunctionEndpointJsonEntity implements JsonEntity {
private static final JsonPointer FUNCTION_ENDPOINTS_POINTER = JsonPointer.compile("/endpoints");
private static final class MetaPointers {
private static final JsonPointer KIND = JsonPointer.compile("/endpoint/meta/kind");
}
private static final class SpecPointers {
private static final JsonPointer TARGET_FUNCTIONS =
JsonPointer.compile("/endpoint/spec/functions");
private static final JsonPointer URL_PATH_TEMPLATE =
JsonPointer.compile("/endpoint/spec/urlPathTemplate");
private static final JsonPointer TIMEOUTS = JsonPointer.compile("/endpoint/spec/timeouts");
private static final JsonPointer MAX_NUM_BATCH_REQUESTS =
JsonPointer.compile("/endpoint/spec/maxNumBatchRequests");
}
private static final class TimeoutPointers {
private static final JsonPointer CALL = JsonPointer.compile("/call");
private static final JsonPointer CONNECT = JsonPointer.compile("/connect");
private static final JsonPointer READ = JsonPointer.compile("/read");
private static final JsonPointer WRITE = JsonPointer.compile("/write");
}
@Override
public void bind(
StatefulFunctionModule.Binder binder, JsonNode moduleSpecNode, FormatVersion formatVersion) {
if (formatVersion != FormatVersion.v3_0) {
throw new IllegalArgumentException("endpoints is only supported with format version 3.0.");
}
final Iterable<? extends JsonNode> functionEndpointsSpecNodes =
functionEndpointSpecNodes(moduleSpecNode);
for (Map.Entry<FunctionEndpointSpec.Kind, List<FunctionEndpointSpec>> entry :
parseFunctionEndpointSpecs(functionEndpointsSpecNodes).entrySet()) {
final Map<FunctionType, FunctionEndpointSpec> specificTypeEndpointSpecs = new HashMap<>();
final Map<FunctionTypeNamespaceMatcher, FunctionEndpointSpec> perNamespaceEndpointSpecs =
new HashMap<>();
entry
.getValue()
.forEach(
spec -> {
FunctionEndpointSpec.Target target = spec.target();
if (target.isSpecificFunctionType()) {
specificTypeEndpointSpecs.put(target.asSpecificFunctionType(), spec);
} else {
perNamespaceEndpointSpecs.put(target.asNamespace(), spec);
}
});
StatefulFunctionProvider provider =
functionProvider(entry.getKey(), specificTypeEndpointSpecs, perNamespaceEndpointSpecs);
specificTypeEndpointSpecs
.keySet()
.forEach(specificType -> binder.bindFunctionProvider(specificType, provider));
perNamespaceEndpointSpecs
.keySet()
.forEach(namespace -> binder.bindFunctionProvider(namespace, provider));
}
}
private static Iterable<? extends JsonNode> functionEndpointSpecNodes(
JsonNode moduleSpecRootNode) {
return Selectors.listAt(moduleSpecRootNode, FUNCTION_ENDPOINTS_POINTER);
}
private static Map<FunctionEndpointSpec.Kind, List<FunctionEndpointSpec>>
parseFunctionEndpointSpecs(Iterable<? extends JsonNode> functionEndpointsSpecNodes) {
return StreamSupport.stream(functionEndpointsSpecNodes.spliterator(), false)
.map(FunctionEndpointJsonEntity::parseFunctionEndpointsSpec)
.collect(groupingBy(FunctionEndpointSpec::kind, toList()));
}
private static FunctionEndpointSpec parseFunctionEndpointsSpec(
JsonNode functionEndpointSpecNode) {
FunctionEndpointSpec.Kind kind = endpointKind(functionEndpointSpecNode);
switch (kind) {
case HTTP:
final HttpFunctionEndpointSpec.Builder specBuilder =
HttpFunctionEndpointSpec.builder(
target(functionEndpointSpecNode), urlPathTemplate(functionEndpointSpecNode));
JsonNode timeoutsNode = functionEndpointSpecNode.at(SpecPointers.TIMEOUTS);
optionalMaxNumBatchRequests(functionEndpointSpecNode)
.ifPresent(specBuilder::withMaxNumBatchRequests);
optionalTimeoutDuration(timeoutsNode, TimeoutPointers.CALL)
.ifPresent(specBuilder::withMaxRequestDuration);
optionalTimeoutDuration(timeoutsNode, TimeoutPointers.CONNECT)
.ifPresent(specBuilder::withConnectTimeoutDuration);
optionalTimeoutDuration(timeoutsNode, TimeoutPointers.READ)
.ifPresent(specBuilder::withReadTimeoutDuration);
optionalTimeoutDuration(timeoutsNode, TimeoutPointers.WRITE)
.ifPresent(specBuilder::withWriteTimeoutDuration);
return specBuilder.build();
case GRPC:
throw new UnsupportedOperationException("GRPC endpoints are not supported yet.");
default:
throw new IllegalArgumentException("Unrecognized function endpoint kind " + kind);
}
}
private static FunctionEndpointSpec.Kind endpointKind(JsonNode functionEndpointSpecNode) {
String endpointKind = Selectors.textAt(functionEndpointSpecNode, MetaPointers.KIND);
return FunctionEndpointSpec.Kind.valueOf(endpointKind.toUpperCase(Locale.getDefault()));
}
private static FunctionEndpointSpec.Target target(JsonNode functionEndpointSpecNode) {
String targetTypeNameStr =
Selectors.textAt(functionEndpointSpecNode, SpecPointers.TARGET_FUNCTIONS);
TypeName targetTypeName = TypeName.parseFrom(targetTypeNameStr);
if (targetTypeName.namespace().contains("*")) {
throw new IllegalArgumentException(
"Invalid syntax for "
+ SpecPointers.TARGET_FUNCTIONS
+ ". Only <namespace>/<name> or <namespace>/* are supported.");
}
if (targetTypeName.name().equals("*")) {
return FunctionEndpointSpec.Target.namespace(targetTypeName.namespace());
}
if (targetTypeName.name().contains("*")) {
throw new IllegalArgumentException(
"Invalid syntax for "
+ SpecPointers.TARGET_FUNCTIONS
+ ". Only <namespace>/<name> or <namespace>/* are supported.");
}
FunctionType functionType = new FunctionType(targetTypeName.namespace(), targetTypeName.name());
return FunctionEndpointSpec.Target.functionType(functionType);
}
private static FunctionEndpointSpec.UrlPathTemplate urlPathTemplate(
JsonNode functionEndpointSpecNode) {
String template = Selectors.textAt(functionEndpointSpecNode, SpecPointers.URL_PATH_TEMPLATE);
return new FunctionEndpointSpec.UrlPathTemplate(template);
}
private static OptionalInt optionalMaxNumBatchRequests(JsonNode functionNode) {
return Selectors.optionalIntegerAt(functionNode, SpecPointers.MAX_NUM_BATCH_REQUESTS);
}
private static Optional<Duration> optionalTimeoutDuration(
JsonNode node, JsonPointer timeoutPointer) {
return Selectors.optionalTextAt(node, timeoutPointer).map(TimeUtils::parseDuration);
}
private static StatefulFunctionProvider functionProvider(
FunctionEndpointSpec.Kind kind,
Map<FunctionType, FunctionEndpointSpec> specificTypeEndpointSpecs,
Map<FunctionTypeNamespaceMatcher, FunctionEndpointSpec> perNamespaceEndpointSpecs) {
switch (kind) {
case HTTP:
return new HttpFunctionProvider(
castValues(specificTypeEndpointSpecs),
castValues(namespaceAsKey(perNamespaceEndpointSpecs)));
case GRPC:
throw new UnsupportedOperationException("GRPC endpoints are not supported yet.");
default:
throw new IllegalStateException("Unexpected kind: " + kind);
}
}
@SuppressWarnings("unchecked")
private static <K, NV extends FunctionEndpointSpec> Map<K, NV> castValues(
Map<K, FunctionEndpointSpec> toCast) {
return (Map<K, NV>) new HashMap<>(toCast);
}
private static Map<String, FunctionEndpointSpec> namespaceAsKey(
Map<FunctionTypeNamespaceMatcher, FunctionEndpointSpec> perNamespaceEndpointSpecs) {
final Map<String, FunctionEndpointSpec> converted =
new HashMap<>(perNamespaceEndpointSpecs.size());
perNamespaceEndpointSpecs.forEach(
(namespaceMatcher, spec) -> converted.put(namespaceMatcher.targetNamespace(), spec));
return converted;
}
}