[BEAM-10884] - Added proto tests to the PubsubTableProviderIT class (#14648)

* [BEAM-11483] Spark PostCommit Test Improvements - creating one Jenkins job per Gradle task for the Spark streaming tests

* Format code using spotlessApply

* [BEAM-10884] Add Proto support to Pubsub table provider
Adding Proto tests for PubsubTableProvider

* temporary changes for PubsubTableProviderIT proto support

* [BEAM-10884] Add Proto support to Pubsub table provider: added a new class (PubsubProtoObjectProvider) to PubsubTableProviderIT to test the Pubsub Protobuf support changes. Also, a slight modification to the payload_messages.proto file was needed.

* [BEAM-10884] - Applying suggested code changes to PubsubTableProviderIT after code review.
diff --git a/sdks/java/extensions/protobuf/src/test/proto/payload_messages.proto b/sdks/java/extensions/protobuf/src/test/proto/payload_messages.proto
index 1486ee8..cecd9d3 100644
--- a/sdks/java/extensions/protobuf/src/test/proto/payload_messages.proto
+++ b/sdks/java/extensions/protobuf/src/test/proto/payload_messages.proto
@@ -42,3 +42,26 @@
   int32 id = 1;
   string name = 2;
 }
+
+message NameMessage {
+  string name = 1;
+
+  enum NameType {
+    FIRST = 0;
+    MIDDLE = 1;
+    LAST = 2;
+    SECOND_LAST = 3;
+  }
+  repeated NameType name_array = 2;
+}
+
+message NameHeightMessage {
+  string name = 1;
+  int32 height = 2;
+}
+
+message NameHeightKnowsJSMessage {
+  string name = 1;
+  int32 height = 2;
+  bool knows_javascript = 3;
+}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubTableProviderIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubTableProviderIT.java
index a90f8e8..6413fd2 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubTableProviderIT.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubTableProviderIT.java
@@ -49,6 +49,7 @@
 import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.extensions.protobuf.PayloadMessages;
 import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.impl.JdbcConnection;
 import org.apache.beam.sdk.extensions.sql.impl.JdbcDriver;
@@ -105,7 +106,11 @@
   @Parameters
   public static Collection<Object[]> data() {
     return Arrays.asList(
-        new Object[][] {{new PubsubJsonObjectProvider()}, {new PubsubAvroObjectProvider()}});
+        new Object[][] {
+          {new PubsubJsonObjectProvider()},
+          {new PubsubAvroObjectProvider()},
+          {new PubsubProtoObjectProvider()}
+        });
   }
 
   @Parameter public PubsubObjectProvider objectsProvider;
@@ -120,6 +125,7 @@
 
   @Test
   public void testSQLSelectsPayloadContent() throws Exception {
+
     String createTableString =
         String.format(
             "CREATE EXTERNAL TABLE message (\n"
@@ -134,18 +140,15 @@
                 + "LOCATION '%s' \n"
                 + "TBLPROPERTIES '{ "
                 + "%s"
+                + "\"protoClass\" : \"%s\", "
                 + "\"timestampAttributeKey\" : \"ts\" }'",
-            tableProvider.getTableType(), eventsTopic.topicPath(), payloadFormatParam());
+            tableProvider.getTableType(),
+            eventsTopic.topicPath(),
+            payloadFormatParam(),
+            PayloadMessages.SimpleMessage.class.getName());
 
     String queryString = "SELECT message.payload.id, message.payload.name from message";
 
-    // Prepare messages to send later
-    List<PubsubMessage> messages =
-        ImmutableList.of(
-            objectsProvider.messageIdName(ts(1), 3, "foo"),
-            objectsProvider.messageIdName(ts(2), 5, "bar"),
-            objectsProvider.messageIdName(ts(3), 7, "baz"));
-
     // Initialize SQL environment and create the pubsub table
     BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new PubsubTableProvider());
     sqlEnv.executeDdl(createTableString);
@@ -173,7 +176,11 @@
         pipeline.getOptions().as(GcpOptions.class).getProject(), Duration.standardMinutes(5));
 
     // Start publishing the messages when main pipeline is started and signaling topic is ready
-    eventsTopic.publish(messages);
+    eventsTopic.publish(
+        ImmutableList.of(
+            objectsProvider.messageIdName(ts(1), 3, "foo"),
+            objectsProvider.messageIdName(ts(2), 5, "bar"),
+            objectsProvider.messageIdName(ts(3), 7, "baz")));
 
     // Poll the signaling topic for success message
     resultSignal.waitForSuccess(Duration.standardMinutes(5));
