[FLINK-20699] Set FeedbackKey invocation_id explicitly

This closes #190.
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
index 5f56786..ac2065e 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
@@ -21,15 +21,20 @@
 import java.net.URLClassLoader;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
+import org.apache.flink.statefun.flink.core.feedback.FeedbackKey;
+import org.apache.flink.statefun.flink.core.message.Message;
 import org.apache.flink.statefun.flink.core.translation.FlinkUniverse;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.util.FlinkUserCodeClassLoader;
 
 public class StatefulFunctionsJob {
 
+  private static final AtomicInteger FEEDBACK_INVOCATION_ID_SEQ = new AtomicInteger();
+
   public static void main(String... args) throws Exception {
     ParameterTool parameterTool = ParameterTool.fromArgs(args);
     Map<String, String> globalConfigurations = parameterTool.toMap();
@@ -70,7 +75,10 @@
         new StatefulFunctionsUniverseValidator();
     statefulFunctionsUniverseValidator.validate(statefulFunctionsUniverse);
 
-    FlinkUniverse flinkUniverse = new FlinkUniverse(statefulFunctionsUniverse, stateFunConfig);
+    FeedbackKey<Message> feedbackKey =
+        new FeedbackKey<>("statefun-pipeline", FEEDBACK_INVOCATION_ID_SEQ.incrementAndGet());
+    FlinkUniverse flinkUniverse =
+        new FlinkUniverse(feedbackKey, stateFunConfig, statefulFunctionsUniverse);
     flinkUniverse.configure(env);
 
     env.execute(stateFunConfig.getFlinkJobName());
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackKey.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackKey.java
index caff5ee..ef6ad58 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackKey.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackKey.java
@@ -34,7 +34,7 @@
   }
 
   public SubtaskFeedbackKey<V> withSubTaskIndex(int subTaskIndex, int attemptId) {
-    return new SubtaskFeedbackKey<>(pipelineName, invocationId, attemptId, subTaskIndex);
+    return new SubtaskFeedbackKey<>(pipelineName, invocationId, subTaskIndex, attemptId);
   }
 
   @Override
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/translation/FlinkUniverse.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/translation/FlinkUniverse.java
index 5c66d62..785b99e 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/translation/FlinkUniverse.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/translation/FlinkUniverse.java
@@ -28,14 +28,17 @@
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
 public final class FlinkUniverse {
-  private static final FeedbackKey<Message> FEEDBACK_KEY =
-      new FeedbackKey<>("statefun-pipeline", 1);
 
   private final StatefulFunctionsUniverse universe;
 
   private final StatefulFunctionsConfig configuration;
+  private final FeedbackKey<Message> feedbackKey;
 
-  public FlinkUniverse(StatefulFunctionsUniverse universe, StatefulFunctionsConfig configuration) {
+  public FlinkUniverse(
+      FeedbackKey<Message> feedbackKey,
+      StatefulFunctionsConfig configuration,
+      StatefulFunctionsUniverse universe) {
+    this.feedbackKey = Objects.requireNonNull(feedbackKey);
     this.universe = Objects.requireNonNull(universe);
     this.configuration = Objects.requireNonNull(configuration);
   }
@@ -45,7 +48,7 @@
     Sinks sinks = Sinks.create(universe);
 
     StatefulFunctionTranslator translator =
-        new StatefulFunctionTranslator(FEEDBACK_KEY, configuration);
+        new StatefulFunctionTranslator(feedbackKey, configuration);
 
     Map<EgressIdentifier<?>, DataStream<?>> sideOutputs = translator.translate(sources, sinks);
 
diff --git a/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/StatefulFunctionDataStreamBuilder.java b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/StatefulFunctionDataStreamBuilder.java
index 58dbb92..60d3ab9 100644
--- a/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/StatefulFunctionDataStreamBuilder.java
+++ b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/StatefulFunctionDataStreamBuilder.java
@@ -25,6 +25,7 @@
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 import javax.annotation.Nullable;
 import org.apache.flink.shaded.guava18.com.google.common.base.Optional;
 import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig;
@@ -46,17 +47,18 @@
  */
 public final class StatefulFunctionDataStreamBuilder {
 
+  private static final AtomicInteger FEEDBACK_INVOCATION_ID_SEQ = new AtomicInteger();
+
   /** Creates a {@code StatefulFunctionDataStreamBuilder}. */
   public static StatefulFunctionDataStreamBuilder builder(String pipelineName) {
-    FeedbackKey<Message> key = new FeedbackKey<>(pipelineName, 1);
-    return new StatefulFunctionDataStreamBuilder(key);
+    return new StatefulFunctionDataStreamBuilder(pipelineName);
   }
 
-  private StatefulFunctionDataStreamBuilder(FeedbackKey<Message> feedbackKey) {
-    this.feedbackKey = Objects.requireNonNull(feedbackKey);
+  private StatefulFunctionDataStreamBuilder(String pipelineName) {
+    this.pipelineName = Objects.requireNonNull(pipelineName);
   }
 
-  private final FeedbackKey<Message> feedbackKey;
+  private final String pipelineName;
   private final List<DataStream<RoutableMessage>> definedIngresses = new ArrayList<>();
   private final Map<FunctionType, SerializableStatefulFunctionProvider> functionProviders =
       new HashMap<>();
@@ -148,7 +150,9 @@
     requestReplyFunctions.forEach(
         (type, unused) -> functionProviders.put(type, httpFunctionProvider));
 
-    EmbeddedTranslator embeddedTranslator = new EmbeddedTranslator(config, feedbackKey);
+    FeedbackKey<Message> key =
+        new FeedbackKey<>(pipelineName, FEEDBACK_INVOCATION_ID_SEQ.incrementAndGet());
+    EmbeddedTranslator embeddedTranslator = new EmbeddedTranslator(config, key);
     Map<EgressIdentifier<?>, DataStream<?>> sideOutputs =
         embeddedTranslator.translate(definedIngresses, egressesIds, functionProviders);
     return new StatefulFunctionEgressStreams(sideOutputs);