SAMZA-2556:AzureBlob SystemProducer: Fix NPE thrown during flush. NPE caused by unhandled previous exceptions (#1392)
* SAMZA-2556:AzureBlob SystemProducer: Fix NPE thrown during flush. NPE caused by unhandled previous exceptions
* address comments
* fix checkstyle build failure
diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriter.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriter.java
index 85c2b33..8798787 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriter.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriter.java
@@ -195,7 +195,9 @@
@Override
public void flush() throws IOException {
synchronized (currentDataFileWriterLock) {
- currentBlobWriterComponents.dataFileWriter.flush();
+ if (!isClosed && currentBlobWriterComponents != null) {
+ currentBlobWriterComponents.dataFileWriter.flush();
+ }
}
}
diff --git a/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobAvroWriter.java b/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobAvroWriter.java
index 51d2cdc..f14b82b 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobAvroWriter.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobAvroWriter.java
@@ -245,6 +245,18 @@
}
@Test
+ public void testNPEinFlush() throws Exception {
+ // do not provide the dataFileWrite, azureBloboutputstream and blockblob client -- to force creation during first write
+ azureBlobAvroWriter =
+ spy(new AzureBlobAvroWriter(PowerMockito.mock(BlobContainerAsyncClient.class), mock(AzureBlobWriterMetrics.class), threadPool, THRESHOLD,
+ 60000, "test", null, null, null,
+ blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, STREAM_NAME,
+ Long.MAX_VALUE, Long.MAX_VALUE, mockCompression, false)); // keeping blob size and number of records unlimited
+ when(azureBlobAvroWriter.encodeRecord((IndexedRecord) ome.getMessage())).thenThrow(IllegalStateException.class);
+ azureBlobAvroWriter.flush(); // No NPE because has null check for currentBlobWriterComponents
+ }
+
+ @Test
public void testMaxBlobSizeExceeded() throws Exception {
String blobUrlPrefix = "test";
String blobNameRegex = "test/[0-9]{4}/[0-9]{2}/[0-9]{2}/[0-9]{2}/[0-9]{2}-[0-9]{2}-.{8}.avro.gz";