Merge pull request #560 from beiwei30/subscribe-after-start
Allow further subscriptions even after the client has started, to align with the Java SDK's behavior.
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 58945c2..931fefa 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -222,9 +222,11 @@
func (pc *pushConsumer) Subscribe(topic string, selector MessageSelector,
f func(context.Context, ...*primitive.MessageExt) (ConsumeResult, error)) error {
- if atomic.LoadInt32(&pc.state) != int32(internal.StateCreateJust) {
- return errors.New("subscribe topic only started before")
+ if atomic.LoadInt32(&pc.state) == int32(internal.StateStartFailed) ||
+ atomic.LoadInt32(&pc.state) == int32(internal.StateShutdown) {
+ return errors.New("cannot subscribe topic since client either failed to start or has been shutdown.")
}
+
if pc.option.Namespace != "" {
topic = pc.option.Namespace + "%" + topic
}