@@ -181,6 +188,7 @@
 
   @Test
   public void testSQLSelectsArrayAttributes() throws Exception {
+
     String createTableString =
         String.format(
             "CREATE EXTERNAL TABLE message (\n"
@@ -195,19 +203,16 @@
                 + "LOCATION '%s' \n"
                 + "TBLPROPERTIES '{ "
                 + "%s"
+                + "\"protoClass\" : \"%s\", "
                 + "\"timestampAttributeKey\" : \"ts\" }'",
-            tableProvider.getTableType(), eventsTopic.topicPath(), payloadFormatParam());
+            tableProvider.getTableType(),
+            eventsTopic.topicPath(),
+            payloadFormatParam(),
+            PayloadMessages.SimpleMessage.class.getName());
 
     String queryString =
         "SELECT message.payload.id, attributes[1].key AS a1, attributes[2].key AS a2 FROM message";
 
-    // Prepare messages to send later
-    List<PubsubMessage> messages =
-        ImmutableList.of(
-            objectsProvider.messageIdName(ts(1), 3, "foo"),
-            objectsProvider.messageIdName(ts(2), 5, "bar"),
-            objectsProvider.messageIdName(ts(3), 7, "baz"));
-
     // Initialize SQL environment and create the pubsub table
     BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new PubsubTableProvider());
     sqlEnv.executeDdl(createTableString);
@@ -241,7 +246,11 @@
         pipeline.getOptions().as(GcpOptions.class).getProject(), Duration.standardMinutes(5));
 
     // Start publishing the messages when main pipeline is started and signaling topic is ready
-    eventsTopic.publish(messages);
+    eventsTopic.publish(
+        ImmutableList.of(
+            objectsProvider.messageIdName(ts(1), 3, "foo"),
+            objectsProvider.messageIdName(ts(2), 5, "bar"),
+            objectsProvider.messageIdName(ts(3), 7, "baz")));
 
     // Poll the signaling topic for success message
     resultSignal.waitForSuccess(Duration.standardMinutes(1));
@@ -249,6 +258,14 @@
 
   @Test
   public void testSQLWithBytePayload() throws Exception {
+
+    // Prepare messages to send later
+    List<PubsubMessage> messages =
+        ImmutableList.of(
+            objectsProvider.messageIdName(ts(1), 3, "foo"),
+            objectsProvider.messageIdName(ts(2), 5, "bar"),
+            objectsProvider.messageIdName(ts(3), 7, "baz"));
+
     String createTableString =
         String.format(
             "CREATE EXTERNAL TABLE message (\n"
@@ -259,18 +276,14 @@
                 + "TYPE '%s' \n"
                 + "LOCATION '%s' \n"
                 + "TBLPROPERTIES '{ "
+                + "\"protoClass\" : \"%s\", "
                 + "\"timestampAttributeKey\" : \"ts\" }'",
-            tableProvider.getTableType(), eventsTopic.topicPath());
+            tableProvider.getTableType(),
+            eventsTopic.topicPath(),
+            PayloadMessages.SimpleMessage.class.getName());
 
     String queryString = "SELECT message.payload AS some_bytes FROM message";
 
-    // Prepare messages to send later
-    List<PubsubMessage> messages =
-        ImmutableList.of(
-            objectsProvider.messageIdName(ts(1), 3, "foo"),
-            objectsProvider.messageIdName(ts(2), 5, "bar"),
-            objectsProvider.messageIdName(ts(3), 7, "baz"));
-
     // Initialize SQL environment and create the pubsub table
     BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new PubsubTableProvider());
     sqlEnv.executeDdl(createTableString);
@@ -307,6 +320,7 @@
   @Test
   @SuppressWarnings("unchecked")
   public void testUsesDlq() throws Exception {
+
     String createTableString =
         String.format(
             "CREATE EXTERNAL TABLE message (\n"
@@ -323,24 +337,17 @@
                 + "    '{ "
                 + "       %s"
                 + "       \"timestampAttributeKey\" : \"ts\", "
-                + "       \"deadLetterQueue\" : \"%s\""
+                + "       \"deadLetterQueue\" : \"%s\", "
+                + "       \"protoClass\" : \"%s\" "
                 + "     }'",
             tableProvider.getTableType(),
             eventsTopic.topicPath(),
             payloadFormatParam(),
-            dlqTopic.topicPath());
+            dlqTopic.topicPath(),
+            PayloadMessages.SimpleMessage.class.getName());
 
     String queryString = "SELECT message.payload.id, message.payload.name from message";
 
