[FLINK-33024][table-planner][JUnit5 Migration] Module: flink-table-planner (JsonPlanTestBase) (#23353)
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java
index 4ca0a5d..591f0a5 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java
@@ -25,10 +25,11 @@
import org.apache.flink.table.planner.utils.JsonPlanTestBase;
import org.apache.flink.table.planner.utils.JsonTestUtils;
import org.apache.flink.table.planner.utils.TableTestUtil;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.commons.io.FileUtils;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import java.io.File;
import java.io.IOException;
@@ -47,15 +48,16 @@
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Test for {@link CompiledPlan} and related {@link TableEnvironment} methods. */
-public class CompiledPlanITCase extends JsonPlanTestBase {
+class CompiledPlanITCase extends JsonPlanTestBase {
private static final List<String> DATA =
Arrays.asList("1,1,hi", "2,1,hello", "3,2,hello world");
private static final String[] COLUMNS_DEFINITION =
new String[] {"a bigint", "b int", "c varchar"};
- @Before
- public void setup() throws Exception {
+ @BeforeEach
+ @Override
+ protected void setup() throws Exception {
super.setup();
String srcTableDdl =
@@ -76,7 +78,7 @@
}
@Test
- public void testCompilePlanSql() throws IOException {
+ void testCompilePlanSql() throws IOException {
CompiledPlan compiledPlan =
tableEnv.compilePlanSql("INSERT INTO MySink SELECT * FROM MyTable");
String expected = TableTestUtil.readFromResource("/jsonplan/testGetJsonPlan.out");
@@ -92,7 +94,7 @@
}
@Test
- public void testExecutePlanSql() throws Exception {
+ void testExecutePlanSql() throws Exception {
File sinkPath = createSourceSinkTables();
tableEnv.compilePlanSql("INSERT INTO sink SELECT * FROM src").execute().await();
@@ -101,10 +103,10 @@
}
@Test
- public void testExecuteCtasPlanSql() throws Exception {
+ void testExecuteCtasPlanSql() throws Exception {
createTestCsvSourceTable("src", DATA, COLUMNS_DEFINITION);
- File sinkPath = TEMPORARY_FOLDER.newFolder();
+ File sinkPath = TempDirUtils.newFolder(tempFolder);
assertThatThrownBy(
() ->
tableEnv.compilePlanSql(
@@ -125,7 +127,7 @@
}
@Test
- public void testExecutePlanTable() throws Exception {
+ void testExecutePlanTable() throws Exception {
File sinkPath = createSourceSinkTables();
tableEnv.from("src").select($("*")).insertInto("sink").compilePlan().execute().await();
@@ -134,8 +136,9 @@
}
@Test
- public void testCompileWriteToFileAndThenExecuteSql() throws Exception {
- Path planPath = Paths.get(URI.create(getTempDirPath("plan")).getPath(), "plan.json");
+ void testCompileWriteToFileAndThenExecuteSql() throws Exception {
+ Path planPath =
+ Paths.get(TempDirUtils.newFolder(tempFolder, "plan").getPath(), "plan.json");
File sinkPath = createSourceSinkTables();
@@ -148,8 +151,9 @@
}
@Test
- public void testCompileWriteToFilePathWithSchemeAndThenExecuteSql() throws Exception {
- Path planPath = Paths.get(URI.create(getTempDirPath("plan")).getPath(), "plan.json");
+ void testCompileWriteToFilePathWithSchemeAndThenExecuteSql() throws Exception {
+ Path planPath =
+ Paths.get(TempDirUtils.newFolder(tempFolder, "plan").getPath(), "plan.json");
File sinkPath = createSourceSinkTables();
@@ -165,9 +169,9 @@
}
@Test
- public void testCompilePlan() throws Exception {
+ void testCompilePlan() throws Exception {
Path planPath =
- Paths.get(URI.create(getTempDirPath("plan")).getPath(), "plan.json")
+ Paths.get(TempDirUtils.newFolder(tempFolder, "plan").getPath(), "plan.json")
.toAbsolutePath();
File sinkPath = createSourceSinkTables();
@@ -195,9 +199,9 @@
}
@Test
- public void testCompilePlanWithStatementSet() throws Exception {
+ void testCompilePlanWithStatementSet() throws Exception {
Path planPath =
- Paths.get(URI.create(getTempDirPath("plan")).getPath(), "plan.json")
+ Paths.get(TempDirUtils.newFolder(tempFolder, "plan").getPath(), "plan.json")
.toAbsolutePath();
createTestCsvSourceTable("src", DATA, COLUMNS_DEFINITION);
@@ -226,9 +230,9 @@
}
@Test
- public void testCompilePlanIfNotExists() throws Exception {
+ void testCompilePlanIfNotExists() throws Exception {
Path planPath =
- Paths.get(URI.create(getTempDirPath("plan")).getPath(), "plan.json")
+ Paths.get(TempDirUtils.newFolder(tempFolder, "plan").getPath(), "plan.json")
.toAbsolutePath();
File sinkPath = createSourceSinkTables();
@@ -256,11 +260,14 @@
}
@Test
- public void testCompilePlanOverwrite() throws Exception {
+ void testCompilePlanOverwrite() throws Exception {
tableEnv.getConfig().set(TableConfigOptions.PLAN_FORCE_RECOMPILE, true);
Path planPath =
- Paths.get(URI.create(getTempDirPath("plan")).getPath(), "plan.json")
+ Paths.get(
+ URI.create(TempDirUtils.newFolder(tempFolder, "plan").getPath())
+ .getPath(),
+ "plan.json")
.toAbsolutePath();
List<String> expectedData =
@@ -297,9 +304,9 @@
}
@Test
- public void testCompileAndExecutePlan() throws Exception {
+ void testCompileAndExecutePlan() throws Exception {
Path planPath =
- Paths.get(URI.create(getTempDirPath("plan")).getPath(), "plan.json")
+ Paths.get(TempDirUtils.newFolder(tempFolder, "plan").getPath(), "plan.json")
.toAbsolutePath();
File sinkPath = createSourceSinkTables();
@@ -316,9 +323,9 @@
}
@Test
- public void testCompileAndExecutePlanWithStatementSet() throws Exception {
+ void testCompileAndExecutePlanWithStatementSet() throws Exception {
Path planPath =
- Paths.get(URI.create(getTempDirPath("plan")).getPath(), "plan.json")
+ Paths.get(TempDirUtils.newFolder(tempFolder, "plan").getPath(), "plan.json")
.toAbsolutePath();
createTestCsvSourceTable("src", DATA, COLUMNS_DEFINITION);
@@ -344,7 +351,7 @@
}
@Test
- public void testExplainPlan() throws IOException {
+ void testExplainPlan() throws IOException {
String planFromResources =
JsonTestUtils.setFlinkVersion(
JsonTestUtils.readFromResource("/jsonplan/testGetJsonPlan.out"),
@@ -360,7 +367,7 @@
}
@Test
- public void testPersistedConfigOption() throws Exception {
+ void testPersistedConfigOption() throws Exception {
List<String> data =
Stream.concat(
DATA.stream(),
@@ -397,7 +404,7 @@
}
@Test
- public void testBatchMode() {
+ void testBatchMode() {
tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
String srcTableDdl =
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/CalcJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/CalcJsonPlanITCase.java
index 8d03a24..89aac5a 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/CalcJsonPlanITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/CalcJsonPlanITCase.java
@@ -25,7 +25,7 @@
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.planner.utils.JsonPlanTestBase;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.io.File;
import java.util.Arrays;
@@ -33,10 +33,10 @@
import java.util.List;
/** Test for calc json plan. */
-public class CalcJsonPlanITCase extends JsonPlanTestBase {
+class CalcJsonPlanITCase extends JsonPlanTestBase {
@Test
- public void testSimpleCalc() throws Exception {
+ void testSimpleCalc() throws Exception {
List<String> data = Arrays.asList("1,1,hi", "2,1,hello", "3,2,hello world");
createTestCsvSourceTable("MyTable", data, "a bigint", "b int not null", "c varchar");
File sinkPath =
@@ -55,7 +55,7 @@
}
@Test
- public void testCalcWithUdf() throws Exception {
+ void testCalcWithUdf() throws Exception {
tableEnv.createTemporaryFunction("udf1", new JavaFunc0());
tableEnv.createTemporarySystemFunction("udf2", new JavaFunc2());
tableEnv.createFunction("udf3", UdfWithOpen.class);
@@ -86,7 +86,7 @@
}
@Test
- public void testProjectPushDown() throws Exception {
+ void testProjectPushDown() throws Exception {
List<String> data = Arrays.asList("1,1,hi", "2,1,hello", "3,2,hello world");
createTestCsvSourceTable("MyTable", data, "a bigint", "b int not null", "c varchar");
File sinkPath = createTestCsvSinkTable("MySink", "b int", "a bigint", "a1 varchar");
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ChangelogSourceJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ChangelogSourceJsonPlanITCase.java
index 833ba3f..5ceb0bd 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ChangelogSourceJsonPlanITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ChangelogSourceJsonPlanITCase.java
@@ -23,7 +23,7 @@
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.planner.utils.JsonPlanTestBase;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.HashMap;
@@ -31,10 +31,10 @@
import java.util.Map;
/** Integration tests for operations on changelog source, including upsert source. */
-public class ChangelogSourceJsonPlanITCase extends JsonPlanTestBase {
+class ChangelogSourceJsonPlanITCase extends JsonPlanTestBase {
@Test
- public void testChangelogSource() throws Exception {
+ void testChangelogSource() throws Exception {
registerChangelogSource();
createTestNonInsertOnlyValuesSinkTable(
"user_sink",
@@ -56,7 +56,7 @@
}
@Test
- public void testToUpsertSource() throws Exception {
+ void testToUpsertSource() throws Exception {
registerUpsertSource();
createTestNonInsertOnlyValuesSinkTable(
"user_sink",
@@ -79,7 +79,7 @@
// ------------------------------------------------------------------------------------------
- public void registerChangelogSource() {
+ protected void registerChangelogSource() {
Map<String, String> properties = new HashMap<>();
properties.put("changelog-mode", "I,UA,UB,D");
createTestValuesSourceTable(
@@ -95,7 +95,7 @@
properties);
}
- public void registerUpsertSource() {
+ protected void registerUpsertSource() {
Map<String, String> properties = new HashMap<>();
properties.put("changelog-mode", "I,UA,D");
createTestValuesSourceTable(
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ConfigureOperatorLevelStateTtlJsonITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ConfigureOperatorLevelStateTtlJsonITCase.java
index 6d1a220..fb879bf 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ConfigureOperatorLevelStateTtlJsonITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ConfigureOperatorLevelStateTtlJsonITCase.java
@@ -27,7 +27,7 @@
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.Arrays;
@@ -39,10 +39,10 @@
* Tests for configuring operator-level state TTL via {@link
* org.apache.flink.table.api.CompiledPlan}.
*/
-public class ConfigureOperatorLevelStateTtlJsonITCase extends JsonPlanTestBase {
+class ConfigureOperatorLevelStateTtlJsonITCase extends JsonPlanTestBase {
@Test
- public void testDifferentStateTtlForDifferentOneInputOperator() throws Exception {
+ void testDifferentStateTtlForDifferentOneInputOperator() throws Exception {
String dataId =
TestValuesTableFactory.registerRowData(
Arrays.asList(
@@ -117,7 +117,7 @@
}
@Test
- public void testDifferentStateTtlForSameTwoInputStreamOperator() throws Exception {
+ void testDifferentStateTtlForSameTwoInputStreamOperator() throws Exception {
String leftTableDataId =
TestValuesTableFactory.registerRowData(
Arrays.asList(
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/CorrelateJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/CorrelateJsonPlanITCase.java
index 1874771..a90c3cd 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/CorrelateJsonPlanITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/CorrelateJsonPlanITCase.java
@@ -23,8 +23,8 @@
import org.apache.flink.table.planner.utils.JsonPlanTestBase;
import org.apache.flink.types.Row;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Collections;
@@ -32,16 +32,16 @@
import java.util.concurrent.ExecutionException;
/** Integration tests for correlate. */
-public class CorrelateJsonPlanITCase extends JsonPlanTestBase {
+class CorrelateJsonPlanITCase extends JsonPlanTestBase {
- @Before
- public void before() {
+ @BeforeEach
+ void before() {
List<Row> data = Collections.singletonList(Row.of("1,1,hi"));
createTestValuesSourceTable("MyTable", data, "a varchar");
}
@Test
- public void testSystemFuncByObject() throws ExecutionException, InterruptedException {
+ void testSystemFuncByObject() throws ExecutionException, InterruptedException {
tableEnv.createTemporarySystemFunction(
"STRING_SPLIT", new JavaUserDefinedTableFunctions.StringSplit());
createTestValuesSinkTable("MySink", "a STRING", "b STRING");
@@ -53,7 +53,7 @@
}
@Test
- public void testSystemFuncByClass() throws ExecutionException, InterruptedException {
+ void testSystemFuncByClass() throws ExecutionException, InterruptedException {
tableEnv.createTemporarySystemFunction(
"STRING_SPLIT", JavaUserDefinedTableFunctions.StringSplit.class);
createTestValuesSinkTable("MySink", "a STRING", "b STRING");
@@ -65,7 +65,7 @@
}
@Test
- public void testTemporaryFuncByObject() throws ExecutionException, InterruptedException {
+ void testTemporaryFuncByObject() throws ExecutionException, InterruptedException {
tableEnv.createTemporaryFunction(
"STRING_SPLIT", new JavaUserDefinedTableFunctions.StringSplit());
createTestValuesSinkTable("MySink", "a STRING", "b STRING");
@@ -77,7 +77,7 @@
}
@Test
- public void testTemporaryFuncByClass() throws ExecutionException, InterruptedException {
+ void testTemporaryFuncByClass() throws ExecutionException, InterruptedException {
tableEnv.createTemporaryFunction(
"STRING_SPLIT", JavaUserDefinedTableFunctions.StringSplit.class);
createTestValuesSinkTable("MySink", "a STRING", "b STRING");
@@ -89,7 +89,7 @@
}
@Test
- public void testFilter() throws ExecutionException, InterruptedException {
+ void testFilter() throws ExecutionException, InterruptedException {
tableEnv.createTemporarySystemFunction(
"STRING_SPLIT", new JavaUserDefinedTableFunctions.StringSplit());
createTestValuesSinkTable("MySink", "a STRING", "b STRING");
@@ -103,7 +103,7 @@
}
@Test
- public void testUnnest() throws ExecutionException, InterruptedException {
+ void testUnnest() throws ExecutionException, InterruptedException {
List<Row> data =
Collections.singletonList(
Row.of("Bob", new Row[] {Row.of("1"), Row.of("2"), Row.of("3")}));
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/DeduplicationJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/DeduplicationJsonPlanITCase.java
index d095ae0..afb9f5e 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/DeduplicationJsonPlanITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/DeduplicationJsonPlanITCase.java
@@ -24,16 +24,16 @@
import org.apache.flink.table.planner.utils.JsonPlanTestBase;
import org.apache.flink.types.Row;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.List;
/** Test for deduplication json plan. */
-public class DeduplicationJsonPlanITCase extends JsonPlanTestBase {
+class DeduplicationJsonPlanITCase extends JsonPlanTestBase {
@Test
- public void testDeduplication() throws Exception {
+ void testDeduplication() throws Exception {
List<Row> data =
Arrays.asList(
Row.of(1L, "terry", "pen", 1000L),
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ExpandJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ExpandJsonPlanITCase.java
index 4ebf82e..1226dac 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ExpandJsonPlanITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ExpandJsonPlanITCase.java
@@ -26,16 +26,16 @@
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.planner.utils.JsonPlanTestBase;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.List;
/** Test for expand json plan. */
-public class ExpandJsonPlanITCase extends JsonPlanTestBase {
+class ExpandJsonPlanITCase extends JsonPlanTestBase {
@Test
- public void testExpand() throws Exception {
+ void testExpand() throws Exception {
tableEnv.getConfig()
.set(
OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY,
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/GroupAggregateJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/GroupAggregateJsonPlanITCase.java
index 3450478..367253e 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/GroupAggregateJsonPlanITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/GroupAggregateJsonPlanITCase.java
@@ -27,29 +27,32 @@
import org.apache.flink.table.planner.runtime.utils.TestData;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.planner.utils.JsonPlanTestBase;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
/** Test for group aggregate json plan. */
-@RunWith(Parameterized.class)
-public class GroupAggregateJsonPlanITCase extends JsonPlanTestBase {
+@ExtendWith(ParameterizedTestExtension.class)
+class GroupAggregateJsonPlanITCase extends JsonPlanTestBase {
- @Parameterized.Parameter public boolean isMiniBatchEnabled;
+ @Parameter private boolean isMiniBatchEnabled;
- @Parameterized.Parameters(name = "isMiniBatchEnabled={0}")
- public static List<Boolean> testData() {
+ @Parameters(name = "isMiniBatchEnabled={0}")
+ private static List<Boolean> testData() {
return Arrays.asList(true, false);
}
- @Before
- public void setup() throws Exception {
+ @BeforeEach
+ @Override
+ protected void setup() throws Exception {
super.setup();
if (isMiniBatchEnabled) {
tableEnv.getConfig()
@@ -63,8 +66,8 @@
}
}
- @Test
- public void testSimpleAggCallsWithGroupBy() throws Exception {
+ @TestTemplate
+ void testSimpleAggCallsWithGroupBy() throws Exception {
createTestValuesSourceTable(
"MyTable",
JavaScalaConversionUtil.toJava(TestData.smallData3()),
@@ -90,8 +93,8 @@
assertResult(Arrays.asList("+I[1, 1, null, Hi]", "+I[2, 2, 2.0, Hello]"), result);
}
- @Test
- public void testDistinctAggCalls() throws Exception {
+ @TestTemplate
+ void testDistinctAggCalls() throws Exception {
createTestValuesSourceTable(
"MyTable",
JavaScalaConversionUtil.toJava(TestData.data2()),
@@ -130,8 +133,8 @@
result);
}
- @Test
- public void testUserDefinedAggCallsWithoutMerge() throws Exception {
+ @TestTemplate
+ void testUserDefinedAggCallsWithoutMerge() throws Exception {
tableEnv.createTemporaryFunction("my_sum1", new VarSum1AggFunction());
tableEnv.createFunction("my_avg", WeightedAvg.class);
tableEnv.createTemporarySystemFunction("my_sum2", VarSum2AggFunction.class);
@@ -166,8 +169,8 @@
Arrays.asList("+I[1, 77, 0, 1]", "+I[2, 120, 0, 2]", "+I[3, 58, 0, 3]"), result);
}
- @Test
- public void testUserDefinedAggCallsWithMerge() throws Exception {
+ @TestTemplate
+ void testUserDefinedAggCallsWithMerge() throws Exception {
tableEnv.createFunction("my_avg", JavaUserDefinedAggFunctions.WeightedAvgWithMerge.class);
tableEnv.createTemporarySystemFunction(
"my_concat_agg", JavaUserDefinedAggFunctions.ConcatDistinctAggFunction.class);
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/GroupWindowAggregateJsonITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/GroupWindowAggregateJsonITCase.java
index 71e9927..f461640 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/GroupWindowAggregateJsonITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/GroupWindowAggregateJsonITCase.java
@@ -23,18 +23,19 @@
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.planner.utils.JsonPlanTestBase;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
/** Test for group window aggregate json plan. */
-public class GroupWindowAggregateJsonITCase extends JsonPlanTestBase {
+class GroupWindowAggregateJsonITCase extends JsonPlanTestBase {
- @Before
- public void setup() throws Exception {
+ @BeforeEach
+ @Override
+ protected void setup() throws Exception {
super.setup();
createTestValuesSourceTable(
"MyTable",
@@ -59,7 +60,7 @@
}
@Test
- public void testEventTimeTumbleWindow() throws Exception {
+ void testEventTimeTumbleWindow() throws Exception {
createTestValuesSinkTable(
"MySink",
"name STRING",
@@ -93,7 +94,7 @@
}
@Test
- public void testEventTimeHopWindow() throws Exception {
+ void testEventTimeHopWindow() throws Exception {
createTestValuesSinkTable("MySink", "name STRING", "cnt BIGINT");
compileSqlAndExecutePlan(
"insert into MySink select\n"
@@ -121,7 +122,7 @@
}
@Test
- public void testEventTimeSessionWindow() throws Exception {
+ void testEventTimeSessionWindow() throws Exception {
createTestValuesSinkTable("MySink", "name STRING", "cnt BIGINT");
compileSqlAndExecutePlan(
"insert into MySink select\n"
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/IncrementalAggregateJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/IncrementalAggregateJsonPlanITCase.java
index ea76839..8888bca 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/IncrementalAggregateJsonPlanITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/IncrementalAggregateJsonPlanITCase.java
@@ -27,8 +27,8 @@
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.planner.utils.JsonPlanTestBase;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.time.Duration;
@@ -37,10 +37,11 @@
import java.util.concurrent.ExecutionException;
/** Test for incremental aggregate json plan. */
-public class IncrementalAggregateJsonPlanITCase extends JsonPlanTestBase {
+class IncrementalAggregateJsonPlanITCase extends JsonPlanTestBase {
- @Before
- public void setup() throws Exception {
+ @BeforeEach
+ @Override
+ protected void setup() throws Exception {
super.setup();
tableEnv.getConfig()
.set(
@@ -56,8 +57,7 @@
}
@Test
- public void testIncrementalAggregate()
- throws IOException, ExecutionException, InterruptedException {
+ void testIncrementalAggregate() throws IOException, ExecutionException, InterruptedException {
createTestValuesSourceTable(
"MyTable",
JavaScalaConversionUtil.toJava(TestData.smallData3()),
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/IntervalJoinJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/IntervalJoinJsonPlanITCase.java
index def8c31..94a1e41 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/IntervalJoinJsonPlanITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/IntervalJoinJsonPlanITCase.java
@@ -22,17 +22,17 @@
import org.apache.flink.table.planner.utils.JsonPlanTestBase;
import org.apache.flink.types.Row;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.List;
/** Test for IntervalJoin json plan. */
-public class IntervalJoinJsonPlanITCase extends JsonPlanTestBase {
+class IntervalJoinJsonPlanITCase extends JsonPlanTestBase {
/** test process time inner join. * */
@Test
- public void testProcessTimeInnerJoin() throws Exception {
+ void testProcessTimeInnerJoin() throws Exception {
List<Row> rowT1 =
Arrays.asList(
Row.of(1, 1L, "Hi1"),
@@ -69,7 +69,7 @@
}
@Test
- public void testRowTimeInnerJoin() throws Exception {
+ void testRowTimeInnerJoin() throws Exception {
List<Row> rowT1 =
Arrays.asList(
Row.of(1, 1L, "Hi1"),
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/JoinJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/JoinJsonPlanITCase.java
index 7fc86c5..a722226 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/JoinJsonPlanITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/JoinJsonPlanITCase.java
@@ -23,19 +23,19 @@
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.planner.utils.JsonPlanTestBase;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import java.io.File;
import java.util.Arrays;
import java.util.List;
/** Test for join json plan. */
-public class JoinJsonPlanITCase extends JsonPlanTestBase {
+class JoinJsonPlanITCase extends JsonPlanTestBase {
@Override
- @Before
- public void setup() throws Exception {
+ @BeforeEach
+ protected void setup() throws Exception {
super.setup();
createTestValuesSourceTable(
"A",
@@ -55,7 +55,7 @@
/** test non-window inner join. * */
@Test
- public void testNonWindowInnerJoin() throws Exception {
+ void testNonWindowInnerJoin() throws Exception {
List<String> dataT1 =
Arrays.asList(
"1,1,Hi1", "1,2,Hi2", "1,2,Hi2", "1,5,Hi3", "2,7,Hi5", "1,9,Hi6", "1,8,Hi8",
@@ -88,7 +88,7 @@
}
@Test
- public void testIsNullInnerJoinWithNullCond() throws Exception {
+ void testIsNullInnerJoinWithNullCond() throws Exception {
List<String> dataT1 =
Arrays.asList(
"1,1,Hi1", "1,2,Hi2", "1,2,Hi2", "1,5,Hi3", "2,7,Hi5", "1,9,Hi6", "1,8,Hi8",
@@ -125,7 +125,7 @@
}
@Test
- public void testJoin() throws Exception {
+ void testJoin() throws Exception {
createTestValuesSinkTable("MySink", "a3 varchar", "b4 varchar");
compileSqlAndExecutePlan("insert into MySink \n" + "SELECT a3, b4 FROM A, B WHERE a2 = b2")
.await();
@@ -136,7 +136,7 @@
}
@Test
- public void testInnerJoin() throws Exception {
+ void testInnerJoin() throws Exception {
createTestValuesSinkTable("MySink", "a1 int", "b1 int");
compileSqlAndExecutePlan("insert into MySink \n" + "SELECT a1, b1 FROM A JOIN B ON a1 = b1")
.await();
@@ -145,7 +145,7 @@
}
@Test
- public void testJoinWithFilter() throws Exception {
+ void testJoinWithFilter() throws Exception {
createTestValuesSinkTable("MySink", "a3 varchar", "b4 varchar");
compileSqlAndExecutePlan(
"insert into MySink \n"
@@ -156,7 +156,7 @@
}
@Test
- public void testInnerJoinWithDuplicateKey() throws Exception {
+ void testInnerJoinWithDuplicateKey() throws Exception {
createTestValuesSinkTable("MySink", "a1 int", "b1 int", "b3 int");
compileSqlAndExecutePlan(
"insert into MySink \n"
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/LimitJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/LimitJsonPlanITCase.java
index 50b4f5d..6507481 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/LimitJsonPlanITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/LimitJsonPlanITCase.java
@@ -23,7 +23,7 @@
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.planner.utils.JsonPlanTestBase;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.Arrays;
@@ -31,9 +31,9 @@
import java.util.concurrent.ExecutionException;
/** Test for limit JsonPlan ser/de. */
-public class LimitJsonPlanITCase extends JsonPlanTestBase {
+class LimitJsonPlanITCase extends JsonPlanTestBase {
@Test
- public void testLimit() throws ExecutionException, InterruptedException, IOException {
+ void testLimit() throws ExecutionException, InterruptedException, IOException {
createTestValuesSourceTable(
"MyTable",
JavaScalaConversionUtil.toJava(TestData.data1()),
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/LookupJoinJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/LookupJoinJsonPlanITCase.java
index c24a619..71e76e0 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/LookupJoinJsonPlanITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/LookupJoinJsonPlanITCase.java
@@ -22,17 +22,19 @@
import org.apache.flink.table.planner.utils.JsonPlanTestBase;
import org.apache.flink.types.Row;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
/** Test for LookupJoin json plan. */
-public class LookupJoinJsonPlanITCase extends JsonPlanTestBase {
+class LookupJoinJsonPlanITCase extends JsonPlanTestBase {
+ @BeforeEach
@Override
- public void setup() throws Exception {
+ protected void setup() throws Exception {
super.setup();
List<Row> rowT1 =
Arrays.asList(
@@ -66,7 +68,7 @@
/** test join temporal table. * */
@Test
- public void testJoinLookupTable() throws Exception {
+ void testJoinLookupTable() throws Exception {
compileSqlAndExecutePlan(
"insert into MySink "
+ "SELECT T.id, T.len, T.content, D.name FROM src AS T JOIN user_table \n"
@@ -81,7 +83,7 @@
}
@Test
- public void testJoinLookupTableWithPushDown() throws Exception {
+ void testJoinLookupTableWithPushDown() throws Exception {
compileSqlAndExecutePlan(
"insert into MySink \n"
+ "SELECT T.id, T.len, T.content, D.name FROM src AS T JOIN user_table \n "
@@ -93,7 +95,7 @@
}
@Test
- public void testLeftJoinLookupTableWithPreFilter() throws Exception {
+ void testLeftJoinLookupTableWithPreFilter() throws Exception {
compileSqlAndExecutePlan(
"insert into MySink "
+ "SELECT T.id, T.len, T.content, D.name FROM src AS T LEFT JOIN user_table \n"
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/MatchRecognizeJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/MatchRecognizeJsonPlanITCase.java
index 6f68c5c..ca64b21 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/MatchRecognizeJsonPlanITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/MatchRecognizeJsonPlanITCase.java
@@ -22,16 +22,16 @@
import org.apache.flink.table.planner.utils.JsonPlanTestBase;
import org.apache.flink.types.Row;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
/** Test json deserialization for match recognize. */
-public class MatchRecognizeJsonPlanITCase extends JsonPlanTestBase {
+class MatchRecognizeJsonPlanITCase extends JsonPlanTestBase {
@Test
- public void testSimpleMatch() throws Exception {
+ void testSimpleMatch() throws Exception {
List<Row> data =
Arrays.asList(
Row.of(1L, "a"),
@@ -70,7 +70,7 @@
}
@Test
- public void testComplexMatch() throws Exception {
+ void testComplexMatch() throws Exception {
List<Row> data =
Arrays.asList(
Row.of("ACME", 1L, 19, 1),
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/OverAggregateJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/OverAggregateJsonPlanITCase.java
index 0aaf882..55fc7a0 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/OverAggregateJsonPlanITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/OverAggregateJsonPlanITCase.java
@@ -26,7 +26,7 @@
import org.apache.flink.table.planner.utils.JsonPlanTestBase;
import org.apache.flink.types.Row;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.Arrays;
@@ -34,10 +34,10 @@
import java.util.concurrent.ExecutionException;
/** Test json deserialization for over aggregate. */
-public class OverAggregateJsonPlanITCase extends JsonPlanTestBase {
+class OverAggregateJsonPlanITCase extends JsonPlanTestBase {
@Test
- public void testProcTimeBoundedPartitionedRowsOver()
+ void testProcTimeBoundedPartitionedRowsOver()
throws ExecutionException, InterruptedException, IOException {
createTestValuesSourceTable(
"MyTable",
@@ -79,7 +79,7 @@
}
@Test
- public void testProcTimeUnboundedNonPartitionedRangeOver()
+ void testProcTimeUnboundedNonPartitionedRangeOver()
throws IOException, ExecutionException, InterruptedException {
List<Row> data =
Arrays.asList(
@@ -124,7 +124,7 @@
}
@Test
- public void testRowTimeBoundedPartitionedRangeOver()
+ void testRowTimeBoundedPartitionedRangeOver()
throws IOException, ExecutionException, InterruptedException {
List<Row> data =
Arrays.asList(
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/RankJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/RankJsonPlanITCase.java
index 8397a40..1b86627 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/RankJsonPlanITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/RankJsonPlanITCase.java
@@ -23,7 +23,7 @@
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.planner.utils.JsonPlanTestBase;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.Arrays;
@@ -31,9 +31,9 @@
import java.util.concurrent.ExecutionException;
/** Test for Rank JsonPlan ser/de. */
-public class RankJsonPlanITCase extends JsonPlanTestBase {
+class RankJsonPlanITCase extends JsonPlanTestBase {
@Test
- public void testRank() throws ExecutionException, InterruptedException, IOException {
+ void testRank() throws ExecutionException, InterruptedException, IOException {
createTestValuesSourceTable(
"MyTable",
JavaScalaConversionUtil.toJava(TestData.data1()),
@@ -52,7 +52,7 @@
}
@Test
- public void testFirstN() throws ExecutionException, InterruptedException, IOException {
+ void testFirstN() throws ExecutionException, InterruptedException, IOException {
createTestValuesSourceTable(
"MyTable1",
JavaScalaConversionUtil.toJava(TestData.data4()),
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/SargJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/SargJsonPlanITCase.java
index 1320fa2..bb40b5a 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/SargJsonPlanITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/SargJsonPlanITCase.java
@@ -22,16 +22,16 @@
import org.apache.flink.table.planner.utils.JsonPlanTestBase;
import org.apache.flink.types.Row;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
/** Test for Sarg JsonPlan ser/de. */
-public class SargJsonPlanITCase extends JsonPlanTestBase {
+class SargJsonPlanITCase extends JsonPlanTestBase {
@Test
- public void testSarg() throws ExecutionException, InterruptedException {
+ void testSarg() throws ExecutionException, InterruptedException {
List<Row> data =
Arrays.asList(Row.of(1), Row.of(2), Row.of((Integer) null), Row.of(4), Row.of(5));
createTestValuesSourceTable("MyTable", data, "a int");
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/SortLimitJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/SortLimitJsonPlanITCase.java
index bd1f168..7473057 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/SortLimitJsonPlanITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/SortLimitJsonPlanITCase.java
@@ -23,7 +23,7 @@
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.planner.utils.JsonPlanTestBase;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.Arrays;
@@ -31,9 +31,9 @@
import java.util.concurrent.ExecutionException;
/** Test for sort limit JsonPlan ser/de. */
-public class SortLimitJsonPlanITCase extends JsonPlanTestBase {
+class SortLimitJsonPlanITCase extends JsonPlanTestBase {
@Test
- public void testSortLimit() throws ExecutionException, InterruptedException, IOException {
+ void testSortLimit() throws ExecutionException, InterruptedException, IOException {
createTestValuesSourceTable(
"MyTable",
JavaScalaConversionUtil.toJava(TestData.data1()),
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TableSinkJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TableSinkJsonPlanITCase.java
index 3a4a6cc..73577d0 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TableSinkJsonPlanITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TableSinkJsonPlanITCase.java
@@ -21,8 +21,8 @@
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.utils.JsonPlanTestBase;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import java.io.File;
import java.util.Arrays;
@@ -30,18 +30,19 @@
import java.util.List;
/** Test for table sink json plan. */
-public class TableSinkJsonPlanITCase extends JsonPlanTestBase {
+class TableSinkJsonPlanITCase extends JsonPlanTestBase {
List<String> data = Arrays.asList("1,1,hi", "2,1,hello", "3,2,hello world");
- @Before
- public void setup() throws Exception {
+ @BeforeEach
+ @Override
+ protected void setup() throws Exception {
super.setup();
createTestCsvSourceTable("MyTable", data, "a bigint", "b int", "c varchar");
}
@Test
- public void testPartitioning() throws Exception {
+ void testPartitioning() throws Exception {
File sinkPath =
createTestCsvSinkTable(
"MySink",
@@ -55,7 +56,7 @@
}
@Test
- public void testWritingMetadata() throws Exception {
+ void testWritingMetadata() throws Exception {
createTestValuesSinkTable(
"MySink",
new String[] {"a bigint", "b int", "c varchar METADATA"},
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TableSourceJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TableSourceJsonPlanITCase.java
index 6a24ee0..b757407 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TableSourceJsonPlanITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TableSourceJsonPlanITCase.java
@@ -22,7 +22,7 @@
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.planner.utils.JsonPlanTestBase;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.io.File;
import java.util.Arrays;
@@ -32,10 +32,10 @@
import static org.apache.flink.table.utils.DateTimeUtils.toLocalDateTime;
/** Test for table source json plan. */
-public class TableSourceJsonPlanITCase extends JsonPlanTestBase {
+class TableSourceJsonPlanITCase extends JsonPlanTestBase {
@Test
- public void testProjectPushDown() throws Exception {
+ void testProjectPushDown() throws Exception {
List<String> data = Arrays.asList("1,1,hi", "2,1,hello", "3,2,hello world");
createTestCsvSourceTable("MyTable", data, "a bigint", "b int not null", "c varchar");
File sinkPath = createTestCsvSinkTable("MySink", "a bigint", "b int");
@@ -46,7 +46,7 @@
}
@Test
- public void testReadingMetadata() throws Exception {
+ void testReadingMetadata() throws Exception {
createTestValuesSourceTable(
"MyTable",
JavaScalaConversionUtil.toJava(TestData.smallData3()),
@@ -65,7 +65,7 @@
}
@Test
- public void testReadingMetadataWithProjectionPushDownDisabled() throws Exception {
+ void testReadingMetadataWithProjectionPushDownDisabled() throws Exception {
createTestValuesSourceTable(
"MyTable",
JavaScalaConversionUtil.toJava(TestData.smallData3()),
@@ -85,7 +85,7 @@
}
@Test
- public void testFilterPushDown() throws Exception {
+ void testFilterPushDown() throws Exception {
List<String> data = Arrays.asList("1,1,hi", "2,1,hello", "3,2,hello world");
createTestCsvSourceTable("MyTable", data, "a bigint", "b int not null", "c varchar");
File sinkPath = createTestCsvSinkTable("MySink", "a bigint", "b int", "c varchar");
@@ -96,7 +96,7 @@
}
@Test
- public void testPartitionPushDown() throws Exception {
+ void testPartitionPushDown() throws Exception {
createTestValuesSourceTable(
"MyTable",
JavaScalaConversionUtil.toJava(TestData.smallData3()),
@@ -115,7 +115,7 @@
}
@Test
- public void testWatermarkPushDown() throws Exception {
+ void testWatermarkPushDown() throws Exception {
createTestValuesSourceTable(
"MyTable",
JavaScalaConversionUtil.toJava(TestData.data3WithTimestamp()),
@@ -146,7 +146,7 @@
}
@Test
- public void testPushDowns() throws Exception {
+ void testPushDowns() throws Exception {
createTestValuesSourceTable(
"MyTable",
JavaScalaConversionUtil.toJava(TestData.data3WithTimestamp()),
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TemporalJoinJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TemporalJoinJsonPlanITCase.java
index ed57d92..56ebcf5 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TemporalJoinJsonPlanITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TemporalJoinJsonPlanITCase.java
@@ -23,7 +23,8 @@
import org.apache.flink.table.planner.utils.JsonPlanTestBase;
import org.apache.flink.types.Row;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.List;
@@ -31,10 +32,11 @@
import static org.apache.flink.table.api.Expressions.$;
/** Test for TemporalJoin json plan. */
-public class TemporalJoinJsonPlanITCase extends JsonPlanTestBase {
+class TemporalJoinJsonPlanITCase extends JsonPlanTestBase {
+ @BeforeEach
@Override
- public void setup() throws Exception {
+ protected void setup() throws Exception {
super.setup();
List<Row> orders =
Arrays.asList(
@@ -78,7 +80,7 @@
/** test process time inner join. * */
@Test
- public void testJoinTemporalFunction() throws Exception {
+ void testJoinTemporalFunction() throws Exception {
compileSqlAndExecutePlan(
"INSERT INTO MySink "
+ "SELECT amount * r.rate "
@@ -91,7 +93,7 @@
}
@Test
- public void testTemporalTableJoin() throws Exception {
+ void testTemporalTableJoin() throws Exception {
compileSqlAndExecutePlan(
"INSERT INTO MySink "
+ "SELECT amount * r.rate "
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TemporalSortJsonITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TemporalSortJsonITCase.java
index 888e606..a97a90b 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TemporalSortJsonITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TemporalSortJsonITCase.java
@@ -23,7 +23,7 @@
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.planner.utils.JsonPlanTestBase;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.HashMap;
@@ -31,10 +31,10 @@
import static org.assertj.core.api.Assertions.assertThat;
/** Test for temporal sort json plan. */
-public class TemporalSortJsonITCase extends JsonPlanTestBase {
+class TemporalSortJsonITCase extends JsonPlanTestBase {
@Test
- public void testSortProcessingTime() throws Exception {
+ void testSortProcessingTime() throws Exception {
createTestValuesSourceTable(
"MyTable",
JavaScalaConversionUtil.toJava(TestData.smallData3()),
@@ -52,7 +52,7 @@
}
@Test
- public void testSortRowTime() throws Exception {
+ void testSortRowTime() throws Exception {
createTestValuesSourceTable(
"MyTable",
JavaScalaConversionUtil.toJava(TestData.windowDataWithTimestamp()),
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/UnionJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/UnionJsonPlanITCase.java
index 8cc6ce3..1d7096a 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/UnionJsonPlanITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/UnionJsonPlanITCase.java
@@ -23,15 +23,15 @@
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.planner.utils.JsonPlanTestBase;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.List;
/** Test json serialization/deserialization for union. */
-public class UnionJsonPlanITCase extends JsonPlanTestBase {
+class UnionJsonPlanITCase extends JsonPlanTestBase {
@Test
- public void testUnion() throws Exception {
+ void testUnion() throws Exception {
createTestValuesSourceTable(
"MyTable",
JavaScalaConversionUtil.toJava(TestData.data1()),
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ValuesJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ValuesJsonPlanITCase.java
index eb96759..e71546f 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ValuesJsonPlanITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ValuesJsonPlanITCase.java
@@ -21,16 +21,16 @@
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.utils.JsonPlanTestBase;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.List;
/** Test for values json plan. */
-public class ValuesJsonPlanITCase extends JsonPlanTestBase {
+class ValuesJsonPlanITCase extends JsonPlanTestBase {
@Test
- public void testValues() throws Exception {
+ void testValues() throws Exception {
createTestValuesSinkTable("MySink", "b INT", "a INT", "c VARCHAR");
compileSqlAndExecutePlan(
"INSERT INTO MySink SELECT * from (VALUES (1, 2, 'Hi'), (3, 4, 'Hello'))")
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WatermarkAssignerJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WatermarkAssignerJsonPlanITCase.java
index 4ac0a18..bfc32e8 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WatermarkAssignerJsonPlanITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WatermarkAssignerJsonPlanITCase.java
@@ -22,7 +22,7 @@
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.planner.utils.JsonPlanTestBase;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.io.File;
import java.util.Arrays;
@@ -31,10 +31,10 @@
import static org.apache.flink.table.utils.DateTimeUtils.toLocalDateTime;
/** Test for watermark assigner json plan. */
-public class WatermarkAssignerJsonPlanITCase extends JsonPlanTestBase {
+class WatermarkAssignerJsonPlanITCase extends JsonPlanTestBase {
@Test
- public void testWatermarkAssigner() throws Exception {
+ void testWatermarkAssigner() throws Exception {
createTestValuesSourceTable(
"MyTable",
JavaScalaConversionUtil.toJava(TestData.data3WithTimestamp()),
@@ -65,7 +65,7 @@
}
@Test
- public void testWatermarkPushDownWithMetadata() throws Exception {
+ void testWatermarkPushDownWithMetadata() throws Exception {
// to verify FLINK-30598: the case declares metadata field first, without the fix it'll get
// wrong code generated by WatermarkGeneratorCodeGenerator which reference the incorrect
// varchar column as the watermark field.
@@ -102,7 +102,7 @@
}
@Test
- public void testWatermarkAndProjectPushDownWithMetadata() throws Exception {
+ void testWatermarkAndProjectPushDownWithMetadata() throws Exception {
createTestValuesSourceTable(
"MyTable",
JavaScalaConversionUtil.toJava(TestData.data3WithTimestamp()),
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowAggregateJsonITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowAggregateJsonITCase.java
index 6e7e848..dc07965 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowAggregateJsonITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowAggregateJsonITCase.java
@@ -24,32 +24,35 @@
import org.apache.flink.table.planner.utils.AggregatePhaseStrategy;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.planner.utils.JsonPlanTestBase;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
/** Test for window aggregate json plan. */
-@RunWith(Parameterized.class)
-public class WindowAggregateJsonITCase extends JsonPlanTestBase {
+@ExtendWith(ParameterizedTestExtension.class)
+class WindowAggregateJsonITCase extends JsonPlanTestBase {
- @Parameterized.Parameters(name = "agg_phase = {0}")
- public static Object[] parameters() {
+ @Parameters(name = "agg_phase = {0}")
+ private static Object[] parameters() {
return new Object[][] {
new Object[] {AggregatePhaseStrategy.ONE_PHASE},
new Object[] {AggregatePhaseStrategy.TWO_PHASE}
};
}
- @Parameterized.Parameter public AggregatePhaseStrategy aggPhase;
+ @Parameter private AggregatePhaseStrategy aggPhase;
- @Before
- public void setup() throws Exception {
+ @BeforeEach
+ @Override
+ protected void setup() throws Exception {
super.setup();
createTestValuesSourceTable(
"MyTable",
@@ -77,8 +80,8 @@
aggPhase.toString());
}
- @Test
- public void testEventTimeTumbleWindow() throws Exception {
+ @TestTemplate
+ void testEventTimeTumbleWindow() throws Exception {
createTestValuesSinkTable(
"MySink",
"name STRING",
@@ -112,8 +115,8 @@
result);
}
- @Test
- public void testEventTimeHopWindow() throws Exception {
+ @TestTemplate
+ void testEventTimeHopWindow() throws Exception {
createTestValuesSinkTable("MySink", "name STRING", "cnt BIGINT");
compileSqlAndExecutePlan(
"insert into MySink select\n"
@@ -141,8 +144,8 @@
result);
}
- @Test
- public void testEventTimeCumulateWindow() throws Exception {
+ @TestTemplate
+ void testEventTimeCumulateWindow() throws Exception {
createTestValuesSinkTable("MySink", "name STRING", "cnt BIGINT");
compileSqlAndExecutePlan(
"insert into MySink select\n"
@@ -177,8 +180,8 @@
result);
}
- @Test
- public void testDistinctSplitEnabled() throws Exception {
+ @TestTemplate
+ void testDistinctSplitEnabled() throws Exception {
tableEnv.getConfig()
.set(OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true);
createTestValuesSinkTable(
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowDeduplicateJsonITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowDeduplicateJsonITCase.java
index a377dc4..75eb714 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowDeduplicateJsonITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowDeduplicateJsonITCase.java
@@ -23,18 +23,19 @@
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.planner.utils.JsonPlanTestBase;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
/** Test for window deduplicate json plan. */
-public class WindowDeduplicateJsonITCase extends JsonPlanTestBase {
+class WindowDeduplicateJsonITCase extends JsonPlanTestBase {
- @Before
- public void setup() throws Exception {
+ @BeforeEach
+ @Override
+ protected void setup() throws Exception {
super.setup();
createTestValuesSourceTable(
"MyTable",
@@ -59,7 +60,7 @@
}
@Test
- public void testEventTimeTumbleWindow() throws Exception {
+ void testEventTimeTumbleWindow() throws Exception {
createTestValuesSinkTable(
"MySink",
"ts STRING",
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowJoinJsonITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowJoinJsonITCase.java
index 7e7565c..c4e84d9 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowJoinJsonITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowJoinJsonITCase.java
@@ -23,18 +23,19 @@
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.planner.utils.JsonPlanTestBase;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
/** Test for window join json plan. */
-public class WindowJoinJsonITCase extends JsonPlanTestBase {
+class WindowJoinJsonITCase extends JsonPlanTestBase {
- @Before
- public void setup() throws Exception {
+ @BeforeEach
+ @Override
+ protected void setup() throws Exception {
super.setup();
createTestValuesSourceTable(
"MyTable",
@@ -80,7 +81,7 @@
}
@Test
- public void testEventTimeTumbleWindow() throws Exception {
+ void testEventTimeTumbleWindow() throws Exception {
createTestValuesSinkTable(
"MySink",
"name STRING",
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowTableFunctionJsonITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowTableFunctionJsonITCase.java
index e5489b8..1ccf0e0 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowTableFunctionJsonITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowTableFunctionJsonITCase.java
@@ -23,18 +23,19 @@
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.planner.utils.JsonPlanTestBase;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
/** Test for window deduplicate json plan. */
-public class WindowTableFunctionJsonITCase extends JsonPlanTestBase {
+class WindowTableFunctionJsonITCase extends JsonPlanTestBase {
- @Before
- public void setup() throws Exception {
+ @BeforeEach
+ @Override
+ protected void setup() throws Exception {
super.setup();
createTestValuesSourceTable(
"MyTable",
@@ -59,7 +60,7 @@
}
@Test
- public void testEventTimeTumbleWindow() throws Exception {
+ void testEventTimeTumbleWindow() throws Exception {
createTestValuesSinkTable(
"MySink",
"ts STRING",
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/JsonPlanTestBase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/JsonPlanTestBase.java
index 961dc62..6d6dbcf 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/JsonPlanTestBase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/JsonPlanTestBase.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.planner.utils;
import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.transformations.UnionTransformation;
import org.apache.flink.table.api.CompiledPlan;
import org.apache.flink.table.api.EnvironmentSettings;
@@ -27,18 +28,22 @@
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.internal.CompiledPlanUtils;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
-import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.StringUtils;
-import org.junit.After;
-import org.junit.Before;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
+import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -53,17 +58,27 @@
import static org.assertj.core.api.Assertions.assertThat;
/** The base class for json plan testing. */
-public abstract class JsonPlanTestBase extends AbstractTestBase {
+public abstract class JsonPlanTestBase {
+
+ @RegisterExtension
+ private static final MiniClusterExtension MINI_CLUSTER_EXTENSION =
+ new MiniClusterExtension(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberTaskManagers(1)
+ .setNumberSlotsPerTaskManager(4)
+ .build());
+
+ @TempDir protected Path tempFolder;
protected TableEnvironment tableEnv;
- @Before
- public void setup() throws Exception {
+ @BeforeEach
+ protected void setup() throws Exception {
tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
}
- @After
- public void after() {
+ @AfterEach
+ protected void after() {
TestValuesTableFactory.clearAllData();
}
@@ -216,7 +231,7 @@
protected void createTestCsvSourceTable(
String tableName, List<String> data, String... fieldNameAndTypes) throws IOException {
checkArgument(fieldNameAndTypes.length > 0);
- File sourceFile = TEMPORARY_FOLDER.newFile();
+ File sourceFile = TempDirUtils.newFile(tempFolder);
Collections.shuffle(data);
Files.write(sourceFile.toPath(), String.join("\n", data).getBytes());
String ddl =
@@ -246,7 +261,7 @@
StringUtils.isNullOrWhitespaceOnly(partitionFields)
? ""
: "\n partitioned by (" + partitionFields + ") \n";
- File sinkPath = TEMPORARY_FOLDER.newFolder();
+ File sinkPath = TempDirUtils.newFolder(tempFolder);
String ddl =
String.format(
"CREATE TABLE %s (\n"