[BEAM-10653] Modularize BeamSqlDslUdfUdafTest.

Split tests that contained multiple independent branches into separate
test cases.

The only part removed was in testUdaf: its two cases were different in
the past but converged at some point, so the redundant second copy is
dropped. The earlier version, for reference:
https://github.com/apache/beam/blob/d8ff78b65bbe7e3a2239249f034a538ca65b0706/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslUdfUdafTest.java#L52
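
For reference, each split-out test now has the self-contained shape
sketched below. This is only an illustration condensing the testUdaf hunk
of the diff: the Schema line is reconstructed from surrounding context,
and boundedInput1, pipeline, SquareSum, and the imports are the fixtures
already defined in BeamSqlDslUdfUdafTest.

    @Test
    public void testUdaf() throws Exception {
      // Expected output row: group f_int2 = 0 with its square-sum aggregate, 30.
      // (Schema fields reconstructed from context outside the hunk.)
      Schema resultType =
          Schema.builder().addInt32Field("f_int2").addInt32Field("squaresum").build();
      Row row = Row.withSchema(resultType).addValues(0, 30).build();

      // One SQL statement, one registered function, and one assertion per test method.
      String sql =
          "SELECT f_int2, squaresum(f_int) AS `squaresum` FROM PCOLLECTION GROUP BY f_int2";
      PCollection<Row> result =
          boundedInput1.apply(
              "testUdaf", SqlTransform.query(sql).registerUdaf("squaresum", new SquareSum()));
      PAssert.that(result).containsInAnyOrder(row);

      pipeline.run().waitUntilFinish();
    }
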
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
index c2afc5d..352a3cb 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
@@ -55,73 +55,77 @@
 
     Row row = Row.withSchema(resultType).addValues(0, 30).build();
 
-    String sql1 =
-        "SELECT f_int2, squaresum1(f_int) AS `squaresum`" + " FROM PCOLLECTION GROUP BY f_int2";
-    PCollection<Row> result1 =
+    String sql = "SELECT f_int2, squaresum(f_int) AS `squaresum` FROM PCOLLECTION GROUP BY f_int2";
+    PCollection<Row> result =
         boundedInput1.apply(
-            "testUdaf1", SqlTransform.query(sql1).registerUdaf("squaresum1", new SquareSum()));
-    PAssert.that(result1).containsInAnyOrder(row);
-
-    String sql2 =
-        "SELECT f_int2, squaresum2(f_int) AS `squaresum`" + " FROM PCOLLECTION GROUP BY f_int2";
-    PCollection<Row> result2 =
-        PCollectionTuple.of(new TupleTag<>("PCOLLECTION"), boundedInput1)
-            .apply(
-                "testUdaf2", SqlTransform.query(sql2).registerUdaf("squaresum2", new SquareSum()));
-    PAssert.that(result2).containsInAnyOrder(row);
+            "testUdaf", SqlTransform.query(sql).registerUdaf("squaresum", new SquareSum()));
+    PAssert.that(result).containsInAnyOrder(row);
 
     pipeline.run().waitUntilFinish();
   }
 
-  /** Test Joda time UDF/UDAF. */
+  /** Test Joda time UDAF. */
   @Test
-  public void testJodaTimeUdfUdaf() throws Exception {
+  public void testJodaTimeUdaf() throws Exception {
     Schema resultType = Schema.builder().addDateTimeField("jodatime").build();
 
-    Row row1 =
+    Row row =
         Row.withSchema(resultType)
             .addValues(parseTimestampWithoutTimeZone("2017-01-01 02:04:03"))
             .build();
 
-    String sql1 = "SELECT MAX_JODA(f_timestamp) as jodatime FROM PCOLLECTION";
-    PCollection<Row> result1 =
+    String sql = "SELECT MAX_JODA(f_timestamp) as jodatime FROM PCOLLECTION";
+    PCollection<Row> result =
         boundedInput1.apply(
-            "testJodaUdaf", SqlTransform.query(sql1).registerUdaf("MAX_JODA", new JodaMax()));
-    PAssert.that(result1).containsInAnyOrder(row1);
+            "testJodaUdaf", SqlTransform.query(sql).registerUdaf("MAX_JODA", new JodaMax()));
+    PAssert.that(result).containsInAnyOrder(row);
 
-    Row row2 =
+    pipeline.run().waitUntilFinish();
+  }
+
+  /** Test Joda time UDF. */
+  @Test
+  public void testJodaTimeUdf() throws Exception {
+    Schema resultType = Schema.builder().addDateTimeField("jodatime").build();
+
+    Row row =
         Row.withSchema(resultType)
             .addValues(parseTimestampWithoutTimeZone("2016-12-31 01:01:03"))
             .build();
 
-    String sql2 = "SELECT PRE_DAY(f_timestamp) as jodatime FROM PCOLLECTION WHERE f_int=1";
-    PCollection<Row> result2 =
+    String sql = "SELECT PRE_DAY(f_timestamp) as jodatime FROM PCOLLECTION WHERE f_int=1";
+    PCollection<Row> result =
         boundedInput1.apply(
-            "testTimeUdf", SqlTransform.query(sql2).registerUdf("PRE_DAY", PreviousDay.class));
-    PAssert.that(result2).containsInAnyOrder(row2);
+            "testTimeUdf", SqlTransform.query(sql).registerUdf("PRE_DAY", PreviousDay.class));
+    PAssert.that(result).containsInAnyOrder(row);
 
     pipeline.run().waitUntilFinish();
   }
 
   @Test
-  public void testListUdf() throws Exception {
-    Schema resultType1 = Schema.builder().addArrayField("array_field", FieldType.INT64).build();
-    Row row1 = Row.withSchema(resultType1).addValue(Arrays.asList(1L)).build();
-    String sql1 = "SELECT test_array(1)";
-    PCollection<Row> result1 =
+  public void testUdfWithListOutput() throws Exception {
+    Schema resultType = Schema.builder().addArrayField("array_field", FieldType.INT64).build();
+    Row row = Row.withSchema(resultType).addValue(Arrays.asList(1L)).build();
+    String sql = "SELECT test_array(1)";
+    PCollection<Row> result =
         boundedInput1.apply(
             "testArrayUdf",
-            SqlTransform.query(sql1).registerUdf("test_array", TestReturnTypeList.class));
-    PAssert.that(result1).containsInAnyOrder(row1);
+            SqlTransform.query(sql).registerUdf("test_array", TestReturnTypeList.class));
+    PAssert.that(result).containsInAnyOrder(row);
 
-    Schema resultType2 = Schema.builder().addInt32Field("int_field").build();
-    Row row2 = Row.withSchema(resultType2).addValue(3).build();
-    String sql2 = "select array_length(ARRAY[1, 2, 3])";
-    PCollection<Row> result2 =
+    pipeline.run().waitUntilFinish();
+  }
+
+  @Test
+  public void testUdfWithListInput() throws Exception {
+    Schema resultType = Schema.builder().addInt32Field("int_field").build();
+    Row row = Row.withSchema(resultType).addValue(3).build();
+    String sql = "select array_length(ARRAY[1, 2, 3])";
+    PCollection<Row> result =
         boundedInput1.apply(
-            "testArrayUdf2",
-            SqlTransform.query(sql2).registerUdf("array_length", TestListLength.class));
-    PAssert.that(result2).containsInAnyOrder(row2);
+            "testArrayUdf",
+            SqlTransform.query(sql).registerUdf("array_length", TestListLength.class));
+    PAssert.that(result).containsInAnyOrder(row);
 
     pipeline.run().waitUntilFinish();
   }
@@ -133,14 +137,14 @@
 
     Row row = Row.withSchema(resultType).addValues(0, 354).build();
 
-    String sql1 =
+    String sql =
         "SELECT f_int2, double_square_sum(f_int) AS `squaresum`"
             + " FROM PCOLLECTION GROUP BY f_int2";
-    PCollection<Row> result1 =
+    PCollection<Row> result =
         boundedInput1.apply(
             "testUdaf",
-            SqlTransform.query(sql1).registerUdaf("double_square_sum", new SquareSquareSum()));
-    PAssert.that(result1).containsInAnyOrder(row);
+            SqlTransform.query(sql).registerUdaf("double_square_sum", new SquareSquareSum()));
+    PAssert.that(result).containsInAnyOrder(row);
 
     pipeline.run().waitUntilFinish();
   }
@@ -155,46 +159,55 @@
     exceptions.expectCause(hasMessage(containsString("CombineFn must be parameterized")));
     pipeline.enableAbandonedNodeEnforcement(false);
 
-    Schema resultType = Schema.builder().addInt32Field("f_int2").addInt32Field("squaresum").build();
-
-    Row row = Row.withSchema(resultType).addValues(0, 354).build();
-
-    String sql1 =
+    String sql =
         "SELECT f_int2, squaresum(f_int) AS `squaresum`" + " FROM PCOLLECTION GROUP BY f_int2";
-    PCollection<Row> result1 =
-        boundedInput1.apply(
-            "testUdaf", SqlTransform.query(sql1).registerUdaf("squaresum", new RawCombineFn()));
+    boundedInput1.apply(
+        "testUdaf", SqlTransform.query(sql).registerUdaf("squaresum", new RawCombineFn()));
   }
 
-  /** test UDF. */
+  /** Test UDF implementing {@link BeamSqlUdf}. */
   @Test
-  public void testUdf() throws Exception {
+  public void testBeamSqlUdf() throws Exception {
     Schema resultType = Schema.builder().addInt32Field("f_int").addInt32Field("cubicvalue").build();
     Row row = Row.withSchema(resultType).addValues(2, 8).build();
 
-    String sql1 = "SELECT f_int, cubic1(f_int) as cubicvalue FROM PCOLLECTION WHERE f_int = 2";
-    PCollection<Row> result1 =
+    String sql = "SELECT f_int, cubic(f_int) as cubicvalue FROM PCOLLECTION WHERE f_int = 2";
+    PCollection<Row> result =
         boundedInput1.apply(
-            "testUdf1", SqlTransform.query(sql1).registerUdf("cubic1", CubicInteger.class));
-    PAssert.that(result1).containsInAnyOrder(row);
+            "testUdf", SqlTransform.query(sql).registerUdf("cubic", CubicInteger.class));
+    PAssert.that(result).containsInAnyOrder(row);
 
-    String sql2 = "SELECT f_int, cubic2(f_int) as cubicvalue FROM PCOLLECTION WHERE f_int = 2";
-    PCollection<Row> result2 =
+    pipeline.run().waitUntilFinish();
+  }
+
+  /** Test UDF implementing {@link SerializableFunction}. */
+  @Test
+  public void testSerializableFunctionUdf() throws Exception {
+    Schema resultType = Schema.builder().addInt32Field("f_int").addInt32Field("cubicvalue").build();
+    Row row = Row.withSchema(resultType).addValues(2, 8).build();
+
+    String sql = "SELECT f_int, cubic(f_int) as cubicvalue FROM PCOLLECTION WHERE f_int = 2";
+    PCollection<Row> result =
+        PCollectionTuple.of(new TupleTag<>("PCOLLECTION"), boundedInput1)
+            .apply("testUdf", SqlTransform.query(sql).registerUdf("cubic", new CubicIntegerFn()));
+    PAssert.that(result).containsInAnyOrder(row);
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  /** Test UDF implementing {@link BeamSqlUdf} with default parameters. */
+  @Test
+  public void testBeamSqlUdfWithDefaultParameters() throws Exception {
+    String sql = "SELECT f_int, substr(f_string) as sub_string FROM PCOLLECTION WHERE f_int = 2";
+    PCollection<Row> result =
         PCollectionTuple.of(new TupleTag<>("PCOLLECTION"), boundedInput1)
             .apply(
-                "testUdf2", SqlTransform.query(sql2).registerUdf("cubic2", new CubicIntegerFn()));
-    PAssert.that(result2).containsInAnyOrder(row);
-
-    String sql3 = "SELECT f_int, substr(f_string) as sub_string FROM PCOLLECTION WHERE f_int = 2";
-    PCollection<Row> result3 =
-        PCollectionTuple.of(new TupleTag<>("PCOLLECTION"), boundedInput1)
-            .apply(
-                "testUdf3", SqlTransform.query(sql3).registerUdf("substr", UdfFnWithDefault.class));
+                "testUdf", SqlTransform.query(sql).registerUdf("substr", UdfFnWithDefault.class));
 
     Schema subStrSchema =
         Schema.builder().addInt32Field("f_int").addStringField("sub_string").build();
     Row subStrRow = Row.withSchema(subStrSchema).addValues(2, "s").build();
-    PAssert.that(result3).containsInAnyOrder(subStrRow);
+    PAssert.that(result).containsInAnyOrder(subStrRow);
 
     pipeline.run().waitUntilFinish();
   }
@@ -204,12 +217,12 @@
    */
   @Test
   public void testTableMacroUdf() throws Exception {
-    String sql1 = "SELECT * FROM table(range_udf(0, 3))";
+    String sql = "SELECT * FROM table(range_udf(0, 3))";
 
     Schema schema = Schema.of(Schema.Field.of("f0", Schema.FieldType.INT32));
 
     PCollection<Row> rows =
-        pipeline.apply(SqlTransform.query(sql1).registerUdf("range_udf", RangeUdf.class));
+        pipeline.apply(SqlTransform.query(sql).registerUdf("range_udf", RangeUdf.class));
 
     PAssert.that(rows)
         .containsInAnyOrder(