[FLINK-33024][table-planner][JUnit5 Migration] Module: flink-table-planner (JsonPlanTestBase) (#23353)

diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java
index 4ca0a5d..591f0a5 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java
@@ -25,10 +25,11 @@
 import org.apache.flink.table.planner.utils.JsonPlanTestBase;
 import org.apache.flink.table.planner.utils.JsonTestUtils;
 import org.apache.flink.table.planner.utils.TableTestUtil;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
 
 import org.apache.commons.io.FileUtils;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import java.io.File;
 import java.io.IOException;
@@ -47,15 +48,16 @@
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test for {@link CompiledPlan} and related {@link TableEnvironment} methods. */
-public class CompiledPlanITCase extends JsonPlanTestBase {
+class CompiledPlanITCase extends JsonPlanTestBase {
 
     private static final List<String> DATA =
             Arrays.asList("1,1,hi", "2,1,hello", "3,2,hello world");
     private static final String[] COLUMNS_DEFINITION =
             new String[] {"a bigint", "b int", "c varchar"};
 
-    @Before
-    public void setup() throws Exception {
+    @BeforeEach
+    @Override
+    protected void setup() throws Exception {
         super.setup();
 
         String srcTableDdl =
@@ -76,7 +78,7 @@
     }
 
     @Test
-    public void testCompilePlanSql() throws IOException {
+    void testCompilePlanSql() throws IOException {
         CompiledPlan compiledPlan =
                 tableEnv.compilePlanSql("INSERT INTO MySink SELECT * FROM MyTable");
         String expected = TableTestUtil.readFromResource("/jsonplan/testGetJsonPlan.out");
@@ -92,7 +94,7 @@
     }
 
     @Test
-    public void testExecutePlanSql() throws Exception {
+    void testExecutePlanSql() throws Exception {
         File sinkPath = createSourceSinkTables();
 
         tableEnv.compilePlanSql("INSERT INTO sink SELECT * FROM src").execute().await();
@@ -101,10 +103,10 @@
     }
 
     @Test
-    public void testExecuteCtasPlanSql() throws Exception {
+    void testExecuteCtasPlanSql() throws Exception {
         createTestCsvSourceTable("src", DATA, COLUMNS_DEFINITION);
 
-        File sinkPath = TEMPORARY_FOLDER.newFolder();
+        File sinkPath = TempDirUtils.newFolder(tempFolder);
         assertThatThrownBy(
                         () ->
                                 tableEnv.compilePlanSql(
@@ -125,7 +127,7 @@
     }
 
     @Test
-    public void testExecutePlanTable() throws Exception {
+    void testExecutePlanTable() throws Exception {
         File sinkPath = createSourceSinkTables();
 
         tableEnv.from("src").select($("*")).insertInto("sink").compilePlan().execute().await();
@@ -134,8 +136,9 @@
     }
 
     @Test
-    public void testCompileWriteToFileAndThenExecuteSql() throws Exception {
-        Path planPath = Paths.get(URI.create(getTempDirPath("plan")).getPath(), "plan.json");
+    void testCompileWriteToFileAndThenExecuteSql() throws Exception {
+        Path planPath =
+                Paths.get(TempDirUtils.newFolder(tempFolder, "plan").getPath(), "plan.json");
 
         File sinkPath = createSourceSinkTables();
 
@@ -148,8 +151,9 @@
     }
 
     @Test
-    public void testCompileWriteToFilePathWithSchemeAndThenExecuteSql() throws Exception {
-        Path planPath = Paths.get(URI.create(getTempDirPath("plan")).getPath(), "plan.json");
+    void testCompileWriteToFilePathWithSchemeAndThenExecuteSql() throws Exception {
+        Path planPath =
+                Paths.get(TempDirUtils.newFolder(tempFolder, "plan").getPath(), "plan.json");
 
         File sinkPath = createSourceSinkTables();
 
@@ -165,9 +169,9 @@
     }
 
     @Test
-    public void testCompilePlan() throws Exception {
+    void testCompilePlan() throws Exception {
         Path planPath =
-                Paths.get(URI.create(getTempDirPath("plan")).getPath(), "plan.json")
+                Paths.get(TempDirUtils.newFolder(tempFolder, "plan").getPath(), "plan.json")
                         .toAbsolutePath();
 
         File sinkPath = createSourceSinkTables();
@@ -195,9 +199,9 @@
     }
 
     @Test
-    public void testCompilePlanWithStatementSet() throws Exception {
+    void testCompilePlanWithStatementSet() throws Exception {
         Path planPath =
-                Paths.get(URI.create(getTempDirPath("plan")).getPath(), "plan.json")
+                Paths.get(TempDirUtils.newFolder(tempFolder, "plan").getPath(), "plan.json")
                         .toAbsolutePath();
 
         createTestCsvSourceTable("src", DATA, COLUMNS_DEFINITION);
@@ -226,9 +230,9 @@
     }
 
     @Test
-    public void testCompilePlanIfNotExists() throws Exception {
+    void testCompilePlanIfNotExists() throws Exception {
         Path planPath =
-                Paths.get(URI.create(getTempDirPath("plan")).getPath(), "plan.json")
+                Paths.get(TempDirUtils.newFolder(tempFolder, "plan").getPath(), "plan.json")
                         .toAbsolutePath();
 
         File sinkPath = createSourceSinkTables();
@@ -256,11 +260,14 @@
     }
 
     @Test
-    public void testCompilePlanOverwrite() throws Exception {
+    void testCompilePlanOverwrite() throws Exception {
         tableEnv.getConfig().set(TableConfigOptions.PLAN_FORCE_RECOMPILE, true);
 
         Path planPath =
-                Paths.get(URI.create(getTempDirPath("plan")).getPath(), "plan.json")
+                Paths.get(
+                                URI.create(TempDirUtils.newFolder(tempFolder, "plan").getPath())
+                                        .getPath(),
+                                "plan.json")
                         .toAbsolutePath();
 
         List<String> expectedData =
@@ -297,9 +304,9 @@
     }
 
     @Test
-    public void testCompileAndExecutePlan() throws Exception {
+    void testCompileAndExecutePlan() throws Exception {
         Path planPath =
-                Paths.get(URI.create(getTempDirPath("plan")).getPath(), "plan.json")
+                Paths.get(TempDirUtils.newFolder(tempFolder, "plan").getPath(), "plan.json")
                         .toAbsolutePath();
 
         File sinkPath = createSourceSinkTables();
@@ -316,9 +323,9 @@
     }
 
     @Test
-    public void testCompileAndExecutePlanWithStatementSet() throws Exception {
+    void testCompileAndExecutePlanWithStatementSet() throws Exception {
         Path planPath =
-                Paths.get(URI.create(getTempDirPath("plan")).getPath(), "plan.json")
+                Paths.get(TempDirUtils.newFolder(tempFolder, "plan").getPath(), "plan.json")
                         .toAbsolutePath();
 
         createTestCsvSourceTable("src", DATA, COLUMNS_DEFINITION);
@@ -344,7 +351,7 @@
     }
 
     @Test
-    public void testExplainPlan() throws IOException {
+    void testExplainPlan() throws IOException {
         String planFromResources =
                 JsonTestUtils.setFlinkVersion(
                                 JsonTestUtils.readFromResource("/jsonplan/testGetJsonPlan.out"),
@@ -360,7 +367,7 @@
     }
 
     @Test
-    public void testPersistedConfigOption() throws Exception {
+    void testPersistedConfigOption() throws Exception {
         List<String> data =
                 Stream.concat(
                                 DATA.stream(),
@@ -397,7 +404,7 @@
     }
 
     @Test
-    public void testBatchMode() {
+    void testBatchMode() {
         tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
 
         String srcTableDdl =
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/CalcJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/CalcJsonPlanITCase.java
index 8d03a24..89aac5a 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/CalcJsonPlanITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/CalcJsonPlanITCase.java
@@ -25,7 +25,7 @@
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.planner.utils.JsonPlanTestBase;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.io.File;
 import java.util.Arrays;
@@ -33,10 +33,10 @@
 import java.util.List;
 
 /** Test for calc json plan. */
-public class CalcJsonPlanITCase extends JsonPlanTestBase {
+class CalcJsonPlanITCase extends JsonPlanTestBase {
 
     @Test
-    public void testSimpleCalc() throws Exception {
+    void testSimpleCalc() throws Exception {
         List<String> data = Arrays.asList("1,1,hi", "2,1,hello", "3,2,hello world");
         createTestCsvSourceTable("MyTable", data, "a bigint", "b int not null", "c varchar");
         File sinkPath =
@@ -55,7 +55,7 @@
     }
 
     @Test
-    public void testCalcWithUdf() throws Exception {
+    void testCalcWithUdf() throws Exception {
         tableEnv.createTemporaryFunction("udf1", new JavaFunc0());
         tableEnv.createTemporarySystemFunction("udf2", new JavaFunc2());
         tableEnv.createFunction("udf3", UdfWithOpen.class);
@@ -86,7 +86,7 @@
     }
 
     @Test
-    public void testProjectPushDown() throws Exception {
+    void testProjectPushDown() throws Exception {
         List<String> data = Arrays.asList("1,1,hi", "2,1,hello", "3,2,hello world");
         createTestCsvSourceTable("MyTable", data, "a bigint", "b int not null", "c varchar");
         File sinkPath = createTestCsvSinkTable("MySink", "b int", "a bigint", "a1 varchar");
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ChangelogSourceJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ChangelogSourceJsonPlanITCase.java
index 833ba3f..5ceb0bd 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ChangelogSourceJsonPlanITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ChangelogSourceJsonPlanITCase.java
@@ -23,7 +23,7 @@
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.planner.utils.JsonPlanTestBase;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.HashMap;
@@ -31,10 +31,10 @@
 import java.util.Map;
 
 /** Integration tests for operations on changelog source, including upsert source. */
-public class ChangelogSourceJsonPlanITCase extends JsonPlanTestBase {
+class ChangelogSourceJsonPlanITCase extends JsonPlanTestBase {
 
     @Test
-    public void testChangelogSource() throws Exception {
+    void testChangelogSource() throws Exception {
         registerChangelogSource();
         createTestNonInsertOnlyValuesSinkTable(
                 "user_sink",
@@ -56,7 +56,7 @@
     }
 
     @Test
-    public void testToUpsertSource() throws Exception {
+    void testToUpsertSource() throws Exception {
         registerUpsertSource();
         createTestNonInsertOnlyValuesSinkTable(
                 "user_sink",
@@ -79,7 +79,7 @@
 
     // ------------------------------------------------------------------------------------------
 
-    public void registerChangelogSource() {
+    protected void registerChangelogSource() {
         Map<String, String> properties = new HashMap<>();
         properties.put("changelog-mode", "I,UA,UB,D");
         createTestValuesSourceTable(
@@ -95,7 +95,7 @@
                 properties);
     }
 
-    public void registerUpsertSource() {
+    protected void registerUpsertSource() {
         Map<String, String> properties = new HashMap<>();
         properties.put("changelog-mode", "I,UA,D");
         createTestValuesSourceTable(
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ConfigureOperatorLevelStateTtlJsonITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ConfigureOperatorLevelStateTtlJsonITCase.java
index 6d1a220..fb879bf 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ConfigureOperatorLevelStateTtlJsonITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ConfigureOperatorLevelStateTtlJsonITCase.java
@@ -27,7 +27,7 @@
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -39,10 +39,10 @@
  * Tests for configuring operator-level state TTL via {@link
  * org.apache.flink.table.api.CompiledPlan}.
  */
-public class ConfigureOperatorLevelStateTtlJsonITCase extends JsonPlanTestBase {
+class ConfigureOperatorLevelStateTtlJsonITCase extends JsonPlanTestBase {
 
     @Test
-    public void testDifferentStateTtlForDifferentOneInputOperator() throws Exception {
+    void testDifferentStateTtlForDifferentOneInputOperator() throws Exception {
         String dataId =
                 TestValuesTableFactory.registerRowData(
                         Arrays.asList(
@@ -117,7 +117,7 @@
     }
 
     @Test
-    public void testDifferentStateTtlForSameTwoInputStreamOperator() throws Exception {
+    void testDifferentStateTtlForSameTwoInputStreamOperator() throws Exception {
         String leftTableDataId =
                 TestValuesTableFactory.registerRowData(
                         Arrays.asList(
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/CorrelateJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/CorrelateJsonPlanITCase.java
index 1874771..a90c3cd 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/CorrelateJsonPlanITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/CorrelateJsonPlanITCase.java
@@ -23,8 +23,8 @@
 import org.apache.flink.table.planner.utils.JsonPlanTestBase;
 import org.apache.flink.types.Row;
 
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -32,16 +32,16 @@
 import java.util.concurrent.ExecutionException;
 
 /** Integration tests for correlate. */
-public class CorrelateJsonPlanITCase extends JsonPlanTestBase {
+class CorrelateJsonPlanITCase extends JsonPlanTestBase {
 
-    @Before
-    public void before() {
+    @BeforeEach
+    void before() {
         List<Row> data = Collections.singletonList(Row.of("1,1,hi"));
         createTestValuesSourceTable("MyTable", data, "a varchar");
     }
 
     @Test
-    public void testSystemFuncByObject() throws ExecutionException, InterruptedException {
+    void testSystemFuncByObject() throws ExecutionException, InterruptedException {
         tableEnv.createTemporarySystemFunction(
                 "STRING_SPLIT", new JavaUserDefinedTableFunctions.StringSplit());
         createTestValuesSinkTable("MySink", "a STRING", "b STRING");
@@ -53,7 +53,7 @@
     }
 
     @Test
-    public void testSystemFuncByClass() throws ExecutionException, InterruptedException {
+    void testSystemFuncByClass() throws ExecutionException, InterruptedException {
         tableEnv.createTemporarySystemFunction(
                 "STRING_SPLIT", JavaUserDefinedTableFunctions.StringSplit.class);
         createTestValuesSinkTable("MySink", "a STRING", "b STRING");
@@ -65,7 +65,7 @@
     }
 
     @Test
-    public void testTemporaryFuncByObject() throws ExecutionException, InterruptedException {
+    void testTemporaryFuncByObject() throws ExecutionException, InterruptedException {
         tableEnv.createTemporaryFunction(
                 "STRING_SPLIT", new JavaUserDefinedTableFunctions.StringSplit());
         createTestValuesSinkTable("MySink", "a STRING", "b STRING");
@@ -77,7 +77,7 @@
     }
 
     @Test
-    public void testTemporaryFuncByClass() throws ExecutionException, InterruptedException {
+    void testTemporaryFuncByClass() throws ExecutionException, InterruptedException {
         tableEnv.createTemporaryFunction(
                 "STRING_SPLIT", JavaUserDefinedTableFunctions.StringSplit.class);
         createTestValuesSinkTable("MySink", "a STRING", "b STRING");
@@ -89,7 +89,7 @@
     }
 
     @Test
-    public void testFilter() throws ExecutionException, InterruptedException {
+    void testFilter() throws ExecutionException, InterruptedException {
         tableEnv.createTemporarySystemFunction(
                 "STRING_SPLIT", new JavaUserDefinedTableFunctions.StringSplit());
         createTestValuesSinkTable("MySink", "a STRING", "b STRING");
@@ -103,7 +103,7 @@
     }
 
     @Test
-    public void testUnnest() throws ExecutionException, InterruptedException {
+    void testUnnest() throws ExecutionException, InterruptedException {
         List<Row> data =
                 Collections.singletonList(
                         Row.of("Bob", new Row[] {Row.of("1"), Row.of("2"), Row.of("3")}));
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/DeduplicationJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/DeduplicationJsonPlanITCase.java
index d095ae0..afb9f5e 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/DeduplicationJsonPlanITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/DeduplicationJsonPlanITCase.java
@@ -24,16 +24,16 @@
 import org.apache.flink.table.planner.utils.JsonPlanTestBase;
 import org.apache.flink.types.Row;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.List;
 
 /** Test for deduplication json plan. */
-public class DeduplicationJsonPlanITCase extends JsonPlanTestBase {
+class DeduplicationJsonPlanITCase extends JsonPlanTestBase {
 
     @Test
-    public void testDeduplication() throws Exception {
+    void testDeduplication() throws Exception {
         List<Row> data =
                 Arrays.asList(
                         Row.of(1L, "terry", "pen", 1000L),
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ExpandJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ExpandJsonPlanITCase.java
index 4ebf82e..1226dac 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ExpandJsonPlanITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ExpandJsonPlanITCase.java
@@ -26,16 +26,16 @@
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.planner.utils.JsonPlanTestBase;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.List;
 
 /** Test for expand json plan. */
-public class ExpandJsonPlanITCase extends JsonPlanTestBase {
+class ExpandJsonPlanITCase extends JsonPlanTestBase {
 
     @Test
-    public void testExpand() throws Exception {
+    void testExpand() throws Exception {
         tableEnv.getConfig()
                 .set(
                         OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY,
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/GroupAggregateJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/GroupAggregateJsonPlanITCase.java
index 3450478..367253e 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/GroupAggregateJsonPlanITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/GroupAggregateJsonPlanITCase.java
@@ -27,29 +27,32 @@
 import org.apache.flink.table.planner.runtime.utils.TestData;
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.planner.utils.JsonPlanTestBase;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
 
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.List;
 
 /** Test for group aggregate json plan. */
-@RunWith(Parameterized.class)
-public class GroupAggregateJsonPlanITCase extends JsonPlanTestBase {
+@ExtendWith(ParameterizedTestExtension.class)
+class GroupAggregateJsonPlanITCase extends JsonPlanTestBase {
 
-    @Parameterized.Parameter public boolean isMiniBatchEnabled;
+    @Parameter private boolean isMiniBatchEnabled;
 
-    @Parameterized.Parameters(name = "isMiniBatchEnabled={0}")
-    public static List<Boolean> testData() {
+    @Parameters(name = "isMiniBatchEnabled={0}")
+    private static List<Boolean> testData() {
         return Arrays.asList(true, false);
     }
 
-    @Before
-    public void setup() throws Exception {
+    @BeforeEach
+    @Override
+    protected void setup() throws Exception {
         super.setup();
         if (isMiniBatchEnabled) {
             tableEnv.getConfig()
@@ -63,8 +66,8 @@
         }
     }
 
-    @Test
-    public void testSimpleAggCallsWithGroupBy() throws Exception {
+    @TestTemplate
+    void testSimpleAggCallsWithGroupBy() throws Exception {
         createTestValuesSourceTable(
                 "MyTable",
                 JavaScalaConversionUtil.toJava(TestData.smallData3()),
@@ -90,8 +93,8 @@
         assertResult(Arrays.asList("+I[1, 1, null, Hi]", "+I[2, 2, 2.0, Hello]"), result);
     }
 
-    @Test
-    public void testDistinctAggCalls() throws Exception {
+    @TestTemplate
+    void testDistinctAggCalls() throws Exception {
         createTestValuesSourceTable(
                 "MyTable",
                 JavaScalaConversionUtil.toJava(TestData.data2()),
@@ -130,8 +133,8 @@
                 result);
     }
 
-    @Test
-    public void testUserDefinedAggCallsWithoutMerge() throws Exception {
+    @TestTemplate
+    void testUserDefinedAggCallsWithoutMerge() throws Exception {
         tableEnv.createTemporaryFunction("my_sum1", new VarSum1AggFunction());
         tableEnv.createFunction("my_avg", WeightedAvg.class);
         tableEnv.createTemporarySystemFunction("my_sum2", VarSum2AggFunction.class);
@@ -166,8 +169,8 @@
                 Arrays.asList("+I[1, 77, 0, 1]", "+I[2, 120, 0, 2]", "+I[3, 58, 0, 3]"), result);
     }
 
-    @Test
-    public void testUserDefinedAggCallsWithMerge() throws Exception {
+    @TestTemplate
+    void testUserDefinedAggCallsWithMerge() throws Exception {
         tableEnv.createFunction("my_avg", JavaUserDefinedAggFunctions.WeightedAvgWithMerge.class);
         tableEnv.createTemporarySystemFunction(
                 "my_concat_agg", JavaUserDefinedAggFunctions.ConcatDistinctAggFunction.class);
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/GroupWindowAggregateJsonITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/GroupWindowAggregateJsonITCase.java
index 71e9927..f461640 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/GroupWindowAggregateJsonITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/GroupWindowAggregateJsonITCase.java
@@ -23,18 +23,19 @@
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.planner.utils.JsonPlanTestBase;
 
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 
 /** Test for group window aggregate json plan. */
-public class GroupWindowAggregateJsonITCase extends JsonPlanTestBase {
+class GroupWindowAggregateJsonITCase extends JsonPlanTestBase {
 
-    @Before
-    public void setup() throws Exception {
+    @BeforeEach
+    @Override
+    protected void setup() throws Exception {
         super.setup();
         createTestValuesSourceTable(
                 "MyTable",
@@ -59,7 +60,7 @@
     }
 
     @Test
-    public void testEventTimeTumbleWindow() throws Exception {
+    void testEventTimeTumbleWindow() throws Exception {
         createTestValuesSinkTable(
                 "MySink",
                 "name STRING",
@@ -93,7 +94,7 @@
     }
 
     @Test
-    public void testEventTimeHopWindow() throws Exception {
+    void testEventTimeHopWindow() throws Exception {
         createTestValuesSinkTable("MySink", "name STRING", "cnt BIGINT");
         compileSqlAndExecutePlan(
                         "insert into MySink select\n"
@@ -121,7 +122,7 @@
     }
 
     @Test
-    public void testEventTimeSessionWindow() throws Exception {
+    void testEventTimeSessionWindow() throws Exception {
         createTestValuesSinkTable("MySink", "name STRING", "cnt BIGINT");
         compileSqlAndExecutePlan(
                         "insert into MySink select\n"
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/IncrementalAggregateJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/IncrementalAggregateJsonPlanITCase.java
index ea76839..8888bca 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/IncrementalAggregateJsonPlanITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/IncrementalAggregateJsonPlanITCase.java
@@ -27,8 +27,8 @@
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.planner.utils.JsonPlanTestBase;
 
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.time.Duration;
@@ -37,10 +37,11 @@
 import java.util.concurrent.ExecutionException;
 
 /** Test for incremental aggregate json plan. */
-public class IncrementalAggregateJsonPlanITCase extends JsonPlanTestBase {
+class IncrementalAggregateJsonPlanITCase extends JsonPlanTestBase {
 
-    @Before
-    public void setup() throws Exception {
+    @BeforeEach
+    @Override
+    protected void setup() throws Exception {
         super.setup();
         tableEnv.getConfig()
                 .set(
@@ -56,8 +57,7 @@
     }
 
     @Test
-    public void testIncrementalAggregate()
-            throws IOException, ExecutionException, InterruptedException {
+    void testIncrementalAggregate() throws IOException, ExecutionException, InterruptedException {
         createTestValuesSourceTable(
                 "MyTable",
                 JavaScalaConversionUtil.toJava(TestData.smallData3()),
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/IntervalJoinJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/IntervalJoinJsonPlanITCase.java
index def8c31..94a1e41 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/IntervalJoinJsonPlanITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/IntervalJoinJsonPlanITCase.java
@@ -22,17 +22,17 @@
 import org.apache.flink.table.planner.utils.JsonPlanTestBase;
 import org.apache.flink.types.Row;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.List;
 
 /** Test for IntervalJoin json plan. */
-public class IntervalJoinJsonPlanITCase extends JsonPlanTestBase {
+class IntervalJoinJsonPlanITCase extends JsonPlanTestBase {
 
     /** test process time inner join. * */
     @Test
-    public void testProcessTimeInnerJoin() throws Exception {
+    void testProcessTimeInnerJoin() throws Exception {
         List<Row> rowT1 =
                 Arrays.asList(
                         Row.of(1, 1L, "Hi1"),
@@ -69,7 +69,7 @@
     }
 
     @Test
-    public void testRowTimeInnerJoin() throws Exception {
+    void testRowTimeInnerJoin() throws Exception {
         List<Row> rowT1 =
                 Arrays.asList(
                         Row.of(1, 1L, "Hi1"),
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/JoinJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/JoinJsonPlanITCase.java
index 7fc86c5..a722226 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/JoinJsonPlanITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/JoinJsonPlanITCase.java
@@ -23,19 +23,19 @@
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.planner.utils.JsonPlanTestBase;
 
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import java.io.File;
 import java.util.Arrays;
 import java.util.List;
 
 /** Test for join json plan. */
-public class JoinJsonPlanITCase extends JsonPlanTestBase {
+class JoinJsonPlanITCase extends JsonPlanTestBase {
 
     @Override
-    @Before
-    public void setup() throws Exception {
+    @BeforeEach
+    protected void setup() throws Exception {
         super.setup();
         createTestValuesSourceTable(
                 "A",
@@ -55,7 +55,7 @@
 
     /** test non-window inner join. * */
     @Test
-    public void testNonWindowInnerJoin() throws Exception {
+    void testNonWindowInnerJoin() throws Exception {
         List<String> dataT1 =
                 Arrays.asList(
                         "1,1,Hi1", "1,2,Hi2", "1,2,Hi2", "1,5,Hi3", "2,7,Hi5", "1,9,Hi6", "1,8,Hi8",
@@ -88,7 +88,7 @@
     }
 
     @Test
-    public void testIsNullInnerJoinWithNullCond() throws Exception {
+    void testIsNullInnerJoinWithNullCond() throws Exception {
         List<String> dataT1 =
                 Arrays.asList(
                         "1,1,Hi1", "1,2,Hi2", "1,2,Hi2", "1,5,Hi3", "2,7,Hi5", "1,9,Hi6", "1,8,Hi8",
@@ -125,7 +125,7 @@
     }
 
     @Test
-    public void testJoin() throws Exception {
+    void testJoin() throws Exception {
         createTestValuesSinkTable("MySink", "a3 varchar", "b4 varchar");
         compileSqlAndExecutePlan("insert into MySink \n" + "SELECT a3, b4 FROM A, B WHERE a2 = b2")
                 .await();
@@ -136,7 +136,7 @@
     }
 
     @Test
-    public void testInnerJoin() throws Exception {
+    void testInnerJoin() throws Exception {
         createTestValuesSinkTable("MySink", "a1 int", "b1 int");
         compileSqlAndExecutePlan("insert into MySink \n" + "SELECT a1, b1 FROM A JOIN B ON a1 = b1")
                 .await();
@@ -145,7 +145,7 @@
     }
 
     @Test
-    public void testJoinWithFilter() throws Exception {
+    void testJoinWithFilter() throws Exception {
         createTestValuesSinkTable("MySink", "a3 varchar", "b4 varchar");
         compileSqlAndExecutePlan(
                         "insert into MySink \n"
@@ -156,7 +156,7 @@
     }
 
     @Test
-    public void testInnerJoinWithDuplicateKey() throws Exception {
+    void testInnerJoinWithDuplicateKey() throws Exception {
         createTestValuesSinkTable("MySink", "a1 int", "b1 int", "b3 int");
         compileSqlAndExecutePlan(
                         "insert into MySink \n"
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/LimitJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/LimitJsonPlanITCase.java
index 50b4f5d..6507481 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/LimitJsonPlanITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/LimitJsonPlanITCase.java
@@ -23,7 +23,7 @@
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.planner.utils.JsonPlanTestBase;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -31,9 +31,9 @@
 import java.util.concurrent.ExecutionException;
 
 /** Test for limit JsonPlan ser/de. */
-public class LimitJsonPlanITCase extends JsonPlanTestBase {
+class LimitJsonPlanITCase extends JsonPlanTestBase {
     @Test
-    public void testLimit() throws ExecutionException, InterruptedException, IOException {
+    void testLimit() throws ExecutionException, InterruptedException, IOException {
         createTestValuesSourceTable(
                 "MyTable",
                 JavaScalaConversionUtil.toJava(TestData.data1()),
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/LookupJoinJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/LookupJoinJsonPlanITCase.java
index c24a619..71e76e0 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/LookupJoinJsonPlanITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/LookupJoinJsonPlanITCase.java
@@ -22,17 +22,19 @@
 import org.apache.flink.table.planner.utils.JsonPlanTestBase;
 import org.apache.flink.types.Row;
 
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 
 /** Test for LookupJoin json plan. */
-public class LookupJoinJsonPlanITCase extends JsonPlanTestBase {
+class LookupJoinJsonPlanITCase extends JsonPlanTestBase {
 
+    @BeforeEach
     @Override
-    public void setup() throws Exception {
+    protected void setup() throws Exception {
         super.setup();
         List<Row> rowT1 =
                 Arrays.asList(
@@ -66,7 +68,7 @@
 
     /** test join temporal table. * */
     @Test
-    public void testJoinLookupTable() throws Exception {
+    void testJoinLookupTable() throws Exception {
         compileSqlAndExecutePlan(
                         "insert into MySink "
                                 + "SELECT T.id, T.len, T.content, D.name FROM src AS T JOIN user_table \n"
@@ -81,7 +83,7 @@
     }
 
     @Test
-    public void testJoinLookupTableWithPushDown() throws Exception {
+    void testJoinLookupTableWithPushDown() throws Exception {
         compileSqlAndExecutePlan(
                         "insert into MySink \n"
                                 + "SELECT T.id, T.len, T.content, D.name FROM src AS T JOIN user_table \n "
@@ -93,7 +95,7 @@
     }
 
     @Test
-    public void testLeftJoinLookupTableWithPreFilter() throws Exception {
+    void testLeftJoinLookupTableWithPreFilter() throws Exception {
         compileSqlAndExecutePlan(
                         "insert into MySink "
                                 + "SELECT T.id, T.len, T.content, D.name FROM src AS T LEFT JOIN user_table \n"
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/MatchRecognizeJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/MatchRecognizeJsonPlanITCase.java
index 6f68c5c..ca64b21 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/MatchRecognizeJsonPlanITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/MatchRecognizeJsonPlanITCase.java
@@ -22,16 +22,16 @@
 import org.apache.flink.table.planner.utils.JsonPlanTestBase;
 import org.apache.flink.types.Row;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
 /** Test json deserialization for match recognize. */
-public class MatchRecognizeJsonPlanITCase extends JsonPlanTestBase {
+class MatchRecognizeJsonPlanITCase extends JsonPlanTestBase {
     @Test
-    public void testSimpleMatch() throws Exception {
+    void testSimpleMatch() throws Exception {
         List<Row> data =
                 Arrays.asList(
                         Row.of(1L, "a"),
@@ -70,7 +70,7 @@
     }
 
     @Test
-    public void testComplexMatch() throws Exception {
+    void testComplexMatch() throws Exception {
         List<Row> data =
                 Arrays.asList(
                         Row.of("ACME", 1L, 19, 1),
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/OverAggregateJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/OverAggregateJsonPlanITCase.java
index 0aaf882..55fc7a0 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/OverAggregateJsonPlanITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/OverAggregateJsonPlanITCase.java
@@ -26,7 +26,7 @@
 import org.apache.flink.table.planner.utils.JsonPlanTestBase;
 import org.apache.flink.types.Row;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -34,10 +34,10 @@
 import java.util.concurrent.ExecutionException;
 
 /** Test json deserialization for over aggregate. */
-public class OverAggregateJsonPlanITCase extends JsonPlanTestBase {
+class OverAggregateJsonPlanITCase extends JsonPlanTestBase {
 
     @Test
-    public void testProcTimeBoundedPartitionedRowsOver()
+    void testProcTimeBoundedPartitionedRowsOver()
             throws ExecutionException, InterruptedException, IOException {
         createTestValuesSourceTable(
                 "MyTable",
@@ -79,7 +79,7 @@
     }
 
     @Test
-    public void testProcTimeUnboundedNonPartitionedRangeOver()
+    void testProcTimeUnboundedNonPartitionedRangeOver()
             throws IOException, ExecutionException, InterruptedException {
         List<Row> data =
                 Arrays.asList(
@@ -124,7 +124,7 @@
     }
 
     @Test
-    public void testRowTimeBoundedPartitionedRangeOver()
+    void testRowTimeBoundedPartitionedRangeOver()
             throws IOException, ExecutionException, InterruptedException {
         List<Row> data =
                 Arrays.asList(
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/RankJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/RankJsonPlanITCase.java
index 8397a40..1b86627 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/RankJsonPlanITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/RankJsonPlanITCase.java
@@ -23,7 +23,7 @@
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.planner.utils.JsonPlanTestBase;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -31,9 +31,9 @@
 import java.util.concurrent.ExecutionException;
 
 /** Test for Rank JsonPlan ser/de. */
-public class RankJsonPlanITCase extends JsonPlanTestBase {
+class RankJsonPlanITCase extends JsonPlanTestBase {
     @Test
-    public void testRank() throws ExecutionException, InterruptedException, IOException {
+    void testRank() throws ExecutionException, InterruptedException, IOException {
         createTestValuesSourceTable(
                 "MyTable",
                 JavaScalaConversionUtil.toJava(TestData.data1()),
@@ -52,7 +52,7 @@
     }
 
     @Test
-    public void testFirstN() throws ExecutionException, InterruptedException, IOException {
+    void testFirstN() throws ExecutionException, InterruptedException, IOException {
         createTestValuesSourceTable(
                 "MyTable1",
                 JavaScalaConversionUtil.toJava(TestData.data4()),
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/SargJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/SargJsonPlanITCase.java
index 1320fa2..bb40b5a 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/SargJsonPlanITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/SargJsonPlanITCase.java
@@ -22,16 +22,16 @@
 import org.apache.flink.table.planner.utils.JsonPlanTestBase;
 import org.apache.flink.types.Row;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 
 /** Test for Sarg JsonPlan ser/de. */
-public class SargJsonPlanITCase extends JsonPlanTestBase {
+class SargJsonPlanITCase extends JsonPlanTestBase {
     @Test
-    public void testSarg() throws ExecutionException, InterruptedException {
+    void testSarg() throws ExecutionException, InterruptedException {
         List<Row> data =
                 Arrays.asList(Row.of(1), Row.of(2), Row.of((Integer) null), Row.of(4), Row.of(5));
         createTestValuesSourceTable("MyTable", data, "a int");
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/SortLimitJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/SortLimitJsonPlanITCase.java
index bd1f168..7473057 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/SortLimitJsonPlanITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/SortLimitJsonPlanITCase.java
@@ -23,7 +23,7 @@
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.planner.utils.JsonPlanTestBase;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -31,9 +31,9 @@
 import java.util.concurrent.ExecutionException;
 
 /** Test for sort limit JsonPlan ser/de. */
-public class SortLimitJsonPlanITCase extends JsonPlanTestBase {
+class SortLimitJsonPlanITCase extends JsonPlanTestBase {
     @Test
-    public void testSortLimit() throws ExecutionException, InterruptedException, IOException {
+    void testSortLimit() throws ExecutionException, InterruptedException, IOException {
         createTestValuesSourceTable(
                 "MyTable",
                 JavaScalaConversionUtil.toJava(TestData.data1()),
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TableSinkJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TableSinkJsonPlanITCase.java
index 3a4a6cc..73577d0 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TableSinkJsonPlanITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TableSinkJsonPlanITCase.java
@@ -21,8 +21,8 @@
 import org.apache.flink.table.planner.factories.TestValuesTableFactory;
 import org.apache.flink.table.planner.utils.JsonPlanTestBase;
 
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import java.io.File;
 import java.util.Arrays;
@@ -30,18 +30,19 @@
 import java.util.List;
 
 /** Test for table sink json plan. */
-public class TableSinkJsonPlanITCase extends JsonPlanTestBase {
+class TableSinkJsonPlanITCase extends JsonPlanTestBase {
 
     List<String> data = Arrays.asList("1,1,hi", "2,1,hello", "3,2,hello world");
 
-    @Before
-    public void setup() throws Exception {
+    @BeforeEach
+    @Override
+    protected void setup() throws Exception {
         super.setup();
         createTestCsvSourceTable("MyTable", data, "a bigint", "b int", "c varchar");
     }
 
     @Test
-    public void testPartitioning() throws Exception {
+    void testPartitioning() throws Exception {
         File sinkPath =
                 createTestCsvSinkTable(
                         "MySink",
@@ -55,7 +56,7 @@
     }
 
     @Test
-    public void testWritingMetadata() throws Exception {
+    void testWritingMetadata() throws Exception {
         createTestValuesSinkTable(
                 "MySink",
                 new String[] {"a bigint", "b int", "c varchar METADATA"},
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TableSourceJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TableSourceJsonPlanITCase.java
index 6a24ee0..b757407 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TableSourceJsonPlanITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TableSourceJsonPlanITCase.java
@@ -22,7 +22,7 @@
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.planner.utils.JsonPlanTestBase;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.io.File;
 import java.util.Arrays;
@@ -32,10 +32,10 @@
 import static org.apache.flink.table.utils.DateTimeUtils.toLocalDateTime;
 
 /** Test for table source json plan. */
-public class TableSourceJsonPlanITCase extends JsonPlanTestBase {
+class TableSourceJsonPlanITCase extends JsonPlanTestBase {
 
     @Test
-    public void testProjectPushDown() throws Exception {
+    void testProjectPushDown() throws Exception {
         List<String> data = Arrays.asList("1,1,hi", "2,1,hello", "3,2,hello world");
         createTestCsvSourceTable("MyTable", data, "a bigint", "b int not null", "c varchar");
         File sinkPath = createTestCsvSinkTable("MySink", "a bigint", "b int");
@@ -46,7 +46,7 @@
     }
 
     @Test
-    public void testReadingMetadata() throws Exception {
+    void testReadingMetadata() throws Exception {
         createTestValuesSourceTable(
                 "MyTable",
                 JavaScalaConversionUtil.toJava(TestData.smallData3()),
@@ -65,7 +65,7 @@
     }
 
     @Test
-    public void testReadingMetadataWithProjectionPushDownDisabled() throws Exception {
+    void testReadingMetadataWithProjectionPushDownDisabled() throws Exception {
         createTestValuesSourceTable(
                 "MyTable",
                 JavaScalaConversionUtil.toJava(TestData.smallData3()),
@@ -85,7 +85,7 @@
     }
 
     @Test
-    public void testFilterPushDown() throws Exception {
+    void testFilterPushDown() throws Exception {
         List<String> data = Arrays.asList("1,1,hi", "2,1,hello", "3,2,hello world");
         createTestCsvSourceTable("MyTable", data, "a bigint", "b int not null", "c varchar");
         File sinkPath = createTestCsvSinkTable("MySink", "a bigint", "b int", "c varchar");
@@ -96,7 +96,7 @@
     }
 
     @Test
-    public void testPartitionPushDown() throws Exception {
+    void testPartitionPushDown() throws Exception {
         createTestValuesSourceTable(
                 "MyTable",
                 JavaScalaConversionUtil.toJava(TestData.smallData3()),
@@ -115,7 +115,7 @@
     }
 
     @Test
-    public void testWatermarkPushDown() throws Exception {
+    void testWatermarkPushDown() throws Exception {
         createTestValuesSourceTable(
                 "MyTable",
                 JavaScalaConversionUtil.toJava(TestData.data3WithTimestamp()),
@@ -146,7 +146,7 @@
     }
 
     @Test
-    public void testPushDowns() throws Exception {
+    void testPushDowns() throws Exception {
         createTestValuesSourceTable(
                 "MyTable",
                 JavaScalaConversionUtil.toJava(TestData.data3WithTimestamp()),
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TemporalJoinJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TemporalJoinJsonPlanITCase.java
index ed57d92..56ebcf5 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TemporalJoinJsonPlanITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TemporalJoinJsonPlanITCase.java
@@ -23,7 +23,8 @@
 import org.apache.flink.table.planner.utils.JsonPlanTestBase;
 import org.apache.flink.types.Row;
 
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.List;
@@ -31,10 +32,11 @@
 import static org.apache.flink.table.api.Expressions.$;
 
 /** Test for TemporalJoin json plan. */
-public class TemporalJoinJsonPlanITCase extends JsonPlanTestBase {
+class TemporalJoinJsonPlanITCase extends JsonPlanTestBase {
 
+    @BeforeEach
     @Override
-    public void setup() throws Exception {
+    protected void setup() throws Exception {
         super.setup();
         List<Row> orders =
                 Arrays.asList(
@@ -78,7 +80,7 @@
 
     /** test process time inner join. * */
     @Test
-    public void testJoinTemporalFunction() throws Exception {
+    void testJoinTemporalFunction() throws Exception {
         compileSqlAndExecutePlan(
                         "INSERT INTO MySink "
                                 + "SELECT amount * r.rate "
@@ -91,7 +93,7 @@
     }
 
     @Test
-    public void testTemporalTableJoin() throws Exception {
+    void testTemporalTableJoin() throws Exception {
         compileSqlAndExecutePlan(
                         "INSERT INTO MySink "
                                 + "SELECT amount * r.rate "
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TemporalSortJsonITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TemporalSortJsonITCase.java
index 888e606..a97a90b 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TemporalSortJsonITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TemporalSortJsonITCase.java
@@ -23,7 +23,7 @@
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.planner.utils.JsonPlanTestBase;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.HashMap;
@@ -31,10 +31,10 @@
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for temporal sort json plan. */
-public class TemporalSortJsonITCase extends JsonPlanTestBase {
+class TemporalSortJsonITCase extends JsonPlanTestBase {
 
     @Test
-    public void testSortProcessingTime() throws Exception {
+    void testSortProcessingTime() throws Exception {
         createTestValuesSourceTable(
                 "MyTable",
                 JavaScalaConversionUtil.toJava(TestData.smallData3()),
@@ -52,7 +52,7 @@
     }
 
     @Test
-    public void testSortRowTime() throws Exception {
+    void testSortRowTime() throws Exception {
         createTestValuesSourceTable(
                 "MyTable",
                 JavaScalaConversionUtil.toJava(TestData.windowDataWithTimestamp()),
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/UnionJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/UnionJsonPlanITCase.java
index 8cc6ce3..1d7096a 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/UnionJsonPlanITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/UnionJsonPlanITCase.java
@@ -23,15 +23,15 @@
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.planner.utils.JsonPlanTestBase;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.List;
 
 /** Test json serialization/deserialization for union. */
-public class UnionJsonPlanITCase extends JsonPlanTestBase {
+class UnionJsonPlanITCase extends JsonPlanTestBase {
     @Test
-    public void testUnion() throws Exception {
+    void testUnion() throws Exception {
         createTestValuesSourceTable(
                 "MyTable",
                 JavaScalaConversionUtil.toJava(TestData.data1()),
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ValuesJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ValuesJsonPlanITCase.java
index eb96759..e71546f 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ValuesJsonPlanITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ValuesJsonPlanITCase.java
@@ -21,16 +21,16 @@
 import org.apache.flink.table.planner.factories.TestValuesTableFactory;
 import org.apache.flink.table.planner.utils.JsonPlanTestBase;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.List;
 
 /** Test for values json plan. */
-public class ValuesJsonPlanITCase extends JsonPlanTestBase {
+class ValuesJsonPlanITCase extends JsonPlanTestBase {
 
     @Test
-    public void testValues() throws Exception {
+    void testValues() throws Exception {
         createTestValuesSinkTable("MySink", "b INT", "a INT", "c VARCHAR");
         compileSqlAndExecutePlan(
                         "INSERT INTO MySink SELECT * from (VALUES (1, 2, 'Hi'), (3, 4, 'Hello'))")
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WatermarkAssignerJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WatermarkAssignerJsonPlanITCase.java
index 4ac0a18..bfc32e8 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WatermarkAssignerJsonPlanITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WatermarkAssignerJsonPlanITCase.java
@@ -22,7 +22,7 @@
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.planner.utils.JsonPlanTestBase;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.io.File;
 import java.util.Arrays;
@@ -31,10 +31,10 @@
 import static org.apache.flink.table.utils.DateTimeUtils.toLocalDateTime;
 
 /** Test for watermark assigner json plan. */
-public class WatermarkAssignerJsonPlanITCase extends JsonPlanTestBase {
+class WatermarkAssignerJsonPlanITCase extends JsonPlanTestBase {
 
     @Test
-    public void testWatermarkAssigner() throws Exception {
+    void testWatermarkAssigner() throws Exception {
         createTestValuesSourceTable(
                 "MyTable",
                 JavaScalaConversionUtil.toJava(TestData.data3WithTimestamp()),
@@ -65,7 +65,7 @@
     }
 
     @Test
-    public void testWatermarkPushDownWithMetadata() throws Exception {
+    void testWatermarkPushDownWithMetadata() throws Exception {
         // to verify FLINK-30598: the case declares metadata field first, without the fix it'll get
         // wrong code generated by WatermarkGeneratorCodeGenerator which reference the incorrect
         // varchar column as the watermark field.
@@ -102,7 +102,7 @@
     }
 
     @Test
-    public void testWatermarkAndProjectPushDownWithMetadata() throws Exception {
+    void testWatermarkAndProjectPushDownWithMetadata() throws Exception {
         createTestValuesSourceTable(
                 "MyTable",
                 JavaScalaConversionUtil.toJava(TestData.data3WithTimestamp()),
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowAggregateJsonITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowAggregateJsonITCase.java
index 6e7e848..dc07965 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowAggregateJsonITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowAggregateJsonITCase.java
@@ -24,32 +24,35 @@
 import org.apache.flink.table.planner.utils.AggregatePhaseStrategy;
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.planner.utils.JsonPlanTestBase;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
 
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 
 /** Test for window aggregate json plan. */
-@RunWith(Parameterized.class)
-public class WindowAggregateJsonITCase extends JsonPlanTestBase {
+@ExtendWith(ParameterizedTestExtension.class)
+class WindowAggregateJsonITCase extends JsonPlanTestBase {
 
-    @Parameterized.Parameters(name = "agg_phase = {0}")
-    public static Object[] parameters() {
+    @Parameters(name = "agg_phase = {0}")
+    private static Object[] parameters() {
         return new Object[][] {
             new Object[] {AggregatePhaseStrategy.ONE_PHASE},
             new Object[] {AggregatePhaseStrategy.TWO_PHASE}
         };
     }
 
-    @Parameterized.Parameter public AggregatePhaseStrategy aggPhase;
+    @Parameter private AggregatePhaseStrategy aggPhase;
 
-    @Before
-    public void setup() throws Exception {
+    @BeforeEach
+    @Override
+    protected void setup() throws Exception {
         super.setup();
         createTestValuesSourceTable(
                 "MyTable",
@@ -77,8 +80,8 @@
                         aggPhase.toString());
     }
 
-    @Test
-    public void testEventTimeTumbleWindow() throws Exception {
+    @TestTemplate
+    void testEventTimeTumbleWindow() throws Exception {
         createTestValuesSinkTable(
                 "MySink",
                 "name STRING",
@@ -112,8 +115,8 @@
                 result);
     }
 
-    @Test
-    public void testEventTimeHopWindow() throws Exception {
+    @TestTemplate
+    void testEventTimeHopWindow() throws Exception {
         createTestValuesSinkTable("MySink", "name STRING", "cnt BIGINT");
         compileSqlAndExecutePlan(
                         "insert into MySink select\n"
@@ -141,8 +144,8 @@
                 result);
     }
 
-    @Test
-    public void testEventTimeCumulateWindow() throws Exception {
+    @TestTemplate
+    void testEventTimeCumulateWindow() throws Exception {
         createTestValuesSinkTable("MySink", "name STRING", "cnt BIGINT");
         compileSqlAndExecutePlan(
                         "insert into MySink select\n"
@@ -177,8 +180,8 @@
                 result);
     }
 
-    @Test
-    public void testDistinctSplitEnabled() throws Exception {
+    @TestTemplate
+    void testDistinctSplitEnabled() throws Exception {
         tableEnv.getConfig()
                 .set(OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true);
         createTestValuesSinkTable(
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowDeduplicateJsonITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowDeduplicateJsonITCase.java
index a377dc4..75eb714 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowDeduplicateJsonITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowDeduplicateJsonITCase.java
@@ -23,18 +23,19 @@
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.planner.utils.JsonPlanTestBase;
 
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 
 /** Test for window deduplicate json plan. */
-public class WindowDeduplicateJsonITCase extends JsonPlanTestBase {
+class WindowDeduplicateJsonITCase extends JsonPlanTestBase {
 
-    @Before
-    public void setup() throws Exception {
+    @BeforeEach
+    @Override
+    protected void setup() throws Exception {
         super.setup();
         createTestValuesSourceTable(
                 "MyTable",
@@ -59,7 +60,7 @@
     }
 
     @Test
-    public void testEventTimeTumbleWindow() throws Exception {
+    void testEventTimeTumbleWindow() throws Exception {
         createTestValuesSinkTable(
                 "MySink",
                 "ts STRING",
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowJoinJsonITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowJoinJsonITCase.java
index 7e7565c..c4e84d9 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowJoinJsonITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowJoinJsonITCase.java
@@ -23,18 +23,19 @@
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.planner.utils.JsonPlanTestBase;
 
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 
 /** Test for window join json plan. */
-public class WindowJoinJsonITCase extends JsonPlanTestBase {
+class WindowJoinJsonITCase extends JsonPlanTestBase {
 
-    @Before
-    public void setup() throws Exception {
+    @BeforeEach
+    @Override
+    protected void setup() throws Exception {
         super.setup();
         createTestValuesSourceTable(
                 "MyTable",
@@ -80,7 +81,7 @@
     }
 
     @Test
-    public void testEventTimeTumbleWindow() throws Exception {
+    void testEventTimeTumbleWindow() throws Exception {
         createTestValuesSinkTable(
                 "MySink",
                 "name STRING",
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowTableFunctionJsonITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowTableFunctionJsonITCase.java
index e5489b8..1ccf0e0 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowTableFunctionJsonITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowTableFunctionJsonITCase.java
@@ -23,18 +23,19 @@
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.planner.utils.JsonPlanTestBase;
 
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 
 /** Test for window deduplicate json plan. */
-public class WindowTableFunctionJsonITCase extends JsonPlanTestBase {
+class WindowTableFunctionJsonITCase extends JsonPlanTestBase {
 
-    @Before
-    public void setup() throws Exception {
+    @BeforeEach
+    @Override
+    protected void setup() throws Exception {
         super.setup();
         createTestValuesSourceTable(
                 "MyTable",
@@ -59,7 +60,7 @@
     }
 
     @Test
-    public void testEventTimeTumbleWindow() throws Exception {
+    void testEventTimeTumbleWindow() throws Exception {
         createTestValuesSinkTable(
                 "MySink",
                 "ts STRING",
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/JsonPlanTestBase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/JsonPlanTestBase.java
index 961dc62..6d6dbcf 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/JsonPlanTestBase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/JsonPlanTestBase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.planner.utils;
 
 import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.transformations.UnionTransformation;
 import org.apache.flink.table.api.CompiledPlan;
 import org.apache.flink.table.api.EnvironmentSettings;
@@ -27,18 +28,22 @@
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.internal.CompiledPlanUtils;
 import org.apache.flink.table.planner.factories.TestValuesTableFactory;
-import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.StringUtils;
 
-import org.junit.After;
-import org.junit.Before;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
 
 import javax.annotation.Nullable;
 
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -53,17 +58,27 @@
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** The base class for json plan testing. */
-public abstract class JsonPlanTestBase extends AbstractTestBase {
+public abstract class JsonPlanTestBase {
+
+    @RegisterExtension
+    private static final MiniClusterExtension MINI_CLUSTER_EXTENSION =
+            new MiniClusterExtension(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(1)
+                            .setNumberSlotsPerTaskManager(4)
+                            .build());
+
+    @TempDir protected Path tempFolder;
 
     protected TableEnvironment tableEnv;
 
-    @Before
-    public void setup() throws Exception {
+    @BeforeEach
+    protected void setup() throws Exception {
         tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
     }
 
-    @After
-    public void after() {
+    @AfterEach
+    protected void after() {
         TestValuesTableFactory.clearAllData();
     }
 
@@ -216,7 +231,7 @@
     protected void createTestCsvSourceTable(
             String tableName, List<String> data, String... fieldNameAndTypes) throws IOException {
         checkArgument(fieldNameAndTypes.length > 0);
-        File sourceFile = TEMPORARY_FOLDER.newFile();
+        File sourceFile = TempDirUtils.newFile(tempFolder);
         Collections.shuffle(data);
         Files.write(sourceFile.toPath(), String.join("\n", data).getBytes());
         String ddl =
@@ -246,7 +261,7 @@
                 StringUtils.isNullOrWhitespaceOnly(partitionFields)
                         ? ""
                         : "\n partitioned by (" + partitionFields + ") \n";
-        File sinkPath = TEMPORARY_FOLDER.newFolder();
+        File sinkPath = TempDirUtils.newFolder(tempFolder);
         String ddl =
                 String.format(
                         "CREATE TABLE %s (\n"