[ISSUE #371]feat: clean extra code for internal packagec (#372)
* feat: clean extra code for internal packagec
Closes #371
* fix
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 1add8dd..0c7f224 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -212,6 +212,7 @@
pc.lockTicker.Stop()
close(pc.done)
+ pc.client.UnregisterConsumer(pc.consumerGroup)
err = pc.defaultConsumer.shutdown()
})
diff --git a/internal/client.go b/internal/client.go
index acc25ea..ca8cc88 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -131,6 +131,7 @@
ClientID() string
RegisterProducer(group string, producer InnerProducer)
+ UnregisterProducer(group string)
InvokeSync(ctx context.Context, addr string, request *remote.RemotingCommand,
timeoutMillis time.Duration) (*remote.RemotingCommand, error)
InvokeAsync(ctx context.Context, addr string, request *remote.RemotingCommand,
@@ -146,7 +147,6 @@
RegisterConsumer(group string, consumer InnerConsumer) error
UnregisterConsumer(group string)
PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequestHeader) (*primitive.PullResult, error)
- PullMessageAsync(ctx context.Context, brokerAddrs string, request *PullMessageRequestHeader, f func(result *primitive.PullResult)) error
RebalanceImmediately()
UpdatePublishInfo(topic string, data *TopicRouteData, changed bool)
}
@@ -600,11 +600,6 @@
}
}
-// PullMessageAsync pull message async
-func (c *rmqClient) PullMessageAsync(ctx context.Context, brokerAddrs string, request *PullMessageRequestHeader, f func(result *primitive.PullResult)) error {
- return nil
-}
-
func (c *rmqClient) RegisterConsumer(group string, consumer InnerConsumer) error {
_, exist := c.consumerMap.Load(group)
if exist {
@@ -618,6 +613,7 @@
}
func (c *rmqClient) UnregisterConsumer(group string) {
+ c.consumerMap.Delete(group)
}
func (c *rmqClient) RegisterProducer(group string, producer InnerProducer) {
@@ -625,14 +621,7 @@
}
func (c *rmqClient) UnregisterProducer(group string) {
-}
-
-func (c *rmqClient) SelectProducer(group string) InnerProducer {
- return nil
-}
-
-func (c *rmqClient) SelectConsumer(group string) InnerConsumer {
- return nil
+ c.producerMap.Delete(group)
}
func (c *rmqClient) RebalanceImmediately() {
diff --git a/internal/mock_client.go b/internal/mock_client.go
index d01a51f..d8ed7b2 100644
--- a/internal/mock_client.go
+++ b/internal/mock_client.go
@@ -14,6 +14,7 @@
See the License for the specific language governing permissions and
limitations under the License.
*/
+
// Code generated by MockGen. DO NOT EDIT.
// Source: client.go
@@ -268,6 +269,16 @@
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RegisterProducer", reflect.TypeOf((*MockRMQClient)(nil).RegisterProducer), group, producer)
}
+// UnregisterProducer mocks base method
+func (m *MockRMQClient) UnregisterProducer(group string) {
+ m.ctrl.Call(m, "UnregisterProducer", group)
+}
+
+// UnregisterProducer indicates an expected call of UnregisterProducer
+func (mr *MockRMQClientMockRecorder) UnregisterProducer(group interface{}) *gomock.Call {
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UnregisterProducer", reflect.TypeOf((*MockRMQClient)(nil).UnregisterProducer), group)
+}
+
// InvokeSync mocks base method
func (m *MockRMQClient) InvokeSync(ctx context.Context, addr string, request *remote.RemotingCommand, timeoutMillis time.Duration) (*remote.RemotingCommand, error) {
ret := m.ctrl.Call(m, "InvokeSync", ctx, addr, request, timeoutMillis)
@@ -387,18 +398,6 @@
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PullMessage", reflect.TypeOf((*MockRMQClient)(nil).PullMessage), ctx, brokerAddrs, request)
}
-// PullMessageAsync mocks base method
-func (m *MockRMQClient) PullMessageAsync(ctx context.Context, brokerAddrs string, request *PullMessageRequestHeader, f func(*primitive.PullResult)) error {
- ret := m.ctrl.Call(m, "PullMessageAsync", ctx, brokerAddrs, request, f)
- ret0, _ := ret[0].(error)
- return ret0
-}
-
-// PullMessageAsync indicates an expected call of PullMessageAsync
-func (mr *MockRMQClientMockRecorder) PullMessageAsync(ctx, brokerAddrs, request, f interface{}) *gomock.Call {
- return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PullMessageAsync", reflect.TypeOf((*MockRMQClient)(nil).PullMessageAsync), ctx, brokerAddrs, request, f)
-}
-
// RebalanceImmediately mocks base method
func (m *MockRMQClient) RebalanceImmediately() {
m.ctrl.Call(m, "RebalanceImmediately")
diff --git a/internal/route.go b/internal/route.go
index f4d4116..c5e771e 100644
--- a/internal/route.go
+++ b/internal/route.go
@@ -113,7 +113,6 @@
}
func (s *namesrvs) UpdateTopicRouteInfo(topic string) (*TopicRouteData, bool) {
- // Todo process lock timeout
s.lockNamesrv.Lock()
defer s.lockNamesrv.Unlock()
@@ -258,16 +257,6 @@
return mqs, nil
}
-func (s *namesrvs) FindMQByTopic(topic string) *primitive.MessageQueue {
- mqs, err := s.FetchPublishMessageQueues(topic)
- if err != nil {
- return nil
- }
- r := rand.New(rand.NewSource(time.Now().UnixNano()))
- i := utils.AbsInt(r.Int())
- return mqs[i%len(mqs)]
-}
-
func (s *namesrvs) FetchPublishMessageQueues(topic string) ([]*primitive.MessageQueue, error) {
var (
err error
diff --git a/internal/validators.go b/internal/validators.go
index 7753942..e693fde 100644
--- a/internal/validators.go
+++ b/internal/validators.go
@@ -37,10 +37,6 @@
rlog.Fatal("consumerGroup is empty", nil)
}
- //if !_Pattern.Match([]byte(group)) {
- // rlog.Fatalf("the specified group[%s] contains illegal characters, allowing only %s", group, _ValidPattern)
- //}
-
if len(group) > _CharacterMaxLength {
rlog.Fatal("the specified group is longer than group max length 255.", nil)
}
diff --git a/producer/producer.go b/producer/producer.go
index 8e0661f..a47d76f 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -87,6 +87,7 @@
func (p *defaultProducer) Shutdown() error {
atomic.StoreInt32(&p.state, int32(internal.StateShutdown))
+ p.client.UnregisterProducer(p.group)
p.client.Shutdown()
return nil
}
diff --git a/producer/producer_test.go b/producer/producer_test.go
index 387e9b0..e1273bb 100644
--- a/producer/producer_test.go
+++ b/producer/producer_test.go
@@ -51,6 +51,7 @@
assert.Nil(t, err)
client.EXPECT().Shutdown().Return()
+ client.EXPECT().UnregisterProducer(gomock.Any()).Return()
err = p.Shutdown()
assert.Nil(t, err)