[FLINK-19001] Use the Stateful functions core translator
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/translation/FlinkUniverse.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/translation/FlinkUniverse.java
index df61045..5c66d62 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/translation/FlinkUniverse.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/translation/FlinkUniverse.java
@@ -19,26 +19,13 @@
 
 import java.util.Map;
 import java.util.Objects;
-import java.util.OptionalLong;
-import java.util.function.LongFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig;
-import org.apache.flink.statefun.flink.core.StatefulFunctionsJobConstants;
 import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverse;
-import org.apache.flink.statefun.flink.core.common.KeyBy;
-import org.apache.flink.statefun.flink.core.common.SerializableFunction;
 import org.apache.flink.statefun.flink.core.feedback.FeedbackKey;
-import org.apache.flink.statefun.flink.core.feedback.FeedbackSinkOperator;
-import org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperatorFactory;
-import org.apache.flink.statefun.flink.core.functions.FunctionGroupDispatchFactory;
 import org.apache.flink.statefun.flink.core.message.Message;
-import org.apache.flink.statefun.flink.core.message.MessageKeySelector;
 import org.apache.flink.statefun.sdk.io.EgressIdentifier;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamUtils;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.util.OutputTag;
 
 public final class FlinkUniverse {
   private static final FeedbackKey<Message> FEEDBACK_KEY =
@@ -57,90 +44,11 @@
     Sources sources = Sources.create(env, universe, configuration);
     Sinks sinks = Sinks.create(universe);
 
-    SingleOutputStreamOperator<Message> feedbackUnionOperator =
-        feedbackUnionOperator(sources.unionStream());
+    StatefulFunctionTranslator translator =
+        new StatefulFunctionTranslator(FEEDBACK_KEY, configuration);
 
-    SingleOutputStreamOperator<Message> functionOutputStream =
-        functionOperator(feedbackUnionOperator, sinks.sideOutputTags());
+    Map<EgressIdentifier<?>, DataStream<?>> sideOutputs = translator.translate(sources, sinks);
 
-    SingleOutputStreamOperator<Void> writeBackOut = feedbackOperator(functionOutputStream);
-
-    coLocate(feedbackUnionOperator, functionOutputStream, writeBackOut);
-
-    sinks.consumeFrom(functionOutputStream);
-  }
-
-  private SingleOutputStreamOperator<Message> feedbackUnionOperator(DataStream<Message> input) {
-    TypeInformation<Message> typeInfo = input.getType();
-
-    FeedbackUnionOperatorFactory<Message> factory =
-        new FeedbackUnionOperatorFactory<>(
-            configuration, FEEDBACK_KEY, new IsCheckpointBarrier(), new FeedbackKeySelector());
-
-    return input
-        .keyBy(new MessageKeySelector())
-        .transform(StatefulFunctionsJobConstants.FEEDBACK_UNION_OPERATOR_NAME, typeInfo, factory)
-        .uid(StatefulFunctionsJobConstants.FEEDBACK_UNION_OPERATOR_UID);
-  }
-
-  private SingleOutputStreamOperator<Message> functionOperator(
-      DataStream<Message> input, Map<EgressIdentifier<?>, OutputTag<Object>> sideOutputs) {
-
-    TypeInformation<Message> typeInfo = input.getType();
-
-    FunctionGroupDispatchFactory operatorFactory =
-        new FunctionGroupDispatchFactory(configuration, sideOutputs);
-
-    return DataStreamUtils.reinterpretAsKeyedStream(input, new MessageKeySelector())
-        .transform(StatefulFunctionsJobConstants.FUNCTION_OPERATOR_NAME, typeInfo, operatorFactory)
-        .uid(StatefulFunctionsJobConstants.FUNCTION_OPERATOR_UID);
-  }
-
-  private SingleOutputStreamOperator<Void> feedbackOperator(
-      SingleOutputStreamOperator<Message> functionOut) {
-
-    LongFunction<Message> toMessage = new CheckpointToMessage(universe.messageFactoryType());
-
-    FeedbackSinkOperator<Message> sinkOperator =
-        new FeedbackSinkOperator<>(FEEDBACK_KEY, toMessage);
-
-    return functionOut
-        .keyBy(new MessageKeySelector())
-        .transform(
-            StatefulFunctionsJobConstants.WRITE_BACK_OPERATOR_NAME,
-            TypeInformation.of(Void.class),
-            sinkOperator)
-        .uid(StatefulFunctionsJobConstants.WRITE_BACK_OPERATOR_UID);
-  }
-
-  private static void coLocate(DataStream<?> a, DataStream<?> b, DataStream<?> c) {
-    String stringKey = FEEDBACK_KEY.asColocationKey();
-    a.getTransformation().setCoLocationGroupKey(stringKey);
-    b.getTransformation().setCoLocationGroupKey(stringKey);
-    c.getTransformation().setCoLocationGroupKey(stringKey);
-
-    a.getTransformation().setParallelism(b.getParallelism());
-    c.getTransformation().setParallelism(b.getParallelism());
-  }
-
-  private static final class IsCheckpointBarrier
-      implements SerializableFunction<Message, OptionalLong> {
-
-    private static final long serialVersionUID = 1;
-
-    @Override
-    public OptionalLong apply(Message message) {
-      return message.isBarrierMessage();
-    }
-  }
-
-  private static final class FeedbackKeySelector implements SerializableFunction<Message, String> {
-
-    private static final long serialVersionUID = 1;
-
-    @Override
-    public String apply(Message message) {
-      return KeyBy.apply(message.target());
-    }
+    sinks.consumeFrom(sideOutputs);
   }
 }