[FLINK-23296] [core] Introduce RequestReplyClientFactory extension
* Rename default HTTP request-reply client
* Implement default HTTP client factory
* Wire in RequestReplyClientFactory into HttpFunctionEndpointProvider
* Pass in ExtensionResolver when binding JsonEntities
* Bump YAML format version
* Implement v3.1 YAML format with transport spec parsing
* Implement context-safe decorator for RequestReplyClient
* Ensure correct classloader when using tranport clients and factories
* Move Classloader handling responsibility to factory implementations
* Add TransportClientsModule
This closes #243.
diff --git a/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/json/Selectors.java b/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/json/Selectors.java
index a5b9d2c..f746a7f 100644
--- a/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/json/Selectors.java
+++ b/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/json/Selectors.java
@@ -23,10 +23,22 @@
import java.util.stream.StreamSupport;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonPointer;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.util.TimeUtils;
public final class Selectors {
+ public static Optional<ObjectNode> optionalObjectAt(JsonNode node, JsonPointer pointer) {
+ node = node.at(pointer);
+ if (node.isMissingNode()) {
+ return Optional.empty();
+ }
+ if (!node.isObject()) {
+ throw new WrongTypeException(pointer, "not an object");
+ }
+ return Optional.of((ObjectNode) node);
+ }
+
public static String textAt(JsonNode node, JsonPointer pointer) {
node = dereference(node, pointer);
if (!node.isTextual()) {
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsUniverse.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsUniverse.java
index 9c39977..4fffffc 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsUniverse.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsUniverse.java
@@ -24,6 +24,7 @@
import java.util.Objects;
import javax.annotation.Nullable;
import org.apache.flink.statefun.flink.core.message.MessageFactoryKey;
+import org.apache.flink.statefun.flink.core.spi.ExtensionResolver;
import org.apache.flink.statefun.flink.core.types.StaticallyRegisteredTypes;
import org.apache.flink.statefun.flink.io.spi.FlinkIoModule;
import org.apache.flink.statefun.flink.io.spi.SinkProvider;
@@ -33,15 +34,20 @@
import org.apache.flink.statefun.sdk.FunctionTypeNamespaceMatcher;
import org.apache.flink.statefun.sdk.IngressType;
import org.apache.flink.statefun.sdk.StatefulFunctionProvider;
+import org.apache.flink.statefun.sdk.TypeName;
import org.apache.flink.statefun.sdk.io.EgressIdentifier;
import org.apache.flink.statefun.sdk.io.EgressSpec;
import org.apache.flink.statefun.sdk.io.IngressIdentifier;
import org.apache.flink.statefun.sdk.io.IngressSpec;
import org.apache.flink.statefun.sdk.io.Router;
+import org.apache.flink.statefun.sdk.spi.ExtensionModule;
import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
public final class StatefulFunctionsUniverse
- implements StatefulFunctionModule.Binder, FlinkIoModule.Binder {
+ implements StatefulFunctionModule.Binder,
+ FlinkIoModule.Binder,
+ ExtensionModule.Binder,
+ ExtensionResolver {
private final Map<IngressIdentifier<?>, IngressSpec<?>> ingress = new HashMap<>();
private final Map<EgressIdentifier<?>, EgressSpec<?>> egress = new HashMap<>();
@@ -51,6 +57,7 @@
private final Map<String, StatefulFunctionProvider> namespaceFunctionProviders = new HashMap<>();
private final Map<IngressType, SourceProvider> sources = new HashMap<>();
private final Map<EgressType, SinkProvider> sinks = new HashMap<>();
+ private final Map<TypeName, Object> extensions = new HashMap<>();
private final StaticallyRegisteredTypes types;
private final MessageFactoryKey messageFactoryKey;
@@ -112,6 +119,30 @@
putAndThrowIfPresent(sinks, type, provider);
}
+ @Override
+ public <T> void bindExtension(TypeName typeName, T extension) {
+ putAndThrowIfPresent(extensions, typeName, extension);
+ }
+
+ @Override
+ public <T> T resolveExtension(TypeName typeName, Class<T> extensionClass) {
+ final Object rawTypedExtension = extensions.get(typeName);
+ if (rawTypedExtension == null) {
+ throw new IllegalStateException("An extension with type " + typeName + " does not exist.");
+ }
+
+ if (rawTypedExtension.getClass().isAssignableFrom(extensionClass)) {
+ throw new IllegalStateException(
+ "Unexpected class for extension "
+ + typeName
+ + "; expected "
+ + extensionClass
+ + ", but was "
+ + rawTypedExtension.getClass());
+ }
+ return extensionClass.cast(rawTypedExtension);
+ }
+
public Map<IngressIdentifier<?>, IngressSpec<?>> ingress() {
return ingress;
}
@@ -140,6 +171,10 @@
return sinks;
}
+ public Map<TypeName, Object> getExtensions() {
+ return extensions;
+ }
+
public StaticallyRegisteredTypes types() {
return types;
}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpRequestReplyClient.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClient.java
similarity index 93%
rename from statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpRequestReplyClient.java
rename to statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClient.java
index fd7e5fb..f2cf795 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpRequestReplyClient.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClient.java
@@ -39,14 +39,14 @@
import org.apache.flink.statefun.sdk.reqreply.generated.ToFunction;
import org.apache.flink.util.IOUtils;
-final class HttpRequestReplyClient implements RequestReplyClient {
+final class DefaultHttpRequestReplyClient implements RequestReplyClient {
private static final MediaType MEDIA_TYPE_BINARY = MediaType.parse("application/octet-stream");
private final HttpUrl url;
private final OkHttpClient client;
private final BooleanSupplier isShutdown;
- HttpRequestReplyClient(HttpUrl url, OkHttpClient client, BooleanSupplier isShutdown) {
+ DefaultHttpRequestReplyClient(HttpUrl url, OkHttpClient client, BooleanSupplier isShutdown) {
this.url = Objects.requireNonNull(url);
this.client = Objects.requireNonNull(client);
this.isShutdown = Objects.requireNonNull(isShutdown);
@@ -67,7 +67,7 @@
RetryingCallback callback =
new RetryingCallback(requestSummary, metrics, newCall.timeout(), isShutdown);
callback.attachToCall(newCall);
- return callback.future().thenApply(HttpRequestReplyClient::parseResponse);
+ return callback.future().thenApply(DefaultHttpRequestReplyClient::parseResponse);
}
private static FromFunction parseResponse(Response response) {
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClientFactory.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClientFactory.java
new file mode 100644
index 0000000..fd7fcc2
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClientFactory.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.httpfn;
+
+import static org.apache.flink.statefun.flink.core.httpfn.OkHttpUnixSocketBridge.configureUnixDomainSocket;
+
+import java.net.URI;
+import javax.annotation.Nullable;
+import okhttp3.HttpUrl;
+import okhttp3.OkHttpClient;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.statefun.flink.common.SetContextClassLoader;
+import org.apache.flink.statefun.flink.core.reqreply.ClassLoaderSafeRequestReplyClient;
+import org.apache.flink.statefun.flink.core.reqreply.RequestReplyClient;
+import org.apache.flink.statefun.flink.core.reqreply.RequestReplyClientFactory;
+
+public final class DefaultHttpRequestReplyClientFactory implements RequestReplyClientFactory {
+
+ /** Unknown fields in client properties are silently ignored. */
+ private static final ObjectMapper OBJ_MAPPER =
+ new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+
+ /** lazily initialized by {@link #createTransportClient} */
+ @Nullable private OkHttpClient sharedClient;
+
+ private volatile boolean shutdown;
+
+ @Override
+ public RequestReplyClient createTransportClient(ObjectNode transportProperties, URI endpointUrl) {
+ final DefaultHttpRequestReplyClient client = createClient(transportProperties, endpointUrl);
+
+ if (Thread.currentThread().getContextClassLoader() == getClass().getClassLoader()) {
+ return client;
+ } else {
+ return new ClassLoaderSafeRequestReplyClient(client);
+ }
+ }
+
+ @Override
+ public void cleanup() {
+ if (!shutdown) {
+ shutdown = true;
+ OkHttpUtils.closeSilently(sharedClient);
+ }
+ }
+
+ private DefaultHttpRequestReplyClient createClient(
+ ObjectNode transportProperties, URI endpointUrl) {
+ try (SetContextClassLoader ignored = new SetContextClassLoader(this)) {
+ if (sharedClient == null) {
+ sharedClient = OkHttpUtils.newClient();
+ }
+ final OkHttpClient.Builder clientBuilder = sharedClient.newBuilder();
+
+ final DefaultHttpRequestReplyClientSpec transportClientSpec =
+ parseTransportProperties(transportProperties);
+
+ clientBuilder.callTimeout(transportClientSpec.getTimeouts().getCallTimeout());
+ clientBuilder.connectTimeout(transportClientSpec.getTimeouts().getConnectTimeout());
+ clientBuilder.readTimeout(transportClientSpec.getTimeouts().getReadTimeout());
+ clientBuilder.writeTimeout(transportClientSpec.getTimeouts().getWriteTimeout());
+
+ HttpUrl url;
+ if (UnixDomainHttpEndpoint.validate(endpointUrl)) {
+ UnixDomainHttpEndpoint endpoint = UnixDomainHttpEndpoint.parseFrom(endpointUrl);
+
+ url =
+ new HttpUrl.Builder()
+ .scheme("http")
+ .host("unused")
+ .addPathSegment(endpoint.pathSegment)
+ .build();
+
+ configureUnixDomainSocket(clientBuilder, endpoint.unixDomainFile);
+ } else {
+ url = HttpUrl.get(endpointUrl);
+ }
+
+ return new DefaultHttpRequestReplyClient(url, clientBuilder.build(), () -> shutdown);
+ }
+ }
+
+ private static DefaultHttpRequestReplyClientSpec parseTransportProperties(
+ ObjectNode transportClientProperties) {
+ try {
+ return OBJ_MAPPER.treeToValue(
+ transportClientProperties, DefaultHttpRequestReplyClientSpec.class);
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Unable to parse transport client properties when creating client: ", e);
+ }
+ }
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClientSpec.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClientSpec.java
new file mode 100644
index 0000000..4639fc2
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClientSpec.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.httpfn;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Objects;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSetter;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonDeserializer;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.flink.util.TimeUtils;
+
+public final class DefaultHttpRequestReplyClientSpec {
+
+ @JsonProperty("timeouts")
+ private Timeouts timeouts = new Timeouts();
+
+ @JsonSetter("timeouts")
+ public void setTimeouts(Timeouts timeouts) {
+ validateTimeouts(
+ timeouts.callTimeout, timeouts.connectTimeout, timeouts.readTimeout, timeouts.writeTimeout);
+ this.timeouts = timeouts;
+ }
+
+ public Timeouts getTimeouts() {
+ return timeouts;
+ }
+
+ private static void validateTimeouts(
+ Duration callTimeout, Duration connectTimeout, Duration readTimeout, Duration writeTimeout) {
+
+ if (connectTimeout.compareTo(callTimeout) > 0) {
+ throw new IllegalArgumentException("Connect timeout cannot be larger than request timeout.");
+ }
+
+ if (readTimeout.compareTo(callTimeout) > 0) {
+ throw new IllegalArgumentException("Read timeout cannot be larger than request timeout.");
+ }
+
+ if (writeTimeout.compareTo(callTimeout) > 0) {
+ throw new IllegalArgumentException("Write timeout cannot be larger than request timeout.");
+ }
+ }
+
+ public static final class Timeouts {
+
+ private static final Duration DEFAULT_HTTP_TIMEOUT = Duration.ofMinutes(1);
+ private static final Duration DEFAULT_HTTP_CONNECT_TIMEOUT = Duration.ofSeconds(10);
+ private static final Duration DEFAULT_HTTP_READ_TIMEOUT = Duration.ofSeconds(10);
+ private static final Duration DEFAULT_HTTP_WRITE_TIMEOUT = Duration.ofSeconds(10);
+
+ private Duration callTimeout = DEFAULT_HTTP_TIMEOUT;
+ private Duration connectTimeout = DEFAULT_HTTP_CONNECT_TIMEOUT;
+ private Duration readTimeout = DEFAULT_HTTP_READ_TIMEOUT;
+ private Duration writeTimeout = DEFAULT_HTTP_WRITE_TIMEOUT;
+
+ @JsonSetter("call")
+ @JsonDeserialize(using = DurationJsonDeserialize.class)
+ public void setCallTimeout(Duration callTimeout) {
+ this.callTimeout = requireNonZeroDuration(callTimeout);
+ }
+
+ @JsonSetter("connect")
+ @JsonDeserialize(using = DurationJsonDeserialize.class)
+ public void setConnectTimeout(Duration connectTimeout) {
+ this.connectTimeout = requireNonZeroDuration(connectTimeout);
+ }
+
+ @JsonSetter("read")
+ @JsonDeserialize(using = DurationJsonDeserialize.class)
+ public void setReadTimeout(Duration readTimeout) {
+ this.readTimeout = requireNonZeroDuration(readTimeout);
+ }
+
+ @JsonSetter("write")
+ @JsonDeserialize(using = DurationJsonDeserialize.class)
+ public void setWriteTimeout(Duration writeTimeout) {
+ this.writeTimeout = requireNonZeroDuration(writeTimeout);
+ }
+
+ public Duration getCallTimeout() {
+ return callTimeout;
+ }
+
+ public Duration getConnectTimeout() {
+ return connectTimeout;
+ }
+
+ public Duration getReadTimeout() {
+ return readTimeout;
+ }
+
+ public Duration getWriteTimeout() {
+ return writeTimeout;
+ }
+
+ private static Duration requireNonZeroDuration(Duration duration) {
+ Objects.requireNonNull(duration);
+ if (duration.equals(Duration.ZERO)) {
+ throw new IllegalArgumentException("Timeout durations must be larger than 0.");
+ }
+
+ return duration;
+ }
+ }
+
+ private static final class DurationJsonDeserialize extends JsonDeserializer<Duration> {
+ @Override
+ public Duration deserialize(
+ JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException {
+ return TimeUtils.parseDuration(jsonParser.getText());
+ }
+ }
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionEndpointSpec.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionEndpointSpec.java
index 85df004..428e474 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionEndpointSpec.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionEndpointSpec.java
@@ -17,30 +17,36 @@
*/
package org.apache.flink.statefun.flink.core.httpfn;
+import static org.apache.flink.statefun.flink.core.httpfn.TransportClientConstants.OKHTTP_CLIENT_FACTORY_TYPE;
+
import java.io.Serializable;
-import java.time.Duration;
import java.util.Objects;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.statefun.flink.core.jsonmodule.FunctionEndpointSpec;
+import org.apache.flink.statefun.sdk.TypeName;
public final class HttpFunctionEndpointSpec implements FunctionEndpointSpec, Serializable {
private static final long serialVersionUID = 1;
- private static final Duration DEFAULT_HTTP_TIMEOUT = Duration.ofMinutes(1);
- private static final Duration DEFAULT_HTTP_CONNECT_TIMEOUT = Duration.ofSeconds(10);
- private static final Duration DEFAULT_HTTP_READ_TIMEOUT = Duration.ofSeconds(10);
- private static final Duration DEFAULT_HTTP_WRITE_TIMEOUT = Duration.ofSeconds(10);
private static final Integer DEFAULT_MAX_NUM_BATCH_REQUESTS = 1000;
+ // ============================================================
+ // Request-Reply invocation protocol configurations
+ // ============================================================
+
private final Target target;
private final UrlPathTemplate urlPathTemplate;
-
- private final Duration maxRequestDuration;
- private final Duration connectTimeout;
- private final Duration readTimeout;
- private final Duration writeTimeout;
private final int maxNumBatchRequests;
+ // ============================================================
+ // HTTP transport related properties
+ // ============================================================
+
+ private final TypeName transportClientFactoryType;
+ private final ObjectNode transportClientProps;
+
public static Builder builder(Target target, UrlPathTemplate urlPathTemplate) {
return new Builder(target, urlPathTemplate);
}
@@ -48,18 +54,14 @@
private HttpFunctionEndpointSpec(
Target target,
UrlPathTemplate urlPathTemplate,
- Duration maxRequestDuration,
- Duration connectTimeout,
- Duration readTimeout,
- Duration writeTimeout,
- int maxNumBatchRequests) {
+ int maxNumBatchRequests,
+ TypeName transportClientFactoryType,
+ ObjectNode transportClientProps) {
this.target = target;
this.urlPathTemplate = urlPathTemplate;
- this.maxRequestDuration = maxRequestDuration;
- this.connectTimeout = connectTimeout;
- this.readTimeout = readTimeout;
- this.writeTimeout = writeTimeout;
this.maxNumBatchRequests = maxNumBatchRequests;
+ this.transportClientFactoryType = transportClientFactoryType;
+ this.transportClientProps = transportClientProps;
}
@Override
@@ -77,102 +79,55 @@
return urlPathTemplate;
}
- public Duration maxRequestDuration() {
- return maxRequestDuration;
- }
-
- public Duration connectTimeout() {
- return connectTimeout;
- }
-
- public Duration readTimeout() {
- return readTimeout;
- }
-
- public Duration writeTimeout() {
- return writeTimeout;
- }
-
public int maxNumBatchRequests() {
return maxNumBatchRequests;
}
+ public TypeName transportClientFactoryType() {
+ return transportClientFactoryType;
+ }
+
+ public ObjectNode transportClientProperties() {
+ return transportClientProps;
+ }
+
public static final class Builder {
private final Target target;
private final UrlPathTemplate urlPathTemplate;
-
- private Duration maxRequestDuration = DEFAULT_HTTP_TIMEOUT;
- private Duration connectTimeout = DEFAULT_HTTP_CONNECT_TIMEOUT;
- private Duration readTimeout = DEFAULT_HTTP_READ_TIMEOUT;
- private Duration writeTimeout = DEFAULT_HTTP_WRITE_TIMEOUT;
private int maxNumBatchRequests = DEFAULT_MAX_NUM_BATCH_REQUESTS;
+ private TypeName transportClientFactoryType = OKHTTP_CLIENT_FACTORY_TYPE;
+ private ObjectNode transportClientProperties = new ObjectMapper().createObjectNode();
+
private Builder(Target target, UrlPathTemplate urlPathTemplate) {
this.target = Objects.requireNonNull(target);
this.urlPathTemplate = Objects.requireNonNull(urlPathTemplate);
}
- public Builder withMaxRequestDuration(Duration duration) {
- this.maxRequestDuration = requireNonZeroDuration(duration);
- return this;
- }
-
- public Builder withConnectTimeoutDuration(Duration duration) {
- this.connectTimeout = requireNonZeroDuration(duration);
- return this;
- }
-
- public Builder withReadTimeoutDuration(Duration duration) {
- this.readTimeout = requireNonZeroDuration(duration);
- return this;
- }
-
- public Builder withWriteTimeoutDuration(Duration duration) {
- this.writeTimeout = requireNonZeroDuration(duration);
- return this;
- }
-
public Builder withMaxNumBatchRequests(int maxNumBatchRequests) {
this.maxNumBatchRequests = maxNumBatchRequests;
return this;
}
+ public Builder withTransportClientFactoryType(TypeName transportClientFactoryType) {
+ this.transportClientFactoryType = Objects.requireNonNull(transportClientFactoryType);
+ return this;
+ }
+
+ public Builder withTransportClientProperties(ObjectNode transportClientProperties) {
+ this.transportClientProperties = Objects.requireNonNull(transportClientProperties);
+ return this;
+ }
+
public HttpFunctionEndpointSpec build() {
- validateTimeouts();
return new HttpFunctionEndpointSpec(
target,
urlPathTemplate,
- maxRequestDuration,
- connectTimeout,
- readTimeout,
- writeTimeout,
- maxNumBatchRequests);
- }
-
- private Duration requireNonZeroDuration(Duration duration) {
- Objects.requireNonNull(duration);
- if (duration.equals(Duration.ZERO)) {
- throw new IllegalArgumentException("Timeout durations must be larger than 0.");
- }
-
- return duration;
- }
-
- private void validateTimeouts() {
- if (connectTimeout.compareTo(maxRequestDuration) > 0) {
- throw new IllegalArgumentException(
- "Connect timeout cannot be larger than request timeout.");
- }
-
- if (readTimeout.compareTo(maxRequestDuration) > 0) {
- throw new IllegalArgumentException("Read timeout cannot be larger than request timeout.");
- }
-
- if (writeTimeout.compareTo(maxRequestDuration) > 0) {
- throw new IllegalArgumentException("Write timeout cannot be larger than request timeout.");
- }
+ maxNumBatchRequests,
+ transportClientFactoryType,
+ transportClientProperties);
}
}
}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
index 5445500..2bcb5dd 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
@@ -17,21 +17,20 @@
*/
package org.apache.flink.statefun.flink.core.httpfn;
-import static org.apache.flink.statefun.flink.core.httpfn.OkHttpUnixSocketBridge.configureUnixDomainSocket;
-
import java.net.URI;
import java.util.Map;
import java.util.Objects;
-import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
-import okhttp3.HttpUrl;
-import okhttp3.OkHttpClient;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.statefun.flink.core.common.ManagingResources;
import org.apache.flink.statefun.flink.core.reqreply.RequestReplyClient;
+import org.apache.flink.statefun.flink.core.reqreply.RequestReplyClientFactory;
import org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction;
+import org.apache.flink.statefun.flink.core.spi.ExtensionResolver;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.StatefulFunction;
import org.apache.flink.statefun.sdk.StatefulFunctionProvider;
+import org.apache.flink.statefun.sdk.TypeName;
@NotThreadSafe
public final class HttpFunctionProvider implements StatefulFunctionProvider, ManagingResources {
@@ -39,23 +38,25 @@
private final Map<FunctionType, HttpFunctionEndpointSpec> specificTypeEndpointSpecs;
private final Map<String, HttpFunctionEndpointSpec> perNamespaceEndpointSpecs;
- /** lazily initialized by {code buildHttpClient} */
- @Nullable private OkHttpClient sharedClient;
-
- private volatile boolean shutdown;
+ private final ExtensionResolver extensionResolver;
public HttpFunctionProvider(
Map<FunctionType, HttpFunctionEndpointSpec> specificTypeEndpointSpecs,
- Map<String, HttpFunctionEndpointSpec> perNamespaceEndpointSpecs) {
+ Map<String, HttpFunctionEndpointSpec> perNamespaceEndpointSpecs,
+ ExtensionResolver extensionResolver) {
this.specificTypeEndpointSpecs = Objects.requireNonNull(specificTypeEndpointSpecs);
this.perNamespaceEndpointSpecs = Objects.requireNonNull(perNamespaceEndpointSpecs);
+ this.extensionResolver = Objects.requireNonNull(extensionResolver);
}
@Override
public StatefulFunction functionOfType(FunctionType functionType) {
final HttpFunctionEndpointSpec endpointsSpec = getEndpointsSpecOrThrow(functionType);
+ final URI endpointUrl = endpointsSpec.urlPathTemplate().apply(functionType);
+
return new RequestReplyFunction(
- endpointsSpec.maxNumBatchRequests(), buildHttpClient(endpointsSpec, functionType));
+ endpointsSpec.maxNumBatchRequests(),
+ buildTransportClientFromSpec(endpointUrl, endpointsSpec));
}
private HttpFunctionEndpointSpec getEndpointsSpecOrThrow(FunctionType functionType) {
@@ -71,40 +72,19 @@
throw new IllegalStateException("Unknown type: " + functionType);
}
- private RequestReplyClient buildHttpClient(
- HttpFunctionEndpointSpec spec, FunctionType functionType) {
- if (sharedClient == null) {
- sharedClient = OkHttpUtils.newClient();
- }
- OkHttpClient.Builder clientBuilder = sharedClient.newBuilder();
- clientBuilder.callTimeout(spec.maxRequestDuration());
- clientBuilder.connectTimeout(spec.connectTimeout());
- clientBuilder.readTimeout(spec.readTimeout());
- clientBuilder.writeTimeout(spec.writeTimeout());
+ private RequestReplyClient buildTransportClientFromSpec(
+ URI endpointUrl, HttpFunctionEndpointSpec endpointsSpec) {
+ final TypeName factoryType = endpointsSpec.transportClientFactoryType();
+ final ObjectNode properties = endpointsSpec.transportClientProperties();
- URI endpointUrl = spec.urlPathTemplate().apply(functionType);
-
- final HttpUrl url;
- if (UnixDomainHttpEndpoint.validate(endpointUrl)) {
- UnixDomainHttpEndpoint endpoint = UnixDomainHttpEndpoint.parseFrom(endpointUrl);
-
- url =
- new HttpUrl.Builder()
- .scheme("http")
- .host("unused")
- .addPathSegment(endpoint.pathSegment)
- .build();
-
- configureUnixDomainSocket(clientBuilder, endpoint.unixDomainFile);
- } else {
- url = HttpUrl.get(endpointUrl);
- }
- return new HttpRequestReplyClient(url, clientBuilder.build(), () -> shutdown);
+ final RequestReplyClientFactory factory =
+ extensionResolver.resolveExtension(factoryType, RequestReplyClientFactory.class);
+ return factory.createTransportClient(properties, endpointUrl);
}
@Override
public void shutdown() {
- shutdown = true;
- OkHttpUtils.closeSilently(sharedClient);
+ // TODO all RequestReplyClientFactory's need to be shutdown.
+ // TODO This should probably happen in StatefulFunctionsUniverse.
}
}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/TransportClientConstants.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/TransportClientConstants.java
new file mode 100644
index 0000000..d5f8a35
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/TransportClientConstants.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.httpfn;
+
+import org.apache.flink.statefun.sdk.TypeName;
+
+public final class TransportClientConstants {
+
+ public static final TypeName OKHTTP_CLIENT_FACTORY_TYPE =
+ TypeName.parseFrom("io.statefun.transports/okhttp");
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/TransportClientsModule.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/TransportClientsModule.java
new file mode 100644
index 0000000..1bad8db
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/TransportClientsModule.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.httpfn;
+
+import com.google.auto.service.AutoService;
+import java.util.Map;
+import org.apache.flink.statefun.sdk.spi.ExtensionModule;
+
+@AutoService(ExtensionModule.class)
+public class TransportClientsModule implements ExtensionModule {
+ @Override
+ public void configure(Map<String, String> globalConfigurations, Binder binder) {
+ binder.bindExtension(
+ TransportClientConstants.OKHTTP_CLIENT_FACTORY_TYPE,
+ new DefaultHttpRequestReplyClientFactory());
+ }
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/EgressJsonEntity.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/EgressJsonEntity.java
index d3040b7..e9b8ef7 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/EgressJsonEntity.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/EgressJsonEntity.java
@@ -22,6 +22,7 @@
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.statefun.flink.common.json.NamespaceNamePair;
import org.apache.flink.statefun.flink.common.json.Selectors;
+import org.apache.flink.statefun.flink.core.spi.ExtensionResolver;
import org.apache.flink.statefun.flink.io.spi.JsonEgressSpec;
import org.apache.flink.statefun.sdk.EgressType;
import org.apache.flink.statefun.sdk.io.EgressIdentifier;
@@ -38,7 +39,11 @@
}
@Override
- public void bind(Binder binder, JsonNode moduleSpecRootNode, FormatVersion formatVersion) {
+ public void bind(
+ Binder binder,
+ ExtensionResolver extensionResolver,
+ JsonNode moduleSpecRootNode,
+ FormatVersion formatVersion) {
final Iterable<? extends JsonNode> egressNodes =
Selectors.listAt(moduleSpecRootNode, EGRESS_SPECS_POINTER);
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FormatVersion.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FormatVersion.java
index 580e6c7..0165560 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FormatVersion.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FormatVersion.java
@@ -30,7 +30,8 @@
// Supported versions
// ============================================================
- v3_0("3.0");
+ v3_0("3.0"),
+ v3_1("3.1");
private String versionStr;
@@ -51,6 +52,8 @@
return v2_0;
case "3.0":
return v3_0;
+ case "3.1":
+ return v3_1;
default:
throw new IllegalArgumentException("Unrecognized format version: " + versionStr);
}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionEndpointJsonEntity.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionEndpointJsonEntity.java
index 74500cd..337aa4e 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionEndpointJsonEntity.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionEndpointJsonEntity.java
@@ -30,9 +30,12 @@
import java.util.stream.StreamSupport;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonPointer;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.statefun.flink.common.json.Selectors;
import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionEndpointSpec;
import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionProvider;
+import org.apache.flink.statefun.flink.core.spi.ExtensionResolver;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.FunctionTypeNamespaceMatcher;
import org.apache.flink.statefun.sdk.StatefulFunctionProvider;
@@ -53,22 +56,25 @@
JsonPointer.compile("/endpoint/spec/functions");
private static final JsonPointer URL_PATH_TEMPLATE =
JsonPointer.compile("/endpoint/spec/urlPathTemplate");
- private static final JsonPointer TIMEOUTS = JsonPointer.compile("/endpoint/spec/timeouts");
private static final JsonPointer MAX_NUM_BATCH_REQUESTS =
JsonPointer.compile("/endpoint/spec/maxNumBatchRequests");
+ private static final JsonPointer TRANSPORT = JsonPointer.compile("/endpoint/spec/transport");
+
+ @Deprecated
+ private static final JsonPointer TIMEOUTS = JsonPointer.compile("/endpoint/spec/timeouts");
}
- private static final class TimeoutPointers {
- private static final JsonPointer CALL = JsonPointer.compile("/call");
- private static final JsonPointer CONNECT = JsonPointer.compile("/connect");
- private static final JsonPointer READ = JsonPointer.compile("/read");
- private static final JsonPointer WRITE = JsonPointer.compile("/write");
+ private static final class TransportPointers {
+ private static final JsonPointer CLIENT_FACTORY_TYPE = JsonPointer.compile("/type");
}
@Override
public void bind(
- StatefulFunctionModule.Binder binder, JsonNode moduleSpecNode, FormatVersion formatVersion) {
- if (formatVersion != FormatVersion.v3_0) {
+ StatefulFunctionModule.Binder binder,
+ ExtensionResolver extensionResolver,
+ JsonNode moduleSpecNode,
+ FormatVersion formatVersion) {
+ if (formatVersion.compareTo(FormatVersion.v3_0) < 0) {
throw new IllegalArgumentException("endpoints is only supported with format version 3.0.");
}
@@ -76,7 +82,7 @@
functionEndpointSpecNodes(moduleSpecNode);
for (Map.Entry<FunctionEndpointSpec.Kind, List<FunctionEndpointSpec>> entry :
- parseFunctionEndpointSpecs(functionEndpointsSpecNodes).entrySet()) {
+ parseFunctionEndpointSpecs(functionEndpointsSpecNodes, formatVersion).entrySet()) {
final Map<FunctionType, FunctionEndpointSpec> specificTypeEndpointSpecs = new HashMap<>();
final Map<FunctionTypeNamespaceMatcher, FunctionEndpointSpec> perNamespaceEndpointSpecs =
new HashMap<>();
@@ -94,7 +100,11 @@
});
StatefulFunctionProvider provider =
- functionProvider(entry.getKey(), specificTypeEndpointSpecs, perNamespaceEndpointSpecs);
+ functionProvider(
+ entry.getKey(),
+ specificTypeEndpointSpecs,
+ perNamespaceEndpointSpecs,
+ extensionResolver);
specificTypeEndpointSpecs
.keySet()
.forEach(specificType -> binder.bindFunctionProvider(specificType, provider));
@@ -110,14 +120,15 @@
}
private static Map<FunctionEndpointSpec.Kind, List<FunctionEndpointSpec>>
- parseFunctionEndpointSpecs(Iterable<? extends JsonNode> functionEndpointsSpecNodes) {
+ parseFunctionEndpointSpecs(
+ Iterable<? extends JsonNode> functionEndpointsSpecNodes, FormatVersion version) {
return StreamSupport.stream(functionEndpointsSpecNodes.spliterator(), false)
- .map(FunctionEndpointJsonEntity::parseFunctionEndpointsSpec)
+ .map(node -> FunctionEndpointJsonEntity.parseFunctionEndpointsSpec(node, version))
.collect(groupingBy(FunctionEndpointSpec::kind, toList()));
}
private static FunctionEndpointSpec parseFunctionEndpointsSpec(
- JsonNode functionEndpointSpecNode) {
+ JsonNode functionEndpointSpecNode, FormatVersion version) {
FunctionEndpointSpec.Kind kind = endpointKind(functionEndpointSpecNode);
switch (kind) {
@@ -126,17 +137,24 @@
HttpFunctionEndpointSpec.builder(
target(functionEndpointSpecNode), urlPathTemplate(functionEndpointSpecNode));
- JsonNode timeoutsNode = functionEndpointSpecNode.at(SpecPointers.TIMEOUTS);
optionalMaxNumBatchRequests(functionEndpointSpecNode)
.ifPresent(specBuilder::withMaxNumBatchRequests);
- optionalTimeoutDuration(timeoutsNode, TimeoutPointers.CALL)
- .ifPresent(specBuilder::withMaxRequestDuration);
- optionalTimeoutDuration(timeoutsNode, TimeoutPointers.CONNECT)
- .ifPresent(specBuilder::withConnectTimeoutDuration);
- optionalTimeoutDuration(timeoutsNode, TimeoutPointers.READ)
- .ifPresent(specBuilder::withReadTimeoutDuration);
- optionalTimeoutDuration(timeoutsNode, TimeoutPointers.WRITE)
- .ifPresent(specBuilder::withWriteTimeoutDuration);
+
+ switch (version) {
+ case v3_1:
+ final Optional<ObjectNode> transportSpec =
+ Selectors.optionalObjectAt(functionEndpointSpecNode, SpecPointers.TRANSPORT);
+ transportSpec.ifPresent(spec -> configureHttpTransport(specBuilder, spec));
+ break;
+ case v3_0:
+ final Optional<ObjectNode> deprecatedTimeoutsSpec =
+ Selectors.optionalObjectAt(functionEndpointSpecNode, SpecPointers.TIMEOUTS);
+ deprecatedTimeoutsSpec.ifPresent(
+ spec -> configureDeprecatedHttpTimeoutsSpec(specBuilder, spec));
+ break;
+ default:
+ throw new IllegalStateException("Unsupported format version: " + version);
+ }
return specBuilder.build();
case GRPC:
@@ -146,6 +164,34 @@
}
}
+ private static void configureHttpTransport(
+ HttpFunctionEndpointSpec.Builder endpointSpecBuilder, ObjectNode transportSpecNode) {
+ final Optional<TypeName> transportClientFactoryType =
+ Selectors.optionalTextAt(transportSpecNode, TransportPointers.CLIENT_FACTORY_TYPE)
+ .map(TypeName::parseFrom);
+ transportClientFactoryType.ifPresent(endpointSpecBuilder::withTransportClientFactoryType);
+
+ // pass the transport spec node as is as the client properties
+ endpointSpecBuilder.withTransportClientProperties(transportSpecNode);
+ }
+
+ private static void configureDeprecatedHttpTimeoutsSpec(
+ HttpFunctionEndpointSpec.Builder endpointSpecBuilder, ObjectNode deprecatedHttpTimeoutsSpec) {
+ final ObjectNode reconstructedTimeoutsSpec =
+ reconstructTimeoutsSpecNode(deprecatedHttpTimeoutsSpec);
+ endpointSpecBuilder.withTransportClientProperties(reconstructedTimeoutsSpec);
+ }
+
+ private static ObjectNode reconstructTimeoutsSpecNode(ObjectNode deprecatedHttpTimeoutsSpec) {
+ try {
+ return (ObjectNode)
+ new ObjectMapper()
+ .readTree("{\"timeouts\":" + deprecatedHttpTimeoutsSpec.toString() + "}");
+ } catch (Exception e) {
+ throw new RuntimeException("Unable to reconstruct deprecated timeouts spec node.");
+ }
+ }
+
private static FunctionEndpointSpec.Kind endpointKind(JsonNode functionEndpointSpecNode) {
String endpointKind = Selectors.textAt(functionEndpointSpecNode, MetaPointers.KIND);
return FunctionEndpointSpec.Kind.valueOf(endpointKind.toUpperCase(Locale.getDefault()));
@@ -192,12 +238,14 @@
private static StatefulFunctionProvider functionProvider(
FunctionEndpointSpec.Kind kind,
Map<FunctionType, FunctionEndpointSpec> specificTypeEndpointSpecs,
- Map<FunctionTypeNamespaceMatcher, FunctionEndpointSpec> perNamespaceEndpointSpecs) {
+ Map<FunctionTypeNamespaceMatcher, FunctionEndpointSpec> perNamespaceEndpointSpecs,
+ ExtensionResolver extensionResolver) {
switch (kind) {
case HTTP:
return new HttpFunctionProvider(
castValues(specificTypeEndpointSpecs),
- castValues(namespaceAsKey(perNamespaceEndpointSpecs)));
+ castValues(namespaceAsKey(perNamespaceEndpointSpecs)),
+ extensionResolver);
case GRPC:
throw new UnsupportedOperationException("GRPC endpoints are not supported yet.");
default:
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/IngressJsonEntity.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/IngressJsonEntity.java
index 83c5d00..ad1fb35 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/IngressJsonEntity.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/IngressJsonEntity.java
@@ -24,6 +24,7 @@
import org.apache.flink.statefun.flink.common.json.NamespaceNamePair;
import org.apache.flink.statefun.flink.common.json.Selectors;
import org.apache.flink.statefun.flink.core.protorouter.AutoRoutableProtobufRouter;
+import org.apache.flink.statefun.flink.core.spi.ExtensionResolver;
import org.apache.flink.statefun.flink.io.kafka.ProtobufKafkaIngressTypes;
import org.apache.flink.statefun.flink.io.kinesis.PolyglotKinesisIOTypes;
import org.apache.flink.statefun.flink.io.spi.JsonIngressSpec;
@@ -41,7 +42,11 @@
}
@Override
- public void bind(Binder binder, JsonNode moduleSpecRootNode, FormatVersion formatVersion) {
+ public void bind(
+ Binder binder,
+ ExtensionResolver extensionResolver,
+ JsonNode moduleSpecRootNode,
+ FormatVersion formatVersion) {
final Iterable<? extends JsonNode> ingressNodes =
Selectors.listAt(moduleSpecRootNode, INGRESS_SPECS_POINTER);
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonEntity.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonEntity.java
index 9d40307..f1afd79 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonEntity.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonEntity.java
@@ -19,6 +19,7 @@
package org.apache.flink.statefun.flink.core.jsonmodule;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.statefun.flink.core.spi.ExtensionResolver;
import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule.Binder;
/**
@@ -35,5 +36,9 @@
* @param moduleSpecNode the root module spec node.
* @param formatVersion the format version of the module spec.
*/
- void bind(Binder binder, JsonNode moduleSpecNode, FormatVersion formatVersion);
+ void bind(
+ Binder binder,
+ ExtensionResolver extensionResolver,
+ JsonNode moduleSpecNode,
+ FormatVersion formatVersion);
}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java
index eb26c44..d38ab6d 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java
@@ -25,6 +25,7 @@
import java.util.Map;
import java.util.Objects;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.statefun.flink.core.spi.ExtensionResolver;
import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
final class JsonModule implements StatefulFunctionModule {
@@ -49,10 +50,17 @@
public void configure(Map<String, String> conf, Binder binder) {
try {
- ENTITIES.forEach(jsonEntity -> jsonEntity.bind(binder, moduleSpecNode, formatVersion));
+ ENTITIES.forEach(
+ jsonEntity ->
+ jsonEntity.bind(binder, getExtensionResolver(binder), moduleSpecNode, formatVersion));
} catch (Throwable t) {
throw new ModuleConfigurationException(
format("Error while parsing module at %s", moduleUrl), t);
}
}
+
+ // TODO expose ExtensionResolver properly once we have more usages
+ private static ExtensionResolver getExtensionResolver(Binder binder) {
+ return (ExtensionResolver) binder;
+ }
}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/RouterJsonEntity.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/RouterJsonEntity.java
index 30c3ff2..66d99a2 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/RouterJsonEntity.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/RouterJsonEntity.java
@@ -30,6 +30,7 @@
import org.apache.flink.statefun.flink.common.json.Selectors;
import org.apache.flink.statefun.flink.common.protobuf.ProtobufDescriptorMap;
import org.apache.flink.statefun.flink.core.protorouter.ProtobufRouter;
+import org.apache.flink.statefun.flink.core.spi.ExtensionResolver;
import org.apache.flink.statefun.sdk.io.IngressIdentifier;
import org.apache.flink.statefun.sdk.io.Router;
import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule.Binder;
@@ -50,7 +51,11 @@
}
@Override
- public void bind(Binder binder, JsonNode moduleSpecRootNode, FormatVersion formatVersion) {
+ public void bind(
+ Binder binder,
+ ExtensionResolver extensionResolver,
+ JsonNode moduleSpecRootNode,
+ FormatVersion formatVersion) {
final Iterable<? extends JsonNode> routerNodes =
Selectors.listAt(moduleSpecRootNode, ROUTER_SPECS_POINTER);
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/ClassLoaderSafeRequestReplyClient.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/ClassLoaderSafeRequestReplyClient.java
new file mode 100644
index 0000000..cee0338
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/ClassLoaderSafeRequestReplyClient.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.reqreply;
+
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.statefun.flink.core.metrics.RemoteInvocationMetrics;
+import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction;
+import org.apache.flink.statefun.sdk.reqreply.generated.ToFunction;
+
+/**
+ * Decorator for a {@link RequestReplyClient} that makes sure we always use the correct classloader.
+ * This is required since client implementation may be user provided.
+ */
+public final class ClassLoaderSafeRequestReplyClient implements RequestReplyClient {
+
+ private final ClassLoader delegateClassLoader;
+ private final RequestReplyClient delegate;
+
+ public ClassLoaderSafeRequestReplyClient(RequestReplyClient delegate) {
+ this.delegate = Objects.requireNonNull(delegate);
+ this.delegateClassLoader = delegate.getClass().getClassLoader();
+ }
+
+ @Override
+ public CompletableFuture<FromFunction> call(
+ ToFunctionRequestSummary requestSummary,
+ RemoteInvocationMetrics metrics,
+ ToFunction toFunction) {
+ final ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
+
+ try {
+ Thread.currentThread().setContextClassLoader(delegateClassLoader);
+ return delegate.call(requestSummary, metrics, toFunction);
+ } finally {
+ Thread.currentThread().setContextClassLoader(originalClassLoader);
+ }
+ }
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyClient.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyClient.java
index 19fddee..7a423ff 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyClient.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyClient.java
@@ -19,10 +19,12 @@
package org.apache.flink.statefun.flink.core.reqreply;
import java.util.concurrent.CompletableFuture;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.statefun.flink.core.metrics.RemoteInvocationMetrics;
import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction;
import org.apache.flink.statefun.sdk.reqreply.generated.ToFunction;
+@PublicEvolving
public interface RequestReplyClient {
CompletableFuture<FromFunction> call(
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyClientFactory.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyClientFactory.java
new file mode 100644
index 0000000..865e246
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyClientFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.reqreply;
+
+import java.net.URI;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+
+@PublicEvolving
+public interface RequestReplyClientFactory {
+ RequestReplyClient createTransportClient(ObjectNode transportProperties, URI endpointUrl);
+
+ void cleanup();
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/spi/ExtensionResolver.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/spi/ExtensionResolver.java
new file mode 100644
index 0000000..c7a44f9
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/spi/ExtensionResolver.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.spi;
+
+import org.apache.flink.statefun.sdk.TypeName;
+import org.apache.flink.statefun.sdk.spi.ExtensionModule;
+
+/**
+ * Resolves a bound extension (bound by {@link ExtensionModule}s) given specified {@link TypeName}s.
+ */
+public interface ExtensionResolver {
+ <T> T resolveExtension(TypeName typeName, Class<T> extensionClass);
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/spi/Modules.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/spi/Modules.java
index 6ca5d24..26bd524 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/spi/Modules.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/spi/Modules.java
@@ -24,14 +24,19 @@
import org.apache.flink.statefun.flink.core.jsonmodule.JsonServiceLoader;
import org.apache.flink.statefun.flink.core.message.MessageFactoryKey;
import org.apache.flink.statefun.flink.io.spi.FlinkIoModule;
+import org.apache.flink.statefun.sdk.spi.ExtensionModule;
import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
public final class Modules {
+ private final List<ExtensionModule> extensionModules;
private final List<FlinkIoModule> ioModules;
private final List<StatefulFunctionModule> statefulFunctionModules;
private Modules(
- List<FlinkIoModule> ioModules, List<StatefulFunctionModule> statefulFunctionModules) {
+ List<ExtensionModule> extensionModules,
+ List<FlinkIoModule> ioModules,
+ List<StatefulFunctionModule> statefulFunctionModules) {
+ this.extensionModules = extensionModules;
this.ioModules = ioModules;
this.statefulFunctionModules = statefulFunctionModules;
}
@@ -39,7 +44,11 @@
public static Modules loadFromClassPath() {
List<StatefulFunctionModule> statefulFunctionModules = new ArrayList<>();
List<FlinkIoModule> ioModules = new ArrayList<>();
+ List<ExtensionModule> extensionModules = new ArrayList<>();
+ for (ExtensionModule extensionModule : ServiceLoader.load(ExtensionModule.class)) {
+ extensionModules.add(extensionModule);
+ }
for (StatefulFunctionModule provider : ServiceLoader.load(StatefulFunctionModule.class)) {
statefulFunctionModules.add(provider);
}
@@ -49,7 +58,7 @@
for (FlinkIoModule provider : ServiceLoader.load(FlinkIoModule.class)) {
ioModules.add(provider);
}
- return new Modules(ioModules, statefulFunctionModules);
+ return new Modules(extensionModules, ioModules, statefulFunctionModules);
}
public StatefulFunctionsUniverse createStatefulFunctionsUniverse(
@@ -60,6 +69,13 @@
final Map<String, String> globalConfiguration = configuration.getGlobalConfigurations();
+ // it is important to bind and configure the extension modules first, since
+ // other modules (IO and functions) may use extensions already.
+ for (ExtensionModule module : extensionModules) {
+ try (SetContextClassLoader ignored = new SetContextClassLoader(module)) {
+ module.configure(globalConfiguration, universe);
+ }
+ }
for (FlinkIoModule module : ioModules) {
try (SetContextClassLoader ignored = new SetContextClassLoader(module)) {
module.configure(globalConfiguration, universe);
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/StatefulFunctionsUniverseTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/StatefulFunctionsUniverseTest.java
new file mode 100644
index 0000000..859e1b5
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/StatefulFunctionsUniverseTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core;
+
+import static org.hamcrest.core.IsSame.sameInstance;
+import static org.junit.Assert.assertThat;
+
+import org.apache.flink.statefun.flink.core.message.MessageFactoryKey;
+import org.apache.flink.statefun.flink.core.message.MessageFactoryType;
+import org.apache.flink.statefun.sdk.TypeName;
+import org.junit.Test;
+
+public class StatefulFunctionsUniverseTest {
+
+ @Test
+ public void testExtensions() {
+ final StatefulFunctionsUniverse universe = emptyUniverse();
+
+ final ExtensionImpl extension = new ExtensionImpl();
+ universe.bindExtension(TypeName.parseFrom("test.namespace/test.name"), extension);
+
+ assertThat(
+ extension,
+ sameInstance(
+ universe.resolveExtension(
+ TypeName.parseFrom("test.namespace/test.name"), BaseExtension.class)));
+ }
+
+ private static StatefulFunctionsUniverse emptyUniverse() {
+ return new StatefulFunctionsUniverse(
+ MessageFactoryKey.forType(MessageFactoryType.WITH_PROTOBUF_PAYLOADS, null));
+ }
+
+ private interface BaseExtension {}
+
+ private static class ExtensionImpl implements BaseExtension {}
+}
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleTest.java
index cc928b0..cba075f 100644
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleTest.java
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModuleTest.java
@@ -25,22 +25,43 @@
import static org.junit.Assert.assertThat;
import com.google.protobuf.Message;
+import java.net.URI;
import java.net.URL;
-import java.util.Collections;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverse;
import org.apache.flink.statefun.flink.core.message.MessageFactoryKey;
import org.apache.flink.statefun.flink.core.message.MessageFactoryType;
+import org.apache.flink.statefun.flink.core.reqreply.RequestReplyClient;
+import org.apache.flink.statefun.flink.core.reqreply.RequestReplyClientFactory;
import org.apache.flink.statefun.sdk.FunctionType;
+import org.apache.flink.statefun.sdk.TypeName;
import org.apache.flink.statefun.sdk.io.EgressIdentifier;
import org.apache.flink.statefun.sdk.io.IngressIdentifier;
import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
+import org.apache.flink.statefun.sdk.spi.ExtensionModule;
import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+@RunWith(Parameterized.class)
public class JsonModuleTest {
- private static final String modulePath = "module-v3_0/module.yaml";
+ private final String modulePath;
+
+ @Parameterized.Parameters(name = "{0}")
+ public static Collection<String> modulePaths() {
+ return Arrays.asList("module-v3_0/module.yaml", "module-v3_1/module.yaml");
+ }
+
+ public JsonModuleTest(String modulePath) {
+ this.modulePath = modulePath;
+ }
@Test
public void exampleUsage() {
@@ -52,9 +73,11 @@
@Test
public void testFunctions() {
StatefulFunctionModule module = fromPath(modulePath);
+ ExtensionModule extensionModule =
+ transportClientExtensions(TypeName.parseFrom("my.custom/http.transport.type"));
StatefulFunctionsUniverse universe = emptyUniverse();
- module.configure(Collections.emptyMap(), universe);
+ setupUniverse(universe, module, extensionModule);
assertThat(
universe.functions(),
@@ -68,9 +91,11 @@
@Test
public void testRouters() {
StatefulFunctionModule module = fromPath(modulePath);
+ ExtensionModule extensionModule =
+ transportClientExtensions(TypeName.parseFrom("my.custom/http.transport.type"));
StatefulFunctionsUniverse universe = emptyUniverse();
- module.configure(Collections.emptyMap(), universe);
+ setupUniverse(universe, module, extensionModule);
assertThat(
universe.routers(),
@@ -80,9 +105,11 @@
@Test
public void testIngresses() {
StatefulFunctionModule module = fromPath(modulePath);
+ ExtensionModule extensionModule =
+ transportClientExtensions(TypeName.parseFrom("my.custom/http.transport.type"));
StatefulFunctionsUniverse universe = emptyUniverse();
- module.configure(Collections.emptyMap(), universe);
+ setupUniverse(universe, module, extensionModule);
assertThat(
universe.ingress(),
@@ -92,9 +119,11 @@
@Test
public void testEgresses() {
StatefulFunctionModule module = fromPath(modulePath);
+ ExtensionModule extensionModule =
+ transportClientExtensions(TypeName.parseFrom("my.custom/http.transport.type"));
StatefulFunctionsUniverse universe = emptyUniverse();
- module.configure(Collections.emptyMap(), universe);
+ setupUniverse(universe, module, extensionModule);
assertThat(
universe.egress(),
@@ -108,8 +137,48 @@
return JsonServiceLoader.fromUrl(mapper, moduleUrl);
}
+ private static ExtensionModule transportClientExtensions(TypeName type) {
+ return new TransportClientBindingModule(type);
+ }
+
private static StatefulFunctionsUniverse emptyUniverse() {
return new StatefulFunctionsUniverse(
MessageFactoryKey.forType(MessageFactoryType.WITH_PROTOBUF_PAYLOADS, null));
}
+
+ private static void setupUniverse(
+ StatefulFunctionsUniverse universe,
+ StatefulFunctionModule functionModule,
+ ExtensionModule extensionModule) {
+ final Map<String, String> globalConfig = new HashMap<>();
+ extensionModule.configure(globalConfig, universe);
+ functionModule.configure(globalConfig, universe);
+ }
+
+ private static class TransportClientBindingModule implements ExtensionModule {
+
+ private final TypeName transportClientType;
+
+ TransportClientBindingModule(TypeName transportClientType) {
+ this.transportClientType = transportClientType;
+ }
+
+ @Override
+ public void configure(Map<String, String> globalConfigurations, Binder binder) {
+ binder.bindExtension(transportClientType, new TestRequestReplyClientFactory());
+ }
+ }
+
+ private static class TestRequestReplyClientFactory implements RequestReplyClientFactory {
+ @Override
+ public RequestReplyClient createTransportClient(
+ ObjectNode transportProperties, URI endpointUrl) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void cleanup() {
+ throw new UnsupportedOperationException();
+ }
+ }
}
diff --git a/statefun-flink/statefun-flink-core/src/test/resources/module-v3_1/module.yaml b/statefun-flink/statefun-flink-core/src/test/resources/module-v3_1/module.yaml
new file mode 100644
index 0000000..8b595d4
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/test/resources/module-v3_1/module.yaml
@@ -0,0 +1,93 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: "3.1"
+
+module:
+ meta:
+ type: remote
+ spec:
+ endpoints:
+ - endpoint:
+ meta:
+ kind: http
+ spec:
+ functions: com.foo.bar/*
+ urlPathTemplate: http://bar.foo.com:8080/functions/{function.name}
+ maxNumBatchRequests: 10000
+ transport:
+ timeouts:
+ call: 1minutes
+ connect: 10seconds
+ read: 10second
+ write: 10seconds
+ - endpoint:
+ meta:
+ kind: http
+ spec:
+ functions: com.foo.bar.2/*
+ urlPathTemplate: http://2.bar.foo.com:8080/functions/{function.name}
+ transport:
+ type: my.custom/http.transport.type
+ property1: value1
+ property2:
+ - k1: v1
+ - k2: v2
+ - endpoint:
+ meta:
+ kind: http
+ spec:
+ functions: com.foo.bar/specific_function
+ urlPathTemplate: http://bar.foo.com:8080/functions/abc
+ - endpoint:
+ meta:
+ kind: http
+ spec:
+ functions: com.other.namespace/hello
+ urlPathTemplate: http://namespace.other.com:8080/hello
+ routers:
+ - router:
+ meta:
+ type: org.apache.flink.statefun.sdk/protobuf-router
+ spec:
+ ingress: com.mycomp.igal/names
+ target: "com.example/hello/{{$.name}}"
+ messageType: org.apache.flink.test.SimpleMessage
+ descriptorSet: classpath:test.desc
+ ingresses:
+ - ingress:
+ meta:
+ type: statefun.kafka.io/protobuf-ingress
+ id: com.mycomp.igal/names
+ spec:
+ address: kafka-broker:9092
+ topics:
+ - names
+ properties:
+ - consumer.group: greeter
+ messageType: org.apache.flink.test.SimpleMessage
+ descriptorSet: classpath:test.desc
+ egresses:
+ - egress:
+ meta:
+ type: io.statefun.kafka/egress
+ id: com.mycomp.foo/bar
+ spec:
+ address: kafka-broker:9092
+ deliverySemantic:
+ type: exactly-once
+ transactionTimeoutMillis: 100000
+ properties:
+ - foo.config: bar
\ No newline at end of file
diff --git a/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java
index d24636b..8cc2ec0 100644
--- a/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java
+++ b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java
@@ -21,7 +21,11 @@
import java.net.URI;
import java.time.Duration;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.statefun.flink.core.httpfn.DefaultHttpRequestReplyClientSpec;
import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionEndpointSpec;
+import org.apache.flink.statefun.flink.core.httpfn.TransportClientConstants;
import org.apache.flink.statefun.flink.core.jsonmodule.FunctionEndpointSpec.Target;
import org.apache.flink.statefun.flink.core.jsonmodule.FunctionEndpointSpec.UrlPathTemplate;
import org.apache.flink.statefun.sdk.FunctionType;
@@ -29,6 +33,9 @@
/** A Builder for RequestReply remote function type. */
public class RequestReplyFunctionBuilder {
+ private final DefaultHttpRequestReplyClientSpec.Timeouts transportClientTimeoutsSpec =
+ new DefaultHttpRequestReplyClientSpec.Timeouts();
+
/**
* Create a new builder for a remote function with a given type and an endpoint.
*
@@ -57,7 +64,7 @@
* @return this builder.
*/
public RequestReplyFunctionBuilder withMaxRequestDuration(Duration duration) {
- builder.withMaxRequestDuration(duration);
+ transportClientTimeoutsSpec.setCallTimeout(duration);
return this;
}
@@ -68,7 +75,7 @@
* @return this builder.
*/
public RequestReplyFunctionBuilder withConnectTimeout(Duration duration) {
- builder.withConnectTimeoutDuration(duration);
+ transportClientTimeoutsSpec.setConnectTimeout(duration);
return this;
}
@@ -79,7 +86,7 @@
* @return this builder.
*/
public RequestReplyFunctionBuilder withReadTimeout(Duration duration) {
- builder.withReadTimeoutDuration(duration);
+ transportClientTimeoutsSpec.setReadTimeout(duration);
return this;
}
@@ -90,7 +97,7 @@
* @return this builder.
*/
public RequestReplyFunctionBuilder withWriteTimeout(Duration duration) {
- builder.withWriteTimeoutDuration(duration);
+ transportClientTimeoutsSpec.setWriteTimeout(duration);
return this;
}
@@ -107,6 +114,18 @@
@Internal
HttpFunctionEndpointSpec spec() {
+ builder.withTransportClientFactoryType(TransportClientConstants.OKHTTP_CLIENT_FACTORY_TYPE);
+ builder.withTransportClientProperties(
+ transportClientPropertiesAsObjectNode(transportClientTimeoutsSpec));
return builder.build();
}
+
+ private static ObjectNode transportClientPropertiesAsObjectNode(
+ DefaultHttpRequestReplyClientSpec.Timeouts transportClientTimeoutsSpec) {
+ final DefaultHttpRequestReplyClientSpec transportClientSpecPojo =
+ new DefaultHttpRequestReplyClientSpec();
+ transportClientSpecPojo.setTimeouts(transportClientTimeoutsSpec);
+
+ return new ObjectMapper().valueToTree(transportClientSpecPojo);
+ }
}
diff --git a/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/SerializableHttpFunctionProvider.java b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/SerializableHttpFunctionProvider.java
index eed278c..38477d5 100644
--- a/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/SerializableHttpFunctionProvider.java
+++ b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/SerializableHttpFunctionProvider.java
@@ -24,10 +24,15 @@
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.statefun.flink.core.httpfn.DefaultHttpRequestReplyClientFactory;
import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionEndpointSpec;
import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionProvider;
+import org.apache.flink.statefun.flink.core.httpfn.TransportClientConstants;
+import org.apache.flink.statefun.flink.core.reqreply.RequestReplyClientFactory;
+import org.apache.flink.statefun.flink.core.spi.ExtensionResolver;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.StatefulFunction;
+import org.apache.flink.statefun.sdk.TypeName;
@NotThreadSafe
@Internal
@@ -45,8 +50,26 @@
@Override
public StatefulFunction functionOfType(FunctionType type) {
if (delegate == null) {
- delegate = new HttpFunctionProvider(supportedTypes, Collections.emptyMap());
+ delegate =
+ new HttpFunctionProvider(
+ supportedTypes, Collections.emptyMap(), new OkHttpTransportClientExtensionResolver());
}
return delegate.functionOfType(type);
}
+
+ private static class OkHttpTransportClientExtensionResolver implements ExtensionResolver {
+
+ private final DefaultHttpRequestReplyClientFactory defaultTransportClientFactory =
+ new DefaultHttpRequestReplyClientFactory();
+
+ @Override
+ public <T> T resolveExtension(TypeName typeName, Class<T> extensionClass) {
+ // the DataStream bridge SDK only supports using the default OkHttp request-reply client
+ if (!typeName.equals(TransportClientConstants.OKHTTP_CLIENT_FACTORY_TYPE)
+ || extensionClass != RequestReplyClientFactory.class) {
+ throw new IllegalStateException("The DataStream SDK does not support extensions.");
+ }
+ return (T) defaultTransportClientFactory;
+ }
+ }
}
diff --git a/statefun-sdk-embedded/src/main/java/org/apache/flink/statefun/sdk/spi/ExtensionModule.java b/statefun-sdk-embedded/src/main/java/org/apache/flink/statefun/sdk/spi/ExtensionModule.java
new file mode 100644
index 0000000..5f84ffd
--- /dev/null
+++ b/statefun-sdk-embedded/src/main/java/org/apache/flink/statefun/sdk/spi/ExtensionModule.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.sdk.spi;
+
+import java.util.Map;
+import org.apache.flink.statefun.sdk.TypeName;
+
+/**
+ * A module that binds multiple extension objects to the Stateful Functions application. Each
+ * extension is uniquely identified by a {@link TypeName}.
+ */
+public interface ExtensionModule {
+
+ /**
+ * This method binds multiple extension objects to the Stateful Functions application.
+ *
+ * @param globalConfigurations global configuration of the Stateful Functions application.
+ * @param binder binder for binding extensions.
+ */
+ void configure(Map<String, String> globalConfigurations, Binder binder);
+
+ interface Binder {
+ <T> void bindExtension(TypeName typeName, T extension);
+ }
+}