[FLINK-19256] [datastream] Improve DataStream API example
diff --git a/statefun-examples/statefun-flink-datastream-example/src/main/java/org/apache/flink/statefun/examples/datastream/Example.java b/statefun-examples/statefun-flink-datastream-example/src/main/java/org/apache/flink/statefun/examples/datastream/Example.java
index f850642..76ffde5 100644
--- a/statefun-examples/statefun-flink-datastream-example/src/main/java/org/apache/flink/statefun/examples/datastream/Example.java
+++ b/statefun-examples/statefun-flink-datastream-example/src/main/java/org/apache/flink/statefun/examples/datastream/Example.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.statefun.examples.datastream;
 
-import static org.apache.flink.configuration.CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL;
-import static org.apache.flink.statefun.flink.core.StatefulFunctionsConfig.USER_MESSAGE_SERIALIZER;
 import static org.apache.flink.statefun.flink.datastream.RequestReplyFunctionBuilder.requestReplyFunctionBuilder;
 
 import java.net.URI;
@@ -27,7 +25,6 @@
 import java.util.concurrent.ThreadLocalRandom;
 import javax.annotation.Nullable;
 import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig;
 import org.apache.flink.statefun.flink.core.message.MessageFactoryType;
 import org.apache.flink.statefun.flink.core.message.RoutableMessage;
@@ -55,22 +52,12 @@
   public static void main(String... args) throws Exception {
 
     // -----------------------------------------------------------------------------------------
-    // set stateful function related configuration in flink-conf.yaml
-    // -----------------------------------------------------------------------------------------
-
-    Configuration configuration = new Configuration();
-    configuration.set(USER_MESSAGE_SERIALIZER, MessageFactoryType.WITH_KRYO_PAYLOADS);
-    configuration.set(
-        ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL,
-        "org.apache.flink.statefun;org.apache.kafka;com.google.protobuf");
-
-    StatefulFunctionsConfig statefunConfig = new StatefulFunctionsConfig(configuration);
-
-    // -----------------------------------------------------------------------------------------
     // obtain the stream execution env and create some data streams
     // -----------------------------------------------------------------------------------------
 
     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+    StatefulFunctionsConfig statefunConfig = StatefulFunctionsConfig.fromEnvironment(env);
+    statefunConfig.setFactoryType(MessageFactoryType.WITH_KRYO_PAYLOADS);
 
     DataStream<RoutableMessage> names =
         env.addSource(new NameSource())