Merge pull request #14767 from [BEAM-7819] Add missing fields to python apache_beam.io.gcp.pubsub.PubsubMessage 

[BEAM-7819] Add missing fields to python apache_beam.io.gcp.pubsub.PubsubMessage
diff --git a/CHANGES.md b/CHANGES.md
index c35b778..90d0123 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -119,24 +119,20 @@
 
 * Fixed X (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
 
-# [2.29.0] - Release branch cut, any updates should be cherry-picked
+# [2.29.0] - 2021-04-29
 
 ## Highlights
 
-* New highly anticipated feature X added to Python SDK ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
-* New highly anticipated feature Y added to Java SDK ([BEAM-Y](https://issues.apache.org/jira/browse/BEAM-Y)).
 * Spark Classic and Portable runners officially support Spark 3 ([BEAM-7093](https://issues.apache.org/jira/browse/BEAM-7093)).
 * Official Java 11 support for most runners (Dataflow, Flink, Spark) ([BEAM-2530](https://issues.apache.org/jira/browse/BEAM-2530)).
 * DataFrame API now supports GroupBy.apply ([BEAM-11628](https://issues.apache.org/jira/browse/BEAM-11628)).
 
 ## I/Os
 
-* Support for X source added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
 * Added support for S3 filesystem on AWS SDK V2 (Java) ([BEAM-7637](https://issues.apache.org/jira/browse/BEAM-7637))
 
 ## New Features / Improvements
 
-* X feature added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
 * DataFrame API now supports pandas 1.2.x ([BEAM-11531](https://issues.apache.org/jira/browse/BEAM-11531)).
 * Multiple DataFrame API bugfixes ([BEAM-12071](https://issues.apache/jira/browse/BEAM-12071), [BEAM-11929](https://issues.apache/jira/browse/BEAM-11929))
 
@@ -146,17 +142,11 @@
   To restore the old behavior, one can register `FakeDeterministicFastPrimitivesCoder` with
   `beam.coders.registry.register_fallback_coder(beam.coders.coders.FakeDeterministicFastPrimitivesCoder())`
   or use the `allow_non_deterministic_key_coders` pipeline option.
-* X behavior was changed ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
 
 ## Deprecations
 
-* X behavior is deprecated and will be removed in X versions ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
 * Support for Flink 1.8 and 1.9 will be removed in the next release (2.30.0) ([BEAM-11948](https://issues.apache.org/jira/browse/BEAM-11948)).
 
-## Known Issues
-
-* Fixed X (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
-
 
 # [2.28.0] - 2021-02-22
 
diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index f6d61b5..d2e2898 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -417,7 +417,6 @@
     // a dependency version which should match across multiple
     // Maven artifacts.
     def activemq_version = "5.14.5"
-    def antlr_version = "4.9.2"
     def autovalue_version = "1.8.1"
     def aws_java_sdk_version = "1.11.974"
     def aws_java_sdk2_version = "2.15.31"
@@ -467,8 +466,8 @@
         activemq_junit                              : "org.apache.activemq.tooling:activemq-junit:$activemq_version",
         activemq_kahadb_store                       : "org.apache.activemq:activemq-kahadb-store:$activemq_version",
         activemq_mqtt                               : "org.apache.activemq:activemq-mqtt:$activemq_version",
-        antlr                                       : "org.antlr:antlr4:$antlr_version",
-        antlr_runtime                               : "org.antlr:antlr4-runtime:$antlr_version",
+        antlr                                       : "org.antlr:antlr4:4.7",
+        antlr_runtime                               : "org.antlr:antlr4-runtime:4.7",
         args4j                                      : "args4j:args4j:2.33",
         auto_value_annotations                      : "com.google.auto.value:auto-value-annotations:$autovalue_version",
         avro                                        : "org.apache.avro:avro:1.8.2",
diff --git a/sdks/go/pkg/beam/core/runtime/xlangx/expand.go b/sdks/go/pkg/beam/core/runtime/xlangx/expand.go
index ff320f7..8748ddc 100644
--- a/sdks/go/pkg/beam/core/runtime/xlangx/expand.go
+++ b/sdks/go/pkg/beam/core/runtime/xlangx/expand.go
@@ -13,6 +13,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+// Package xlangx contains various low-level utilities needed for adding
+// cross-language transforms to the pipeline.
 package xlangx
 
 import (
diff --git a/sdks/go/pkg/beam/core/runtime/xlangx/payload.go b/sdks/go/pkg/beam/core/runtime/xlangx/payload.go
index c038463..cdb3457 100644
--- a/sdks/go/pkg/beam/core/runtime/xlangx/payload.go
+++ b/sdks/go/pkg/beam/core/runtime/xlangx/payload.go
@@ -19,10 +19,10 @@
 	"bytes"
 	"reflect"
 
+	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx/schema"
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
-	"github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
 	"google.golang.org/protobuf/proto"
 )
 
@@ -53,7 +53,7 @@
 	}
 
 	// Put schema and row into payload proto, and marshal it.
-	ecp := &pipeline_v1.ExternalConfigurationPayload{
+	ecp := &pipepb.ExternalConfigurationPayload{
 		Schema:  scm,
 		Payload: buf.Bytes(),
 	}
diff --git a/sdks/go/pkg/beam/core/runtime/xlangx/resolve.go b/sdks/go/pkg/beam/core/runtime/xlangx/resolve.go
index 1ba1ac4..b434650 100644
--- a/sdks/go/pkg/beam/core/runtime/xlangx/resolve.go
+++ b/sdks/go/pkg/beam/core/runtime/xlangx/resolve.go
@@ -100,7 +100,7 @@
 
 				var resolvedDeps []*pipepb.ArtifactInformation
 				for _, a := range resolvedArtifacts {
-					name, _ := artifact.MustExtractFilePayload(a)
+					name, sha256 := artifact.MustExtractFilePayload(a)
 					fullTmpPath := filepath.Join(tmpPath, "/", name)
 					fullSdkPath := fullTmpPath
 					if len(cfg.SdkPath) > 0 {
@@ -111,7 +111,8 @@
 							TypeUrn: "beam:artifact:type:file:v1",
 							TypePayload: protox.MustEncode(
 								&pipepb.ArtifactFilePayload{
-									Path: fullSdkPath,
+									Path:   fullSdkPath,
+									Sha256: sha256,
 								},
 							),
 							RoleUrn:     a.RoleUrn,
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/UdfImplReflectiveFunctionBase.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/UdfImplReflectiveFunctionBase.java
index 9146e05..70d7965 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/UdfImplReflectiveFunctionBase.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/UdfImplReflectiveFunctionBase.java
@@ -20,6 +20,7 @@
 import java.lang.reflect.Constructor;
 import java.lang.reflect.Method;
 import java.lang.reflect.Modifier;
+import java.lang.reflect.Type;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
@@ -113,8 +114,7 @@
       return add(type, name, false);
     }
 
-    public ParameterListBuilder add(
-        final Class<?> type, final String name, final boolean optional) {
+    public ParameterListBuilder add(final Type type, final String name, final boolean optional) {
       final int ordinal = builder.size();
       builder.add(
           new FunctionParameter() {
@@ -142,7 +142,7 @@
     }
 
     public ParameterListBuilder addMethodParameters(Method method) {
-      final Class<?>[] types = method.getParameterTypes();
+      final Type[] types = method.getGenericParameterTypes();
       for (int i = 0; i < types.length; i++) {
         add(
             types[i],
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCatalog.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCatalog.java
index 9299473..5ad634fb 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCatalog.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCatalog.java
@@ -142,7 +142,20 @@
         sqlScalarUdfs.put(createFunctionStmt.getNamePath(), createFunctionStmt);
         break;
       case USER_DEFINED_JAVA_SCALAR_FUNCTIONS:
-        validateJavaUdf(createFunctionStmt);
+        String functionName = String.join(".", createFunctionStmt.getNamePath());
+        for (FunctionArgumentType argumentType :
+            createFunctionStmt.getSignature().getFunctionArgumentList()) {
+          Type type = argumentType.getType();
+          if (type == null) {
+            throw new UnsupportedOperationException(
+                "UDF templated argument types are not supported.");
+          }
+          validateJavaUdfZetaSqlType(type, functionName);
+        }
+        if (createFunctionStmt.getReturnType() == null) {
+          throw new IllegalArgumentException("UDF return type must not be null.");
+        }
+        validateJavaUdfZetaSqlType(createFunctionStmt.getReturnType(), functionName);
         String jarPath = getJarPath(createFunctionStmt);
         ScalarFn scalarFn =
             javaUdfLoader.loadScalarFunction(createFunctionStmt.getNamePath(), jarPath);
@@ -174,29 +187,14 @@
             ImmutableList.of(createFunctionStmt.getSignature())));
   }
 
-  void validateJavaUdf(ResolvedNodes.ResolvedCreateFunctionStmt createFunctionStmt) {
-    for (FunctionArgumentType argumentType :
-        createFunctionStmt.getSignature().getFunctionArgumentList()) {
-      Type type = argumentType.getType();
-      if (type == null) {
-        throw new UnsupportedOperationException("UDF templated argument types are not supported.");
-      }
-      validateJavaUdfZetaSqlType(type);
-    }
-    if (createFunctionStmt.getReturnType() == null) {
-      throw new IllegalArgumentException("UDF return type must not be null.");
-    }
-    validateJavaUdfZetaSqlType(createFunctionStmt.getReturnType());
-  }
-
   /**
    * Throws {@link UnsupportedOperationException} if ZetaSQL type is not supported in Java UDF.
    * Supported types are a subset of the types supported by {@link BeamJavaUdfCalcRule}.
    *
-   * <p>Supported types should be kept in sync with {@link
-   * #validateJavaUdfCalciteType(RelDataType)}.
+   * <p>Supported types should be kept in sync with {@link #validateJavaUdfCalciteType(RelDataType,
+   * String)}.
    */
-  void validateJavaUdfZetaSqlType(Type type) {
+  void validateJavaUdfZetaSqlType(Type type, String functionName) {
     switch (type.getKind()) {
       case TYPE_BOOL:
       case TYPE_BYTES:
@@ -204,17 +202,20 @@
       case TYPE_DOUBLE:
       case TYPE_INT64:
       case TYPE_STRING:
+      case TYPE_TIMESTAMP:
         // These types are supported.
         break;
+      case TYPE_ARRAY:
+        validateJavaUdfZetaSqlType(type.asArray().getElementType(), functionName);
+        break;
       case TYPE_NUMERIC:
       case TYPE_TIME:
       case TYPE_DATETIME:
-      case TYPE_TIMESTAMP:
-      case TYPE_ARRAY:
       case TYPE_STRUCT:
       default:
         throw new UnsupportedOperationException(
-            "ZetaSQL type not allowed in Java UDF: " + type.getKind().name());
+            String.format(
+                "ZetaSQL type %s not allowed in function %s", type.getKind().name(), functionName));
     }
   }
 
@@ -365,7 +366,10 @@
           ScalarFunctionImpl scalarFunction = (ScalarFunctionImpl) function;
           // Validate types before converting from Calcite to ZetaSQL, since the conversion may fail
           // for unsupported types.
-          validateScalarFunctionImpl(scalarFunction);
+          for (FunctionParameter parameter : scalarFunction.getParameters()) {
+            validateJavaUdfCalciteType(parameter.getType(typeFactory), functionName);
+          }
+          validateJavaUdfCalciteType(scalarFunction.getReturnType(typeFactory), functionName);
           Method method = scalarFunction.method;
           javaScalarUdfs.put(path, UserFunctionDefinitions.JavaScalarFunction.create(method, ""));
           FunctionArgumentType resultType =
@@ -414,39 +418,37 @@
         .collect(Collectors.toList());
   }
 
-  private void validateScalarFunctionImpl(ScalarFunctionImpl scalarFunction) {
-    for (FunctionParameter parameter : scalarFunction.getParameters()) {
-      validateJavaUdfCalciteType(parameter.getType(typeFactory));
-    }
-    validateJavaUdfCalciteType(scalarFunction.getReturnType(typeFactory));
-  }
-
   /**
    * Throws {@link UnsupportedOperationException} if Calcite type is not supported in Java UDF.
    * Supported types are a subset of the corresponding Calcite types supported by {@link
    * BeamJavaUdfCalcRule}.
    *
-   * <p>Supported types should be kept in sync with {@link #validateJavaUdfZetaSqlType(Type)}.
+   * <p>Supported types should be kept in sync with {@link #validateJavaUdfZetaSqlType(Type,
+   * String)}.
    */
-  private void validateJavaUdfCalciteType(RelDataType type) {
+  private void validateJavaUdfCalciteType(RelDataType type, String functionName) {
     switch (type.getSqlTypeName()) {
       case BIGINT:
+      case BOOLEAN:
       case DATE:
       case DOUBLE:
-      case BOOLEAN:
+      case TIMESTAMP:
       case VARCHAR:
       case VARBINARY:
         // These types are supported.
         break;
+      case ARRAY:
+        validateJavaUdfCalciteType(type.getComponentType(), functionName);
+        break;
       case DECIMAL:
       case TIME:
       case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-      case TIMESTAMP:
-      case ARRAY:
       case ROW:
       default:
         throw new UnsupportedOperationException(
-            "Calcite type not allowed in ZetaSQL Java UDF: " + type.getSqlTypeName().getName());
+            String.format(
+                "Calcite type %s not allowed in function %s",
+                type.getSqlTypeName().getName(), functionName));
     }
   }
 
diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCatalogTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCatalogTest.java
index 2196046..94c984c 100644
--- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCatalogTest.java
+++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCatalogTest.java
@@ -25,6 +25,7 @@
 import com.google.zetasql.AnalyzerOptions;
 import com.google.zetasql.resolvedast.ResolvedNodes;
 import java.lang.reflect.Method;
+import java.sql.Time;
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.BeamSqlUdf;
 import org.apache.beam.sdk.extensions.sql.impl.JdbcConnection;
@@ -53,14 +54,14 @@
     }
   }
 
-  public static class ReturnsArrayFn implements BeamSqlUdf {
-    public List<Long> eval() {
-      return ImmutableList.of(1L, 2L, 3L);
+  public static class ReturnsArrayTimeFn implements BeamSqlUdf {
+    public List<Time> eval() {
+      return ImmutableList.of(new Time(0));
     }
   }
 
-  public static class TakesArrayFn implements BeamSqlUdf {
-    public Long eval(List<Long> ls) {
+  public static class TakesArrayTimeFn implements BeamSqlUdf {
+    public Long eval(List<Time> ls) {
       return 0L;
     }
   }
@@ -92,10 +93,10 @@
   public void rejectsScalarFunctionImplWithUnsupportedReturnType() throws NoSuchMethodException {
     JdbcConnection jdbcConnection = createJdbcConnection();
     SchemaPlus calciteSchema = jdbcConnection.getCurrentSchemaPlus();
-    Method method = ReturnsArrayFn.class.getMethod("eval");
+    Method method = ReturnsArrayTimeFn.class.getMethod("eval");
     calciteSchema.add("return_array", ScalarFunctionImpl.create(method));
     thrown.expect(UnsupportedOperationException.class);
-    thrown.expectMessage("Calcite type not allowed in ZetaSQL Java UDF: ARRAY");
+    thrown.expectMessage("Calcite type TIME not allowed in function return_array");
     BeamZetaSqlCatalog beamCatalog =
         BeamZetaSqlCatalog.create(
             calciteSchema, jdbcConnection.getTypeFactory(), SqlAnalyzer.baseAnalyzerOptions());
@@ -105,10 +106,10 @@
   public void rejectsScalarFunctionImplWithUnsupportedParameterType() throws NoSuchMethodException {
     JdbcConnection jdbcConnection = createJdbcConnection();
     SchemaPlus calciteSchema = jdbcConnection.getCurrentSchemaPlus();
-    Method method = TakesArrayFn.class.getMethod("eval", List.class);
+    Method method = TakesArrayTimeFn.class.getMethod("eval", List.class);
     calciteSchema.add("take_array", ScalarFunctionImpl.create(method));
     thrown.expect(UnsupportedOperationException.class);
-    thrown.expectMessage("Calcite type not allowed in ZetaSQL Java UDF: ARRAY");
+    thrown.expectMessage("Calcite type TIME not allowed in function take_array");
     BeamZetaSqlCatalog beamCatalog =
         BeamZetaSqlCatalog.create(
             calciteSchema, jdbcConnection.getTypeFactory(), SqlAnalyzer.baseAnalyzerOptions());
@@ -125,14 +126,14 @@
             analyzerOptions);
 
     String sql =
-        "CREATE FUNCTION foo() RETURNS ARRAY<INT64> LANGUAGE java OPTIONS (path='/does/not/exist');";
+        "CREATE FUNCTION foo() RETURNS ARRAY<TIME> LANGUAGE java OPTIONS (path='/does/not/exist');";
     ResolvedNodes.ResolvedStatement resolvedStatement =
         Analyzer.analyzeStatement(sql, analyzerOptions, beamCatalog.getZetaSqlCatalog());
     ResolvedNodes.ResolvedCreateFunctionStmt createFunctionStmt =
         (ResolvedNodes.ResolvedCreateFunctionStmt) resolvedStatement;
 
     thrown.expect(UnsupportedOperationException.class);
-    thrown.expectMessage("ZetaSQL type not allowed in Java UDF: TYPE_ARRAY");
+    thrown.expectMessage("ZetaSQL type TYPE_TIME not allowed in function foo");
     beamCatalog.addFunction(createFunctionStmt);
   }
 
@@ -147,14 +148,14 @@
             analyzerOptions);
 
     String sql =
-        "CREATE FUNCTION foo(a ARRAY<INT64>) RETURNS INT64 LANGUAGE java OPTIONS (path='/does/not/exist');";
+        "CREATE FUNCTION foo(a ARRAY<TIME>) RETURNS INT64 LANGUAGE java OPTIONS (path='/does/not/exist');";
     ResolvedNodes.ResolvedStatement resolvedStatement =
         Analyzer.analyzeStatement(sql, analyzerOptions, beamCatalog.getZetaSqlCatalog());
     ResolvedNodes.ResolvedCreateFunctionStmt createFunctionStmt =
         (ResolvedNodes.ResolvedCreateFunctionStmt) resolvedStatement;
 
     thrown.expect(UnsupportedOperationException.class);
-    thrown.expectMessage("ZetaSQL type not allowed in Java UDF: TYPE_ARRAY");
+    thrown.expectMessage("ZetaSQL type TYPE_TIME not allowed in function foo");
     beamCatalog.addFunction(createFunctionStmt);
   }
 
diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlJavaUdfTypeTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlJavaUdfTypeTest.java
index 98fb25f..fda9ef8 100644
--- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlJavaUdfTypeTest.java
+++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlJavaUdfTypeTest.java
@@ -18,7 +18,9 @@
 package org.apache.beam.sdk.extensions.sql.zetasql;
 
 import java.sql.Date;
+import java.sql.Timestamp;
 import java.time.LocalDate;
+import java.util.List;
 import org.apache.beam.sdk.extensions.sql.BeamSqlUdf;
 import org.apache.beam.sdk.extensions.sql.impl.JdbcConnection;
 import org.apache.beam.sdk.extensions.sql.impl.JdbcDriver;
@@ -36,7 +38,10 @@
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.schema.SchemaPlus;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.Frameworks;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
 import org.joda.time.Duration;
 import org.junit.Before;
 import org.junit.Rule;
@@ -77,6 +82,8 @@
                   .addDoubleField("float64_neg_inf")
                   .addDoubleField("float64_nan")
                   .addLogicalTypeField("f_date", SqlTypes.DATE)
+                  .addDateTimeField("f_timestamp")
+                  .addArrayField("array_int64", Schema.FieldType.INT64)
                   .build())
           .addRows(
               true /* boolean_true */,
@@ -101,7 +108,9 @@
               Double.POSITIVE_INFINITY /* float64_inf */,
               Double.NEGATIVE_INFINITY /* float64_neg_inf */,
               Double.NaN /* float64_nan */,
-              LocalDate.of(2021, 4, 26) /* f_date */);
+              LocalDate.of(2021, 4, 26) /* f_date */,
+              new DateTime(2021, 5, 6, 3, 48, 32, DateTimeZone.UTC) /* f_timestamp */,
+              ImmutableList.of(1L, 2L, 3L) /* array_int64 */);
 
   @Before
   public void setUp() throws NoSuchMethodException {
@@ -132,6 +141,12 @@
         ScalarFunctionImpl.create(DoubleIdentityFn.class.getMethod("eval", Double.class)));
     schema.add(
         "test_date", ScalarFunctionImpl.create(DateIdentityFn.class.getMethod("eval", Date.class)));
+    schema.add(
+        "test_timestamp",
+        ScalarFunctionImpl.create(TimestampIdentityFn.class.getMethod("eval", Timestamp.class)));
+    schema.add(
+        "test_array",
+        ScalarFunctionImpl.create(ListIdentityFn.class.getMethod("eval", List.class)));
 
     this.config = Frameworks.newConfigBuilder(config).defaultSchema(schema).build();
   }
@@ -172,6 +187,18 @@
     }
   }
 
+  public static class TimestampIdentityFn implements BeamSqlUdf {
+    public Timestamp eval(Timestamp input) {
+      return input;
+    }
+  }
+
+  public static class ListIdentityFn implements BeamSqlUdf {
+    public List<Long> eval(List<Long> input) {
+      return input;
+    }
+  }
+
   private void runUdfTypeTest(String query, Object result, Schema.TypeName typeName) {
     runUdfTypeTest(query, result, Schema.FieldType.of(typeName));
   }
@@ -452,8 +479,7 @@
 
   @Test
   public void testDateLiteral() {
-    runUdfTypeTest(
-        "SELECT test_date('2021-04-26') FROM table;", LocalDate.of(2021, 4, 26), SqlTypes.DATE);
+    runUdfTypeTest("SELECT test_date('2021-04-26');", LocalDate.of(2021, 4, 26), SqlTypes.DATE);
   }
 
   @Test
@@ -461,4 +487,36 @@
     runUdfTypeTest(
         "SELECT test_date(f_date) FROM table;", LocalDate.of(2021, 4, 26), SqlTypes.DATE);
   }
+
+  @Test
+  public void testTimestampLiteral() {
+    runUdfTypeTest(
+        "SELECT test_timestamp('2021-05-06 03:48:32Z');",
+        new DateTime(2021, 5, 6, 3, 48, 32, DateTimeZone.UTC),
+        Schema.TypeName.DATETIME);
+  }
+
+  @Test
+  public void testTimestampInput() {
+    runUdfTypeTest(
+        "SELECT test_timestamp(f_timestamp) FROM table;",
+        new DateTime(2021, 5, 6, 3, 48, 32, DateTimeZone.UTC),
+        Schema.TypeName.DATETIME);
+  }
+
+  @Test
+  public void testArrayLiteral() {
+    runUdfTypeTest(
+        "SELECT test_array(ARRAY<INT64>[1, 2, 3]);",
+        ImmutableList.of(1L, 2L, 3L),
+        Schema.FieldType.array(Schema.FieldType.INT64));
+  }
+
+  @Test
+  public void testArrayInput() {
+    runUdfTypeTest(
+        "SELECT test_array(array_int64) FROM table;",
+        ImmutableList.of(1L, 2L, 3L),
+        Schema.FieldType.array(Schema.FieldType.INT64));
+  }
 }
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 76fa14c..fd14ab4 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -208,7 +208,7 @@
 include(":sdks:python:test-suites:tox:py38")
 include(":vendor:grpc-1_26_0")
 include(":vendor:grpc-1_36_0")
-include(":vendor:bytebuddy-1_10_8")
+include(":vendor:bytebuddy-1_11_0")
 include(":vendor:calcite-1_26_0")
 include(":vendor:guava-26_0-jre")
 include(":website")
diff --git a/vendor/bytebuddy-1_10_8/build.gradle.kts b/vendor/bytebuddy-1_11_0/build.gradle.kts
similarity index 89%
rename from vendor/bytebuddy-1_10_8/build.gradle.kts
rename to vendor/bytebuddy-1_11_0/build.gradle.kts
index dee66ae..b55c4ed 100644
--- a/vendor/bytebuddy-1_10_8/build.gradle.kts
+++ b/vendor/bytebuddy-1_11_0/build.gradle.kts
@@ -18,7 +18,7 @@
 
 plugins { id("org.apache.beam.vendor-java") }
 
-description = "Apache Beam :: Vendored Dependencies :: ByteBuddy :: 1.10.8"
+description = "Apache Beam :: Vendored Dependencies :: ByteBuddy :: 1.11.0"
 
 group = "org.apache.beam"
 version = "0.1"
@@ -26,14 +26,14 @@
 val vendorJava = project.extensions.extraProperties.get("vendorJava") as groovy.lang.Closure<*>
 vendorJava(
   mapOf(
-    "dependencies" to listOf("net.bytebuddy:byte-buddy:1.10.8"),
+    "dependencies" to listOf("net.bytebuddy:byte-buddy:1.11.0"),
     "relocations" to mapOf(
-            "net.bytebuddy" to "org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy"),
+            "net.bytebuddy" to "org.apache.beam.vendor.bytebuddy.v1_11_0.net.bytebuddy"),
     "exclusions" to listOf(
             "**/module-info.class"
     ),
     "groupId" to group,
-    "artifactId" to "beam-vendor-bytebuddy-1_10_8",
+    "artifactId" to "beam-vendor-bytebuddy-1_11_0",
     "version" to version
   )
 )