Merge pull request #712 from superhx/fix/sub_retry_topic
(fix): fix push_consumer#unsubscribe unsubscribe retry
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index e3e0d32..08a1048 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -123,6 +123,11 @@
p.interceptor = primitive.ChainInterceptors(p.option.Interceptors...)
+ if p.model == Clustering {
+ retryTopic := internal.GetRetryTopic(p.consumerGroup)
+ sub := buildSubscriptionData(retryTopic, MessageSelector{TAG, _SubAll})
+ p.subscriptionDataTable.Store(retryTopic, sub)
+ }
return p, nil
}
@@ -231,14 +236,6 @@
return errors.New("cannot subscribe topic since client either failed to start or has been shutdown.")
}
- // add retry topic subscription for resubscribe
- retryTopic := internal.GetRetryTopic(pc.consumerGroup)
- _, exists := pc.subscriptionDataTable.Load(retryTopic)
- if !exists {
- sub := buildSubscriptionData(retryTopic, MessageSelector{TAG, _SubAll})
- pc.subscriptionDataTable.Store(retryTopic, sub)
- }
-
if pc.option.Namespace != "" {
topic = pc.option.Namespace + "%" + topic
}
@@ -258,8 +255,6 @@
topic = pc.option.Namespace + "%" + topic
}
pc.subscriptionDataTable.Delete(topic)
- retryTopic := internal.GetRetryTopic(pc.consumerGroup)
- pc.subscriptionDataTable.Delete(retryTopic)
return nil
}