Flink 当前只提供 Apache Pulsar 数据源,用户可以使用它从 Pulsar 读取数据,并保证每条数据只被处理一次。
连接器当前支持 Pulsar 2.7.0 之后的版本,但是连接器使用到了 Pulsar 的事务机制,建议在 Pulsar 2.8.0 及其之后的版本上使用连接器进行数据读取。
如果想要了解更多关于 Pulsar API 兼容性设计,可以阅读文档 PIP-72。
{{< artifact flink-connector-pulsar withScalaVersion >}}
使用本连接器的同时,记得把 flink-connector-base
也加到项目的依赖里面:
{{< artifact flink-connector-base >}}
Flink 的流连接器并不会放到发行文件里面一同发布,阅读[此文档]({{< ref “docs/dev/datastream/project-configuration” >}}),了解如何将连接器添加到集群实例内。
{{< hint info >}} Pulsar 数据源基于 Flink 最新的[批流一体 API]({{< ref “docs/dev/datastream/sources.md” >}}) 进行开发。
如果要想使用基于旧版的 SourceFunction
实现的 Pulsar 数据源,或者是项目的 Flink 版本低于 1.14,可以使用 StreamNative 单独维护的 pulsar-flink。 {{< /hint >}}
Pulsar 数据源提供了 builder 类来构造数据源实例。下面的代码实例使用 builder 类创建的数据源会从 topic “persistent://public/default/my-topic” 的数据开始端进行消费。 连接器使用了 Exclusive(独占)的订阅方式消费消息,订阅名称为 my-subscription
,并把消息体的二进制字节流以 UTF-8 的方式编码为字符串。
PulsarSource<String> pulsarSource = PulsarSource.builder() .setServiceUrl(serviceUrl) .setAdminUrl(adminUrl) .setStartCursor(StartCursor.earliest()) .setTopics("my-topic") .setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(new SimpleStringSchema())) .setSubscriptionName("my-subscription") .setSubscriptionType(SubscriptionType.Exclusive) .build(); env.fromSource(source, WatermarkStrategy.noWatermarks(), "Pulsar Source");
如果使用构造类构造 Pulsar 数据源,一定要提供下面几个属性:
setServiceUrl(String)
方法提供setAdminUrl(String)
方法提供setSubscriptionName(String)
方法提供Pulsar 数据源提供了两种订阅 topic 或 topic 分区的方式。
Topic 列表,从这个 Topic 的所有分区上消费消息,例如:
PulsarSource.builder().setTopics("some-topic1", "some-topic2") // 从 topic "topic-a" 的 0 和 1 分区上消费 PulsarSource.builder().setTopics("topic-a-partition-0", "topic-a-partition-2")
Topic 正则,连接器使用给定的正则表达式匹配出所有合规的 topic,例如:
PulsarSource.builder().setTopicPattern("topic-*")
从 Pulsar 2.0 之后,完整的 topic 名称格式为 {persistent|non-persistent}://租户/命名空间/topic
。 但是连接器不需要提供 topic 名称的完整定义,因为 topic 类型、租户、命名空间都设置了默认值。
Topic 属性 | 默认值 |
---|---|
topic 类型 | persistent |
租户 | public |
命名空间 | default |
下面的表格提供了当前 Pulsar 支持的简写方式:
topic 名称简写 | 翻译后的 topic 名称 |
---|---|
my-topic | persistent://public/default/my-topic |
my-tenant/my-namespace/my-topic | persistent://my-tenant/my-namespace/my-topic |
{{< hint warning >}} 对于 non-persistent(非持久化) topic,连接器不支持简写名称。所以无法将 non-persistent://public/default/my-topic
简写成 non-persistent://my-topic
。 {{< /hint >}}
对于 Pulsar 而言,Topic 分区也是一种 Topic。Pulsar 会将一个有分区的 Topic 在内部按照分区的大小拆分成等量的无分区 Topic。
例如,在 Pulsar 的 sample
租户下面的 flink
命名空间里面创建了一个有 3 个分区的 topic,给它起名为 simple-string
。 可以在 Pulsar 上看到如下的 topic 列表:
Topic 名称 | 是否分区 |
---|---|
persistent://sample/flink/simple-string | 是 |
persistent://sample/flink/simple-string-partition-0 | 否 |
persistent://sample/flink/simple-string-partition-1 | 否 |
persistent://sample/flink/simple-string-partition-2 | 否 |
这意味着,用户可以用上面的子 topic 去直接消费分区里面的数据,不需要再去基于上层的父 topic 去消费全部分区的数据。 例如:使用 PulsarSource.builder().setTopics("sample/flink/simple-string-partition-1", "sample/flink/simple-string-partition-2")
将会只消费 topic sample/flink/simple-string
上面的分区 1 和 2 里面的消息。
前面提到了 Pulsar topic 有 persistent
、non-persistent
两种类型,使用正则表达式消费数据的时候,连接器会尝试从正则表达式里面解析出消息的类型。 例如:PulsarSource.builder().setTopicPattern("non-persistent://my-topic*")
会解析出 non-persistent
这个 topic 类型。 如果用户使用 topic 名称简写的方式,连接器会使用默认的消息类型 persistent
。
如果想用正则去消费 persistent
和 non-persistent
类型的 topic,需要使用 RegexSubscriptionMode
定义 topic 类型,例如:setTopicPattern("topic-*", RegexSubscriptionMode.AllTopics)
。
反序列化器用于解析 Pulsar 消息,连接器使用 PulsarDeserializationSchema
来定义反序列化器。 用户可以在 builder 类中使用 setDeserializationSchema(PulsarDeserializationSchema)
方法配置反序列化器,它会解析 Pulsar 的 Message<byte[]>
实例。
如果用户只关心消息体的二进制字节流,并不需要其他属性来解析数据。可以直接使用预定义的 PulsarDeserializationSchema
。Pulsar 连接器里面提供了 3 种预定义好的反序列化器。
// 基础数据类型 PulsarDeserializationSchema.pulsarSchema(Schema) // 结构类型 (JSON, Protobuf, Avro, etc.) PulsarDeserializationSchema.pulsarSchema(Schema, Class) // 键值对类型 PulsarDeserializationSchema.pulsarSchema(Schema, Class, Class)
DeserializationSchema
解析消息。PulsarDeserializationSchema.flinkSchema(DeserializationSchema)
TypeInformation
解析消息。PulsarDeserializationSchema.flinkTypeInfo(TypeInformation, ExecutionConfig)
Pulsar 的 Message<byte[]>
包含了很多 额外的属性。 例如,消息的 key,消息发送时间,消息生产时间,用户在消息上自定义的键值对属性等。可以使用 Message<byte[]>
接口来获取这些属性。
如果用户需要基于这些额外的属性来解析一条消息,可以实现 PulsarDeserializationSchema
接口。 并一定要确保 PulsarDeserializationSchema.getProducedType()
方法返回的 TypeInformation
是正确的结果。 Flink 使用 TypeInformation
将解析出来的结果序列化传递到下游算子。
订阅是命名好的配置规则,指导消息如何投递给消费者。连接器需要提供一个独立的订阅名称,支持 Pulsar 的四种订阅模式:
当前 Pulsar 连接器里面,独占
和 灾备
的实现没有区别,如果 Flink 的一个 reader 挂了,连接器会把所有未消费的数据交给其他的 reader 来消费数据。
默认情况下,如果没有指定订阅类型,连接器使用共享订阅类型(SubscriptionType.Shared
)。
// 名为 "my-shared" 的共享订阅 PulsarSource.builder().setSubscriptionName("my-shared") // 名为 "my-exclusive" 的独占订阅 PulsarSource.builder().setSubscriptionName("my-exclusive").setSubscriptionType(SubscriptionType.Exclusive)
如果想在 Pulsar 连接器里面使用 key 共享
订阅,需要提供 RangeGenerator
实例。RangeGenerator
会生成一组消息 key 的 hash 范围,连接器会基于给定的范围来消费数据。
Pulsar 连接器也提供了一个名为 UniformRangeGenerator
的默认实现,它会基于 flink 数据源的并行度将 hash 范围均分。
连接器使用 setStartCursor(StartCursor)
方法给定开始消费的位置。内置的消费位置有:
StartCursor.earliest()
StartCursor.latest()
StartCursor.fromMessageId(MessageId)
StartCursor.fromMessageId(MessageId, boolean)
StartCursor.fromMessageTime(long)
{{< hint info >}} 每条消息都有一个固定的序列号,这个序列号在 Pulsar 上有序排列,其包含了 ledger、entry、partition 等原始信息,用于在 Pulsar 底层存储上查找到具体的消息。 Pulsar 称这个序列号为 MessageId
,用户可以使用 DefaultImplementation.newMessageId(long ledgerId, long entryId, int partitionIndex)
创建它。 {{< /hint >}}
Pulsar 连接器同时支持流式和批的消费方式,默认情况下,连接器使用流的方式消费数据。除非任务失败或者被取消,否则连接器将持续消费数据。 用户可以使用 setBoundedStopCursor(StopCursor)
给定停止消费的位置,这种情况下连接器会使用批的方式进行消费。当所有 topic 分区都消费到了停止位置,Flink 任务就会结束。
使用流的方式一样可以给定停止位置,使用 setUnboundedStopCursor(StopCursor)
方法即可。
内置的停止位置如下:
StopCursor.never()
StopCursor.latest()
StopCursor.atMessageId(MessageId)
StopCursor.afterMessageId(MessageId)
StopCursor.atEventTime(long)
除了前面提到的配置选项,连接器还提供了丰富的选项供 Pulsar 专家使用,在 builder 类里通过 setConfig(ConfigOption<T>, T)
和 setConfig(Configuration)
方法给定下述的全部配置。
Pulsar 连接器使用Java 客户端来创建消费实例,相关的配置定义于 Pulsar 的 ClientConfigurationData
内。连接器在 PulsarOptions
选项中,定义大部分的可供用户定义的配置。
{{< generated/pulsar_client_configuration >}}
管理 API 用于查询 topic 的元数据和用正则订阅的时候的 topic 查找,它与 Java 客户端共享大部分配置。下面列举的配置只供管理 API 使用,连接器也在 PulsarOptions
里予以定义。
{{< generated/pulsar_admin_configuration >}}
Pulsar 提供了消费者 API 和读者 API 两套 API 来进行数据消费,它们可用于不同的业务场景。 Flink 上的 Pulsar 连接器使用消费者 API 进行消费,它的配置定义于 Pulsar 的 ConsumerConfigurationData
内。连接器将其中大部分的可供用户定义的配置定义于 PulsarSourceOptions
内。
{{< generated/pulsar_consumer_configuration >}}
下述配置主要用于性能调优或者是控制消息确认的行为。如非必要,可以不用强制配置。
{{< generated/pulsar_source_configuration >}}
为了能在启动 Flink 任务之后还能发现在 Pulsar 上扩容的分区或者是新创建的 topic,连接器提供了动态分区发现机制。该机制不需要重启 Flink 任务。 对选项 PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS
设置一个正整数即可启用。
// 10 秒查询一次分区信息 PulsarSource.builder() .setConfig(PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, 10000);
{{< hint warning >}} 默认情况下,Pulsar 启用动态分区发现,查询间隔为 30 秒。用户可以给定一个负数,将该功能禁用。如果使用批的方式消费数据,将无法启用该功能。 {{< /hint >}}
默认情况下,连接器使用 Pulsar 的 Message<byte[]>
里面的时间作为解析结果的时间戳。用户可以使用 WatermarkStrategy
来自行解析出想要的消息时间,并向下游传递对应的水位线。
env.fromSource(pulsarSource, new CustomWatermarkStrategy(), "Pulsar Source With Custom Watermark Strategy")
[这篇文档]({{< ref “docs/dev/datastream/event-time/generating_watermarks.md” >}}) 详细讲解了如何定义 WatermarkStrategy
。
一旦在 topic 上创建了订阅,消息便会存储在 Pulsar 里。即使没有消费者,消息也不会被丢弃。只有当连接器同 Pulsar 确认此条消息已经被消费,该消息才以某种机制会被移除。连接器支持四种订阅方式,它们的消息确认方式也大不相同。
独占
和 灾备
订阅下,连接器使用累进式确认方式。确认某条消息已经被处理时,其前面被消费的消息会自动被置为已读。Pulsar 连接器会在 Flink 完成检查点时将对应时刻消费的消息置为已读,以此来保证 Pulsar 状态与 Flink 状态一致。
如果用户没有在 Flink 上启用检查点,连接器可以使用周期性提交来将消费状态提交给 Pulsar,使用配置 PulsarSourceOptions.PULSAR_AUTO_COMMIT_CURSOR_INTERVAL
来进行定义。
需要注意的是,此种场景下,Pulsar 连接器并不依赖于提交到 Pulsar 的状态来做容错。消息确认只是为了能在 Pulsar 端看到对应的消费处理情况。
共享
和 key 共享
需要依次确认每一条消息,所以连接器在 Pulsar 事务里面进行消息确认,然后将事务提交到 Pulsar。
首先需要在 Pulsar 的 borker.conf
文件里面启用事务:
transactionCoordinatorEnabled=true
连接器创建的事务的默认超时时间为 3 小时,请确保这个时间大于 Flink 检查点的间隔。用户可以使用 PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS
来设置事务的超时时间。
如果用户无法启用 Pulsar 的事务,或者是因为项目禁用了检查点,需要将 PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE
选项设置为 true
,消息从 Pulsar 消费后会被立刻置为已读。连接器无法保证此种场景下的消息一致性。
连接器在 Pulsar 上使用日志的形式记录某个事务下的消息确认,为了更好的性能,请缩短 Flink 做检查点的间隔。
常见的升级步骤,请参阅[升级应用程序和 Flink 版本]({{< ref “docs/ops/upgrading” >}})。Pulsar 连接器没有在 Flink 端存储消费的状态,所有的消费信息都推送到了 Pulsar。所以需要注意下面的事项:
使用 Flink 和 Pulsar 交互时如果遇到问题,一定要牢记 Flink 只使用了 Pulsar 的Java 客户端 和管理 API。用户遇到的问题很有可能与 Flink 无关,请先升级 Pulsar 的版本、Pulsar 客户端的版本、或者修改 Pulsar 的配置,Pulsar 连接器的配置来尝试解决问题。
{{< top >}}