[FLINK-24364][e2e] Add message cancellation embedded e2e case

This commit adds a message cancellation test case for the smoke e2e test.
The generation of message cancellation commands is disabled by default, and
only enabled for the embedded sdk e2e tests.
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-common/src/main/java/org/apache/flink/statefun/e2e/smoke/SmokeRunnerParameters.java b/statefun-e2e-tests/statefun-smoke-e2e-common/src/main/java/org/apache/flink/statefun/e2e/smoke/SmokeRunnerParameters.java
index 2f63a5d..ef33fd0 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-common/src/main/java/org/apache/flink/statefun/e2e/smoke/SmokeRunnerParameters.java
+++ b/statefun-e2e-tests/statefun-smoke-e2e-common/src/main/java/org/apache/flink/statefun/e2e/smoke/SmokeRunnerParameters.java
@@ -35,6 +35,7 @@
   private double stateModificationsPr = 0.4;
   private double sendPr = 0.9;
   private double sendAfterPr = 0.1;
+  private double sendAfterWithCancellationPr = 0.1;
   private double asyncSendPr = 0.1;
   private double noopPr = 0.2;
   private double sendEgressPr = 0.03;
@@ -42,6 +43,8 @@
   private String verificationServerHost = "localhost";
   private int verificationServerPort = 5050;
   private boolean isAsyncOpSupported = false;
+  private boolean isDelayCancellationOpSupported = false;
+
   private long randomGeneratorSeed = System.nanoTime();
 
   /** Creates an instance of ModuleParameters from a key-value map. */
@@ -108,6 +111,14 @@
     return sendAfterPr;
   }
 
+  public boolean isDelayCancellationOpSupported() {
+    return isDelayCancellationOpSupported;
+  }
+
+  public void setDelayCancellationOpSupported(boolean delayCancellationOpSupported) {
+    isDelayCancellationOpSupported = delayCancellationOpSupported;
+  }
+
   public void setSendAfterPr(double sendAfterPr) {
     this.sendAfterPr = sendAfterPr;
   }
@@ -160,6 +171,14 @@
     this.verificationServerPort = verificationServerPort;
   }
 
+  public double getSendAfterWithCancellationPr() {
+    return sendAfterWithCancellationPr;
+  }
+
+  public void setSendAfterWithCancellationPr(double sendAfterWithCancellationPr) {
+    this.sendAfterWithCancellationPr = sendAfterWithCancellationPr;
+  }
+
   public boolean isAsyncOpSupported() {
     return isAsyncOpSupported;
   }
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-common/src/main/protobuf/commands.proto b/statefun-e2e-tests/statefun-smoke-e2e-common/src/main/protobuf/commands.proto
index e3912d1..8bcd4cc 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-common/src/main/protobuf/commands.proto
+++ b/statefun-e2e-tests/statefun-smoke-e2e-common/src/main/protobuf/commands.proto
@@ -42,6 +42,11 @@
   message SendAfter {
     int32 target = 1;
     Commands commands = 2;
+    string cancellation_token = 3;
+  }
+  message CancelSendAfter {
+     int32 target = 1;
+     string cancellation_token = 2;
   }
   message SendEgress {
   }
@@ -60,6 +65,7 @@
     SendEgress send_egress = 4;
     AsyncOperation async_operation = 5;
     Verify verify = 6;
+    CancelSendAfter cancel_send_after = 7;
   }
 }
 
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/CommandGenerator.java b/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/CommandGenerator.java
index fa399ee..607ac61 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/CommandGenerator.java
+++ b/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/CommandGenerator.java
@@ -23,6 +23,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
+import java.util.UUID;
 import java.util.function.Supplier;
 import org.apache.commons.math3.distribution.EnumeratedDistribution;
 import org.apache.commons.math3.random.RandomGenerator;
@@ -85,9 +86,13 @@
                 create(new SendAfterGen(), parameters.getSendAfterPr()),
                 create(new Noop(), parameters.getNoopPr()),
                 create(new SendEgress(), parameters.getSendEgressPr())));
+
     if (parameters.isAsyncOpSupported()) {
       list.add(create(new SendAsyncOp(), parameters.getAsyncSendPr()));
     }
+    if (parameters.isDelayCancellationOpSupported()) {
+      list.add(create(new SendAfterCancellationGen(), parameters.getSendAfterWithCancellationPr()));
+    }
     return list;
   }
 
@@ -141,6 +146,23 @@
     }
   }
 
