[FLINK-19256] [datastream] Improve DataStream API example
diff --git a/statefun-examples/statefun-flink-datastream-example/src/main/java/org/apache/flink/statefun/examples/datastream/Example.java b/statefun-examples/statefun-flink-datastream-example/src/main/java/org/apache/flink/statefun/examples/datastream/Example.java
index f850642..76ffde5 100644
--- a/statefun-examples/statefun-flink-datastream-example/src/main/java/org/apache/flink/statefun/examples/datastream/Example.java
+++ b/statefun-examples/statefun-flink-datastream-example/src/main/java/org/apache/flink/statefun/examples/datastream/Example.java
@@ -18,8 +18,6 @@
package org.apache.flink.statefun.examples.datastream;
-import static org.apache.flink.configuration.CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL;
-import static org.apache.flink.statefun.flink.core.StatefulFunctionsConfig.USER_MESSAGE_SERIALIZER;
import static org.apache.flink.statefun.flink.datastream.RequestReplyFunctionBuilder.requestReplyFunctionBuilder;
import java.net.URI;
@@ -27,7 +25,6 @@
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig;
import org.apache.flink.statefun.flink.core.message.MessageFactoryType;
import org.apache.flink.statefun.flink.core.message.RoutableMessage;
@@ -55,22 +52,12 @@
public static void main(String... args) throws Exception {
// -----------------------------------------------------------------------------------------
- // set stateful function related configuration in flink-conf.yaml
- // -----------------------------------------------------------------------------------------
-
- Configuration configuration = new Configuration();
- configuration.set(USER_MESSAGE_SERIALIZER, MessageFactoryType.WITH_KRYO_PAYLOADS);
- configuration.set(
- ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL,
- "org.apache.flink.statefun;org.apache.kafka;com.google.protobuf");
-
- StatefulFunctionsConfig statefunConfig = new StatefulFunctionsConfig(configuration);
-
- // -----------------------------------------------------------------------------------------
// obtain the stream execution env and create some data streams
// -----------------------------------------------------------------------------------------
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ StatefulFunctionsConfig statefunConfig = StatefulFunctionsConfig.fromEnvironment(env);
+ statefunConfig.setFactoryType(MessageFactoryType.WITH_KRYO_PAYLOADS);
DataStream<RoutableMessage> names =
env.addSource(new NameSource())