Fix the flaky test TestBKDistributedLogManager.deleteDuringRead
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java
index 4a49fdf..1d3fbce 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java
@@ -707,6 +707,9 @@
private boolean updateLogSegmentMetadata(SegmentReader reader,
LogSegmentMetadata newMetadata) {
if (reader.getSegment().getLogSegmentSequenceNumber() != newMetadata.getLogSegmentSequenceNumber()) {
+ logger.error("Inconsistent state found in entry reader for {} : "
+ + "current segment = {}, new segment = {}",
+ new Object[] { streamName, reader.getSegment(), newMetadata });
setLastException(new DLIllegalStateException("Inconsistent state found in entry reader for "
+ streamName + " : current segment = " + reader.getSegment() + ", new segment = " + newMetadata));
return false;
@@ -746,6 +749,9 @@
}
} else {
if (currentSegmentSequenceNumber != segment.getLogSegmentSequenceNumber()) {
+ logger.error("Inconsistent state found in entry reader for {} : "
+ + "current segment sn = {}, new segment sn = {}",
+ new Object[] { streamName, currentSegmentSequenceNumber, segment.getLogSegmentSequenceNumber() });
setLastException(new DLIllegalStateException("Inconsistent state found in entry reader for "
+ streamName + " : current segment sn = " + currentSegmentSequenceNumber
+ ", new segment sn = " + segment.getLogSegmentSequenceNumber()));
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKDistributedLogManager.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKDistributedLogManager.java
index c133bb8..c44df2f 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKDistributedLogManager.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKDistributedLogManager.java
@@ -17,10 +17,13 @@
*/
package org.apache.distributedlog;
-
import static com.google.common.base.Charsets.UTF_8;
-import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
import java.io.IOException;
import java.net.URI;
import java.util.Collection;
@@ -47,6 +50,7 @@
import org.apache.distributedlog.callback.LogSegmentListener;
import org.apache.distributedlog.exceptions.AlreadyTruncatedTransactionException;
import org.apache.distributedlog.exceptions.BKTransmitException;
+import org.apache.distributedlog.exceptions.DLIllegalStateException;
import org.apache.distributedlog.exceptions.EndOfStreamException;
import org.apache.distributedlog.exceptions.InvalidStreamNameException;
import org.apache.distributedlog.exceptions.LogEmptyException;
@@ -63,15 +67,12 @@
import org.apache.distributedlog.metadata.MetadataUpdater;
import org.apache.distributedlog.util.OrderedScheduler;
import org.apache.distributedlog.util.Utils;
-import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
-
/**
* Test Cases for {@link DistributedLogManager}.
*/
@@ -765,7 +766,6 @@
}
LogReader reader = dlm.getInputStream(1);
- long numTrans = 1;
LogRecord record = reader.readNext(false);
assert (null != record);
DLMTestUtil.verifyLogRecord(record);
@@ -773,23 +773,20 @@
dlm.delete();
- boolean exceptionEncountered = false;
+ boolean exceptionEncountered;
try {
record = reader.readNext(false);
while (null != record) {
DLMTestUtil.verifyLogRecord(record);
assert (lastTxId < record.getTransactionId());
lastTxId = record.getTransactionId();
- numTrans++;
record = reader.readNext(false);
}
// make sure the exception is thrown from readahead
while (true) {
reader.readNext(false);
}
- } catch (LogReadException readexc) {
- exceptionEncountered = true;
- } catch (LogNotFoundException exc) {
+ } catch (LogReadException | LogNotFoundException | DLIllegalStateException e) {
exceptionEncountered = true;
}
assertTrue(exceptionEncountered);
@@ -1136,26 +1133,26 @@
Utils.ioResult(updater.setLogSegmentActive(segmentList.get(2L)));
BKAsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned();
- Assert.assertTrue(Utils.ioResult(writer.truncate(truncDLSN)));
+ assertTrue(Utils.ioResult(writer.truncate(truncDLSN)));
BKLogWriteHandler handler = writer.getCachedWriteHandler();
List<LogSegmentMetadata> cachedSegments = handler.getCachedLogSegments(LogSegmentMetadata.COMPARATOR);
for (LogSegmentMetadata segment: cachedSegments) {
if (segment.getLastDLSN().compareTo(truncDLSN) < 0) {
- Assert.assertTrue(segment.isTruncated());
- Assert.assertTrue(!segment.isPartiallyTruncated());
+ assertTrue(segment.isTruncated());
+ assertTrue(!segment.isPartiallyTruncated());
} else if (segment.getFirstDLSN().compareTo(truncDLSN) < 0) {
- Assert.assertTrue(!segment.isTruncated());
- Assert.assertTrue(segment.isPartiallyTruncated());
+ assertTrue(!segment.isTruncated());
+ assertTrue(segment.isPartiallyTruncated());
} else {
- Assert.assertTrue(!segment.isTruncated());
- Assert.assertTrue(!segment.isPartiallyTruncated());
+ assertTrue(!segment.isTruncated());
+ assertTrue(!segment.isPartiallyTruncated());
}
}
segmentList = DLMTestUtil.readLogSegments(zookeeperClient,
LogMetadata.getLogSegmentsPath(uri, name, conf.getUnpartitionedStreamName()));
- Assert.assertTrue(segmentList.get(truncDLSN.getLogSegmentSequenceNo())
+ assertTrue(segmentList.get(truncDLSN.getLogSegmentSequenceNo())
.getMinActiveDLSN().compareTo(truncDLSN) == 0);
{