AzureBlobSystemProducer: Catch all Exception in completableFuture during flush of producer (#1363)
* AzureBlobSystemProducer: Catch all throwable in completableFuture during flush of the producer
* catch Exception instead of throwable
* Empty commit to Trigger Travis Build
* Empty commit to Trigger Travis Build again
* fix checkstyle build failure
diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java
index 5ecd528..d89f38f 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java
@@ -486,15 +486,15 @@
sourceWriterMap.forEach((stream, writer) -> {
LOG.info("Closing topic:{}", stream);
CompletableFuture<Void> future = CompletableFuture.runAsync(new Runnable() {
- @Override
- public void run() {
- try {
- writer.close();
- } catch (IOException e) {
- throw new SystemProducerException("Close failed for topic " + stream, e);
+ @Override
+ public void run() {
+ try {
+ writer.close();
+ } catch (Exception e) {
+ throw new SystemProducerException("Close failed for topic " + stream, e);
+ }
}
- }
- }, asyncBlobThreadPool);
+ }, asyncBlobThreadPool);
pendingClose.add(future);
future.handle((aVoid, throwable) -> {
sourceWriterMap.remove(writer);