+  private final class SendAfterCancellationGen implements Gen {
+
+    @Override
+    public void generate(Commands.Builder builder, int depth) {
+      final String token = new UUID(random.nextLong(), random.nextLong()).toString();
+      final int address = address();
+
+      Command.SendAfter.Builder first =
+          Command.SendAfter.newBuilder().setTarget(address).setCancellationToken(token);
+      Command.CancelSendAfter.Builder second =
+          Command.CancelSendAfter.newBuilder().setTarget(address).setCancellationToken(token);
+
+      builder.addCommand(Command.newBuilder().setSendAfter(first));
+      builder.addCommand(Command.newBuilder().setCancelSendAfter(second));
+    }
+  }
+
   private final class SendGen implements Gen {
 
     @Override
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-embedded/src/main/java/org/apache/flink/statefun/e2e/smoke/embedded/CommandInterpreter.java b/statefun-e2e-tests/statefun-smoke-e2e-embedded/src/main/java/org/apache/flink/statefun/e2e/smoke/embedded/CommandInterpreter.java
index a4da76d..f184929 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-embedded/src/main/java/org/apache/flink/statefun/e2e/smoke/embedded/CommandInterpreter.java
+++ b/statefun-e2e-tests/statefun-smoke-e2e-embedded/src/main/java/org/apache/flink/statefun/e2e/smoke/embedded/CommandInterpreter.java
@@ -17,7 +17,13 @@
  */
 package org.apache.flink.statefun.e2e.smoke.embedded;
 
-import static org.apache.flink.statefun.e2e.smoke.driver.Types.*;
+import static org.apache.flink.statefun.e2e.smoke.driver.Types.COMMANDS_TYPE;
+import static org.apache.flink.statefun.e2e.smoke.driver.Types.SOURCE_COMMANDS_TYPE;
+import static org.apache.flink.statefun.e2e.smoke.driver.Types.isTypeOf;
+import static org.apache.flink.statefun.e2e.smoke.driver.Types.packCommands;
+import static org.apache.flink.statefun.e2e.smoke.driver.Types.packVerificationResult;
+import static org.apache.flink.statefun.e2e.smoke.driver.Types.unpackCommands;
+import static org.apache.flink.statefun.e2e.smoke.driver.Types.unpackSourceCommand;
 
 import java.time.Duration;
 import java.util.Objects;
@@ -28,6 +34,7 @@
 import org.apache.flink.statefun.e2e.smoke.generated.Commands;
 import org.apache.flink.statefun.e2e.smoke.generated.SourceCommand;
 import org.apache.flink.statefun.e2e.smoke.generated.VerificationResult;
+import org.apache.flink.statefun.sdk.Address;
 import org.apache.flink.statefun.sdk.AsyncOperationResult;
 import org.apache.flink.statefun.sdk.Context;
 import org.apache.flink.statefun.sdk.FunctionType;
@@ -81,6 +88,8 @@
         sendEgress(state, context, cmd.getSendEgress());
       } else if (cmd.hasVerify()) {
         verify(state, context, cmd.getVerify());
+      } else if (cmd.hasCancelSendAfter()) {
+        cancelSendAfter(state, context, cmd.getCancelSendAfter());
       }
     }
   }
@@ -114,7 +123,21 @@
       Command.SendAfter send) {
     FunctionType functionType = Constants.FN_TYPE;
     String id = ids.idOf(send.getTarget());
-    context.sendAfter(sendAfterDelay, functionType, id, packCommands(send.getCommands()));
+    TypedValue subCommands = packCommands(send.getCommands());
+    if (send.getCancellationToken().isEmpty()) {
+      context.sendAfter(sendAfterDelay, functionType, id, subCommands);
+    } else {
+      context.sendAfter(
+          sendAfterDelay, new Address(functionType, id), subCommands, send.getCancellationToken());
+    }
+  }
+
+  private void cancelSendAfter(
+      @SuppressWarnings("unused") PersistedValue<Long> state,
+      Context context,
+      Command.CancelSendAfter cancelSendAfter) {
+    String token = cancelSendAfter.getCancellationToken();
+    context.cancelDelayedMessage(token);
   }
 
   private void send(
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-embedded/src/test/java/org/apache/flink/statefun/e2e/smoke/embedded/EmbeddedSmokeHarnessTest.java b/statefun-e2e-tests/statefun-smoke-e2e-embedded/src/test/java/org/apache/flink/statefun/e2e/smoke/embedded/EmbeddedSmokeHarnessTest.java
index 925ebd1..6760418 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-embedded/src/test/java/org/apache/flink/statefun/e2e/smoke/embedded/EmbeddedSmokeHarnessTest.java
+++ b/statefun-e2e-tests/statefun-smoke-e2e-embedded/src/test/java/org/apache/flink/statefun/e2e/smoke/embedded/EmbeddedSmokeHarnessTest.java
@@ -61,6 +61,7 @@
     parameters.setVerificationServerHost("localhost");
     parameters.setVerificationServerPort(started.port());
     parameters.setAsyncOpSupported(true);
+    parameters.setDelayCancellationOpSupported(true);
     parameters.asMap().forEach(harness::withGlobalConfiguration);
 
     // run the harness.
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-embedded/src/test/java/org/apache/flink/statefun/e2e/smoke/embedded/SmokeVerificationEmbeddedE2E.java b/statefun-e2e-tests/statefun-smoke-e2e-embedded/src/test/java/org/apache/flink/statefun/e2e/smoke/embedded/SmokeVerificationEmbeddedE2E.java
index cc7a0b4..63f3afb 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-embedded/src/test/java/org/apache/flink/statefun/e2e/smoke/embedded/SmokeVerificationEmbeddedE2E.java
+++ b/statefun-e2e-tests/statefun-smoke-e2e-embedded/src/test/java/org/apache/flink/statefun/e2e/smoke/embedded/SmokeVerificationEmbeddedE2E.java
@@ -34,6 +34,7 @@
     parameters.setMessageCount(100_000);
     parameters.setMaxFailures(1);
     parameters.setAsyncOpSupported(true);
+    parameters.setDelayCancellationOpSupported(true);
 
     StatefulFunctionsAppContainers.Builder builder =
         StatefulFunctionsAppContainers.builder("flink-statefun-cluster", NUM_WORKERS);