SAMZA-2526: Azure blob system producer: do not commit blobs if avro DataFileWriter.close fails (#1362)

API changes: Behavior change: Blob not committed if DataFileWriter.close fails discarding all uploaded blocks.
Upgrade Instructions: Catch exceptions arising out of AzureBlobSystemProducer.flush and retry messages since previous flush. Do not advance checkpoint if flush fails.
Usage Instructions: None
diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriter.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriter.java
index 6e74461..ba8e3aa 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriter.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriter.java
@@ -284,10 +284,9 @@
       // dataFileWriter.close calls close of the azureBlobOutputStream associated with it.
       dataFileWriter.close();
     } catch (Exception e) {
-      // ensure that close is called even if dataFileWriter.close fails.
-      // This is to avoid loss of all the blocks uploaded for the blob
-      // as commitBlockList happens in close of azureBlobOutputStream.
-      azureBlobOutputStream.close();
+      LOG.error("Exception occurred during DataFileWriter.close for blob  "
+          + blockBlobAsyncClient.getBlobUrl()
+          + ". All blocks uploaded so far for this blob will be discarded to avoid invalid blobs.");
       throw e;
     }
   }
diff --git a/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobAvroWriter.java b/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobAvroWriter.java
index 62076aa..f52b484 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobAvroWriter.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobAvroWriter.java
@@ -63,6 +63,7 @@
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -217,7 +218,7 @@
 
     azureBlobAvroWriter.flush();
     azureBlobAvroWriter.close();
-    verify(mockAzureBlobOutputStream).close();
+    verify(mockAzureBlobOutputStream, never()).close();
   }
 
   @Test(expected = RuntimeException.class)