blob: 06707f945a5124da8080b54947d8a69b0e330427 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#pragma once
#include <pulsar/defines.h>
#include "consumer.h"
#include "producer_configuration.h"
#ifdef __cplusplus
extern "C" {
#endif
/**
 * Opaque handle for a consumer configuration.
 *
 * Only a forward declaration of the underlying struct is visible in this header, so callers
 * hold pointers to it and manipulate it through the functions declared below.
 */
typedef struct _pulsar_consumer_configuration pulsar_consumer_configuration_t;
/**
 * Subscription type of a consumer, as accepted by
 * pulsar_consumer_configuration_set_consumer_type().
 *
 * NOTE(review): the Exclusive/Shared descriptions below speak of "consumerName", while the
 * documentation on pulsar_consumer_configuration_set_consumer_type() describes the same modes
 * in terms of the subscription name -- confirm which wording is intended.
 */
typedef enum
{
/**
 * There can be only 1 consumer on the same topic with the same consumerName
 */
pulsar_ConsumerExclusive,
/**
 * Multiple consumers will be able to use the same consumerName and the messages
 * will be dispatched according to a round-robin rotation between the connected consumers
 */
pulsar_ConsumerShared,
/**
 * Only one consumer is active on the subscription. The subscription can have N consumers
 * connected, one of which will get promoted to master if the current master becomes inactive
 */
pulsar_ConsumerFailover,
/**
 * Multiple consumers will be able to use the same subscription and all messages with the
 * same key will be dispatched to only one consumer
 */
pulsar_ConsumerKeyShared
} pulsar_consumer_type;
/**
 * Position at which a subscription starts consuming, as accepted by
 * pulsar_consumer_set_subscription_initial_position().
 */
typedef enum
{
/**
 * The latest position: consuming starts from the last message
 */
initial_position_latest,
/**
 * The earliest position: consuming starts from the first message
 */
initial_position_earliest
} initial_position;
/**
 * What the consumer does with a message when a crypto operation on it fails, as accepted by
 * pulsar_consumer_configuration_set_crypto_failure_action().
 */
typedef enum
{
// Default option: fail the consume until crypto succeeds
pulsar_ConsumerFail,
// The message is silently acknowledged and not delivered to the application
pulsar_ConsumerDiscard,
// Deliver the encrypted message to the application. It is the application's
// responsibility to decrypt the message. If the message is also compressed,
// decompression will fail. If the message contains batch messages, the client will
// not be able to retrieve individual messages in the batch
pulsar_ConsumerConsume
} pulsar_consumer_crypto_failure_action;
/**
 * Which kinds of topics a regex subscription covers, as accepted by
 * pulsar_consumer_configuration_set_regex_subscription_mode().
 * The numeric values are fixed explicitly.
 */
typedef enum
{
// Only subscribe to persistent topics.
pulsar_consumer_regex_sub_mode_PersistentOnly = 0,
// Only subscribe to non-persistent topics.
pulsar_consumer_regex_sub_mode_NonPersistentOnly = 1,
// Subscribe to both persistent and non-persistent topics.
pulsar_consumer_regex_sub_mode_AllTopics = 2
} pulsar_consumer_regex_subscription_mode;
// Batch receive policy, used with pulsar_consumer_configuration_set_batch_receive_policy() and
// pulsar_consumer_configuration_get_batch_receive_policy().
//
// Each field may individually be non-positive (meaning "no limit" for that field), but if all
// three are non-positive the policy is treated as invalid.
typedef struct {
// Max number of messages; a non-positive value means no limit.
int maxNumMessages;
// Max number of bytes; a non-positive value means no limit.
long maxNumBytes;
// The receive timeout in milliseconds; a non-positive value means no limit.
long timeoutMs;
} pulsar_consumer_batch_receive_policy_t;
// Dead letter queue (DLQ) policy, used with pulsar_consumer_configuration_set_dlq_policy() and
// pulsar_consumer_configuration_get_dlq_policy().
// NOTE(review): who owns the strings these pointers refer to, and how long they must stay
// valid, is not visible in this header -- confirm against the implementation.
typedef struct {
// Name of the dead letter topic where the failing messages are sent.
// If it's null, sourceTopicName + "-" + subscriptionName + "-DLQ" is used as the value.
const char *dead_letter_topic;
// Maximum number of times that a message is redelivered before being sent to the dead letter queue.
// If it's not greater than 0, it is treated as INT_MAX, which means the DLQ is disabled.
int max_redeliver_count;
// Name of the initial subscription of the dead letter topic.
// If it's null, the initial subscription for the dead letter topic is not created.
// If this field is set but the broker's `allowAutoSubscriptionCreation` is disabled, the DLQ producer
// fails to be created.
const char *initial_subscription_name;
} pulsar_consumer_config_dead_letter_policy_t;
/// Callback definition for MessageListener; registered through
/// pulsar_consumer_configuration_set_message_listener().
///
/// @param consumer the consumer handed to the listener
/// @param msg the message handed to the listener
/// @param ctx user context pointer; presumably the `ctx` given at registration -- TODO confirm
///
/// NOTE(review): ownership of `msg` (whether the listener must free it) is not visible in this
/// header -- confirm against the implementation.
typedef void (*pulsar_message_listener)(pulsar_consumer_t *consumer, pulsar_message_t *msg, void *ctx);
/**
 * Create a consumer configuration object.
 *
 * The parameter list is spelled `(void)` so that C compilers see a real prototype and reject
 * calls that pass arguments; an empty `()` leaves the parameters unspecified in C before C23.
 *
 * @return a pointer to the consumer configuration.
 *
 * NOTE(review): the result is presumably released with pulsar_consumer_configuration_free();
 * failure behaviour (e.g. a null return) is not visible in this header -- confirm.
 */
PULSAR_PUBLIC pulsar_consumer_configuration_t *pulsar_consumer_configuration_create(void);
/**
 * Free a consumer configuration object.
 *
 * @param consumer_configuration the consumer conf object to free
 *
 * NOTE(review): presumably pairs with pulsar_consumer_configuration_create(); whether a null
 * pointer is accepted is not visible in this header -- confirm.
 */
PULSAR_PUBLIC void pulsar_consumer_configuration_free(
pulsar_consumer_configuration_t *consumer_configuration);
/**
 * Specify the consumer type. The consumer type enables
 * specifying the type of subscription. In Exclusive subscription,
 * only a single consumer is allowed to attach to the subscription. Other consumers
 * will get an error message. In Shared subscription, multiple consumers will be
 * able to use the same subscription name and the messages will be dispatched in a
 * round robin fashion. In Failover subscription, a primary-failover subscription model
 * allows for multiple consumers to attach to a single subscription, though only one
 * of them will be "primary" at a given time. Only the primary consumer will receive
 * messages. When the primary consumer gets disconnected, one among the failover
 * consumers will be promoted to primary and will start getting messages.
 *
 * @param consumer_configuration the consumer conf object
 * @param consumerType the subscription type to use
 */
PULSAR_PUBLIC void pulsar_consumer_configuration_set_consumer_type(
pulsar_consumer_configuration_t *consumer_configuration, pulsar_consumer_type consumerType);
/**
 * Get the consumer type.
 *
 * @param consumer_configuration the consumer conf object
 * @return the consumer (subscription) type of the configuration
 */
PULSAR_PUBLIC pulsar_consumer_type
pulsar_consumer_configuration_get_consumer_type(pulsar_consumer_configuration_t *consumer_configuration);
/**
 * Set the schema information of the consumer configuration.
 *
 * @param consumer_configuration the consumer conf object
 * @param schemaType the schema type
 * @param name the schema name
 * @param schema the schema definition string
 * @param properties additional schema properties
 *
 * NOTE(review): whether the strings and the map are copied, and whether null is accepted for
 * any of them, is not visible in this header -- confirm against the implementation.
 */
PULSAR_PUBLIC void pulsar_consumer_configuration_set_schema_info(
pulsar_consumer_configuration_t *consumer_configuration, pulsar_schema_type schemaType, const char *name,
const char *schema, pulsar_string_map_t *properties);
/**
 * A message listener enables your application to configure how to process
 * and acknowledge messages delivered. A listener will be called in order
 * for every message received.
 *
 * @param consumer_configuration the consumer conf object
 * @param messageListener the callback to register
 * @param ctx user context pointer; presumably handed back to the callback as its `ctx`
 *        argument -- TODO confirm
 */
PULSAR_PUBLIC void pulsar_consumer_configuration_set_message_listener(
pulsar_consumer_configuration_t *consumer_configuration, pulsar_message_listener messageListener,
void *ctx);
/**
 * Check whether a message listener is configured.
 *
 * @param consumer_configuration the consumer conf object
 * @return an int used as a boolean; presumably non-zero when a listener is set -- TODO confirm
 */
PULSAR_PUBLIC int pulsar_consumer_configuration_has_message_listener(
pulsar_consumer_configuration_t *consumer_configuration);
/**
 * Sets the size of the consumer receive queue.
 *
 * The consumer receive queue controls how many messages can be accumulated by the Consumer before the
 * application calls receive(). Using a higher value could potentially increase the consumer throughput
 * at the expense of bigger memory utilization.
 *
 * Setting the consumer queue size as zero decreases the throughput of the consumer, by disabling
 * pre-fetching of messages. This approach improves the message distribution on shared subscription,
 * by pushing messages only to the consumers that are ready to process them. Neither receive with
 * timeout nor Partitioned Topics can be used if the consumer queue size is zero. The receive()
 * function call should not be interrupted when the consumer queue size is zero.
 *
 * Default value is 1000 messages and should be good for most use cases.
 *
 * @param consumer_configuration the consumer conf object
 * @param size the new receiver queue size value
 */
PULSAR_PUBLIC void pulsar_consumer_configuration_set_receiver_queue_size(
pulsar_consumer_configuration_t *consumer_configuration, int size);
/**
 * Get the size of the consumer receive queue.
 *
 * @param consumer_configuration the consumer conf object
 * @return the receiver queue size
 */
PULSAR_PUBLIC int pulsar_consumer_configuration_get_receiver_queue_size(
pulsar_consumer_configuration_t *consumer_configuration);
/**
 * Set the max total receiver queue size across partitions.
 *
 * This setting will be used to reduce the receiver queue size for individual partitions
 * (see pulsar_consumer_configuration_set_receiver_queue_size()) if the total exceeds this
 * value (default: 50000).
 *
 * @param consumer_configuration the consumer conf object
 * @param maxTotalReceiverQueueSizeAcrossPartitions the max total receiver queue size across partitions
 */
PULSAR_PUBLIC void pulsar_consumer_set_max_total_receiver_queue_size_across_partitions(
pulsar_consumer_configuration_t *consumer_configuration, int maxTotalReceiverQueueSizeAcrossPartitions);
/**
 * @param consumer_configuration the consumer conf object
 * @return the configured max total receiver queue size across partitions
 */
PULSAR_PUBLIC int pulsar_consumer_get_max_total_receiver_queue_size_across_partitions(
pulsar_consumer_configuration_t *consumer_configuration);
/**
 * Set the consumer name.
 *
 * @param consumer_configuration the consumer conf object
 * @param consumerName the consumer name
 *
 * NOTE(review): whether the string is copied and whether null is accepted is not visible in
 * this header -- confirm against the implementation.
 */
PULSAR_PUBLIC void pulsar_consumer_set_consumer_name(pulsar_consumer_configuration_t *consumer_configuration,
const char *consumerName);
/**
 * Get the consumer name.
 *
 * @param consumer_configuration the consumer conf object
 * @return the consumer name
 *
 * NOTE(review): the lifetime of the returned string (presumably tied to the configuration
 * object) is not visible in this header -- confirm before retaining the pointer.
 */
PULSAR_PUBLIC const char *pulsar_consumer_get_consumer_name(
pulsar_consumer_configuration_t *consumer_configuration);
/**
 * Set the timeout in milliseconds for unacknowledged messages. The timeout needs to be greater
 * than 10 seconds: a value less than 10000 (10 seconds) is rejected.
 * If a successful acknowledgement is not sent within the timeout, all the unacknowledged
 * messages are redelivered.
 *
 * NOTE(review): the original text says "an Exception is thrown" for values below 10000; how
 * that rejection surfaces through this void C function is not visible in this header --
 * confirm against the implementation.
 *
 * @param consumer_configuration the consumer conf object
 * @param milliSeconds the timeout in milliseconds
 */
PULSAR_PUBLIC void pulsar_consumer_set_unacked_messages_timeout_ms(
    pulsar_consumer_configuration_t *consumer_configuration, uint64_t milliSeconds);
/**
 * Note that the value is returned as a `long`, whereas the matching setter takes a `uint64_t`.
 *
 * @param consumer_configuration the consumer conf object
 * @return the configured timeout in milliseconds for unacked messages.
 */
PULSAR_PUBLIC long pulsar_consumer_get_unacked_messages_timeout_ms(
pulsar_consumer_configuration_t *consumer_configuration);
/**
 * Set the delay to wait before re-delivering messages that have failed to be processed.
 *
 * When the application negatively acknowledges a message, the failed message
 * will be redelivered after a fixed timeout. The default is 1 min.
 *
 * @param consumer_configuration the consumer conf object
 * @param redeliveryDelayMillis redelivery delay for failed messages, in milliseconds
 */
PULSAR_PUBLIC void pulsar_configure_set_negative_ack_redelivery_delay_ms(
pulsar_consumer_configuration_t *consumer_configuration, long redeliveryDelayMillis);
/**
 * Get the configured delay to wait before re-delivering messages that have failed to be processed.
 *
 * @param consumer_configuration the consumer conf object
 * @return redelivery delay for failed messages, in milliseconds
 */
PULSAR_PUBLIC long pulsar_configure_get_negative_ack_redelivery_delay_ms(
pulsar_consumer_configuration_t *consumer_configuration);
/**
 * Set time window in milliseconds for grouping message ACK requests. An ACK request is not sent
 * to broker until the time window reaches its end, or the number of grouped messages reaches
 * limit. Default is 100 milliseconds. If it's set to a non-positive value, ACK requests will be
 * directly sent to broker without grouping.
 *
 * @param consumer_configuration the consumer conf object
 * @param ackGroupingMillis time of ACK grouping window in milliseconds.
 */
PULSAR_PUBLIC void pulsar_configure_set_ack_grouping_time_ms(
pulsar_consumer_configuration_t *consumer_configuration, long ackGroupingMillis);
/**
 * Get the ACK grouping time window in milliseconds.
 *
 * @param consumer_configuration the consumer conf object
 * @return grouping time window in milliseconds.
 */
PULSAR_PUBLIC long pulsar_configure_get_ack_grouping_time_ms(
pulsar_consumer_configuration_t *consumer_configuration);
/**
 * Set max number of grouped messages within one grouping time window. If it's set to a
 * non-positive value, number of grouped messages is not limited. Default is 1000.
 *
 * @param consumer_configuration the consumer conf object
 * @param maxGroupingSize max number of grouped messages within one grouping time window.
 */
PULSAR_PUBLIC void pulsar_configure_set_ack_grouping_max_size(
pulsar_consumer_configuration_t *consumer_configuration, long maxGroupingSize);
/**
 * Get the max number of grouped messages within one grouping time window.
 *
 * @param consumer_configuration the consumer conf object
 * @return max number of grouped messages within one grouping time window.
 */
PULSAR_PUBLIC long pulsar_configure_get_ack_grouping_max_size(
pulsar_consumer_configuration_t *consumer_configuration);
/**
 * Check whether encryption is enabled on the consumer configuration.
 *
 * @param consumer_configuration the consumer conf object
 * @return an int used as a boolean; presumably non-zero when encryption is enabled -- TODO confirm
 */
PULSAR_PUBLIC int pulsar_consumer_is_encryption_enabled(
pulsar_consumer_configuration_t *consumer_configuration);
/**
 * Configure the default crypto key reader from a public and a private key path.
 *
 * @param consumer_configuration the consumer conf object
 * @param public_key_path path of the public key
 * @param private_key_path path of the private key
 *
 * NOTE(review): the expected key format and when the paths are read are not visible in
 * this header -- confirm against the implementation.
 */
PULSAR_PUBLIC void pulsar_consumer_configuration_set_default_crypto_key_reader(
pulsar_consumer_configuration_t *consumer_configuration, const char *public_key_path,
const char *private_key_path);
/**
 * Get the crypto failure action.
 *
 * @param consumer_configuration the consumer conf object
 * @return the crypto failure action of the configuration
 */
PULSAR_PUBLIC pulsar_consumer_crypto_failure_action pulsar_consumer_configuration_get_crypto_failure_action(
pulsar_consumer_configuration_t *consumer_configuration);
/**
 * Set the crypto failure action.
 *
 * @param consumer_configuration the consumer conf object
 * @param cryptoFailureAction the action to take; see pulsar_consumer_crypto_failure_action
 */
PULSAR_PUBLIC void pulsar_consumer_configuration_set_crypto_failure_action(
pulsar_consumer_configuration_t *consumer_configuration,
pulsar_consumer_crypto_failure_action cryptoFailureAction);
/**
 * Check the read-compacted setting of the consumer configuration.
 *
 * @param consumer_configuration the consumer conf object
 * @return an int used as a boolean; presumably non-zero when read-compacted is on -- TODO confirm
 */
PULSAR_PUBLIC int pulsar_consumer_is_read_compacted(pulsar_consumer_configuration_t *consumer_configuration);
/**
 * Set the read-compacted setting of the consumer configuration.
 *
 * @param consumer_configuration the consumer conf object
 * @param compacted an int used as a boolean; presumably non-zero turns the setting on -- TODO confirm
 */
PULSAR_PUBLIC void pulsar_consumer_set_read_compacted(pulsar_consumer_configuration_t *consumer_configuration,
int compacted);
/**
 * Get the subscription initial position.
 *
 * Note that the return type is a plain `int`, whereas the matching setter takes an
 * `initial_position`.
 *
 * @param consumer_configuration the consumer conf object
 * @return the initial position; presumably one of the `initial_position` values -- TODO confirm
 */
PULSAR_PUBLIC int pulsar_consumer_get_subscription_initial_position(
pulsar_consumer_configuration_t *consumer_configuration);
/**
 * Set the subscription initial position.
 *
 * @param consumer_configuration the consumer conf object
 * @param subscriptionInitialPosition where consuming starts; see initial_position
 */
PULSAR_PUBLIC void pulsar_consumer_set_subscription_initial_position(
pulsar_consumer_configuration_t *consumer_configuration, initial_position subscriptionInitialPosition);
/**
 * Set a name/value property on the consumer configuration.
 *
 * @param conf the consumer conf object
 * @param name the property name
 * @param value the property value
 *
 * NOTE(review): behaviour when `name` is already set, and whether the strings are copied, is
 * not visible in this header -- confirm against the implementation.
 */
PULSAR_PUBLIC void pulsar_consumer_configuration_set_property(pulsar_consumer_configuration_t *conf,
const char *name, const char *value);
/**
 * Set the priority level of the consumer.
 *
 * @param consumer_configuration the consumer conf object
 * @param priority_level the priority level
 *
 * NOTE(review): the valid range and the meaning of higher vs lower values are not visible in
 * this header -- confirm against the implementation.
 */
PULSAR_PUBLIC void pulsar_consumer_configuration_set_priority_level(
pulsar_consumer_configuration_t *consumer_configuration, int priority_level);
/**
 * Get the priority level of the consumer.
 *
 * @param consumer_configuration the consumer conf object
 * @return the priority level
 */
PULSAR_PUBLIC int pulsar_consumer_configuration_get_priority_level(
pulsar_consumer_configuration_t *consumer_configuration);
/**
 * Set the max number of pending chunked messages.
 *
 * @param consumer_configuration the consumer conf object
 * @param max_pending_chunked_message the max number of pending chunked messages
 *
 * NOTE(review): the default and the treatment of non-positive values are not visible in this
 * header -- confirm against the implementation.
 */
PULSAR_PUBLIC void pulsar_consumer_configuration_set_max_pending_chunked_message(
pulsar_consumer_configuration_t *consumer_configuration, int max_pending_chunked_message);
/**
 * Get the max number of pending chunked messages.
 *
 * @param consumer_configuration the consumer conf object
 * @return the max number of pending chunked messages
 */
PULSAR_PUBLIC int pulsar_consumer_configuration_get_max_pending_chunked_message(
pulsar_consumer_configuration_t *consumer_configuration);
/**
 * Set the "auto ack oldest chunked message on queue full" option.
 *
 * @param consumer_configuration the consumer conf object
 * @param auto_ack_oldest_chunked_message_on_queue_full an int used as a boolean; presumably
 *        non-zero turns the option on -- TODO confirm
 */
PULSAR_PUBLIC void pulsar_consumer_configuration_set_auto_ack_oldest_chunked_message_on_queue_full(
pulsar_consumer_configuration_t *consumer_configuration,
int auto_ack_oldest_chunked_message_on_queue_full);
/**
 * Check the "auto ack oldest chunked message on queue full" option.
 *
 * @param consumer_configuration the consumer conf object
 * @return an int used as a boolean; presumably non-zero when the option is on -- TODO confirm
 */
PULSAR_PUBLIC int pulsar_consumer_configuration_is_auto_ack_oldest_chunked_message_on_queue_full(
pulsar_consumer_configuration_t *consumer_configuration);
/**
 * Set whether the start message id is inclusive.
 *
 * @param consumer_configuration the consumer conf object
 * @param start_message_id_inclusive an int used as a boolean; presumably non-zero means
 *        inclusive -- TODO confirm
 */
PULSAR_PUBLIC void pulsar_consumer_configuration_set_start_message_id_inclusive(
pulsar_consumer_configuration_t *consumer_configuration, int start_message_id_inclusive);
/**
 * Check whether the start message id is inclusive.
 *
 * @param consumer_configuration the consumer conf object
 * @return an int used as a boolean; presumably non-zero means inclusive -- TODO confirm
 */
PULSAR_PUBLIC int pulsar_consumer_configuration_is_start_message_id_inclusive(
pulsar_consumer_configuration_t *consumer_configuration);
/**
 * Enable or disable batch index acknowledgement.
 *
 * @param consumer_configuration the consumer conf object
 * @param enabled an int used as a boolean; presumably non-zero enables it -- TODO confirm
 */
PULSAR_PUBLIC void pulsar_consumer_configuration_set_batch_index_ack_enabled(
pulsar_consumer_configuration_t *consumer_configuration, int enabled);
/**
 * Check whether batch index acknowledgement is enabled.
 *
 * @param consumer_configuration the consumer conf object
 * @return an int used as a boolean; presumably non-zero when enabled -- TODO confirm
 */
PULSAR_PUBLIC int pulsar_consumer_configuration_is_batch_index_ack_enabled(
pulsar_consumer_configuration_t *consumer_configuration);
/**
 * Set the regex subscription mode.
 *
 * @param consumer_configuration the consumer conf object
 * @param regex_sub_mode which kinds of topics a regex subscription covers; see
 *        pulsar_consumer_regex_subscription_mode
 */
PULSAR_PUBLIC void pulsar_consumer_configuration_set_regex_subscription_mode(
pulsar_consumer_configuration_t *consumer_configuration,
pulsar_consumer_regex_subscription_mode regex_sub_mode);
/**
 * Get the regex subscription mode.
 *
 * @param consumer_configuration the consumer conf object
 * @return the regex subscription mode of the configuration
 */
PULSAR_PUBLIC pulsar_consumer_regex_subscription_mode
pulsar_consumer_configuration_get_regex_subscription_mode(
pulsar_consumer_configuration_t *consumer_configuration);
/**
 * Set batch receive policy.
 *
 * @param [in] consumer_configuration a non-null pointer of the consumer configuration
 * @param [in] batch_receive_policy the policy to apply
 * @return 0 on success and -1 on failure
 *
 * The possible reasons for failure are:
 * - batch_receive_policy is null
 * - batch_receive_policy points to an invalid policy (see the comment on
 *   pulsar_consumer_batch_receive_policy_t for what makes a policy invalid)
 */
PULSAR_PUBLIC int pulsar_consumer_configuration_set_batch_receive_policy(
pulsar_consumer_configuration_t *consumer_configuration,
const pulsar_consumer_batch_receive_policy_t *batch_receive_policy);
/**
 * Get the batch receive policy.
 *
 * @param [in] consumer_configuration a non-null pointer of the consumer configuration
 * @param [out] batch_receive_policy destination for the policy
 *
 * If batch_receive_policy is not null, the instance that it points to will be updated to the batch receive
 * policy of the consumer configuration.
 *
 * If the policy was never set before, the batch_receive_policy will be set with the following value:
 * {maxNumMessages: -1, maxNumBytes: 10 * 1024 * 1024, timeoutMs: 100}
 */
PULSAR_PUBLIC void pulsar_consumer_configuration_get_batch_receive_policy(
pulsar_consumer_configuration_t *consumer_configuration,
pulsar_consumer_batch_receive_policy_t *batch_receive_policy);
/**
 * Set the dlq (dead letter queue) policy.
 *
 * @param [in] consumer_configuration the consumer conf object
 * @param [in] dlq_policy the dead letter policy to apply; see
 *        pulsar_consumer_config_dead_letter_policy_t for the meaning of each field
 *
 * NOTE(review): unlike the batch receive policy setter this function returns void, so a null
 * or unusable dlq_policy cannot be reported to the caller; how such input is handled is not
 * visible in this header -- confirm against the implementation.
 */
PULSAR_PUBLIC void pulsar_consumer_configuration_set_dlq_policy(
pulsar_consumer_configuration_t *consumer_configuration,
const pulsar_consumer_config_dead_letter_policy_t *dlq_policy);
/**
 * Get the dlq (dead letter queue) policy.
 *
 * @param [in] consumer_configuration a non-null pointer of the consumer configuration
 * @param [out] dlq_policy if dlq_policy is not null, the instance that it points to will be
 *        updated to the dead letter policy of the consumer configuration.
 *
 * NOTE(review): the lifetime of the strings the filled-in policy points to is not visible in
 * this header -- confirm before retaining those pointers.
 */
PULSAR_PUBLIC void pulsar_consumer_configuration_get_dlq_policy(
pulsar_consumer_configuration_t *consumer_configuration,
pulsar_consumer_config_dead_letter_policy_t *dlq_policy);
// const CryptoKeyReaderPtr getCryptoKeyReader() const;
// ConsumerConfiguration& setCryptoKeyReader(CryptoKeyReaderPtr cryptoKeyReader);
//
// ConsumerCryptoFailureAction getCryptoFailureAction() const;
// ConsumerConfiguration& setCryptoFailureAction(ConsumerCryptoFailureAction action);
#ifdef __cplusplus
}
#endif