Make PartitionsAutoDiscoveryInterval configurable (#514)
Co-authored-by: Chen Liu <cliu@splunk.com>
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index f512b00..1811722 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -1430,7 +1430,6 @@
makeHTTPCall(t, http.MethodPut, testURL, "3")
// create producer
- partitionsAutoDiscoveryInterval = 100 * time.Millisecond
producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
MessageRouter: func(msg *ProducerMessage, topicMetadata TopicMetadata) int {
@@ -1439,6 +1438,7 @@
assert.NoError(t, err)
return i
},
+ PartitionsAutoDiscoveryInterval: 100 * time.Millisecond,
})
assert.Nil(t, err)
defer producer.Close()
diff --git a/pulsar/producer.go b/pulsar/producer.go
index b41415a..ffbdebb 100644
--- a/pulsar/producer.go
+++ b/pulsar/producer.go
@@ -159,6 +159,10 @@
// - DefaultBatchBuilder
// - KeyBasedBatchBuilder
BatcherBuilderType
+
+ // PartitionsAutoDiscoveryInterval is the time interval for the background process to discover new partitions
+ // Default is 1 minute
+ PartitionsAutoDiscoveryInterval time.Duration
}
// Producer is used to publish messages on a topic
diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go
index e8d43e0..1ffd24c 100644
--- a/pulsar/producer_impl.go
+++ b/pulsar/producer_impl.go
@@ -40,6 +40,9 @@
// defaultMaxMessagesPerBatch init default num of entries in per batch.
defaultMaxMessagesPerBatch = 1000
+
+ // defaultPartitionsAutoDiscoveryInterval init default time interval for partitions auto discovery
+ defaultPartitionsAutoDiscoveryInterval = 1 * time.Minute
)
type producer struct {
@@ -57,8 +60,6 @@
metrics *internal.TopicMetrics
}
-var partitionsAutoDiscoveryInterval = 1 * time.Minute
-
func getHashingFunction(s HashingScheme) func(string) uint32 {
switch s {
case JavaStringHash:
@@ -87,6 +88,9 @@
if options.BatchingMaxPublishDelay <= 0 {
options.BatchingMaxPublishDelay = defaultBatchingMaxPublishDelay
}
+ if options.PartitionsAutoDiscoveryInterval <= 0 {
+ options.PartitionsAutoDiscoveryInterval = defaultPartitionsAutoDiscoveryInterval
+ }
p := &producer{
options: options,
@@ -125,7 +129,7 @@
return nil, err
}
- p.stopDiscovery = p.runBackgroundPartitionDiscovery(partitionsAutoDiscoveryInterval)
+ p.stopDiscovery = p.runBackgroundPartitionDiscovery(options.PartitionsAutoDiscoveryInterval)
p.metrics.ProducersOpened.Inc()
return p, nil