-    // Prepare messages to send later
-    List<PubsubMessage> messages =
-        ImmutableList.of(
-            objectsProvider.messageIdName(ts(1), 3, "foo"),
-            objectsProvider.messageIdName(ts(2), 5, "bar"),
-            objectsProvider.messageIdName(ts(3), 7, "baz"),
-            messagePayload(ts(4), "{ - }", ImmutableMap.of()), // invalid message, will go to DLQ
-            messagePayload(ts(5), "{ + }", ImmutableMap.of())); // invalid message, will go to DLQ
-
     // Initialize SQL environment and create the pubsub table
     BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new PubsubTableProvider());
     sqlEnv.executeDdl(createTableString);
@@ -368,7 +375,13 @@
         pipeline.getOptions().as(GcpOptions.class).getProject(), Duration.standardMinutes(5));
 
     // Start publishing the messages when main pipeline is started and signaling topics are ready
-    eventsTopic.publish(messages);
+    eventsTopic.publish(
+        ImmutableList.of(
+            objectsProvider.messageIdName(ts(1), 3, "foo"),
+            objectsProvider.messageIdName(ts(2), 5, "bar"),
+            objectsProvider.messageIdName(ts(3), 7, "baz"),
+            messagePayload(ts(4), "{ - }", ImmutableMap.of()), // invalid message, will go to DLQ
+            messagePayload(ts(5), "{ + }", ImmutableMap.of()))); // invalid message, will go to DLQ
 
     // Poll the signaling topic for success message
     resultSignal.waitForSuccess(Duration.standardMinutes(4));
@@ -381,6 +394,7 @@
   @Test
   @SuppressWarnings({"unchecked", "rawtypes"})
   public void testSQLLimit() throws Exception {
+
     String createTableString =
         String.format(
             "CREATE EXTERNAL TABLE message (\n"
@@ -397,12 +411,14 @@
                 + "    '{ "
                 + "       %s"
                 + "       \"timestampAttributeKey\" : \"ts\", "
-                + "       \"deadLetterQueue\" : \"%s\""
+                + "       \"deadLetterQueue\" : \"%s\", "
+                + "       \"protoClass\" : \"%s\" "
                 + "     }'",
             tableProvider.getTableType(),
             eventsTopic.topicPath(),
             payloadFormatParam(),
-            dlqTopic.topicPath());
+            dlqTopic.topicPath(),
+            PayloadMessages.SimpleMessage.class.getName());
 
     List<PubsubMessage> messages =
         ImmutableList.of(
@@ -441,13 +457,16 @@
 
     eventsTopic.assertSubscriptionEventuallyCreated(
         pipeline.getOptions().as(GcpOptions.class).getProject(), Duration.standardMinutes(5));
+
     eventsTopic.publish(messages);
+
     assertThat(queryResult.get(2, TimeUnit.MINUTES).size(), equalTo(3));
     pool.shutdown();
   }
 
   @Test
   public void testSQLSelectsPayloadContentFlat() throws Exception {
+
     String createTableString =
         String.format(
             "CREATE EXTERNAL TABLE message (\n"
@@ -460,19 +479,16 @@
                 + "TBLPROPERTIES "
                 + "    '{ "
                 + "       %s"
+                + "       \"protoClass\" : \"%s\", "
                 + "       \"timestampAttributeKey\" : \"ts\" "
                 + "     }'",
-            tableProvider.getTableType(), eventsTopic.topicPath(), payloadFormatParam());
+            tableProvider.getTableType(),
+            eventsTopic.topicPath(),
+            payloadFormatParam(),
+            PayloadMessages.SimpleMessage.class.getName());
 
     String queryString = "SELECT message.id, message.name from message";
 
-    // Prepare messages to send later
-    List<PubsubMessage> messages =
-        ImmutableList.of(
-            objectsProvider.messageIdName(ts(1), 3, "foo"),
-            objectsProvider.messageIdName(ts(2), 5, "bar"),
-            objectsProvider.messageIdName(ts(3), 7, "baz"));
-
     // Initialize SQL environment and create the pubsub table
     BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new PubsubTableProvider());
     sqlEnv.executeDdl(createTableString);
@@ -500,7 +516,11 @@
         pipeline.getOptions().as(GcpOptions.class).getProject(), Duration.standardMinutes(5));
 
     // Start publishing the messages when main pipeline is started and signaling topic is ready
