[FLINK-33032][table-planner][JUnit5 Migration] Module: flink-table-planner (ExpressionTestBase) (#23358)

diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/expressions/ColumnReferenceFinderTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/expressions/ColumnReferenceFinderTest.java
index 6e40631..fccea6d 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/expressions/ColumnReferenceFinderTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/expressions/ColumnReferenceFinderTest.java
@@ -33,13 +33,13 @@
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link ColumnReferenceFinder}. */
-public class ColumnReferenceFinderTest extends TableTestBase {
+class ColumnReferenceFinderTest extends TableTestBase {
 
     private final StreamTableTestUtil util = streamTestUtil(TableConfig.getDefault());
     private ResolvedSchema resolvedSchema;
 
     @BeforeEach
-    public void beforeEach() {
+    void beforeEach() {
         resolvedSchema =
                 util.testingTableEnv()
                         .getCatalogManager()
@@ -62,7 +62,7 @@
     }
 
     @Test
-    public void testFindReferencedColumn() {
+    void testFindReferencedColumn() {
         assertThat(ColumnReferenceFinder.findReferencedColumn("b", resolvedSchema))
                 .isEqualTo(Collections.emptySet());
 
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/expressions/TypeConversionsTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/expressions/TypeConversionsTest.java
index 62252c9..6d58164 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/expressions/TypeConversionsTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/expressions/TypeConversionsTest.java
@@ -21,7 +21,7 @@
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.planner.expressions.utils.ScalarOperatorsTestBase;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.time.Instant;
 import java.time.LocalDate;
@@ -32,33 +32,33 @@
 import static org.apache.flink.table.api.Expressions.lit;
 
 /** Tests for {@code CAST} expression. */
-public class TypeConversionsTest extends ScalarOperatorsTestBase {
+class TypeConversionsTest extends ScalarOperatorsTestBase {
     @Test
-    public void testTimestampWithLocalTimeZoneToString() {
+    void testTimestampWithLocalTimeZoneToString() {
         tableConfig().setLocalTimeZone(ZoneOffset.ofHours(2));
         testTableApi(lit(Instant.EPOCH).cast(DataTypes.STRING()), "1970-01-01 02:00:00");
     }
 
     @Test
-    public void testTimestampWithLocalTimeZoneToDate() {
+    void testTimestampWithLocalTimeZoneToDate() {
         tableConfig().setLocalTimeZone(ZoneOffset.ofHours(4));
         testTableApi(lit(Instant.EPOCH).cast(DataTypes.DATE()), "1970-01-01");
     }
 
     @Test
-    public void testTimestampWithLocalTimeZoneToTime() {
+    void testTimestampWithLocalTimeZoneToTime() {
         tableConfig().setLocalTimeZone(ZoneOffset.ofHours(4));
         testTableApi(lit(Instant.EPOCH).cast(DataTypes.TIME(0)), "04:00:00");
     }
 
     @Test
-    public void testTimestampWithLocalTimeZoneToTimestamp() {
+    void testTimestampWithLocalTimeZoneToTimestamp() {
         tableConfig().setLocalTimeZone(ZoneOffset.ofHours(3));
         testTableApi(lit(Instant.EPOCH).cast(DataTypes.TIMESTAMP(0)), "1970-01-01 03:00:00");
     }
 
     @Test
-    public void testStringToTimestampWithLocalTimeZone() {
+    void testStringToTimestampWithLocalTimeZone() {
         tableConfig().setLocalTimeZone(ZoneOffset.ofHours(2));
         testTableApi(
                 lit("1970-01-01 00:00:00").cast(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(0)),
@@ -70,7 +70,7 @@
     }
 
     @Test
-    public void testTimestampToTimestampWithLocalTimeZone() {
+    void testTimestampToTimestampWithLocalTimeZone() {
         tableConfig().setLocalTimeZone(ZoneOffset.ofHours(2));
         testTableApi(
                 lit(LocalDateTime.parse("1970-01-01T00:00:00"))
@@ -83,7 +83,7 @@
     }
 
     @Test
-    public void testTimeToTimestampWithLocalTimeZone() {
+    void testTimeToTimestampWithLocalTimeZone() {
         tableConfig().setLocalTimeZone(ZoneOffset.ofHours(2));
         testTableApi(
                 lit(LocalTime.parse("12:00:00")).cast(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(0)),
@@ -95,7 +95,7 @@
     }
 
     @Test
-    public void testDateToTimestampWithLocalTimeZone() {
+    void testDateToTimestampWithLocalTimeZone() {
         tableConfig().setLocalTimeZone(ZoneOffset.ofHours(2));
         testTableApi(
                 lit(LocalDate.parse("1970-02-01"))
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/expressions/converter/ExpressionConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/expressions/converter/ExpressionConverterTest.java
index af317c1..ffea6f9 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/expressions/converter/ExpressionConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/expressions/converter/ExpressionConverterTest.java
@@ -30,7 +30,7 @@
 import org.apache.calcite.util.DateString;
 import org.apache.calcite.util.TimeString;
 import org.apache.calcite.util.TimestampString;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.math.BigDecimal;
 import java.time.Duration;
@@ -44,14 +44,14 @@
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link ExpressionConverter}. */
-public class ExpressionConverterTest {
+class ExpressionConverterTest {
 
     private final PlannerContext plannerContext = PlannerMocks.create().getPlannerContext();
     private final ExpressionConverter converter =
             new ExpressionConverter(plannerContext.createRelBuilder());
 
     @Test
-    public void testLiteral() {
+    void testLiteral() {
         RexNode rex = converter.visit(valueLiteral((byte) 1));
         assertThat((int) ((RexLiteral) rex).getValueAs(Integer.class)).isEqualTo(1);
         assertThat(rex.getType().getSqlTypeName()).isEqualTo(SqlTypeName.TINYINT);
@@ -66,7 +66,7 @@
     }
 
     @Test
-    public void testCharLiteral() {
+    void testCharLiteral() {
         RexNode rex = converter.visit(valueLiteral("ABC", DataTypes.CHAR(4).notNull()));
         assertThat(((RexLiteral) rex).getValueAs(String.class)).isEqualTo("ABC ");
         assertThat(rex.getType().getSqlTypeName()).isEqualTo(SqlTypeName.CHAR);
@@ -74,7 +74,7 @@
     }
 
     @Test
-    public void testVarCharLiteral() {
+    void testVarCharLiteral() {
         RexNode rex = converter.visit(valueLiteral("ABC", DataTypes.STRING().notNull()));
         assertThat(((RexLiteral) rex).getValueAs(String.class)).isEqualTo("ABC");
         assertThat(rex.getType().getSqlTypeName()).isEqualTo(SqlTypeName.VARCHAR);
@@ -82,7 +82,7 @@
     }
 
     @Test
-    public void testBinaryLiteral() {
+    void testBinaryLiteral() {
         RexNode rex =
                 converter.visit(valueLiteral(new byte[] {1, 2, 3}, DataTypes.BINARY(4).notNull()));
         assertThat(((RexLiteral) rex).getValueAs(byte[].class)).isEqualTo(new byte[] {1, 2, 3, 0});
@@ -91,7 +91,7 @@
     }
 
     @Test
-    public void testTimestampLiteral() {
+    void testTimestampLiteral() {
         RexNode rex =
                 converter.visit(
                         valueLiteral(
@@ -104,7 +104,7 @@
     }
 
     @Test
-    public void testTimestampWithLocalZoneLiteral() {
+    void testTimestampWithLocalZoneLiteral() {
         RexNode rex =
                 converter.visit(
                         valueLiteral(
@@ -118,7 +118,7 @@
     }
 
     @Test
-    public void testTimeLiteral() {
+    void testTimeLiteral() {
         RexNode rex =
                 converter.visit(
                         valueLiteral(
@@ -130,7 +130,7 @@
     }
 
     @Test
-    public void testTimeLiteralBiggerPrecision() {
+    void testTimeLiteralBiggerPrecision() {
         RexNode rex =
                 converter.visit(
                         valueLiteral(
@@ -143,7 +143,7 @@
     }
 
     @Test
-    public void testDateLiteral() {
+    void testDateLiteral() {
         RexNode rex =
                 converter.visit(
                         valueLiteral(LocalDate.parse("2012-12-12"), DataTypes.DATE().notNull()));
@@ -153,7 +153,7 @@
     }
 
     @Test
-    public void testIntervalDayTime() {
+    void testIntervalDayTime() {
         Duration value = Duration.ofDays(3).plusMillis(21);
         RexNode rex = converter.visit(valueLiteral(value));
         assertThat(((RexLiteral) rex).getValueAs(BigDecimal.class))
@@ -167,7 +167,7 @@
     }
 
     @Test
-    public void testIntervalYearMonth() {
+    void testIntervalYearMonth() {
         Period value = Period.of(999, 3, 1);
         RexNode rex = converter.visit(valueLiteral(value));
         assertThat(((RexLiteral) rex).getValueAs(BigDecimal.class))
@@ -179,7 +179,7 @@
     }
 
     @Test
-    public void testDecimalLiteral() {
+    void testDecimalLiteral() {
         BigDecimal value = new BigDecimal("12345678.999");
         RexNode rex = converter.visit(valueLiteral(value));
         assertThat(((RexLiteral) rex).getValueAs(BigDecimal.class)).isEqualTo(value);
@@ -189,7 +189,7 @@
     }
 
     @Test
-    public void testSymbolLiteral() {
+    void testSymbolLiteral() {
         RexNode rex = converter.visit(valueLiteral(TimePointUnit.MICROSECOND));
         assertThat(((RexLiteral) rex).getValueAs(TimeUnit.class)).isEqualTo(TimeUnit.MICROSECOND);
         assertThat(rex.getType().getSqlTypeName()).isEqualTo(SqlTypeName.SYMBOL);
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ArrayTypeTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ArrayTypeTest.scala
index 0af130b..d546510 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ArrayTypeTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ArrayTypeTest.scala
@@ -21,7 +21,7 @@
 import org.apache.flink.table.planner.expressions.utils.ArrayTypeTestBase
 import org.apache.flink.table.planner.utils.DateTimeTestUtil.{localDate, localDateTime, localTime => gLocalTime}
 
-import org.junit.Test
+import org.junit.jupiter.api.Test
 
 import java.time.{LocalDateTime => JLocalDateTime}
 
@@ -226,16 +226,16 @@
 
   @Test
   def testArrayIndexStaticCheckForTable(): Unit = {
-    thrown.expect(classOf[ValidationException])
-    thrown.expectMessage("Array element access needs an index starting at 1 but was 0.")
-    testTableApi('f2.at(0), "1")
+    testExpectedTableApiException(
+      'f2.at(0),
+      "Array element access needs an index starting at 1 but was 0.")
   }
 
   @Test
   def testArrayIndexStaticCheckForSql(): Unit = {
-    thrown.expect(classOf[ValidationException])
-    thrown.expectMessage("Array element access needs an index starting at 1 but was 0.")
-    testSqlApi("f2[0]", "1")
+    testExpectedSqlException(
+      "f2[0]",
+      "Array element access needs an index starting at 1 but was 0.")
   }
 
   @Test
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/CompositeAccessTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/CompositeAccessTest.scala
index 85efc75..9f17d25 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/CompositeAccessTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/CompositeAccessTest.scala
@@ -20,7 +20,7 @@
 import org.apache.flink.table.api._
 import org.apache.flink.table.planner.expressions.utils.CompositeTypeTestBase
 
-import org.junit.Test
+import org.junit.jupiter.api.Test
 
 class CompositeAccessTest extends CompositeTypeTestBase {
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/DecimalTypeTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/DecimalTypeTest.scala
index 18d0dd0..014c50b 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/DecimalTypeTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/DecimalTypeTest.scala
@@ -22,7 +22,7 @@
 import org.apache.flink.table.types.DataType
 import org.apache.flink.types.Row
 
-import org.junit.Test
+import org.junit.jupiter.api.Test
 
 class DecimalTypeTest extends ExpressionTestBase {
 
@@ -385,11 +385,11 @@
 
     testAllApis((-'f51).round(1), "round(-f51,1)", "-100.0")
 
-    testAllApis(('f51).round(-1), "round(f51,-1)", "100")
+    testAllApis('f51.round(-1), "round(f51,-1)", "100")
 
     testAllApis((-'f51).round(-1), "round(-f51,-1)", "-100")
 
-    testAllApis(('f52).round(-1), "round(f52,-1)", "NULL")
+    testAllApis('f52.round(-1), "round(f52,-1)", "NULL")
   }
 
   @Test // functions e.g. sin() that treat Decimal as double
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/LiteralTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/LiteralTest.scala
index 30e20a4..32f4a59 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/LiteralTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/LiteralTest.scala
@@ -24,7 +24,7 @@
 import org.apache.flink.table.planner.expressions.utils.{ExpressionTestBase, Func3}
 import org.apache.flink.types.Row
 
-import org.junit.Test
+import org.junit.jupiter.api.Test
 
 class LiteralTest extends ExpressionTestBase {
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/MapTypeTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/MapTypeTest.scala
index c2aaa4a..69cffb2 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/MapTypeTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/MapTypeTest.scala
@@ -22,7 +22,7 @@
 import org.apache.flink.table.planner.expressions.utils.MapTypeTestBase
 import org.apache.flink.table.planner.utils.DateTimeTestUtil.{localDate, localDateTime, localTime => gLocalTime}
 
-import org.junit.Test
+import org.junit.jupiter.api.Test
 
 import java.time.{LocalDateTime => JLocalTimestamp}
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/NonDeterministicTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/NonDeterministicTest.scala
index a21b4b3..80691a3 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/NonDeterministicTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/NonDeterministicTest.scala
@@ -24,13 +24,13 @@
 import org.apache.flink.table.functions.ScalarFunction
 import org.apache.flink.table.planner.expressions.utils.ExpressionTestBase
 import org.apache.flink.table.planner.utils.{InternalConfigOptions, TableConfigUtils}
+import org.apache.flink.testutils.junit.extensions.parameterized.{ParameterizedTestExtension, Parameters}
 import org.apache.flink.types.Row
 
-import org.junit.Assert.assertEquals
-import org.junit.Assume.assumeTrue
-import org.junit.Test
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
+import org.assertj.core.api.Assertions.assertThat
+import org.assertj.core.api.Assumptions.assumeThat
+import org.junit.jupiter.api.TestTemplate
+import org.junit.jupiter.api.extension.ExtendWith
 
 import java.lang.{Long => JLong}
 import java.sql.Time
@@ -42,12 +42,12 @@
 import scala.collection.mutable
 
 /** Tests that check all non-deterministic functions can be executed. */
-@RunWith(classOf[Parameterized])
+@ExtendWith(Array(classOf[ParameterizedTestExtension]))
 class NonDeterministicTest(isStreaming: Boolean) extends ExpressionTestBase(isStreaming) {
 
-  @Test
+  @TestTemplate
   def testTemporalFunctionsInStreamMode(): Unit = {
-    assumeTrue(isStreaming)
+    assumeThat(isStreaming).isTrue
     val temporalFunctions = getCodeGenFunctions(
       List(
         "CURRENT_DATE",
@@ -61,7 +61,7 @@
     Thread.sleep(1 * 1000L)
     val round2: List[String] = evaluateFunctionResult(temporalFunctions)
 
-    assertEquals(round1.size, round2.size)
+    assertThat(round2.size).isEqualTo(round1.size)
     round1.zip(round2).zipWithIndex.foreach {
       case ((result1: String, result2: String), index: Int) =>
         // CURRENT_DATE may be same between two records
@@ -76,14 +76,14 @@
     // should return same value for one record in stream job
     val currentTimeStampIndex = 2
     val currentRowTimestampIndex = 3
-    assertEquals(round1(currentTimeStampIndex), round1(currentRowTimestampIndex))
-    assertEquals(round2(currentTimeStampIndex), round2(currentRowTimestampIndex))
+    assertThat(round1(currentTimeStampIndex)).isEqualTo(round1(currentRowTimestampIndex))
+    assertThat(round2(currentTimeStampIndex)).isEqualTo(round2(currentRowTimestampIndex))
 
   }
 
-  @Test
+  @TestTemplate
   def testTemporalFunctionsInBatchMode(): Unit = {
-    assumeTrue(!isStreaming)
+    assumeThat(isStreaming).isFalse
     val zoneId = ZoneId.of("Asia/Shanghai")
     tableConfig.setLocalTimeZone(zoneId)
     tableConfig.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH)
@@ -111,32 +111,31 @@
       "1970-01-01 08:00:01.123")
 
     val result = evaluateFunctionResult(temporalFunctions)
-    assertEquals(expected.toList.sorted, result.sorted)
-
+    assertThat(result.sorted).isEqualTo(expected.toList.sorted)
   }
 
-  @Test
+  @TestTemplate
   def testCurrentRowTimestampFunctionsInBatchMode(): Unit = {
-    assumeTrue(!isStreaming)
+    assumeThat(isStreaming).isFalse
     val temporalFunctions = getCodeGenFunctions(List("CURRENT_ROW_TIMESTAMP()"))
 
     val round1 = evaluateFunctionResult(temporalFunctions)
     Thread.sleep(1 * 1000L)
     val round2: List[String] = evaluateFunctionResult(temporalFunctions)
 
-    assertEquals(round1.size, round2.size)
+    assertThat(round2.size).isEqualTo(round1.size)
     round1.zip(round2).foreach {
       case (result1: String, result2: String) =>
         assert(result1 < result2)
     }
   }
 
-  @Test
+  @TestTemplate
   def testTemporalFunctionsInUTC(): Unit = {
     testTemporalTimestamp(ZoneId.of("UTC"))
   }
 
-  @Test
+  @TestTemplate
   def testTemporalFunctionsInShanghai(): Unit = {
     testTemporalTimestamp(ZoneId.of("Asia/Shanghai"))
   }
@@ -193,12 +192,12 @@
       "TRUE")
   }
 
-  @Test
+  @TestTemplate
   def testUUID(): Unit = {
     testAllApis(uuid().charLength(), "CHARACTER_LENGTH(UUID())", "36")
   }
 
-  @Test
+  @TestTemplate
   def testRand(): Unit = {
     testSqlApi("RAND() <> RAND() or RAND() = RAND()", "TRUE")
     testSqlApi("RAND(1) <> RAND(1) or RAND(1) = RAND(1)", "TRUE")
@@ -243,7 +242,7 @@
 }
 
 object NonDeterministicTest {
-  @Parameterized.Parameters(name = "isStream={0}")
+  @Parameters(name = "isStream={0}")
   def parameters(): util.Collection[Boolean] = {
     util.Arrays.asList(true, false)
   }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/RowTypeTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/RowTypeTest.scala
index 07252e1..e179be8 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/RowTypeTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/RowTypeTest.scala
@@ -21,7 +21,8 @@
 import org.apache.flink.table.planner.expressions.utils.RowTypeTestBase
 import org.apache.flink.table.planner.utils.DateTimeTestUtil.{localDate, localDateTime, localTime => gLocalTime}
 
-import org.junit.Test
+import org.assertj.core.api.Assertions.assertThatExceptionOfType
+import org.junit.jupiter.api.Test
 
 class RowTypeTest extends RowTypeTestBase {
 
@@ -89,22 +90,24 @@
 
   @Test
   def testUnsupportedCastTableApi(): Unit = {
-    expectedException.expect(classOf[ValidationException])
-
-    testTableApi(
-      'f5.cast(DataTypes.BIGINT()),
-      ""
-    )
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(
+        () =>
+          testTableApi(
+            'f5.cast(DataTypes.BIGINT()),
+            ""
+          ))
   }
 
   @Test
   def testUnsupportedCastSqlApi(): Unit = {
-    expectedException.expect(classOf[ValidationException])
-    expectedException.expectMessage("Cast function cannot convert value")
-
-    testSqlApi(
-      "CAST(f5 AS BIGINT)",
-      ""
-    )
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(
+        () =>
+          testSqlApi(
+            "CAST(f5 AS BIGINT)",
+            ""
+          ))
+      .withMessageContaining("Cast function cannot convert value")
   }
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
index 96a51a2..983b272 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
@@ -21,7 +21,7 @@
 import org.apache.flink.table.expressions.{Expression, TimeIntervalUnit, TimePointUnit}
 import org.apache.flink.table.planner.expressions.utils.ScalarTypesTestBase
 
-import org.junit.Test
+import org.junit.jupiter.api.Test
 
 class ScalarFunctionsTest extends ScalarTypesTestBase {
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarOperatorsTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarOperatorsTest.scala
index c01cb8f..912b138 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarOperatorsTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarOperatorsTest.scala
@@ -20,7 +20,7 @@
 import org.apache.flink.table.api.{DataTypes, LiteralStringExpression, UnresolvedFieldExpression}
 import org.apache.flink.table.planner.expressions.utils.ScalarOperatorsTestBase
 
-import org.junit.Test
+import org.junit.jupiter.api.Test
 
 class ScalarOperatorsTest extends ScalarOperatorsTestBase {
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/SqlExpressionTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/SqlExpressionTest.scala
index b59df94..f45e0c9 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/SqlExpressionTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/SqlExpressionTest.scala
@@ -21,7 +21,7 @@
 import org.apache.flink.table.planner.expressions.utils.ExpressionTestBase
 import org.apache.flink.types.Row
 
-import org.junit.Test
+import org.junit.jupiter.api.Test
 
 /**
  * Tests all SQL expressions that are currently supported according to the documentation. This tests
@@ -95,7 +95,7 @@
     testSqlApi("5-5", "0")
     testSqlApi("5*5", "25")
     testSqlApi("5/5", "1")
-    testSqlApi("5%2", "1");
+    testSqlApi("5%2", "1")
     testSqlApi("POWER(5, 5)", "3125.0")
     testSqlApi("POWER(-1, 0.5)", "NaN")
     testSqlApi("ABS(-5)", "5")
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala
index 5b5c5eb..6819dab 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala
@@ -26,7 +26,7 @@
 import org.apache.flink.table.types.DataType
 import org.apache.flink.types.Row
 
-import org.junit.Test
+import org.junit.jupiter.api.Test
 
 import java.lang.{Double => JDouble, Float => JFloat, Integer => JInt, Long => JLong}
 import java.sql.Timestamp
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/CompositeTypeTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/CompositeTypeTestBase.scala
index c3822c9..ecf52c5 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/CompositeTypeTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/CompositeTypeTestBase.scala
@@ -84,7 +84,7 @@
     def getMyString: String = myString
 
     def setMyString(value: String): Unit = {
-      myString = myString
+      myString = value
     }
   }
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
index e52afd4..48e972d 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
@@ -52,10 +52,10 @@
 import org.apache.calcite.rel.rules._
 import org.apache.calcite.rex.RexNode
 import org.apache.calcite.sql.`type`.SqlTypeName.VARCHAR
-import org.assertj.core.api.Assertions.assertThatThrownBy
-import org.junit.{After, Before, Rule}
-import org.junit.Assert.{assertEquals, assertTrue, fail}
-import org.junit.rules.ExpectedException
+import org.assertj.core.api.Assertions.{assertThatExceptionOfType, assertThatThrownBy}
+import org.assertj.core.api.ThrowableAssert.ThrowingCallable
+import org.junit.jupiter.api.{AfterEach, BeforeEach}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 
 import java.util.Collections
 
@@ -90,13 +90,7 @@
   private val tableName = "testTable"
   protected val nullable = "NULL"
 
-  // used for accurate exception information checking.
-  val expectedException: ExpectedException = ExpectedException.none()
-
-  @Rule
-  def thrown: ExpectedException = expectedException
-
-  @Before
+  @BeforeEach
   def prepare(): Unit = {
     settings = if (isStreaming) {
       EnvironmentSettings.newInstance().inStreamingMode().build()
@@ -138,7 +132,7 @@
     invalidTableApiExprs.clear()
   }
 
-  @After
+  @AfterEach
   def evaluateExprs(): Unit = {
 
     // evaluate valid expressions
@@ -146,25 +140,20 @@
 
     // evaluate invalid expressions
     invalidSqlExprs.foreach {
-      case (sqlExpr, keywords, clazz) => {
-        try {
+      case (sqlExpr, keywords, clazz) =>
+        val callable: ThrowingCallable = () => {
           val invalidExprs = mutable.ArrayBuffer[(String, RexNode, String)]()
           addSqlTestExpr(sqlExpr, keywords, invalidExprs, clazz)
           evaluateGivenExprs(invalidExprs)
-          fail(s"Expected a $clazz, but no exception is thrown.")
-        } catch {
-          case e if e.getClass == clazz =>
-            if (keywords != null) {
-              assertTrue(
-                s"The actual exception message \n${e.getMessage}\n" +
-                  s"doesn't contain expected keyword \n$keywords\n",
-                e.getMessage.contains(keywords))
-            }
-          case e: Throwable =>
-            e.printStackTrace()
-            fail(s"Expected throw ${clazz.getSimpleName}, but is $e.")
         }
-      }
+        if (keywords != null) {
+          assertThatExceptionOfType(clazz)
+            .isThrownBy(callable)
+            .withMessageContaining(keywords)
+        } else {
+          assertThatExceptionOfType(clazz)
+            .isThrownBy(callable)
+        }
     }
 
     invalidTableApiExprs.foreach {
@@ -329,9 +318,9 @@
     val optimized = hep.findBestExp()
 
     // throw exception if plan contains more than a calc
-    if (!optimized.getInput(0).getInputs.isEmpty) {
-      fail("Expression is converted into more than a Calc operation. Use a different test method.")
-    }
+    assertTrue(
+      optimized.getInput(0).getInputs.isEmpty,
+      "Expression is converted into more than a Calc operation. Use a different test method.")
 
     exprs.asInstanceOf[mutable.ArrayBuffer[(String, RexNode, String)]] +=
       ((summaryString, extractRexNode(optimized), expected))
@@ -356,9 +345,9 @@
         case ((originalExpr, optimizedExpr, expected), actual) =>
           val original = if (originalExpr == null) "" else s"for: [$originalExpr]"
           assertEquals(
-            s"Wrong result $original optimized to: [$optimizedExpr]",
             expected,
-            if (actual == null) "NULL" else actual)
+            if (actual == null) "NULL" else actual,
+            s"Wrong result $original optimized to: [$optimizedExpr]")
       }
   }
 
@@ -401,7 +390,7 @@
   def testDataType: AbstractDataType[_] =
     throw new IllegalArgumentException("Implement this if no legacy types are expected.")
 
-  def testSystemFunctions: java.util.Map[String, ScalarFunction] = Collections.emptyMap();
+  def testSystemFunctions: java.util.Map[String, ScalarFunction] = Collections.emptyMap()
 
   // ----------------------------------------------------------------------------------------------
   // Legacy type system
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/validation/ArrayTypeValidationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/validation/ArrayTypeValidationTest.scala
index aade9d9..ce47498 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/validation/ArrayTypeValidationTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/validation/ArrayTypeValidationTest.scala
@@ -20,52 +20,62 @@
 import org.apache.flink.table.api._
 import org.apache.flink.table.planner.expressions.utils.ArrayTypeTestBase
 
-import org.junit.Test
+import org.assertj.core.api.Assertions.assertThatExceptionOfType
+import org.junit.jupiter.api.Test
 
 class ArrayTypeValidationTest extends ArrayTypeTestBase {
 
-  @Test(expected = classOf[ValidationException])
+  @Test
   def testImplicitTypeCastArraySql(): Unit = {
-    testSqlApi("ARRAY['string', 12]", "FAIL")
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => testSqlApi("ARRAY['string', 12]", "FAIL"))
   }
 
-  @Test(expected = classOf[ValidationException])
+  @Test
   def testObviousInvalidIndexTableApi(): Unit = {
-    testTableApi('f2.at(0), "FAIL")
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => testTableApi('f2.at(0), "FAIL"))
   }
 
-  @Test(expected = classOf[ValidationException])
+  @Test
   def testEmptyArraySql(): Unit = {
-    testSqlApi("ARRAY[]", "FAIL")
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => testSqlApi("ARRAY[]", "FAIL"))
   }
 
-  @Test(expected = classOf[ValidationException])
+  @Test
   def testNullArraySql(): Unit = {
-    testSqlApi("ARRAY[NULL]", "FAIL")
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => testSqlApi("ARRAY[NULL]", "FAIL"))
   }
 
-  @Test(expected = classOf[ValidationException])
+  @Test
   def testDifferentTypesArraySql(): Unit = {
-    testSqlApi("ARRAY[1, TRUE]", "FAIL")
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => testSqlApi("ARRAY[1, TRUE]", "FAIL"))
   }
 
-  @Test(expected = classOf[ValidationException])
+  @Test
   def testElementNonArray(): Unit = {
-    testTableApi('f0.element(), "FAIL")
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => testTableApi('f0.element(), "FAIL"))
   }
 
-  @Test(expected = classOf[ValidationException])
+  @Test
   def testElementNonArraySql(): Unit = {
-    testSqlApi("ELEMENT(f0)", "FAIL")
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => testSqlApi("ELEMENT(f0)", "FAIL"))
   }
 
-  @Test(expected = classOf[ValidationException])
+  @Test
   def testCardinalityOnNonArray(): Unit = {
-    testTableApi('f0.cardinality(), "FAIL")
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => testTableApi('f0.cardinality(), "FAIL"))
   }
 
-  @Test(expected = classOf[ValidationException])
+  @Test
   def testCardinalityOnNonArraySql(): Unit = {
-    testSqlApi("CARDINALITY(f0)", "FAIL")
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => testSqlApi("CARDINALITY(f0)", "FAIL"))
   }
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/validation/CompositeAccessValidationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/validation/CompositeAccessValidationTest.scala
index ed557be..3e1627d 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/validation/CompositeAccessValidationTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/validation/CompositeAccessValidationTest.scala
@@ -20,27 +20,32 @@
 import org.apache.flink.table.api._
 import org.apache.flink.table.planner.expressions.utils.CompositeTypeTestBase
 
-import org.junit.Test
+import org.assertj.core.api.Assertions.assertThatExceptionOfType
+import org.junit.jupiter.api.Test
 
 class CompositeAccessValidationTest extends CompositeTypeTestBase {
 
-  @Test(expected = classOf[ValidationException])
+  @Test
   def testWrongSqlFieldFull(): Unit = {
-    testSqlApi("testTable.f5.test", "13")
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => testSqlApi("testTable.f5.test", "13"))
   }
 
-  @Test(expected = classOf[ValidationException])
+  @Test
   def testWrongSqlField(): Unit = {
-    testSqlApi("f5.test", "13")
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => testSqlApi("f5.test", "13"))
   }
 
-  @Test(expected = classOf[ValidationException])
+  @Test
   def testWrongIntKeyField(): Unit = {
-    testTableApi('f0.get(555), "fail")
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => testTableApi('f0.get(555), "fail"))
   }
 
-  @Test(expected = classOf[ValidationException])
+  @Test
   def testWrongStringKeyField(): Unit = {
-    testTableApi('f0.get("fghj"), "fail")
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => testTableApi('f0.get("fghj"), "fail"))
   }
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/validation/MapTypeValidationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/validation/MapTypeValidationTest.scala
index 6375c8b..e1132f9 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/validation/MapTypeValidationTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/validation/MapTypeValidationTest.scala
@@ -20,34 +20,44 @@
 import org.apache.flink.table.api._
 import org.apache.flink.table.planner.expressions.utils.MapTypeTestBase
 
-import org.junit.Test
+import org.assertj.core.api.Assertions.assertThatExceptionOfType
+import org.junit.jupiter.api.Test
 
 class MapTypeValidationTest extends MapTypeTestBase {
 
-  @Test(expected = classOf[ValidationException])
+  @Test
   def testWrongKeyType(): Unit = {
-    testAllApis('f2.at(12), "f2[12]", "FAIL")
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => testAllApis('f2.at(12), "f2[12]", "FAIL"))
   }
 
-  @Test(expected = classOf[ValidationException])
+  @Test
   def testIncorrectMapTypeComparison(): Unit = {
-    testAllApis('f1 === 'f3, "f1 = f3", "FAIL")
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => testAllApis('f1 === 'f3, "f1 = f3", "FAIL"))
   }
 
-  @Test(expected = classOf[ValidationException])
+  @Test
   def testUnsupportedComparisonType(): Unit = {
-    testAllApis('f6 !== 'f2, "f6 != f2", "FAIL")
-    testSqlApi("f6 <> f2", "FAIL")
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => testAllApis('f6 !== 'f2, "f6 != f2", "FAIL"))
+
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => testSqlApi("f6 <> f2", "FAIL"))
   }
 
-  @Test(expected = classOf[ValidationException])
+  @Test
   def testEmptyMap(): Unit = {
-    testAllApis("FAIL", "MAP[]", "FAIL")
-    testSqlApi("MAP[]", "FAIL")
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => testAllApis("FAIL", "MAP[]", "FAIL"))
+
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => testSqlApi("MAP[]", "FAIL"))
   }
 
-  @Test(expected = classOf[ValidationException])
+  @Test
   def testUnsupportedMapImplicitTypeCastSql(): Unit = {
-    testSqlApi("MAP['k1', 'string', 'k2', 12]", "FAIL")
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => testSqlApi("MAP['k1', 'string', 'k2', 12]", "FAIL"))
   }
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/validation/RowTypeValidationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/validation/RowTypeValidationTest.scala
index a586569..4ce27d1 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/validation/RowTypeValidationTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/validation/RowTypeValidationTest.scala
@@ -20,22 +20,26 @@
 import org.apache.flink.table.api._
 import org.apache.flink.table.planner.expressions.utils.RowTypeTestBase
 
-import org.junit.Test
+import org.assertj.core.api.Assertions.assertThatExceptionOfType
+import org.junit.jupiter.api.Test
 
 class RowTypeValidationTest extends RowTypeTestBase {
 
-  @Test(expected = classOf[SqlParserException])
+  @Test
   def testEmptyRowType(): Unit = {
-    testSqlApi("Row()", "FAIL")
+    assertThatExceptionOfType(classOf[SqlParserException])
+      .isThrownBy(() => testSqlApi("Row()", "FAIL"))
   }
 
-  @Test(expected = classOf[ValidationException])
+  @Test
   def testNullRowType(): Unit = {
-    testAllApis("FAIL", "Row(NULL)", "FAIL")
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => testAllApis("FAIL", "Row(NULL)", "FAIL"))
   }
 
-  @Test(expected = classOf[ValidationException])
+  @Test
   def testSqlRowIllegalAccess(): Unit = {
-    testAllApis('f5.get("f2"), "f5.f2", "FAIL")
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => testAllApis('f5.get("f2"), "f5.f2", "FAIL"))
   }
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/validation/ScalarFunctionsValidationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/validation/ScalarFunctionsValidationTest.scala
index c4b0b0f..1ce90d8 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/validation/ScalarFunctionsValidationTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/validation/ScalarFunctionsValidationTest.scala
@@ -21,8 +21,8 @@
 import org.apache.flink.table.expressions.TimePointUnit
 import org.apache.flink.table.planner.expressions.utils.ScalarTypesTestBase
 
-import org.apache.calcite.avatica.util.TimeUnit
-import org.junit.Test
+import org.assertj.core.api.Assertions.assertThatExceptionOfType
+import org.junit.jupiter.api.Test
 
 class ScalarFunctionsValidationTest extends ScalarTypesTestBase {
 
@@ -30,104 +30,126 @@
   // Math functions
   // ----------------------------------------------------------------------------------------------
 
-  @Test(expected = classOf[ValidationException])
+  @Test
   def testInvalidBin1(): Unit = {
-    testSqlApi("BIN(f12)", "101010") // float type
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => testSqlApi("BIN(f12)", "101010")) // float type
   }
 
-  @Test(expected = classOf[ValidationException])
+  @Test
   def testInvalidBin2(): Unit = {
-    testSqlApi("BIN(f15)", "101010") // BigDecimal type
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => testSqlApi("BIN(f15)", "101010")) // BigDecimal type
   }
 
-  @Test(expected = classOf[ValidationException])
+  @Test
   def testInvalidBin3(): Unit = {
-    testSqlApi("BIN(f16)", "101010") // Date type
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => testSqlApi("BIN(f16)", "101010")) // Date type
   }
 
-  @Test(expected = classOf[ValidationException])
+  @Test
   def testInvalidTruncate1(): Unit = {
     // All arguments are string type
-    testSqlApi("TRUNCATE('abc', 'def')", "FAIL")
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => testSqlApi("TRUNCATE('abc', 'def')", "FAIL"))
 
     // The second argument is of type String
-    testSqlApi("TRUNCATE(f12, f0)", "FAIL")
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => testSqlApi("TRUNCATE(f12, f0)", "FAIL"))
 
     // The second argument is of type Float
-    testSqlApi("TRUNCATE(f12,f12)", "FAIL")
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => testSqlApi("TRUNCATE(f12,f12)", "FAIL"))
 
     // The second argument is of type Double
-    testSqlApi("TRUNCATE(f12, cast(f28 as DOUBLE))", "FAIL")
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => testSqlApi("TRUNCATE(f12, cast(f28 as DOUBLE))", "FAIL"))
 
     // The second argument is of type BigDecimal
-    testSqlApi("TRUNCATE(f12,f15)", "FAIL")
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => testSqlApi("TRUNCATE(f12,f15)", "FAIL"))
   }
 
   @Test
   def testInvalidTruncate2(): Unit = {
-    thrown.expect(classOf[ValidationException])
     // The one argument is of type String
-    testSqlApi("TRUNCATE('abc')", "FAIL")
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => testSqlApi("TRUNCATE('abc')", "FAIL"))
   }
 
   // ----------------------------------------------------------------------------------------------
   // String functions
   // ----------------------------------------------------------------------------------------------
 
-  @Test(expected = classOf[ValidationException])
+  @Test
   def testInvalidSubstring1(): Unit = {
     // Must fail. Parameter of substring must be an Integer not a Double.
-    testTableApi("test".substring(2.0.toExpr), "FAIL")
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => testTableApi("test".substring(2.0.toExpr), "FAIL"))
   }
 
-  @Test(expected = classOf[ValidationException])
+  @Test
   def testInvalidSubstring2(): Unit = {
     // Must fail. Parameter of substring must be an Integer not a String.
-    testTableApi("test".substring("test".toExpr), "FAIL")
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => testTableApi("test".substring("test".toExpr), "FAIL"))
   }
 
   // ----------------------------------------------------------------------------------------------
   // Temporal functions
   // ----------------------------------------------------------------------------------------------
 
-  @Test(expected = classOf[SqlParserException])
+  @Test
   def testTimestampAddWithWrongTimestampInterval(): Unit = {
-    testSqlApi("TIMESTAMPADD(XXX, 1, timestamp '2016-02-24'))", "2016-06-16")
+    assertThatExceptionOfType(classOf[SqlParserException])
+      .isThrownBy(() => testSqlApi("TIMESTAMPADD(XXX, 1, timestamp '2016-02-24'))", "2016-06-16"))
   }
 
-  @Test(expected = classOf[SqlParserException])
+  @Test
   def testTimestampAddWithWrongTimestampFormat(): Unit = {
-    testSqlApi("TIMESTAMPADD(YEAR, 1, timestamp '2016-02-24'))", "2016-06-16")
+    assertThatExceptionOfType(classOf[SqlParserException])
+      .isThrownBy(() => testSqlApi("TIMESTAMPADD(YEAR, 1, timestamp '2016-02-24'))", "2016-06-16"))
   }
 
-  @Test(expected = classOf[ValidationException])
+  @Test
   def testTimestampAddWithWrongQuantity(): Unit = {
-    testSqlApi("TIMESTAMPADD(YEAR, 1.0, timestamp '2016-02-24 12:42:25')", "2016-06-16")
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(
+        () => testSqlApi("TIMESTAMPADD(YEAR, 1.0, timestamp '2016-02-24 12:42:25')", "2016-06-16"))
   }
 
   // ----------------------------------------------------------------------------------------------
   // Sub-query functions
   // ----------------------------------------------------------------------------------------------
 
-  @Test(expected = classOf[ValidationException])
+  @Test
   def testInValidationExceptionMoreThanOneTypes(): Unit = {
-    testTableApi('f2.in('f3, 'f8), "TRUE")
-    testTableApi('f2.in('f3, 'f4, 4), "FALSE")
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => testTableApi('f2.in('f3, 'f8), "TRUE"))
+
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => testTableApi('f2.in('f3, 'f4, 4), "FALSE"))
   }
 
-  @Test(expected = classOf[ValidationException])
+  @Test
   def scalaInValidationExceptionDifferentOperandsTest(): Unit = {
-    testTableApi('f1.in("Hi", "Hello world", "Comment#1"), "TRUE")
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => testTableApi('f1.in("Hi", "Hello world", "Comment#1"), "TRUE"))
   }
 
-  @Test(expected = classOf[ValidationException])
+  @Test
   def testTimestampDiffWithWrongTime(): Unit = {
-    testTableApi(timestampDiff(TimePointUnit.DAY, "2016-02-24", "2016-02-27"), "FAIL")
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(
+        () => testTableApi(timestampDiff(TimePointUnit.DAY, "2016-02-24", "2016-02-27"), "FAIL"))
   }
 
-  @Test(expected = classOf[ValidationException])
+  @Test
   def testTimestampDiffWithWrongTimeAndUnit(): Unit = {
-    testTableApi(timestampDiff(TimePointUnit.MINUTE, "2016-02-24", "2016-02-27"), "FAIL")
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(
+        () => testTableApi(timestampDiff(TimePointUnit.MINUTE, "2016-02-24", "2016-02-27"), "FAIL"))
   }
 
   // ----------------------------------------------------------------------------------------------
@@ -137,27 +159,29 @@
   @Test
   def testInvalidStringToMap(): Unit = {
     // test non-exist key access
-    thrown.expect(classOf[ValidationException])
-    thrown.expectMessage("Invalid number of arguments to function 'STR_TO_MAP'")
-    testSqlApi(
-      "STR_TO_MAP('k1:v1;k2:v2', ';')",
-      "EXCEPTION"
-    )
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(
+        () =>
+          testSqlApi(
+            "STR_TO_MAP('k1:v1;k2:v2', ';')",
+            "EXCEPTION"
+          ))
+      .withMessageContaining("Invalid number of arguments to function 'STR_TO_MAP'")
   }
 
   @Test
   def testInvalidIf(): Unit = {
     // test IF(BOOL, STRING, BOOLEAN)
-    thrown.expect(classOf[ValidationException])
-    thrown.expectMessage("Cannot apply 'IF' to arguments")
-    testSqlApi("IF(f7 > 5, f0, f1)", "FAIL")
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => testSqlApi("IF(f7 > 5, f0, f1)", "FAIL"))
+      .withMessageContaining("Cannot apply 'IF' to arguments")
   }
 
   @Test
   def testInvalidToBase64(): Unit = {
     // test TO_BASE64(INTEGER)
-    thrown.expect(classOf[ValidationException])
-    thrown.expectMessage("Cannot apply 'TO_BASE64' to arguments of type 'TO_BASE64(<INTEGER>)'")
-    testSqlApi("TO_BASE64(11)", "FAIL")
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => testSqlApi("TO_BASE64(11)", "FAIL"))
+      .withMessageContaining("Cannot apply 'TO_BASE64' to arguments of type 'TO_BASE64(<INTEGER>)'")
   }
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/validation/ScalarOperatorsValidationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/validation/ScalarOperatorsValidationTest.scala
index ee0ac6d..f319924 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/validation/ScalarOperatorsValidationTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/validation/ScalarOperatorsValidationTest.scala
@@ -20,42 +20,49 @@
 import org.apache.flink.table.api._
 import org.apache.flink.table.planner.expressions.utils.ScalarOperatorsTestBase
 
-import org.junit.Test
+import org.assertj.core.api.Assertions.assertThatExceptionOfType
+import org.junit.jupiter.api.Test
 
 class ScalarOperatorsValidationTest extends ScalarOperatorsTestBase {
 
-  @Test(expected = classOf[ValidationException])
+  @Test
   def testIfInvalidTypesScala(): Unit = {
-    testTableApi(('f6 && true).?(5, "false"), "FAIL")
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => testTableApi(('f6 && true).?(5, "false"), "FAIL"))
   }
 
-  @Test(expected = classOf[ValidationException])
+  @Test
   def testInvalidStringComparison1(): Unit = {
-    testTableApi("w" === 4, "FAIL")
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => testTableApi("w" === 4, "FAIL"))
   }
 
-  @Test(expected = classOf[ValidationException])
+  @Test
   def testInvalidStringComparison2(): Unit = {
-    testTableApi("w" > 4.toExpr, "FAIL")
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => testTableApi("w" > 4.toExpr, "FAIL"))
   }
 
   // ----------------------------------------------------------------------------------------------
   // Sub-query functions
   // ----------------------------------------------------------------------------------------------
 
-  @Test(expected = classOf[ValidationException])
+  @Test
   def testInMoreThanOneTypes(): Unit = {
-    testTableApi('f2.in('f3, 'f4, 4), "FAIL")
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => testTableApi('f2.in('f3, 'f4, 4), "FAIL"))
   }
 
-  @Test(expected = classOf[ValidationException])
+  @Test
   def testInDifferentOperands(): Unit = {
-    testTableApi('f1.in("Hi", "Hello world", "Comment#1"), "FAIL")
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => testTableApi('f1.in("Hi", "Hello world", "Comment#1"), "FAIL"))
   }
 
-  @Test(expected = classOf[ValidationException])
+  @Test
   def testBetweenWithDifferentOperandTypeScala(): Unit = {
-    testTableApi(2.between(1, "a"), "FAIL")
+    assertThatExceptionOfType(classOf[ValidationException])
+      .isThrownBy(() => testTableApi(2.between(1, "a"), "FAIL"))
   }
 
   @Test