Merge pull request #14798 from ihji/BEAM-12324

[BEAM-12324] TranslationsTest.test_run_packable_combine_* failing on PostCommit_Py_VR_Dataflow
diff --git a/CHANGES.md b/CHANGES.md
index af72444..90d0123 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -97,6 +97,7 @@
 * X feature added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
 * Added capability to declare resource hints in Java and Python SDKs ([BEAM-2085](https://issues.apache.org/jira/browse/BEAM-2085)).
 * Added Spanner IO Performance tests for read and write. (Python) ([BEAM-10029](https://issues.apache.org/jira/browse/BEAM-10029)).
+* Added support for accessing GCP Pub/Sub message ordering keys, message IDs, and publish timestamps (Python) ([BEAM-7819](https://issues.apache.org/jira/browse/BEAM-7819)).
 
 ## Breaking Changes
 
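For reference, a minimal usage sketch of the Pub/Sub metadata feature noted above (the project and subscription names are hypothetical; ReadFromPubSub with with_attributes=True already yields PubsubMessage objects, which after this change also carry the new fields):

    # Sketch only: needs apache-beam[gcp] and a streaming-capable runner.
    import apache_beam as beam

    def log_metadata(msg):
        # message_id and publish_time are assigned by the Pub/Sub service;
        # ordering_key is empty unless the publisher set one.
        print(msg.message_id, msg.publish_time, msg.ordering_key)
        return msg

    with beam.Pipeline() as p:
        _ = (
            p
            | beam.io.ReadFromPubSub(
                subscription='projects/my-project/subscriptions/my-sub',
                with_attributes=True)
            | beam.Map(log_metadata))
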
diff --git a/sdks/go/pkg/beam/core/runtime/xlangx/expand.go b/sdks/go/pkg/beam/core/runtime/xlangx/expand.go
index ff320f7..8748ddc 100644
--- a/sdks/go/pkg/beam/core/runtime/xlangx/expand.go
+++ b/sdks/go/pkg/beam/core/runtime/xlangx/expand.go
@@ -13,6 +13,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+// Package xlangx contains various low-level utilities needed for adding
+// cross-language transforms to the pipeline.
 package xlangx
 
 import (
diff --git a/sdks/go/pkg/beam/core/runtime/xlangx/payload.go b/sdks/go/pkg/beam/core/runtime/xlangx/payload.go
index c038463..cdb3457 100644
--- a/sdks/go/pkg/beam/core/runtime/xlangx/payload.go
+++ b/sdks/go/pkg/beam/core/runtime/xlangx/payload.go
@@ -19,10 +19,10 @@
 	"bytes"
 	"reflect"
 
+	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx/schema"
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
-	"github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
 	"google.golang.org/protobuf/proto"
 )
 
@@ -53,7 +53,7 @@
 	}
 
 	// Put schema and row into payload proto, and marshal it.
-	ecp := &pipeline_v1.ExternalConfigurationPayload{
+	ecp := &pipepb.ExternalConfigurationPayload{
 		Schema:  scm,
 		Payload: buf.Bytes(),
 	}
diff --git a/sdks/go/pkg/beam/core/runtime/xlangx/resolve.go b/sdks/go/pkg/beam/core/runtime/xlangx/resolve.go
index 1ba1ac4..b434650 100644
--- a/sdks/go/pkg/beam/core/runtime/xlangx/resolve.go
+++ b/sdks/go/pkg/beam/core/runtime/xlangx/resolve.go
@@ -100,7 +100,7 @@
 
 				var resolvedDeps []*pipepb.ArtifactInformation
 				for _, a := range resolvedArtifacts {
-					name, _ := artifact.MustExtractFilePayload(a)
+					name, sha256 := artifact.MustExtractFilePayload(a)
 					fullTmpPath := filepath.Join(tmpPath, "/", name)
 					fullSdkPath := fullTmpPath
 					if len(cfg.SdkPath) > 0 {
@@ -111,7 +111,8 @@
 							TypeUrn: "beam:artifact:type:file:v1",
 							TypePayload: protox.MustEncode(
 								&pipepb.ArtifactFilePayload{
-									Path: fullSdkPath,
+									Path:   fullSdkPath,
+									Sha256: sha256,
 								},
 							),
 							RoleUrn:     a.RoleUrn,
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/UdfImplReflectiveFunctionBase.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/UdfImplReflectiveFunctionBase.java
index 9146e05..70d7965 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/UdfImplReflectiveFunctionBase.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/UdfImplReflectiveFunctionBase.java
@@ -20,6 +20,7 @@
 import java.lang.reflect.Constructor;
 import java.lang.reflect.Method;
 import java.lang.reflect.Modifier;
+import java.lang.reflect.Type;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
@@ -113,8 +114,7 @@
       return add(type, name, false);
     }
 
-    public ParameterListBuilder add(
-        final Class<?> type, final String name, final boolean optional) {
+    public ParameterListBuilder add(final Type type, final String name, final boolean optional) {
       final int ordinal = builder.size();
       builder.add(
           new FunctionParameter() {
@@ -142,7 +142,7 @@
     }
 
     public ParameterListBuilder addMethodParameters(Method method) {
-      final Class<?>[] types = method.getParameterTypes();
+      final Type[] types = method.getGenericParameterTypes();
       for (int i = 0; i < types.length; i++) {
         add(
             types[i],
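
The switch from Class<?> to java.lang.reflect.Type matters because getParameterTypes() returns erased classes, so a UDF parameter declared as List<Long> loses its element type; getGenericParameterTypes() preserves it, which the catalog needs when validating array element types. A standalone illustration (class and method names are invented for the example):

    import java.lang.reflect.Method;
    import java.lang.reflect.Type;
    import java.util.List;

    public class GenericTypesDemo {
      public Long eval(List<Long> xs) {
        return (long) xs.size();
      }

      public static void main(String[] args) throws NoSuchMethodException {
        Method m = GenericTypesDemo.class.getMethod("eval", List.class);
        // Erased view: prints "interface java.util.List" -- element type lost.
        System.out.println(m.getParameterTypes()[0]);
        // Generic view: prints "java.util.List<java.lang.Long>".
        System.out.println(m.getGenericParameterTypes()[0]);
      }
    }
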
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCatalog.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCatalog.java
index 88c5b07..5ad634fb 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCatalog.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCatalog.java
@@ -142,7 +142,20 @@
         sqlScalarUdfs.put(createFunctionStmt.getNamePath(), createFunctionStmt);
         break;
       case USER_DEFINED_JAVA_SCALAR_FUNCTIONS:
-        validateJavaUdf(createFunctionStmt);
+        String functionName = String.join(".", createFunctionStmt.getNamePath());
+        for (FunctionArgumentType argumentType :
+            createFunctionStmt.getSignature().getFunctionArgumentList()) {
+          Type type = argumentType.getType();
+          if (type == null) {
+            throw new UnsupportedOperationException(
+                "UDF templated argument types are not supported.");
+          }
+          validateJavaUdfZetaSqlType(type, functionName);
+        }
+        if (createFunctionStmt.getReturnType() == null) {
+          throw new IllegalArgumentException("UDF return type must not be null.");
+        }
+        validateJavaUdfZetaSqlType(createFunctionStmt.getReturnType(), functionName);
         String jarPath = getJarPath(createFunctionStmt);
         ScalarFn scalarFn =
             javaUdfLoader.loadScalarFunction(createFunctionStmt.getNamePath(), jarPath);
@@ -174,29 +187,14 @@
             ImmutableList.of(createFunctionStmt.getSignature())));
   }
 
-  void validateJavaUdf(ResolvedNodes.ResolvedCreateFunctionStmt createFunctionStmt) {
-    for (FunctionArgumentType argumentType :
-        createFunctionStmt.getSignature().getFunctionArgumentList()) {
-      Type type = argumentType.getType();
-      if (type == null) {
-        throw new UnsupportedOperationException("UDF templated argument types are not supported.");
-      }
-      validateJavaUdfZetaSqlType(type);
-    }
-    if (createFunctionStmt.getReturnType() == null) {
-      throw new IllegalArgumentException("UDF return type must not be null.");
-    }
-    validateJavaUdfZetaSqlType(createFunctionStmt.getReturnType());
-  }
-
   /**
    * Throws {@link UnsupportedOperationException} if ZetaSQL type is not supported in Java UDF.
    * Supported types are a subset of the types supported by {@link BeamJavaUdfCalcRule}.
    *
-   * <p>Supported types should be kept in sync with {@link
-   * #validateJavaUdfCalciteType(RelDataType)}.
+   * <p>Supported types should be kept in sync with {@link #validateJavaUdfCalciteType(RelDataType,
+   * String)}.
    */
-  void validateJavaUdfZetaSqlType(Type type) {
+  void validateJavaUdfZetaSqlType(Type type, String functionName) {
     switch (type.getKind()) {
       case TYPE_BOOL:
       case TYPE_BYTES:
@@ -207,14 +205,17 @@
       case TYPE_TIMESTAMP:
         // These types are supported.
         break;
+      case TYPE_ARRAY:
+        validateJavaUdfZetaSqlType(type.asArray().getElementType(), functionName);
+        break;
       case TYPE_NUMERIC:
       case TYPE_TIME:
       case TYPE_DATETIME:
-      case TYPE_ARRAY:
       case TYPE_STRUCT:
       default:
         throw new UnsupportedOperationException(
-            "ZetaSQL type not allowed in Java UDF: " + type.getKind().name());
+            String.format(
+                "ZetaSQL type %s not allowed in function %s", type.getKind().name(), functionName));
     }
   }
 
@@ -365,7 +366,10 @@
           ScalarFunctionImpl scalarFunction = (ScalarFunctionImpl) function;
           // Validate types before converting from Calcite to ZetaSQL, since the conversion may fail
           // for unsupported types.
-          validateScalarFunctionImpl(scalarFunction);
+          for (FunctionParameter parameter : scalarFunction.getParameters()) {
+            validateJavaUdfCalciteType(parameter.getType(typeFactory), functionName);
+          }
+          validateJavaUdfCalciteType(scalarFunction.getReturnType(typeFactory), functionName);
           Method method = scalarFunction.method;
           javaScalarUdfs.put(path, UserFunctionDefinitions.JavaScalarFunction.create(method, ""));
           FunctionArgumentType resultType =
@@ -414,21 +418,15 @@
         .collect(Collectors.toList());
   }
 
-  private void validateScalarFunctionImpl(ScalarFunctionImpl scalarFunction) {
-    for (FunctionParameter parameter : scalarFunction.getParameters()) {
-      validateJavaUdfCalciteType(parameter.getType(typeFactory));
-    }
-    validateJavaUdfCalciteType(scalarFunction.getReturnType(typeFactory));
-  }
-
   /**
    * Throws {@link UnsupportedOperationException} if Calcite type is not supported in Java UDF.
    * Supported types are a subset of the corresponding Calcite types supported by {@link
    * BeamJavaUdfCalcRule}.
    *
-   * <p>Supported types should be kept in sync with {@link #validateJavaUdfZetaSqlType(Type)}.
+   * <p>Supported types should be kept in sync with {@link #validateJavaUdfZetaSqlType(Type,
+   * String)}.
    */
-  private void validateJavaUdfCalciteType(RelDataType type) {
+  private void validateJavaUdfCalciteType(RelDataType type, String functionName) {
     switch (type.getSqlTypeName()) {
       case BIGINT:
       case BOOLEAN:
@@ -439,14 +437,18 @@
       case VARBINARY:
         // These types are supported.
         break;
+      case ARRAY:
+        validateJavaUdfCalciteType(type.getComponentType(), functionName);
+        break;
       case DECIMAL:
       case TIME:
       case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-      case ARRAY:
       case ROW:
       default:
         throw new UnsupportedOperationException(
-            "Calcite type not allowed in ZetaSQL Java UDF: " + type.getSqlTypeName().getName());
+            String.format(
+                "Calcite type %s not allowed in function %s",
+                type.getSqlTypeName().getName(), functionName));
     }
   }
 
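The new ARRAY cases recurse into the element type instead of rejecting arrays outright, so ARRAY<INT64> is now accepted while ARRAY<TIME> still fails, and the error names both the offending element kind and the function. A toy sketch of the same pattern (the string-based type encoding is invented for illustration; the real code switches on ZetaSQL and Calcite type kinds):

    import java.util.Arrays;
    import java.util.List;

    public class RecursiveTypeCheckDemo {
      // Toy type encoding: ["INT64"], or ["ARRAY", <element encoding...>].
      static void validate(List<String> type, String functionName) {
        switch (type.get(0)) {
          case "INT64":
          case "STRING":
            break; // supported scalar
          case "ARRAY":
            // Recurse into the element type, so arrays of supported scalars
            // pass and unsupported element kinds are reported precisely.
            validate(type.subList(1, type.size()), functionName);
            break;
          default:
            throw new UnsupportedOperationException(String.format(
                "type %s not allowed in function %s", type.get(0), functionName));
        }
      }

      public static void main(String[] args) {
        validate(Arrays.asList("ARRAY", "INT64"), "foo"); // passes
        validate(Arrays.asList("ARRAY", "TIME"), "foo");  // throws
      }
    }
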
diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCatalogTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCatalogTest.java
index 2196046..94c984c 100644
--- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCatalogTest.java
+++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCatalogTest.java
@@ -25,6 +25,7 @@
 import com.google.zetasql.AnalyzerOptions;
 import com.google.zetasql.resolvedast.ResolvedNodes;
 import java.lang.reflect.Method;
+import java.sql.Time;
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.BeamSqlUdf;
 import org.apache.beam.sdk.extensions.sql.impl.JdbcConnection;
@@ -53,14 +54,14 @@
     }
   }
 
-  public static class ReturnsArrayFn implements BeamSqlUdf {
-    public List<Long> eval() {
-      return ImmutableList.of(1L, 2L, 3L);
+  public static class ReturnsArrayTimeFn implements BeamSqlUdf {
+    public List<Time> eval() {
+      return ImmutableList.of(new Time(0));
     }
   }
 
-  public static class TakesArrayFn implements BeamSqlUdf {
-    public Long eval(List<Long> ls) {
+  public static class TakesArrayTimeFn implements BeamSqlUdf {
+    public Long eval(List<Time> ls) {
       return 0L;
     }
   }
@@ -92,10 +93,10 @@
   public void rejectsScalarFunctionImplWithUnsupportedReturnType() throws NoSuchMethodException {
     JdbcConnection jdbcConnection = createJdbcConnection();
     SchemaPlus calciteSchema = jdbcConnection.getCurrentSchemaPlus();
-    Method method = ReturnsArrayFn.class.getMethod("eval");
+    Method method = ReturnsArrayTimeFn.class.getMethod("eval");
     calciteSchema.add("return_array", ScalarFunctionImpl.create(method));
     thrown.expect(UnsupportedOperationException.class);
-    thrown.expectMessage("Calcite type not allowed in ZetaSQL Java UDF: ARRAY");
+    thrown.expectMessage("Calcite type TIME not allowed in function return_array");
     BeamZetaSqlCatalog beamCatalog =
         BeamZetaSqlCatalog.create(
             calciteSchema, jdbcConnection.getTypeFactory(), SqlAnalyzer.baseAnalyzerOptions());
@@ -105,10 +106,10 @@
   public void rejectsScalarFunctionImplWithUnsupportedParameterType() throws NoSuchMethodException {
     JdbcConnection jdbcConnection = createJdbcConnection();
     SchemaPlus calciteSchema = jdbcConnection.getCurrentSchemaPlus();
-    Method method = TakesArrayFn.class.getMethod("eval", List.class);
+    Method method = TakesArrayTimeFn.class.getMethod("eval", List.class);
     calciteSchema.add("take_array", ScalarFunctionImpl.create(method));
     thrown.expect(UnsupportedOperationException.class);
-    thrown.expectMessage("Calcite type not allowed in ZetaSQL Java UDF: ARRAY");
+    thrown.expectMessage("Calcite type TIME not allowed in function take_array");
     BeamZetaSqlCatalog beamCatalog =
         BeamZetaSqlCatalog.create(
             calciteSchema, jdbcConnection.getTypeFactory(), SqlAnalyzer.baseAnalyzerOptions());
@@ -125,14 +126,14 @@
             analyzerOptions);
 
     String sql =
-        "CREATE FUNCTION foo() RETURNS ARRAY<INT64> LANGUAGE java OPTIONS (path='/does/not/exist');";
+        "CREATE FUNCTION foo() RETURNS ARRAY<TIME> LANGUAGE java OPTIONS (path='/does/not/exist');";
     ResolvedNodes.ResolvedStatement resolvedStatement =
         Analyzer.analyzeStatement(sql, analyzerOptions, beamCatalog.getZetaSqlCatalog());
     ResolvedNodes.ResolvedCreateFunctionStmt createFunctionStmt =
         (ResolvedNodes.ResolvedCreateFunctionStmt) resolvedStatement;
 
     thrown.expect(UnsupportedOperationException.class);
-    thrown.expectMessage("ZetaSQL type not allowed in Java UDF: TYPE_ARRAY");
+    thrown.expectMessage("ZetaSQL type TYPE_TIME not allowed in function foo");
     beamCatalog.addFunction(createFunctionStmt);
   }
 
@@ -147,14 +148,14 @@
             analyzerOptions);
 
     String sql =
-        "CREATE FUNCTION foo(a ARRAY<INT64>) RETURNS INT64 LANGUAGE java OPTIONS (path='/does/not/exist');";
+        "CREATE FUNCTION foo(a ARRAY<TIME>) RETURNS INT64 LANGUAGE java OPTIONS (path='/does/not/exist');";
     ResolvedNodes.ResolvedStatement resolvedStatement =
         Analyzer.analyzeStatement(sql, analyzerOptions, beamCatalog.getZetaSqlCatalog());
     ResolvedNodes.ResolvedCreateFunctionStmt createFunctionStmt =
         (ResolvedNodes.ResolvedCreateFunctionStmt) resolvedStatement;
 
     thrown.expect(UnsupportedOperationException.class);
-    thrown.expectMessage("ZetaSQL type not allowed in Java UDF: TYPE_ARRAY");
+    thrown.expectMessage("ZetaSQL type TYPE_TIME not allowed in function foo");
     beamCatalog.addFunction(createFunctionStmt);
   }
 
diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlJavaUdfTypeTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlJavaUdfTypeTest.java
index a4972b9..fda9ef8 100644
--- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlJavaUdfTypeTest.java
+++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlJavaUdfTypeTest.java
@@ -20,6 +20,7 @@
 import java.sql.Date;
 import java.sql.Timestamp;
 import java.time.LocalDate;
+import java.util.List;
 import org.apache.beam.sdk.extensions.sql.BeamSqlUdf;
 import org.apache.beam.sdk.extensions.sql.impl.JdbcConnection;
 import org.apache.beam.sdk.extensions.sql.impl.JdbcDriver;
@@ -37,6 +38,7 @@
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.schema.SchemaPlus;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.Frameworks;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
@@ -81,6 +83,7 @@
                   .addDoubleField("float64_nan")
                   .addLogicalTypeField("f_date", SqlTypes.DATE)
                   .addDateTimeField("f_timestamp")
+                  .addArrayField("array_int64", Schema.FieldType.INT64)
                   .build())
           .addRows(
               true /* boolean_true */,
@@ -106,7 +109,8 @@
               Double.NEGATIVE_INFINITY /* float64_neg_inf */,
               Double.NaN /* float64_nan */,
               LocalDate.of(2021, 4, 26) /* f_date */,
-              new DateTime(2021, 5, 6, 3, 48, 32, DateTimeZone.UTC) /* f_timestamp */);
+              new DateTime(2021, 5, 6, 3, 48, 32, DateTimeZone.UTC) /* f_timestamp */,
+              ImmutableList.of(1L, 2L, 3L) /* array_int64 */);
 
   @Before
   public void setUp() throws NoSuchMethodException {
@@ -140,6 +144,9 @@
     schema.add(
         "test_timestamp",
         ScalarFunctionImpl.create(TimestampIdentityFn.class.getMethod("eval", Timestamp.class)));
+    schema.add(
+        "test_array",
+        ScalarFunctionImpl.create(ListIdentityFn.class.getMethod("eval", List.class)));
 
     this.config = Frameworks.newConfigBuilder(config).defaultSchema(schema).build();
   }
@@ -186,6 +193,12 @@
     }
   }
 
+  public static class ListIdentityFn implements BeamSqlUdf {
+    public List<Long> eval(List<Long> input) {
+      return input;
+    }
+  }
+
   private void runUdfTypeTest(String query, Object result, Schema.TypeName typeName) {
     runUdfTypeTest(query, result, Schema.FieldType.of(typeName));
   }
@@ -490,4 +503,20 @@
         new DateTime(2021, 5, 6, 3, 48, 32, DateTimeZone.UTC),
         Schema.TypeName.DATETIME);
   }
+
+  @Test
+  public void testArrayLiteral() {
+    runUdfTypeTest(
+        "SELECT test_array(ARRAY<INT64>[1, 2, 3]);",
+        ImmutableList.of(1L, 2L, 3L),
+        Schema.FieldType.array(Schema.FieldType.INT64));
+  }
+
+  @Test
+  public void testArrayInput() {
+    runUdfTypeTest(
+        "SELECT test_array(array_int64) FROM table;",
+        ImmutableList.of(1L, 2L, 3L),
+        Schema.FieldType.array(Schema.FieldType.INT64));
+  }
 }
diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py
index 4e2811e..39312bf 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub.py
@@ -69,13 +69,29 @@
     attributes: (dict) Key-value map of str to str, containing both user-defined
       and service generated attributes (such as id_label and
       timestamp_attribute). May be None.
+    message_id: (str) ID of the message, assigned by the pubsub service when the
+      message is published. Guaranteed to be unique within the topic. Will be
+      reset to None if the message is being written to pubsub.
+    publish_time: (datetime) Time at which the message was published. Will be
+      reset to None if the message is being written to pubsub.
+    ordering_key: (str) If non-empty, identifies related messages for which
+      publish order is respected by the PubSub subscription.
   """
-  def __init__(self, data, attributes):
+  def __init__(
+      self,
+      data,
+      attributes,
+      message_id=None,
+      publish_time=None,
+      ordering_key=""):
     if data is None and not attributes:
       raise ValueError(
           'Either data (%r) or attributes (%r) must be set.', data, attributes)
     self.data = data
     self.attributes = attributes
+    self.message_id = message_id
+    self.publish_time = publish_time
+    self.ordering_key = ordering_key
 
   def __hash__(self):
     return hash((self.data, frozenset(self.attributes.items())))
@@ -104,13 +120,21 @@
     msg.ParseFromString(proto_msg)
     # Convert ScalarMapContainer to dict.
     attributes = dict((key, msg.attributes[key]) for key in msg.attributes)
-    return PubsubMessage(msg.data, attributes)
+    return PubsubMessage(
+        msg.data,
+        attributes,
+        msg.message_id,
+        msg.publish_time.ToDatetime(),
+        msg.ordering_key)
 
-  def _to_proto_str(self):
+  def _to_proto_str(self, for_publish=False):
     """Get serialized form of ``PubsubMessage``.
 
     Args:
       proto_msg: str containing a serialized protobuf.
+      for_publish: bool, if True strip out message fields which cannot be
+        published (currently message_id and publish_time) per
+        https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#pubsubmessage
 
     Returns:
       A str containing a serialized protobuf of type
@@ -121,6 +145,11 @@
     msg.data = self.data
     for key, value in self.attributes.items():
       msg.attributes[key] = value
+    if self.message_id and not for_publish:
+      msg.message_id = self.message_id
+    if self.publish_time and not for_publish:
+      msg.publish_time.FromDatetime(self.publish_time)
+    msg.ordering_key = self.ordering_key
     return msg.SerializeToString()
 
   @staticmethod
@@ -133,7 +162,14 @@
     """
     # Convert ScalarMapContainer to dict.
     attributes = dict((key, msg.attributes[key]) for key in msg.attributes)
-    return PubsubMessage(msg.data, attributes)
+    pubsubmessage = PubsubMessage(msg.data, attributes)
+    if msg.message_id:
+      pubsubmessage.message_id = msg.message_id
+    if msg.publish_time:
+      pubsubmessage.publish_time = msg.publish_time
+    if msg.ordering_key:
+      pubsubmessage.ordering_key = msg.ordering_key
+    return pubsubmessage
 
 
 class ReadFromPubSub(PTransform):
@@ -294,7 +330,7 @@
       raise TypeError(
           'Unexpected element. Type: %s (expected: PubsubMessage), '
           'value: %r' % (type(element), element))
-    return element._to_proto_str()
+    return element._to_proto_str(for_publish=True)
 
   @staticmethod
   def bytes_to_proto_str(element):
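
To make the for_publish behavior concrete, a round-trip sketch using the private helpers changed above (_to_proto_str and _from_proto_str are internal APIs, shown for illustration only; requires the google-cloud-pubsub dependency):

    import datetime

    from apache_beam.io.gcp.pubsub import PubsubMessage

    msg = PubsubMessage(
        data=b'payload',
        attributes={'k': 'v'},
        message_id='id-123',  # normally assigned by the Pub/Sub service
        publish_time=datetime.datetime(2021, 5, 6),
        ordering_key='user-42')

    # for_publish=True strips message_id and publish_time, which the publish
    # API rejects; ordering_key survives the round trip.
    parsed = PubsubMessage._from_proto_str(msg._to_proto_str(for_publish=True))
    print(parsed.message_id)    # empty -- stripped before publish
    print(parsed.publish_time)  # epoch -- stripped before publish
    print(parsed.ordering_key)  # 'user-42'
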
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 76fa14c..fd14ab4 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -208,7 +208,7 @@
 include(":sdks:python:test-suites:tox:py38")
 include(":vendor:grpc-1_26_0")
 include(":vendor:grpc-1_36_0")
-include(":vendor:bytebuddy-1_10_8")
+include(":vendor:bytebuddy-1_11_0")
 include(":vendor:calcite-1_26_0")
 include(":vendor:guava-26_0-jre")
 include(":website")
diff --git a/vendor/bytebuddy-1_10_8/build.gradle.kts b/vendor/bytebuddy-1_11_0/build.gradle.kts
similarity index 89%
rename from vendor/bytebuddy-1_10_8/build.gradle.kts
rename to vendor/bytebuddy-1_11_0/build.gradle.kts
index dee66ae..b55c4ed 100644
--- a/vendor/bytebuddy-1_10_8/build.gradle.kts
+++ b/vendor/bytebuddy-1_11_0/build.gradle.kts
@@ -18,7 +18,7 @@
 
 plugins { id("org.apache.beam.vendor-java") }
 
-description = "Apache Beam :: Vendored Dependencies :: ByteBuddy :: 1.10.8"
+description = "Apache Beam :: Vendored Dependencies :: ByteBuddy :: 1.11.0"
 
 group = "org.apache.beam"
 version = "0.1"
@@ -26,14 +26,14 @@
 val vendorJava = project.extensions.extraProperties.get("vendorJava") as groovy.lang.Closure<*>
 vendorJava(
   mapOf(
-    "dependencies" to listOf("net.bytebuddy:byte-buddy:1.10.8"),
+    "dependencies" to listOf("net.bytebuddy:byte-buddy:1.11.0"),
     "relocations" to mapOf(
-            "net.bytebuddy" to "org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy"),
+            "net.bytebuddy" to "org.apache.beam.vendor.bytebuddy.v1_11_0.net.bytebuddy"),
     "exclusions" to listOf(
             "**/module-info.class"
     ),
     "groupId" to group,
-    "artifactId" to "beam-vendor-bytebuddy-1_10_8",
+    "artifactId" to "beam-vendor-bytebuddy-1_11_0",
     "version" to version
   )
 )