[BEAM-10884] - Added proto tests to the PubSubTableProviderIT class (#14648)
* [BEAM-11483] Spark PostCommit Test Improvements - creating one Jenkins job per Gradle task for the Spark streaming tests
* Format code using spotlessApply
* [BEAM-10884] Add Proto support to Pubsub table provider
Adding Proto tests for PubSubTableProvider
* temporary changes for PubsubTableProviderIT proto support
* [BEAM-10884] Add Proto support to Pubsub table provider: added new class (PubsubProtoObjectProvider) to PubsubTableProviderIT to test the PubSub Protobuffer support changes. Also, a slight modification to payload_messages.proto file was needed.
* [BEAM-10884] - Applying suggested code changes to PubSubTableProviderIT after code review.
diff --git a/sdks/java/extensions/protobuf/src/test/proto/payload_messages.proto b/sdks/java/extensions/protobuf/src/test/proto/payload_messages.proto
index 1486ee8..cecd9d3 100644
--- a/sdks/java/extensions/protobuf/src/test/proto/payload_messages.proto
+++ b/sdks/java/extensions/protobuf/src/test/proto/payload_messages.proto
@@ -42,3 +42,26 @@
int32 id = 1;
string name = 2;
}
+
+message NameMessage {
+ string name = 1;
+
+ enum NameType {
+ FIRST = 0;
+ MIDDLE = 1;
+ LAST = 2;
+ SECOND_LAST = 3;
+ }
+ repeated NameType name_array = 2;
+}
+
+message NameHeightMessage {
+ string name = 1;
+ int32 height = 2;
+}
+
+message NameHeightKnowsJSMessage {
+ string name = 1;
+ int32 height = 2;
+ bool knows_javascript = 3;
+}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubTableProviderIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubTableProviderIT.java
index a90f8e8..6413fd2 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubTableProviderIT.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubTableProviderIT.java
@@ -49,6 +49,7 @@
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.extensions.protobuf.PayloadMessages;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.JdbcConnection;
import org.apache.beam.sdk.extensions.sql.impl.JdbcDriver;
@@ -105,7 +106,11 @@
@Parameters
public static Collection<Object[]> data() {
return Arrays.asList(
- new Object[][] {{new PubsubJsonObjectProvider()}, {new PubsubAvroObjectProvider()}});
+ new Object[][] {
+ {new PubsubJsonObjectProvider()},
+ {new PubsubAvroObjectProvider()},
+ {new PubsubProtoObjectProvider()}
+ });
}
@Parameter public PubsubObjectProvider objectsProvider;
@@ -120,6 +125,7 @@
@Test
public void testSQLSelectsPayloadContent() throws Exception {
+
String createTableString =
String.format(
"CREATE EXTERNAL TABLE message (\n"
@@ -134,18 +140,15 @@
+ "LOCATION '%s' \n"
+ "TBLPROPERTIES '{ "
+ "%s"
+ + "\"protoClass\" : \"%s\", "
+ "\"timestampAttributeKey\" : \"ts\" }'",
- tableProvider.getTableType(), eventsTopic.topicPath(), payloadFormatParam());
+ tableProvider.getTableType(),
+ eventsTopic.topicPath(),
+ payloadFormatParam(),
+ PayloadMessages.SimpleMessage.class.getName());
String queryString = "SELECT message.payload.id, message.payload.name from message";
- // Prepare messages to send later
- List<PubsubMessage> messages =
- ImmutableList.of(
- objectsProvider.messageIdName(ts(1), 3, "foo"),
- objectsProvider.messageIdName(ts(2), 5, "bar"),
- objectsProvider.messageIdName(ts(3), 7, "baz"));
-
// Initialize SQL environment and create the pubsub table
BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new PubsubTableProvider());
sqlEnv.executeDdl(createTableString);
@@ -173,7 +176,11 @@
pipeline.getOptions().as(GcpOptions.class).getProject(), Duration.standardMinutes(5));
// Start publishing the messages when main pipeline is started and signaling topic is ready
- eventsTopic.publish(messages);
+ eventsTopic.publish(
+ ImmutableList.of(
+ objectsProvider.messageIdName(ts(1), 3, "foo"),
+ objectsProvider.messageIdName(ts(2), 5, "bar"),
+ objectsProvider.messageIdName(ts(3), 7, "baz")));
// Poll the signaling topic for success message
resultSignal.waitForSuccess(Duration.standardMinutes(5));
@@ -181,6 +188,7 @@
@Test
public void testSQLSelectsArrayAttributes() throws Exception {
+
String createTableString =
String.format(
"CREATE EXTERNAL TABLE message (\n"
@@ -195,19 +203,16 @@
+ "LOCATION '%s' \n"
+ "TBLPROPERTIES '{ "
+ "%s"
+ + "\"protoClass\" : \"%s\", "
+ "\"timestampAttributeKey\" : \"ts\" }'",
- tableProvider.getTableType(), eventsTopic.topicPath(), payloadFormatParam());
+ tableProvider.getTableType(),
+ eventsTopic.topicPath(),
+ payloadFormatParam(),
+ PayloadMessages.SimpleMessage.class.getName());
String queryString =
"SELECT message.payload.id, attributes[1].key AS a1, attributes[2].key AS a2 FROM message";
- // Prepare messages to send later
- List<PubsubMessage> messages =
- ImmutableList.of(
- objectsProvider.messageIdName(ts(1), 3, "foo"),
- objectsProvider.messageIdName(ts(2), 5, "bar"),
- objectsProvider.messageIdName(ts(3), 7, "baz"));
-
// Initialize SQL environment and create the pubsub table
BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new PubsubTableProvider());
sqlEnv.executeDdl(createTableString);
@@ -241,7 +246,11 @@
pipeline.getOptions().as(GcpOptions.class).getProject(), Duration.standardMinutes(5));
// Start publishing the messages when main pipeline is started and signaling topic is ready
- eventsTopic.publish(messages);
+ eventsTopic.publish(
+ ImmutableList.of(
+ objectsProvider.messageIdName(ts(1), 3, "foo"),
+ objectsProvider.messageIdName(ts(2), 5, "bar"),
+ objectsProvider.messageIdName(ts(3), 7, "baz")));
// Poll the signaling topic for success message
resultSignal.waitForSuccess(Duration.standardMinutes(1));
@@ -249,6 +258,14 @@
@Test
public void testSQLWithBytePayload() throws Exception {
+
+ // Prepare messages to send later
+ List<PubsubMessage> messages =
+ ImmutableList.of(
+ objectsProvider.messageIdName(ts(1), 3, "foo"),
+ objectsProvider.messageIdName(ts(2), 5, "bar"),
+ objectsProvider.messageIdName(ts(3), 7, "baz"));
+
String createTableString =
String.format(
"CREATE EXTERNAL TABLE message (\n"
@@ -259,18 +276,14 @@
+ "TYPE '%s' \n"
+ "LOCATION '%s' \n"
+ "TBLPROPERTIES '{ "
+ + "\"protoClass\" : \"%s\", "
+ "\"timestampAttributeKey\" : \"ts\" }'",
- tableProvider.getTableType(), eventsTopic.topicPath());
+ tableProvider.getTableType(),
+ eventsTopic.topicPath(),
+ PayloadMessages.SimpleMessage.class.getName());
String queryString = "SELECT message.payload AS some_bytes FROM message";
- // Prepare messages to send later
- List<PubsubMessage> messages =
- ImmutableList.of(
- objectsProvider.messageIdName(ts(1), 3, "foo"),
- objectsProvider.messageIdName(ts(2), 5, "bar"),
- objectsProvider.messageIdName(ts(3), 7, "baz"));
-
// Initialize SQL environment and create the pubsub table
BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new PubsubTableProvider());
sqlEnv.executeDdl(createTableString);
@@ -307,6 +320,7 @@
@Test
@SuppressWarnings("unchecked")
public void testUsesDlq() throws Exception {
+
String createTableString =
String.format(
"CREATE EXTERNAL TABLE message (\n"
@@ -323,24 +337,17 @@
+ " '{ "
+ " %s"
+ " \"timestampAttributeKey\" : \"ts\", "
- + " \"deadLetterQueue\" : \"%s\""
+ + " \"deadLetterQueue\" : \"%s\", "
+ + " \"protoClass\" : \"%s\" "
+ " }'",
tableProvider.getTableType(),
eventsTopic.topicPath(),
payloadFormatParam(),
- dlqTopic.topicPath());
+ dlqTopic.topicPath(),
+ PayloadMessages.SimpleMessage.class.getName());
String queryString = "SELECT message.payload.id, message.payload.name from message";
- // Prepare messages to send later
- List<PubsubMessage> messages =
- ImmutableList.of(
- objectsProvider.messageIdName(ts(1), 3, "foo"),
- objectsProvider.messageIdName(ts(2), 5, "bar"),
- objectsProvider.messageIdName(ts(3), 7, "baz"),
- messagePayload(ts(4), "{ - }", ImmutableMap.of()), // invalid message, will go to DLQ
- messagePayload(ts(5), "{ + }", ImmutableMap.of())); // invalid message, will go to DLQ
-
// Initialize SQL environment and create the pubsub table
BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new PubsubTableProvider());
sqlEnv.executeDdl(createTableString);
@@ -368,7 +375,13 @@
pipeline.getOptions().as(GcpOptions.class).getProject(), Duration.standardMinutes(5));
// Start publishing the messages when main pipeline is started and signaling topics are ready
- eventsTopic.publish(messages);
+ eventsTopic.publish(
+ ImmutableList.of(
+ objectsProvider.messageIdName(ts(1), 3, "foo"),
+ objectsProvider.messageIdName(ts(2), 5, "bar"),
+ objectsProvider.messageIdName(ts(3), 7, "baz"),
+ messagePayload(ts(4), "{ - }", ImmutableMap.of()), // invalid message, will go to DLQ
+ messagePayload(ts(5), "{ + }", ImmutableMap.of()))); // invalid message, will go to DLQ
// Poll the signaling topic for success message
resultSignal.waitForSuccess(Duration.standardMinutes(4));
@@ -381,6 +394,7 @@
@Test
@SuppressWarnings({"unchecked", "rawtypes"})
public void testSQLLimit() throws Exception {
+
String createTableString =
String.format(
"CREATE EXTERNAL TABLE message (\n"
@@ -397,12 +411,14 @@
+ " '{ "
+ " %s"
+ " \"timestampAttributeKey\" : \"ts\", "
- + " \"deadLetterQueue\" : \"%s\""
+ + " \"deadLetterQueue\" : \"%s\", "
+ + " \"protoClass\" : \"%s\" "
+ " }'",
tableProvider.getTableType(),
eventsTopic.topicPath(),
payloadFormatParam(),
- dlqTopic.topicPath());
+ dlqTopic.topicPath(),
+ PayloadMessages.SimpleMessage.class.getName());
List<PubsubMessage> messages =
ImmutableList.of(
@@ -441,13 +457,16 @@
eventsTopic.assertSubscriptionEventuallyCreated(
pipeline.getOptions().as(GcpOptions.class).getProject(), Duration.standardMinutes(5));
+
eventsTopic.publish(messages);
+
assertThat(queryResult.get(2, TimeUnit.MINUTES).size(), equalTo(3));
pool.shutdown();
}
@Test
public void testSQLSelectsPayloadContentFlat() throws Exception {
+
String createTableString =
String.format(
"CREATE EXTERNAL TABLE message (\n"
@@ -460,19 +479,16 @@
+ "TBLPROPERTIES "
+ " '{ "
+ " %s"
+ + " \"protoClass\" : \"%s\", "
+ " \"timestampAttributeKey\" : \"ts\" "
+ " }'",
- tableProvider.getTableType(), eventsTopic.topicPath(), payloadFormatParam());
+ tableProvider.getTableType(),
+ eventsTopic.topicPath(),
+ payloadFormatParam(),
+ PayloadMessages.SimpleMessage.class.getName());
String queryString = "SELECT message.id, message.name from message";
- // Prepare messages to send later
- List<PubsubMessage> messages =
- ImmutableList.of(
- objectsProvider.messageIdName(ts(1), 3, "foo"),
- objectsProvider.messageIdName(ts(2), 5, "bar"),
- objectsProvider.messageIdName(ts(3), 7, "baz"));
-
// Initialize SQL environment and create the pubsub table
BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new PubsubTableProvider());
sqlEnv.executeDdl(createTableString);
@@ -500,7 +516,11 @@
pipeline.getOptions().as(GcpOptions.class).getProject(), Duration.standardMinutes(5));
// Start publishing the messages when main pipeline is started and signaling topic is ready
- eventsTopic.publish(messages);
+ eventsTopic.publish(
+ ImmutableList.of(
+ objectsProvider.messageIdName(ts(1), 3, "foo"),
+ objectsProvider.messageIdName(ts(2), 5, "bar"),
+ objectsProvider.messageIdName(ts(3), 7, "baz")));
// Poll the signaling topic for success message
resultSignal.waitForSuccess(Duration.standardMinutes(5));
@@ -509,24 +529,27 @@
@Test
@SuppressWarnings("unchecked")
public void testSQLInsertRowsToPubsubFlat() throws Exception {
+
String createTableString =
String.format(
"CREATE EXTERNAL TABLE message (\n"
+ "event_timestamp TIMESTAMP, \n"
+ "name VARCHAR, \n"
+ "height INTEGER, \n"
- + "knowsJavascript BOOLEAN \n"
+ + "knows_javascript BOOLEAN \n"
+ ") \n"
+ "TYPE '%s' \n"
+ "LOCATION '%s' \n"
+ "TBLPROPERTIES "
+ " '{ "
+ " %s"
+ + " \"protoClass\" : \"%s\", "
+ " \"deadLetterQueue\" : \"%s\""
+ " }'",
tableProvider.getTableType(),
eventsTopic.topicPath(),
payloadFormatParam(),
+ PayloadMessages.NameHeightKnowsJSMessage.class.getName(),
dlqTopic.topicPath());
// Initialize SQL environment and create the pubsub table
@@ -536,7 +559,7 @@
// TODO(BEAM-8741): Ideally we could write this query without specifying a column list, because
// it shouldn't be possible to write to event_timestamp when it's mapped to publish time.
String queryString =
- "INSERT INTO message (name, height, knowsJavascript) \n"
+ "INSERT INTO message (name, height, knows_javascript) \n"
+ "VALUES \n"
+ "('person1', 80, TRUE), \n"
+ "('person2', 70, FALSE)";
@@ -556,25 +579,28 @@
@Test
@SuppressWarnings("unchecked")
public void testSQLInsertRowsToPubsubWithTimestampAttributeFlat() throws Exception {
+
String createTableString =
String.format(
"CREATE EXTERNAL TABLE message (\n"
+ " event_timestamp TIMESTAMP, \n"
+ " name VARCHAR, \n"
+ " height INTEGER, \n"
- + " knowsJavascript BOOLEAN \n"
+ + " knows_javascript BOOLEAN \n"
+ ") \n"
+ "TYPE '%s' \n"
+ "LOCATION '%s' \n"
+ "TBLPROPERTIES "
+ " '{ "
+ " %s "
+ + " \"protoClass\" : \"%s\", "
+ " \"deadLetterQueue\" : \"%s\","
+ " \"timestampAttributeKey\" : \"ts\""
+ " }'",
tableProvider.getTableType(),
eventsTopic.topicPath(),
payloadFormatParam(),
+ PayloadMessages.NameHeightKnowsJSMessage.class.getName(),
dlqTopic.topicPath());
// Initialize SQL environment and create the pubsub table
@@ -610,20 +636,31 @@
objectsProvider.getPayloadFormat() == null
? ""
: String.format(
- "TBLPROPERTIES '{\"format\": \"%s\"}'", objectsProvider.getPayloadFormat());
+ "TBLPROPERTIES '{ \"protoClass\" : \"%s\", \"format\": \"%s\" }'",
+ PayloadMessages.NameHeightKnowsJSMessage.class.getName(),
+ objectsProvider.getPayloadFormat());
+
String createTableString =
String.format(
"CREATE EXTERNAL TABLE people (\n"
+ "event_timestamp TIMESTAMP, \n"
+ "name VARCHAR, \n"
+ "height INTEGER, \n"
- + "knowsJavascript BOOLEAN \n"
+ + "knows_javascript BOOLEAN \n"
+ ") \n"
+ "TYPE '%s' \n"
+ "LOCATION '%s' \n"
+ "%s",
tableProvider.getTableType(), eventsTopic.topicPath(), tblProperties);
+ String filteredTblProperties =
+ objectsProvider.getPayloadFormat() == null
+ ? ""
+ : String.format(
+ "TBLPROPERTIES '{ \"protoClass\" : \"%s\", \"format\": \"%s\" }'",
+ PayloadMessages.NameHeightMessage.class.getName(),
+ objectsProvider.getPayloadFormat());
+
String createFilteredTableString =
String.format(
"CREATE EXTERNAL TABLE javascript_people (\n"
@@ -634,7 +671,7 @@
+ "TYPE '%s' \n"
+ "LOCATION '%s' \n"
+ "%s",
- tableProvider.getTableType(), filteredEventsTopic.topicPath(), tblProperties);
+ tableProvider.getTableType(), filteredEventsTopic.topicPath(), filteredTblProperties);
// Initialize SQL environment and create the pubsub table
BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new PubsubTableProvider());
@@ -650,11 +687,11 @@
+ " name, \n"
+ " height \n"
+ " FROM people \n"
- + " WHERE knowsJavascript \n"
+ + " WHERE knows_javascript \n"
+ ")";
String injectQueryString =
- "INSERT INTO people (name, height, knowsJavascript) VALUES \n"
+ "INSERT INTO people (name, height, knows_javascript) VALUES \n"
+ "('person1', 80, TRUE), \n"
+ "('person2', 70, FALSE), \n"
+ "('person3', 60, TRUE), \n"
@@ -781,6 +818,57 @@
throws Exception;
}
+ private static class PubsubProtoObjectProvider extends PubsubObjectProvider {
+
+ @Override
+ protected String getPayloadFormat() {
+ return "proto";
+ }
+
+ @Override
+ protected PubsubMessage messageIdName(Instant timestamp, int id, String name) {
+
+ PayloadMessages.SimpleMessage.Builder simpleMessage =
+ PayloadMessages.SimpleMessage.newBuilder().setId(id).setName(name);
+
+ return PubsubTableProviderIT.message(
+ timestamp,
+ simpleMessage.build().toByteArray(),
+ ImmutableMap.of(name, Integer.toString(id)));
+ }
+
+ @Override
+ protected Matcher<PubsubMessage> matcherNames(String name) throws IOException {
+
+ PayloadMessages.NameMessage.Builder nameMessage =
+ PayloadMessages.NameMessage.newBuilder().setName(name);
+
+ return hasProperty("payload", equalTo(nameMessage.build().toByteArray()));
+ }
+
+ @Override
+ protected Matcher<PubsubMessage> matcherNameHeightKnowsJS(
+ String name, int height, boolean knowsJS) throws IOException {
+
+ PayloadMessages.NameHeightKnowsJSMessage.Builder nameHeightKnowsJSMessage =
+ PayloadMessages.NameHeightKnowsJSMessage.newBuilder()
+ .setHeight(height)
+ .setName(name)
+ .setKnowsJavascript(knowsJS);
+
+ return hasProperty("payload", equalTo(nameHeightKnowsJSMessage.build().toByteArray()));
+ }
+
+ @Override
+ protected Matcher<PubsubMessage> matcherNameHeight(String name, int height) throws IOException {
+
+ PayloadMessages.NameHeightMessage.Builder nameHeightMessage =
+ PayloadMessages.NameHeightMessage.newBuilder().setName(name).setHeight(height);
+
+ return hasProperty("payload", equalTo(nameHeightMessage.build().toByteArray()));
+ }
+ }
+
private static class PubsubJsonObjectProvider extends PubsubObjectProvider {
// Pubsub table provider should default to json
@@ -805,7 +893,7 @@
String name, int height, boolean knowsJS) throws IOException {
String jsonString =
String.format(
- "{\"name\":\"%s\", \"height\": %s, \"knowsJavascript\": %s}", name, height, knowsJS);
+ "{\"name\":\"%s\", \"height\": %s, \"knows_javascript\": %s}", name, height, knowsJS);
return hasProperty("payload", toJsonByteLike(jsonString));
}
@@ -831,7 +919,7 @@
Schema.builder()
.addNullableField("name", Schema.FieldType.STRING)
.addNullableField("height", Schema.FieldType.INT32)
- .addNullableField("knowsJavascript", Schema.FieldType.BOOLEAN)
+ .addNullableField("knows_javascript", Schema.FieldType.BOOLEAN)
.build();
private static final Schema NAME_HEIGHT_SCHEMA =