[FLINK-14366][tests] Annotate MiniCluster tests in flink-tests with AlsoRunWithSchedulerNG
AbstractTestBase in flink-test-utils is also annotated here to enabled tests based on it.
7 failed tests are not included and will be fixed in separate PRs:
* ClassLoaderITCase, EventTimeWindowCheckpointingITCase and WindowCheckpointingITCase in FLINK-14371
* KeyedStateCheckpointingITCase in FLINK-14372
* ZooKeeperHighAvailabilityITCase in FLINK-14373
* RegionFailoverITCase in FLINK-14374
* BatchFineGrainedRecoveryITCase in FLINK-14440
This closes #9900.
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
index 3ac2104..acc0c2a 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
@@ -19,9 +19,11 @@
package org.apache.flink.test.util;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
import org.apache.flink.util.FileUtils;
import org.junit.ClassRule;
+import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
import java.io.File;
@@ -54,6 +56,7 @@
*
* </pre>
*/
+@Category(AlsoRunWithSchedulerNG.class)
public abstract class AbstractTestBase extends TestBaseUtils {
private static final int DEFAULT_PARALLELISM = 4;
diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/metrics/JobManagerMetricsITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/metrics/JobManagerMetricsITCase.java
index e890216..c06dd08 100644
--- a/flink-tests/src/test/java/org/apache/flink/runtime/metrics/JobManagerMetricsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/runtime/metrics/JobManagerMetricsITCase.java
@@ -28,11 +28,13 @@
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
import org.apache.flink.util.TestLogger;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import java.util.ArrayList;
import java.util.Collection;
@@ -47,6 +49,7 @@
/**
* Integration tests for proper initialization of the job manager metrics.
*/
+@Category(AlsoRunWithSchedulerNG.class)
public class JobManagerMetricsITCase extends TestLogger {
private static final String JOB_MANAGER_METRICS_PREFIX = "localhost.jobmanager.";
diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/metrics/SystemResourcesMetricsITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/metrics/SystemResourcesMetricsITCase.java
index 7801a28..9e0694f 100644
--- a/flink-tests/src/test/java/org/apache/flink/runtime/metrics/SystemResourcesMetricsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/runtime/metrics/SystemResourcesMetricsITCase.java
@@ -27,10 +27,12 @@
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.testutils.MiniClusterResource;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
import org.apache.flink.util.TestLogger;
import org.junit.ClassRule;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import java.util.ArrayList;
import java.util.List;
@@ -48,6 +50,7 @@
/**
* Integration tests for proper initialization of the system resource metrics.
*/
+@Category(AlsoRunWithSchedulerNG.class)
public class SystemResourcesMetricsITCase extends TestLogger {
@ClassRule
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
index 9f78ce2..c5cefc3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
@@ -30,10 +30,12 @@
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
import org.apache.flink.util.TestLogger;
import org.junit.ClassRule;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import static org.apache.flink.util.ExceptionUtils.findThrowable;
import static org.junit.Assert.assertTrue;
@@ -44,6 +46,7 @@
* a) throw errors during runtime
* b) are not compatible with existing accumulator.
*/
+@Category(AlsoRunWithSchedulerNG.class)
public class AccumulatorErrorITCase extends TestLogger {
private static final String FAULTY_CLONE_ACCUMULATOR = "faulty-clone";
private static final String FAULTY_MERGE_ACCUMULATOR = "faulty-merge";
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
index d406318..dadbe74 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
@@ -43,12 +43,14 @@
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,6 +64,7 @@
/**
* Tests the availability of accumulator results during runtime.
*/
+@Category(AlsoRunWithSchedulerNG.class)
public class AccumulatorLiveITCase extends TestLogger {
private static final Logger LOG = LoggerFactory.getLogger(AccumulatorLiveITCase.class);
diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
index ab98e92..8546a0b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
@@ -35,10 +35,12 @@
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.ClassRule;
+import org.junit.experimental.categories.Category;
import java.util.concurrent.TimeUnit;
@@ -48,6 +50,7 @@
/**
* Base class for testing job cancellation.
*/
+@Category(AlsoRunWithSchedulerNG.class)
public abstract class CancelingTestBase extends TestLogger {
private static final int MINIMUM_HEAP_SIZE_MB = 192;
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
index c09943f..e456c11 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
@@ -36,11 +36,13 @@
import org.apache.flink.test.checkpointing.utils.IntType;
import org.apache.flink.test.checkpointing.utils.ValidatingSink;
import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
import org.junit.ClassRule;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.junit.Assert.assertEquals;
@@ -53,6 +55,7 @@
* <p>This is a version of {@link EventTimeWindowCheckpointingITCase} for All-Windows.
*/
@SuppressWarnings("serial")
+@Category(AlsoRunWithSchedulerNG.class)
public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
private static final int PARALLELISM = 4;
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index 8f788cb..ba7704e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -53,6 +53,7 @@
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
import org.apache.flink.util.Collector;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
@@ -61,6 +62,7 @@
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -85,6 +87,7 @@
* Test savepoint rescaling.
*/
@RunWith(Parameterized.class)
+@Category(AlsoRunWithSchedulerNG.class)
public class RescalingITCase extends TestLogger {
private static final int numTaskManagers = 2;
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
index 2d5fa9f..45b5138 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
@@ -38,11 +38,13 @@
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.test.state.ManualWindowSpeedITCase;
import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
import org.apache.flink.util.TestLogger;
import org.apache.curator.test.TestingServer;
import org.junit.ClassRule;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
import javax.annotation.Nullable;
@@ -66,6 +68,7 @@
*
* <p>This tests considers full and incremental checkpoints and was introduced to guard against problems like FLINK-6964.
*/
+@Category(AlsoRunWithSchedulerNG.class)
public class ResumeCheckpointManuallyITCase extends TestLogger {
private static final int PARALLELISM = 2;
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 2f4a857..027f113 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -54,6 +54,7 @@
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.testutils.EntropyInjectingTestFileSystem;
+import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
@@ -65,6 +66,7 @@
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -99,6 +101,7 @@
* Integration test for triggering and resuming from savepoints.
*/
@SuppressWarnings("serial")
+@Category(AlsoRunWithSchedulerNG.class)
public class SavepointITCase extends TestLogger {
private static final Logger LOG = LoggerFactory.getLogger(SavepointITCase.class);
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
index 74cdeef..ad949de 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
@@ -27,12 +27,14 @@
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.test.util.SuccessException;
+import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -46,6 +48,7 @@
* Test base for fault tolerant streaming programs.
*/
@RunWith(Parameterized.class)
+@Category(AlsoRunWithSchedulerNG.class)
public abstract class StreamFaultToleranceTestBase extends TestLogger {
@Parameterized.Parameters(name = "FailoverStrategy: {0}")
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
index 5d40c8a..bb8f31d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
@@ -35,12 +35,14 @@
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
import org.apache.flink.util.OptionalFailure;
import org.apache.commons.io.FileUtils;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
+import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,6 +64,7 @@
/**
* Test savepoint migration.
*/
+@Category(AlsoRunWithSchedulerNG.class)
public abstract class SavepointMigrationTestBase extends TestBaseUtils {
@BeforeClass
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
index b9f5b8f..bbb80ba 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
@@ -31,6 +31,7 @@
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.testutils.MiniClusterResource;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
@@ -38,6 +39,7 @@
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import java.util.Optional;
import java.util.concurrent.Semaphore;
@@ -48,6 +50,7 @@
/**
* Tests retrieval of a job from a running Flink cluster.
*/
+@Category(AlsoRunWithSchedulerNG.class)
public class JobRetrievalITCase extends TestLogger {
private static final Semaphore lock = new Semaphore(1);
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
index 032a3da..e113208 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
@@ -28,11 +28,13 @@
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.junit.ClassRule;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -49,6 +51,7 @@
* Tests for failing job submissions.
*/
@RunWith(Parameterized.class)
+@Category(AlsoRunWithSchedulerNG.class)
public class JobSubmissionFailsITCase extends TestLogger {
private static final int NUM_TM = 2;
diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java b/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
index 2398c14..7fea2f5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
@@ -29,12 +29,16 @@
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
+
+import org.junit.experimental.categories.Category;
import static org.junit.Assert.fail;
/**
* Manual test to evaluate impact of checkpointing on latency.
*/
+@Category(AlsoRunWithSchedulerNG.class)
public class StreamingScalabilityAndLatency {
public static void main(String[] args) throws Exception {
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
index 6c85218..a0fcd5a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
@@ -29,11 +29,13 @@
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
import org.apache.flink.types.Value;
import org.apache.flink.util.TestLogger;
import org.junit.ClassRule;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import java.io.IOException;
import java.util.Optional;
@@ -47,6 +49,7 @@
* and detected in the network stack.
*/
@SuppressWarnings("serial")
+@Category(AlsoRunWithSchedulerNG.class)
public class CustomSerializationITCase extends TestLogger {
private static final int PARLLELISM = 5;
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
index 6925a4d..a73b351 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
@@ -30,11 +30,13 @@
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
import org.junit.ClassRule;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import static org.apache.flink.util.ExceptionUtils.findThrowable;
import static org.junit.Assert.assertEquals;
@@ -52,6 +54,7 @@
* test cluster.
*/
@SuppressWarnings("serial")
+@Category(AlsoRunWithSchedulerNG.class)
public class MiscellaneousIssuesITCase extends TestLogger {
@ClassRule
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
index 90ddf0c..dbf34db 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
@@ -35,10 +35,12 @@
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
import org.apache.flink.util.TestLogger;
import org.junit.ClassRule;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import static org.apache.flink.util.ExceptionUtils.findThrowableWithMessage;
import static org.junit.Assert.assertTrue;
@@ -49,6 +51,7 @@
* This test validates that task slots in co-location constraints are properly
* freed in the presence of failures.
*/
+@Category(AlsoRunWithSchedulerNG.class)
public class SuccessAfterNetworkBuffersFailureITCase extends TestLogger {
private static final int PARALLELISM = 16;
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/CustomDistributionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/CustomDistributionITCase.java
index 1683dfc..62eb0c0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/CustomDistributionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/CustomDistributionITCase.java
@@ -32,11 +32,13 @@
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.operators.util.CollectionDataSets;
import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
import org.junit.ClassRule;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import java.io.IOException;
@@ -46,6 +48,7 @@
* Integration tests for custom {@link DataDistribution}.
*/
@SuppressWarnings("serial")
+@Category(AlsoRunWithSchedulerNG.class)
public class CustomDistributionITCase extends TestLogger {
@ClassRule
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
index 3394e78..ea27224 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
@@ -27,11 +27,13 @@
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.runtime.testutils.MiniClusterResource;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
import org.junit.ClassRule;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import java.io.IOException;
import java.net.URI;
@@ -43,6 +45,7 @@
* Integration tests for {@link org.apache.flink.api.java.RemoteEnvironment}.
*/
@SuppressWarnings("serial")
+@Category(AlsoRunWithSchedulerNG.class)
public class RemoteEnvironmentITCase extends TestLogger {
private static final int TM_SLOTS = 4;
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFailureRateStrategyITBase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFailureRateStrategyITBase.java
index ef827d6..7cf356a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFailureRateStrategyITBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFailureRateStrategyITBase.java
@@ -22,12 +22,15 @@
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
import org.junit.ClassRule;
+import org.junit.experimental.categories.Category;
/**
* Test cluster configuration with failure-rate recovery.
*/
+@Category(AlsoRunWithSchedulerNG.class)
public class SimpleRecoveryFailureRateStrategyITBase extends SimpleRecoveryITCaseBase {
@ClassRule
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFixedDelayRestartStrategyITBase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFixedDelayRestartStrategyITBase.java
index 9bb2ed6..eec3859 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFixedDelayRestartStrategyITBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFixedDelayRestartStrategyITBase.java
@@ -22,12 +22,15 @@
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
import org.junit.ClassRule;
+import org.junit.experimental.categories.Category;
/**
* Test cluster configuration with fixed-delay recovery.
*/
+@Category(AlsoRunWithSchedulerNG.class)
public class SimpleRecoveryFixedDelayRestartStrategyITBase extends SimpleRecoveryITCaseBase {
@ClassRule
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/FileBufferReaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/FileBufferReaderITCase.java
index 91d4cf4..4cd2daa 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/FileBufferReaderITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/FileBufferReaderITCase.java
@@ -38,6 +38,7 @@
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
import org.apache.flink.testutils.serialization.types.ByteArrayType;
import org.apache.flink.util.TestLogger;
@@ -46,6 +47,7 @@
import org.junit.BeforeClass;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import java.util.concurrent.CompletableFuture;
@@ -63,6 +65,7 @@
* the first fetched buffer from {@link org.apache.flink.runtime.io.network.partition.FileChannelBoundedData} has not
* been recycled while fetching the second buffer to trigger next read ahead, which breaks the above assumption.
*/
+@Category(AlsoRunWithSchedulerNG.class)
public class FileBufferReaderITCase extends TestLogger {
private static final int parallelism = 8;
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
index 8fe2aa3..9f11285 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
@@ -30,6 +30,7 @@
import org.apache.flink.test.testdata.WordCountData;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
import org.apache.flink.util.Collector;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.TestLogger;
@@ -38,6 +39,7 @@
import org.junit.AssumptionViolatedException;
import org.junit.Rule;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import java.io.IOException;
import java.net.Inet6Address;
@@ -56,6 +58,7 @@
* Test proper handling of IPv6 address literals in URLs.
*/
@SuppressWarnings("serial")
+@Category(AlsoRunWithSchedulerNG.class)
public class IPv6HostnamesITCase extends TestLogger {
@Rule
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/NettyEpollITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/NettyEpollITCase.java
index ca8366b..58f9523 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/NettyEpollITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/NettyEpollITCase.java
@@ -25,10 +25,12 @@
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
import org.apache.flink.util.TestLogger;
import org.junit.AssumptionViolatedException;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,6 +41,7 @@
* on linux. On other platforms it's basically a NO-OP. See
* https://github.com/apache/flink-shaded/issues/30
*/
+@Category(AlsoRunWithSchedulerNG.class)
public class NettyEpollITCase extends TestLogger {
private static final Logger LOG = LoggerFactory.getLogger(NettyEpollITCase.class);
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
index 3357cb6..e78d022 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
@@ -35,9 +35,11 @@
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,6 +49,7 @@
/**
* Manually test the throughput of the network stack.
*/
+@Category(AlsoRunWithSchedulerNG.class)
public class NetworkStackThroughputITCase extends TestLogger {
private static final Logger LOG = LoggerFactory.getLogger(NetworkStackThroughputITCase.class);
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/SchedulingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/SchedulingITCase.java
index 85bcc78..113d304 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/SchedulingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/SchedulingITCase.java
@@ -38,10 +38,12 @@
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import javax.annotation.Nonnull;
@@ -56,6 +58,7 @@
/**
* IT case for testing Flink's scheduling strategies.
*/
+@Category(AlsoRunWithSchedulerNG.class)
public class SchedulingITCase extends TestLogger {
/**
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
index 168fa1f..4d90b6e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
@@ -39,6 +39,7 @@
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
import org.apache.flink.runtime.webmonitor.retriever.LeaderRetriever;
+import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
import org.apache.flink.util.TestLogger;
import org.apache.curator.test.TestingServer;
@@ -46,6 +47,7 @@
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
import javax.annotation.Nonnull;
@@ -64,6 +66,7 @@
/**
* Test the election of a new JobManager leader.
*/
+@Category(AlsoRunWithSchedulerNG.class)
public class ZooKeeperLeaderElectionITCase extends TestLogger {
private static final Duration TEST_TIMEOUT = Duration.ofMinutes(5L);
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
index 154477e..f598a4d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
@@ -36,12 +36,14 @@
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
import java.io.File;
@@ -64,6 +66,7 @@
* Step 1: Migrate the job to the newer version by submitting the same job used for the old version savepoint, and create a new savepoint.
* Step 2: Modify the job topology, and restore from the savepoint created in step 1.
*/
+@Category(AlsoRunWithSchedulerNG.class)
public abstract class AbstractOperatorRestoreTestBase extends TestLogger {
private static final int NUM_TMS = 1;
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BackPressureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BackPressureITCase.java
index 07fb227..e7c2072 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BackPressureITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BackPressureITCase.java
@@ -36,12 +36,14 @@
import org.apache.flink.test.util.BlockingSink;
import org.apache.flink.test.util.IdentityMapFunction;
import org.apache.flink.test.util.InfiniteIntegerSource;
+import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.SupplierWithException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import java.time.Duration;
import java.util.List;
@@ -52,6 +54,7 @@
/**
* Integration test for operator back pressure tracking.
*/
+@Category(AlsoRunWithSchedulerNG.class)
public class BackPressureITCase extends TestLogger {
private static final JobID TEST_JOB_ID = new JobID();
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BigUserProgramJobSubmitITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BigUserProgramJobSubmitITCase.java
index 93e2803..43834fd 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BigUserProgramJobSubmitITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BigUserProgramJobSubmitITCase.java
@@ -27,10 +27,12 @@
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
+import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
import org.apache.flink.util.TestLogger;
import org.junit.ClassRule;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import java.util.ArrayList;
import java.util.Arrays;
@@ -44,6 +46,7 @@
* Integration test that verifies that a user program with a big(ger) payload is successfully
* submitted and run.
*/
+@Category(AlsoRunWithSchedulerNG.class)
public class BigUserProgramJobSubmitITCase extends TestLogger {
// ------------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
index f0938a2..83a5745 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
@@ -48,6 +48,7 @@
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
@@ -55,6 +56,7 @@
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import java.util.ArrayList;
import java.util.Collection;
@@ -70,6 +72,7 @@
* Tests for timestamps, watermarks, and event-time sources.
*/
@SuppressWarnings("serial")
+@Category(AlsoRunWithSchedulerNG.class)
public class TimestampITCase extends TestLogger {
private static final int NUM_TASK_MANAGERS = 2;