[ISSUE #679] Fix judgment of topic route equality and optimize loadBalancer (#680)
* fix(golang): correct judgment of topic route equality and optimize load balancer
* fix(golang): add tests for routeEqual
* refactor
diff --git a/golang/client.go b/golang/client.go
index 45e4b54..71c4819 100644
--- a/golang/client.go
+++ b/golang/client.go
@@ -24,7 +24,6 @@
"encoding/hex"
"errors"
"fmt"
- "reflect"
"sync"
"time"
@@ -32,6 +31,7 @@
"github.com/apache/rocketmq-clients/golang/v5/pkg/ticker"
"github.com/apache/rocketmq-clients/golang/v5/pkg/utils"
v2 "github.com/apache/rocketmq-clients/golang/v5/protocol/v2"
+ "github.com/golang/protobuf/proto"
"github.com/google/uuid"
"go.uber.org/atomic"
"go.uber.org/zap"
@@ -504,27 +504,40 @@
f := func() {
cli.router.Range(func(k, v interface{}) bool {
topic := k.(string)
- oldRoute := v
newRoute, err := cli.queryRoute(context.TODO(), topic, cli.opts.timeout)
if err != nil {
cli.log.Errorf("scheduled queryRoute err=%v", err)
}
- if newRoute == nil && oldRoute != nil {
+ if newRoute == nil && v != nil {
cli.log.Info("newRoute is nil, but oldRoute is not. do not update")
return true
}
- if !reflect.DeepEqual(newRoute, oldRoute) {
+ var oldRoute []*v2.MessageQueue
+ if v != nil {
+ oldRoute = v.([]*v2.MessageQueue)
+ }
+ if !routeEqual(oldRoute, newRoute) {
cli.router.Store(k, newRoute)
switch impl := cli.clientImpl.(type) {
case *defaultProducer:
- plb, err := NewPublishingLoadBalancer(newRoute)
- if err == nil {
- impl.publishingRouteDataResultCache.Store(topic, plb)
+ existing, ok := impl.publishingRouteDataResultCache.Load(topic)
+ if !ok {
+ plb, err := NewPublishingLoadBalancer(newRoute)
+ if err == nil {
+ impl.publishingRouteDataResultCache.Store(topic, plb)
+ }
+ } else {
+ impl.publishingRouteDataResultCache.Store(topic, existing.(PublishingLoadBalancer).CopyAndUpdate(newRoute))
}
case *defaultSimpleConsumer:
- slb, err := NewSubscriptionLoadBalancer(newRoute)
- if err == nil {
- impl.subTopicRouteDataResultCache.Store(topic, slb)
+ existing, ok := impl.subTopicRouteDataResultCache.Load(topic)
+ if !ok {
+ slb, err := NewSubscriptionLoadBalancer(newRoute)
+ if err == nil {
+ impl.subTopicRouteDataResultCache.Store(topic, slb)
+ }
+ } else {
+ impl.subTopicRouteDataResultCache.Store(topic, existing.(SubscriptionLoadBalancer).CopyAndUpdate(newRoute))
}
}
}
@@ -534,6 +547,19 @@
ticker.Tick(f, time.Second*30, cli.done)
return nil
}
+
+func routeEqual(old, new []*v2.MessageQueue) bool {
+ if len(old) != len(new) {
+ return false
+ }
+ for i := 0; i < len(old); i++ {
+ if !proto.Equal(old[i], new[i]) {
+ return false
+ }
+ }
+ return true
+}
+
func (cli *defaultClient) notifyClientTermination() {
cli.log.Info("start notifyClientTermination")
ctx := cli.Sign(context.Background())
diff --git a/golang/client_test.go b/golang/client_test.go
index b46d2d7..4549bdf 100644
--- a/golang/client_test.go
+++ b/golang/client_test.go
@@ -20,6 +20,7 @@
import (
"context"
"fmt"
+ "reflect"
"testing"
"time"
@@ -293,3 +294,48 @@
assert.Equal(t, "Encountered error while receiving TelemetryCommand, trying to recover", commandExecutionLog[0].Message)
assert.Equal(t, "Failed to recover, err=EOF", commandExecutionLog[1].Message)
}
+
+func Test_routeEqual(t *testing.T) {
+ oldMq := &v2.MessageQueue{
+ Topic: &v2.Resource{
+ Name: "topic-test",
+ },
+ Id: 0,
+ Permission: v2.Permission_READ_WRITE,
+ Broker: &v2.Broker{
+ Name: "broker-test",
+ Id: 0,
+ Endpoints: fakeEndpoints(),
+ },
+ AcceptMessageTypes: []v2.MessageType{
+ v2.MessageType_NORMAL,
+ },
+ }
+ newMq := &v2.MessageQueue{
+ Topic: &v2.Resource{
+ Name: "topic-test",
+ },
+ Id: 0,
+ Permission: v2.Permission_READ_WRITE,
+ Broker: &v2.Broker{
+ Name: "broker-test",
+ Id: 0,
+ Endpoints: fakeEndpoints(),
+ },
+ AcceptMessageTypes: []v2.MessageType{
+ v2.MessageType_NORMAL,
+ },
+ }
+
+ newMq.ProtoReflect() // message internal field value will be changed
+
+ oldRoute := []*v2.MessageQueue{oldMq}
+ newRoute := []*v2.MessageQueue{newMq}
+
+ assert.Equal(t, false, reflect.DeepEqual(oldRoute, newRoute))
+ assert.Equal(t, true, routeEqual(oldRoute, newRoute))
+ assert.Equal(t, true, routeEqual(nil, nil))
+ assert.Equal(t, false, routeEqual(nil, newRoute))
+ assert.Equal(t, false, routeEqual(oldRoute, nil))
+ assert.Equal(t, true, routeEqual(nil, []*v2.MessageQueue{}))
+}
diff --git a/golang/loadBalancer.go b/golang/loadBalancer.go
index db9cbd4..da2dd64 100644
--- a/golang/loadBalancer.go
+++ b/golang/loadBalancer.go
@@ -31,6 +31,7 @@
type PublishingLoadBalancer interface {
TakeMessageQueueByMessageGroup(messageGroup *string) ([]*v2.MessageQueue, error)
TakeMessageQueues(excluded sync.Map, count int) ([]*v2.MessageQueue, error)
+ CopyAndUpdate([]*v2.MessageQueue) PublishingLoadBalancer
}
type publishingLoadBalancer struct {
@@ -119,8 +120,16 @@
return candidates, nil
}
+func (plb *publishingLoadBalancer) CopyAndUpdate(messageQueues []*v2.MessageQueue) PublishingLoadBalancer {
+ return &publishingLoadBalancer{
+ messageQueues: messageQueues,
+ index: plb.index,
+ }
+}
+
type SubscriptionLoadBalancer interface {
TakeMessageQueue() (*v2.MessageQueue, error)
+ CopyAndUpdate([]*v2.MessageQueue) SubscriptionLoadBalancer
}
type subscriptionLoadBalancer struct {
@@ -147,3 +156,10 @@
selectMessageQueue := slb.messageQueues[idx]
return selectMessageQueue, nil
}
+
+func (slb *subscriptionLoadBalancer) CopyAndUpdate(messageQueues []*v2.MessageQueue) SubscriptionLoadBalancer {
+ return &subscriptionLoadBalancer{
+ messageQueues: messageQueues,
+ index: slb.index,
+ }
+}