| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| #pragma once |
| |
| #include <pulsar/defines.h> |
| |
| #include "consumer.h" |
| #include "producer_configuration.h" |
| |
| #ifdef __cplusplus |
| extern "C" { |
| #endif |
| |
| typedef struct _pulsar_consumer_configuration pulsar_consumer_configuration_t; |
| |
| typedef enum |
| { |
| /** |
| * There can be only 1 consumer on the same topic with the same consumerName |
| */ |
| pulsar_ConsumerExclusive, |
| |
| /** |
| * Multiple consumers will be able to use the same consumerName and the messages |
| * will be dispatched according to a round-robin rotation between the connected consumers |
| */ |
| pulsar_ConsumerShared, |
| |
| /** Only one consumer is active on the subscription; Subscription can have N consumers |
| * connected one of which will get promoted to master if the current master becomes inactive |
| */ |
| pulsar_ConsumerFailover, |
| |
| /** |
| * Multiple consumer will be able to use the same subscription and all messages with the same key |
| * will be dispatched to only one consumer |
| */ |
| pulsar_ConsumerKeyShared |
| } pulsar_consumer_type; |
| |
| typedef enum |
| { |
| /** |
| * the latest position which means the start consuming position will be the last message |
| */ |
| initial_position_latest, |
| /** |
| * the earliest position which means the start consuming position will be the first message |
| */ |
| initial_position_earliest |
| } initial_position; |
| |
| typedef enum |
| { |
| // This is the default option to fail consume until crypto succeeds |
| pulsar_ConsumerFail, |
| // Message is silently acknowledged and not delivered to the application |
| pulsar_ConsumerDiscard, |
| // Deliver the encrypted message to the application. It's the application's |
| // responsibility to decrypt the message. If message is also compressed, |
| // decompression will fail. If message contain batch messages, client will |
| // not be able to retrieve individual messages in the batch |
| pulsar_ConsumerConsume |
| } pulsar_consumer_crypto_failure_action; |
| |
| typedef enum |
| { |
| // Only subscribe to persistent topics. |
| pulsar_consumer_regex_sub_mode_PersistentOnly = 0, |
| // Only subscribe to non-persistent topics. |
| pulsar_consumer_regex_sub_mode_NonPersistentOnly = 1, |
| // Subscribe to both persistent and non-persistent topics. |
| pulsar_consumer_regex_sub_mode_AllTopics = 2 |
| } pulsar_consumer_regex_subscription_mode; |
| |
| // Though any field could be non-positive, if all of them are non-positive, this policy will be treated as |
| // invalid |
| typedef struct { |
| // Max num messages, a non-positive value means no limit. |
| int maxNumMessages; |
| // Max num bytes, a non-positive value means no limit. |
| long maxNumBytes; |
| // The receive timeout, a non-positive value means no limit. |
| long timeoutMs; |
| } pulsar_consumer_batch_receive_policy_t; |
| |
| typedef struct { |
| // Name of the dead topic where the failing messages are sent. |
| // If it's null, use sourceTopicName + "-" + subscriptionName + "-DLQ" as the value |
| const char *dead_letter_topic; |
| // Maximum number of times that a message is redelivered before being sent to the dead letter queue. |
| // If it's not greater than 0, treat it as INT_MAX, it means DLQ disable. |
| int max_redeliver_count; |
| // Name of the initial subscription name of the dead letter topic. |
| // If it's null, the initial subscription for the dead letter topic is not created. |
| // If this field is set but the broker's `allowAutoSubscriptionCreation` is disabled, the DLQ producer |
| // fails to be created. |
| const char *initial_subscription_name; |
| } pulsar_consumer_config_dead_letter_policy_t; |
| |
| /// Callback definition for MessageListener |
| typedef void (*pulsar_message_listener)(pulsar_consumer_t *consumer, pulsar_message_t *msg, void *ctx); |
| |
| PULSAR_PUBLIC pulsar_consumer_configuration_t *pulsar_consumer_configuration_create(); |
| |
| PULSAR_PUBLIC void pulsar_consumer_configuration_free( |
| pulsar_consumer_configuration_t *consumer_configuration); |
| |
| /** |
| * Specify the consumer type. The consumer type enables |
| * specifying the type of subscription. In Exclusive subscription, |
| * only a single consumer is allowed to attach to the subscription. Other consumers |
| * will get an error message. In Shared subscription, multiple consumers will be |
| * able to use the same subscription name and the messages will be dispatched in a |
| * round robin fashion. In Failover subscription, a primary-failover subscription model |
| * allows for multiple consumers to attach to a single subscription, though only one |
| * of them will be “master” at a given time. Only the primary consumer will receive |
| * messages. When the primary consumer gets disconnected, one among the failover |
| * consumers will be promoted to primary and will start getting messages. |
| */ |
| PULSAR_PUBLIC void pulsar_consumer_configuration_set_consumer_type( |
| pulsar_consumer_configuration_t *consumer_configuration, pulsar_consumer_type consumerType); |
| |
| PULSAR_PUBLIC pulsar_consumer_type |
| pulsar_consumer_configuration_get_consumer_type(pulsar_consumer_configuration_t *consumer_configuration); |
| |
| PULSAR_PUBLIC void pulsar_consumer_configuration_set_schema_info( |
| pulsar_consumer_configuration_t *consumer_configuration, pulsar_schema_type schemaType, const char *name, |
| const char *schema, pulsar_string_map_t *properties); |
| |
| /** |
| * A message listener enables your application to configure how to process |
| * and acknowledge messages delivered. A listener will be called in order |
| * for every message received. |
| */ |
| PULSAR_PUBLIC void pulsar_consumer_configuration_set_message_listener( |
| pulsar_consumer_configuration_t *consumer_configuration, pulsar_message_listener messageListener, |
| void *ctx); |
| |
| PULSAR_PUBLIC int pulsar_consumer_configuration_has_message_listener( |
| pulsar_consumer_configuration_t *consumer_configuration); |
| |
| /** |
| * Sets the size of the consumer receive queue. |
| * |
| * The consumer receive queue controls how many messages can be accumulated by the Consumer before the |
| * application calls receive(). Using a higher value could potentially increase the consumer throughput |
| * at the expense of bigger memory utilization. |
| * |
| * Setting the consumer queue size as zero decreases the throughput of the consumer, by disabling |
| * pre-fetching of |
| * messages. This approach improves the message distribution on shared subscription, by pushing messages |
| * only to |
| * the consumers that are ready to process them. Neither receive with timeout nor Partitioned Topics can |
| * be |
| * used if the consumer queue size is zero. The receive() function call should not be interrupted when |
| * the consumer queue size is zero. |
| * |
| * Default value is 1000 messages and should be good for most use cases. |
| * |
| * @param size |
| * the new receiver queue size value |
| */ |
| PULSAR_PUBLIC void pulsar_consumer_configuration_set_receiver_queue_size( |
| pulsar_consumer_configuration_t *consumer_configuration, int size); |
| |
| PULSAR_PUBLIC int pulsar_consumer_configuration_get_receiver_queue_size( |
| pulsar_consumer_configuration_t *consumer_configuration); |
| |
| /** |
| * Set the max total receiver queue size across partitons. |
| * <p> |
| * This setting will be used to reduce the receiver queue size for individual partitions |
| * {@link #setReceiverQueueSize(int)} if the total exceeds this value (default: 50000). |
| * |
| * @param maxTotalReceiverQueueSizeAcrossPartitions |
| */ |
| PULSAR_PUBLIC void pulsar_consumer_set_max_total_receiver_queue_size_across_partitions( |
| pulsar_consumer_configuration_t *consumer_configuration, int maxTotalReceiverQueueSizeAcrossPartitions); |
| |
| /** |
| * @return the configured max total receiver queue size across partitions |
| */ |
| PULSAR_PUBLIC int pulsar_consumer_get_max_total_receiver_queue_size_across_partitions( |
| pulsar_consumer_configuration_t *consumer_configuration); |
| |
| PULSAR_PUBLIC void pulsar_consumer_set_consumer_name(pulsar_consumer_configuration_t *consumer_configuration, |
| const char *consumerName); |
| |
| PULSAR_PUBLIC const char *pulsar_consumer_get_consumer_name( |
| pulsar_consumer_configuration_t *consumer_configuration); |
| |
| /** |
| * Set the timeout in milliseconds for unacknowledged messages, the timeout needs to be greater than |
| * 10 seconds. An Exception is thrown if the given value is less than 10000 (10 seconds). |
| * If a successful acknowledgement is not sent within the timeout all the unacknowledged messages are |
| * redelivered. |
| * @param timeout in milliseconds |
| */ |
| PULSAR_PUBLIC void pulsar_consumer_set_unacked_messages_timeout_ms( |
| pulsar_consumer_configuration_t *consumer_configuration, const uint64_t milliSeconds); |
| |
| /** |
| * @return the configured timeout in milliseconds for unacked messages. |
| */ |
| PULSAR_PUBLIC long pulsar_consumer_get_unacked_messages_timeout_ms( |
| pulsar_consumer_configuration_t *consumer_configuration); |
| |
| /** |
| * Set the delay to wait before re-delivering messages that have failed to be process. |
| * <p> |
| * When application uses {@link Consumer#negativeAcknowledge(Message)}, the failed message |
| * will be redelivered after a fixed timeout. The default is 1 min. |
| * |
| * @param redeliveryDelay |
| * redelivery delay for failed messages |
| * @param timeUnit |
| * unit in which the timeout is provided. |
| * @return the consumer builder instance |
| */ |
| PULSAR_PUBLIC void pulsar_configure_set_negative_ack_redelivery_delay_ms( |
| pulsar_consumer_configuration_t *consumer_configuration, long redeliveryDelayMillis); |
| |
| /** |
| * Get the configured delay to wait before re-delivering messages that have failed to be process. |
| * |
| * @param consumer_configuration the consumer conf object |
| * @return redelivery delay for failed messages |
| */ |
| PULSAR_PUBLIC long pulsar_configure_get_negative_ack_redelivery_delay_ms( |
| pulsar_consumer_configuration_t *consumer_configuration); |
| |
| /** |
| * Set time window in milliseconds for grouping message ACK requests. An ACK request is not sent |
| * to broker until the time window reaches its end, or the number of grouped messages reaches |
| * limit. Default is 100 milliseconds. If it's set to a non-positive value, ACK requests will be |
| * directly sent to broker without grouping. |
| * |
| * @param consumer_configuration the consumer conf object |
| * @param ackGroupMillis time of ACK grouping window in milliseconds. |
| */ |
| PULSAR_PUBLIC void pulsar_configure_set_ack_grouping_time_ms( |
| pulsar_consumer_configuration_t *consumer_configuration, long ackGroupingMillis); |
| |
| /** |
| * Get grouping time window in milliseconds. |
| * |
| * @param consumer_configuration the consumer conf object |
| * @return grouping time window in milliseconds. |
| */ |
| PULSAR_PUBLIC long pulsar_configure_get_ack_grouping_time_ms( |
| pulsar_consumer_configuration_t *consumer_configuration); |
| |
| /** |
| * Set max number of grouped messages within one grouping time window. If it's set to a |
| * non-positive value, number of grouped messages is not limited. Default is 1000. |
| * |
| * @param consumer_configuration the consumer conf object |
| * @param maxGroupingSize max number of grouped messages with in one grouping time window. |
| */ |
| PULSAR_PUBLIC void pulsar_configure_set_ack_grouping_max_size( |
| pulsar_consumer_configuration_t *consumer_configuration, long maxGroupingSize); |
| |
| /** |
| * Get max number of grouped messages within one grouping time window. |
| * |
| * @param consumer_configuration the consumer conf object |
| * @return max number of grouped messages within one grouping time window. |
| */ |
| PULSAR_PUBLIC long pulsar_configure_get_ack_grouping_max_size( |
| pulsar_consumer_configuration_t *consumer_configuration); |
| |
| PULSAR_PUBLIC int pulsar_consumer_is_encryption_enabled( |
| pulsar_consumer_configuration_t *consumer_configuration); |
| |
| PULSAR_PUBLIC void pulsar_consumer_configuration_set_default_crypto_key_reader( |
| pulsar_consumer_configuration_t *consumer_configuration, const char *public_key_path, |
| const char *private_key_path); |
| |
| PULSAR_PUBLIC pulsar_consumer_crypto_failure_action pulsar_consumer_configuration_get_crypto_failure_action( |
| pulsar_consumer_configuration_t *consumer_configuration); |
| |
| PULSAR_PUBLIC void pulsar_consumer_configuration_set_crypto_failure_action( |
| pulsar_consumer_configuration_t *consumer_configuration, |
| pulsar_consumer_crypto_failure_action cryptoFailureAction); |
| |
| PULSAR_PUBLIC int pulsar_consumer_is_read_compacted(pulsar_consumer_configuration_t *consumer_configuration); |
| |
| PULSAR_PUBLIC void pulsar_consumer_set_read_compacted(pulsar_consumer_configuration_t *consumer_configuration, |
| int compacted); |
| |
| PULSAR_PUBLIC int pulsar_consumer_get_subscription_initial_position( |
| pulsar_consumer_configuration_t *consumer_configuration); |
| |
| PULSAR_PUBLIC void pulsar_consumer_set_subscription_initial_position( |
| pulsar_consumer_configuration_t *consumer_configuration, initial_position subscriptionInitialPosition); |
| |
| PULSAR_PUBLIC void pulsar_consumer_configuration_set_property(pulsar_consumer_configuration_t *conf, |
| const char *name, const char *value); |
| |
| PULSAR_PUBLIC void pulsar_consumer_configuration_set_priority_level( |
| pulsar_consumer_configuration_t *consumer_configuration, int priority_level); |
| |
| PULSAR_PUBLIC int pulsar_consumer_configuration_get_priority_level( |
| pulsar_consumer_configuration_t *consumer_configuration); |
| |
| PULSAR_PUBLIC void pulsar_consumer_configuration_set_max_pending_chunked_message( |
| pulsar_consumer_configuration_t *consumer_configuration, int max_pending_chunked_message); |
| |
| PULSAR_PUBLIC int pulsar_consumer_configuration_get_max_pending_chunked_message( |
| pulsar_consumer_configuration_t *consumer_configuration); |
| |
| PULSAR_PUBLIC void pulsar_consumer_configuration_set_auto_ack_oldest_chunked_message_on_queue_full( |
| pulsar_consumer_configuration_t *consumer_configuration, |
| int auto_ack_oldest_chunked_message_on_queue_full); |
| |
| PULSAR_PUBLIC int pulsar_consumer_configuration_is_auto_ack_oldest_chunked_message_on_queue_full( |
| pulsar_consumer_configuration_t *consumer_configuration); |
| |
| PULSAR_PUBLIC void pulsar_consumer_configuration_set_start_message_id_inclusive( |
| pulsar_consumer_configuration_t *consumer_configuration, int start_message_id_inclusive); |
| |
| PULSAR_PUBLIC int pulsar_consumer_configuration_is_start_message_id_inclusive( |
| pulsar_consumer_configuration_t *consumer_configuration); |
| |
| PULSAR_PUBLIC void pulsar_consumer_configuration_set_batch_index_ack_enabled( |
| pulsar_consumer_configuration_t *consumer_configuration, int enabled); |
| |
| PULSAR_PUBLIC int pulsar_consumer_configuration_is_batch_index_ack_enabled( |
| pulsar_consumer_configuration_t *consumer_configuration); |
| |
| PULSAR_PUBLIC void pulsar_consumer_configuration_set_regex_subscription_mode( |
| pulsar_consumer_configuration_t *consumer_configuration, |
| pulsar_consumer_regex_subscription_mode regex_sub_mode); |
| |
| PULSAR_PUBLIC pulsar_consumer_regex_subscription_mode |
| pulsar_consumer_configuration_get_regex_subscription_mode( |
| pulsar_consumer_configuration_t *consumer_configuration); |
| |
| /** |
| * Set batch receive policy. |
| * |
| * @param [in] consumer_configuration a non-null pointer of the consumer configuration |
| * @param [in] batch_receive_policy |
| * @return 0 on success and -1 on failure |
| * |
| * The possible failed reasons are: |
| * - batch_receive_policy is null |
| * - batch_receive_policy points to an invalid policy |
| */ |
| PULSAR_PUBLIC int pulsar_consumer_configuration_set_batch_receive_policy( |
| pulsar_consumer_configuration_t *consumer_configuration, |
| const pulsar_consumer_batch_receive_policy_t *batch_receive_policy); |
| |
| /** |
| * Get the batch receive policy. |
| * |
| * @param [in] consumer_configuration a non-null pointer of the consumer configuration |
| * @param [out] batch_receive_policy |
| * |
| * If batch_receive_policy is not null, the instance that it points to will be updated to the batch receive |
| * policy of the consumer configuration. |
| * |
| * If the policy was never set before, the batch_receive_policy will be set with the following value: |
| * {maxNumMessage: -1, maxNumBytes: 10 * 1024 * 1024, timeoutMs: 100} |
| */ |
| PULSAR_PUBLIC void pulsar_consumer_configuration_get_batch_receive_policy( |
| pulsar_consumer_configuration_t *consumer_configuration, |
| pulsar_consumer_batch_receive_policy_t *batch_receive_policy); |
| |
| PULSAR_PUBLIC void pulsar_consumer_configuration_set_dlq_policy( |
| pulsar_consumer_configuration_t *consumer_configuration, |
| const pulsar_consumer_config_dead_letter_policy_t *dlq_policy); |
| |
| /** |
| * Get the dlq policy |
| * |
| * @param [in] consumer_configuration a non-null pointer of the consumer configuration |
| * @param [out] dlq_policy If dlq_policy is not null, |
| * the instance that it points to will be updated to the dead letter policy of the consumer configuration. |
| * |
| */ |
| PULSAR_PUBLIC void pulsar_consumer_configuration_get_dlq_policy( |
| pulsar_consumer_configuration_t *consumer_configuration, |
| pulsar_consumer_config_dead_letter_policy_t *dlq_policy); |
| |
| // const CryptoKeyReaderPtr getCryptoKeyReader() |
| // |
| // const; |
| // ConsumerConfiguration& |
| // setCryptoKeyReader(CryptoKeyReaderPtr |
| // cryptoKeyReader); |
| // |
| // ConsumerCryptoFailureAction getCryptoFailureAction() |
| // |
| // const; |
| // ConsumerConfiguration& |
| // setCryptoFailureAction(ConsumerCryptoFailureAction |
| // action); |
| |
| #ifdef __cplusplus |
| } |
| #endif |