blob: 337aa4e38700a421af413787992f0a173994c8c5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.statefun.flink.core.jsonmodule;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.toList;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.stream.StreamSupport;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonPointer;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.statefun.flink.common.json.Selectors;
import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionEndpointSpec;
import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionProvider;
import org.apache.flink.statefun.flink.core.spi.ExtensionResolver;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.FunctionTypeNamespaceMatcher;
import org.apache.flink.statefun.sdk.StatefulFunctionProvider;
import org.apache.flink.statefun.sdk.TypeName;
import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
import org.apache.flink.util.TimeUtils;
public final class FunctionEndpointJsonEntity implements JsonEntity {
private static final JsonPointer FUNCTION_ENDPOINTS_POINTER = JsonPointer.compile("/endpoints");
private static final class MetaPointers {
private static final JsonPointer KIND = JsonPointer.compile("/endpoint/meta/kind");
}
private static final class SpecPointers {
private static final JsonPointer TARGET_FUNCTIONS =
JsonPointer.compile("/endpoint/spec/functions");
private static final JsonPointer URL_PATH_TEMPLATE =
JsonPointer.compile("/endpoint/spec/urlPathTemplate");
private static final JsonPointer MAX_NUM_BATCH_REQUESTS =
JsonPointer.compile("/endpoint/spec/maxNumBatchRequests");
private static final JsonPointer TRANSPORT = JsonPointer.compile("/endpoint/spec/transport");
@Deprecated
private static final JsonPointer TIMEOUTS = JsonPointer.compile("/endpoint/spec/timeouts");
}
private static final class TransportPointers {
private static final JsonPointer CLIENT_FACTORY_TYPE = JsonPointer.compile("/type");
}
@Override
public void bind(
StatefulFunctionModule.Binder binder,
ExtensionResolver extensionResolver,
JsonNode moduleSpecNode,
FormatVersion formatVersion) {
if (formatVersion.compareTo(FormatVersion.v3_0) < 0) {
throw new IllegalArgumentException("endpoints is only supported with format version 3.0.");
}
final Iterable<? extends JsonNode> functionEndpointsSpecNodes =
functionEndpointSpecNodes(moduleSpecNode);
for (Map.Entry<FunctionEndpointSpec.Kind, List<FunctionEndpointSpec>> entry :
parseFunctionEndpointSpecs(functionEndpointsSpecNodes, formatVersion).entrySet()) {
final Map<FunctionType, FunctionEndpointSpec> specificTypeEndpointSpecs = new HashMap<>();
final Map<FunctionTypeNamespaceMatcher, FunctionEndpointSpec> perNamespaceEndpointSpecs =
new HashMap<>();
entry
.getValue()
.forEach(
spec -> {
FunctionEndpointSpec.Target target = spec.target();
if (target.isSpecificFunctionType()) {
specificTypeEndpointSpecs.put(target.asSpecificFunctionType(), spec);
} else {
perNamespaceEndpointSpecs.put(target.asNamespace(), spec);
}
});
StatefulFunctionProvider provider =
functionProvider(
entry.getKey(),
specificTypeEndpointSpecs,
perNamespaceEndpointSpecs,
extensionResolver);
specificTypeEndpointSpecs
.keySet()
.forEach(specificType -> binder.bindFunctionProvider(specificType, provider));
perNamespaceEndpointSpecs
.keySet()
.forEach(namespace -> binder.bindFunctionProvider(namespace, provider));
}
}
private static Iterable<? extends JsonNode> functionEndpointSpecNodes(
JsonNode moduleSpecRootNode) {
return Selectors.listAt(moduleSpecRootNode, FUNCTION_ENDPOINTS_POINTER);
}
private static Map<FunctionEndpointSpec.Kind, List<FunctionEndpointSpec>>
parseFunctionEndpointSpecs(
Iterable<? extends JsonNode> functionEndpointsSpecNodes, FormatVersion version) {
return StreamSupport.stream(functionEndpointsSpecNodes.spliterator(), false)
.map(node -> FunctionEndpointJsonEntity.parseFunctionEndpointsSpec(node, version))
.collect(groupingBy(FunctionEndpointSpec::kind, toList()));
}
private static FunctionEndpointSpec parseFunctionEndpointsSpec(
JsonNode functionEndpointSpecNode, FormatVersion version) {
FunctionEndpointSpec.Kind kind = endpointKind(functionEndpointSpecNode);
switch (kind) {
case HTTP:
final HttpFunctionEndpointSpec.Builder specBuilder =
HttpFunctionEndpointSpec.builder(
target(functionEndpointSpecNode), urlPathTemplate(functionEndpointSpecNode));
optionalMaxNumBatchRequests(functionEndpointSpecNode)
.ifPresent(specBuilder::withMaxNumBatchRequests);
switch (version) {
case v3_1:
final Optional<ObjectNode> transportSpec =
Selectors.optionalObjectAt(functionEndpointSpecNode, SpecPointers.TRANSPORT);
transportSpec.ifPresent(spec -> configureHttpTransport(specBuilder, spec));
break;
case v3_0:
final Optional<ObjectNode> deprecatedTimeoutsSpec =
Selectors.optionalObjectAt(functionEndpointSpecNode, SpecPointers.TIMEOUTS);
deprecatedTimeoutsSpec.ifPresent(
spec -> configureDeprecatedHttpTimeoutsSpec(specBuilder, spec));
break;
default:
throw new IllegalStateException("Unsupported format version: " + version);
}
return specBuilder.build();
case GRPC:
throw new UnsupportedOperationException("GRPC endpoints are not supported yet.");
default:
throw new IllegalArgumentException("Unrecognized function endpoint kind " + kind);
}
}
private static void configureHttpTransport(
HttpFunctionEndpointSpec.Builder endpointSpecBuilder, ObjectNode transportSpecNode) {
final Optional<TypeName> transportClientFactoryType =
Selectors.optionalTextAt(transportSpecNode, TransportPointers.CLIENT_FACTORY_TYPE)
.map(TypeName::parseFrom);
transportClientFactoryType.ifPresent(endpointSpecBuilder::withTransportClientFactoryType);
// pass the transport spec node as is as the client properties
endpointSpecBuilder.withTransportClientProperties(transportSpecNode);
}
private static void configureDeprecatedHttpTimeoutsSpec(
HttpFunctionEndpointSpec.Builder endpointSpecBuilder, ObjectNode deprecatedHttpTimeoutsSpec) {
final ObjectNode reconstructedTimeoutsSpec =
reconstructTimeoutsSpecNode(deprecatedHttpTimeoutsSpec);
endpointSpecBuilder.withTransportClientProperties(reconstructedTimeoutsSpec);
}
private static ObjectNode reconstructTimeoutsSpecNode(ObjectNode deprecatedHttpTimeoutsSpec) {
try {
return (ObjectNode)
new ObjectMapper()
.readTree("{\"timeouts\":" + deprecatedHttpTimeoutsSpec.toString() + "}");
} catch (Exception e) {
throw new RuntimeException("Unable to reconstruct deprecated timeouts spec node.");
}
}
private static FunctionEndpointSpec.Kind endpointKind(JsonNode functionEndpointSpecNode) {
String endpointKind = Selectors.textAt(functionEndpointSpecNode, MetaPointers.KIND);
return FunctionEndpointSpec.Kind.valueOf(endpointKind.toUpperCase(Locale.getDefault()));
}
private static FunctionEndpointSpec.Target target(JsonNode functionEndpointSpecNode) {
String targetTypeNameStr =
Selectors.textAt(functionEndpointSpecNode, SpecPointers.TARGET_FUNCTIONS);
TypeName targetTypeName = TypeName.parseFrom(targetTypeNameStr);
if (targetTypeName.namespace().contains("*")) {
throw new IllegalArgumentException(
"Invalid syntax for "
+ SpecPointers.TARGET_FUNCTIONS
+ ". Only <namespace>/<name> or <namespace>/* are supported.");
}
if (targetTypeName.name().equals("*")) {
return FunctionEndpointSpec.Target.namespace(targetTypeName.namespace());
}
if (targetTypeName.name().contains("*")) {
throw new IllegalArgumentException(
"Invalid syntax for "
+ SpecPointers.TARGET_FUNCTIONS
+ ". Only <namespace>/<name> or <namespace>/* are supported.");
}
FunctionType functionType = new FunctionType(targetTypeName.namespace(), targetTypeName.name());
return FunctionEndpointSpec.Target.functionType(functionType);
}
private static FunctionEndpointSpec.UrlPathTemplate urlPathTemplate(
JsonNode functionEndpointSpecNode) {
String template = Selectors.textAt(functionEndpointSpecNode, SpecPointers.URL_PATH_TEMPLATE);
return new FunctionEndpointSpec.UrlPathTemplate(template);
}
private static OptionalInt optionalMaxNumBatchRequests(JsonNode functionNode) {
return Selectors.optionalIntegerAt(functionNode, SpecPointers.MAX_NUM_BATCH_REQUESTS);
}
private static Optional<Duration> optionalTimeoutDuration(
JsonNode node, JsonPointer timeoutPointer) {
return Selectors.optionalTextAt(node, timeoutPointer).map(TimeUtils::parseDuration);
}
private static StatefulFunctionProvider functionProvider(
FunctionEndpointSpec.Kind kind,
Map<FunctionType, FunctionEndpointSpec> specificTypeEndpointSpecs,
Map<FunctionTypeNamespaceMatcher, FunctionEndpointSpec> perNamespaceEndpointSpecs,
ExtensionResolver extensionResolver) {
switch (kind) {
case HTTP:
return new HttpFunctionProvider(
castValues(specificTypeEndpointSpecs),
castValues(namespaceAsKey(perNamespaceEndpointSpecs)),
extensionResolver);
case GRPC:
throw new UnsupportedOperationException("GRPC endpoints are not supported yet.");
default:
throw new IllegalStateException("Unexpected kind: " + kind);
}
}
@SuppressWarnings("unchecked")
private static <K, NV extends FunctionEndpointSpec> Map<K, NV> castValues(
Map<K, FunctionEndpointSpec> toCast) {
return (Map<K, NV>) new HashMap<>(toCast);
}
private static Map<String, FunctionEndpointSpec> namespaceAsKey(
Map<FunctionTypeNamespaceMatcher, FunctionEndpointSpec> perNamespaceEndpointSpecs) {
final Map<String, FunctionEndpointSpec> converted =
new HashMap<>(perNamespaceEndpointSpecs.size());
perNamespaceEndpointSpecs.forEach(
(namespaceMatcher, spec) -> converted.put(namespaceMatcher.targetNamespace(), spec));
return converted;
}
}