[FLINK-21642] Test that retry batch on restore do not contain state value
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java
index 548771c..f88916f 100644
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java
@@ -284,6 +284,26 @@
     assertThat(context.functionTypeMetrics().numBacklog, is(0));
   }
 
+  @Test
+  public void retryBatchOnUnkownAsyncResponseAfterRestore() {
+    TypedValue argument =
+        TypedValue.newBuilder()
+            .setTypename("io.statefun.foo/bar")
+            .setValue(ByteString.copyFromUtf8("Hello!"))
+            .build();
+    functionUnderTest.invoke(context, argument);
+    ToFunction originalRequest = client.wasSentToFunction;
+
+    RequestReplyFunction restoredFunction = new RequestReplyFunction(2, client);
+    restoredFunction.invoke(context, unknownAsyncOperation(originalRequest));
+
+    // retry batch after a restore on an unknown async operation should start with empty state specs
+    assertTrue(client.wasSentToFunction.hasInvocation());
+    assertThat(client.capturedInvocationBatchSize(), is(1));
+    assertThat(client.capturedInvocation(0).getArgument(), is(argument));
+    assertThat(client.capturedStateNames().size(), is(0));
+  }
+
   private static PersistedRemoteFunctionValues testInitialRegisteredState(
       String existingStateName, String typename) {
     final PersistedRemoteFunctionValues states = new PersistedRemoteFunctionValues();
@@ -311,6 +331,12 @@
     return new AsyncOperationResult<>(toFunction, Status.SUCCESS, fromFunction, null);
   }
 
+  private static AsyncOperationResult<ToFunction, FromFunction> unknownAsyncOperation(
+      ToFunction toFunction) {
+    return new AsyncOperationResult<>(
+        toFunction, Status.UNKNOWN, FromFunction.getDefaultInstance(), null);
+  }
+
   private static final class FakeClient implements RequestReplyClient {
     ToFunction wasSentToFunction;
     Supplier<FromFunction> fromFunction = FromFunction::getDefaultInstance;