[FLINK-23864][docs] Add pulsar connector document (Chinese & English).
diff --git a/docs/content.zh/docs/connectors/datastream/pulsar.md b/docs/content.zh/docs/connectors/datastream/pulsar.md
new file mode 100644
index 0000000..da6380a
--- /dev/null
+++ b/docs/content.zh/docs/connectors/datastream/pulsar.md
@@ -0,0 +1,356 @@
+---
+title: Pulsar
+weight: 9
+type: docs
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Apache Pulsar 连接器
+
+Flink 当前只提供 [Apache Pulsar](https://pulsar.apache.org) 数据源,用户可以使用它从 Pulsar 读取数据,并保证每条数据只被处理一次。
+
+## 添加依赖
+
+连接器当前支持 Pulsar 2.7.0 之后的版本,但是连接器使用到了 Pulsar 的[事务机制](https://pulsar.apache.org/docs/en/txn-what/),建议在 Pulsar 2.8.0
+及其之后的版本上使用连接器进行数据读取。
+
+如果想要了解更多关于 Pulsar API 兼容性设计,可以阅读文档 [PIP-72](https://github.com/apache/pulsar/wiki/PIP-72%3A-Introduce-Pulsar-Interface-Taxonomy%3A-Audience-and-Stability-Classification)。
+
+{{< artifact flink-connector-pulsar withScalaVersion >}}
+
+使用本连接器的同时,记得把 `flink-connector-base` 也加到项目的依赖里面:
+
+{{< artifact flink-connector-base >}}
+
+Flink 的流连接器并不会放到发行文件里面一同发布,阅读[此文档]({{< ref "docs/dev/datastream/project-configuration" >}}),了解如何将连接器添加到集群实例内。
+
+## Pulsar 数据源
+
+{{< hint info >}}
+Pulsar 数据源基于 Flink 最新的[批流一体 API]({{< ref "docs/dev/datastream/sources.md" >}}) 进行开发。
+
+如果要想使用基于旧版的 `SourceFunction` 实现的 Pulsar 数据源,或者是项目的 Flink 版本低于 1.14,可以使用 StreamNative 单独维护的 [pulsar-flink](https://github.com/streamnative/pulsar-flink)。
+{{< /hint >}}
+
+### 使用示例
+
+Pulsar 数据源提供了 builder 类来构造数据源实例。下面的代码实例使用 builder 类创建的数据源会从 topic "persistent://public/default/my-topic" 的数据开始端进行消费。
+连接器使用了 **Exclusive**(独占)的订阅方式消费消息,订阅名称为 `my-subscription`,并把消息体的二进制字节流以 UTF-8 的方式编码为字符串。
+
+```java
+PulsarSource<String> pulsarSource = PulsarSource.builder()
+    .setServiceUrl(serviceUrl)
+    .setAdminUrl(adminUrl)
+    .setStartCursor(StartCursor.earliest())
+    .setTopics("my-topic")
+    .setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(new SimpleStringSchema()))
+    .setSubscriptionName("my-subscription")
+    .setSubscriptionType(SubscriptionType.Exclusive)
+    .build();
+
+env.fromSource(source, WatermarkStrategy.noWatermarks(), "Pulsar Source");
+```
+
+如果使用构造类构造 Pulsar 数据源,一定要提供下面几个属性:
+
+- Pulsar 数据消费的地址,使用 `setServiceUrl(String)` 方法提供
+- Pulsar HTTP 管理地址,使用 `setAdminUrl(String)` 方法提供
+- Pulsar 订阅名称,使用 `setSubscriptionName(String)` 方法提供
+- 需要消费的 topic 或者是 topic 下面的分区,详见[指定消费的 Topic 或者 Topic 分区](#指定消费的-topic-或者-topic-分区)
+- 解码 Pulsar 消息的反序列化器,详见[反序列化器](#反序列化器)
+
+### 指定消费的 Topic 或者 Topic 分区
+
+Pulsar 数据源提供了两种订阅 topic 或 topic 分区的方式。
+
+- Topic 列表,从这个 Topic 的所有分区上消费消息,例如:
+  ```java
+  PulsarSource.builder().setTopics("some-topic1", "some-topic2")
+
+  // 从 topic "topic-a" 的 0 和 1 分区上消费
+  PulsarSource.builder().setTopics("topic-a-partition-0", "topic-a-partition-2")
+  ```
+
+- Topic 正则,连接器使用给定的正则表达式匹配出所有合规的 topic,例如:
+  ```java
+  PulsarSource.builder().setTopicPattern("topic-*")
+  ```
+
+#### Topic 名称简写
+
+从 Pulsar 2.0 之后,完整的 topic 名称格式为 `{persistent|non-persistent}://租户/命名空间/topic`。
+但是连接器不需要提供 topic 名称的完整定义,因为 topic 类型、租户、命名空间都设置了默认值。
+
+Topic 属性 | 默认值
+:------------|:-------
+topic 类型 | `persistent`
+租户 | `public`
+命名空间 | `default`
+
+下面的表格提供了当前 Pulsar 支持的简写方式:
+
+topic 名称简写 | 翻译后的 topic 名称
+:----------------|:---------------------
+`my-topic` | `persistent://public/default/my-topic`
+`my-tenant/my-namespace/my-topic` | `persistent://my-tenant/my-namespace/my-topic`
+
+{{< hint warning >}}
+对于 non-persistent(非持久化) topic,连接器不支持简写名称。所以无法将 `non-persistent://public/default/my-topic` 简写成 `non-persistent://my-topic`。
+{{< /hint >}}
+
+#### Pulsar Topic 层次结构
+
+对于 Pulsar 而言,Topic 分区也是一种 Topic。Pulsar 会将一个有分区的 Topic 在内部按照分区的大小拆分成等量的无分区 Topic。
+
+例如,在 Pulsar 的 `sample` 租户下面的 `flink` 命名空间里面创建了一个有 3 个分区的 topic,给它起名为 `simple-string`。
+可以在 Pulsar 上看到如下的 topic 列表:
+
+Topic 名称 | 是否分区
+:--------- | :----------
+`persistent://sample/flink/simple-string` | 是
+`persistent://sample/flink/simple-string-partition-0` | 否
+`persistent://sample/flink/simple-string-partition-1` | 否
+`persistent://sample/flink/simple-string-partition-2` | 否
+
+这意味着,用户可以用上面的子 topic 去直接消费分区里面的数据,不需要再去基于上层的父 topic 去消费全部分区的数据。
+例如:使用 `PulsarSource.builder().setTopics("sample/flink/simple-string-partition-1", "sample/flink/simple-string-partition-2")` 将会只消费 topic `sample/flink/simple-string` 上面的分区 1 和 2 里面的消息。
+
+#### 配置 Topic 正则表达式
+
+前面提到了 Pulsar topic 有 `persistent`、`non-persistent` 两种类型,使用正则表达式消费数据的时候,连接器会尝试从正则表达式里面解析出消息的类型。
+例如:`PulsarSource.builder().setTopicPattern("non-persistent://my-topic*")` 会解析出 `non-persistent` 这个 topic 类型。
+如果用户使用 topic 名称简写的方式,连接器会使用默认的消息类型 `persistent`。
+
+如果想用正则去消费 `persistent` 和 `non-persistent` 类型的 topic,需要使用 `RegexSubscriptionMode` 定义 topic 类型,例如:`setTopicPattern("topic-*", RegexSubscriptionMode.AllTopics)`。
+
+### 反序列化器
+
+反序列化器用于解析 Pulsar 消息,连接器使用 `PulsarDeserializationSchema` 来定义反序列化器。
+用户可以在 builder 类中使用 `setDeserializationSchema(PulsarDeserializationSchema)` 方法配置反序列化器,它会解析 Pulsar 的 `Message<byte[]>` 实例。
+
+如果用户只关心消息体的二进制字节流,并不需要其他属性来解析数据。可以直接使用预定义的 `PulsarDeserializationSchema`。Pulsar 连接器里面提供了 3 种预定义好的反序列化器。
+
+- 使用 Pulsar 的 [Schema](https://pulsar.apache.org/docs/en/schema-understand/) 解析消息。
+  ```java
+  // 基础数据类型
+  PulsarDeserializationSchema.pulsarSchema(Schema)
+
+  // 结构类型 (JSON, Protobuf, Avro, etc.)
+  PulsarDeserializationSchema.pulsarSchema(Schema, Class)
+
+  // 键值对类型
+  PulsarDeserializationSchema.pulsarSchema(Schema, Class, Class)
+  ```
+- 使用 Flink 的 `DeserializationSchema` 解析消息。
+  ```java
+  PulsarDeserializationSchema.flinkSchema(DeserializationSchema)
+  ```
+- 使用 Flink 的 `TypeInformation` 解析消息。
+  ```java
+  PulsarDeserializationSchema.flinkTypeInfo(TypeInformation, ExecutionConfig)
+  ```
+
+Pulsar 的 `Message<byte[]>` 包含了很多 [额外的属性](https://pulsar.apache.org/docs/zh-CN/concepts-messaging/#%E6%B6%88%E6%81%AF)。
+例如,消息的 key,消息发送时间,消息生产时间,用户在消息上自定义的键值对属性等。可以使用 `Message<byte[]>` 接口来获取这些属性。
+
+如果用户需要基于这些额外的属性来解析一条消息,可以实现 `PulsarDeserializationSchema` 接口。
+并一定要确保 `PulsarDeserializationSchema.getProducedType()` 方法返回的 `TypeInformation` 是正确的结果。
+Flink 使用 `TypeInformation` 将解析出来的结果序列化传递到下游算子。
+
+### Pulsar 订阅
+
+订阅是命名好的配置规则,指导消息如何投递给消费者。连接器需要提供一个独立的订阅名称,支持 Pulsar 的四种订阅模式:
+
+- [exclusive(独占)](https://pulsar.apache.org/docs/zh-CN/concepts-messaging/#exclusive)
+- [shared(共享)](https://pulsar.apache.org/docs/zh-CN/concepts-messaging/#shared)
+- [failover(灾备)](https://pulsar.apache.org/docs/zh-CN/concepts-messaging/#failover)
+- [key_shared(key 共享)](https://pulsar.apache.org/docs/zh-CN/concepts-messaging/#key_shared)
+
+当前 Pulsar 连接器里面,`独占` 和 `灾备` 的实现没有区别,如果 Flink 的一个 reader 挂了,连接器会把所有未消费的数据交给其他的 reader 来消费数据。
+
+默认情况下,如果没有指定订阅类型,连接器使用共享订阅类型(`SubscriptionType.Shared`)。
+
+```java
+// 名为 "my-shared" 的共享订阅
+PulsarSource.builder().setSubscriptionName("my-shared")
+
+// 名为 "my-exclusive" 的独占订阅
+PulsarSource.builder().setSubscriptionName("my-exclusive").setSubscriptionType(SubscriptionType.Exclusive)
+```
+
+如果想在 Pulsar 连接器里面使用 `key 共享` 订阅,需要提供 `RangeGenerator` 实例。`RangeGenerator` 会生成一组消息 key 的 hash 范围,连接器会基于给定的范围来消费数据。
+
+Pulsar 连接器也提供了一个名为 `UniformRangeGenerator` 的默认实现,它会基于 flink 数据源的并行度将 hash 范围均分。
+
+### 起始消费位置
+
+连接器使用 `setStartCursor(StartCursor)` 方法给定开始消费的位置。内置的消费位置有:
+
+- 从 topic 里面最早的一条消息开始消费。
+  ```java
+  StartCursor.earliest()
+  ```
+- 从 topic 里面最新的一条消息开始消费。
+  ```java
+  StartCursor.latest()
+  ```
+- 从给定的消息开始消费。
+  ```java
+  StartCursor.fromMessageId(MessageId)
+  ```
+- 与前者不同的是,给定的消息可以跳过,再进行消费。
+  ```java
+  StartCursor.fromMessageId(MessageId, boolean)
+  ```
+- 从给定的消息时间开始消费。
+  ```java
+  StartCursor.fromMessageTime(long)
+  ```
+
+{{< hint info >}}
+每条消息都有一个固定的序列号,这个序列号在 Pulsar 上有序排列,其包含了 ledger、entry、partition 等原始信息,用于在 Pulsar 底层存储上查找到具体的消息。
+Pulsar 称这个序列号为 `MessageId`,用户可以使用 `DefaultImplementation.newMessageId(long ledgerId, long entryId, int partitionIndex)` 创建它。
+{{< /hint >}}
+
+### 边界
+
+Pulsar 连接器同时支持流式和批的消费方式,默认情况下,连接器使用流的方式消费数据。除非任务失败或者被取消,否则连接器将持续消费数据。
+用户可以使用 `setBoundedStopCursor(StopCursor)` 给定停止消费的位置,这种情况下连接器会使用批的方式进行消费。当所有 topic 分区都消费到了停止位置,Flink 任务就会结束。
+
+使用流的方式一样可以给定停止位置,使用 `setUnboundedStopCursor(StopCursor)` 方法即可。
+
+内置的停止位置如下:
+
+- 永不停止。
+  ```java
+  StopCursor.never()
+  ```
+- 停止于 Pulsar 启动时 topic 里面最新的那条数据。
+  ```java
+  StopCursor.latest()
+  ```
+- 停止于某条消息,结果里不包含此消息。
+  ```java
+  StopCursor.atMessageId(MessageId)
+  ```
+- 停止于某条消息之后,结果里包含此消息。
+  ```java
+  StopCursor.afterMessageId(MessageId)
+  ```
+- 停止于某个给定的消息时间戳。
+  ```java
+  StopCursor.atEventTime(long)
+  ```
+
+### 其他配置项
+
+除了前面提到的配置选项,连接器还提供了丰富的选项供 Pulsar 专家使用,在 builder 类里通过 `setConfig(ConfigOption<T>, T)` 和 `setConfig(Configuration)` 方法给定下述的全部配置。
+
+#### Pulsar Java 客户端配置项
+
+Pulsar 连接器使用[Java 客户端](https://pulsar.apache.org/docs/en/client-libraries-java/)来创建消费实例,相关的配置定义于 Pulsar 的 `ClientConfigurationData` 内。连接器在 `PulsarOptions` 选项中,定义大部分的可供用户定义的配置。
+
+{{< generated/pulsar_client_configuration >}}
+
+#### Pulsar 管理 API 配置项
+
+[管理 API](https://pulsar.apache.org/docs/en/admin-api-overview/) 用于查询 topic 的元数据和用正则订阅的时候的 topic 查找,它与
+Java 客户端共享大部分配置。下面列举的配置只供管理 API 使用,连接器也在 `PulsarOptions` 里予以定义。
+
+{{< generated/pulsar_admin_configuration >}}
+
+#### Pulsar 消费者 API 配置项
+
+Pulsar 提供了消费者 API 和读者 API 两套 API 来进行数据消费,它们可用于不同的业务场景。
+Flink 上的 Pulsar 连接器使用消费者 API 进行消费,它的配置定义于 Pulsar 的 `ConsumerConfigurationData` 内。连接器将其中大部分的可供用户定义的配置定义于 `PulsarSourceOptions` 内。
+
+{{< generated/pulsar_consumer_configuration >}}
+
+#### Pulsar 数据源配置项
+
+下述配置主要用于性能调优或者是控制消息确认的行为。如非必要,可以不用强制配置。
+
+{{< generated/pulsar_source_configuration >}}
+
+### 动态分区发现
+
+为了能在启动 Flink 任务之后还能发现在 Pulsar 上扩容的分区或者是新创建的 topic,连接器提供了动态分区发现机制。该机制不需要重启 Flink 任务。
+对选项 `PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS` 设置一个正整数即可启用。
+
+```java
+// 10 秒查询一次分区信息
+PulsarSource.builder()
+        .setConfig(PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, 10000);
+```
+
+{{< hint warning >}}
+默认情况下,Pulsar 启用动态分区发现,查询间隔为 30 秒。用户可以给定一个负数,将该功能禁用。如果使用批的方式消费数据,将无法启用该功能。
+{{< /hint >}}
+
+### 事件时间和水位线
+
+默认情况下,连接器使用 Pulsar 的 `Message<byte[]>` 里面的时间作为解析结果的时间戳。用户可以使用 `WatermarkStrategy` 来自行解析出想要的消息时间,并向下游传递对应的水位线。
+
+```java
+env.fromSource(pulsarSource, new CustomWatermarkStrategy(), "Pulsar Source With Custom Watermark Strategy")
+```
+
+[这篇文档]({{< ref "docs/dev/datastream/event-time/generating_watermarks.md" >}}) 详细讲解了如何定义 `WatermarkStrategy`。
+
+### 消息确认
+
+一旦在 topic 上创建了订阅,消息便会[存储](https://pulsar.apache.org/docs/zh-CN/concepts-architecture-overview/#%E6%8C%81%E4%B9%85%E5%8C%96%E5%AD%98%E5%82%A8)在 Pulsar 里。即使没有消费者,消息也不会被丢弃。只有当连接器同 Pulsar 确认此条消息已经被消费,该消息才以某种机制会被移除。连接器支持四种订阅方式,它们的消息确认方式也大不相同。
+
+#### 独占和灾备订阅下的消息确认
+
+`独占` 和 `灾备` 订阅下,连接器使用累进式确认方式。确认某条消息已经被处理时,其前面被消费的消息会自动被置为已读。Pulsar 连接器会在 Flink 完成检查点时将对应时刻消费的消息置为已读,以此来保证 Pulsar 状态与 Flink 状态一致。
+
+如果用户没有在 Flink 上启用检查点,连接器可以使用周期性提交来将消费状态提交给 Pulsar,使用配置 `PulsarSourceOptions.PULSAR_AUTO_COMMIT_CURSOR_INTERVAL` 来进行定义。
+
+需要注意的是,此种场景下,Pulsar 连接器并不依赖于提交到 Pulsar 的状态来做容错。消息确认只是为了能在 Pulsar 端看到对应的消费处理情况。
+
+#### 共享和 key 共享订阅下的消息确认
+
+`共享` 和 `key 共享` 需要依次确认每一条消息,所以连接器在 Pulsar 事务里面进行消息确认,然后将事务提交到 Pulsar。
+
+首先需要在 Pulsar 的 `borker.conf` 文件里面启用事务:
+
+```text
+transactionCoordinatorEnabled=true
+```
+
+连接器创建的事务的默认超时时间为 3 小时,请确保这个时间大于 Flink 检查点的间隔。用户可以使用 `PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` 来设置事务的超时时间。
+
+如果用户无法启用 Pulsar 的事务,或者是因为项目禁用了检查点,需要将 `PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE` 选项设置为 `true`,消息从 Pulsar 消费后会被立刻置为已读。连接器无法保证此种场景下的消息一致性。
+
+连接器在 Pulsar 上使用日志的形式记录某个事务下的消息确认,为了更好的性能,请缩短 Flink 做检查点的间隔。
+
+## 升级至最新的连接器
+
+常见的升级步骤,请参阅[升级应用程序和 Flink 版本]({{< ref "docs/ops/upgrading" >}})。Pulsar 连接器没有在 Flink 端存储消费的状态,所有的消费信息都推送到了 Pulsar。所以需要注意下面的事项:
+
+* 不要同时升级 Pulsar 连接器和 Pulsar 服务端的版本。
+* 使用最新版本的 Pulsar 客户端来消费消息。
+
+## 问题诊断
+
+使用 Flink 和 Pulsar 交互时如果遇到问题,一定要牢记 Flink 只使用了 Pulsar 的[Java 客户端](https://pulsar.apache.org/docs/zh-CN/client-libraries-java/) 和[管理 API](https://pulsar.apache.org/docs/zh-CN/admin-api-overview/)。用户遇到的问题很有可能与 Flink 无关,请先升级 Pulsar 的版本、Pulsar 客户端的版本、或者修改 Pulsar 的配置,Pulsar 连接器的配置来尝试解决问题。
+
+{{< top >}}
diff --git a/docs/content/docs/connectors/datastream/pulsar.md b/docs/content/docs/connectors/datastream/pulsar.md
new file mode 100644
index 0000000..511b39a
--- /dev/null
+++ b/docs/content/docs/connectors/datastream/pulsar.md
@@ -0,0 +1,420 @@
+---
+title: Pulsar
+weight: 9
+type: docs
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Apache Pulsar Connector
+
+Flink provides an [Apache Pulsar](https://pulsar.apache.org) source connector for reading data from Pulsar topics with exactly-once guarantees.
+
+## Dependency
+
+You can use the connector with Pulsar 2.7.0 or higher. However, the Pulsar source connector supports
+Pulsar [transactions](https://pulsar.apache.org/docs/en/txn-what/),
+it is recommended to use Pulsar 2.8.0 or higher releases.
+For details on Pulsar compatibility, please refer to the [PIP-72](https://github.com/apache/pulsar/wiki/PIP-72%3A-Introduce-Pulsar-Interface-Taxonomy%3A-Audience-and-Stability-Classification).
+
+{{< artifact flink-connector-pulsar withScalaVersion >}}
+
+If you are using Pulsar source, `flink-connector-base` is also required as dependency:
+
+{{< artifact flink-connector-base >}}
+
+Flink's streaming connectors are not currently part of the binary distribution.
+See how to link with them for cluster execution [here]({{< ref "docs/dev/datastream/project-configuration" >}}).
+
+## Pulsar Source
+
+{{< hint info >}}
+This part describes the Pulsar source based on the new
+[data source]({{< ref "docs/dev/datastream/sources.md" >}}) API.
+
+If you want to use the legacy `SourceFunction` or on Flink 1.13 or lower releases, just use the StreamNative's [pulsar-flink](https://github.com/streamnative/pulsar-flink).
+{{< /hint >}}
+
+### Usage
+
+Pulsar source provides a builder class for constructing an instance of PulsarSource. The code snippet below shows
+how to build a PulsarSource to consume messages from the earliest cursor of topic "persistent://public/default/my-topic",
+with **Exclusive** subscription `my-subscription` and deserialize the raw payload of the messages as strings.
+
+```java
+PulsarSource<String> pulsarSource = PulsarSource.builder()
+    .setServiceUrl(serviceUrl)
+    .setAdminUrl(adminUrl)
+    .setStartCursor(StartCursor.earliest())
+    .setTopics("my-topic")
+    .setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(new SimpleStringSchema()))
+    .setSubscriptionName("my-subscription")
+    .setSubscriptionType(SubscriptionType.Exclusive)
+    .build();
+
+env.fromSource(source, WatermarkStrategy.noWatermarks(), "Pulsar Source");
+```
+
+The following properties are **required** for building a PulsarSource:
+
+- Pulsar service url, configured by `setServiceUrl(String)`
+- Pulsar service http url (aka. admin url), configured by `setAdminUrl(String)`
+- Pulsar subscription name, configured by `setSubscriptionName(String)`
+- Topics / partitions to subscribe, see the following
+  [Topic-partition subscription](#topic-partition-subscription) for more details.
+- Deserializer to parse Pulsar messages, see the following
+  [Deserializer](#deserializer) for more details.
+
+### Topic-partition Subscription
+
+Pulsar source provide two ways of topic-partition subscription:
+
+- Topic list, subscribing messages from all partitions in a list of topics. For example:
+  ```java
+  PulsarSource.builder().setTopics("some-topic1", "some-topic2")
+
+  // Partition 0 and 2 of topic "topic-a"
+  PulsarSource.builder().setTopics("topic-a-partition-0", "topic-a-partition-2")
+  ```
+
+- Topic pattern, subscribing messages from all topics whose name matches the provided regular expression. For example:
+  ```java
+  PulsarSource.builder().setTopicPattern("topic-*")
+  ```
+
+#### Flexible Topic Naming
+
+Since Pulsar 2.0, all topic names internally have the form `{persistent|non-persistent}://tenant/namespace/topic`.
+Now, for partitioned topics, you can use short names in many cases (for the sake of simplicity).
+The flexible naming system stems from the fact that there is now a default topic type, tenant, and namespace in a Pulsar cluster.
+
+Topic property | Default
+:------------|:-------
+topic type | `persistent`
+tenant | `public`
+namespace | `default`
+
+This table lists a mapping relationship between your input topic name and translated topic name:
+
+Input topic name | Translated topic name
+:----------------|:---------------------
+`my-topic` | `persistent://public/default/my-topic`
+`my-tenant/my-namespace/my-topic` | `persistent://my-tenant/my-namespace/my-topic`
+
+{{< hint warning >}}
+For non-persistent topics, you need to continue to specify the entire topic name,
+as the default-based rules do not apply for non-partitioned topics.
+Thus, you cannot use a short name like `non-persistent://my-topic` and would need to use `non-persistent://public/default/my-topic` instead.
+{{< /hint >}}
+
+#### Subscribing Pulsar Topic Partition
+
+Internally, Pulsar divides a partitioned topic as a set of non-partitioned topics according to the partition size.
+
+For example, if a `simple-string` topic with 3 partitions is created under the `sample` tenant with `flink` namespace.
+The topics on Pulsar would be:
+
+Topic name | Partitioned
+:--------- | :----------
+`persistent://sample/flink/simple-string` | Y
+`persistent://sample/flink/simple-string-partition-0` | N
+`persistent://sample/flink/simple-string-partition-1` | N
+`persistent://sample/flink/simple-string-partition-2` | N
+
+You can directly consume messages from the topic partitions by using the non-partitioned topic names above.
+For example, use `PulsarSource.builder().setTopics("sample/flink/simple-string-partition-1", "sample/flink/simple-string-partition-2")` would consume the partitions 1 and 2 of the `sample/flink/simple-string` topic.
+
+#### RegexSubscriptionMode for Topic Pattern
+
+Pulsar connector extracts the topic type (`persistent` or `non-persistent`) from the given topic pattern.
+For example, `PulsarSource.builder().setTopicPattern("non-persistent://my-topic*")` would be `non-persistent`.
+The topic type would be `persistent` if you do not provide the topic type in the regular expression.
+
+To consume both `persistent` and `non-persistent` topics based on the topic pattern,
+you can use `setTopicPattern("topic-*", RegexSubscriptionMode.AllTopics)`.
+Pulsar connector would filter the available topics by the `RegexSubscriptionMode`.
+
+### Deserializer
+
+A deserializer (Deserialization schema) is required for parsing Pulsar messages. The deserializer is
+configured by `setDeserializationSchema(PulsarDeserializationSchema)`.
+The `PulsarDeserializationSchema` defines how to deserialize a Pulsar `Message<byte[]>`.
+
+If only the raw payload of a message (message data in bytes) is needed,
+you can use the predefined `PulsarDeserializationSchema`. Pulsar connector provides three types of implementation.
+
+- Decode the message by using Pulsar's [Schema](https://pulsar.apache.org/docs/en/schema-understand/).
+  ```java
+  // Primitive types
+  PulsarDeserializationSchema.pulsarSchema(Schema)
+
+  // Struct types (JSON, Protobuf, Avro, etc.)
+  PulsarDeserializationSchema.pulsarSchema(Schema, Class)
+
+  // KeyValue type
+  PulsarDeserializationSchema.pulsarSchema(Schema, Class, Class)
+  ```
+- Decode the message by using Flink's `DeserializationSchema`
+  ```java
+  PulsarDeserializationSchema.flinkSchema(DeserializationSchema)
+  ```
+- Decode the message by using Flink's `TypeInformation`
+  ```java
+  PulsarDeserializationSchema.flinkTypeInfo(TypeInformation, ExecutionConfig)
+  ```
+
+Pulsar `Message<byte[]>` contains some [extra properties](https://pulsar.apache.org/docs/en/concepts-messaging/#messages),
+such as message key, message publish time, message time, application defined key/value pairs that will be attached to the message, etc.
+These properties could be acquired by the `Message<byte[]>` interface.
+
+If you want to deserialize the Pulsar message by these properties, you need to implement `PulsarDeserializationSchema`.
+And ensure that the `TypeInformation` from the `PulsarDeserializationSchema.getProducedType()` must be correct.
+Flink would use this `TypeInformation` for passing the messages to downstream operators.
+
+### Pulsar Subscriptions
+
+A Pulsar subscription is a named configuration rule that determines how messages are delivered to Flink readers.
+The subscription name is required for consuming messages. Pulsar connector supports four subscription types:
+
+- [Exclusive](https://pulsar.apache.org/docs/en/concepts-messaging/#exclusive)
+- [Shared](https://pulsar.apache.org/docs/en/concepts-messaging/#shared)
+- [Failover](https://pulsar.apache.org/docs/en/concepts-messaging/#failover)
+- [Key_Shared](https://pulsar.apache.org/docs/en/concepts-messaging/#key_shared)
+
+There is no difference between `Exclusive` and `Failover` in the Pulsar connector.
+When a Flink reader crashes, all (non-acknowledged and subsequent) messages are redelivered to the available Flink readers.
+
+By default, if no subscription type is defined, Pulsar source uses `Shared` subscription.
+
+```java
+// Shared subscription with name "my-shared"
+PulsarSource.builder().setSubscriptionName("my-shared")
+
+// Exclusive subscription with name "my-exclusive"
+PulsarSource.builder().setSubscriptionName("my-exclusive").setSubscriptionType(SubscriptionType.Exclusive)
+```
+
+If you want to use `Key_Shared` subscription type on the Pulsar connector. Ensure that you provide a `RangeGenerator` implementation.
+The `RangeGenerator` generates a set of key hash ranges so that
+a respective reader subtask will only dispatch messages where the hash of the message key is contained in the specified range.
+
+Pulsar connector would use a `UniformRangeGenerator` which would divides the range by the Flink source parallelism
+if no `RangeGenerator` is provided in the `Key_Shared` subscription type.
+
+### Starting Position
+
+Pulsar source is able to consume messages starting from different positions by `setStartCursor(StartCursor)`.
+Built-in start cursors include:
+
+- Start from the earliest available message in the topic.
+  ```java
+  StartCursor.earliest()
+  ```
+- Start from the latest available message in the topic.
+  ```java
+  StartCursor.latest()
+  ```
+- Start from a specified message between the earliest and the latest.
+  Pulsar connector would consume from the latest available message if the message id doesn't exist.
+
+  The start message is included in consuming result.
+  ```java
+  StartCursor.fromMessageId(MessageId)
+  ```
+- Start from a specified message between the earliest and the latest.
+  Pulsar connector would consume from the latest available message if the message id doesn't exist.
+
+  Include or exclude the start message by using the second boolean parameter.
+  ```java
+  StartCursor.fromMessageId(MessageId, boolean)
+  ```
+
+{{< hint info >}}
+Each Pulsar message belongs to an ordered sequence on its topic.
+The sequence ID (`MessageId`) of the message is ordered in that sequence.
+`MessageId` contains some extra information (the ledger, entry, partition) on how the message is stored,
+you can create a `MessageId` by using `DefaultImplementation.newMessageId(long ledgerId, long entryId, int partitionIndex)`.
+{{< /hint >}}
+
+### Boundedness
+
+Pulsar source supports streaming and batch running modes.
+By default, the `PulsarSource` is set to run in the streaming mode.
+
+In streaming mode, Pulsar source never stops until a Flink job fails or is cancelled. However,
+you can set Pulsar source stopping at a stop position by using ```setUnboundedStopCursor(StopCursor)```.
+The Pulsar source will finish when all partitions reach their specified stop positions.
+
+You can use ```setBoundedStopCursor(StopCursor)``` to specify a stop position so that the Pulsar source can run in the batch mode.
+When all partitions have reached their stop positions, the source will finish.
+
+Built-in stop cursors include:
+
+- Connector will never stop consuming.
+  ```java
+  StopCursor.never()
+  ```
+- Stop at the latest available message in Pulsar when the connector starts consuming.
+  ```java
+  StopCursor.latest()
+  ```
+- Stop when connector meet a given message, or stop at a message which is produced after this given message.
+  ```java
+  StopCursor.atMessageId(MessageId)
+  ```
+- Stop but include the given message in consuming result.
+  ```java
+  StopCursor.afterMessageId(MessageId)
+  ```
+- Stop at the specified message time by `Message<byte[]>.getEventTime()`.
+  ```java
+  StopCursor.atEventTime(long)
+  ```
+
+### Configurable Options
+
+In addition to configuration options described above, you can set arbitrary options for `PulsarClient`,
+`PulsarAdmin`, Pulsar `Consumer` and `PulsarSource` by using `setConfig(ConfigOption<T>, T)` and `setConfig(Configuration)`.
+
+#### PulsarClient Options
+
+Pulsar connector use the [client API](https://pulsar.apache.org/docs/en/client-libraries-java/)
+to create the `Consumer` instance. Pulsar connector extracts most parts of Pulsar's `ClientConfigurationData`,
+which is required for creating a `PulsarClient`, as Flink configuration options in `PulsarOptions`.
+
+{{< generated/pulsar_client_configuration >}}
+
+#### PulsarAdmin Options
+
+The [admin API](https://pulsar.apache.org/docs/en/admin-api-overview/) is used for querying topic metadata
+and for discovering the desired topics when Pulsar connector uses topic pattern subscription. It would share most part of the
+configuration options with the client API. The configuration options listed here are only used in the admin API.
+They are also defined in `PulsarOptions`.
+
+{{< generated/pulsar_admin_configuration >}}
+
+#### Pulsar Consumer Options
+
+In general, Pulsar provides the Reader API and Consumer API for consuming messages in different scenarios.
+Flink's Pulsar connector uses the Consumer API. It extracts most parts of Pulsar's `ConsumerConfigurationData` as Flink configuration options in `PulsarSourceOptions`.
+
+{{< generated/pulsar_consumer_configuration >}}
+
+#### PulsarSource Options
+
+The configuration options below are mainly used for customizing the performance and message acknowledgement behavior.
+You can just leave them alone if you do not meet any performance issues.
+
+{{< generated/pulsar_source_configuration >}}
+
+### Dynamic Partition Discovery
+
+To handle scenarios like topic scaling-out or topic creation without restarting the Flink
+job, Pulsar source can be configured to periodically discover new partitions under provided
+topic-partition subscribing pattern. To enable partition discovery, set a non-negative value for
+the option `PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS`:
+
+```java
+// discover new partitions per 10 seconds
+PulsarSource.builder()
+        .setConfig(PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, 10000);
+```
+
+{{< hint warning >}}
+- Partition discovery is **enabled** by default. Pulsar connector would query the topic metadata every 30 seconds.
+- You need to set the partition discovery interval to a negative value to disable this feature.
+- The partition discovery would be disabled in batch mode even if you set this option with a non-negative value.
+{{< /hint >}}
+
+### Event Time and Watermarks
+
+By default, the message uses the timestamp embedded in Pulsar `Message<byte[]>` as the event time.
+You can define your own `WatermarkStrategy` to extract the event time from the message,
+and emit the watermark downstream:
+
+```java
+env.fromSource(pulsarSource, new CustomWatermarkStrategy(), "Pulsar Source With Custom Watermark Strategy")
+```
+
+[This documentation]({{< ref "docs/dev/datastream/event-time/generating_watermarks.md" >}}) describes
+details about how to define a `WatermarkStrategy`.
+
+### Message Acknowledgement
+
+When a subscription is created, Pulsar [retains](https://pulsar.apache.org/docs/en/concepts-architecture-overview/#persistent-storage) all messages, even if the consumer is disconnected.
+The retained messages are discarded only when the connector acknowledges that all these messages are processed successfully.
+Pulsar connector supports four subscription types, which makes the acknowledgement behaviors variety among different subscriptions.
+
+#### Acknowledgement on Exclusive and Failover Subscription Types
+
+`Exclusive` and `Failover` subscription types support cumulative acknowledgment. In these subscription types, Flink only needs to acknowledge
+the latest successfully consumed message. All the message before the given message are marked
+with a consumed status.
+
+Pulsar source acknowledges the current consuming message when checkpoints are **completed**,
+to ensure the consistency between Flink's checkpoint state and committed position on Pulsar brokers.
+
+If checkpointing is disabled, Pulsar source periodically acknowledges messages.
+You can set the acknowledgement period by using the `PulsarSourceOptions.PULSAR_AUTO_COMMIT_CURSOR_INTERVAL` option.
+
+Pulsar source does **NOT** rely on committed positions for fault tolerance.
+Acknowledging messages is only for exposing the progress of consumer and monitoring on these two subscription types.
+
+#### Acknowledgement on Shared and Key_Shared Subscription Types
+
+`Shared` and `Key_Shared` subscription types need to acknowledge messages one by one. You can acknowledge
+a message in a transaction and commit it to Pulsar.
+
+You should enable transaction in the Pulsar `borker.conf` file when using these two subscription types in connector:
+
+```text
+transactionCoordinatorEnabled=true
+```
+
+Pulsar transaction would be created with 3 hours as the timeout by default. Make sure that timeout > checkpoint interval + maximum recovery time.
+A shorter checkpoint interval would increase the consuming performance.
+You can change the transaction timeout by using the `PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option.
+
+If checkpointing is disabled or you can not enable the transaction on Pulsar broker, you should set
+`PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE` to `true`.
+The message would be immediately acknowledged after consuming.
+We can not promise consistency in this scenario.
+
+{{< hint info >}}
+All acknowledgements in a transaction are recorded in the Pulsar broker side.
+{{< /hint >}}
+
+## Upgrading to the Latest Connector Version
+
+The generic upgrade steps are outlined in [upgrading jobs and Flink versions guide]({{< ref "docs/ops/upgrading" >}}).
+The Pulsar connector does not store any state on the Flink side. The Pulsar connector pushes and stores all the states on the Pulsar side.
+For Pulsar, you additionally need to know these limitations:
+
+* Do not upgrade the Pulsar connector and Pulsar broker version at the same time.
+* Always use a newer Pulsar client with Pulsar connector for consuming message from Pulsar.
+
+## Troubleshooting
+
+If you have a problem with Pulsar when using Flink, keep in mind that Flink only wraps
+[PulsarClient](https://pulsar.apache.org/docs/en/client-libraries-java/) or
+[PulsarAdmin](https://pulsar.apache.org/docs/en/admin-api-overview/)
+and your problem might be independent of Flink and sometimes can be solved by upgrading Pulsar brokers,
+reconfiguring Pulsar brokers or reconfiguring Pulsar connector in Flink.
+
+{{< top >}}