SAMZA-2556:AzureBlob SystemProducer: Fix NPE thrown during flush. NPE caused by unhandled previous exceptions (#1392)

* SAMZA-2556:AzureBlob SystemProducer: Fix NPE thrown during flush. NPE caused by unhandled previous exceptions

* address comments

* fix checkstyle build failure
diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriter.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriter.java
index 85c2b33..8798787 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriter.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobAvroWriter.java
@@ -195,7 +195,9 @@
   @Override
   public void flush() throws IOException {
     synchronized (currentDataFileWriterLock) {
-      currentBlobWriterComponents.dataFileWriter.flush();
+      if (!isClosed && currentBlobWriterComponents != null) {
+        currentBlobWriterComponents.dataFileWriter.flush();
+      }
     }
   }
 
diff --git a/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobAvroWriter.java b/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobAvroWriter.java
index 51d2cdc..f14b82b 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobAvroWriter.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobAvroWriter.java
@@ -245,6 +245,18 @@
   }
 
   @Test
+  public void testNPEinFlush() throws Exception {
+    // do not provide the dataFileWrite, azureBloboutputstream and blockblob client -- to force creation during first write
+    azureBlobAvroWriter =
+        spy(new AzureBlobAvroWriter(PowerMockito.mock(BlobContainerAsyncClient.class), mock(AzureBlobWriterMetrics.class), threadPool, THRESHOLD,
+            60000, "test", null, null, null,
+            blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, STREAM_NAME,
+            Long.MAX_VALUE, Long.MAX_VALUE, mockCompression, false)); // keeping blob size and number of records unlimited
+    when(azureBlobAvroWriter.encodeRecord((IndexedRecord) ome.getMessage())).thenThrow(IllegalStateException.class);
+    azureBlobAvroWriter.flush(); // No NPE because has null check for currentBlobWriterComponents
+  }
+
+  @Test
   public void testMaxBlobSizeExceeded() throws Exception {
     String blobUrlPrefix = "test";
     String blobNameRegex = "test/[0-9]{4}/[0-9]{2}/[0-9]{2}/[0-9]{2}/[0-9]{2}-[0-9]{2}-.{8}.avro.gz";