[FLINK-35528][task] Skip execution of interruptible mails when yielding

When operators are yielding, for example waiting for async state access to complete before a checkpoint, it would be beneficial to not execute interruptible mails. Otherwise continuation mail for firing timers would be continuously re-enqeueed. To achieve that MailboxExecutor must be aware which mails are interruptible.

The easiest way to achieve this is to set MIN_PRIORITY for interruptible mails.
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/MailOptionsImpl.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/MailOptionsImpl.java
new file mode 100644
index 0000000..3bb5b77
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/MailOptionsImpl.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators;
+
+import org.apache.flink.annotation.Internal;
+
+/** Options to configure behaviour of executing mailbox mails. */
+@Internal
+public class MailOptionsImpl implements MailboxExecutor.MailOptions {
+    static final MailboxExecutor.MailOptions DEFAULT = new MailOptionsImpl(false);
+    static final MailboxExecutor.MailOptions DEFERRABLE = new MailOptionsImpl(true);
+
+    private final boolean deferrable;
+
+    private MailOptionsImpl(boolean deferrable) {
+        this.deferrable = deferrable;
+    }
+
+    public boolean isDeferrable() {
+        return deferrable;
+    }
+}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/MailboxExecutor.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/MailboxExecutor.java
index 639ed18..79e1793 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/MailboxExecutor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/MailboxExecutor.java
@@ -86,6 +86,26 @@
     /** A constant for empty args to save on object allocation. */
     Object[] EMPTY_ARGS = new Object[0];
 
+    /** Extra options to configure enqueued mails. */
+    @PublicEvolving
+    interface MailOptions {
+        static MailOptions options() {
+            return MailOptionsImpl.DEFAULT;
+        }
+
+        /**
+         * Mark this mail as deferrable.
+         *
+         * <p>Runtime can decide to defer execution of deferrable mails. For example, to unblock
+         * subtask thread as quickly as possible, deferrable mails are not executed during {@link
+         * #yield()} or {@link #tryYield()}. This is done to speed up checkpointing, by skipping
+         * execution of potentially long-running mails.
+         */
+        static MailOptions deferrable() {
+            return MailOptionsImpl.DEFERRABLE;
+        }
+    }
+
     /**
      * Executes the given command at some time in the future in the mailbox thread.
      *
@@ -110,6 +130,49 @@
      * The description may contain placeholder that refer to the provided description arguments
      * using {@link java.util.Formatter} syntax. The actual description is only formatted on demand.
      *
+     * @param mailOptions additional options to configure behaviour of the {@code command}
+     * @param command the runnable task to add to the mailbox for execution.
+     * @param description the optional description for the command that is used for debugging and
+     *     error-reporting.
+     * @throws RejectedExecutionException if this task cannot be accepted for execution, e.g.
+     *     because the mailbox is quiesced or closed.
+     */
+    default void execute(
+            MailOptions mailOptions,
+            ThrowingRunnable<? extends Exception> command,
+            String description) {
+        execute(mailOptions, command, description, EMPTY_ARGS);
+    }
+
+    /**
+     * Executes the given command at some time in the future in the mailbox thread.
+     *
+     * <p>An optional description can (and should) be added to ease debugging and error-reporting.
+     * The description may contain placeholder that refer to the provided description arguments
+     * using {@link java.util.Formatter} syntax. The actual description is only formatted on demand.
+     *
+     * @param command the runnable task to add to the mailbox for execution.
+     * @param descriptionFormat the optional description for the command that is used for debugging
+     *     and error-reporting.
+     * @param descriptionArgs the parameters used to format the final description string.
+     * @throws RejectedExecutionException if this task cannot be accepted for execution, e.g.
+     *     because the mailbox is quiesced or closed.
+     */
+    default void execute(
+            ThrowingRunnable<? extends Exception> command,
+            String descriptionFormat,
+            Object... descriptionArgs) {
+        execute(MailOptions.options(), command, descriptionFormat, descriptionArgs);
+    }
+
+    /**
+     * Executes the given command at some time in the future in the mailbox thread.
+     *
+     * <p>An optional description can (and should) be added to ease debugging and error-reporting.
+     * The description may contain placeholder that refer to the provided description arguments
+     * using {@link java.util.Formatter} syntax. The actual description is only formatted on demand.
+     *
+     * @param mailOptions additional options to configure behaviour of the {@code command}
      * @param command the runnable task to add to the mailbox for execution.
      * @param descriptionFormat the optional description for the command that is used for debugging
      *     and error-reporting.
@@ -118,6 +181,7 @@
      *     because the mailbox is quiesced or closed.
      */
     void execute(
+            MailOptions mailOptions,
             ThrowingRunnable<? extends Exception> command,
             String descriptionFormat,
             Object... descriptionArgs);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java
