[ISSUE #87] Ensure all producer partitions are closed. (#92)
diff --git a/pulsar/impl_producer.go b/pulsar/impl_producer.go
index 5606f86..d806087 100644
--- a/pulsar/impl_producer.go
+++ b/pulsar/impl_producer.go
@@ -19,6 +19,8 @@
import (
"context"
+ "fmt"
+ "github.com/pkg/errors"
"github.com/apache/pulsar-client-go/pulsar/internal"
)
@@ -144,14 +146,21 @@
func (p *producer) Flush() error {
for _, pp := range p.producers {
- return pp.Flush()
+ if err := pp.Flush(); err != nil {
+ return err
+ }
+
}
return nil
}
func (p *producer) Close() error {
+ var errs error
for _, pp := range p.producers {
- return pp.Close()
+ if err := pp.Close(); err != nil {
+ errs = errors.Wrap(err, fmt.Sprintf("unable to close producer %s", p.Name()))
+ }
+
}
- return nil
+ return errs
}