AzureBlobSystemProducer: Catch all Exception in completableFuture during flush of producer (#1363)

* AzureBlobSystemProducer: Catch all throwable in completableFuture during flush of the producer

* catch Exception instead of throwable

* Empty commit to Trigger Travis Build

* Empty commit to Trigger Travis Build again

* fix checkstyle build failure
diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java
index 5ecd528..d89f38f 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java
@@ -486,15 +486,15 @@
       sourceWriterMap.forEach((stream, writer) -> {
         LOG.info("Closing topic:{}", stream);
         CompletableFuture<Void> future = CompletableFuture.runAsync(new Runnable() {
-          @Override
-          public void run() {
-            try {
-              writer.close();
-            } catch (IOException e) {
-              throw new SystemProducerException("Close failed for topic " + stream, e);
+            @Override
+            public void run() {
+              try {
+                writer.close();
+              } catch (Exception e) {
+                throw new SystemProducerException("Close failed for topic " + stream, e);
+              }
             }
-          }
-        }, asyncBlobThreadPool);
+          }, asyncBlobThreadPool);
         pendingClose.add(future);
         future.handle((aVoid, throwable) -> {
           sourceWriterMap.remove(writer);