[FLINK-14366][tests] Annotate MiniCluster tests in flink-tests with AlsoRunWithSchedulerNG

AbstractTestBase in flink-test-utils is also annotated here to enabled tests based on it.
7 failed tests are not included and will be fixed in separate PRs:
* ClassLoaderITCase, EventTimeWindowCheckpointingITCase and WindowCheckpointingITCase in FLINK-14371
* KeyedStateCheckpointingITCase in FLINK-14372
* ZooKeeperHighAvailabilityITCase in FLINK-14373
* RegionFailoverITCase in FLINK-14374
* BatchFineGrainedRecoveryITCase in FLINK-14440

This closes #9900.
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
index 3ac2104..acc0c2a 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
@@ -19,9 +19,11 @@
 package org.apache.flink.test.util;
 
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
 import org.apache.flink.util.FileUtils;
 
 import org.junit.ClassRule;
+import org.junit.experimental.categories.Category;
 import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
@@ -54,6 +56,7 @@
  *
  * </pre>
  */
+@Category(AlsoRunWithSchedulerNG.class)
 public abstract class AbstractTestBase extends TestBaseUtils {
 
 	private static final int DEFAULT_PARALLELISM = 4;
diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/metrics/JobManagerMetricsITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/metrics/JobManagerMetricsITCase.java
index e890216..c06dd08 100644
--- a/flink-tests/src/test/java/org/apache/flink/runtime/metrics/JobManagerMetricsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/runtime/metrics/JobManagerMetricsITCase.java
@@ -28,11 +28,13 @@
 import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -47,6 +49,7 @@
 /**
  * Integration tests for proper initialization of the job manager metrics.
  */
+@Category(AlsoRunWithSchedulerNG.class)
 public class JobManagerMetricsITCase extends TestLogger {
 
 	private static final String JOB_MANAGER_METRICS_PREFIX = "localhost.jobmanager.";
diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/metrics/SystemResourcesMetricsITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/metrics/SystemResourcesMetricsITCase.java
index 7801a28..9e0694f 100644
--- a/flink-tests/src/test/java/org/apache/flink/runtime/metrics/SystemResourcesMetricsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/runtime/metrics/SystemResourcesMetricsITCase.java
@@ -27,10 +27,12 @@
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.testutils.MiniClusterResource;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -48,6 +50,7 @@
 /**
  * Integration tests for proper initialization of the system resource metrics.
  */
+@Category(AlsoRunWithSchedulerNG.class)
 public class SystemResourcesMetricsITCase extends TestLogger {
 
 	@ClassRule
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
index 9f78ce2..c5cefc3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
@@ -30,10 +30,12 @@
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import static org.apache.flink.util.ExceptionUtils.findThrowable;
 import static org.junit.Assert.assertTrue;
@@ -44,6 +46,7 @@
  *  a) throw errors during runtime
  *  b) are not compatible with existing accumulator.
  */
+@Category(AlsoRunWithSchedulerNG.class)
 public class AccumulatorErrorITCase extends TestLogger {
 	private static final String FAULTY_CLONE_ACCUMULATOR = "faulty-clone";
 	private static final String FAULTY_MERGE_ACCUMULATOR = "faulty-merge";
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
index d406318..dadbe74 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
@@ -43,12 +43,14 @@
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -62,6 +64,7 @@
 /**
  * Tests the availability of accumulator results during runtime.
  */
+@Category(AlsoRunWithSchedulerNG.class)
 public class AccumulatorLiveITCase extends TestLogger {
 
 	private static final Logger LOG = LoggerFactory.getLogger(AccumulatorLiveITCase.class);
diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
index ab98e92..8546a0b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
@@ -35,10 +35,12 @@
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Assert;
 import org.junit.ClassRule;
+import org.junit.experimental.categories.Category;
 
 import java.util.concurrent.TimeUnit;
 
@@ -48,6 +50,7 @@
 /**
  * Base class for testing job cancellation.
  */
+@Category(AlsoRunWithSchedulerNG.class)
 public abstract class CancelingTestBase extends TestLogger {
 
 	private static final int MINIMUM_HEAP_SIZE_MB = 192;
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
index c09943f..e456c11 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
@@ -36,11 +36,13 @@
 import org.apache.flink.test.checkpointing.utils.IntType;
 import org.apache.flink.test.checkpointing.utils.ValidatingSink;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.junit.Assert.assertEquals;
@@ -53,6 +55,7 @@
  * <p>This is a version of {@link EventTimeWindowCheckpointingITCase} for All-Windows.
  */
 @SuppressWarnings("serial")
+@Category(AlsoRunWithSchedulerNG.class)
 public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
 
 	private static final int PARALLELISM = 4;
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index 8f788cb..ba7704e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -53,6 +53,7 @@
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
@@ -61,6 +62,7 @@
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -85,6 +87,7 @@
  * Test savepoint rescaling.
  */
 @RunWith(Parameterized.class)
+@Category(AlsoRunWithSchedulerNG.class)
 public class RescalingITCase extends TestLogger {
 
 	private static final int numTaskManagers = 2;
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
index 2d5fa9f..45b5138 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
@@ -38,11 +38,13 @@
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.test.state.ManualWindowSpeedITCase;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.curator.test.TestingServer;
 import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.junit.rules.TemporaryFolder;
 
 import javax.annotation.Nullable;
@@ -66,6 +68,7 @@
  *
  * <p>This tests considers full and incremental checkpoints and was introduced to guard against problems like FLINK-6964.
  */
+@Category(AlsoRunWithSchedulerNG.class)
 public class ResumeCheckpointManuallyITCase extends TestLogger {
 
 	private static final int PARALLELISM = 2;
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 2f4a857..027f113 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -54,6 +54,7 @@
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.testutils.EntropyInjectingTestFileSystem;
+import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
@@ -65,6 +66,7 @@
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -99,6 +101,7 @@
  * Integration test for triggering and resuming from savepoints.
  */
 @SuppressWarnings("serial")
+@Category(AlsoRunWithSchedulerNG.class)
 public class SavepointITCase extends TestLogger {
 
 	private static final Logger LOG = LoggerFactory.getLogger(SavepointITCase.class);
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
index 74cdeef..ad949de 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
@@ -27,12 +27,14 @@
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.test.util.SuccessException;
+import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -46,6 +48,7 @@
  * Test base for fault tolerant streaming programs.
  */
 @RunWith(Parameterized.class)
+@Category(AlsoRunWithSchedulerNG.class)
 public abstract class StreamFaultToleranceTestBase extends TestLogger {
 
 	@Parameterized.Parameters(name = "FailoverStrategy: {0}")
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
index 5d40c8a..bb8f31d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
@@ -35,12 +35,14 @@
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
 import org.apache.flink.util.OptionalFailure;
 
 import org.apache.commons.io.FileUtils;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Rule;
+import org.junit.experimental.categories.Category;
 import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,6 +64,7 @@
 /**
  * Test savepoint migration.
  */
+@Category(AlsoRunWithSchedulerNG.class)
 public abstract class SavepointMigrationTestBase extends TestBaseUtils {
 
 	@BeforeClass
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
index b9f5b8f..bbb80ba 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
@@ -31,6 +31,7 @@
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.testutils.MiniClusterResource;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 
@@ -38,6 +39,7 @@
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import java.util.Optional;
 import java.util.concurrent.Semaphore;
@@ -48,6 +50,7 @@
 /**
  * Tests retrieval of a job from a running Flink cluster.
  */
+@Category(AlsoRunWithSchedulerNG.class)
 public class JobRetrievalITCase extends TestLogger {
 
 	private static final Semaphore lock = new Semaphore(1);
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
index 032a3da..e113208 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
@@ -28,11 +28,13 @@
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -49,6 +51,7 @@
  * Tests for failing job submissions.
  */
 @RunWith(Parameterized.class)
+@Category(AlsoRunWithSchedulerNG.class)
 public class JobSubmissionFailsITCase extends TestLogger {
 
 	private static final int NUM_TM = 2;
diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java b/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
index 2398c14..7fea2f5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
@@ -29,12 +29,16 @@
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
+
+import org.junit.experimental.categories.Category;
 
 import static org.junit.Assert.fail;
 
 /**
  * Manual test to evaluate impact of checkpointing on latency.
  */
+@Category(AlsoRunWithSchedulerNG.class)
 public class StreamingScalabilityAndLatency {
 
 	public static void main(String[] args) throws Exception {
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
index 6c85218..a0fcd5a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
@@ -29,11 +29,13 @@
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
 import org.apache.flink.types.Value;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import java.io.IOException;
 import java.util.Optional;
@@ -47,6 +49,7 @@
  * and detected in the network stack.
  */
 @SuppressWarnings("serial")
+@Category(AlsoRunWithSchedulerNG.class)
 public class CustomSerializationITCase extends TestLogger {
 
 	private static final int PARLLELISM = 5;
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
index 6925a4d..a73b351 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
@@ -30,11 +30,13 @@
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import static org.apache.flink.util.ExceptionUtils.findThrowable;
 import static org.junit.Assert.assertEquals;
@@ -52,6 +54,7 @@
  * test cluster.
  */
 @SuppressWarnings("serial")
+@Category(AlsoRunWithSchedulerNG.class)
 public class MiscellaneousIssuesITCase extends TestLogger {
 
 	@ClassRule
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
index 90ddf0c..dbf34db 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
@@ -35,10 +35,12 @@
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import static org.apache.flink.util.ExceptionUtils.findThrowableWithMessage;
 import static org.junit.Assert.assertTrue;
@@ -49,6 +51,7 @@
  * This test validates that task slots in co-location constraints are properly
  * freed in the presence of failures.
  */
+@Category(AlsoRunWithSchedulerNG.class)
 public class SuccessAfterNetworkBuffersFailureITCase extends TestLogger {
 
 	private static final int PARALLELISM = 16;
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/CustomDistributionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/CustomDistributionITCase.java
index 1683dfc..62eb0c0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/CustomDistributionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/CustomDistributionITCase.java
@@ -32,11 +32,13 @@
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.test.operators.util.CollectionDataSets;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import java.io.IOException;
 
@@ -46,6 +48,7 @@
  * Integration tests for custom {@link DataDistribution}.
  */
 @SuppressWarnings("serial")
+@Category(AlsoRunWithSchedulerNG.class)
 public class CustomDistributionITCase extends TestLogger {
 
 	@ClassRule
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
index 3394e78..ea27224 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
@@ -27,11 +27,13 @@
 import org.apache.flink.core.io.GenericInputSplit;
 import org.apache.flink.runtime.testutils.MiniClusterResource;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import java.io.IOException;
 import java.net.URI;
@@ -43,6 +45,7 @@
  * Integration tests for {@link org.apache.flink.api.java.RemoteEnvironment}.
  */
 @SuppressWarnings("serial")
+@Category(AlsoRunWithSchedulerNG.class)
 public class RemoteEnvironmentITCase extends TestLogger {
 
 	private static final int TM_SLOTS = 4;
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFailureRateStrategyITBase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFailureRateStrategyITBase.java
index ef827d6..7cf356a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFailureRateStrategyITBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFailureRateStrategyITBase.java
@@ -22,12 +22,15 @@
 import org.apache.flink.configuration.RestartStrategyOptions;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
 
 import org.junit.ClassRule;
+import org.junit.experimental.categories.Category;
 
 /**
  * Test cluster configuration with failure-rate recovery.
  */
+@Category(AlsoRunWithSchedulerNG.class)
 public class SimpleRecoveryFailureRateStrategyITBase extends SimpleRecoveryITCaseBase {
 
 	@ClassRule
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFixedDelayRestartStrategyITBase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFixedDelayRestartStrategyITBase.java
index 9bb2ed6..eec3859 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFixedDelayRestartStrategyITBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFixedDelayRestartStrategyITBase.java
@@ -22,12 +22,15 @@
 import org.apache.flink.configuration.RestartStrategyOptions;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
 
 import org.junit.ClassRule;
+import org.junit.experimental.categories.Category;
 
 /**
  * Test cluster configuration with fixed-delay recovery.
  */
+@Category(AlsoRunWithSchedulerNG.class)
 public class SimpleRecoveryFixedDelayRestartStrategyITBase extends SimpleRecoveryITCaseBase {
 
 	@ClassRule
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/FileBufferReaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/FileBufferReaderITCase.java
index 91d4cf4..4cd2daa 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/FileBufferReaderITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/FileBufferReaderITCase.java
@@ -38,6 +38,7 @@
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
 import org.apache.flink.testutils.serialization.types.ByteArrayType;
 import org.apache.flink.util.TestLogger;
 
@@ -46,6 +47,7 @@
 
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import java.util.concurrent.CompletableFuture;
 
@@ -63,6 +65,7 @@
  * the first fetched buffer from {@link org.apache.flink.runtime.io.network.partition.FileChannelBoundedData} has not
  * been recycled while fetching the second buffer to trigger next read ahead, which breaks the above assumption.
  */
+@Category(AlsoRunWithSchedulerNG.class)
 public class FileBufferReaderITCase extends TestLogger {
 
 	private static final int parallelism = 8;
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
index 8fe2aa3..9f11285 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
@@ -30,6 +30,7 @@
 import org.apache.flink.test.testdata.WordCountData;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.TestLogger;
@@ -38,6 +39,7 @@
 import org.junit.AssumptionViolatedException;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import java.io.IOException;
 import java.net.Inet6Address;
@@ -56,6 +58,7 @@
  * Test proper handling of IPv6 address literals in URLs.
  */
 @SuppressWarnings("serial")
+@Category(AlsoRunWithSchedulerNG.class)
 public class IPv6HostnamesITCase extends TestLogger {
 
 	@Rule
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/NettyEpollITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/NettyEpollITCase.java
index ca8366b..58f9523 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/NettyEpollITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/NettyEpollITCase.java
@@ -25,10 +25,12 @@
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.AssumptionViolatedException;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,6 +41,7 @@
  * on linux. On other platforms it's basically a NO-OP. See
  * https://github.com/apache/flink-shaded/issues/30
  */
+@Category(AlsoRunWithSchedulerNG.class)
 public class NettyEpollITCase extends TestLogger {
 
 	private static final Logger LOG = LoggerFactory.getLogger(NettyEpollITCase.class);
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
index 3357cb6..e78d022 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
@@ -35,9 +35,11 @@
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,6 +49,7 @@
 /**
  * Manually test the throughput of the network stack.
  */
+@Category(AlsoRunWithSchedulerNG.class)
 public class NetworkStackThroughputITCase extends TestLogger {
 
 	private static final Logger LOG = LoggerFactory.getLogger(NetworkStackThroughputITCase.class);
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/SchedulingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/SchedulingITCase.java
index 85bcc78..113d304 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/SchedulingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/SchedulingITCase.java
@@ -38,10 +38,12 @@
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import javax.annotation.Nonnull;
 
@@ -56,6 +58,7 @@
 /**
  * IT case for testing Flink's scheduling strategies.
  */
+@Category(AlsoRunWithSchedulerNG.class)
 public class SchedulingITCase extends TestLogger {
 
 	/**
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
index 168fa1f..4d90b6e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
@@ -39,6 +39,7 @@
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderRetriever;
+import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.curator.test.TestingServer;
@@ -46,6 +47,7 @@
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.junit.rules.TemporaryFolder;
 
 import javax.annotation.Nonnull;
@@ -64,6 +66,7 @@
 /**
  * Test the election of a new JobManager leader.
  */
+@Category(AlsoRunWithSchedulerNG.class)
 public class ZooKeeperLeaderElectionITCase extends TestLogger {
 
 	private static final Duration TEST_TIMEOUT = Duration.ofMinutes(5L);
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
index 154477e..f598a4d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
@@ -36,12 +36,14 @@
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
@@ -64,6 +66,7 @@
  * Step 1: Migrate the job to the newer version by submitting the same job used for the old version savepoint, and create a new savepoint.
  * Step 2: Modify the job topology, and restore from the savepoint created in step 1.
  */
+@Category(AlsoRunWithSchedulerNG.class)
 public abstract class AbstractOperatorRestoreTestBase extends TestLogger {
 
 	private static final int NUM_TMS = 1;
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BackPressureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BackPressureITCase.java
index 07fb227..e7c2072 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BackPressureITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BackPressureITCase.java
@@ -36,12 +36,14 @@
 import org.apache.flink.test.util.BlockingSink;
 import org.apache.flink.test.util.IdentityMapFunction;
 import org.apache.flink.test.util.InfiniteIntegerSource;
+import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.function.SupplierWithException;
 
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import java.time.Duration;
 import java.util.List;
@@ -52,6 +54,7 @@
 /**
  * Integration test for operator back pressure tracking.
  */
+@Category(AlsoRunWithSchedulerNG.class)
 public class BackPressureITCase extends TestLogger {
 
 	private static final JobID TEST_JOB_ID = new JobID();
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BigUserProgramJobSubmitITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BigUserProgramJobSubmitITCase.java
index 93e2803..43834fd 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BigUserProgramJobSubmitITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BigUserProgramJobSubmitITCase.java
@@ -27,10 +27,12 @@
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
+import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -44,6 +46,7 @@
  * Integration test that verifies that a user program with a big(ger) payload is successfully
  * submitted and run.
  */
+@Category(AlsoRunWithSchedulerNG.class)
 public class BigUserProgramJobSubmitITCase extends TestLogger {
 
 	// ------------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
index f0938a2..83a5745 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
@@ -48,6 +48,7 @@
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 
@@ -55,6 +56,7 @@
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -70,6 +72,7 @@
  * Tests for timestamps, watermarks, and event-time sources.
  */
 @SuppressWarnings("serial")
+@Category(AlsoRunWithSchedulerNG.class)
 public class TimestampITCase extends TestLogger {
 
 	private static final int NUM_TASK_MANAGERS = 2;