blob: 7ac480a39da1425eb0dbecc8d297f6bb07b5bf93 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.pulsar.source.config;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Preconditions;
import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.flink.connector.pulsar.common.config.PulsarConfigUtils.setOptionValue;
import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAMS;
import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAM_MAP;
import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_ACKNOWLEDGEMENTS_GROUP_TIME_MICROS;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_ACK_RECEIPT_ENABLED;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_ACK_TIMEOUT_MILLIS;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_AUTO_ACK_OLDEST_CHUNKED_MESSAGE_ON_QUEUE_FULL;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_AUTO_UPDATE_PARTITIONS_INTERVAL_SECONDS;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_CONSUMER_NAME;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_CONSUMER_PROPERTIES;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_CRYPTO_FAILURE_ACTION;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_DEAD_LETTER_TOPIC;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_EXPIRE_TIME_OF_INCOMPLETE_CHUNKED_MESSAGE_MILLIS;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_PENDING_CHUNKED_MESSAGE;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_REDELIVER_COUNT;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_TOTAL_RECEIVER_QUEUE_SIZE_ACROSS_PARTITIONS;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_NEGATIVE_ACK_REDELIVERY_DELAY_MICROS;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_POOL_MESSAGES;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_PRIORITY_LEVEL;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_READ_COMPACTED;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_RECEIVER_QUEUE_SIZE;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_REPLICATE_SUBSCRIPTION_STATE;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_RETRY_ENABLE;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_RETRY_LETTER_TOPIC;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_INITIAL_POSITION;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_MODE;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_TYPE;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_TICK_DURATION_MILLIS;
/** Create source related {@link Consumer} and validate config. */
@Internal
public final class PulsarSourceConfigUtils {
private PulsarSourceConfigUtils() {
// No need to create instance.
}
private static final List<Set<ConfigOption<?>>> CONFLICT_SOURCE_OPTIONS =
ImmutableList.<Set<ConfigOption<?>>>builder()
.add(ImmutableSet.of(PULSAR_AUTH_PARAMS, PULSAR_AUTH_PARAM_MAP))
.build();
private static final Set<ConfigOption<?>> REQUIRED_SOURCE_OPTIONS =
ImmutableSet.<ConfigOption<?>>builder()
.add(PULSAR_SERVICE_URL)
.add(PULSAR_ADMIN_URL)
.add(PULSAR_SUBSCRIPTION_NAME)
.build();
/**
* Helper method for checking client related config options. We would validate:
*
* <ul>
* <li>If user have provided the required client config options.
* <li>If user have provided some conflict options.
* </ul>
*/
public static void checkConfigurations(Configuration configuration) {
REQUIRED_SOURCE_OPTIONS.forEach(
option ->
Preconditions.checkArgument(
configuration.contains(option),
"Config option %s is not provided for pulsar source.",
option));
CONFLICT_SOURCE_OPTIONS.forEach(
options -> {
long nums = options.stream().filter(configuration::contains).count();
Preconditions.checkArgument(
nums <= 1,
"Conflict config options %s were provided, we only support one of them for creating pulsar source.",
options);
});
}
/** Create a pulsar consumer builder by using the given Configuration. */
public static <T> ConsumerBuilder<T> createConsumerBuilder(
PulsarClient client, Schema<T> schema, Configuration configuration) {
ConsumerBuilder<T> builder = client.newConsumer(schema);
setOptionValue(configuration, PULSAR_SUBSCRIPTION_NAME, builder::subscriptionName);
setOptionValue(
configuration, PULSAR_ACK_TIMEOUT_MILLIS, v -> builder.ackTimeout(v, MILLISECONDS));
setOptionValue(configuration, PULSAR_ACK_RECEIPT_ENABLED, builder::isAckReceiptEnabled);
setOptionValue(
configuration,
PULSAR_TICK_DURATION_MILLIS,
v -> builder.ackTimeoutTickTime(v, MILLISECONDS));
setOptionValue(
configuration,
PULSAR_NEGATIVE_ACK_REDELIVERY_DELAY_MICROS,
v -> builder.negativeAckRedeliveryDelay(v, MICROSECONDS));
setOptionValue(configuration, PULSAR_SUBSCRIPTION_TYPE, builder::subscriptionType);
setOptionValue(configuration, PULSAR_SUBSCRIPTION_MODE, builder::subscriptionMode);
setOptionValue(configuration, PULSAR_CRYPTO_FAILURE_ACTION, builder::cryptoFailureAction);
setOptionValue(configuration, PULSAR_RECEIVER_QUEUE_SIZE, builder::receiverQueueSize);
setOptionValue(
configuration,
PULSAR_ACKNOWLEDGEMENTS_GROUP_TIME_MICROS,
v -> builder.acknowledgmentGroupTime(v, MICROSECONDS));
setOptionValue(
configuration,
PULSAR_REPLICATE_SUBSCRIPTION_STATE,
builder::replicateSubscriptionState);
setOptionValue(
configuration,
PULSAR_MAX_TOTAL_RECEIVER_QUEUE_SIZE_ACROSS_PARTITIONS,
builder::maxTotalReceiverQueueSizeAcrossPartitions);
setOptionValue(configuration, PULSAR_CONSUMER_NAME, builder::consumerName);
setOptionValue(configuration, PULSAR_READ_COMPACTED, builder::readCompacted);
setOptionValue(configuration, PULSAR_PRIORITY_LEVEL, builder::priorityLevel);
setOptionValue(configuration, PULSAR_CONSUMER_PROPERTIES, builder::properties);
setOptionValue(
configuration,
PULSAR_SUBSCRIPTION_INITIAL_POSITION,
builder::subscriptionInitialPosition);
createDeadLetterPolicy(configuration).ifPresent(builder::deadLetterPolicy);
setOptionValue(
configuration,
PULSAR_AUTO_UPDATE_PARTITIONS_INTERVAL_SECONDS,
v -> builder.autoUpdatePartitionsInterval(v, SECONDS));
setOptionValue(configuration, PULSAR_RETRY_ENABLE, builder::enableRetry);
setOptionValue(
configuration,
PULSAR_MAX_PENDING_CHUNKED_MESSAGE,
builder::maxPendingChunkedMessage);
setOptionValue(
configuration,
PULSAR_AUTO_ACK_OLDEST_CHUNKED_MESSAGE_ON_QUEUE_FULL,
builder::autoAckOldestChunkedMessageOnQueueFull);
setOptionValue(
configuration,
PULSAR_EXPIRE_TIME_OF_INCOMPLETE_CHUNKED_MESSAGE_MILLIS,
v -> builder.expireTimeOfIncompleteChunkedMessage(v, MILLISECONDS));
setOptionValue(configuration, PULSAR_POOL_MESSAGES, builder::poolMessages);
return builder;
}
private static Optional<DeadLetterPolicy> createDeadLetterPolicy(Configuration configuration) {
if (configuration.contains(PULSAR_MAX_REDELIVER_COUNT)
|| configuration.contains(PULSAR_RETRY_LETTER_TOPIC)
|| configuration.contains(PULSAR_DEAD_LETTER_TOPIC)) {
DeadLetterPolicy.DeadLetterPolicyBuilder builder = DeadLetterPolicy.builder();
setOptionValue(configuration, PULSAR_MAX_REDELIVER_COUNT, builder::maxRedeliverCount);
setOptionValue(configuration, PULSAR_RETRY_LETTER_TOPIC, builder::retryLetterTopic);
setOptionValue(configuration, PULSAR_DEAD_LETTER_TOPIC, builder::deadLetterTopic);
return Optional.of(builder.build());
} else {
return Optional.empty();
}
}
}