/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.sql.meta.provider.pubsub;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.beam.sdk.testing.JsonMatcher.jsonBytesLike;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.extensions.protobuf.PayloadMessages;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.JdbcConnection;
import org.apache.beam.sdk.extensions.sql.impl.JdbcDriver;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
import org.apache.beam.sdk.extensions.sql.meta.provider.SchemaIOTableProviderWrapper;
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
import org.apache.beam.sdk.extensions.sql.meta.store.InMemoryMetaStore;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.pubsub.TestPubsub;
import org.apache.beam.sdk.io.gcp.pubsub.TestPubsubSignal;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.schemas.utils.AvroUtils;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.jdbc.CalciteConnection;
import org.hamcrest.Matcher;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
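/**
 * Integration tests for the Pub/Sub SQL table provider.
 *
 * <p>Each test registers a Pub/Sub topic as an external table and then reads from or writes to it
 * with Beam SQL. The suite is parameterized over the supported payload formats: JSON (the
 * default), Avro, and proto. A typical table definition, as used in the tests below, looks like:
 *
 * <pre>{@code
 * CREATE EXTERNAL TABLE message (
 *   event_timestamp TIMESTAMP,
 *   attributes MAP<VARCHAR, VARCHAR>,
 *   payload ROW<id INTEGER, name VARCHAR>
 * )
 * TYPE 'pubsub'
 * LOCATION 'projects/<project>/topics/<topic>'
 * TBLPROPERTIES '{"timestampAttributeKey": "ts"}'
 * }</pre>
 */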
@RunWith(Parameterized.class)
@SuppressWarnings({
"keyfor",
})
public class PubsubTableProviderIT implements Serializable {
private static final Schema PAYLOAD_SCHEMA =
Schema.builder()
.addNullableField("id", Schema.FieldType.INT32)
.addNullableField("name", Schema.FieldType.STRING)
.build();
@Rule public transient TestPubsub eventsTopic = TestPubsub.create();
@Rule public transient TestPubsub filteredEventsTopic = TestPubsub.create();
@Rule public transient TestPubsub dlqTopic = TestPubsub.create();
@Rule public transient TestPubsubSignal resultSignal = TestPubsubSignal.create();
@Rule public transient TestPipeline pipeline = TestPipeline.create();
@Rule public transient TestPipeline filterPipeline = TestPipeline.create();
private final SchemaIOTableProviderWrapper tableProvider = new PubsubTableProvider();
@Parameters
public static Collection<Object[]> data() {
return Arrays.asList(
new Object[][] {
{new PubsubJsonObjectProvider()},
{new PubsubAvroObjectProvider()},
{new PubsubProtoObjectProvider()}
});
}
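  /** The format-specific helper under test; builds encoded messages and matchers for its format. */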
@Parameter public PubsubObjectProvider objectsProvider;
  /**
   * HACK: we need an ObjectMapper to turn PipelineOptions back into a map. ReflectHelpers supplies
   * the class loader used to discover the Jackson modules for any extra PipelineOptions.
   */
private static final ObjectMapper MAPPER =
new ObjectMapper()
.registerModules(ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
@Test
public void testSQLSelectsPayloadContent() throws Exception {
String createTableString =
String.format(
"CREATE EXTERNAL TABLE message (\n"
+ "event_timestamp TIMESTAMP, \n"
+ "attributes MAP<VARCHAR, VARCHAR>, \n"
+ "payload ROW< \n"
+ " id INTEGER, \n"
+ " name VARCHAR \n"
+ " > \n"
+ ") \n"
+ "TYPE '%s' \n"
+ "LOCATION '%s' \n"
+ "TBLPROPERTIES '{ "
+ "%s"
+ "\"protoClass\" : \"%s\", "
+ "\"timestampAttributeKey\" : \"ts\" }'",
tableProvider.getTableType(),
eventsTopic.topicPath(),
payloadFormatParam(),
PayloadMessages.SimpleMessage.class.getName());
String queryString = "SELECT message.payload.id, message.payload.name from message";
// Initialize SQL environment and create the pubsub table
BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new PubsubTableProvider());
sqlEnv.executeDdl(createTableString);
// Apply the PTransform to query the pubsub topic
PCollection<Row> queryOutput = query(sqlEnv, pipeline, queryString);
// Observe the query results and send success signal after seeing the expected messages
queryOutput.apply(
"waitForSuccess",
resultSignal.signalSuccessWhen(
SchemaCoder.of(PAYLOAD_SCHEMA),
observedRows ->
observedRows.equals(
ImmutableSet.of(
row(PAYLOAD_SCHEMA, 3, "foo"),
row(PAYLOAD_SCHEMA, 5, "bar"),
row(PAYLOAD_SCHEMA, 7, "baz")))));
// Start the pipeline
pipeline.run();
// Block until a subscription for this topic exists
eventsTopic.assertSubscriptionEventuallyCreated(
pipeline.getOptions().as(GcpOptions.class).getProject(), Duration.standardMinutes(5));
    // Start publishing the messages once the main pipeline has started and the signaling topic is ready
eventsTopic.publish(
ImmutableList.of(
objectsProvider.messageIdName(ts(1), 3, "foo"),
objectsProvider.messageIdName(ts(2), 5, "bar"),
objectsProvider.messageIdName(ts(3), 7, "baz")));
    // Poll the signaling topic for a success message
resultSignal.waitForSuccess(Duration.standardMinutes(5));
}
@Test
public void testSQLSelectsArrayAttributes() throws Exception {
String createTableString =
String.format(
"CREATE EXTERNAL TABLE message (\n"
+ "event_timestamp TIMESTAMP, \n"
+ "attributes ARRAY<ROW<key VARCHAR, `value` VARCHAR>>, \n"
+ "payload ROW< \n"
+ " id INTEGER, \n"
+ " name VARCHAR \n"
+ " > \n"
+ ") \n"
+ "TYPE '%s' \n"
+ "LOCATION '%s' \n"
+ "TBLPROPERTIES '{ "
+ "%s"
+ "\"protoClass\" : \"%s\", "
+ "\"timestampAttributeKey\" : \"ts\" }'",
tableProvider.getTableType(),
eventsTopic.topicPath(),
payloadFormatParam(),
PayloadMessages.SimpleMessage.class.getName());
String queryString =
"SELECT message.payload.id, attributes[1].key AS a1, attributes[2].key AS a2 FROM message";
// Initialize SQL environment and create the pubsub table
BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new PubsubTableProvider());
sqlEnv.executeDdl(createTableString);
// Apply the PTransform to query the pubsub topic
PCollection<Row> queryOutput = query(sqlEnv, pipeline, queryString);
// Observe the query results and send success signal after seeing the expected messages
queryOutput.apply(
"waitForSuccess",
resultSignal.signalSuccessWhen(
SchemaCoder.of(PAYLOAD_SCHEMA),
observedRows -> {
Map<Integer, String> entries = new HashMap<>();
for (Row row : observedRows) {
if ("ts".equals(row.getString("a1"))) {
entries.put(row.getInt32("id"), row.getString("a2"));
} else {
entries.put(row.getInt32("id"), row.getString("a1"));
}
}
return entries.equals(ImmutableMap.of(3, "foo", 5, "bar", 7, "baz"));
}));
// Start the pipeline
pipeline.run();
// Block until a subscription for this topic exists
eventsTopic.assertSubscriptionEventuallyCreated(
pipeline.getOptions().as(GcpOptions.class).getProject(), Duration.standardMinutes(5));
    // Start publishing the messages once the main pipeline has started and the signaling topic is ready
eventsTopic.publish(
ImmutableList.of(
objectsProvider.messageIdName(ts(1), 3, "foo"),
objectsProvider.messageIdName(ts(2), 5, "bar"),
objectsProvider.messageIdName(ts(3), 7, "baz")));
    // Poll the signaling topic for a success message
resultSignal.waitForSuccess(Duration.standardMinutes(1));
}
@Test
public void testSQLWithBytePayload() throws Exception {
// Prepare messages to send later
List<PubsubMessage> messages =
ImmutableList.of(
objectsProvider.messageIdName(ts(1), 3, "foo"),
objectsProvider.messageIdName(ts(2), 5, "bar"),
objectsProvider.messageIdName(ts(3), 7, "baz"));
String createTableString =
String.format(
"CREATE EXTERNAL TABLE message (\n"
+ "event_timestamp TIMESTAMP, \n"
+ "attributes MAP<VARCHAR, VARCHAR>, \n"
+ "payload VARBINARY \n"
+ ") \n"
+ "TYPE '%s' \n"
+ "LOCATION '%s' \n"
+ "TBLPROPERTIES '{ "
+ "\"protoClass\" : \"%s\", "
+ "\"timestampAttributeKey\" : \"ts\" }'",
tableProvider.getTableType(),
eventsTopic.topicPath(),
PayloadMessages.SimpleMessage.class.getName());
String queryString = "SELECT message.payload AS some_bytes FROM message";
// Initialize SQL environment and create the pubsub table
BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new PubsubTableProvider());
sqlEnv.executeDdl(createTableString);
// Apply the PTransform to query the pubsub topic
PCollection<Row> queryOutput = query(sqlEnv, pipeline, queryString);
// Observe the query results and send success signal after seeing the expected messages
Schema justBytesSchema =
Schema.builder().addField("some_bytes", FieldType.BYTES.withNullable(true)).build();
Row expectedRow0 = row(justBytesSchema, (Object) messages.get(0).getPayload());
Row expectedRow1 = row(justBytesSchema, (Object) messages.get(1).getPayload());
Row expectedRow2 = row(justBytesSchema, (Object) messages.get(2).getPayload());
Set<Row> expected = ImmutableSet.of(expectedRow0, expectedRow1, expectedRow2);
queryOutput.apply(
"waitForSuccess",
resultSignal.signalSuccessWhen(
SchemaCoder.of(justBytesSchema), observedRows -> observedRows.equals(expected)));
// Start the pipeline
pipeline.run();
// Block until a subscription for this topic exists
eventsTopic.assertSubscriptionEventuallyCreated(
pipeline.getOptions().as(GcpOptions.class).getProject(), Duration.standardMinutes(5));
    // Start publishing the messages once the main pipeline has started and the signaling topic is ready
eventsTopic.publish(messages);
    // Poll the signaling topic for a success message
resultSignal.waitForSuccess(Duration.standardMinutes(5));
}
@Test
@SuppressWarnings("unchecked")
public void testUsesDlq() throws Exception {
String createTableString =
String.format(
"CREATE EXTERNAL TABLE message (\n"
+ "event_timestamp TIMESTAMP, \n"
+ "attributes MAP<VARCHAR, VARCHAR>, \n"
+ "payload ROW< \n"
+ " id INTEGER, \n"
+ " name VARCHAR \n"
+ " > \n"
+ ") \n"
+ "TYPE '%s' \n"
+ "LOCATION '%s' \n"
+ "TBLPROPERTIES "
+ " '{ "
+ " %s"
+ " \"timestampAttributeKey\" : \"ts\", "
+ " \"deadLetterQueue\" : \"%s\", "
+ " \"protoClass\" : \"%s\" "
+ " }'",
tableProvider.getTableType(),
eventsTopic.topicPath(),
payloadFormatParam(),
dlqTopic.topicPath(),
PayloadMessages.SimpleMessage.class.getName());
String queryString = "SELECT message.payload.id, message.payload.name from message";
// Initialize SQL environment and create the pubsub table
BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new PubsubTableProvider());
sqlEnv.executeDdl(createTableString);
// Apply the PTransform to query the pubsub topic
PCollection<Row> queryOutput = query(sqlEnv, pipeline, queryString);
// Observe the query results and send success signal after seeing the expected messages
queryOutput.apply(
"waitForSuccess",
resultSignal.signalSuccessWhen(
SchemaCoder.of(PAYLOAD_SCHEMA),
observedRows ->
observedRows.equals(
ImmutableSet.of(
row(PAYLOAD_SCHEMA, 3, "foo"),
row(PAYLOAD_SCHEMA, 5, "bar"),
row(PAYLOAD_SCHEMA, 7, "baz")))));
// Start the pipeline
pipeline.run();
// Block until a subscription for this topic exists
eventsTopic.assertSubscriptionEventuallyCreated(
pipeline.getOptions().as(GcpOptions.class).getProject(), Duration.standardMinutes(5));
    // Start publishing the messages once the main pipeline has started and the signaling topics are ready
eventsTopic.publish(
ImmutableList.of(
objectsProvider.messageIdName(ts(1), 3, "foo"),
objectsProvider.messageIdName(ts(2), 5, "bar"),
objectsProvider.messageIdName(ts(3), 7, "baz"),
messagePayload(ts(4), "{ - }", ImmutableMap.of()), // invalid message, will go to DLQ
messagePayload(ts(5), "{ + }", ImmutableMap.of()))); // invalid message, will go to DLQ
    // Poll the signaling topic for a success message
resultSignal.waitForSuccess(Duration.standardMinutes(4));
dlqTopic
.assertThatTopicEventuallyReceives(
matcherPayload(ts(4), "{ - }"), matcherPayload(ts(5), "{ + }"))
.waitForUpTo(Duration.standardSeconds(40));
}
@Test
@SuppressWarnings({"unchecked", "rawtypes"})
public void testSQLLimit() throws Exception {
String createTableString =
String.format(
"CREATE EXTERNAL TABLE message (\n"
+ "event_timestamp TIMESTAMP, \n"
+ "attributes MAP<VARCHAR, VARCHAR>, \n"
+ "payload ROW< \n"
+ " id INTEGER, \n"
+ " name VARCHAR \n"
+ " > \n"
+ ") \n"
+ "TYPE '%s' \n"
+ "LOCATION '%s' \n"
+ "TBLPROPERTIES "
+ " '{ "
+ " %s"
+ " \"timestampAttributeKey\" : \"ts\", "
+ " \"deadLetterQueue\" : \"%s\", "
+ " \"protoClass\" : \"%s\" "
+ " }'",
tableProvider.getTableType(),
eventsTopic.topicPath(),
payloadFormatParam(),
dlqTopic.topicPath(),
PayloadMessages.SimpleMessage.class.getName());
List<PubsubMessage> messages =
ImmutableList.of(
objectsProvider.messageIdName(ts(1), 3, "foo"),
objectsProvider.messageIdName(ts(2), 5, "bar"),
objectsProvider.messageIdName(ts(3), 7, "baz"),
objectsProvider.messageIdName(ts(4), 9, "ba2"),
objectsProvider.messageIdName(ts(5), 10, "ba3"),
objectsProvider.messageIdName(ts(6), 13, "ba4"),
objectsProvider.messageIdName(ts(7), 15, "ba5"));
// We need the default options on the schema to include the project passed in for the
// integration test
CalciteConnection connection = connect(pipeline.getOptions(), new PubsubTableProvider());
Statement statement = connection.createStatement();
statement.execute(createTableString);
    // Pub/Sub delivers a message to a subscription only if the subscription existed when the
    // message was published, so eventsTopic.publish(messages) can only be called after
    // statement.executeQuery has created its subscription. However, because statement.executeQuery
    // is a blocking call, it has to run on a separate thread.
ExecutorService pool = Executors.newFixedThreadPool(1);
Future<List<String>> queryResult =
pool.submit(
(Callable)
() -> {
ResultSet resultSet =
statement.executeQuery("SELECT message.payload.id FROM message LIMIT 3");
ImmutableList.Builder<String> result = ImmutableList.builder();
while (resultSet.next()) {
result.add(resultSet.getString(1));
}
return result.build();
});
eventsTopic.assertSubscriptionEventuallyCreated(
pipeline.getOptions().as(GcpOptions.class).getProject(), Duration.standardMinutes(5));
eventsTopic.publish(messages);
assertThat(queryResult.get(2, TimeUnit.MINUTES).size(), equalTo(3));
pool.shutdown();
}
@Test
public void testSQLSelectsPayloadContentFlat() throws Exception {
String createTableString =
String.format(
"CREATE EXTERNAL TABLE message (\n"
+ "event_timestamp TIMESTAMP, \n"
+ "id INTEGER, \n"
+ "name VARCHAR \n"
+ ") \n"
+ "TYPE '%s' \n"
+ "LOCATION '%s' \n"
+ "TBLPROPERTIES "
+ " '{ "
+ " %s"
+ " \"protoClass\" : \"%s\", "
+ " \"timestampAttributeKey\" : \"ts\" "
+ " }'",
tableProvider.getTableType(),
eventsTopic.topicPath(),
payloadFormatParam(),
PayloadMessages.SimpleMessage.class.getName());
String queryString = "SELECT message.id, message.name from message";
// Initialize SQL environment and create the pubsub table
BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new PubsubTableProvider());
sqlEnv.executeDdl(createTableString);
// Apply the PTransform to query the pubsub topic
PCollection<Row> queryOutput = query(sqlEnv, pipeline, queryString);
// Observe the query results and send success signal after seeing the expected messages
queryOutput.apply(
"waitForSuccess",
resultSignal.signalSuccessWhen(
SchemaCoder.of(PAYLOAD_SCHEMA),
observedRows ->
observedRows.equals(
ImmutableSet.of(
row(PAYLOAD_SCHEMA, 3, "foo"),
row(PAYLOAD_SCHEMA, 5, "bar"),
row(PAYLOAD_SCHEMA, 7, "baz")))));
// Start the pipeline
pipeline.run();
// Block until a subscription for this topic exists
eventsTopic.assertSubscriptionEventuallyCreated(
pipeline.getOptions().as(GcpOptions.class).getProject(), Duration.standardMinutes(5));
    // Start publishing the messages once the main pipeline has started and the signaling topic is ready
eventsTopic.publish(
ImmutableList.of(
objectsProvider.messageIdName(ts(1), 3, "foo"),
objectsProvider.messageIdName(ts(2), 5, "bar"),
objectsProvider.messageIdName(ts(3), 7, "baz")));
    // Poll the signaling topic for a success message
resultSignal.waitForSuccess(Duration.standardMinutes(5));
}
@Test
@SuppressWarnings("unchecked")
public void testSQLInsertRowsToPubsubFlat() throws Exception {
String createTableString =
String.format(
"CREATE EXTERNAL TABLE message (\n"
+ "event_timestamp TIMESTAMP, \n"
+ "name VARCHAR, \n"
+ "height INTEGER, \n"
+ "knows_javascript BOOLEAN \n"
+ ") \n"
+ "TYPE '%s' \n"
+ "LOCATION '%s' \n"
+ "TBLPROPERTIES "
+ " '{ "
+ " %s"
+ " \"protoClass\" : \"%s\", "
+ " \"deadLetterQueue\" : \"%s\""
+ " }'",
tableProvider.getTableType(),
eventsTopic.topicPath(),
payloadFormatParam(),
PayloadMessages.NameHeightKnowsJSMessage.class.getName(),
dlqTopic.topicPath());
// Initialize SQL environment and create the pubsub table
BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new PubsubTableProvider());
sqlEnv.executeDdl(createTableString);
// TODO(BEAM-8741): Ideally we could write this query without specifying a column list, because
// it shouldn't be possible to write to event_timestamp when it's mapped to publish time.
String queryString =
"INSERT INTO message (name, height, knows_javascript) \n"
+ "VALUES \n"
+ "('person1', 80, TRUE), \n"
+ "('person2', 70, FALSE)";
// Apply the PTransform to insert the rows
query(sqlEnv, pipeline, queryString);
pipeline.run().waitUntilFinish(Duration.standardMinutes(5));
eventsTopic
.assertThatTopicEventuallyReceives(
objectsProvider.matcherNameHeightKnowsJS("person1", 80, true),
objectsProvider.matcherNameHeightKnowsJS("person2", 70, false))
.waitForUpTo(Duration.standardSeconds(40));
}
@Test
@SuppressWarnings("unchecked")
public void testSQLInsertRowsToPubsubWithTimestampAttributeFlat() throws Exception {
String createTableString =
String.format(
"CREATE EXTERNAL TABLE message (\n"
+ " event_timestamp TIMESTAMP, \n"
+ " name VARCHAR, \n"
+ " height INTEGER, \n"
+ " knows_javascript BOOLEAN \n"
+ ") \n"
+ "TYPE '%s' \n"
+ "LOCATION '%s' \n"
+ "TBLPROPERTIES "
+ " '{ "
+ " %s "
+ " \"protoClass\" : \"%s\", "
+ " \"deadLetterQueue\" : \"%s\","
+ " \"timestampAttributeKey\" : \"ts\""
+ " }'",
tableProvider.getTableType(),
eventsTopic.topicPath(),
payloadFormatParam(),
PayloadMessages.NameHeightKnowsJSMessage.class.getName(),
dlqTopic.topicPath());
// Initialize SQL environment and create the pubsub table
BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new PubsubTableProvider());
sqlEnv.executeDdl(createTableString);
String queryString =
"INSERT INTO message "
+ "VALUES "
+ "(TIMESTAMP '1970-01-01 00:00:00.001', 'person1', 80, TRUE), "
+ "(TIMESTAMP '1970-01-01 00:00:00.002', 'person2', 70, FALSE)";
query(sqlEnv, pipeline, queryString);
pipeline.run().waitUntilFinish(Duration.standardMinutes(5));
eventsTopic
.assertThatTopicEventuallyReceives(
matcherTsNameHeightKnowsJS(ts(1), "person1", 80, true),
matcherTsNameHeightKnowsJS(ts(2), "person2", 70, false))
.waitForUpTo(Duration.standardSeconds(40));
}
@Test
@SuppressWarnings("unchecked")
public void testSQLReadAndWriteWithSameFlatTableDefinition() throws Exception {
    // This test verifies that the same Pub/Sub table definition can be used for both reading and
    // writing:
    //   pipeline:       use SQL to insert data into `people`
    //   filterPipeline: use SQL to read from `people`, filter the rows, and write them to
    //                   `javascript_people`
String tblProperties =
objectsProvider.getPayloadFormat() == null
? ""
: String.format(
"TBLPROPERTIES '{ \"protoClass\" : \"%s\", \"format\": \"%s\" }'",
PayloadMessages.NameHeightKnowsJSMessage.class.getName(),
objectsProvider.getPayloadFormat());
String createTableString =
String.format(
"CREATE EXTERNAL TABLE people (\n"
+ "event_timestamp TIMESTAMP, \n"
+ "name VARCHAR, \n"
+ "height INTEGER, \n"
+ "knows_javascript BOOLEAN \n"
+ ") \n"
+ "TYPE '%s' \n"
+ "LOCATION '%s' \n"
+ "%s",
tableProvider.getTableType(), eventsTopic.topicPath(), tblProperties);
String filteredTblProperties =
objectsProvider.getPayloadFormat() == null
? ""
: String.format(
"TBLPROPERTIES '{ \"protoClass\" : \"%s\", \"format\": \"%s\" }'",
PayloadMessages.NameHeightMessage.class.getName(),
objectsProvider.getPayloadFormat());
String createFilteredTableString =
String.format(
"CREATE EXTERNAL TABLE javascript_people (\n"
+ "event_timestamp TIMESTAMP, \n"
+ "name VARCHAR, \n"
+ "height INTEGER \n"
+ ") \n"
+ "TYPE '%s' \n"
+ "LOCATION '%s' \n"
+ "%s",
tableProvider.getTableType(), filteredEventsTopic.topicPath(), filteredTblProperties);
// Initialize SQL environment and create the pubsub table
BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new PubsubTableProvider());
sqlEnv.executeDdl(createTableString);
sqlEnv.executeDdl(createFilteredTableString);
    // TODO(BEAM-8741): Ideally we could write these queries without specifying a column list,
    // because it shouldn't be possible to write to event_timestamp when it's mapped to publish time.
String filterQueryString =
"INSERT INTO javascript_people (name, height) (\n"
+ " SELECT \n"
+ " name, \n"
+ " height \n"
+ " FROM people \n"
+ " WHERE knows_javascript \n"
+ ")";
String injectQueryString =
"INSERT INTO people (name, height, knows_javascript) VALUES \n"
+ "('person1', 80, TRUE), \n"
+ "('person2', 70, FALSE), \n"
+ "('person3', 60, TRUE), \n"
+ "('person4', 50, FALSE), \n"
+ "('person5', 40, TRUE)";
// Apply the PTransform to do the filtering
query(sqlEnv, filterPipeline, filterQueryString);
// Apply the PTransform to inject the input data
query(sqlEnv, pipeline, injectQueryString);
// Start the filter pipeline and wait until it has started.
filterPipeline.run();
// Block until a subscription for this topic exists
eventsTopic.assertSubscriptionEventuallyCreated(
pipeline.getOptions().as(GcpOptions.class).getProject(), Duration.standardMinutes(5));
// .. then run the injector pipeline
pipeline.run().waitUntilFinish(Duration.standardMinutes(5));
filteredEventsTopic
.assertThatTopicEventuallyReceives(
objectsProvider.matcherNameHeight("person1", 80),
objectsProvider.matcherNameHeight("person3", 60),
objectsProvider.matcherNameHeight("person5", 40))
.waitForUpTo(Duration.standardMinutes(5));
}
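  /**
   * Opens a Calcite JDBC connection over an {@link InMemoryMetaStore} with the given table
   * providers registered, forwarding the test pipeline's options as the connection's pipeline
   * options map.
   */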
@SuppressWarnings("unchecked")
private CalciteConnection connect(PipelineOptions options, TableProvider... tableProviders) {
// HACK: PipelineOptions should expose a prominent method to do this reliably
// The actual options are in the "options" field of the converted map
Map<String, String> argsMap =
((Map<String, Object>) MAPPER.convertValue(pipeline.getOptions(), Map.class).get("options"))
.entrySet().stream()
.filter(
(entry) -> {
if (entry.getValue() instanceof List) {
if (!((List) entry.getValue()).isEmpty()) {
throw new IllegalArgumentException("Cannot encode list arguments");
}
// We can encode empty lists, just omit them.
return false;
}
return true;
})
.collect(Collectors.toMap(Map.Entry::getKey, entry -> toArg(entry.getValue())));
InMemoryMetaStore inMemoryMetaStore = new InMemoryMetaStore();
for (TableProvider tableProvider : tableProviders) {
inMemoryMetaStore.registerProvider(tableProvider);
}
JdbcConnection connection = JdbcDriver.connect(inMemoryMetaStore, options);
connection.setPipelineOptionsMap(argsMap);
return connection;
}
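  /**
   * Renders an option value as it would appear on the command line: its JSON representation, with
   * the surrounding quotes stripped for strings and enums.
   */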
private static String toArg(Object o) {
try {
String jsonRepr = MAPPER.writeValueAsString(o);
      // Strings and enums are expected to be unquoted on the command line
if (jsonRepr.startsWith("\"") && jsonRepr.endsWith("\"")) {
return jsonRepr.substring(1, jsonRepr.length() - 1);
} else {
return jsonRepr;
}
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
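  /** Returns a {@code "format"} entry for TBLPROPERTIES, or an empty string for the JSON default. */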
private String payloadFormatParam() {
return objectsProvider.getPayloadFormat() == null
? ""
: String.format("\"format\" : \"%s\", ", objectsProvider.getPayloadFormat());
}
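  /** Parses the query in the given SQL environment and converts it to a {@link PCollection}. */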
private PCollection<Row> query(BeamSqlEnv sqlEnv, TestPipeline pipeline, String queryString) {
return BeamSqlRelUtils.toPCollection(pipeline, sqlEnv.parseQuery(queryString));
}
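  /** Builds a {@link Row} with the given schema and field values. */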
private Row row(Schema schema, Object... values) {
return Row.withSchema(schema).addValues(values).build();
}
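  /**
   * Wraps the payload and attributes in a {@link PubsubMessage}, adding the timestamp as a "ts"
   * attribute in epoch millis.
   */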
private static PubsubMessage message(
Instant timestamp, byte[] payload, Map<String, String> attributes) {
return new PubsubMessage(
payload,
ImmutableMap.<String, String>builder()
.putAll(attributes)
.put("ts", String.valueOf(timestamp.getMillis()))
.build());
}
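  /**
   * Matches a message whose payload has the given fields and whose "ts" attribute equals the
   * timestamp.
   */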
private Matcher<PubsubMessage> matcherTsNameHeightKnowsJS(
Instant ts, String name, int height, boolean knowsJS) throws Exception {
return allOf(
objectsProvider.matcherNameHeightKnowsJS(name, height, knowsJS),
hasProperty("attributeMap", hasEntry("ts", String.valueOf(ts.getMillis()))));
}
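  /** Matches a message with exactly these payload bytes and a matching "ts" attribute. */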
private Matcher<PubsubMessage> matcherPayload(Instant timestamp, String payload) {
return allOf(
hasProperty("payload", equalTo(payload.getBytes(StandardCharsets.US_ASCII))),
hasProperty("attributeMap", hasEntry("ts", String.valueOf(timestamp.getMillis()))));
}
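  /** Shorthand for an {@link Instant} at the given epoch millis. */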
private Instant ts(long millis) {
return Instant.ofEpochMilli(millis);
}
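  /** Builds a message with the given ASCII string as its payload. */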
private PubsubMessage messagePayload(
Instant timestamp, String payload, Map<String, String> attributes) {
return message(timestamp, payload.getBytes(StandardCharsets.US_ASCII), attributes);
}
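  /**
   * Abstracts the payload format used on the wire. Each implementation encodes test messages and
   * builds Hamcrest matchers for payloads in its format.
   */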
private abstract static class PubsubObjectProvider implements Serializable {
protected abstract String getPayloadFormat();
protected abstract PubsubMessage messageIdName(Instant timestamp, int id, String name)
throws Exception;
protected abstract Matcher<PubsubMessage> matcherNames(String name) throws Exception;
protected abstract Matcher<PubsubMessage> matcherNameHeightKnowsJS(
String name, int height, boolean knowsJS) throws Exception;
protected abstract Matcher<PubsubMessage> matcherNameHeight(String name, int height)
throws Exception;
}
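  /** Encodes and matches payloads as serialized {@link PayloadMessages} protos. */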
private static class PubsubProtoObjectProvider extends PubsubObjectProvider {
@Override
protected String getPayloadFormat() {
return "proto";
}
@Override
protected PubsubMessage messageIdName(Instant timestamp, int id, String name) {
PayloadMessages.SimpleMessage.Builder simpleMessage =
PayloadMessages.SimpleMessage.newBuilder().setId(id).setName(name);
return PubsubTableProviderIT.message(
timestamp,
simpleMessage.build().toByteArray(),
ImmutableMap.of(name, Integer.toString(id)));
}
@Override
protected Matcher<PubsubMessage> matcherNames(String name) throws IOException {
PayloadMessages.NameMessage.Builder nameMessage =
PayloadMessages.NameMessage.newBuilder().setName(name);
return hasProperty("payload", equalTo(nameMessage.build().toByteArray()));
}
@Override
protected Matcher<PubsubMessage> matcherNameHeightKnowsJS(
String name, int height, boolean knowsJS) throws IOException {
PayloadMessages.NameHeightKnowsJSMessage.Builder nameHeightKnowsJSMessage =
PayloadMessages.NameHeightKnowsJSMessage.newBuilder()
.setHeight(height)
.setName(name)
.setKnowsJavascript(knowsJS);
return hasProperty("payload", equalTo(nameHeightKnowsJSMessage.build().toByteArray()));
}
@Override
protected Matcher<PubsubMessage> matcherNameHeight(String name, int height) throws IOException {
PayloadMessages.NameHeightMessage.Builder nameHeightMessage =
PayloadMessages.NameHeightMessage.newBuilder().setName(name).setHeight(height);
return hasProperty("payload", equalTo(nameHeightMessage.build().toByteArray()));
}
}
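  /** Encodes and matches payloads as JSON objects. */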
private static class PubsubJsonObjectProvider extends PubsubObjectProvider {
    // The Pub/Sub table provider should default to JSON when no format is specified
@Override
protected String getPayloadFormat() {
return null;
}
@Override
protected PubsubMessage messageIdName(Instant timestamp, int id, String name) {
String jsonString = "{ \"id\" : " + id + ", \"name\" : \"" + name + "\" }";
return message(timestamp, jsonString, ImmutableMap.of(name, Integer.toString(id)));
}
@Override
protected Matcher<PubsubMessage> matcherNames(String name) throws IOException {
return hasProperty("payload", toJsonByteLike(String.format("{\"name\":\"%s\"}", name)));
}
@Override
protected Matcher<PubsubMessage> matcherNameHeightKnowsJS(
String name, int height, boolean knowsJS) throws IOException {
String jsonString =
String.format(
"{\"name\":\"%s\", \"height\": %s, \"knows_javascript\": %s}", name, height, knowsJS);
return hasProperty("payload", toJsonByteLike(jsonString));
}
@Override
protected Matcher<PubsubMessage> matcherNameHeight(String name, int height) throws IOException {
String jsonString = String.format("{\"name\":\"%s\", \"height\": %s}", name, height);
return hasProperty("payload", toJsonByteLike(jsonString));
}
private PubsubMessage message(
Instant timestamp, String jsonPayload, Map<String, String> attributes) {
return PubsubTableProviderIT.message(timestamp, jsonPayload.getBytes(UTF_8), attributes);
}
private Matcher<byte[]> toJsonByteLike(String jsonString) throws IOException {
return jsonBytesLike(jsonString);
}
}
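  /** Encodes and matches payloads as Avro {@link GenericRecord}s derived from Beam schemas. */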
private static class PubsubAvroObjectProvider extends PubsubObjectProvider {
private static final Schema NAME_HEIGHT_KNOWS_JS_SCHEMA =
Schema.builder()
.addNullableField("name", Schema.FieldType.STRING)
.addNullableField("height", Schema.FieldType.INT32)
.addNullableField("knows_javascript", Schema.FieldType.BOOLEAN)
.build();
private static final Schema NAME_HEIGHT_SCHEMA =
Schema.builder()
.addNullableField("name", Schema.FieldType.STRING)
.addNullableField("height", Schema.FieldType.INT32)
.build();
@Override
protected String getPayloadFormat() {
return "avro";
}
@Override
protected PubsubMessage messageIdName(Instant timestamp, int id, String name)
throws IOException {
byte[] encodedRecord =
createEncodedGenericRecord(
PAYLOAD_SCHEMA,
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList.of(
id, name));
return message(timestamp, encodedRecord, ImmutableMap.of(name, Integer.toString(id)));
}
@Override
protected Matcher<PubsubMessage> matcherNames(String name) throws IOException {
Schema schema = Schema.builder().addStringField("name").build();
byte[] encodedRecord =
createEncodedGenericRecord(
schema,
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList.of(
name));
return hasProperty("payload", equalTo(encodedRecord));
}
@Override
protected Matcher<PubsubMessage> matcherNameHeight(String name, int height) throws IOException {
byte[] encodedRecord =
createEncodedGenericRecord(
NAME_HEIGHT_SCHEMA,
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList.of(
name, height));
return hasProperty("payload", equalTo(encodedRecord));
}
@Override
protected Matcher<PubsubMessage> matcherNameHeightKnowsJS(
String name, int height, boolean knowsJS) throws IOException {
byte[] encodedRecord =
createEncodedGenericRecord(
NAME_HEIGHT_KNOWS_JS_SCHEMA,
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList.of(
name, height, knowsJS));
return hasProperty("payload", equalTo(encodedRecord));
}
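    /**
     * Encodes the values as an Avro {@link GenericRecord} whose schema is the Avro equivalent of
     * the given Beam schema, assigning values to fields positionally.
     */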
private byte[] createEncodedGenericRecord(Schema beamSchema, List<Object> values)
throws IOException {
org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(beamSchema);
GenericRecordBuilder builder = new GenericRecordBuilder(avroSchema);
List<org.apache.avro.Schema.Field> fields = avroSchema.getFields();
for (int i = 0; i < fields.size(); ++i) {
builder.set(fields.get(i), values.get(i));
}
AvroCoder<GenericRecord> coder = AvroCoder.of(avroSchema);
ByteArrayOutputStream out = new ByteArrayOutputStream();
coder.encode(builder.build(), out);
return out.toByteArray();
}
}
}