[ISSUE #371]feat: clean extra code for internal packagec (#372)

* feat: clean extra code for internal packagec

Closes #371

* fix
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 1add8dd..0c7f224 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -212,6 +212,7 @@
 		pc.lockTicker.Stop()
 		close(pc.done)
 
+		pc.client.UnregisterConsumer(pc.consumerGroup)
 		err = pc.defaultConsumer.shutdown()
 	})
 
diff --git a/internal/client.go b/internal/client.go
index acc25ea..ca8cc88 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -131,6 +131,7 @@
 	ClientID() string
 
 	RegisterProducer(group string, producer InnerProducer)
+	UnregisterProducer(group string)
 	InvokeSync(ctx context.Context, addr string, request *remote.RemotingCommand,
 		timeoutMillis time.Duration) (*remote.RemotingCommand, error)
 	InvokeAsync(ctx context.Context, addr string, request *remote.RemotingCommand,
@@ -146,7 +147,6 @@
 	RegisterConsumer(group string, consumer InnerConsumer) error
 	UnregisterConsumer(group string)
 	PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequestHeader) (*primitive.PullResult, error)
-	PullMessageAsync(ctx context.Context, brokerAddrs string, request *PullMessageRequestHeader, f func(result *primitive.PullResult)) error
 	RebalanceImmediately()
 	UpdatePublishInfo(topic string, data *TopicRouteData, changed bool)
 }
@@ -600,11 +600,6 @@
 	}
 }
 
-// PullMessageAsync pull message async
-func (c *rmqClient) PullMessageAsync(ctx context.Context, brokerAddrs string, request *PullMessageRequestHeader, f func(result *primitive.PullResult)) error {
-	return nil
-}
-
 func (c *rmqClient) RegisterConsumer(group string, consumer InnerConsumer) error {
 	_, exist := c.consumerMap.Load(group)
 	if exist {
@@ -618,6 +613,7 @@
 }
 
 func (c *rmqClient) UnregisterConsumer(group string) {
+	c.consumerMap.Delete(group)
 }
 
 func (c *rmqClient) RegisterProducer(group string, producer InnerProducer) {
@@ -625,14 +621,7 @@
 }
 
 func (c *rmqClient) UnregisterProducer(group string) {
-}
-
-func (c *rmqClient) SelectProducer(group string) InnerProducer {
-	return nil
-}
-
-func (c *rmqClient) SelectConsumer(group string) InnerConsumer {
-	return nil
+	c.producerMap.Delete(group)
 }
 
 func (c *rmqClient) RebalanceImmediately() {
diff --git a/internal/mock_client.go b/internal/mock_client.go
index d01a51f..d8ed7b2 100644
--- a/internal/mock_client.go
+++ b/internal/mock_client.go
@@ -14,6 +14,7 @@
 See the License for the specific language governing permissions and
 limitations under the License.
 */
+
 // Code generated by MockGen. DO NOT EDIT.
 // Source: client.go
 
@@ -268,6 +269,16 @@
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RegisterProducer", reflect.TypeOf((*MockRMQClient)(nil).RegisterProducer), group, producer)
 }
 
+// UnregisterProducer mocks base method
+func (m *MockRMQClient) UnregisterProducer(group string) {
+	m.ctrl.Call(m, "UnregisterProducer", group)
+}
+
+// UnregisterProducer indicates an expected call of UnregisterProducer
+func (mr *MockRMQClientMockRecorder) UnregisterProducer(group interface{}) *gomock.Call {
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UnregisterProducer", reflect.TypeOf((*MockRMQClient)(nil).UnregisterProducer), group)
+}
+
 // InvokeSync mocks base method
 func (m *MockRMQClient) InvokeSync(ctx context.Context, addr string, request *remote.RemotingCommand, timeoutMillis time.Duration) (*remote.RemotingCommand, error) {
 	ret := m.ctrl.Call(m, "InvokeSync", ctx, addr, request, timeoutMillis)
@@ -387,18 +398,6 @@
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PullMessage", reflect.TypeOf((*MockRMQClient)(nil).PullMessage), ctx, brokerAddrs, request)
 }
 
-// PullMessageAsync mocks base method
-func (m *MockRMQClient) PullMessageAsync(ctx context.Context, brokerAddrs string, request *PullMessageRequestHeader, f func(*primitive.PullResult)) error {
-	ret := m.ctrl.Call(m, "PullMessageAsync", ctx, brokerAddrs, request, f)
-	ret0, _ := ret[0].(error)
-	return ret0
-}
-
-// PullMessageAsync indicates an expected call of PullMessageAsync
-func (mr *MockRMQClientMockRecorder) PullMessageAsync(ctx, brokerAddrs, request, f interface{}) *gomock.Call {
-	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PullMessageAsync", reflect.TypeOf((*MockRMQClient)(nil).PullMessageAsync), ctx, brokerAddrs, request, f)
-}
-
 // RebalanceImmediately mocks base method
 func (m *MockRMQClient) RebalanceImmediately() {
 	m.ctrl.Call(m, "RebalanceImmediately")
diff --git a/internal/route.go b/internal/route.go
index f4d4116..c5e771e 100644
--- a/internal/route.go
+++ b/internal/route.go
@@ -113,7 +113,6 @@
 }
 
 func (s *namesrvs) UpdateTopicRouteInfo(topic string) (*TopicRouteData, bool) {
-	// Todo process lock timeout
 	s.lockNamesrv.Lock()
 	defer s.lockNamesrv.Unlock()
 
@@ -258,16 +257,6 @@
 	return mqs, nil
 }
 
-func (s *namesrvs) FindMQByTopic(topic string) *primitive.MessageQueue {
-	mqs, err := s.FetchPublishMessageQueues(topic)
-	if err != nil {
-		return nil
-	}
-	r := rand.New(rand.NewSource(time.Now().UnixNano()))
-	i := utils.AbsInt(r.Int())
-	return mqs[i%len(mqs)]
-}
-
 func (s *namesrvs) FetchPublishMessageQueues(topic string) ([]*primitive.MessageQueue, error) {
 	var (
 		err       error
diff --git a/internal/validators.go b/internal/validators.go
index 7753942..e693fde 100644
--- a/internal/validators.go
+++ b/internal/validators.go
@@ -37,10 +37,6 @@
 		rlog.Fatal("consumerGroup is empty", nil)
 	}
 
-	//if !_Pattern.Match([]byte(group)) {
-	//	rlog.Fatalf("the specified group[%s] contains illegal characters, allowing only %s", group, _ValidPattern)
-	//}
-
 	if len(group) > _CharacterMaxLength {
 		rlog.Fatal("the specified group is longer than group max length 255.", nil)
 	}
diff --git a/producer/producer.go b/producer/producer.go
index 8e0661f..a47d76f 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -87,6 +87,7 @@
 
 func (p *defaultProducer) Shutdown() error {
 	atomic.StoreInt32(&p.state, int32(internal.StateShutdown))
+	p.client.UnregisterProducer(p.group)
 	p.client.Shutdown()
 	return nil
 }
diff --git a/producer/producer_test.go b/producer/producer_test.go
index 387e9b0..e1273bb 100644
--- a/producer/producer_test.go
+++ b/producer/producer_test.go
@@ -51,6 +51,7 @@
 	assert.Nil(t, err)
 
 	client.EXPECT().Shutdown().Return()
+	client.EXPECT().UnregisterProducer(gomock.Any()).Return()
 	err = p.Shutdown()
 	assert.Nil(t, err)