index f99dd87..e98a66b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java
@@ -824,6 +824,7 @@
 
         @Override
         public void execute(
+                MailOptions mailOptions,
                 ThrowingRunnable<? extends Exception> command,
                 String descriptionFormat,
                 Object... descriptionArgs) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/mailbox/SyncMailboxExecutor.java b/flink-runtime/src/test/java/org/apache/flink/runtime/mailbox/SyncMailboxExecutor.java
index 13ab803..5b86284 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/mailbox/SyncMailboxExecutor.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/mailbox/SyncMailboxExecutor.java
@@ -25,6 +25,7 @@
 public class SyncMailboxExecutor implements MailboxExecutor {
     @Override
     public void execute(
+            MailOptions mailOptions,
             ThrowingRunnable<? extends Exception> command,
             String descriptionFormat,
             Object... descriptionArgs) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessor.java
index 02cfd96..76eabe6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessor.java
@@ -77,6 +77,7 @@
             progressWatermarkScheduled = true;
             // We still have work to do, but we need to let other mails to be processed first.
             mailboxExecutor.execute(
+                    MailboxExecutor.MailOptions.deferrable(),
                     () -> {
                         progressWatermarkScheduled = false;
                         emitWatermarkInsideMailbox();
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mail.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mail.java
index bed96ae..6afd891 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mail.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mail.java
@@ -18,6 +18,8 @@
 package org.apache.flink.streaming.runtime.tasks.mailbox;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.operators.MailOptionsImpl;
+import org.apache.flink.api.common.operators.MailboxExecutor;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.ThrowingRunnable;
@@ -30,6 +32,7 @@
  */
 @Internal
 public class Mail {
+    private final MailOptionsImpl mailOptions;
     /** The action to execute. */
     private final ThrowingRunnable<? extends Exception> runnable;
     /**
@@ -50,6 +53,7 @@
             String descriptionFormat,
             Object... descriptionArgs) {
         this(
+                MailboxExecutor.MailOptions.options(),
                 runnable,
                 priority,
                 StreamTaskActionExecutor.IMMEDIATE,
@@ -58,11 +62,13 @@
     }
 
     public Mail(
+            MailboxExecutor.MailOptions mailOptions,
             ThrowingRunnable<? extends Exception> runnable,
             int priority,
             StreamTaskActionExecutor actionExecutor,
             String descriptionFormat,
             Object... descriptionArgs) {
+        this.mailOptions = (MailOptionsImpl) mailOptions;
         this.runnable = Preconditions.checkNotNull(runnable);
         this.priority = priority;
         this.descriptionFormat =
@@ -71,8 +77,13 @@
         this.actionExecutor = actionExecutor;
     }
 
+    public MailboxExecutor.MailOptions getMailOptions() {
+        return mailOptions;
+    }
+
     public int getPriority() {
-        return priority;
+        /** See {@link MailboxExecutor.MailOptions#deferrable()} ()}. */
+        return mailOptions.isDeferrable() ? TaskMailbox.MIN_PRIORITY : priority;
     }
 
     public void tryCancel(boolean mayInterruptIfRunning) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImpl.java
index b2c8eac..2987ee2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImpl.java
@@ -67,13 +67,19 @@
 
     @Override
     public void execute(
+            MailOptions mailOptions,
             final ThrowingRunnable<? extends Exception> command,
             final String descriptionFormat,
             final Object... descriptionArgs) {
         try {
             mailbox.put(
                     new Mail(
-                            command, priority, actionExecutor, descriptionFormat, descriptionArgs));
+                            mailOptions,
+                            command,
+                            priority,
+                            actionExecutor,
+                            descriptionFormat,
+                            descriptionArgs));
         } catch (MailboxClosedException mbex) {
             throw new RejectedExecutionException(mbex);
         }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkTest.java
index 88e0bf3..bc92ac1 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkTest.java
@@ -235,6 +235,7 @@
 
         @Override
         public void execute(
+                MailOptions mailOptions,
                 ThrowingRunnable<? extends Exception> command,
                 String descriptionFormat,
                 Object... descriptionArgs) {}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessorTest.java
index 9fb9d34..87f19f4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessorTest.java
@@ -35,6 +35,7 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -43,6 +44,7 @@
 
     @Test
     void testEmitWatermarkInsideMailbox() throws Exception {
+        int priority = 42;
         final List<StreamElement> emittedElements = new ArrayList<>();
         final TaskMailboxImpl mailbox = new TaskMailboxImpl();
         final InternalTimeServiceManager<?> timerService = new NoOpInternalTimeServiceManager();
@@ -50,7 +52,8 @@
         final MailboxWatermarkProcessor<StreamRecord<String>> watermarkProcessor =
                 new MailboxWatermarkProcessor<>(
                         new CollectorOutput<>(emittedElements),
-                        new MailboxExecutorImpl(mailbox, 0, StreamTaskActionExecutor.IMMEDIATE),
+                        new MailboxExecutorImpl(
+                                mailbox, priority, StreamTaskActionExecutor.IMMEDIATE),
                         timerService);
         final List<Watermark> expectedOutput = new ArrayList<>();
         watermarkProcessor.emitWatermarkInsideMailbox(new Watermark(1));
@@ -69,6 +72,10 @@
 
         assertThat(emittedElements).containsExactlyElementsOf(expectedOutput);
 
+        // FLINK-35528: do not allow yielding to continuation mails
+        assertThat(mailbox.tryTake(priority)).isEqualTo(Optional.empty());
+        assertThat(emittedElements).containsExactlyElementsOf(expectedOutput);
+
         while (mailbox.hasMail()) {
             mailbox.take(TaskMailbox.MIN_PRIORITY).run();
         }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImplTest.java
index cfeb92b..1b6fddc 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImplTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImplTest.java
@@ -111,6 +111,26 @@
     }
 
     @Test
+    void testDeferrable() throws Exception {
+        int priority = 42;
+        MailboxExecutor localExecutor = mailboxProcessor.getMailboxExecutor(priority);
+
+        AtomicBoolean deferrableMailExecuted = new AtomicBoolean();
+
+        localExecutor.execute(
+                MailboxExecutor.MailOptions.deferrable(),
+                () -> deferrableMailExecuted.set(true),
+                "deferrable mail");
+        assertThat(localExecutor.tryYield()).isFalse();
+        assertThat(deferrableMailExecuted.get()).isFalse();
+        assertThat(mailboxExecutor.tryYield()).isFalse();
+        assertThat(deferrableMailExecuted.get()).isFalse();
+
+        assertThat(mailboxProcessor.runMailboxStep()).isTrue();
+        assertThat(deferrableMailExecuted.get()).isTrue();
+    }
+
+    @Test
     void testClose() throws Exception {
         final TestRunnable yieldRun = new TestRunnable();
         final TestRunnable leftoverRun = new TestRunnable();