[FLINK-19001] Add an example for data stream embedding
diff --git a/statefun-examples/pom.xml b/statefun-examples/pom.xml
index 2030dea..1f1547b 100644
--- a/statefun-examples/pom.xml
+++ b/statefun-examples/pom.xml
@@ -37,6 +37,7 @@
         <module>statefun-shopping-cart-example</module>
         <module>statefun-async-example</module>
         <module>statefun-state-processor-example</module>
+        <module>statefun-flink-datastream-example</module>
     </modules>
 
 </project>
diff --git a/statefun-examples/statefun-flink-datastream-example/pom.xml b/statefun-examples/statefun-flink-datastream-example/pom.xml
new file mode 100644
index 0000000..edd09ca
--- /dev/null
+++ b/statefun-examples/statefun-flink-datastream-example/pom.xml
@@ -0,0 +1,66 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>statefun-examples</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>2.2-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>statefun-flink-datastream-example</artifactId>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>statefun-flink-datastream</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <exclusions>
+                <!-- The following exclusion is needed since this artifacts pulls two different versions
+                     of slf4j, and thus failing the maven convergence plugging.
+                 -->
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-api</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>1.7.25</version>
+        </dependency>
+
+
+        <!-- test -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.12</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/statefun-examples/statefun-flink-datastream-example/src/main/java/org/apache/flink/statefun/examples/datastream/Example.java b/statefun-examples/statefun-flink-datastream-example/src/main/java/org/apache/flink/statefun/examples/datastream/Example.java
new file mode 100644
index 0000000..1c0969f
--- /dev/null
+++ b/statefun-examples/statefun-flink-datastream-example/src/main/java/org/apache/flink/statefun/examples/datastream/Example.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.examples.datastream;
+
+import static org.apache.flink.configuration.CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL;
+import static org.apache.flink.statefun.flink.core.StatefulFunctionsConfig.USER_MESSAGE_SERIALIZER;
+
+import java.util.concurrent.ThreadLocalRandom;
+import javax.annotation.Nullable;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig;
+import org.apache.flink.statefun.flink.core.message.MessageFactoryType;
+import org.apache.flink.statefun.flink.core.message.RoutableMessage;
+import org.apache.flink.statefun.flink.core.message.RoutableMessageBuilder;
+import org.apache.flink.statefun.flink.datastream.StatefulFunctionDataStreamBuilder;
+import org.apache.flink.statefun.flink.datastream.StatefulFunctionEgressStreams;
+import org.apache.flink.statefun.sdk.Context;
+import org.apache.flink.statefun.sdk.FunctionType;
+import org.apache.flink.statefun.sdk.StatefulFunction;
+import org.apache.flink.statefun.sdk.annotations.Persisted;
+import org.apache.flink.statefun.sdk.io.EgressIdentifier;
+import org.apache.flink.statefun.sdk.state.PersistedValue;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+public class Example {
+
+  private static final FunctionType GREET = new FunctionType("exmaple", "greet");
+  private static final EgressIdentifier<String> GREETINGS =
+      new EgressIdentifier<>("example", "out", String.class);
+
+  public static void main(String... args) throws Exception {
+
+    // -----------------------------------------------------------------------------------------
+    // set stateful function related configuration in flink-conf.yaml
+    // -----------------------------------------------------------------------------------------
+
+    Configuration configuration = new Configuration();
+    configuration.set(USER_MESSAGE_SERIALIZER, MessageFactoryType.WITH_KRYO_PAYLOADS);
+    configuration.set(
+        ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL,
+        "org.apache.flink.statefun;org.apache.kafka;com.google.protobuf");
+
+    StatefulFunctionsConfig statefunConfig = new StatefulFunctionsConfig(configuration);
+
+    // -----------------------------------------------------------------------------------------
+    // obtain the stream execution env and create some data streams
+    // -----------------------------------------------------------------------------------------
+
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+    DataStream<RoutableMessage> names =
+        env.addSource(new NameSource())
+            .map(
+                name ->
+                    RoutableMessageBuilder.builder()
+                        .withTargetAddress(GREET, name)
+                        .withMessageBody(name)
+                        .build());
+
+    // -----------------------------------------------------------------------------------------
+    // wire up stateful functions
+    // -----------------------------------------------------------------------------------------
+
+    StatefulFunctionEgressStreams out =
+        StatefulFunctionDataStreamBuilder.builder("example")
+            .withDataStreamAsIngress(names)
+            .withFunctionProvider(GREET, unused -> new MyFunction())
+            .withEgressId(GREETINGS)
+            .withConfiguration(statefunConfig)
+            .build(env);
+
+    // -----------------------------------------------------------------------------------------
+    // obtain the outputs
+    // -----------------------------------------------------------------------------------------
+
+    DataStream<String> output = out.getDataStreamForEgressId(GREETINGS);
+
+    // -----------------------------------------------------------------------------------------
+    // the rest of the pipeline
+    // -----------------------------------------------------------------------------------------
+
+    output
+        .map(
+            new RichMapFunction<String, String>() {
+              @Override
+              public String map(String value) {
+                return "'" + value + "'";
+              }
+            })
+        .addSink(new PrintSinkFunction<>());
+
+    env.execute();
+  }
+
+  private static final class MyFunction implements StatefulFunction {
+
+    @Persisted
+    private final PersistedValue<Integer> seenCount = PersistedValue.of("seen", Integer.class);
+
+    @Override
+    public void invoke(Context context, Object input) {
+      int seen = seenCount.updateAndGet(MyFunction::increment);
+      context.send(GREETINGS, String.format("Hello %s at the %d-th time", input, seen));
+    }
+
+    private static int increment(@Nullable Integer n) {
+      return n == null ? 1 : n + 1;
+    }
+  }
+
+  private static final class NameSource implements SourceFunction<String> {
+
+    private static final long serialVersionUID = 1;
+
+    private volatile boolean canceled;
+
+    @Override
+    public void run(SourceContext<String> ctx) throws InterruptedException {
+      String[] names = {"Stephan", "Igal", "Gordon", "Seth", "Marta"};
+      ThreadLocalRandom random = ThreadLocalRandom.current();
+      while (true) {
+        int index = random.nextInt(names.length);
+        final String name = names[index];
+        synchronized (ctx.getCheckpointLock()) {
+          if (canceled) {
+            return;
+          }
+          ctx.collect(name);
+        }
+        Thread.sleep(1000);
+      }
+    }
+
+    @Override
+    public void cancel() {
+      canceled = true;
+    }
+  }
+}