[FLINK-19001] Adds a statefun-flink-datastream module.
This module holds the builder classes, but it is here mainly for
packaging reasons. We need to provide a jar-with-dependencies,
and we can not use statefun-distribution since it also contains
the connectors that users don't need.
diff --git a/statefun-flink/pom.xml b/statefun-flink/pom.xml
index b400461..343018c 100644
--- a/statefun-flink/pom.xml
+++ b/statefun-flink/pom.xml
@@ -39,10 +39,10 @@
<module>statefun-flink-distribution</module>
<module>statefun-flink-harness</module>
<module>statefun-flink-state-processor</module>
+ <module>statefun-flink-datastream</module>
</modules>
<properties>
- <scala.binary.version>2.11</scala.binary.version>
<jsr305.version>3.0.2</jsr305.version>
<jmh.version>1.21</jmh.version>
<jsr305-version>1.3.9</jsr305-version>
diff --git a/statefun-flink/statefun-flink-datastream/pom.xml b/statefun-flink/statefun-flink-datastream/pom.xml
new file mode 100644
index 0000000..b47b06e
--- /dev/null
+++ b/statefun-flink/statefun-flink-datastream/pom.xml
@@ -0,0 +1,156 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns="http://maven.apache.org/POM/4.0.0"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>statefun-flink</artifactId>
+ <version>2.2-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>statefun-flink-datastream</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>statefun-sdk</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </dependency>
+
+ <!-- statefun-flink -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>statefun-flink-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>statefun-flink-io</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <!-- The following dependencies are here with scope provided, because:
+ a) they are transitively required by the statefun-flink-* dependencies
+ b) they are provided at runtime, by the embedding application.
+
+ Also note that org.slf4j:slf4j-api is excluded from all the artifacts, since maven
+ convergence plugin fails.
+ -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-java</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>3.0.0</version>
+ <executions>
+ <!-- Run shade goal on package phase -->
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <createDependencyReducedPom>false</createDependencyReducedPom>
+ <artifactSet>
+ <excludes>
+ <exclude>org.apache.flink:force-shading</exclude>
+ <exclude>com.google.code.findbugs:jsr305</exclude>
+ <exclude>org.slf4j:*</exclude>
+ <exclude>log4j:*</exclude>
+ </excludes>
+ </artifactSet>
+ <filters>
+ <filter>
+ <!-- Do not copy the signatures in the META-INF folder.
+ Otherwise, this might cause SecurityExceptions when using the JAR. -->
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ <filter>
+ <artifact>org.apache.kafka:*</artifact>
+ <excludes>
+ <exclude>kafka/kafka-version.properties</exclude>
+ <exclude>LICENSE</exclude>
+ <!-- Does not contain anything relevant.
+ Cites a binary dependency on jersey, but this is neither reflected in the
+ dependency graph, nor are any jersey files bundled. -->
+ <exclude>NOTICE</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ <transformers>
+ <!-- required to aggregate all the META-INF/services files -->
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+ <!-- remove all duplicate licenses -->
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer">
+ </transformer>
+ <!-- explicitly include our LICENSE file, located at project root dir -->
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+ <resource>META-INF/LICENSE</resource>
+ <file>${basedir}/../../LICENSE</file>
+ </transformer>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer">
+ <projectName>Apache Flink Stateful Functions (flink-statefun)</projectName>
+ <encoding>UTF-8</encoding>
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
\ No newline at end of file
diff --git a/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/SerializableStatefulFunctionProvider.java b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/SerializableStatefulFunctionProvider.java
new file mode 100644
index 0000000..7fbf76f
--- /dev/null
+++ b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/SerializableStatefulFunctionProvider.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.datastream;
+
+import java.io.Serializable;
+import org.apache.flink.statefun.sdk.StatefulFunctionProvider;
+
+/** A {@link StatefulFunctionProvider} that is also {@link Serializable}. */
+public interface SerializableStatefulFunctionProvider
+ extends StatefulFunctionProvider, Serializable {}
diff --git a/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/StatefulFunctionDataStreamBuilder.java b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/StatefulFunctionDataStreamBuilder.java
new file mode 100644
index 0000000..48c9291
--- /dev/null
+++ b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/StatefulFunctionDataStreamBuilder.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.datastream;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.flink.shaded.guava18.com.google.common.base.Optional;
+import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig;
+import org.apache.flink.statefun.flink.core.feedback.FeedbackKey;
+import org.apache.flink.statefun.flink.core.message.Message;
+import org.apache.flink.statefun.flink.core.message.RoutableMessage;
+import org.apache.flink.statefun.flink.core.translation.EmbeddedTranslator;
+import org.apache.flink.statefun.sdk.FunctionType;
+import org.apache.flink.statefun.sdk.io.EgressIdentifier;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * Builder for a Stateful Function Application.
+ *
+ * <p>This builder allows defining all the aspects of a stateful function application: the input
+ * streams that serve as ingresses, the function providers, and the egress ids.
+ */
+public final class StatefulFunctionDataStreamBuilder {
+
+  /** Creates a builder whose feedback key is derived from the given pipeline name. */
+  public static StatefulFunctionDataStreamBuilder builder(String pipelineName) {
+    FeedbackKey<Message> key = new FeedbackKey<>(pipelineName, 1);
+    return new StatefulFunctionDataStreamBuilder(key);
+  }
+
+  private StatefulFunctionDataStreamBuilder(FeedbackKey<Message> feedbackKey) {
+    this.feedbackKey = Objects.requireNonNull(feedbackKey);
+  }
+
+  private final FeedbackKey<Message> feedbackKey;
+  private final List<DataStream<RoutableMessage>> definedIngresses = new ArrayList<>();
+  private final Map<FunctionType, SerializableStatefulFunctionProvider> functionProviders =
+      new HashMap<>();
+  private final Set<EgressIdentifier<?>> egressesIds = new LinkedHashSet<>();
+
+  @Nullable private StatefulFunctionsConfig config;
+
+  /**
+   * Adds an ingress of incoming messages.
+   *
+   * @param ingress an incoming stream of messages.
+   * @return this builder.
+   */
+  public StatefulFunctionDataStreamBuilder withDataStreamAsIngress(
+      DataStream<RoutableMessage> ingress) {
+    definedIngresses.add(Objects.requireNonNull(ingress));
+    return this;
+  }
+
+  /**
+   * Adds a function provider to this builder.
+   *
+   * @param functionType the type of the function that this provider provides.
+   * @param provider the stateful function provider.
+   * @return this builder.
+   * @throws IllegalStateException if a provider was already registered for {@code functionType}.
+   */
+  public StatefulFunctionDataStreamBuilder withFunctionProvider(
+      FunctionType functionType, SerializableStatefulFunctionProvider provider) {
+    Objects.requireNonNull(functionType);
+    Objects.requireNonNull(provider);
+    putAndThrowIfPresent(functionProviders, functionType, provider);
+    return this;
+  }
+
+  /**
+   * Registers an {@link EgressIdentifier}.
+   *
+   * <p>See {@link StatefulFunctionEgressStreams#getDataStreamForEgressId(EgressIdentifier)}.
+   *
+   * @param egressId an egress id.
+   * @return this builder.
+   * @throws IllegalStateException if {@code egressId} was already registered.
+   */
+  public StatefulFunctionDataStreamBuilder withEgressId(EgressIdentifier<?> egressId) {
+    putAndThrowIfPresent(egressesIds, Objects.requireNonNull(egressId));
+    return this;
+  }
+
+  /**
+   * Set a stateful function configuration.
+   *
+   * @param configuration the stateful function configuration to set.
+   * @return this builder.
+   */
+  public StatefulFunctionDataStreamBuilder withConfiguration(
+      StatefulFunctionsConfig configuration) {
+    this.config = Objects.requireNonNull(configuration);
+    return this;
+  }
+
+  /**
+   * Adds Stateful Functions operators into the topology.
+   *
+   * @param env the stream execution environment.
+   * @return the egress streams resulting from the translation, accessible by egress id.
+   */
+  public StatefulFunctionEgressStreams build(StreamExecutionEnvironment env) {
+    // Without an explicit configuration, fall back to one derived from the environment.
+    final StatefulFunctionsConfig config =
+        Optional.fromNullable(this.config).or(() -> StatefulFunctionsConfig.fromEnvironment(env));
+    EmbeddedTranslator embeddedTranslator = new EmbeddedTranslator(config, feedbackKey);
+    Map<EgressIdentifier<?>, DataStream<?>> sideOutputs =
+        embeddedTranslator.translate(definedIngresses, egressesIds, functionProviders);
+    return new StatefulFunctionEgressStreams(sideOutputs);
+  }
+
+  private static <K, V> void putAndThrowIfPresent(Map<K, V> map, K key, V value) {
+    // putIfAbsent keeps the existing binding intact when the duplicate is rejected.
+    if (map.putIfAbsent(key, value) != null) {
+      throw new IllegalStateException(
+          String.format("A binding for the key %s was previously defined.", key));
+    }
+  }
+
+  private static <K> void putAndThrowIfPresent(Set<K> set, K key) {
+    if (!set.add(key)) {
+      throw new IllegalStateException(
+          String.format("A binding for the key %s was previously defined.", key));
+    }
+  }
+}
diff --git a/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/StatefulFunctionEgressStreams.java b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/StatefulFunctionEgressStreams.java
new file mode 100644
index 0000000..e3182a3
--- /dev/null
+++ b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/StatefulFunctionEgressStreams.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.datastream;
+
+import java.util.Map;
+import java.util.Objects;
+import org.apache.flink.statefun.sdk.io.EgressIdentifier;
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+/**
+ * Gives access to the egress {@link DataStream}s of a Stateful Functions application, looked up
+ * by {@link EgressIdentifier}.
+ */
+public final class StatefulFunctionEgressStreams {
+  private final Map<EgressIdentifier<?>, DataStream<?>> egresses;
+
+  StatefulFunctionEgressStreams(Map<EgressIdentifier<?>, DataStream<?>> egresses) {
+    this.egresses = Objects.requireNonNull(egresses);
+  }
+
+  /**
+   * Returns the {@link DataStream} associated with the given egress id.
+   *
+   * @param id the identifier of the egress.
+   * @param <T> the type of the records in the egress stream.
+   * @return the data stream for {@code id}.
+   * @throws IllegalArgumentException if no data stream is known for {@code id}.
+   */
+  @SuppressWarnings("unchecked")
+  public <T> DataStream<T> getDataStreamForEgressId(EgressIdentifier<T> id) {
+    DataStream<?> dataStream = egresses.get(id);
+    if (dataStream == null) {
+      throw new IllegalArgumentException("Unknown data stream for egress " + id);
+    }
+    // Unchecked cast: the map stores DataStream<?>, so the element type cannot be verified here.
+    return (DataStream<T>) dataStream;
+  }
+}