blob: 67690be021d6776a9470af0c2ffa20e020340f1b [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.apache.flink.streaming.connectors.pulsar;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Preconditions;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
* A class for building a pulsar source.
public class PulsarSourceBuilder<T> {
private static final String SERVICE_URL = "pulsar://localhost:6650";
private static final long ACKNOWLEDGEMENT_BATCH_SIZE = 100;
private static final long MAX_ACKNOWLEDGEMENT_BATCH_SIZE = 1000;
private static final int DEFAULT_MESSAGE_RECEIVE_TIMEOUT_MS = 100;
private static final String SUBSCRIPTION_NAME = "flink-sub";
final DeserializationSchema<T> deserializationSchema;
ClientConfigurationData clientConfigurationData;
ConsumerConfigurationData<byte[]> consumerConfigurationData;
long acknowledgementBatchSize = ACKNOWLEDGEMENT_BATCH_SIZE;
private PulsarSourceBuilder(DeserializationSchema<T> deserializationSchema) {
this.deserializationSchema = deserializationSchema;
clientConfigurationData = new ClientConfigurationData();
consumerConfigurationData = new ConsumerConfigurationData<>();
consumerConfigurationData.setTopicNames(new TreeSet<>());
* Sets the pulsar service url to connect to. Defaults to pulsar://localhost:6650.
* @param serviceUrl service url to connect to
* @return this builder
public PulsarSourceBuilder<T> serviceUrl(String serviceUrl) {
Preconditions.checkArgument(StringUtils.isNotBlank(serviceUrl), "serviceUrl cannot be blank");
return this;
* Sets topics to consumer from. This is required.
* <p>Topic names (
* are in the following format:
* {persistent|non-persistent}://tenant/namespace/topic
* @param topics the topic to consumer from
* @return this builder
public PulsarSourceBuilder<T> topic(String... topics) {
Preconditions.checkArgument(topics != null && topics.length > 0,
"topics cannot be blank");
for (String topic : topics) {
Preconditions.checkArgument(StringUtils.isNotBlank(topic), "topicNames cannot have blank topic");
return this;
* Sets topics to consumer from. This is required.
* <p>Topic names (
* are in the following format:
* {persistent|non-persistent}://tenant/namespace/topic
* @param topics the topic to consumer from
* @return this builder
public PulsarSourceBuilder<T> topics(List<String> topics) {
Preconditions.checkArgument(topics != null && !topics.isEmpty(), "topics cannot be blank");
topics.forEach(topicName ->
Preconditions.checkArgument(StringUtils.isNotBlank(topicName), "topicNames cannot have blank topic"));
return this;
* Use topic pattern to config sets of topics to consumer.
* <p>Topic names (
* are in the following format:
* {persistent|non-persistent}://tenant/namespace/topic
* @param topicsPattern topic pattern to consumer from
* @return this builder
public PulsarSourceBuilder<T> topicsPattern(Pattern topicsPattern) {
Preconditions.checkArgument(topicsPattern != null, "Param topicsPattern cannot be null");
Preconditions.checkArgument(this.consumerConfigurationData.getTopicsPattern() == null,
"Pattern has already been set.");
return this;
* Use topic pattern to config sets of topics to consumer.
* <p>Topic names (
* are in the following format:
* {persistent|non-persistent}://tenant/namespace/topic
* @param topicsPattern topic pattern string to consumer from
* @return this builder
public PulsarSourceBuilder<T> topicsPatternString(String topicsPattern) {
Preconditions.checkArgument(StringUtils.isNotBlank(topicsPattern), "Topics pattern string cannot be blank");
Preconditions.checkArgument(this.consumerConfigurationData.getTopicsPattern() == null,
"Pattern has already been set.");
return this;
* Sets the subscription name for the topic consumer. Defaults to flink-sub.
* @param subscriptionName the subscription name for the topic consumer
* @return this builder
public PulsarSourceBuilder<T> subscriptionName(String subscriptionName) {
"subscriptionName cannot be blank");
return this;
* Sets the subscription initial position for the topic consumer.
* Default is {@link SubscriptionInitialPosition#Latest}
* @param initialPosition the subscription initial position.
* @return this builder
public PulsarSourceBuilder<T> subscriptionInitialPosition(SubscriptionInitialPosition initialPosition) {
Preconditions.checkNotNull(initialPosition, "subscription initial position cannot be null");
return this;
* Sets the number of messages to receive before acknowledging. This defaults to 100. This
* value is only used when checkpointing is disabled.
* @param size number of messages to receive before acknowledging
* @return this builder
public PulsarSourceBuilder<T> acknowledgementBatchSize(long size) {
if (size > 0 && size <= MAX_ACKNOWLEDGEMENT_BATCH_SIZE) {
acknowledgementBatchSize = size;
return this;
throw new IllegalArgumentException(
"acknowledgementBatchSize can only take values > 0 and <= " + MAX_ACKNOWLEDGEMENT_BATCH_SIZE);
* parameterize messageReceiveTimeoutMs for `PulsarConsumerSource`.
* @param timeout timeout in ms, should be gt 0
* @return this builder
public PulsarSourceBuilder<T> messageReceiveTimeoutMs(int timeout) {
if (timeout <= 0) {
throw new IllegalArgumentException("messageReceiveTimeoutMs can only take values > 0");
this.messageReceiveTimeoutMs = timeout;
return this;
* Set the authentication provider to use in the Pulsar client instance.
* @param authentication an instance of the {@link Authentication} provider already constructed
* @return this builder
public PulsarSourceBuilder<T> authentication(Authentication authentication) {
Preconditions.checkArgument(authentication != null,
"authentication instance can not be null, use new AuthenticationDisabled() to disable authentication");
return this;
* Configure the authentication provider to use in the Pulsar client instance.
* @param authPluginClassName
* name of the Authentication-Plugin to use
* @param authParamsString
* string which represents parameters for the Authentication-Plugin, e.g., "key1:val1,key2:val2"
* @return this builder
* @throws PulsarClientException.UnsupportedAuthenticationException
* failed to instantiate specified Authentication-Plugin
public PulsarSourceBuilder<T> authentication(String authPluginClassName, String authParamsString)
throws PulsarClientException.UnsupportedAuthenticationException {
"Authentication-Plugin class name can not be blank");
"Authentication-Plugin parameters can not be blank");
.setAuthentication(AuthenticationFactory.create(authPluginClassName, authParamsString));
return this;
* Configure the authentication provider to use in the Pulsar client instance
* using a config map.
* @param authPluginClassName
* name of the Authentication-Plugin you want to use
* @param authParams
* map which represents parameters for the Authentication-Plugin
* @return this builder
* @throws PulsarClientException.UnsupportedAuthenticationException
* failed to instantiate specified Authentication-Plugin
public PulsarSourceBuilder<T> authentication(String authPluginClassName, Map<String, String> authParams)
throws PulsarClientException.UnsupportedAuthenticationException {
"Authentication-Plugin class name can not be blank");
Preconditions.checkArgument((authParams != null && !authParams.isEmpty()),
"parameters to authentication plugin can not be null/empty");
this.clientConfigurationData.setAuthentication(AuthenticationFactory.create(authPluginClassName, authParams));
return this;
* @param clientConfigurationData All client conf wrapped in a POJO
* @return this builder
public PulsarSourceBuilder<T> pulsarAllClientConf(ClientConfigurationData clientConfigurationData) {
Preconditions.checkNotNull(clientConfigurationData, "client conf should not be null");
this.clientConfigurationData = clientConfigurationData;
return this;
* @param consumerConfigurationData All consumer conf wrapped in a POJO
* @return this builder
public PulsarSourceBuilder<T> pulsarAllConsumerConf(ConsumerConfigurationData consumerConfigurationData) {
Preconditions.checkNotNull(consumerConfigurationData, "consumer conf should not be null");
this.consumerConfigurationData = consumerConfigurationData;
return this;
public SourceFunction<T> build() throws PulsarClientException{
"a service url is required");
Preconditions.checkArgument((this.consumerConfigurationData.getTopicNames() != null
&& !this.consumerConfigurationData.getTopicNames().isEmpty())
|| this.consumerConfigurationData.getTopicsPattern() != null,
"At least one topic or topics pattern is required");
"a subscription name is required");
return new PulsarConsumerSource<>(this);
private void setTransientFields() throws PulsarClientException {
private void setAuth() throws PulsarClientException{
if (StringUtils.isBlank(this.clientConfigurationData.getAuthPluginClassName())
|| StringUtils.isBlank(this.clientConfigurationData.getAuthParams())) {
* Creates a PulsarSourceBuilder.
* @param deserializationSchema the deserializer used to convert between Pulsar's byte messages and Flink's objects.
* @return a builder
public static <T> PulsarSourceBuilder<T> builder(DeserializationSchema<T> deserializationSchema) {
Preconditions.checkNotNull(deserializationSchema, "deserializationSchema cannot be null");
return new PulsarSourceBuilder<>(deserializationSchema);