[FLINK-19001] Add an embedded translator
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/translation/EmbeddedTranslator.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/translation/EmbeddedTranslator.java
new file mode 100644
index 0000000..921b33b
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/translation/EmbeddedTranslator.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.translation;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig;
+import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverse;
+import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverseProvider;
+import org.apache.flink.statefun.flink.core.common.Maps;
+import org.apache.flink.statefun.flink.core.datastream.SerializableStatefulFunctionProvider;
+import org.apache.flink.statefun.flink.core.feedback.FeedbackKey;
+import org.apache.flink.statefun.flink.core.message.Message;
+import org.apache.flink.statefun.flink.core.message.RoutableMessage;
+import org.apache.flink.statefun.flink.core.types.StaticallyRegisteredTypes;
+import org.apache.flink.statefun.sdk.FunctionType;
+import org.apache.flink.statefun.sdk.io.EgressIdentifier;
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+public class EmbeddedTranslator {
+ private final StatefulFunctionsConfig configuration;
+ private final FeedbackKey<Message> feedbackKey;
+
+ public EmbeddedTranslator(StatefulFunctionsConfig config, FeedbackKey<Message> feedbackKey) {
+ this.configuration = config;
+ this.feedbackKey = feedbackKey;
+ }
+
+ public Map<EgressIdentifier<?>, DataStream<?>> translate(
+ List<DataStream<RoutableMessage>> ingresses,
+ Iterable<EgressIdentifier<?>> egressesIds,
+ Map<FunctionType, SerializableStatefulFunctionProvider> functions) {
+
+ configuration.setProvider(new EmbeddedUniverseProvider(functions));
+
+ StaticallyRegisteredTypes types = new StaticallyRegisteredTypes(configuration.getFactoryType());
+ Sources sources = Sources.create(types, ingresses);
+ Sinks sinks = Sinks.create(types, egressesIds);
+
+ StatefulFunctionTranslator translator =
+ new StatefulFunctionTranslator(feedbackKey, configuration);
+
+ return translator.translate(sources, sinks);
+ }
+
+ private static class EmbeddedUniverseProvider implements StatefulFunctionsUniverseProvider {
+
+ private static final class SerializableFunctionType implements Serializable {
+
+ private static final long serialVersionUID = 1;
+
+ String namespace;
+ String name;
+
+ static SerializableFunctionType fromSdk(FunctionType functionType) {
+ SerializableFunctionType t = new SerializableFunctionType();
+ t.namespace = functionType.namespace();
+ t.name = functionType.name();
+ return t;
+ }
+
+ static FunctionType toSdk(SerializableFunctionType type) {
+ return new FunctionType(type.namespace, type.name);
+ }
+ }
+
+ private Map<SerializableFunctionType, SerializableStatefulFunctionProvider> functions;
+
+ public EmbeddedUniverseProvider(
+ Map<FunctionType, SerializableStatefulFunctionProvider> functions) {
+ this.functions = Maps.transformKeys(functions, SerializableFunctionType::fromSdk);
+ }
+
+ @Override
+ public StatefulFunctionsUniverse get(
+ ClassLoader classLoader, StatefulFunctionsConfig configuration) {
+
+ StatefulFunctionsUniverse u = new StatefulFunctionsUniverse(configuration.getFactoryType());
+
+ functions.forEach(
+ (type, fn) -> u.bindFunctionProvider(SerializableFunctionType.toSdk(type), fn));
+
+ return u;
+ }
+ }
+}