[ISSUE #87] Ensure all producer partitions are closed. (#92)

diff --git a/pulsar/impl_producer.go b/pulsar/impl_producer.go
index 5606f86..d806087 100644
--- a/pulsar/impl_producer.go
+++ b/pulsar/impl_producer.go
@@ -19,6 +19,8 @@
 
 import (
 	"context"
+	"fmt"
+	"github.com/pkg/errors"
 
 	"github.com/apache/pulsar-client-go/pulsar/internal"
 )
@@ -144,14 +146,21 @@
 
 func (p *producer) Flush() error {
 	for _, pp := range p.producers {
-		return pp.Flush()
+		 if err :=  pp.Flush(); err != nil {
+		 	return err
+		 }
+
 	}
 	return nil
 }
 
 func (p *producer) Close() error {
+	var errs error
 	for _, pp := range p.producers {
-		return pp.Close()
+		if err := pp.Close(); err != nil {
+			errs = errors.Wrap(err, fmt.Sprintf("unable to close producer %s", p.Name()))
+		}
+
 	}
-	return nil
+	return errs
 }