SAMZA-2526: Azure blob system producer: do not commit blobs if avro DataFileWriter.close fails (#1362)
API changes: Behavior change: if DataFileWriter.close fails, the blob is not committed and all blocks uploaded for it are discarded.
Upgrade Instructions: Catch exceptions thrown by AzureBlobSystemProducer.flush and retry the messages sent since the previous flush. Do not advance the checkpoint if flush fails.
Usage Instructions: None
diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriter.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriter.java
index 6e74461..ba8e3aa 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriter.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriter.java
@@ -284,10 +284,9 @@
// dataFileWriter.close calls close of the azureBlobOutputStream associated with it.
dataFileWriter.close();
} catch (Exception e) {
- // ensure that close is called even if dataFileWriter.close fails.
- // This is to avoid loss of all the blocks uploaded for the blob
- // as commitBlockList happens in close of azureBlobOutputStream.
- azureBlobOutputStream.close();
+ LOG.error("Exception occurred during DataFileWriter.close for blob "
+ + blockBlobAsyncClient.getBlobUrl()
+ + ". All blocks uploaded so far for this blob will be discarded to avoid invalid blobs.");
throw e;
}
}
diff --git a/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobAvroWriter.java b/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobAvroWriter.java
index 62076aa..f52b484 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobAvroWriter.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobAvroWriter.java
@@ -63,6 +63,7 @@
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -217,7 +218,7 @@
azureBlobAvroWriter.flush();
azureBlobAvroWriter.close();
- verify(mockAzureBlobOutputStream).close();
+ verify(mockAzureBlobOutputStream, never()).close();
}
@Test(expected = RuntimeException.class)