blob: f8013a16ffd316faf0d5f490a7f6aca0aa70be5c [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package pulsar
import (
"context"
"time"
"github.com/apache/pulsar-client-go/pulsar/internal"
)
// HashingScheme identifies the hash function used to pick the partition on
// which a particular message is published.
type HashingScheme int

// Hashing schemes accepted by ProducerOptions.HashingScheme.
const (
	// JavaStringHash is the equivalent of Java's String.hashCode().
	JavaStringHash HashingScheme = 0
	// Murmur3_32Hash uses the Murmur3 hashing function.
	Murmur3_32Hash HashingScheme = 1
)
// CompressionType identifies the compression applied to message payloads by
// a producer.
type CompressionType int

// Compression types accepted by ProducerOptions.CompressionType.
const (
	// NoCompression leaves message payloads uncompressed.
	NoCompression CompressionType = 0
	// LZ4 selects LZ4 compression.
	LZ4 CompressionType = 1
	// ZLib selects ZLib compression.
	ZLib CompressionType = 2
	// ZSTD selects ZSTD compression.
	ZSTD CompressionType = 3
)
// CompressionLevel identifies the desired trade-off between compression
// speed and compression ratio.
type CompressionLevel int

// Compression levels accepted by ProducerOptions.CompressionLevel.
const (
	// Default is the default compression level.
	Default CompressionLevel = 0
	// Faster compresses more quickly, with a lower compression ratio.
	Faster CompressionLevel = 1
	// Better achieves a higher compression ratio, but is slower.
	Better CompressionLevel = 2
)
// ProducerAccessMode identifies the type of access to the topic that a
// producer requires.
type ProducerAccessMode int

// Access modes accepted by ProducerOptions.ProducerAccessMode.
const (
	// ProducerAccessModeShared is the default: multiple producers can
	// publish on a topic.
	ProducerAccessModeShared ProducerAccessMode = 0
	// ProducerAccessModeExclusive requires exclusive access for the
	// producer. It fails immediately if there is already a producer
	// connected.
	ProducerAccessModeExclusive ProducerAccessMode = 1
	// ProducerAccessModeWaitForExclusive keeps the producer pending until
	// it can acquire exclusive access.
	ProducerAccessModeWaitForExclusive ProducerAccessMode = 2
)
// TopicMetadata represents the metadata of a topic.
// It is the second argument handed to a ProducerOptions.MessageRouter function.
type TopicMetadata interface {
// NumPartitions returns the number of partitions for a particular topic.
NumPartitions() uint32
}
// ProducerOptions holds the configuration used when constructing a producer.
// Topic is the only field documented as required.
type ProducerOptions struct {
// Topic specifies the topic this producer will be publishing on.
// This argument is required when constructing the producer.
Topic string
// Name specifies a name for the producer.
// If not assigned, the system will generate a globally unique name which can be accessed with
// Producer.Name().
// When specifying a name, it is up to the user to ensure that, for a given topic, the producer name is unique
// across all Pulsar's clusters. Brokers will enforce that only a single producer with a given name can be
// publishing on a topic.
Name string
// Properties specifies a set of application-defined properties for the producer.
// These properties will be visible in the topic stats.
Properties map[string]string
// SendTimeout specifies the timeout for a message that has not been acknowledged by the server since it was sent.
// Send and SendAsync return an error after the timeout.
// Default is 30 seconds; set a negative value such as -1 to disable the timeout.
SendTimeout time.Duration
// DisableBlockIfQueueFull controls whether Send and SendAsync block if the producer's message queue is full.
// Default is false; if set to true, Send and SendAsync return an error when the queue is full.
DisableBlockIfQueueFull bool
// MaxPendingMessages specifies the max size of the queue holding the messages pending to receive an
// acknowledgment from the broker.
MaxPendingMessages int
// HashingScheme is used to define the partition on which to publish a particular message.
// Standard hashing functions available are:
//
// - `JavaStringHash` : Java String.hashCode() equivalent
// - `Murmur3_32Hash` : Use Murmur3 hashing function.
// See https://en.wikipedia.org/wiki/MurmurHash
//
// Default is `JavaStringHash`.
HashingScheme
// CompressionType specifies the compression type for the producer.
// By default, message payloads are not compressed. Supported compression types are:
// - LZ4
// - ZLib
// - ZSTD
//
// Note: ZSTD is supported since Pulsar 2.3. Consumers will need to be at least at that
// release in order to be able to receive messages compressed with ZSTD.
CompressionType
// CompressionLevel defines the desired compression level. Options:
// - Default
// - Faster
// - Better
CompressionLevel
// MessageRouter represents a custom message routing policy by passing an implementation of MessageRouter.
// The router is a function that, given a particular message and the topic metadata, returns the
// partition index where the message should be routed to.
MessageRouter func(*ProducerMessage, TopicMetadata) int
// DisableBatching controls whether automatic batching of messages is enabled for the producer. By default batching
// is enabled.
// When batching is enabled, multiple calls to Producer.SendAsync can result in a single batch being sent to the
// broker, leading to better throughput, especially when publishing small messages. If compression is enabled,
// messages will be compressed at the batch level, leading to a much better compression ratio for similar headers or
// contents.
// When enabled, the default batch delay and the default batch size are the ones documented on
// BatchingMaxPublishDelay and BatchingMaxMessages below.
// Setting `DisableBatching: true` will make the producer send messages individually.
DisableBatching bool
// BatchingMaxPublishDelay specifies the time period within which the messages sent will be batched (default: 10ms)
// if batch messages are enabled. If set to a non zero value, messages will be queued until this time
// interval has elapsed or until BatchingMaxMessages or BatchingMaxSize (see below) has been reached.
BatchingMaxPublishDelay time.Duration
// BatchingMaxMessages specifies the maximum number of messages permitted in a batch. (default: 1000)
// If set to a value greater than 1, messages will be queued until this threshold is reached or
// BatchingMaxSize (see below) has been reached or the batch interval has elapsed.
BatchingMaxMessages uint
// BatchingMaxSize specifies the maximum number of bytes permitted in a batch. (default 128 KB)
// If set to a value greater than 1, messages will be queued until this threshold is reached or
// BatchingMaxMessages (see above) has been reached or the batch interval has elapsed.
BatchingMaxSize uint
// Interceptors is a chain of interceptors. These interceptors will be called at some points defined
// in the ProducerInterceptor interface.
Interceptors ProducerInterceptors
// Schema represents the schema implementation.
Schema Schema
// MaxReconnectToBroker specifies the maximum retry number of reconnectToBroker. (default: ultimate)
// NOTE(review): "ultimate" presumably means retries are unlimited when this is left nil — confirm.
MaxReconnectToBroker *uint
// BackoffPolicy parameterizes the following options in the reconnection logic to
// allow users to customize the reconnection logic (minBackoff, maxBackoff and jitterPercentage).
BackoffPolicy internal.BackoffPolicy
// BatcherBuilderType sets the batch builder type (default DefaultBatchBuilder).
// This will be used to create the batch container when batching is enabled.
// Options:
// - DefaultBatchBuilder
// - KeyBasedBatchBuilder
BatcherBuilderType
// PartitionsAutoDiscoveryInterval is the time interval for the background process to discover new partitions.
// Default is 1 minute.
PartitionsAutoDiscoveryInterval time.Duration
// DisableMultiSchema disables multiple schema versions.
// Default is false.
DisableMultiSchema bool
// Encryption specifies the fields required to encrypt a message.
Encryption *ProducerEncryptionInfo
// EnableChunking controls whether automatic chunking of messages is enabled for the producer. By default, chunking
// is disabled.
// Chunking cannot be enabled when batching is enabled.
EnableChunking bool
// ChunkMaxMessageSize is the max size of a single chunk payload.
// It will actually only take effect if it is smaller than the maxMessageSize from the broker.
ChunkMaxMessageSize uint
// ProducerAccessMode is the type of access to the topic that the producer requires.
// (default ProducerAccessModeShared)
// Options:
// - ProducerAccessModeShared
// - ProducerAccessModeExclusive
// - ProducerAccessModeWaitForExclusive
ProducerAccessMode
}
// Producer is used to publish messages on a topic.
type Producer interface {
// Topic returns the topic to which the producer is publishing.
Topic() string
// Name returns the producer name, which could have been assigned by the system or specified by the client.
Name() string
// Send publishes a message.
// This call blocks until the message is successfully acknowledged by the Pulsar broker.
// Example:
// producer.Send(ctx, &pulsar.ProducerMessage{ Payload: myPayload })
Send(context.Context, *ProducerMessage) (MessageID, error)
// SendAsync publishes a message in asynchronous mode.
// This call blocks when the pending-message queue (`MaxPendingMessages`, default: 1000) becomes full.
// The callback will report back the message being published and
// the eventual error in publishing.
SendAsync(context.Context, *ProducerMessage, func(MessageID, *ProducerMessage, error))
// LastSequenceID returns the last sequence id that was published by this producer.
// This represents either the automatically assigned or the custom sequence id (set on the ProducerMessage) that
// was published and acknowledged by the broker.
// After recreating a producer with the same producer name, this will return the last message that was
// published in the previous producer session, or -1 if no message was ever published.
LastSequenceID() int64
// Flush is the variant of FlushWithCtx that takes no context.
// NOTE(review): presumably it flushes the buffered messages exactly like FlushWithCtx — confirm against
// the implementation.
//
// Deprecated: Use `FlushWithCtx()` instead.
Flush() error
// FlushWithCtx flushes all the messages buffered in the client and waits until all messages have been
// successfully persisted.
FlushWithCtx(ctx context.Context) error
// Close closes the producer and releases the resources allocated.
// No more writes will be accepted from this producer. Waits until all pending write requests are persisted. In case
// of errors, pending writes will not be retried.
Close()
}