[FLINK-20699] Set FeedbackKey invocation_id explicitly
This closes #190.
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
index 5f56786..ac2065e 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
@@ -21,15 +21,20 @@
import java.net.URLClassLoader;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
+import org.apache.flink.statefun.flink.core.feedback.FeedbackKey;
+import org.apache.flink.statefun.flink.core.message.Message;
import org.apache.flink.statefun.flink.core.translation.FlinkUniverse;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.FlinkUserCodeClassLoader;
public class StatefulFunctionsJob {
+ private static final AtomicInteger FEEDBACK_INVOCATION_ID_SEQ = new AtomicInteger();
+
public static void main(String... args) throws Exception {
ParameterTool parameterTool = ParameterTool.fromArgs(args);
Map<String, String> globalConfigurations = parameterTool.toMap();
@@ -70,7 +75,10 @@
new StatefulFunctionsUniverseValidator();
statefulFunctionsUniverseValidator.validate(statefulFunctionsUniverse);
- FlinkUniverse flinkUniverse = new FlinkUniverse(statefulFunctionsUniverse, stateFunConfig);
+ FeedbackKey<Message> feedbackKey =
+ new FeedbackKey<>("statefun-pipeline", FEEDBACK_INVOCATION_ID_SEQ.incrementAndGet());
+ FlinkUniverse flinkUniverse =
+ new FlinkUniverse(feedbackKey, stateFunConfig, statefulFunctionsUniverse);
flinkUniverse.configure(env);
env.execute(stateFunConfig.getFlinkJobName());
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackKey.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackKey.java
index caff5ee..ef6ad58 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackKey.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackKey.java
@@ -34,7 +34,7 @@
}
public SubtaskFeedbackKey<V> withSubTaskIndex(int subTaskIndex, int attemptId) {
- return new SubtaskFeedbackKey<>(pipelineName, invocationId, attemptId, subTaskIndex);
+ return new SubtaskFeedbackKey<>(pipelineName, invocationId, subTaskIndex, attemptId);
}
@Override
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/translation/FlinkUniverse.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/translation/FlinkUniverse.java
index 5c66d62..785b99e 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/translation/FlinkUniverse.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/translation/FlinkUniverse.java
@@ -28,14 +28,17 @@
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public final class FlinkUniverse {
- private static final FeedbackKey<Message> FEEDBACK_KEY =
- new FeedbackKey<>("statefun-pipeline", 1);
private final StatefulFunctionsUniverse universe;
private final StatefulFunctionsConfig configuration;
+ private final FeedbackKey<Message> feedbackKey;
- public FlinkUniverse(StatefulFunctionsUniverse universe, StatefulFunctionsConfig configuration) {
+ public FlinkUniverse(
+ FeedbackKey<Message> feedbackKey,
+ StatefulFunctionsConfig configuration,
+ StatefulFunctionsUniverse universe) {
+ this.feedbackKey = Objects.requireNonNull(feedbackKey);
this.universe = Objects.requireNonNull(universe);
this.configuration = Objects.requireNonNull(configuration);
}
@@ -45,7 +48,7 @@
Sinks sinks = Sinks.create(universe);
StatefulFunctionTranslator translator =
- new StatefulFunctionTranslator(FEEDBACK_KEY, configuration);
+ new StatefulFunctionTranslator(feedbackKey, configuration);
Map<EgressIdentifier<?>, DataStream<?>> sideOutputs = translator.translate(sources, sinks);
diff --git a/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/StatefulFunctionDataStreamBuilder.java b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/StatefulFunctionDataStreamBuilder.java
index 58dbb92..60d3ab9 100644
--- a/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/StatefulFunctionDataStreamBuilder.java
+++ b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/StatefulFunctionDataStreamBuilder.java
@@ -25,6 +25,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import org.apache.flink.shaded.guava18.com.google.common.base.Optional;
import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig;
@@ -46,17 +47,18 @@
*/
public final class StatefulFunctionDataStreamBuilder {
+ private static final AtomicInteger FEEDBACK_INVOCATION_ID_SEQ = new AtomicInteger();
+
/** Creates a {@code StatefulFunctionDataStreamBuilder}. */
public static StatefulFunctionDataStreamBuilder builder(String pipelineName) {
- FeedbackKey<Message> key = new FeedbackKey<>(pipelineName, 1);
- return new StatefulFunctionDataStreamBuilder(key);
+ return new StatefulFunctionDataStreamBuilder(pipelineName);
}
- private StatefulFunctionDataStreamBuilder(FeedbackKey<Message> feedbackKey) {
- this.feedbackKey = Objects.requireNonNull(feedbackKey);
+ private StatefulFunctionDataStreamBuilder(String pipelineName) {
+ this.pipelineName = Objects.requireNonNull(pipelineName);
}
- private final FeedbackKey<Message> feedbackKey;
+ private final String pipelineName;
private final List<DataStream<RoutableMessage>> definedIngresses = new ArrayList<>();
private final Map<FunctionType, SerializableStatefulFunctionProvider> functionProviders =
new HashMap<>();
@@ -148,7 +150,9 @@
requestReplyFunctions.forEach(
(type, unused) -> functionProviders.put(type, httpFunctionProvider));
- EmbeddedTranslator embeddedTranslator = new EmbeddedTranslator(config, feedbackKey);
+ FeedbackKey<Message> key =
+ new FeedbackKey<>(pipelineName, FEEDBACK_INVOCATION_ID_SEQ.incrementAndGet());
+ EmbeddedTranslator embeddedTranslator = new EmbeddedTranslator(config, key);
Map<EgressIdentifier<?>, DataStream<?>> sideOutputs =
embeddedTranslator.translate(definedIngresses, egressesIds, functionProviders);
return new StatefulFunctionEgressStreams(sideOutputs);