[FLINK-24364][e2e] Add message cancellation embedded e2e case
This commit adds a message cancellation test case for the smoke e2e test.
The generation of message cancellation commands is disabled by default, and
only enabled for the embedded sdk e2e tests.
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-common/src/main/java/org/apache/flink/statefun/e2e/smoke/SmokeRunnerParameters.java b/statefun-e2e-tests/statefun-smoke-e2e-common/src/main/java/org/apache/flink/statefun/e2e/smoke/SmokeRunnerParameters.java
index 2f63a5d..ef33fd0 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-common/src/main/java/org/apache/flink/statefun/e2e/smoke/SmokeRunnerParameters.java
+++ b/statefun-e2e-tests/statefun-smoke-e2e-common/src/main/java/org/apache/flink/statefun/e2e/smoke/SmokeRunnerParameters.java
@@ -35,6 +35,7 @@
private double stateModificationsPr = 0.4;
private double sendPr = 0.9;
private double sendAfterPr = 0.1;
+ private double sendAfterWithCancellationPr = 0.1;
private double asyncSendPr = 0.1;
private double noopPr = 0.2;
private double sendEgressPr = 0.03;
@@ -42,6 +43,8 @@
private String verificationServerHost = "localhost";
private int verificationServerPort = 5050;
private boolean isAsyncOpSupported = false;
+ private boolean isDelayCancellationOpSupported = false;
+
private long randomGeneratorSeed = System.nanoTime();
/** Creates an instance of ModuleParameters from a key-value map. */
@@ -108,6 +111,14 @@
return sendAfterPr;
}
+ public boolean isDelayCancellationOpSupported() {
+ return isDelayCancellationOpSupported;
+ }
+
+ public void setDelayCancellationOpSupported(boolean delayCancellationOpSupported) {
+ isDelayCancellationOpSupported = delayCancellationOpSupported;
+ }
+
public void setSendAfterPr(double sendAfterPr) {
this.sendAfterPr = sendAfterPr;
}
@@ -160,6 +171,14 @@
this.verificationServerPort = verificationServerPort;
}
+ public double getSendAfterWithCancellationPr() {
+ return sendAfterWithCancellationPr;
+ }
+
+ public void setSendAfterWithCancellationPr(double sendAfterWithCancellationPr) {
+ this.sendAfterWithCancellationPr = sendAfterWithCancellationPr;
+ }
+
public boolean isAsyncOpSupported() {
return isAsyncOpSupported;
}
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-common/src/main/protobuf/commands.proto b/statefun-e2e-tests/statefun-smoke-e2e-common/src/main/protobuf/commands.proto
index e3912d1..8bcd4cc 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-common/src/main/protobuf/commands.proto
+++ b/statefun-e2e-tests/statefun-smoke-e2e-common/src/main/protobuf/commands.proto
@@ -42,6 +42,11 @@
message SendAfter {
int32 target = 1;
Commands commands = 2;
+ string cancellation_token = 3;
+ }
+ message CancelSendAfter {
+ int32 target = 1;
+ string cancellation_token = 2;
}
message SendEgress {
}
@@ -60,6 +65,7 @@
SendEgress send_egress = 4;
AsyncOperation async_operation = 5;
Verify verify = 6;
+ CancelSendAfter cancel_send_after = 7;
}
}
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/CommandGenerator.java b/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/CommandGenerator.java
index fa399ee..607ac61 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/CommandGenerator.java
+++ b/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/CommandGenerator.java
@@ -23,6 +23,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
+import java.util.UUID;
import java.util.function.Supplier;
import org.apache.commons.math3.distribution.EnumeratedDistribution;
import org.apache.commons.math3.random.RandomGenerator;
@@ -85,9 +86,13 @@
create(new SendAfterGen(), parameters.getSendAfterPr()),
create(new Noop(), parameters.getNoopPr()),
create(new SendEgress(), parameters.getSendEgressPr())));
+
if (parameters.isAsyncOpSupported()) {
list.add(create(new SendAsyncOp(), parameters.getAsyncSendPr()));
}
+ if (parameters.isDelayCancellationOpSupported()) {
+ list.add(create(new SendAfterCancellationGen(), parameters.getSendAfterWithCancellationPr()));
+ }
return list;
}
@@ -141,6 +146,23 @@
}
}
+ private final class SendAfterCancellationGen implements Gen {
+
+ @Override
+ public void generate(Commands.Builder builder, int depth) {
+ final String token = new UUID(random.nextLong(), random.nextLong()).toString();
+ final int address = address();
+
+ Command.SendAfter.Builder first =
+ Command.SendAfter.newBuilder().setTarget(address).setCancellationToken(token);
+ Command.CancelSendAfter.Builder second =
+ Command.CancelSendAfter.newBuilder().setTarget(address).setCancellationToken(token);
+
+ builder.addCommand(Command.newBuilder().setSendAfter(first));
+ builder.addCommand(Command.newBuilder().setCancelSendAfter(second));
+ }
+ }
+
private final class SendGen implements Gen {
@Override
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-embedded/src/main/java/org/apache/flink/statefun/e2e/smoke/embedded/CommandInterpreter.java b/statefun-e2e-tests/statefun-smoke-e2e-embedded/src/main/java/org/apache/flink/statefun/e2e/smoke/embedded/CommandInterpreter.java
index a4da76d..f184929 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-embedded/src/main/java/org/apache/flink/statefun/e2e/smoke/embedded/CommandInterpreter.java
+++ b/statefun-e2e-tests/statefun-smoke-e2e-embedded/src/main/java/org/apache/flink/statefun/e2e/smoke/embedded/CommandInterpreter.java
@@ -17,7 +17,13 @@
*/
package org.apache.flink.statefun.e2e.smoke.embedded;
-import static org.apache.flink.statefun.e2e.smoke.driver.Types.*;
+import static org.apache.flink.statefun.e2e.smoke.driver.Types.COMMANDS_TYPE;
+import static org.apache.flink.statefun.e2e.smoke.driver.Types.SOURCE_COMMANDS_TYPE;
+import static org.apache.flink.statefun.e2e.smoke.driver.Types.isTypeOf;
+import static org.apache.flink.statefun.e2e.smoke.driver.Types.packCommands;
+import static org.apache.flink.statefun.e2e.smoke.driver.Types.packVerificationResult;
+import static org.apache.flink.statefun.e2e.smoke.driver.Types.unpackCommands;
+import static org.apache.flink.statefun.e2e.smoke.driver.Types.unpackSourceCommand;
import java.time.Duration;
import java.util.Objects;
@@ -28,6 +34,7 @@
import org.apache.flink.statefun.e2e.smoke.generated.Commands;
import org.apache.flink.statefun.e2e.smoke.generated.SourceCommand;
import org.apache.flink.statefun.e2e.smoke.generated.VerificationResult;
+import org.apache.flink.statefun.sdk.Address;
import org.apache.flink.statefun.sdk.AsyncOperationResult;
import org.apache.flink.statefun.sdk.Context;
import org.apache.flink.statefun.sdk.FunctionType;
@@ -81,6 +88,8 @@
sendEgress(state, context, cmd.getSendEgress());
} else if (cmd.hasVerify()) {
verify(state, context, cmd.getVerify());
+ } else if (cmd.hasCancelSendAfter()) {
+ cancelSendAfter(state, context, cmd.getCancelSendAfter());
}
}
}
@@ -114,7 +123,21 @@
Command.SendAfter send) {
FunctionType functionType = Constants.FN_TYPE;
String id = ids.idOf(send.getTarget());
- context.sendAfter(sendAfterDelay, functionType, id, packCommands(send.getCommands()));
+ TypedValue subCommands = packCommands(send.getCommands());
+ if (send.getCancellationToken().isEmpty()) {
+ context.sendAfter(sendAfterDelay, functionType, id, subCommands);
+ } else {
+ context.sendAfter(
+ sendAfterDelay, new Address(functionType, id), subCommands, send.getCancellationToken());
+ }
+ }
+
+ private void cancelSendAfter(
+ @SuppressWarnings("unused") PersistedValue<Long> state,
+ Context context,
+ Command.CancelSendAfter cancelSendAfter) {
+ String token = cancelSendAfter.getCancellationToken();
+ context.cancelDelayedMessage(token);
}
private void send(
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-embedded/src/test/java/org/apache/flink/statefun/e2e/smoke/embedded/EmbeddedSmokeHarnessTest.java b/statefun-e2e-tests/statefun-smoke-e2e-embedded/src/test/java/org/apache/flink/statefun/e2e/smoke/embedded/EmbeddedSmokeHarnessTest.java
index 925ebd1..6760418 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-embedded/src/test/java/org/apache/flink/statefun/e2e/smoke/embedded/EmbeddedSmokeHarnessTest.java
+++ b/statefun-e2e-tests/statefun-smoke-e2e-embedded/src/test/java/org/apache/flink/statefun/e2e/smoke/embedded/EmbeddedSmokeHarnessTest.java
@@ -61,6 +61,7 @@
parameters.setVerificationServerHost("localhost");
parameters.setVerificationServerPort(started.port());
parameters.setAsyncOpSupported(true);
+ parameters.setDelayCancellationOpSupported(true);
parameters.asMap().forEach(harness::withGlobalConfiguration);
// run the harness.
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-embedded/src/test/java/org/apache/flink/statefun/e2e/smoke/embedded/SmokeVerificationEmbeddedE2E.java b/statefun-e2e-tests/statefun-smoke-e2e-embedded/src/test/java/org/apache/flink/statefun/e2e/smoke/embedded/SmokeVerificationEmbeddedE2E.java
index cc7a0b4..63f3afb 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-embedded/src/test/java/org/apache/flink/statefun/e2e/smoke/embedded/SmokeVerificationEmbeddedE2E.java
+++ b/statefun-e2e-tests/statefun-smoke-e2e-embedded/src/test/java/org/apache/flink/statefun/e2e/smoke/embedded/SmokeVerificationEmbeddedE2E.java
@@ -34,6 +34,7 @@
parameters.setMessageCount(100_000);
parameters.setMaxFailures(1);
parameters.setAsyncOpSupported(true);
+ parameters.setDelayCancellationOpSupported(true);
StatefulFunctionsAppContainers.Builder builder =
StatefulFunctionsAppContainers.builder("flink-statefun-cluster", NUM_WORKERS);