fix: BKAsyncLogWriter swallows rootcause of the WriteException

Descriptions of the changes in this PR:

Added first exception as a rootcause for the WriteException

### Motivation

simplify troubleshooting

### Changes

Added first exception as a rootcause for the WriteException

Master Issue: #2574

Reviewers: Enrico Olivelli <eolivelli@gmail.com>

This closes #2575 from dlg99/master-alw-exception

(cherry picked from commit 2a6f718140eb01ae426856361dbbdcf80d4755b3)
Signed-off-by: Enrico Olivelli <eolivelli@apache.org>
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAsyncLogWriter.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAsyncLogWriter.java
index 18e6757..780107e 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAsyncLogWriter.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAsyncLogWriter.java
@@ -25,6 +25,7 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import org.apache.bookkeeper.common.concurrent.FutureEventListener;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
@@ -91,6 +92,7 @@
         @Override
         public void onFailure(Throwable cause) {
             promise.completeExceptionally(cause);
+            firstEncounteredError.compareAndSet(null, cause);
             encounteredError = true;
         }
     }
@@ -126,6 +128,7 @@
     private final boolean disableRollOnSegmentError;
     private LinkedList<PendingLogRecord> pendingRequests = null;
     private volatile boolean encounteredError = false;
+    private final AtomicReference<Throwable> firstEncounteredError = new AtomicReference<>(null);
     private CompletableFuture<BKLogSegmentWriter> rollingFuture = null;
     private long lastTxId = DistributedLogConstants.INVALID_TXID;
 
@@ -187,7 +190,7 @@
     private BKLogSegmentWriter getCachedLogSegmentWriter() throws WriteException {
         if (encounteredError) {
             throw new WriteException(bkDistributedLogManager.getStreamName(),
-                    "writer has been closed due to error.");
+                    "writer has been closed due to error.", firstEncounteredError.get());
         }
         BKLogSegmentWriter segmentWriter = getCachedLogWriter();
         if (null != segmentWriter
@@ -216,7 +219,7 @@
                                                              final boolean allowMaxTxID) {
         if (encounteredError) {
             return FutureUtils.exception(new WriteException(bkDistributedLogManager.getStreamName(),
-                    "writer has been closed due to error."));
+                    "writer has been closed due to error.", firstEncounteredError.get()));
         }
         CompletableFuture<BKLogSegmentWriter> writerFuture = asyncGetLedgerWriter(!disableRollOnSegmentError);
         if (null == writerFuture) {
@@ -383,6 +386,7 @@
         final List<PendingLogRecord> pendingRequestsSnapshot;
         synchronized (this) {
             pendingRequestsSnapshot = pendingRequests;
+            firstEncounteredError.compareAndSet(null, cause);
             encounteredError = errorOutWriter;
             pendingRequests = null;
             if (null != rollingFuture) {
@@ -517,7 +521,7 @@
                 for (PendingLogRecord pendingLogRecord : pendingRequests) {
                     pendingLogRecord.promise
                             .completeExceptionally(new WriteException(bkDistributedLogManager.getStreamName(),
-                            "abort wring: writer has been closed due to error."));
+                            "abort writing: writer has been closed due to error."));
                 }
             }
         }
diff --git a/stream/distributedlog/protocol/src/main/java/org/apache/distributedlog/exceptions/WriteException.java b/stream/distributedlog/protocol/src/main/java/org/apache/distributedlog/exceptions/WriteException.java
index 1d9c2a9..549fd41 100644
--- a/stream/distributedlog/protocol/src/main/java/org/apache/distributedlog/exceptions/WriteException.java
+++ b/stream/distributedlog/protocol/src/main/java/org/apache/distributedlog/exceptions/WriteException.java
@@ -28,4 +28,9 @@
         super(StatusCode.WRITE_EXCEPTION,
             "Write rejected because stream " + stream + " has encountered an error : " + transmitError);
     }
+
+    public WriteException(String stream, String transmitError, Throwable cause) {
+        super(StatusCode.WRITE_EXCEPTION,
+                "Write rejected because stream " + stream + " has encountered an error : " + transmitError, cause);
+    }
 }