[FLINK-21642] Test that retry batch on restore do not contain state value
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java
index 548771c..f88916f 100644
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java
@@ -284,6 +284,26 @@
assertThat(context.functionTypeMetrics().numBacklog, is(0));
}
+ @Test
+ public void retryBatchOnUnkownAsyncResponseAfterRestore() {
+ TypedValue argument =
+ TypedValue.newBuilder()
+ .setTypename("io.statefun.foo/bar")
+ .setValue(ByteString.copyFromUtf8("Hello!"))
+ .build();
+ functionUnderTest.invoke(context, argument);
+ ToFunction originalRequest = client.wasSentToFunction;
+
+ RequestReplyFunction restoredFunction = new RequestReplyFunction(2, client);
+ restoredFunction.invoke(context, unknownAsyncOperation(originalRequest));
+
+ // retry batch after a restore on an unknown async operation should start with empty state specs
+ assertTrue(client.wasSentToFunction.hasInvocation());
+ assertThat(client.capturedInvocationBatchSize(), is(1));
+ assertThat(client.capturedInvocation(0).getArgument(), is(argument));
+ assertThat(client.capturedStateNames().size(), is(0));
+ }
+
private static PersistedRemoteFunctionValues testInitialRegisteredState(
String existingStateName, String typename) {
final PersistedRemoteFunctionValues states = new PersistedRemoteFunctionValues();
@@ -311,6 +331,12 @@
return new AsyncOperationResult<>(toFunction, Status.SUCCESS, fromFunction, null);
}
+ private static AsyncOperationResult<ToFunction, FromFunction> unknownAsyncOperation(
+ ToFunction toFunction) {
+ return new AsyncOperationResult<>(
+ toFunction, Status.UNKNOWN, FromFunction.getDefaultInstance(), null);
+ }
+
private static final class FakeClient implements RequestReplyClient {
ToFunction wasSentToFunction;
Supplier<FromFunction> fromFunction = FromFunction::getDefaultInstance;