Change kafka table provider properties structure. (#14507)

* Change kafka table provider properties structure.

This is an intentionally breaking change to the Kafka Beam SQL table. Currently, the Kafka table provider cannot be configured through a single reference identifier (LOCATION), and the location field goes entirely unused. This change repurposes the unused LOCATION field to hold `<single bootstrap broker>/<topic name>`.
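
For example, under the new scheme `LOCATION 'broker-1.example.com:9092/orders'` (host and topic names here are hypothetical) points the table at bootstrap broker `broker-1.example.com:9092` and topic `orders`.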

This is a breaking change because users could previously set LOCATION to any string they wanted, including ones without the `<broker>/<topic name>` structure, since the field was ignored.

This change also replaces the "bootstrap.servers" property, previously a comma-separated string, with "bootstrap_servers", a proper JSON array, and makes both the "topics" and "bootstrap_servers" parameters optional (previously they were required, although the documentation said otherwise).
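
A sketch of a table definition under the new structure (broker addresses and topic names are illustrative):

```
CREATE EXTERNAL TABLE orders (f_id BIGINT, f_name VARCHAR)
TYPE kafka
LOCATION 'broker-1.example.com:9092/orders'
TBLPROPERTIES '{
    "bootstrap_servers": ["broker-2.example.com:9092"],
    "topics": ["orders_backup"],
    "format": "json"
}'
```

The broker and topic parsed from LOCATION are merged with the extras in TBLPROPERTIES, so this table uses both brokers and both topics.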

Also update the Beam documentation to reflect the new Kafka, Pub/Sub, and Pub/Sub Lite semantics added in this and previous PRs.

* fix: make LOCATION optional.

* Fix IT

* fix whitespace

* modify CHANGES.md
diff --git a/CHANGES.md b/CHANGES.md
index a94662f..489a9c6 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -38,6 +38,8 @@
 * X behavior was changed ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
 * Python Row objects are now sensitive to field order. So `Row(x=3, y=4)` is no
   longer considered equal to `Row(y=4, x=3)` (BEAM-11929).
+* Kafka Beam SQL tables now ascribe meaning to the LOCATION field; previously
+  it was ignored if provided.
 
 ## Deprecations
 
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProvider.java
index 0c7396d..74f4dba 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProvider.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProvider.java
@@ -19,11 +19,10 @@
 
 import static org.apache.beam.sdk.extensions.sql.meta.provider.kafka.Schemas.PAYLOAD_FIELD;
 import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
 
-import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
 import com.google.auto.service.AutoService;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
@@ -33,6 +32,11 @@
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer;
 import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializers;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Splitter;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.checkerframework.checker.nullness.qual.Nullable;
 
 /**
  * Kafka table provider.
@@ -45,23 +49,62 @@
  *   NAME VARCHAR(127) COMMENT 'this is the name'
  * )
  * COMMENT 'this is the table orders'
- * LOCATION 'kafka://localhost:2181/brokers?topic=test'
- * TBLPROPERTIES '{"bootstrap.servers":"localhost:9092", "topics": ["topic1", "topic2"]}'
+ * TYPE kafka
+ * // Optional. One broker host:port pair to bootstrap with and a topic.
+ * // Only one topic overall may be provided for writing.
+ * LOCATION 'my.company.url.com:2181/topic1'
+ * // Extra bootstrap_servers and topics can be provided explicitly. These will be merged
+ * // with the server and topic in LOCATION.
+ * TBLPROPERTIES '{
+ *   "bootstrap_servers": ["104.126.7.88:7743", "104.111.9.22:7743"],
+ *   "topics": ["topic2", "topic3"]
+ * }'
  * }</pre>
  */
 @AutoService(TableProvider.class)
 public class KafkaTableProvider extends InMemoryMetaTableProvider {
+  private static class ParsedLocation {
+    String brokerLocation = "";
+    String topic = "";
+  }
+
+  private static ParsedLocation parseLocation(String location) {
+    ParsedLocation parsed = new ParsedLocation();
+    List<String> split = Splitter.on('/').splitToList(location);
+    checkArgument(
+        split.size() >= 2,
+        "Location string `%s` invalid: must be <broker bootstrap location>/<topic>.",
+        location);
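+    // The last '/'-separated segment is the topic; the rest is rejoined so that
+    // broker addresses containing '/' (e.g. PLAINTEXT://host:port) stay intact.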
+    parsed.topic = Iterables.getLast(split);
+    parsed.brokerLocation = String.join("/", split.subList(0, split.size() - 1));
+    return parsed;
+  }
+
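+  // Merges the optional value parsed from LOCATION with any extra values
+  // supplied in TBLPROPERTIES; the LOCATION value comes first.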
+  private static List<String> mergeParam(Optional<String> initial, @Nullable List<Object> toMerge) {
+    ImmutableList.Builder<String> merged = ImmutableList.builder();
+    initial.ifPresent(merged::add);
+    if (toMerge != null) {
+      toMerge.forEach(o -> merged.add(o.toString()));
+    }
+    return merged.build();
+  }
+
   @Override
   public BeamSqlTable buildBeamSqlTable(Table table) {
     Schema schema = table.getSchema();
-
     JSONObject properties = table.getProperties();
-    String bootstrapServers = properties.getString("bootstrap.servers");
-    JSONArray topicsArr = properties.getJSONArray("topics");
-    List<String> topics = new ArrayList<>(topicsArr.size());
-    for (Object topic : topicsArr) {
-      topics.add(topic.toString());
+
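+    // LOCATION is optional; when present, it contributes one bootstrap broker
+    // and one topic, which are merged with any extras from TBLPROPERTIES.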
+    Optional<ParsedLocation> parsedLocation = Optional.empty();
+    if (!Strings.isNullOrEmpty(table.getLocation())) {
+      parsedLocation = Optional.of(parseLocation(checkArgumentNotNull(table.getLocation())));
     }
+    List<String> topics =
+        mergeParam(parsedLocation.map(loc -> loc.topic), properties.getJSONArray("topics"));
+    List<String> allBootstrapServers =
+        mergeParam(
+            parsedLocation.map(loc -> loc.brokerLocation),
+            properties.getJSONArray("bootstrap_servers"));
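+    // Kafka's consumer/producer configuration expects bootstrap servers as a
+    // single comma-separated string.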
+    String bootstrapServers = String.join(",", allBootstrapServers);
 
     Optional<String> payloadFormat =
         properties.containsKey("format")
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableAvroTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableAvroTest.java
index 869c86b..fee04e4 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableAvroTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableAvroTest.java
@@ -93,8 +93,8 @@
                     .name("kafka")
                     .type("kafka")
                     .schema(TEST_SCHEMA)
-                    .properties(
-                        JSON.parseObject("{ \"topics\": [ \"mytopic\" ], \"format\": \"avro\" }"))
+                    .location("localhost/mytopic")
+                    .properties(JSON.parseObject("{ \"format\": \"avro\" }"))
                     .build()));
   }
 }
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableJsonTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableJsonTest.java
index 6f97c4e..d33665c 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableJsonTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableJsonTest.java
@@ -66,8 +66,8 @@
                     .name("kafka")
                     .type("kafka")
                     .schema(TEST_SCHEMA)
-                    .properties(
-                        JSON.parseObject("{ \"topics\": [ \"mytopic\" ], \"format\": \"json\" }"))
+                    .location("localhost/mytopic")
+                    .properties(JSON.parseObject("{ \"format\": \"json\" }"))
                     .build()));
   }
 
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableProtoTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableProtoTest.java
index ba90cf9..a75dded 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableProtoTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableProtoTest.java
@@ -92,9 +92,10 @@
                     .name("kafka")
                     .type("kafka")
                     .schema(schema)
+                    .location("localhost/mytopic")
                     .properties(
                         JSON.parseObject(
-                            "{ \"topics\": [ \"mytopic\" ], \"format\": \"proto\", \"protoClass\": \""
+                            "{ \"format\": \"proto\", \"protoClass\": \""
                                 + PayloadMessages.TestMessage.class.getName()
                                 + "\" }"))
                     .build()));
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableThriftTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableThriftTest.java
index 860bc34..958ca63 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableThriftTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableThriftTest.java
@@ -98,9 +98,10 @@
                     .name("kafka")
                     .type("kafka")
                     .schema(schema)
+                    .location("localhost/mytopic")
                     .properties(
                         JSON.parseObject(
-                            "{ \"topics\": [ \"mytopic\" ], \"format\": \"thrift\", \"thriftClass\": \""
+                            "{ \"format\": \"thrift\", \"thriftClass\": \""
                                 + TestThriftMessage.class.getName()
                                 + "\", \"thriftProtocolFactoryClass\": \""
                                 + TCompactProtocol.Factory.class.getName()
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderIT.java
index 0546c7a..ec94b65 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderIT.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderIT.java
@@ -19,6 +19,7 @@
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.beam.sdk.extensions.sql.impl.schema.BeamTableUtils.beamRow2CsvLine;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
 
 import com.alibaba.fastjson.JSON;
 import java.io.Serializable;
@@ -130,6 +131,13 @@
     kafkaOptions = pipeline.getOptions().as(KafkaOptions.class);
     kafkaOptions.setKafkaTopic(topic);
     kafkaOptions.setKafkaBootstrapServerAddress(KAFKA_CONTAINER.getBootstrapServers());
+    checkArgument(
+        !KAFKA_CONTAINER.getBootstrapServers().contains(","),
+        "This integration test expects exactly one bootstrap server.");
+  }
+
+  private static String buildLocation() {
+    return kafkaOptions.getKafkaBootstrapServerAddress() + "/" + kafkaOptions.getKafkaTopic();
   }
 
   @Test
@@ -139,7 +147,7 @@
         Table.builder()
             .name("kafka_table")
             .comment("kafka table")
-            .location("")
+            .location(buildLocation())
             .schema(TEST_TABLE_SCHEMA)
             .type("kafka")
             .properties(JSON.parseObject(objectsProvider.getKafkaPropertiesString()))
@@ -165,9 +173,9 @@
                 + "f_string VARCHAR NOT NULL \n"
                 + ") \n"
                 + "TYPE 'kafka' \n"
-                + "LOCATION ''\n"
+                + "LOCATION '%s'\n"
                 + "TBLPROPERTIES '%s'",
-            objectsProvider.getKafkaPropertiesString());
+            buildLocation(), objectsProvider.getKafkaPropertiesString());
     TableProvider tb = new KafkaTableProvider();
     BeamSqlEnv env = BeamSqlEnv.inMemory(tb);
 
@@ -213,9 +221,9 @@
                 + ">"
                 + ") \n"
                 + "TYPE 'kafka' \n"
-                + "LOCATION ''\n"
+                + "LOCATION '%s'\n"
                 + "TBLPROPERTIES '%s'",
-            objectsProvider.getKafkaPropertiesString());
+            buildLocation(), objectsProvider.getKafkaPropertiesString());
     TableProvider tb = new KafkaTableProvider();
     BeamSqlEnv env = BeamSqlEnv.inMemory(tb);
 
@@ -365,11 +373,7 @@
     protected String getKafkaPropertiesString() {
       return "{ "
           + (getPayloadFormat() == null ? "" : "\"format\" : \"" + getPayloadFormat() + "\",")
-          + "\"bootstrap.servers\" : \""
-          + kafkaOptions.getKafkaBootstrapServerAddress()
-          + "\",\"topics\":[\""
-          + kafkaOptions.getKafkaTopic()
-          + "\"] }";
+          + "}";
     }
   }
 
@@ -410,11 +414,6 @@
     protected String getKafkaPropertiesString() {
       return "{ "
           + "\"format\" : \"proto\","
-          + "\"bootstrap.servers\" : \""
-          + kafkaOptions.getKafkaBootstrapServerAddress()
-          + "\",\"topics\":[\""
-          + kafkaOptions.getKafkaTopic()
-          + "\"],"
           + "\"protoClass\": \""
           + PayloadMessages.ItMessage.class.getName()
           + "\"}";
@@ -473,11 +472,6 @@
     protected String getKafkaPropertiesString() {
       return "{ "
           + "\"format\" : \"thrift\","
-          + "\"bootstrap.servers\" : \""
-          + kafkaOptions.getKafkaBootstrapServerAddress()
-          + "\",\"topics\":[\""
-          + kafkaOptions.getKafkaTopic()
-          + "\"],"
           + "\"thriftClass\": \""
           + thriftClass.getName()
           + "\", \"thriftProtocolFactoryClass\": \""
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderTest.java
index e7154d9..a2a663a 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderTest.java
@@ -23,12 +23,13 @@
 
 import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
+import java.util.List;
 import org.apache.beam.sdk.extensions.protobuf.PayloadMessages;
 import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
 import org.apache.beam.sdk.extensions.sql.meta.Table;
 import org.apache.beam.sdk.io.thrift.payloads.SimpleThriftMessage;
 import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.thrift.TBase;
 import org.apache.thrift.protocol.TCompactProtocol;
 import org.apache.thrift.protocol.TProtocolFactory;
@@ -38,6 +39,8 @@
 /** UnitTest for {@link KafkaTableProvider}. */
 public class KafkaTableProviderTest {
   private final KafkaTableProvider provider = new KafkaTableProvider();
+  private static final String LOCATION_BROKER = "104.126.7.88:7743";
+  private static final String LOCATION_TOPIC = "topic1";
 
   @Test
   public void testBuildBeamSqlCSVTable() {
@@ -47,9 +50,37 @@
     assertNotNull(sqlTable);
     assertTrue(sqlTable instanceof BeamKafkaCSVTable);
 
-    BeamKafkaCSVTable csvTable = (BeamKafkaCSVTable) sqlTable;
-    assertEquals("localhost:9092", csvTable.getBootstrapServers());
-    assertEquals(ImmutableList.of("topic1", "topic2"), csvTable.getTopics());
+    BeamKafkaCSVTable kafkaTable = (BeamKafkaCSVTable) sqlTable;
+    assertEquals(LOCATION_BROKER, kafkaTable.getBootstrapServers());
+    assertEquals(ImmutableList.of(LOCATION_TOPIC), kafkaTable.getTopics());
+  }
+
+  @Test
+  public void testBuildWithExtraServers() {
+    Table table =
+        mockTableWithExtraServers("hello", ImmutableList.of("localhost:1111", "localhost:2222"));
+    BeamSqlTable sqlTable = provider.buildBeamSqlTable(table);
+
+    assertNotNull(sqlTable);
+    assertTrue(sqlTable instanceof BeamKafkaCSVTable);
+
+    BeamKafkaCSVTable kafkaTable = (BeamKafkaCSVTable) sqlTable;
+    assertEquals(
+        LOCATION_BROKER + ",localhost:1111,localhost:2222", kafkaTable.getBootstrapServers());
+    assertEquals(ImmutableList.of(LOCATION_TOPIC), kafkaTable.getTopics());
+  }
+
+  @Test
+  public void testBuildWithExtraTopics() {
+    Table table = mockTableWithExtraTopics("hello", ImmutableList.of("topic2", "topic3"));
+    BeamSqlTable sqlTable = provider.buildBeamSqlTable(table);
+
+    assertNotNull(sqlTable);
+    assertTrue(sqlTable instanceof BeamKafkaCSVTable);
+
+    BeamKafkaCSVTable kafkaTable = (BeamKafkaCSVTable) sqlTable;
+    assertEquals(LOCATION_BROKER, kafkaTable.getBootstrapServers());
+    assertEquals(ImmutableList.of(LOCATION_TOPIC, "topic2", "topic3"), kafkaTable.getTopics());
   }
 
   @Test
@@ -60,9 +91,9 @@
     assertNotNull(sqlTable);
     assertTrue(sqlTable instanceof BeamKafkaTable);
 
-    BeamKafkaTable csvTable = (BeamKafkaTable) sqlTable;
-    assertEquals("localhost:9092", csvTable.getBootstrapServers());
-    assertEquals(ImmutableList.of("topic1", "topic2"), csvTable.getTopics());
+    BeamKafkaTable kafkaTable = (BeamKafkaTable) sqlTable;
+    assertEquals(LOCATION_BROKER, kafkaTable.getBootstrapServers());
+    assertEquals(ImmutableList.of(LOCATION_TOPIC), kafkaTable.getTopics());
   }
 
   @Test
@@ -73,9 +104,9 @@
     assertNotNull(sqlTable);
     assertTrue(sqlTable instanceof BeamKafkaTable);
 
-    BeamKafkaTable protoTable = (BeamKafkaTable) sqlTable;
-    assertEquals("localhost:9092", protoTable.getBootstrapServers());
-    assertEquals(ImmutableList.of("topic1", "topic2"), protoTable.getTopics());
+    BeamKafkaTable kafkaTable = (BeamKafkaTable) sqlTable;
+    assertEquals(LOCATION_BROKER, kafkaTable.getBootstrapServers());
+    assertEquals(ImmutableList.of(LOCATION_TOPIC), kafkaTable.getTopics());
   }
 
   @Test
@@ -87,9 +118,9 @@
     assertNotNull(sqlTable);
     assertTrue(sqlTable instanceof BeamKafkaTable);
 
-    BeamKafkaTable thriftTable = (BeamKafkaTable) sqlTable;
-    assertEquals("localhost:9092", thriftTable.getBootstrapServers());
-    assertEquals(ImmutableList.of("topic1", "topic2"), thriftTable.getTopics());
+    BeamKafkaTable kafkaTable = (BeamKafkaTable) sqlTable;
+    assertEquals(LOCATION_BROKER, kafkaTable.getBootstrapServers());
+    assertEquals(ImmutableList.of(LOCATION_TOPIC), kafkaTable.getTopics());
   }
 
   @Test
@@ -100,9 +131,9 @@
     assertNotNull(sqlTable);
     assertTrue(sqlTable instanceof NestedPayloadKafkaTable);
 
-    BeamKafkaTable thriftTable = (BeamKafkaTable) sqlTable;
-    assertEquals("localhost:9092", thriftTable.getBootstrapServers());
-    assertEquals(ImmutableList.of("topic1", "topic2"), thriftTable.getTopics());
+    BeamKafkaTable kafkaTable = (BeamKafkaTable) sqlTable;
+    assertEquals(LOCATION_BROKER, kafkaTable.getBootstrapServers());
+    assertEquals(ImmutableList.of(LOCATION_TOPIC), kafkaTable.getTopics());
   }
 
   @Test
@@ -114,9 +145,9 @@
     assertNotNull(sqlTable);
     assertTrue(sqlTable instanceof NestedPayloadKafkaTable);
 
-    BeamKafkaTable thriftTable = (BeamKafkaTable) sqlTable;
-    assertEquals("localhost:9092", thriftTable.getBootstrapServers());
-    assertEquals(ImmutableList.of("topic1", "topic2"), thriftTable.getTopics());
+    BeamKafkaTable kafkaTable = (BeamKafkaTable) sqlTable;
+    assertEquals(LOCATION_BROKER, kafkaTable.getBootstrapServers());
+    assertEquals(ImmutableList.of(LOCATION_TOPIC), kafkaTable.getTopics());
   }
 
   @Test
@@ -125,48 +156,67 @@
   }
 
   private static Table mockTable(String name) {
-    return mockTable(name, false, null, null, null, null);
+    return mockTable(name, false, null, null, null, null, null, null);
+  }
+
+  private static Table mockTableWithExtraServers(String name, List<String> extraBootstrapServers) {
+    return mockTable(name, false, extraBootstrapServers, null, null, null, null, null);
+  }
+
+  private static Table mockTableWithExtraTopics(String name, List<String> extraTopics) {
+    return mockTable(name, false, null, extraTopics, null, null, null, null);
   }
 
   private static Table mockTable(String name, String payloadFormat) {
-    return mockTable(name, false, payloadFormat, null, null, null);
+    return mockTable(name, false, null, null, payloadFormat, null, null, null);
   }
 
   private static Table mockProtoTable(String name, Class<?> protoClass) {
-    return mockTable(name, false, "proto", protoClass, null, null);
+    return mockTable(name, false, null, null, "proto", protoClass, null, null);
   }
 
   private static Table mockThriftTable(
       String name,
       Class<? extends TBase<?, ?>> thriftClass,
       Class<? extends TProtocolFactory> thriftProtocolFactoryClass) {
-    return mockTable(name, false, "thrift", null, thriftClass, thriftProtocolFactoryClass);
+    return mockTable(
+        name, false, null, null, "thrift", null, thriftClass, thriftProtocolFactoryClass);
   }
 
   private static Table mockNestedBytesTable(String name) {
-    return mockTable(name, true, null, null, null, null);
+    return mockTable(name, true, null, null, null, null, null, null);
   }
 
   private static Table mockNestedThriftTable(
       String name,
       Class<? extends TBase<?, ?>> thriftClass,
       Class<? extends TProtocolFactory> thriftProtocolFactoryClass) {
-    return mockTable(name, true, "thrift", null, thriftClass, thriftProtocolFactoryClass);
+    return mockTable(
+        name, true, null, null, "thrift", null, thriftClass, thriftProtocolFactoryClass);
   }
 
   private static Table mockTable(
       String name,
       boolean isNested,
+      @Nullable List<String> extraBootstrapServers,
+      @Nullable List<String> extraTopics,
       @Nullable String payloadFormat,
       @Nullable Class<?> protoClass,
       @Nullable Class<? extends TBase<?, ?>> thriftClass,
       @Nullable Class<? extends TProtocolFactory> thriftProtocolFactoryClass) {
     JSONObject properties = new JSONObject();
-    properties.put("bootstrap.servers", "localhost:9092");
-    JSONArray topics = new JSONArray();
-    topics.add("topic1");
-    topics.add("topic2");
-    properties.put("topics", topics);
+
+    if (extraBootstrapServers != null) {
+      JSONArray bootstrapServers = new JSONArray();
+      bootstrapServers.addAll(extraBootstrapServers);
+      properties.put("bootstrap_servers", bootstrapServers);
+    }
+    if (extraTopics != null) {
+      JSONArray topics = new JSONArray();
+      topics.addAll(extraTopics);
+      properties.put("topics", topics);
+    }
+
     if (payloadFormat != null) {
       properties.put("format", payloadFormat);
     }
@@ -197,7 +247,7 @@
     return Table.builder()
         .name(name)
         .comment(name + " table")
-        .location("kafka://localhost:2181/brokers?topic=test")
+        .location(LOCATION_BROKER + "/" + LOCATION_TOPIC)
         .schema(schema)
         .type("kafka")
         .properties(properties)
diff --git a/website/www/site/content/en/documentation/dsls/sql/extensions/create-external-table.md b/website/www/site/content/en/documentation/dsls/sql/extensions/create-external-table.md
index 59fedfd..f109726 100644
--- a/website/www/site/content/en/documentation/dsls/sql/extensions/create-external-table.md
+++ b/website/www/site/content/en/documentation/dsls/sql/extensions/create-external-table.md
@@ -344,21 +344,27 @@
 
 ### Syntax
 
+#### Nested mode
 ```
 CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName(
     event_timestamp TIMESTAMP,
-    attributes MAP<VARCHAR, VARCHAR>,
-    payload ROW<tableElement [, tableElement ]*>
+    attributes [MAP<VARCHAR, VARCHAR>, ARRAY<ROW<VARCHAR key, VARCHAR value>>],
+    payload [BYTES, ROW<tableElement [, tableElement ]*>]
 )
 TYPE pubsub
 LOCATION 'projects/[PROJECT]/topics/[TOPIC]'
-TBLPROPERTIES '{
-    "timestampAttributeKey": "key",
-    "deadLetterQueue": "projects/[PROJECT]/topics/[TOPIC]",
-    "format": "format"
-}'
 ```
 
+#### Flattened mode
+```
+CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName(tableElement [, tableElement ]*)
+TYPE pubsub
+LOCATION 'projects/[PROJECT]/topics/[TOPIC]'
+```
+
+In nested mode, the following fields hold topic metadata. The presence of the
+`attributes` field triggers nested mode usage.
+
 *   `event_timestamp`: The event timestamp associated with the Pub/Sub message
     by PubsubIO. It can be one of the following:
     *   Message publish time, which is provided by Pub/Sub. This is the default
@@ -376,6 +382,8 @@
     `deadLetterQueue` field of the `tblProperties` blob. If no dead-letter queue
     is specified in this case, an exception is thrown and the pipeline will
     crash.
+
+
 *   `LOCATION`:
     *   `PROJECT`: ID of the Google Cloud Project
     *   `TOPIC`: The Pub/Sub topic name. A subscription will be created
@@ -390,15 +398,14 @@
         payload was not parsed. If not specified, an exception is thrown for
         parsing failures.
     *   `format`: Optional. Allows you to specify the Pubsub payload format.
-        Possible values are {`json`, `avro`}. Defaults to `json`.
 
 ### Read Mode
 
-PubsubIO is currently limited to read access only.
+PubsubIO supports reading from topics by creating a new subscription.
 
 ### Write Mode
 
-Not supported. PubSubIO is currently limited to read access only in Beam SQL.
+PubsubIO supports writing to topics.
 
 ### Schema
 
@@ -411,13 +418,7 @@
 
 ### Supported Payload
 
-*   JSON Objects (Default)
-    *   Beam only supports querying messages with payload containing JSON
-        objects. Beam attempts to parse JSON to match the schema of the
-        `payload` field.
-*   Avro
-    *   An Avro schema is automatically generated from the specified schema of
-        the `payload` field. It is used to parse incoming messages.
+*   Pub/Sub supports [Generic Payload Handling](#generic-payload-handling).
 
 ### Example
 
@@ -427,38 +428,106 @@
 LOCATION 'projects/testing-integration/topics/user-location'
 ```
 
+## Pub/Sub Lite
+
+### Syntax
+```
+CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName(
+    publish_timestamp DATETIME,
+    event_timestamp DATETIME,
+    message_key BYTES,
+    attributes ARRAY<ROW<key VARCHAR, `values` ARRAY<VARBINARY>>>,
+    payload [BYTES, ROW<tableElement [, tableElement ]*>]
+)
+TYPE pubsublite
+// For writing
+LOCATION 'projects/[PROJECT]/locations/[GCP-LOCATION]/topics/[TOPIC]'
+// For reading
+LOCATION 'projects/[PROJECT]/locations/[GCP-LOCATION]/subscriptions/[SUBSCRIPTION]'
+```
+
+*   `LOCATION`:
+    *   `PROJECT`: ID of the Google Cloud Project
+    *   `TOPIC`: The Pub/Sub Lite topic name.
+    *   `SUBSCRIPTION`: The Pub/Sub Lite subscription name.
+    *   `GCP-LOCATION`: The location for this Pub/Sub Lite topic or subscription.
+*   `TBLPROPERTIES`:
+    *   `timestampAttributeKey`: Optional. The key which contains the event
+        timestamp associated with the Pub/Sub message. If not specified, the
+        message publish timestamp is used as an event timestamp for
+        windowing/watermarking.
+    *   `deadLetterQueue`: Optional, supports
+        [Generic DLQ Handling](#generic-dlq-handling)
+    *   `format`: Optional. Allows you to specify the payload format.
+
+### Read Mode
+
+PubsubLiteIO supports reading from subscriptions.
+
+### Write Mode
+
+PubsubLiteIO supports writing to topics.
+
+### Supported Payload
+
+*   Pub/Sub Lite supports [Generic Payload Handling](#generic-payload-handling).
+
+### Example
+
+```
+CREATE EXTERNAL TABLE locations (event_timestamp TIMESTAMP, attributes ARRAY<ROW<key VARCHAR, `values` ARRAY<VARBINARY>>>, payload ROW<id INTEGER, location VARCHAR>)
+TYPE pubsublite
+LOCATION 'projects/testing-integration/locations/us-central1-a/topics/user-location'
+```
+
 ## Kafka
 
 KafkaIO is experimental in Beam SQL.
 
 ### Syntax
 
+#### Flattened mode
 ```
 CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (tableElement [, tableElement ]*)
 TYPE kafka
-LOCATION 'kafka://localhost:2181/brokers'
+LOCATION 'my.company.url.com:2181/topic1'
 TBLPROPERTIES '{
-    "bootstrap.servers":"localhost:9092",
-    "topics": ["topic1", "topic2"],
-    "format": "avro"
-    [, "protoClass": "com.example.ExampleMessage" ]
+    "bootstrap_servers": ["localhost:9092", "PLAINTEXT://192.168.1.200:2181"],
+    "topics": ["topic2", "topic3"],
+    "format": "json"
 }'
 ```
 
-*   `LOCATION`: The Kafka topic URL.
+#### Nested mode
+```
+CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (
+  event_timestamp DATETIME,
+  message_key BYTES,
+  headers ARRAY<ROW<key VARCHAR, `values` ARRAY<VARBINARY>>>,
+  payload [BYTES, ROW<tableElement [, tableElement ]*>]
+)
+TYPE kafka
+LOCATION 'my.company.url.com:2181/topic1'
+TBLPROPERTIES '{
+    "bootstrap_servers": ["localhost:9092", "PLAINTEXT://192.168.1.200:2181"],
+    "topics": ["topic2", "topic3"],
+    "format": "json"
+}'
+```
+
+The presence of the `headers` field triggers nested mode usage.
+
+*   `LOCATION`: A URL with the initial bootstrap broker to use and the initial
+    topic name provided as the path.
 *   `TBLPROPERTIES`:
-    *   `bootstrap.servers`: Optional. Allows you to specify the bootstrap
-        server.
-    *   `topics`: Optional. Allows you to specify specific topics.
+    *   `bootstrap_servers`: Optional. Allows you to specify additional
+        bootstrap servers, which are used in addition to the one in `LOCATION`.
+    *   `topics`: Optional. Allows you to specify additional topics, which are
+        used in addition to the one in `LOCATION`.
     *   `format`: Optional. Allows you to specify the Kafka values format. Possible values are
-        {`csv`, `avro`, `json`, `proto`, `thrift`}. Defaults to `csv`.
-    *   `protoClass`: Optional. Use only when `format` is equal to `proto`. Allows you to
-        specify full protocol buffer java class name.
-    *   `thriftClass`: Optional. Use only when `format` is equal to `thrift`. Allows you to
-        specify full thrift java class name.
-    *   `thriftProtocolFactoryClass`: Optional. Use only when `format` is equal to `thrift`.
-        Allows you to specify full class name of the `TProtocolFactory` to use for thrift
-        serialization.
+        {`csv`, `avro`, `json`, `proto`, `thrift`}. Defaults to `csv` in
+        flattened mode or `json` in nested mode. `csv` does not support nested
+        mode.
 
 ### Read Mode
 
@@ -473,18 +542,8 @@
 *   CSV (default)
     *   Beam parses the messages, attempting to parse fields according to the
         types specified in the schema.
-*   Avro
-    *   An Avro schema is automatically generated from the specified field
-        types. It is used to parse incoming messages and to format outgoing
-        messages.
-*   JSON Objects
-    *   Beam attempts to parse JSON to match the schema.
-*   Protocol buffers
-    *   Fields in the schema have to match the fields of the given `protoClass`.
-*   Thrift
-    *   Fields in the schema have to match the fields of the given `thriftClass`.
-    *   The `TProtocolFactory` used for thrift serialization must match the
-        provided `thriftProtocolFactoryClass`.
+*   Kafka supports all [Generic Payload Handling](#generic-payload-handling)
+    formats.
 
 ### Schema
 
@@ -582,3 +641,47 @@
 TYPE text
 LOCATION '/home/admin/orders'
 ```
+
+## Generic Payload Handling
+
+Certain data sources and sinks support generic payload handling. This handling
+parses a byte array payload field according to the table schema. The following
+formats are supported. All require at least setting `"format": "<type>"`, and
+some require additional properties.
+
+*   `avro`: Avro
+    *   An Avro schema is automatically generated from the specified field
+        types. It is used to parse incoming messages and to format outgoing
+        messages.
+*   `json`: JSON Objects
+    *   Beam attempts to parse the byte array as UTF-8 JSON to match the schema.
+*   `proto`: Protocol Buffers
+    *   Beam locates the equivalent Protocol Buffer class and uses it to parse
+        the payload.
+    *   `protoClass`: Required. The proto class name to use. Must be built into
+         the deployed JAR.
+    *   Fields in the schema have to match the fields of the given `protoClass`.
+*   `thrift`: Thrift
+    *   Fields in the schema have to match the fields of the given
+        `thriftClass`.
+    *   `thriftClass`: Required. Allows you to specify the full thrift java
+        class name. Must be built into the deployed JAR.
+    *   `thriftProtocolFactoryClass`: Required. Allows you to specify the full
+        class name of the `TProtocolFactory` to use for thrift serialization.
+        Must be built into the deployed JAR.
+    *   The `TProtocolFactory` used for thrift serialization must match the
+        provided `thriftProtocolFactoryClass`.
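+
+For example, a thrift payload might be configured as follows (the message class
+is illustrative; both classes must be built into the deployed JAR):
+
+```
+TBLPROPERTIES '{
+    "format": "thrift",
+    "thriftClass": "com.example.ExampleThriftMessage",
+    "thriftProtocolFactoryClass": "org.apache.thrift.protocol.TCompactProtocol$Factory"
+}'
+```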
+
+## Generic DLQ Handling
+
+Sources and sinks which support generic DLQ handling specify a parameter with
+the format `"<dlqParamName>": "[DLQ_KIND]:[DLQ_ID]"`. The following types of
+DLQ handling are supported:
+
+*   `bigquery`: BigQuery
+    *   DLQ_ID is the table spec for an output table with an "error" string
+        field and "payload" byte array field.
+*   `pubsub`: Pub/Sub Topic
+    *   DLQ_ID is the full path of the Pub/Sub Topic.
+*   `pubsublite`: Pub/Sub Lite Topic
+    *   DLQ_ID is the full path of the Pub/Sub Lite Topic.
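+
+For example, `"deadLetterQueue": "pubsub:projects/[PROJECT]/topics/[TOPIC]"`
+sends messages that fail to parse to the given Pub/Sub topic.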
\ No newline at end of file