-    eventsTopic.publish(messages);
+    eventsTopic.publish(
+        ImmutableList.of(
+            objectsProvider.messageIdName(ts(1), 3, "foo"),
+            objectsProvider.messageIdName(ts(2), 5, "bar"),
+            objectsProvider.messageIdName(ts(3), 7, "baz")));
 
     // Poll the signaling topic for success message
     resultSignal.waitForSuccess(Duration.standardMinutes(5));
@@ -509,24 +529,27 @@
   @Test
   @SuppressWarnings("unchecked")
   public void testSQLInsertRowsToPubsubFlat() throws Exception {
+
     String createTableString =
         String.format(
             "CREATE EXTERNAL TABLE message (\n"
                 + "event_timestamp TIMESTAMP, \n"
                 + "name VARCHAR, \n"
                 + "height INTEGER, \n"
-                + "knowsJavascript BOOLEAN \n"
+                + "knows_javascript BOOLEAN \n"
                 + ") \n"
                 + "TYPE '%s' \n"
                 + "LOCATION '%s' \n"
                 + "TBLPROPERTIES "
                 + "    '{ "
                 + "       %s"
+                + "       \"protoClass\" : \"%s\", "
                 + "       \"deadLetterQueue\" : \"%s\""
                 + "     }'",
             tableProvider.getTableType(),
             eventsTopic.topicPath(),
             payloadFormatParam(),
+            PayloadMessages.NameHeightKnowsJSMessage.class.getName(),
             dlqTopic.topicPath());
 
     // Initialize SQL environment and create the pubsub table
@@ -536,7 +559,7 @@
     // TODO(BEAM-8741): Ideally we could write this query without specifying a column list, because
     //   it shouldn't be possible to write to event_timestamp when it's mapped to  publish time.
     String queryString =
-        "INSERT INTO message (name, height, knowsJavascript) \n"
+        "INSERT INTO message (name, height, knows_javascript) \n"
             + "VALUES \n"
             + "('person1', 80, TRUE), \n"
             + "('person2', 70, FALSE)";
@@ -556,25 +579,28 @@
   @Test
   @SuppressWarnings("unchecked")
   public void testSQLInsertRowsToPubsubWithTimestampAttributeFlat() throws Exception {
+
     String createTableString =
         String.format(
             "CREATE EXTERNAL TABLE message (\n"
                 + "  event_timestamp TIMESTAMP, \n"
                 + "  name VARCHAR, \n"
                 + "  height INTEGER, \n"
-                + "  knowsJavascript BOOLEAN \n"
+                + "  knows_javascript BOOLEAN \n"
                 + ") \n"
                 + "TYPE '%s' \n"
                 + "LOCATION '%s' \n"
                 + "TBLPROPERTIES "
                 + "  '{ "
                 + "     %s "
+                + "     \"protoClass\" : \"%s\", "
                 + "     \"deadLetterQueue\" : \"%s\","
                 + "     \"timestampAttributeKey\" : \"ts\""
                 + "   }'",
             tableProvider.getTableType(),
             eventsTopic.topicPath(),
             payloadFormatParam(),
+            PayloadMessages.NameHeightKnowsJSMessage.class.getName(),
             dlqTopic.topicPath());
 
     // Initialize SQL environment and create the pubsub table
@@ -610,20 +636,31 @@
         objectsProvider.getPayloadFormat() == null
             ? ""
             : String.format(
-                "TBLPROPERTIES '{\"format\": \"%s\"}'", objectsProvider.getPayloadFormat());
+                "TBLPROPERTIES '{ \"protoClass\" : \"%s\", \"format\": \"%s\" }'",
+                PayloadMessages.NameHeightKnowsJSMessage.class.getName(),
+                objectsProvider.getPayloadFormat());
+
     String createTableString =
         String.format(
             "CREATE EXTERNAL TABLE people (\n"
                 + "event_timestamp TIMESTAMP, \n"
                 + "name VARCHAR, \n"
                 + "height INTEGER, \n"
-                + "knowsJavascript BOOLEAN \n"
+                + "knows_javascript BOOLEAN \n"
                 + ") \n"
                 + "TYPE '%s' \n"
                 + "LOCATION '%s' \n"
                 + "%s",
             tableProvider.getTableType(), eventsTopic.topicPath(), tblProperties);
 
+    String filteredTblProperties =
+        objectsProvider.getPayloadFormat() == null
+            ? ""
+            : String.format(
+                "TBLPROPERTIES '{ \"protoClass\" : \"%s\", \"format\": \"%s\" }'",
+                PayloadMessages.NameHeightMessage.class.getName(),
+                objectsProvider.getPayloadFormat());
+
     String createFilteredTableString =
         String.format(
             "CREATE EXTERNAL TABLE javascript_people (\n"
@@ -634,7 +671,7 @@
                 + "TYPE '%s' \n"
                 + "LOCATION '%s' \n"
                 + "%s",
-            tableProvider.getTableType(), filteredEventsTopic.topicPath(), tblProperties);
+            tableProvider.getTableType(), filteredEventsTopic.topicPath(), filteredTblProperties);
 
     // Initialize SQL environment and create the pubsub table
     BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new PubsubTableProvider());
