fix: BKAsyncLogWriter swallows rootcause of the WriteException
Descriptions of the changes in this PR:
Added first exception as a rootcause for the WriteException
### Motivation
simplify troubleshooting
### Changes
Added first exception as a rootcause for the WriteException
Master Issue: #2574
Reviewers: Enrico Olivelli <eolivelli@gmail.com>
This closes #2575 from dlg99/master-alw-exception
(cherry picked from commit 2a6f718140eb01ae426856361dbbdcf80d4755b3)
Signed-off-by: Enrico Olivelli <eolivelli@apache.org>
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAsyncLogWriter.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAsyncLogWriter.java
index 18e6757..780107e 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAsyncLogWriter.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAsyncLogWriter.java
@@ -25,6 +25,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.apache.bookkeeper.common.concurrent.FutureEventListener;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
@@ -91,6 +92,7 @@
@Override
public void onFailure(Throwable cause) {
promise.completeExceptionally(cause);
+ firstEncounteredError.compareAndSet(null, cause);
encounteredError = true;
}
}
@@ -126,6 +128,7 @@
private final boolean disableRollOnSegmentError;
private LinkedList<PendingLogRecord> pendingRequests = null;
private volatile boolean encounteredError = false;
+ private final AtomicReference<Throwable> firstEncounteredError = new AtomicReference<>(null);
private CompletableFuture<BKLogSegmentWriter> rollingFuture = null;
private long lastTxId = DistributedLogConstants.INVALID_TXID;
@@ -187,7 +190,7 @@
private BKLogSegmentWriter getCachedLogSegmentWriter() throws WriteException {
if (encounteredError) {
throw new WriteException(bkDistributedLogManager.getStreamName(),
- "writer has been closed due to error.");
+ "writer has been closed due to error.", firstEncounteredError.get());
}
BKLogSegmentWriter segmentWriter = getCachedLogWriter();
if (null != segmentWriter
@@ -216,7 +219,7 @@
final boolean allowMaxTxID) {
if (encounteredError) {
return FutureUtils.exception(new WriteException(bkDistributedLogManager.getStreamName(),
- "writer has been closed due to error."));
+ "writer has been closed due to error.", firstEncounteredError.get()));
}
CompletableFuture<BKLogSegmentWriter> writerFuture = asyncGetLedgerWriter(!disableRollOnSegmentError);
if (null == writerFuture) {
@@ -383,6 +386,7 @@
final List<PendingLogRecord> pendingRequestsSnapshot;
synchronized (this) {
pendingRequestsSnapshot = pendingRequests;
+ firstEncounteredError.compareAndSet(null, cause);
encounteredError = errorOutWriter;
pendingRequests = null;
if (null != rollingFuture) {
@@ -517,7 +521,7 @@
for (PendingLogRecord pendingLogRecord : pendingRequests) {
pendingLogRecord.promise
.completeExceptionally(new WriteException(bkDistributedLogManager.getStreamName(),
- "abort wring: writer has been closed due to error."));
+ "abort writing: writer has been closed due to error."));
}
}
}
diff --git a/stream/distributedlog/protocol/src/main/java/org/apache/distributedlog/exceptions/WriteException.java b/stream/distributedlog/protocol/src/main/java/org/apache/distributedlog/exceptions/WriteException.java
index 1d9c2a9..549fd41 100644
--- a/stream/distributedlog/protocol/src/main/java/org/apache/distributedlog/exceptions/WriteException.java
+++ b/stream/distributedlog/protocol/src/main/java/org/apache/distributedlog/exceptions/WriteException.java
@@ -28,4 +28,9 @@
super(StatusCode.WRITE_EXCEPTION,
"Write rejected because stream " + stream + " has encountered an error : " + transmitError);
}
+
+ public WriteException(String stream, String transmitError, Throwable cause) {
+ super(StatusCode.WRITE_EXCEPTION,
+ "Write rejected because stream " + stream + " has encountered an error : " + transmitError, cause);
+ }
}