[issue 587] fail all pending requests when occurs ServerError_TopicTerminated… (#588)
* fix:fail all pending requests when occurs ServerError_TopicTerminatedError
* feat:reform method to use failPendingRequests
* fix:only callback the requestId
* chore:fix for cr
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 147de3f..875c84e 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -367,12 +367,7 @@
// all the accesses to the pendingReqs should be happened in this run loop thread,
// including the final cleanup, to avoid the issue
// https://github.com/apache/pulsar-client-go/issues/239
- c.pendingLock.Lock()
- for id, req := range c.pendingReqs {
- req.callback(nil, errConnectionClosed)
- delete(c.pendingReqs, id)
- }
- c.pendingLock.Unlock()
+ c.failPendingRequests(errConnectionClosed)
c.Close()
}()
@@ -697,6 +692,16 @@
return request, ok
}
+func (c *connection) failPendingRequests(err error) bool {
+ c.pendingLock.Lock()
+ defer c.pendingLock.Unlock()
+ for id, req := range c.pendingReqs {
+ req.callback(nil, err)
+ delete(c.pendingReqs, id)
+ }
+ return true
+}
+
func (c *connection) lastDataReceived() time.Time {
c.lastDataReceivedLock.Lock()
defer c.lastDataReceivedLock.Unlock()
@@ -764,7 +769,14 @@
errMsg := fmt.Sprintf("server error: %s: %s", cmdError.GetError(), cmdError.GetMessage())
request.callback(nil, errors.New(errMsg))
case pb.ServerError_TopicTerminatedError:
- // TODO: no-op
+ request, ok := c.deletePendingRequest(requestID)
+ if !ok {
+ c.log.Warnf("Received unexpected error response for request %d of type %s",
+ requestID, cmdError.GetError())
+ return
+ }
+ errMsg := fmt.Sprintf("server error: %s: %s", cmdError.GetError(), cmdError.GetMessage())
+ request.callback(nil, errors.New(errMsg))
default:
// By default, for transient error, let the reconnection logic
// to take place and re-establish the produce again