@@ -650,11 +687,11 @@
             + "    name, \n"
             + "    height \n"
             + "  FROM people \n"
-            + "  WHERE knowsJavascript \n"
+            + "  WHERE knows_javascript \n"
             + ")";
 
     String injectQueryString =
-        "INSERT INTO people (name, height, knowsJavascript) VALUES \n"
+        "INSERT INTO people (name, height, knows_javascript) VALUES \n"
             + "('person1', 80, TRUE),  \n"
             + "('person2', 70, FALSE), \n"
             + "('person3', 60, TRUE),  \n"
@@ -781,6 +818,57 @@
         throws Exception;
   }
 
+  private static class PubsubProtoObjectProvider extends PubsubObjectProvider {
+
+    @Override
+    protected String getPayloadFormat() {
+      return "proto";
+    }
+
+    @Override
+    protected PubsubMessage messageIdName(Instant timestamp, int id, String name) {
+
+      PayloadMessages.SimpleMessage.Builder simpleMessage =
+          PayloadMessages.SimpleMessage.newBuilder().setId(id).setName(name);
+
+      return PubsubTableProviderIT.message(
+          timestamp,
+          simpleMessage.build().toByteArray(),
+          ImmutableMap.of(name, Integer.toString(id)));
+    }
+
+    @Override
+    protected Matcher<PubsubMessage> matcherNames(String name) throws IOException {
+
+      PayloadMessages.NameMessage.Builder nameMessage =
+          PayloadMessages.NameMessage.newBuilder().setName(name);
+
+      return hasProperty("payload", equalTo(nameMessage.build().toByteArray()));
+    }
+
+    @Override
+    protected Matcher<PubsubMessage> matcherNameHeightKnowsJS(
+        String name, int height, boolean knowsJS) throws IOException {
+
+      PayloadMessages.NameHeightKnowsJSMessage.Builder nameHeightKnowsJSMessage =
+          PayloadMessages.NameHeightKnowsJSMessage.newBuilder()
+              .setHeight(height)
+              .setName(name)
+              .setKnowsJavascript(knowsJS);
+
+      return hasProperty("payload", equalTo(nameHeightKnowsJSMessage.build().toByteArray()));
+    }
+
+    @Override
+    protected Matcher<PubsubMessage> matcherNameHeight(String name, int height) throws IOException {
+
+      PayloadMessages.NameHeightMessage.Builder nameHeightMessage =
+          PayloadMessages.NameHeightMessage.newBuilder().setName(name).setHeight(height);
+
+      return hasProperty("payload", equalTo(nameHeightMessage.build().toByteArray()));
+    }
+  }
+
   private static class PubsubJsonObjectProvider extends PubsubObjectProvider {
 
     // Pubsub table provider should default to json
@@ -805,7 +893,7 @@
         String name, int height, boolean knowsJS) throws IOException {
       String jsonString =
           String.format(
-              "{\"name\":\"%s\", \"height\": %s, \"knowsJavascript\": %s}", name, height, knowsJS);
+              "{\"name\":\"%s\", \"height\": %s, \"knows_javascript\": %s}", name, height, knowsJS);
 
       return hasProperty("payload", toJsonByteLike(jsonString));
     }
@@ -831,7 +919,7 @@
         Schema.builder()
             .addNullableField("name", Schema.FieldType.STRING)
             .addNullableField("height", Schema.FieldType.INT32)
-            .addNullableField("knowsJavascript", Schema.FieldType.BOOLEAN)
+            .addNullableField("knows_javascript", Schema.FieldType.BOOLEAN)
             .build();
 
     private static final Schema NAME_HEIGHT_SCHEMA =