[FLINK-19001] Adds a statefun-flink-datastream module.

This module holds the builder classes, but it is here mainly for
packaging reasons. We need to provide a jar-with-dependencies,
and we can not use statefun-distribution since it also contains
the connectors that users don't need.
diff --git a/statefun-flink/pom.xml b/statefun-flink/pom.xml
index b400461..343018c 100644
--- a/statefun-flink/pom.xml
+++ b/statefun-flink/pom.xml
@@ -39,10 +39,10 @@
         <module>statefun-flink-distribution</module>
         <module>statefun-flink-harness</module>
         <module>statefun-flink-state-processor</module>
+        <module>statefun-flink-datastream</module>
     </modules>
 
     <properties>
-        <scala.binary.version>2.11</scala.binary.version>
         <jsr305.version>3.0.2</jsr305.version>
         <jmh.version>1.21</jmh.version>
         <jsr305-version>1.3.9</jsr305-version>
diff --git a/statefun-flink/statefun-flink-datastream/pom.xml b/statefun-flink/statefun-flink-datastream/pom.xml
new file mode 100644
index 0000000..b47b06e
--- /dev/null
+++ b/statefun-flink/statefun-flink-datastream/pom.xml
@@ -0,0 +1,156 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xmlns="http://maven.apache.org/POM/4.0.0"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>statefun-flink</artifactId>
+        <version>2.2-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+
+    <artifactId>statefun-flink-datastream</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>statefun-sdk</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java</artifactId>
+        </dependency>
+
+        <!-- statefun-flink -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>statefun-flink-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>statefun-flink-io</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+  
+        <!-- The following dependencies are here with scope provided, because: 
+             a) they are transitively required by the statefun-flink-* depencies
+             b) they are provided at runtime, by the embedding application. 
+             
+             Also note that org.slf4j:slf4j-api is excluded from all the artifacts, since maven 
+             convergence plugging fails. 
+           -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-java</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+       </dependency>
+        
+    </dependencies>
+
+    <build>
+        <plugins>
+            <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>3.0.0</version>
+                <executions>
+                    <!-- Run shade goal on package phase -->
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <createDependencyReducedPom>false</createDependencyReducedPom>
+                            <artifactSet>
+                                <excludes>
+                                    <exclude>org.apache.flink:force-shading</exclude>
+                                    <exclude>com.google.code.findbugs:jsr305</exclude>
+                                    <exclude>org.slf4j:*</exclude>
+                                    <exclude>log4j:*</exclude>
+                                </excludes>
+                            </artifactSet>
+                            <filters>
+                                <filter>
+                                    <!-- Do not copy the signatures in the META-INF folder.
+                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                                <filter>
+                                    <artifact>org.apache.kafka:*</artifact>
+                                    <excludes>
+                                        <exclude>kafka/kafka-version.properties</exclude>
+                                        <exclude>LICENSE</exclude>
+                                        <!-- Does not contain anything relevant.
+                                            Cites a binary dependency on jersey, but this is neither reflected in the
+                                            dependency graph, nor are any jersey files bundled. -->
+                                        <exclude>NOTICE</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                            <transformers>
+                                <!-- required to aggregate all the META-INF/services files -->
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                                <!-- remove all duplicate licenses -->
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer">
+                                </transformer>
+                                <!-- explicitly include our LICENSE file, located at project root dir -->
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+                                    <resource>META-INF/LICENSE</resource>
+                                    <file>${basedir}/../../LICENSE</file>
+                                </transformer>
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer">
+                                    <projectName>Apache Flink Stateful Functions (flink-statefun)</projectName>
+                                    <encoding>UTF-8</encoding>
+                                </transformer>
+                            </transformers>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/SerializableStatefulFunctionProvider.java b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/SerializableStatefulFunctionProvider.java
new file mode 100644
index 0000000..7fbf76f
--- /dev/null
+++ b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/SerializableStatefulFunctionProvider.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.datastream;
+
+import java.io.Serializable;
+import org.apache.flink.statefun.sdk.StatefulFunctionProvider;
+
+/** {@inheritDoc} */
+public interface SerializableStatefulFunctionProvider
+    extends StatefulFunctionProvider, Serializable {}
diff --git a/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/StatefulFunctionDataStreamBuilder.java b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/StatefulFunctionDataStreamBuilder.java
new file mode 100644
index 0000000..48c9291
--- /dev/null
+++ b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/StatefulFunctionDataStreamBuilder.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.datastream;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.flink.shaded.guava18.com.google.common.base.Optional;
+import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig;
+import org.apache.flink.statefun.flink.core.feedback.FeedbackKey;
+import org.apache.flink.statefun.flink.core.message.Message;
+import org.apache.flink.statefun.flink.core.message.RoutableMessage;
+import org.apache.flink.statefun.flink.core.translation.EmbeddedTranslator;
+import org.apache.flink.statefun.sdk.FunctionType;
+import org.apache.flink.statefun.sdk.io.EgressIdentifier;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * Builder for a Stateful Function Application.
+ *
+ * <p>This builder allows defining all the aspects of a stateful function application. define input
+ * streams as ingresses, define function providers and egress ids.
+ */
+public final class StatefulFunctionDataStreamBuilder {
+
+  public static StatefulFunctionDataStreamBuilder builder(String pipelineName) {
+    FeedbackKey<Message> key = new FeedbackKey<>(pipelineName, 1);
+    return new StatefulFunctionDataStreamBuilder(key);
+  }
+
+  private StatefulFunctionDataStreamBuilder(FeedbackKey<Message> feedbackKey) {
+    this.feedbackKey = Objects.requireNonNull(feedbackKey);
+  }
+
+  private final FeedbackKey<Message> feedbackKey;
+  private final List<DataStream<RoutableMessage>> definedIngresses = new ArrayList<>();
+  private final Map<FunctionType, SerializableStatefulFunctionProvider> functionProviders =
+      new HashMap<>();
+  private final Set<EgressIdentifier<?>> egressesIds = new LinkedHashSet<>();
+
+  @Nullable private StatefulFunctionsConfig config;
+
+  /**
+   * Adds an ingress of incoming messages.
+   *
+   * @param ingress an incoming stream of messages.
+   * @return this builder.
+   */
+  public StatefulFunctionDataStreamBuilder withDataStreamAsIngress(
+      DataStream<RoutableMessage> ingress) {
+    Objects.requireNonNull(ingress);
+    definedIngresses.add(ingress);
+    return this;
+  }
+
+  /**
+   * Adds a function provider to this builder
+   *
+   * @param functionType the type of the function that this provider providers.
+   * @param provider the stateful function provider.
+   * @return this builder.
+   */
+  public StatefulFunctionDataStreamBuilder withFunctionProvider(
+      FunctionType functionType, SerializableStatefulFunctionProvider provider) {
+    Objects.requireNonNull(functionType);
+    Objects.requireNonNull(provider);
+    putAndThrowIfPresent(functionProviders, functionType, provider);
+    return this;
+  }
+
+  /**
+   * Registers an {@link EgressIdentifier}.
+   *
+   * <p>See {@link StatefulFunctionEgressStreams#getDataStreamForEgressId(EgressIdentifier)}.
+   *
+   * @param egressId an ingress id
+   * @return this builder.
+   */
+  public StatefulFunctionDataStreamBuilder withEgressId(EgressIdentifier<?> egressId) {
+    Objects.requireNonNull(egressId);
+    putAndThrowIfPresent(egressesIds, egressId);
+    return this;
+  }
+
+  /**
+   * Set a stateful function configuration.
+   *
+   * @param configuration the stateful function configuration to set.
+   * @return this builder.
+   */
+  public StatefulFunctionDataStreamBuilder withConfiguration(
+      StatefulFunctionsConfig configuration) {
+    Objects.requireNonNull(configuration);
+    this.config = configuration;
+    return this;
+  }
+
+  /**
+   * Adds Stateful Functions operators into the topology.
+   *
+   * @param env the stream execution environment.
+   */
+  public StatefulFunctionEgressStreams build(StreamExecutionEnvironment env) {
+    final StatefulFunctionsConfig config =
+        Optional.fromNullable(this.config).or(() -> StatefulFunctionsConfig.fromEnvironment(env));
+    EmbeddedTranslator embeddedTranslator = new EmbeddedTranslator(config, feedbackKey);
+    Map<EgressIdentifier<?>, DataStream<?>> sideOutputs =
+        embeddedTranslator.translate(definedIngresses, egressesIds, functionProviders);
+    return new StatefulFunctionEgressStreams(sideOutputs);
+  }
+
+  private static <K, V> void putAndThrowIfPresent(Map<K, V> map, K key, V value) {
+    @Nullable V previous = map.put(key, value);
+    if (previous == null) {
+      return;
+    }
+    throw new IllegalStateException(
+        String.format("A binding for the key %s was previously defined.", key));
+  }
+
+  private static <K> void putAndThrowIfPresent(Set<K> set, K key) {
+    if (set.add(key)) {
+      return;
+    }
+    throw new IllegalStateException(
+        String.format("A binding for the key %s was previously defined.", key));
+  }
+}
diff --git a/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/StatefulFunctionEgressStreams.java b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/StatefulFunctionEgressStreams.java
new file mode 100644
index 0000000..e3182a3
--- /dev/null
+++ b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/StatefulFunctionEgressStreams.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.datastream;
+
+import java.util.Map;
+import java.util.Objects;
+import org.apache.flink.statefun.sdk.io.EgressIdentifier;
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+public final class StatefulFunctionEgressStreams {
+  private final Map<EgressIdentifier<?>, DataStream<?>> egresses;
+
+  StatefulFunctionEgressStreams(Map<EgressIdentifier<?>, DataStream<?>> egresses) {
+    this.egresses = Objects.requireNonNull(egresses);
+  }
+
+  @SuppressWarnings("unchecked")
+  public <T> DataStream<T> getDataStreamForEgressId(EgressIdentifier<T> id) {
+    DataStream<?> dataStream = egresses.get(id);
+    if (dataStream == null) {
+      throw new IllegalArgumentException("Unknown data stream for ingress " + id);
+    }
+    return (DataStream<T>) dataStream;
+  }
+}