Change kafka table provider properties structure. (#14507)
* Change kafka table provider properties structure.
This is an intentionally breaking change in the kafka beam SQL table. Currently, using the kafka table provider is impossible with a single reference identifier (LOCATION), and the location field goes entirely unused. This change repurposes the currently unused LOCATION field to be `<single bootstrap broker>/<topic name>`.
This is a breaking change because previously, users could have had LOCATION set to any string they wanted, including those without the <broker>/<topic name> structure, since this field was entirely unused.
This change also changes "bootstrap.servers" which is a comma-separated string in the properties to "bootstrap_servers" which uses a proper array, and makes both the "topics" and "bootstrap_servers" parameters optional (previously, they were actually required, although the documentation said otherwise).
Also update beam documentation to reflect new kafka, pubsub and pubsublite semantics added in this and previous PRs.
* Change kafka table provider properties structure.
This is an intentionally breaking change in the kafka beam SQL table. Currently, using the kafka table provider is impossible with a single reference identifier (LOCATION), and the location field goes entirely unused. This change repurposes the currently unused LOCATION field to be `<single bootstrap broker>/<topic name>`.
This is a breaking change because previously, users could have had LOCATION set to any string they wanted, including those without the <broker>/<topic name> structure, since this field was entirely unused.
This change also changes "bootstrap.servers" which is a comma-separated string in the properties to "bootstrap_servers" which uses a proper array, and makes both the "topics" and "bootstrap_servers" parameters optional (previously, they were actually required, although the documentation said otherwise).
Also update beam documentation to reflect new kafka, pubsub and pubsublite semantics added in this and previous PRs.
* fix: make LOCATION optional.
* fix: make LOCATION optional.
* Fix IT
* fix whitespace
* modify CHANGES.md
diff --git a/CHANGES.md b/CHANGES.md
index a94662f..489a9c6 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -38,6 +38,8 @@
* X behavior was changed ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
* Python Row objects are now sensitive to field order. So `Row(x=3, y=4)` is no
longer considered equal to `Row(y=4, x=3)` (BEAM-11929).
+* Kafka Beam SQL tables now ascribe meaning to the LOCATION field; previously
+ it was ignored if provided.
## Deprecations
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProvider.java
index 0c7396d..74f4dba 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProvider.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProvider.java
@@ -19,11 +19,10 @@
import static org.apache.beam.sdk.extensions.sql.meta.provider.kafka.Schemas.PAYLOAD_FIELD;
import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
-import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.google.auto.service.AutoService;
-import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
@@ -33,6 +32,11 @@
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer;
import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializers;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Splitter;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.checkerframework.checker.nullness.qual.Nullable;
/**
* Kafka table provider.
@@ -45,23 +49,62 @@
* NAME VARCHAR(127) COMMENT 'this is the name'
* )
* COMMENT 'this is the table orders'
- * LOCATION 'kafka://localhost:2181/brokers?topic=test'
- * TBLPROPERTIES '{"bootstrap.servers":"localhost:9092", "topics": ["topic1", "topic2"]}'
+ * TYPE kafka
+ * // Optional. One broker host:port pair to bootstrap with and a topic.
+ * // Only one topic overall may be provided for writing.
+ * LOCATION 'my.company.url.com:2181/topic1'
+ * // Extra bootstrap_servers and topics can be provided explicitly. These will be merged
+ * // with the server and topic in LOCATION.
+ * TBLPROPERTIES '{
+ * "bootstrap_servers": ["104.126.7.88:7743", "104.111.9.22:7743"],
+ * "topics": ["topic2", "topic3"]
+ * }'
* }</pre>
*/
@AutoService(TableProvider.class)
public class KafkaTableProvider extends InMemoryMetaTableProvider {
+ private static class ParsedLocation {
+ String brokerLocation = "";
+ String topic = "";
+ }
+
+ private static ParsedLocation parseLocation(String location) {
+ ParsedLocation parsed = new ParsedLocation();
+ List<String> split = Splitter.on('/').splitToList(location);
+ checkArgument(
+ split.size() >= 2,
+ "Location string `%s` invalid: must be <broker bootstrap location>/<topic>.",
+ location);
+ parsed.topic = Iterables.getLast(split);
+ parsed.brokerLocation = String.join("/", split.subList(0, split.size() - 1));
+ return parsed;
+ }
+
+ private static List<String> mergeParam(Optional<String> initial, @Nullable List<Object> toMerge) {
+ ImmutableList.Builder<String> merged = ImmutableList.builder();
+ initial.ifPresent(merged::add);
+ if (toMerge != null) {
+ toMerge.forEach(o -> merged.add(o.toString()));
+ }
+ return merged.build();
+ }
+
@Override
public BeamSqlTable buildBeamSqlTable(Table table) {
Schema schema = table.getSchema();
-
JSONObject properties = table.getProperties();
- String bootstrapServers = properties.getString("bootstrap.servers");
- JSONArray topicsArr = properties.getJSONArray("topics");
- List<String> topics = new ArrayList<>(topicsArr.size());
- for (Object topic : topicsArr) {
- topics.add(topic.toString());
+
+ Optional<ParsedLocation> parsedLocation = Optional.empty();
+ if (!Strings.isNullOrEmpty(table.getLocation())) {
+ parsedLocation = Optional.of(parseLocation(checkArgumentNotNull(table.getLocation())));
}
+ List<String> topics =
+ mergeParam(parsedLocation.map(loc -> loc.topic), properties.getJSONArray("topics"));
+ List<String> allBootstrapServers =
+ mergeParam(
+ parsedLocation.map(loc -> loc.brokerLocation),
+ properties.getJSONArray("bootstrap_servers"));
+ String bootstrapServers = String.join(",", allBootstrapServers);
Optional<String> payloadFormat =
properties.containsKey("format")
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableAvroTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableAvroTest.java
index 869c86b..fee04e4 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableAvroTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableAvroTest.java
@@ -93,8 +93,8 @@
.name("kafka")
.type("kafka")
.schema(TEST_SCHEMA)
- .properties(
- JSON.parseObject("{ \"topics\": [ \"mytopic\" ], \"format\": \"avro\" }"))
+ .location("localhost/mytopic")
+ .properties(JSON.parseObject("{ \"format\": \"avro\" }"))
.build()));
}
}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableJsonTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableJsonTest.java
index 6f97c4e..d33665c 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableJsonTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableJsonTest.java
@@ -66,8 +66,8 @@
.name("kafka")
.type("kafka")
.schema(TEST_SCHEMA)
- .properties(
- JSON.parseObject("{ \"topics\": [ \"mytopic\" ], \"format\": \"json\" }"))
+ .location("localhost/mytopic")
+ .properties(JSON.parseObject("{ \"format\": \"json\" }"))
.build()));
}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableProtoTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableProtoTest.java
index ba90cf9..a75dded 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableProtoTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableProtoTest.java
@@ -92,9 +92,10 @@
.name("kafka")
.type("kafka")
.schema(schema)
+ .location("localhost/mytopic")
.properties(
JSON.parseObject(
- "{ \"topics\": [ \"mytopic\" ], \"format\": \"proto\", \"protoClass\": \""
+ "{ \"format\": \"proto\", \"protoClass\": \""
+ PayloadMessages.TestMessage.class.getName()
+ "\" }"))
.build()));
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableThriftTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableThriftTest.java
index 860bc34..958ca63 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableThriftTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableThriftTest.java
@@ -98,9 +98,10 @@
.name("kafka")
.type("kafka")
.schema(schema)
+ .location("localhost/mytopic")
.properties(
JSON.parseObject(
- "{ \"topics\": [ \"mytopic\" ], \"format\": \"thrift\", \"thriftClass\": \""
+ "{ \"format\": \"thrift\", \"thriftClass\": \""
+ TestThriftMessage.class.getName()
+ "\", \"thriftProtocolFactoryClass\": \""
+ TCompactProtocol.Factory.class.getName()
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderIT.java
index 0546c7a..ec94b65 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderIT.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderIT.java
@@ -19,6 +19,7 @@
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.beam.sdk.extensions.sql.impl.schema.BeamTableUtils.beamRow2CsvLine;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
import com.alibaba.fastjson.JSON;
import java.io.Serializable;
@@ -130,6 +131,13 @@
kafkaOptions = pipeline.getOptions().as(KafkaOptions.class);
kafkaOptions.setKafkaTopic(topic);
kafkaOptions.setKafkaBootstrapServerAddress(KAFKA_CONTAINER.getBootstrapServers());
+ checkArgument(
+ !KAFKA_CONTAINER.getBootstrapServers().contains(","),
+ "This integration test expects exactly one bootstrap server.");
+ }
+
+ private static String buildLocation() {
+ return kafkaOptions.getKafkaBootstrapServerAddress() + "/" + kafkaOptions.getKafkaTopic();
}
@Test
@@ -139,7 +147,7 @@
Table.builder()
.name("kafka_table")
.comment("kafka table")
- .location("")
+ .location(buildLocation())
.schema(TEST_TABLE_SCHEMA)
.type("kafka")
.properties(JSON.parseObject(objectsProvider.getKafkaPropertiesString()))
@@ -165,9 +173,9 @@
+ "f_string VARCHAR NOT NULL \n"
+ ") \n"
+ "TYPE 'kafka' \n"
- + "LOCATION ''\n"
+ + "LOCATION '%s'\n"
+ "TBLPROPERTIES '%s'",
- objectsProvider.getKafkaPropertiesString());
+ buildLocation(), objectsProvider.getKafkaPropertiesString());
TableProvider tb = new KafkaTableProvider();
BeamSqlEnv env = BeamSqlEnv.inMemory(tb);
@@ -213,9 +221,9 @@
+ ">"
+ ") \n"
+ "TYPE 'kafka' \n"
- + "LOCATION ''\n"
+ + "LOCATION '%s'\n"
+ "TBLPROPERTIES '%s'",
- objectsProvider.getKafkaPropertiesString());
+ buildLocation(), objectsProvider.getKafkaPropertiesString());
TableProvider tb = new KafkaTableProvider();
BeamSqlEnv env = BeamSqlEnv.inMemory(tb);
@@ -365,11 +373,7 @@
protected String getKafkaPropertiesString() {
return "{ "
+ (getPayloadFormat() == null ? "" : "\"format\" : \"" + getPayloadFormat() + "\",")
- + "\"bootstrap.servers\" : \""
- + kafkaOptions.getKafkaBootstrapServerAddress()
- + "\",\"topics\":[\""
- + kafkaOptions.getKafkaTopic()
- + "\"] }";
+ + "}";
}
}
@@ -410,11 +414,6 @@
protected String getKafkaPropertiesString() {
return "{ "
+ "\"format\" : \"proto\","
- + "\"bootstrap.servers\" : \""
- + kafkaOptions.getKafkaBootstrapServerAddress()
- + "\",\"topics\":[\""
- + kafkaOptions.getKafkaTopic()
- + "\"],"
+ "\"protoClass\": \""
+ PayloadMessages.ItMessage.class.getName()
+ "\"}";
@@ -473,11 +472,6 @@
protected String getKafkaPropertiesString() {
return "{ "
+ "\"format\" : \"thrift\","
- + "\"bootstrap.servers\" : \""
- + kafkaOptions.getKafkaBootstrapServerAddress()
- + "\",\"topics\":[\""
- + kafkaOptions.getKafkaTopic()
- + "\"],"
+ "\"thriftClass\": \""
+ thriftClass.getName()
+ "\", \"thriftProtocolFactoryClass\": \""
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderTest.java
index e7154d9..a2a663a 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderTest.java
@@ -23,12 +23,13 @@
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
+import java.util.List;
import org.apache.beam.sdk.extensions.protobuf.PayloadMessages;
import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
import org.apache.beam.sdk.extensions.sql.meta.Table;
import org.apache.beam.sdk.io.thrift.payloads.SimpleThriftMessage;
import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.thrift.TBase;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
@@ -38,6 +39,8 @@
/** UnitTest for {@link KafkaTableProvider}. */
public class KafkaTableProviderTest {
private final KafkaTableProvider provider = new KafkaTableProvider();
+ private static final String LOCATION_BROKER = "104.126.7.88:7743";
+ private static final String LOCATION_TOPIC = "topic1";
@Test
public void testBuildBeamSqlCSVTable() {
@@ -47,9 +50,37 @@
assertNotNull(sqlTable);
assertTrue(sqlTable instanceof BeamKafkaCSVTable);
- BeamKafkaCSVTable csvTable = (BeamKafkaCSVTable) sqlTable;
- assertEquals("localhost:9092", csvTable.getBootstrapServers());
- assertEquals(ImmutableList.of("topic1", "topic2"), csvTable.getTopics());
+ BeamKafkaCSVTable kafkaTable = (BeamKafkaCSVTable) sqlTable;
+ assertEquals(LOCATION_BROKER, kafkaTable.getBootstrapServers());
+ assertEquals(ImmutableList.of(LOCATION_TOPIC), kafkaTable.getTopics());
+ }
+
+ @Test
+ public void testBuildWithExtraServers() {
+ Table table =
+ mockTableWithExtraServers("hello", ImmutableList.of("localhost:1111", "localhost:2222"));
+ BeamSqlTable sqlTable = provider.buildBeamSqlTable(table);
+
+ assertNotNull(sqlTable);
+ assertTrue(sqlTable instanceof BeamKafkaCSVTable);
+
+ BeamKafkaCSVTable kafkaTable = (BeamKafkaCSVTable) sqlTable;
+ assertEquals(
+ LOCATION_BROKER + ",localhost:1111,localhost:2222", kafkaTable.getBootstrapServers());
+ assertEquals(ImmutableList.of(LOCATION_TOPIC), kafkaTable.getTopics());
+ }
+
+ @Test
+ public void testBuildWithExtraTopics() {
+ Table table = mockTableWithExtraTopics("hello", ImmutableList.of("topic2", "topic3"));
+ BeamSqlTable sqlTable = provider.buildBeamSqlTable(table);
+
+ assertNotNull(sqlTable);
+ assertTrue(sqlTable instanceof BeamKafkaCSVTable);
+
+ BeamKafkaCSVTable kafkaTable = (BeamKafkaCSVTable) sqlTable;
+ assertEquals(LOCATION_BROKER, kafkaTable.getBootstrapServers());
+ assertEquals(ImmutableList.of(LOCATION_TOPIC, "topic2", "topic3"), kafkaTable.getTopics());
}
@Test
@@ -60,9 +91,9 @@
assertNotNull(sqlTable);
assertTrue(sqlTable instanceof BeamKafkaTable);
- BeamKafkaTable csvTable = (BeamKafkaTable) sqlTable;
- assertEquals("localhost:9092", csvTable.getBootstrapServers());
- assertEquals(ImmutableList.of("topic1", "topic2"), csvTable.getTopics());
+ BeamKafkaTable kafkaTable = (BeamKafkaTable) sqlTable;
+ assertEquals(LOCATION_BROKER, kafkaTable.getBootstrapServers());
+ assertEquals(ImmutableList.of(LOCATION_TOPIC), kafkaTable.getTopics());
}
@Test
@@ -73,9 +104,9 @@
assertNotNull(sqlTable);
assertTrue(sqlTable instanceof BeamKafkaTable);
- BeamKafkaTable protoTable = (BeamKafkaTable) sqlTable;
- assertEquals("localhost:9092", protoTable.getBootstrapServers());
- assertEquals(ImmutableList.of("topic1", "topic2"), protoTable.getTopics());
+ BeamKafkaTable kafkaTable = (BeamKafkaTable) sqlTable;
+ assertEquals(LOCATION_BROKER, kafkaTable.getBootstrapServers());
+ assertEquals(ImmutableList.of(LOCATION_TOPIC), kafkaTable.getTopics());
}
@Test
@@ -87,9 +118,9 @@
assertNotNull(sqlTable);
assertTrue(sqlTable instanceof BeamKafkaTable);
- BeamKafkaTable thriftTable = (BeamKafkaTable) sqlTable;
- assertEquals("localhost:9092", thriftTable.getBootstrapServers());
- assertEquals(ImmutableList.of("topic1", "topic2"), thriftTable.getTopics());
+ BeamKafkaTable kafkaTable = (BeamKafkaTable) sqlTable;
+ assertEquals(LOCATION_BROKER, kafkaTable.getBootstrapServers());
+ assertEquals(ImmutableList.of(LOCATION_TOPIC), kafkaTable.getTopics());
}
@Test
@@ -100,9 +131,9 @@
assertNotNull(sqlTable);
assertTrue(sqlTable instanceof NestedPayloadKafkaTable);
- BeamKafkaTable thriftTable = (BeamKafkaTable) sqlTable;
- assertEquals("localhost:9092", thriftTable.getBootstrapServers());
- assertEquals(ImmutableList.of("topic1", "topic2"), thriftTable.getTopics());
+ BeamKafkaTable kafkaTable = (BeamKafkaTable) sqlTable;
+ assertEquals(LOCATION_BROKER, kafkaTable.getBootstrapServers());
+ assertEquals(ImmutableList.of(LOCATION_TOPIC), kafkaTable.getTopics());
}
@Test
@@ -114,9 +145,9 @@
assertNotNull(sqlTable);
assertTrue(sqlTable instanceof NestedPayloadKafkaTable);
- BeamKafkaTable thriftTable = (BeamKafkaTable) sqlTable;
- assertEquals("localhost:9092", thriftTable.getBootstrapServers());
- assertEquals(ImmutableList.of("topic1", "topic2"), thriftTable.getTopics());
+ BeamKafkaTable kafkaTable = (BeamKafkaTable) sqlTable;
+ assertEquals(LOCATION_BROKER, kafkaTable.getBootstrapServers());
+ assertEquals(ImmutableList.of(LOCATION_TOPIC), kafkaTable.getTopics());
}
@Test
@@ -125,48 +156,67 @@
}
private static Table mockTable(String name) {
- return mockTable(name, false, null, null, null, null);
+ return mockTable(name, false, null, null, null, null, null, null);
+ }
+
+ private static Table mockTableWithExtraServers(String name, List<String> extraBootstrapServers) {
+ return mockTable(name, false, extraBootstrapServers, null, null, null, null, null);
+ }
+
+ private static Table mockTableWithExtraTopics(String name, List<String> extraTopics) {
+ return mockTable(name, false, null, extraTopics, null, null, null, null);
}
private static Table mockTable(String name, String payloadFormat) {
- return mockTable(name, false, payloadFormat, null, null, null);
+ return mockTable(name, false, null, null, payloadFormat, null, null, null);
}
private static Table mockProtoTable(String name, Class<?> protoClass) {
- return mockTable(name, false, "proto", protoClass, null, null);
+ return mockTable(name, false, null, null, "proto", protoClass, null, null);
}
private static Table mockThriftTable(
String name,
Class<? extends TBase<?, ?>> thriftClass,
Class<? extends TProtocolFactory> thriftProtocolFactoryClass) {
- return mockTable(name, false, "thrift", null, thriftClass, thriftProtocolFactoryClass);
+ return mockTable(
+ name, false, null, null, "thrift", null, thriftClass, thriftProtocolFactoryClass);
}
private static Table mockNestedBytesTable(String name) {
- return mockTable(name, true, null, null, null, null);
+ return mockTable(name, true, null, null, null, null, null, null);
}
private static Table mockNestedThriftTable(
String name,
Class<? extends TBase<?, ?>> thriftClass,
Class<? extends TProtocolFactory> thriftProtocolFactoryClass) {
- return mockTable(name, true, "thrift", null, thriftClass, thriftProtocolFactoryClass);
+ return mockTable(
+ name, true, null, null, "thrift", null, thriftClass, thriftProtocolFactoryClass);
}
private static Table mockTable(
String name,
boolean isNested,
+ @Nullable List<String> extraBootstrapServers,
+ @Nullable List<String> extraTopics,
@Nullable String payloadFormat,
@Nullable Class<?> protoClass,
@Nullable Class<? extends TBase<?, ?>> thriftClass,
@Nullable Class<? extends TProtocolFactory> thriftProtocolFactoryClass) {
JSONObject properties = new JSONObject();
- properties.put("bootstrap.servers", "localhost:9092");
- JSONArray topics = new JSONArray();
- topics.add("topic1");
- topics.add("topic2");
- properties.put("topics", topics);
+
+ if (extraBootstrapServers != null) {
+ JSONArray bootstrapServers = new JSONArray();
+ bootstrapServers.addAll(extraBootstrapServers);
+ properties.put("bootstrap_servers", bootstrapServers);
+ }
+ if (extraTopics != null) {
+ JSONArray topics = new JSONArray();
+ topics.addAll(extraTopics);
+ properties.put("topics", topics);
+ }
+
if (payloadFormat != null) {
properties.put("format", payloadFormat);
}
@@ -197,7 +247,7 @@
return Table.builder()
.name(name)
.comment(name + " table")
- .location("kafka://localhost:2181/brokers?topic=test")
+ .location(LOCATION_BROKER + "/" + LOCATION_TOPIC)
.schema(schema)
.type("kafka")
.properties(properties)
diff --git a/website/www/site/content/en/documentation/dsls/sql/extensions/create-external-table.md b/website/www/site/content/en/documentation/dsls/sql/extensions/create-external-table.md
index 59fedfd..f109726 100644
--- a/website/www/site/content/en/documentation/dsls/sql/extensions/create-external-table.md
+++ b/website/www/site/content/en/documentation/dsls/sql/extensions/create-external-table.md
@@ -344,21 +344,27 @@
### Syntax
+#### Nested mode
```
CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName(
event_timestamp TIMESTAMP,
- attributes MAP<VARCHAR, VARCHAR>,
- payload ROW<tableElement [, tableElement ]*>
+ attributes [MAP<VARCHAR, VARCHAR>, ARRAY<ROW<VARCHAR key, VARCHAR value>>],
+ payload [BYTES, ROW<tableElement [, tableElement ]*>]
)
TYPE pubsub
LOCATION 'projects/[PROJECT]/topics/[TOPIC]'
-TBLPROPERTIES '{
- "timestampAttributeKey": "key",
- "deadLetterQueue": "projects/[PROJECT]/topics/[TOPIC]",
- "format": "format"
-}'
```
+#### Flattened mode
+```
+CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName(tableElement [, tableElement ]*)
+TYPE pubsub
+LOCATION 'projects/[PROJECT]/topics/[TOPIC]'
+```
+
+In nested mode, the following fields hold topic metadata. The presence of the
+`attributes` field triggers nested mode usage.
+
* `event_timestamp`: The event timestamp associated with the Pub/Sub message
by PubsubIO. It can be one of the following:
* Message publish time, which is provided by Pub/Sub. This is the default
@@ -376,6 +382,8 @@
`deadLeaderQueue` field of the `tblProperties` blob. If no dead-letter queue
is specified in this case, an exception is thrown and the pipeline will
crash.
+
+
* `LOCATION`:
* `PROJECT`: ID of the Google Cloud Project
* `TOPIC`: The Pub/Sub topic name. A subscription will be created
@@ -390,15 +398,14 @@
payload was not parsed. If not specified, an exception is thrown for
parsing failures.
* `format`: Optional. Allows you to specify the Pubsub payload format.
- Possible values are {`json`, `avro`}. Defaults to `json`.
### Read Mode
-PubsubIO is currently limited to read access only.
+PubsubIO supports reading from topics by creating a new subscription.
### Write Mode
-Not supported. PubSubIO is currently limited to read access only in Beam SQL.
+PubsubIO supports writing to topics.
### Schema
@@ -411,13 +418,7 @@
### Supported Payload
-* JSON Objects (Default)
- * Beam only supports querying messages with payload containing JSON
- objects. Beam attempts to parse JSON to match the schema of the
- `payload` field.
-* Avro
- * An Avro schema is automatically generated from the specified schema of
- the `payload` field. It is used to parse incoming messages.
+* Pub/Sub supports [Generic Payload Handling](#generic-payload-handling).
### Example
@@ -427,38 +428,106 @@
LOCATION 'projects/testing-integration/topics/user-location'
```
+## Pub/Sub Lite
+
+### Syntax
+```
+CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName(
+ publish_timestamp DATETIME,
+ event_timestamp DATETIME,
+ message_key BYTES,
+ attributes ARRAY<ROW<key VARCHAR, `values` ARRAY<VARBINARY>>>,
+ payload [BYTES, ROW<tableElement [, tableElement ]*>]
+)
+TYPE pubsublite
+// For writing
+LOCATION 'projects/[PROJECT]/locations/[GCP-LOCATION]/topics/[TOPIC]'
+// For reading
+LOCATION 'projects/[PROJECT]/locations/[GCP-LOCATION]/subscriptions/[SUBSCRIPTION]'
+```
+
+* `LOCATION`:
+ * `PROJECT`: ID of the Google Cloud Project
+ * `TOPIC`: The Pub/Sub Lite topic name.
+ * `SUBSCRIPTION`: The Pub/Sub Lite subscription name.
+ * `GCP-LOCATION`: The location for this Pub/Sub Lite topic os subscription.
+* `TBLPROPERTIES`:
+ * `timestampAttributeKey`: Optional. The key which contains the event
+ timestamp associated with the Pub/Sub message. If not specified, the
+ message publish timestamp is used as an event timestamp for
+ windowing/watermarking.
+ * `deadLetterQueue`: Optional, supports
+ [Generic DLQ Handling](#generic-dlq-handling)
+ * `format`: Optional. Allows you to specify the payload format.
+
+### Read Mode
+
+PubsubLiteIO supports reading from subscriptions.
+
+### Write Mode
+
+PubsubLiteIO supports writing to topics.
+
+### Supported Payload
+
+* Pub/Sub Lite supports [Generic Payload Handling](#generic-payload-handling).
+
+### Example
+
+```
+CREATE EXTERNAL TABLE locations (event_timestamp TIMESTAMP, attributes ARRAY<ROW<key VARCHAR, `values` ARRAY<VARBINARY>>>, payload ROW<id INTEGER, location VARCHAR>)
+TYPE pubsublite
+LOCATION 'projects/testing-integration/locations/us-central1-a/topics/user-location'
+```
+
## Kafka
KafkaIO is experimental in Beam SQL.
### Syntax
+#### Flattened mode
```
CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (tableElement [, tableElement ]*)
TYPE kafka
-LOCATION 'kafka://localhost:2181/brokers'
+LOCATION 'my.company.url.com:2181/topic1'
TBLPROPERTIES '{
- "bootstrap.servers":"localhost:9092",
- "topics": ["topic1", "topic2"],
- "format": "avro"
- [, "protoClass": "com.example.ExampleMessage" ]
+ "bootstrap_servers": ["localhost:9092", "PLAINTEXT://192.168.1.200:2181"],
+ "topics": ["topic2", "topic3"],
+ "format": "json"
}'
```
-* `LOCATION`: The Kafka topic URL.
+#### Nested mode
+```
+CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (
+ event_timestamp DATETIME,
+ message_key BYTES,
+ headers ARRAY<ROW<key VARCHAR, `values` ARRAY<VARBINARY>>>,
+ payload [BYTES, ROW<tableElement [, tableElement ]*>]
+)
+TYPE kafka
+LOCATION 'my.company.url.com:2181/topic1'
+TBLPROPERTIES '{
+ "bootstrap_servers": ["localhost:9092", "PLAINTEXT://192.168.1.200:2181"],
+ "topics": ["topic2", "topic3"],
+ "format": "json"
+}'
+```
+
+The presence of the `headers` field triggers nested mode usage.
+
+* `LOCATION`: A url with the initial bootstrap broker to use and the initial
+ topic name provided as the path.
* `TBLPROPERTIES`:
- * `bootstrap.servers`: Optional. Allows you to specify the bootstrap
- server.
- * `topics`: Optional. Allows you to specify specific topics.
+ * `bootstrap_servers`: Optional. Allows you to specify additional
+ bootstrap servers, which are used in addition to the one in `LOCATION`.
+ * `topics`: Optional. Allows you to specify additional topics, which are
+ used in addition to the one in `LOCATION`.
* `format`: Optional. Allows you to specify the Kafka values format. Possible values are
- {`csv`, `avro`, `json`, `proto`, `thrift`}. Defaults to `csv`.
- * `protoClass`: Optional. Use only when `format` is equal to `proto`. Allows you to
- specify full protocol buffer java class name.
- * `thriftClass`: Optional. Use only when `format` is equal to `thrift`. Allows you to
- specify full thrift java class name.
- * `thriftProtocolFactoryClass`: Optional. Use only when `format` is equal to `thrift`.
- Allows you to specify full class name of the `TProtocolFactory` to use for thrift
- serialization.
+ {`csv`, `avro`, `json`, `proto`, `thrift`}. Defaults to `csv` in
+ flattened mode or `json` in nested mode. `csv` does not support nested
+ mode.
### Read Mode
@@ -473,18 +542,8 @@
* CSV (default)
* Beam parses the messages, attempting to parse fields according to the
types specified in the schema.
-* Avro
- * An Avro schema is automatically generated from the specified field
- types. It is used to parse incoming messages and to format outgoing
- messages.
-* JSON Objects
- * Beam attempts to parse JSON to match the schema.
-* Protocol buffers
- * Fields in the schema have to match the fields of the given `protoClass`.
-* Thrift
- * Fields in the schema have to match the fields of the given `thriftClass`.
- * The `TProtocolFactory` used for thrift serialization must match the
- provided `thriftProtocolFactoryClass`.
+* Kafka supports all [Generic Payload Handling](#generic-payload-handling)
+ formats.
### Schema
@@ -582,3 +641,47 @@
TYPE text
LOCATION '/home/admin/orders'
```
+
+## Generic Payload Handling
+
+Certain data sources and sinks support generic payload handling. This handling
+parses a byte array payload field into a table schema. The following schemas are
+supported by this handling. All require at least setting `"format": "<type>"`,
+and may require other properties.
+
+* `avro`: Avro
+ * An Avro schema is automatically generated from the specified field
+ types. It is used to parse incoming messages and to format outgoing
+ messages.
+* `json`: JSON Objects
+ * Beam attempts to parse the byte array as UTF-8 JSON to match the schema.
+* `proto`: Protocol Buffers
+ * Beam locates the equivalent Protocol Buffer class and uses it to parse
+ the payload
+ * `protoClass`: Required. The proto class name to use. Must be built into
+ the deployed JAR.
+ * Fields in the schema have to match the fields of the given `protoClass`.
+* `thrift`: Thrift
+ * Fields in the schema have to match the fields of the given
+ `thriftClass`.
+ * `thriftClass`: Required. Allows you to specify full thrift java class
+ name. Must be built into the deployed JAR.
+ * `thriftProtocolFactoryClass`: Required. Allows you to specify full class
+ name of the `TProtocolFactory` to use for thrift serialization. Must be
+ built into the deployed JAR.
+ * The `TProtocolFactory` used for thrift serialization must match the
+ provided `thriftProtocolFactoryClass`.
+
+## Generic DLQ Handling
+
+Sources and sinks which support generic DLQ handling specify a parameter with
+the format `"<dlqParamName>": "[DLQ_KIND]:[DLQ_ID]"`. The following types of
+DLQ handling are supported:
+
+* `bigquery`: BigQuery
+ * DLQ_ID is the table spec for an output table with an "error" string
+ field and "payload" byte array field.
+* `pubsub`: Pub/Sub Topic
+ * DLQ_ID is the full path of the Pub/Sub Topic.
+* `pubsublite`: Pub/Sub Lite Topic
+ * DLQ_ID is the full path of the Pub/Sub Lite Topic.
\ No newline at end of file