[FLINK-35528][task] Skip execution of interruptible mails when yielding
When operators are yielding, for example waiting for async state access to complete before a checkpoint, it would be beneficial to not execute interruptible mails. Otherwise continuation mail for firing timers would be continuously re-enqeueed. To achieve that MailboxExecutor must be aware which mails are interruptible.
The easiest way to achieve this is to set MIN_PRIORITY for interruptible mails.
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/MailOptionsImpl.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/MailOptionsImpl.java
new file mode 100644
index 0000000..3bb5b77
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/MailOptionsImpl.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators;
+
+import org.apache.flink.annotation.Internal;
+
+/** Options to configure behaviour of executing mailbox mails. */
+@Internal
+public class MailOptionsImpl implements MailboxExecutor.MailOptions {
+ static final MailboxExecutor.MailOptions DEFAULT = new MailOptionsImpl(false);
+ static final MailboxExecutor.MailOptions DEFERRABLE = new MailOptionsImpl(true);
+
+ private final boolean deferrable;
+
+ private MailOptionsImpl(boolean deferrable) {
+ this.deferrable = deferrable;
+ }
+
+ public boolean isDeferrable() {
+ return deferrable;
+ }
+}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/MailboxExecutor.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/MailboxExecutor.java
index 639ed18..79e1793 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/MailboxExecutor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/MailboxExecutor.java
@@ -86,6 +86,26 @@
/** A constant for empty args to save on object allocation. */
Object[] EMPTY_ARGS = new Object[0];
+ /** Extra options to configure enqueued mails. */
+ @PublicEvolving
+ interface MailOptions {
+ static MailOptions options() {
+ return MailOptionsImpl.DEFAULT;
+ }
+
+ /**
+ * Mark this mail as deferrable.
+ *
+ * <p>Runtime can decide to defer execution of deferrable mails. For example, to unblock
+ * subtask thread as quickly as possible, deferrable mails are not executed during {@link
+ * #yield()} or {@link #tryYield()}. This is done to speed up checkpointing, by skipping
+ * execution of potentially long-running mails.
+ */
+ static MailOptions deferrable() {
+ return MailOptionsImpl.DEFERRABLE;
+ }
+ }
+
/**
* Executes the given command at some time in the future in the mailbox thread.
*
@@ -110,6 +130,49 @@
* The description may contain placeholder that refer to the provided description arguments
* using {@link java.util.Formatter} syntax. The actual description is only formatted on demand.
*
+ * @param mailOptions additional options to configure behaviour of the {@code command}
+ * @param command the runnable task to add to the mailbox for execution.
+ * @param description the optional description for the command that is used for debugging and
+ * error-reporting.
+ * @throws RejectedExecutionException if this task cannot be accepted for execution, e.g.
+ * because the mailbox is quiesced or closed.
+ */
+ default void execute(
+ MailOptions mailOptions,
+ ThrowingRunnable<? extends Exception> command,
+ String description) {
+ execute(mailOptions, command, description, EMPTY_ARGS);
+ }
+
+ /**
+ * Executes the given command at some time in the future in the mailbox thread.
+ *
+ * <p>An optional description can (and should) be added to ease debugging and error-reporting.
+ * The description may contain placeholder that refer to the provided description arguments
+ * using {@link java.util.Formatter} syntax. The actual description is only formatted on demand.
+ *
+ * @param command the runnable task to add to the mailbox for execution.
+ * @param descriptionFormat the optional description for the command that is used for debugging
+ * and error-reporting.
+ * @param descriptionArgs the parameters used to format the final description string.
+ * @throws RejectedExecutionException if this task cannot be accepted for execution, e.g.
+ * because the mailbox is quiesced or closed.
+ */
+ default void execute(
+ ThrowingRunnable<? extends Exception> command,
+ String descriptionFormat,
+ Object... descriptionArgs) {
+ execute(MailOptions.options(), command, descriptionFormat, descriptionArgs);
+ }
+
+ /**
+ * Executes the given command at some time in the future in the mailbox thread.
+ *
+ * <p>An optional description can (and should) be added to ease debugging and error-reporting.
+ * The description may contain placeholder that refer to the provided description arguments
+ * using {@link java.util.Formatter} syntax. The actual description is only formatted on demand.
+ *
+ * @param mailOptions additional options to configure behaviour of the {@code command}
* @param command the runnable task to add to the mailbox for execution.
* @param descriptionFormat the optional description for the command that is used for debugging
* and error-reporting.
@@ -118,6 +181,7 @@
* because the mailbox is quiesced or closed.
*/
void execute(
+ MailOptions mailOptions,
ThrowingRunnable<? extends Exception> command,
String descriptionFormat,
Object... descriptionArgs);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java
index f99dd87..e98a66b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java
@@ -824,6 +824,7 @@
@Override
public void execute(
+ MailOptions mailOptions,
ThrowingRunnable<? extends Exception> command,
String descriptionFormat,
Object... descriptionArgs) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/mailbox/SyncMailboxExecutor.java b/flink-runtime/src/test/java/org/apache/flink/runtime/mailbox/SyncMailboxExecutor.java
index 13ab803..5b86284 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/mailbox/SyncMailboxExecutor.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/mailbox/SyncMailboxExecutor.java
@@ -25,6 +25,7 @@
public class SyncMailboxExecutor implements MailboxExecutor {
@Override
public void execute(
+ MailOptions mailOptions,
ThrowingRunnable<? extends Exception> command,
String descriptionFormat,
Object... descriptionArgs) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessor.java
index 02cfd96..76eabe6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessor.java
@@ -77,6 +77,7 @@
progressWatermarkScheduled = true;
// We still have work to do, but we need to let other mails to be processed first.
mailboxExecutor.execute(
+ MailboxExecutor.MailOptions.deferrable(),
() -> {
progressWatermarkScheduled = false;
emitWatermarkInsideMailbox();
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mail.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mail.java
index bed96ae..6afd891 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mail.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mail.java
@@ -18,6 +18,8 @@
package org.apache.flink.streaming.runtime.tasks.mailbox;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.operators.MailOptionsImpl;
+import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingRunnable;
@@ -30,6 +32,7 @@
*/
@Internal
public class Mail {
+ private final MailOptionsImpl mailOptions;
/** The action to execute. */
private final ThrowingRunnable<? extends Exception> runnable;
/**
@@ -50,6 +53,7 @@
String descriptionFormat,
Object... descriptionArgs) {
this(
+ MailboxExecutor.MailOptions.options(),
runnable,
priority,
StreamTaskActionExecutor.IMMEDIATE,
@@ -58,11 +62,13 @@
}
public Mail(
+ MailboxExecutor.MailOptions mailOptions,
ThrowingRunnable<? extends Exception> runnable,
int priority,
StreamTaskActionExecutor actionExecutor,
String descriptionFormat,
Object... descriptionArgs) {
+ this.mailOptions = (MailOptionsImpl) mailOptions;
this.runnable = Preconditions.checkNotNull(runnable);
this.priority = priority;
this.descriptionFormat =
@@ -71,8 +77,13 @@
this.actionExecutor = actionExecutor;
}
+ public MailboxExecutor.MailOptions getMailOptions() {
+ return mailOptions;
+ }
+
public int getPriority() {
- return priority;
+ /** See {@link MailboxExecutor.MailOptions#deferrable()} ()}. */
+ return mailOptions.isDeferrable() ? TaskMailbox.MIN_PRIORITY : priority;
}
public void tryCancel(boolean mayInterruptIfRunning) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImpl.java
index b2c8eac..2987ee2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImpl.java
@@ -67,13 +67,19 @@
@Override
public void execute(
+ MailOptions mailOptions,
final ThrowingRunnable<? extends Exception> command,
final String descriptionFormat,
final Object... descriptionArgs) {
try {
mailbox.put(
new Mail(
- command, priority, actionExecutor, descriptionFormat, descriptionArgs));
+ mailOptions,
+ command,
+ priority,
+ actionExecutor,
+ descriptionFormat,
+ descriptionArgs));
} catch (MailboxClosedException mbex) {
throw new RejectedExecutionException(mbex);
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkTest.java
index 88e0bf3..bc92ac1 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkTest.java
@@ -235,6 +235,7 @@
@Override
public void execute(
+ MailOptions mailOptions,
ThrowingRunnable<? extends Exception> command,
String descriptionFormat,
Object... descriptionArgs) {}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessorTest.java
index 9fb9d34..87f19f4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessorTest.java
@@ -35,6 +35,7 @@
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import static org.assertj.core.api.Assertions.assertThat;
@@ -43,6 +44,7 @@
@Test
void testEmitWatermarkInsideMailbox() throws Exception {
+ int priority = 42;
final List<StreamElement> emittedElements = new ArrayList<>();
final TaskMailboxImpl mailbox = new TaskMailboxImpl();
final InternalTimeServiceManager<?> timerService = new NoOpInternalTimeServiceManager();
@@ -50,7 +52,8 @@
final MailboxWatermarkProcessor<StreamRecord<String>> watermarkProcessor =
new MailboxWatermarkProcessor<>(
new CollectorOutput<>(emittedElements),
- new MailboxExecutorImpl(mailbox, 0, StreamTaskActionExecutor.IMMEDIATE),
+ new MailboxExecutorImpl(
+ mailbox, priority, StreamTaskActionExecutor.IMMEDIATE),
timerService);
final List<Watermark> expectedOutput = new ArrayList<>();
watermarkProcessor.emitWatermarkInsideMailbox(new Watermark(1));
@@ -69,6 +72,10 @@
assertThat(emittedElements).containsExactlyElementsOf(expectedOutput);
+ // FLINK-35528: do not allow yielding to continuation mails
+ assertThat(mailbox.tryTake(priority)).isEqualTo(Optional.empty());
+ assertThat(emittedElements).containsExactlyElementsOf(expectedOutput);
+
while (mailbox.hasMail()) {
mailbox.take(TaskMailbox.MIN_PRIORITY).run();
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImplTest.java
index cfeb92b..1b6fddc 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImplTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImplTest.java
@@ -111,6 +111,26 @@
}
@Test
+ void testDeferrable() throws Exception {
+ int priority = 42;
+ MailboxExecutor localExecutor = mailboxProcessor.getMailboxExecutor(priority);
+
+ AtomicBoolean deferrableMailExecuted = new AtomicBoolean();
+
+ localExecutor.execute(
+ MailboxExecutor.MailOptions.deferrable(),
+ () -> deferrableMailExecuted.set(true),
+ "deferrable mail");
+ assertThat(localExecutor.tryYield()).isFalse();
+ assertThat(deferrableMailExecuted.get()).isFalse();
+ assertThat(mailboxExecutor.tryYield()).isFalse();
+ assertThat(deferrableMailExecuted.get()).isFalse();
+
+ assertThat(mailboxProcessor.runMailboxStep()).isTrue();
+ assertThat(deferrableMailExecuted.get()).isTrue();
+ }
+
+ @Test
void testClose() throws Exception {
final TestRunnable yieldRun = new TestRunnable();
final TestRunnable leftoverRun = new TestRunnable();