[FLINK-19001] Use the Stateful functions core translator
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/translation/FlinkUniverse.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/translation/FlinkUniverse.java
index df61045..5c66d62 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/translation/FlinkUniverse.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/translation/FlinkUniverse.java
@@ -19,26 +19,13 @@
import java.util.Map;
import java.util.Objects;
-import java.util.OptionalLong;
-import java.util.function.LongFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig;
-import org.apache.flink.statefun.flink.core.StatefulFunctionsJobConstants;
import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverse;
-import org.apache.flink.statefun.flink.core.common.KeyBy;
-import org.apache.flink.statefun.flink.core.common.SerializableFunction;
import org.apache.flink.statefun.flink.core.feedback.FeedbackKey;
-import org.apache.flink.statefun.flink.core.feedback.FeedbackSinkOperator;
-import org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperatorFactory;
-import org.apache.flink.statefun.flink.core.functions.FunctionGroupDispatchFactory;
import org.apache.flink.statefun.flink.core.message.Message;
-import org.apache.flink.statefun.flink.core.message.MessageKeySelector;
import org.apache.flink.statefun.sdk.io.EgressIdentifier;
import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamUtils;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.util.OutputTag;
public final class FlinkUniverse {
private static final FeedbackKey<Message> FEEDBACK_KEY =
@@ -57,90 +44,11 @@
Sources sources = Sources.create(env, universe, configuration);
Sinks sinks = Sinks.create(universe);
- SingleOutputStreamOperator<Message> feedbackUnionOperator =
- feedbackUnionOperator(sources.unionStream());
+ StatefulFunctionTranslator translator =
+ new StatefulFunctionTranslator(FEEDBACK_KEY, configuration);
- SingleOutputStreamOperator<Message> functionOutputStream =
- functionOperator(feedbackUnionOperator, sinks.sideOutputTags());
+ Map<EgressIdentifier<?>, DataStream<?>> sideOutputs = translator.translate(sources, sinks);
- SingleOutputStreamOperator<Void> writeBackOut = feedbackOperator(functionOutputStream);
-
- coLocate(feedbackUnionOperator, functionOutputStream, writeBackOut);
-
- sinks.consumeFrom(functionOutputStream);
- }
-
- private SingleOutputStreamOperator<Message> feedbackUnionOperator(DataStream<Message> input) {
- TypeInformation<Message> typeInfo = input.getType();
-
- FeedbackUnionOperatorFactory<Message> factory =
- new FeedbackUnionOperatorFactory<>(
- configuration, FEEDBACK_KEY, new IsCheckpointBarrier(), new FeedbackKeySelector());
-
- return input
- .keyBy(new MessageKeySelector())
- .transform(StatefulFunctionsJobConstants.FEEDBACK_UNION_OPERATOR_NAME, typeInfo, factory)
- .uid(StatefulFunctionsJobConstants.FEEDBACK_UNION_OPERATOR_UID);
- }
-
- private SingleOutputStreamOperator<Message> functionOperator(
- DataStream<Message> input, Map<EgressIdentifier<?>, OutputTag<Object>> sideOutputs) {
-
- TypeInformation<Message> typeInfo = input.getType();
-
- FunctionGroupDispatchFactory operatorFactory =
- new FunctionGroupDispatchFactory(configuration, sideOutputs);
-
- return DataStreamUtils.reinterpretAsKeyedStream(input, new MessageKeySelector())
- .transform(StatefulFunctionsJobConstants.FUNCTION_OPERATOR_NAME, typeInfo, operatorFactory)
- .uid(StatefulFunctionsJobConstants.FUNCTION_OPERATOR_UID);
- }
-
- private SingleOutputStreamOperator<Void> feedbackOperator(
- SingleOutputStreamOperator<Message> functionOut) {
-
- LongFunction<Message> toMessage = new CheckpointToMessage(universe.messageFactoryType());
-
- FeedbackSinkOperator<Message> sinkOperator =
- new FeedbackSinkOperator<>(FEEDBACK_KEY, toMessage);
-
- return functionOut
- .keyBy(new MessageKeySelector())
- .transform(
- StatefulFunctionsJobConstants.WRITE_BACK_OPERATOR_NAME,
- TypeInformation.of(Void.class),
- sinkOperator)
- .uid(StatefulFunctionsJobConstants.WRITE_BACK_OPERATOR_UID);
- }
-
- private static void coLocate(DataStream<?> a, DataStream<?> b, DataStream<?> c) {
- String stringKey = FEEDBACK_KEY.asColocationKey();
- a.getTransformation().setCoLocationGroupKey(stringKey);
- b.getTransformation().setCoLocationGroupKey(stringKey);
- c.getTransformation().setCoLocationGroupKey(stringKey);
-
- a.getTransformation().setParallelism(b.getParallelism());
- c.getTransformation().setParallelism(b.getParallelism());
- }
-
- private static final class IsCheckpointBarrier
- implements SerializableFunction<Message, OptionalLong> {
-
- private static final long serialVersionUID = 1;
-
- @Override
- public OptionalLong apply(Message message) {
- return message.isBarrierMessage();
- }
- }
-
- private static final class FeedbackKeySelector implements SerializableFunction<Message, String> {
-
- private static final long serialVersionUID = 1;
-
- @Override
- public String apply(Message message) {
- return KeyBy.apply(message.target());
- }
+ sinks.consumeFrom(sideOutputs);
}
}