blob: 1aead8bc4e7290eaa50dfca857e900bd6c228013 [file] [log] [blame]
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
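the License. You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.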
See the License for the specific language governing permissions and
limitations under the License.
package producer
import (
type QueueSelector interface {
Select(*primitive.Message, []*primitive.MessageQueue) *primitive.MessageQueue
// manualQueueSelector use the queue manually set in the provided Message's QueueID field as the queue to send.
type manualQueueSelector struct{}
func NewManualQueueSelector() QueueSelector {
return new(manualQueueSelector)
func (manualQueueSelector) Select(message *primitive.Message, queues []*primitive.MessageQueue) *primitive.MessageQueue {
return message.Queue
// randomQueueSelector choose a random queue each time.
type randomQueueSelector struct {
rander *rand.Rand
func NewRandomQueueSelector() QueueSelector {
s := new(randomQueueSelector)
s.rander = rand.New(rand.NewSource(time.Now().UTC().UnixNano()))
return s
func (r randomQueueSelector) Select(message *primitive.Message, queues []*primitive.MessageQueue) *primitive.MessageQueue {
i := r.rander.Intn(len(queues))
return queues[i]
// roundRobinQueueSelector choose the queue by roundRobin.
type roundRobinQueueSelector struct {
indexer map[string]*int32
func NewRoundRobinQueueSelector() QueueSelector {
s := &roundRobinQueueSelector{
Locker: new(sync.Mutex),
indexer: map[string]*int32{},
return s
func (r *roundRobinQueueSelector) Select(message *primitive.Message, queues []*primitive.MessageQueue) *primitive.MessageQueue {
t := message.Topic
if _, exist := r.indexer[t]; !exist {
if _, exist := r.indexer[t]; !exist {
var v = int32(0)
r.indexer[t] = &v
index := r.indexer[t]
i := atomic.AddInt32(index, 1)
if i < 0 {
i = -i
atomic.StoreInt32(index, 0)
qIndex := int(i) % len(queues)
return queues[qIndex]
type hashQueueSelector struct {
random QueueSelector
func NewHashQueueSelector() QueueSelector {
return &hashQueueSelector{
random: NewRandomQueueSelector(),
// hashQueueSelector choose the queue by hash if message having sharding key, otherwise choose queue by random instead.
func (h *hashQueueSelector) Select(message *primitive.Message, queues []*primitive.MessageQueue) *primitive.MessageQueue {
key := message.GetShardingKey()
if len(key) == 0 {
return h.random.Select(message, queues)
hasher := fnv.New32a()
_, err := hasher.Write([]byte(key))
if err != nil {
return nil
queueId := int(hasher.Sum32()) % len(queues)
if queueId < 0 {
queueId = -queueId
return queues[queueId]