Issue 201: Fix the flaky test TestBKDistributedLogManager.deleteDuringRead

This closes #204, #201 
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java
index 4a49fdf..1d3fbce 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java
@@ -707,6 +707,9 @@
     private boolean updateLogSegmentMetadata(SegmentReader reader,
                                              LogSegmentMetadata newMetadata) {
         if (reader.getSegment().getLogSegmentSequenceNumber() != newMetadata.getLogSegmentSequenceNumber()) {
+            logger.error("Inconsistent state found in entry reader for {} : "
+                + "current segment = {}, new segment = {}",
+                new Object[] { streamName, reader.getSegment(), newMetadata });
             setLastException(new DLIllegalStateException("Inconsistent state found in entry reader for "
                     + streamName + " : current segment = " + reader.getSegment() + ", new segment = " + newMetadata));
             return false;
@@ -746,6 +749,9 @@
             }
         } else {
             if (currentSegmentSequenceNumber != segment.getLogSegmentSequenceNumber()) {
+                logger.error("Inconsistent state found in entry reader for {} : "
+                    + "current segment sn = {}, new segment sn = {}",
+                    new Object[] { streamName, currentSegmentSequenceNumber, segment.getLogSegmentSequenceNumber() });
                 setLastException(new DLIllegalStateException("Inconsistent state found in entry reader for "
                         + streamName + " : current segment sn = " + currentSegmentSequenceNumber
                         + ", new segment sn = " + segment.getLogSegmentSequenceNumber()));
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKDistributedLogManager.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKDistributedLogManager.java
index c133bb8..c44df2f 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKDistributedLogManager.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKDistributedLogManager.java
@@ -17,10 +17,13 @@
  */
 package org.apache.distributedlog;
 
-
 import static com.google.common.base.Charsets.UTF_8;
-import static org.junit.Assert.*;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 import java.io.IOException;
 import java.net.URI;
 import java.util.Collection;
@@ -47,6 +50,7 @@
 import org.apache.distributedlog.callback.LogSegmentListener;
 import org.apache.distributedlog.exceptions.AlreadyTruncatedTransactionException;
 import org.apache.distributedlog.exceptions.BKTransmitException;
+import org.apache.distributedlog.exceptions.DLIllegalStateException;
 import org.apache.distributedlog.exceptions.EndOfStreamException;
 import org.apache.distributedlog.exceptions.InvalidStreamNameException;
 import org.apache.distributedlog.exceptions.LogEmptyException;
@@ -63,15 +67,12 @@
 import org.apache.distributedlog.metadata.MetadataUpdater;
 import org.apache.distributedlog.util.OrderedScheduler;
 import org.apache.distributedlog.util.Utils;
-import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
-
 /**
  * Test Cases for {@link DistributedLogManager}.
  */
@@ -765,7 +766,6 @@
         }
 
         LogReader reader = dlm.getInputStream(1);
-        long numTrans = 1;
         LogRecord record = reader.readNext(false);
         assert (null != record);
         DLMTestUtil.verifyLogRecord(record);
@@ -773,23 +773,20 @@
 
         dlm.delete();
 
-        boolean exceptionEncountered = false;
+        boolean exceptionEncountered;
         try {
             record = reader.readNext(false);
             while (null != record) {
                 DLMTestUtil.verifyLogRecord(record);
                 assert (lastTxId < record.getTransactionId());
                 lastTxId = record.getTransactionId();
-                numTrans++;
                 record = reader.readNext(false);
             }
             // make sure the exception is thrown from readahead
             while (true) {
                 reader.readNext(false);
             }
-        } catch (LogReadException readexc) {
-            exceptionEncountered = true;
-        } catch (LogNotFoundException exc) {
+        } catch (LogReadException | LogNotFoundException | DLIllegalStateException e) {
             exceptionEncountered = true;
         }
         assertTrue(exceptionEncountered);
@@ -1136,26 +1133,26 @@
         Utils.ioResult(updater.setLogSegmentActive(segmentList.get(2L)));
 
         BKAsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned();
-        Assert.assertTrue(Utils.ioResult(writer.truncate(truncDLSN)));
+        assertTrue(Utils.ioResult(writer.truncate(truncDLSN)));
         BKLogWriteHandler handler = writer.getCachedWriteHandler();
         List<LogSegmentMetadata> cachedSegments = handler.getCachedLogSegments(LogSegmentMetadata.COMPARATOR);
         for (LogSegmentMetadata segment: cachedSegments) {
             if (segment.getLastDLSN().compareTo(truncDLSN) < 0) {
-                Assert.assertTrue(segment.isTruncated());
-                Assert.assertTrue(!segment.isPartiallyTruncated());
+                assertTrue(segment.isTruncated());
+                assertTrue(!segment.isPartiallyTruncated());
             } else if (segment.getFirstDLSN().compareTo(truncDLSN) < 0) {
-                Assert.assertTrue(!segment.isTruncated());
-                Assert.assertTrue(segment.isPartiallyTruncated());
+                assertTrue(!segment.isTruncated());
+                assertTrue(segment.isPartiallyTruncated());
             } else {
-                Assert.assertTrue(!segment.isTruncated());
-                Assert.assertTrue(!segment.isPartiallyTruncated());
+                assertTrue(!segment.isTruncated());
+                assertTrue(!segment.isPartiallyTruncated());
             }
         }
 
         segmentList = DLMTestUtil.readLogSegments(zookeeperClient,
                 LogMetadata.getLogSegmentsPath(uri, name, conf.getUnpartitionedStreamName()));
 
-        Assert.assertTrue(segmentList.get(truncDLSN.getLogSegmentSequenceNo())
+        assertTrue(segmentList.get(truncDLSN.getLogSegmentSequenceNo())
                 .getMinActiveDLSN().compareTo(truncDLSN) == 0);
 
         {