[ISSUE #679] Fix judgment of topic route equality and optimize loadBalancer (#680)

* fix(golang): correct judgment of topic route equality and optimize load balancer

* fix(golang): add tests for routeEqual

* refactor
diff --git a/golang/client.go b/golang/client.go
index 45e4b54..71c4819 100644
--- a/golang/client.go
+++ b/golang/client.go
@@ -24,7 +24,6 @@
 	"encoding/hex"
 	"errors"
 	"fmt"
-	"reflect"
 	"sync"
 	"time"
 
@@ -32,6 +31,7 @@
 	"github.com/apache/rocketmq-clients/golang/v5/pkg/ticker"
 	"github.com/apache/rocketmq-clients/golang/v5/pkg/utils"
 	v2 "github.com/apache/rocketmq-clients/golang/v5/protocol/v2"
+	"github.com/golang/protobuf/proto"
 	"github.com/google/uuid"
 	"go.uber.org/atomic"
 	"go.uber.org/zap"
@@ -504,27 +504,40 @@
 	f := func() {
 		cli.router.Range(func(k, v interface{}) bool {
 			topic := k.(string)
-			oldRoute := v
 			newRoute, err := cli.queryRoute(context.TODO(), topic, cli.opts.timeout)
 			if err != nil {
 				cli.log.Errorf("scheduled queryRoute err=%v", err)
 			}
-			if newRoute == nil && oldRoute != nil {
+			if newRoute == nil && v != nil {
 				cli.log.Info("newRoute is nil, but oldRoute is not. do not update")
 				return true
 			}
-			if !reflect.DeepEqual(newRoute, oldRoute) {
+			var oldRoute []*v2.MessageQueue
+			if v != nil {
+				oldRoute = v.([]*v2.MessageQueue)
+			}
+			if !routeEqual(oldRoute, newRoute) {
 				cli.router.Store(k, newRoute)
 				switch impl := cli.clientImpl.(type) {
 				case *defaultProducer:
-					plb, err := NewPublishingLoadBalancer(newRoute)
-					if err == nil {
-						impl.publishingRouteDataResultCache.Store(topic, plb)
+					existing, ok := impl.publishingRouteDataResultCache.Load(topic)
+					if !ok {
+						plb, err := NewPublishingLoadBalancer(newRoute)
+						if err == nil {
+							impl.publishingRouteDataResultCache.Store(topic, plb)
+						}
+					} else {
+						impl.publishingRouteDataResultCache.Store(topic, existing.(PublishingLoadBalancer).CopyAndUpdate(newRoute))
 					}
 				case *defaultSimpleConsumer:
-					slb, err := NewSubscriptionLoadBalancer(newRoute)
-					if err == nil {
-						impl.subTopicRouteDataResultCache.Store(topic, slb)
+					existing, ok := impl.subTopicRouteDataResultCache.Load(topic)
+					if !ok {
+						slb, err := NewSubscriptionLoadBalancer(newRoute)
+						if err == nil {
+							impl.subTopicRouteDataResultCache.Store(topic, slb)
+						}
+					} else {
+						impl.subTopicRouteDataResultCache.Store(topic, existing.(SubscriptionLoadBalancer).CopyAndUpdate(newRoute))
 					}
 				}
 			}
@@ -534,6 +547,19 @@
 	ticker.Tick(f, time.Second*30, cli.done)
 	return nil
 }
+
+func routeEqual(old, new []*v2.MessageQueue) bool {
+	if len(old) != len(new) {
+		return false
+	}
+	for i := 0; i < len(old); i++ {
+		if !proto.Equal(old[i], new[i]) {
+			return false
+		}
+	}
+	return true
+}
+
 func (cli *defaultClient) notifyClientTermination() {
 	cli.log.Info("start notifyClientTermination")
 	ctx := cli.Sign(context.Background())
diff --git a/golang/client_test.go b/golang/client_test.go
index b46d2d7..4549bdf 100644
--- a/golang/client_test.go
+++ b/golang/client_test.go
@@ -20,6 +20,7 @@
 import (
 	"context"
 	"fmt"
+	"reflect"
 	"testing"
 	"time"
 
@@ -293,3 +294,48 @@
 	assert.Equal(t, "Encountered error while receiving TelemetryCommand, trying to recover", commandExecutionLog[0].Message)
 	assert.Equal(t, "Failed to recover, err=EOF", commandExecutionLog[1].Message)
 }
+
+func Test_routeEqual(t *testing.T) {
+	oldMq := &v2.MessageQueue{
+		Topic: &v2.Resource{
+			Name: "topic-test",
+		},
+		Id:         0,
+		Permission: v2.Permission_READ_WRITE,
+		Broker: &v2.Broker{
+			Name:      "broker-test",
+			Id:        0,
+			Endpoints: fakeEndpoints(),
+		},
+		AcceptMessageTypes: []v2.MessageType{
+			v2.MessageType_NORMAL,
+		},
+	}
+	newMq := &v2.MessageQueue{
+		Topic: &v2.Resource{
+			Name: "topic-test",
+		},
+		Id:         0,
+		Permission: v2.Permission_READ_WRITE,
+		Broker: &v2.Broker{
+			Name:      "broker-test",
+			Id:        0,
+			Endpoints: fakeEndpoints(),
+		},
+		AcceptMessageTypes: []v2.MessageType{
+			v2.MessageType_NORMAL,
+		},
+	}
+
+	newMq.ProtoReflect() // message internal field value will be changed
+
+	oldRoute := []*v2.MessageQueue{oldMq}
+	newRoute := []*v2.MessageQueue{newMq}
+
+	assert.Equal(t, false, reflect.DeepEqual(oldRoute, newRoute))
+	assert.Equal(t, true, routeEqual(oldRoute, newRoute))
+	assert.Equal(t, true, routeEqual(nil, nil))
+	assert.Equal(t, false, routeEqual(nil, newRoute))
+	assert.Equal(t, false, routeEqual(oldRoute, nil))
+	assert.Equal(t, true, routeEqual(nil, []*v2.MessageQueue{}))
+}
diff --git a/golang/loadBalancer.go b/golang/loadBalancer.go
index db9cbd4..da2dd64 100644
--- a/golang/loadBalancer.go
+++ b/golang/loadBalancer.go
@@ -31,6 +31,7 @@
 type PublishingLoadBalancer interface {
 	TakeMessageQueueByMessageGroup(messageGroup *string) ([]*v2.MessageQueue, error)
 	TakeMessageQueues(excluded sync.Map, count int) ([]*v2.MessageQueue, error)
+	CopyAndUpdate([]*v2.MessageQueue) PublishingLoadBalancer
 }
 
 type publishingLoadBalancer struct {
@@ -119,8 +120,16 @@
 	return candidates, nil
 }
 
+func (plb *publishingLoadBalancer) CopyAndUpdate(messageQueues []*v2.MessageQueue) PublishingLoadBalancer {
+	return &publishingLoadBalancer{
+		messageQueues: messageQueues,
+		index:         plb.index,
+	}
+}
+
 type SubscriptionLoadBalancer interface {
 	TakeMessageQueue() (*v2.MessageQueue, error)
+	CopyAndUpdate([]*v2.MessageQueue) SubscriptionLoadBalancer
 }
 
 type subscriptionLoadBalancer struct {
@@ -147,3 +156,10 @@
 	selectMessageQueue := slb.messageQueues[idx]
 	return selectMessageQueue, nil
 }
+
+func (slb *subscriptionLoadBalancer) CopyAndUpdate(messageQueues []*v2.MessageQueue) SubscriptionLoadBalancer {
+	return &subscriptionLoadBalancer{
+		messageQueues: messageQueues,
+		index:         slb.index,
+	}
+}