Add fencing to recovery reads to avoid data loss issue

Descriptions of the changes in this PR:

### Motivation

Adding the fencing flag to recovery reads avoids a data loss scenario as described in [issue 2614](https://github.com/apache/bookkeeper/issues/2614)

### Changes

Added the fencing flag to recovery reads. Refactored some mocking unit test code to introduce fencing and allow two writers to share the same bookie state. Added a new unit test to verify the fix. You can reproduce the original problem by removing the fencing flag from recovery reads and running the new unit test.

Master Issue: #2614

Reviewers: Enrico Olivelli <eolivelli@gmail.com>, Andrey Yegorov <andrey.yegorov@datastax.com>

This closes #2616 from Vanlightly/fix-fencing

(cherry picked from commit 017307bc67431a7616861ad09927c4e3327633d1)
Signed-off-by: Enrico Olivelli <eolivelli@apache.org>
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
index e6c011e..b48761e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
@@ -568,9 +568,14 @@
             lh.throttler.acquire();
         }
 
-        int flags = isRecoveryRead ? BookieProtocol.FLAG_HIGH_PRIORITY : BookieProtocol.FLAG_NONE;
-        clientCtx.getBookieClient().readEntry(to, lh.ledgerId, entry.eId,
-                                              this, new ReadContext(bookieIndex, to, entry), flags);
+        if (isRecoveryRead) {
+            int flags = BookieProtocol.FLAG_HIGH_PRIORITY | BookieProtocol.FLAG_DO_FENCING;
+            clientCtx.getBookieClient().readEntry(to, lh.ledgerId, entry.eId,
+                    this, new ReadContext(bookieIndex, to, entry), flags, lh.ledgerKey);
+        } else {
+            clientCtx.getBookieClient().readEntry(to, lh.ledgerId, entry.eId,
+                    this, new ReadContext(bookieIndex, to, entry), BookieProtocol.FLAG_NONE);
+        }
     }
 
     @Override
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecovery2Test.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecovery2Test.java
index af6d980..38e4879 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecovery2Test.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecovery2Test.java
@@ -21,16 +21,23 @@
 package org.apache.bookkeeper.client;
 
 import com.google.common.collect.Lists;
+
+import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.bookkeeper.client.api.DigestType;
 import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.client.api.WriteFlag;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.net.BookieId;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallbackFuture;
+import org.apache.bookkeeper.proto.MockBookies;
 import org.apache.bookkeeper.versioning.Versioned;
 import org.junit.Assert;
 import org.junit.Test;
@@ -86,7 +93,7 @@
 
         Versioned<LedgerMetadata> md = setupLedger(clientCtx, 1, Lists.newArrayList(b1, b2, b3));
 
-        clientCtx.getMockBookieClient().seedEntries(b1, 1L, 0L, -1L);
+        clientCtx.getMockBookieClient().getMockBookies().seedEntries(b1, 1L, 0L, -1L);
         clientCtx.getMockBookieClient().setPreWriteHook(
                 (bookie, ledgerId, entryId) -> FutureUtils.exception(new BKException.BKWriteException()));
 
@@ -110,7 +117,7 @@
 
         CompletableFuture<Void> writingBack = new CompletableFuture<>();
         CompletableFuture<Void> blocker = new CompletableFuture<>();
-        clientCtx.getMockBookieClient().seedEntries(b1, 1L, 0L, -1L);
+        clientCtx.getMockBookieClient().getMockBookies().seedEntries(b1, 1L, 0L, -1L);
         // will block recovery at the writeback phase
         clientCtx.getMockBookieClient().setPreWriteHook(
                 (bookie, ledgerId, entryId) -> {
@@ -149,7 +156,7 @@
         CompletableFuture<Void> writingBack = new CompletableFuture<>();
         CompletableFuture<Void> blocker = new CompletableFuture<>();
         CompletableFuture<Void> failing = new CompletableFuture<>();
-        clientCtx.getMockBookieClient().seedEntries(b1, 1L, 0L, -1L);
+        clientCtx.getMockBookieClient().getMockBookies().seedEntries(b1, 1L, 0L, -1L);
         // will block recovery at the writeback phase
         clientCtx.getMockBookieClient().setPreWriteHook(
                 (bookie, ledgerId, entryId) -> {
@@ -195,7 +202,7 @@
         CompletableFuture<Void> writingBack = new CompletableFuture<>();
         CompletableFuture<Void> blocker = new CompletableFuture<>();
         CompletableFuture<Void> failing = new CompletableFuture<>();
-        clientCtx.getMockBookieClient().seedEntries(b1, 1L, 0L, -1L);
+        clientCtx.getMockBookieClient().getMockBookies().seedEntries(b1, 1L, 0L, -1L);
         clientCtx.getMockBookieClient().errorBookies(b2);
 
         ReadOnlyLedgerHandle lh = new ReadOnlyLedgerHandle(
@@ -216,8 +223,8 @@
         clientCtx.getMockRegistrationClient().addBookies(b4).get();
 
         Versioned<LedgerMetadata> md = setupLedger(clientCtx, 1, Lists.newArrayList(b1, b2, b3));
-        clientCtx.getMockBookieClient().seedEntries(b1, 1L, 0L, -1L);
-        clientCtx.getMockBookieClient().seedEntries(b3, 1L, 1L, -1L);
+        clientCtx.getMockBookieClient().getMockBookies().seedEntries(b1, 1L, 0L, -1L);
+        clientCtx.getMockBookieClient().getMockBookies().seedEntries(b3, 1L, 1L, -1L);
         clientCtx.getMockBookieClient().setPreWriteHook(
                 (bookie, ledgerId, entryId) -> {
                     if (bookie.equals(b2) && entryId == 1L) {
@@ -248,7 +255,7 @@
         clientCtx.getMockRegistrationClient().addBookies(b4, b5).get();
 
         Versioned<LedgerMetadata> md = setupLedger(clientCtx, 1, Lists.newArrayList(b1, b2, b3));
-        clientCtx.getMockBookieClient().seedEntries(b1, 1L, 0L, -1L);
+        clientCtx.getMockBookieClient().getMockBookies().seedEntries(b1, 1L, 0L, -1L);
         clientCtx.getMockBookieClient().setPreWriteHook(
                 (bookie, ledgerId, entryId) -> {
                     if (bookie.equals(b1) || bookie.equals(b2)) {
@@ -271,4 +278,217 @@
         Assert.assertTrue(lh.getLedgerMetadata().getAllEnsembles().get(0L).contains(b5));
         Assert.assertEquals(lh.getLastAddConfirmed(), 0L);
     }
+
+    /**
+     * This test verifies the fix for the data loss scenario found by the TLA+ specification, specifically
+     * the invariant violation that metadata and writer can diverge. The scenario is that the original writer
+     * can commit an entry e that will later be lost because a second writer can close the ledger at e-1.
+     * The cause is that fencing was originally only performed on LAC reads which is not enough to prevent
+     * the 1st writer from reaching Ack Quorum after the 2nd writer has closed the ledger. The fix has
+     * been to fence on recovery reads also.
+     */
+    @Test
+    public void testFirstWriterCannotCommitWriteAfter2ndWriterCloses() throws Exception {
+        /*
+            This test uses CompletableFutures to control the sequence of actions performed by
+            two writers. There are different sets of futures:
+             - block*: These futures block the various reads, writes and metadata updates until the
+                       test thread is ready for them to be executed. Thus ensuring the right sequence
+                       of events occur.
+             - reachedStepN: These futures block in the test thread to ensure that we only unblock
+                             an action when the prior one has been executed and we are already blocked
+                             on the next action in the sequence.
+         */
+
+        //  Setup w1
+        CompletableFuture<Void> reachedStep1 = new CompletableFuture<>();
+        CompletableFuture<Void> reachedStep2 = new CompletableFuture<>();
+        CompletableFuture<Void> reachedStep3 = new CompletableFuture<>();
+        CompletableFuture<Void> reachedStep4 = new CompletableFuture<>();
+        CompletableFuture<Void> reachedStep5 = new CompletableFuture<>();
+        CompletableFuture<Void> reachedStep6 = new CompletableFuture<>();
+        CompletableFuture<Void> reachedStep7 = new CompletableFuture<>();
+        CompletableFuture<Void> reachedStep8 = new CompletableFuture<>();
+        CompletableFuture<Void> reachedStep9 = new CompletableFuture<>();
+
+        MockBookies mockBookies = new MockBookies();
+        MockClientContext clientCtx1 = MockClientContext.create(mockBookies);
+        Versioned<LedgerMetadata> md1 = setupLedger(clientCtx1, 1, Lists.newArrayList(b1, b2, b3));
+
+        CompletableFuture<Void> blockB1Write = new CompletableFuture<>();
+        CompletableFuture<Void> blockB2Write = new CompletableFuture<>();
+        CompletableFuture<Void> blockB3Write = new CompletableFuture<>();
+        clientCtx1.getMockBookieClient().setPreWriteHook(
+                (bookie, ledgerId, entryId) -> {
+                    // ignore seed entries e0 and e1
+                    if (entryId < 2) {
+                        return FutureUtils.value(null);
+                    }
+
+                    if (!reachedStep1.isDone()) {
+                        reachedStep1.complete(null);
+                    }
+
+                    if (bookie.equals(b1)) {
+                        return blockB1Write;
+                    } else if (bookie.equals(b2)) {
+                        reachedStep9.complete(null);
+                        return blockB2Write;
+                    } else if (bookie.equals(b3)) {
+                        reachedStep3.complete(null);
+                        return blockB3Write;
+                    }  else {
+                        return FutureUtils.value(null);
+                    }
+                });
+
+        LedgerHandle w1 = new LedgerHandle(clientCtx1, 1, md1,
+                BookKeeper.DigestType.CRC32C,
+                ClientUtil.PASSWD, WriteFlag.NONE);
+        w1.addEntry("e0".getBytes(StandardCharsets.UTF_8));
+        w1.addEntry("e1".getBytes(StandardCharsets.UTF_8));
+
+        //  Setup w2
+        MockClientContext clientCtx2 = MockClientContext.create(mockBookies);
+        Versioned<LedgerMetadata> md2 = setupLedger(clientCtx2, 1, Lists.newArrayList(b1, b2, b3));
+
+        CompletableFuture<Void> blockB1ReadLac = new CompletableFuture<>();
+        CompletableFuture<Void> blockB2ReadLac = new CompletableFuture<>();
+        CompletableFuture<Void> blockB3ReadLac = new CompletableFuture<>();
+
+        CompletableFuture<Void> blockB1ReadEntry0 = new CompletableFuture<>();
+        CompletableFuture<Void> blockB2ReadEntry0 = new CompletableFuture<>();
+        CompletableFuture<Void> blockB3ReadEntry0 = new CompletableFuture<>();
+
+        AtomicBoolean isB1LacRead = new AtomicBoolean(true);
+        AtomicBoolean isB2LacRead = new AtomicBoolean(true);
+        AtomicBoolean isB3LacRead = new AtomicBoolean(true);
+
+        clientCtx2.getMockBookieClient().setPreReadHook(
+                (bookie, ledgerId, entryId) -> {
+                    if (bookie.equals(b1)) {
+                        if (isB1LacRead.get()) {
+                            isB1LacRead.set(false);
+                            reachedStep2.complete(null);
+                            return blockB1ReadLac;
+                        } else {
+                            reachedStep6.complete(null);
+                            return blockB1ReadEntry0;
+                        }
+                    } else if (bookie.equals(b2)) {
+                        if (isB2LacRead.get()) {
+                            try {
+                                isB2LacRead.set(false);
+                                reachedStep4.complete(null);
+                                blockB2ReadLac.get(); // block this read - it does not succeed
+                            } catch (Throwable t){}
+                            return FutureUtils.exception(new BKException.BKWriteException());
+                        } else {
+                            reachedStep7.complete(null);
+                            return blockB2ReadEntry0;
+                        }
+                    } else if (bookie.equals(b3)) {
+                        if (isB3LacRead.get()) {
+                            isB3LacRead.set(false);
+                            reachedStep5.complete(null);
+                            return blockB3ReadLac;
+                        } else {
+                            return blockB3ReadEntry0;
+                        }
+                    }  else {
+                        return FutureUtils.value(null);
+                    }
+                });
+
+        AtomicInteger w2MetaUpdates = new AtomicInteger(0);
+        CompletableFuture<Void> blockW2StartingRecovery = new CompletableFuture<>();
+        CompletableFuture<Void> blockW2ClosingLedger = new CompletableFuture<>();
+        clientCtx2.getMockLedgerManager().setPreWriteHook((ledgerId, metadata) -> {
+            if (w2MetaUpdates.get() == 0) {
+                w2MetaUpdates.incrementAndGet();
+                return blockW2StartingRecovery;
+            } else {
+                reachedStep8.complete(null);
+                return blockW2ClosingLedger;
+            }
+        });
+
+        ReadOnlyLedgerHandle w2 = new ReadOnlyLedgerHandle(
+                clientCtx2, 1L, md2, BookKeeper.DigestType.CRC32C, PASSWD, false);
+
+        // Start an async add entry, blocked for now.
+        CompletableFuture<Object> w1WriteFuture = new CompletableFuture<>();
+        AtomicInteger writeResult = new AtomicInteger(0);
+        w1.asyncAddEntry("e2".getBytes(), (int rc, LedgerHandle lh1, long entryId, Object ctx) -> {
+            if (rc == BKException.Code.OK) {
+                writeResult.set(1);
+            } else {
+                writeResult.set(2);
+            }
+            SyncCallbackUtils.finish(rc, null, w1WriteFuture);
+        }, null);
+
+        // Step 1. w2 starts recovery
+        stepBlock(reachedStep1);
+        GenericCallbackFuture<Void> recoveryPromise = new GenericCallbackFuture<>();
+        w2.recover(recoveryPromise, null, false);
+        blockW2StartingRecovery.complete(null);
+
+        // Step 2. w2 fencing read LAC reaches B1
+        stepBlock(reachedStep2);
+        blockB1ReadLac.complete(null);
+
+        // Step 3. w1 add e2 reaches B3
+        stepBlock(reachedStep3);
+        blockB3Write.complete(null);
+
+        // Step 4. w2 fencing LAC read does not reach B2 or it fails
+        stepBlock(reachedStep4);
+        blockB2ReadLac.complete(null);
+
+        // Step 5. w2 fencing LAC read reaches B3
+        stepBlock(reachedStep5);
+        blockB3ReadLac.complete(null);
+
+        // Step 6. w2 sends recovery read of e2 to b1, gets NoSuchEntry
+        stepBlock(reachedStep6);
+        blockB1ReadEntry0.complete(null);
+
+        // Step 7. w2 sends recovery read of e2 to b2 (fencing it), gets NoSuchEntry
+        stepBlock(reachedStep7);
+        blockB2ReadEntry0.complete(null);
+
+        // Step 8. w2 closes ledger because (Qw-Qa)+1 bookies confirmed they do not have e2
+        // last entry id set to 1
+        stepBlock(reachedStep8);
+        blockW2ClosingLedger.complete(null);
+
+        // Step 9. w1 add e2 reaches b2 (which was fenced by a recovery read)
+        stepBlock(reachedStep9);
+        blockB2Write.complete(null);
+
+        // Step 10. w1 write fails to reach AckQuorum
+        try {
+            w1WriteFuture.get(200, TimeUnit.MILLISECONDS);
+            Assert.fail("The write to b2 should have failed as it was fenced by the recovery read of step 7");
+        } catch (ExecutionException e) {
+            Assert.assertTrue(e.getCause() instanceof BKException.BKLedgerFencedException);
+        }
+
+        // w1 received negative acknowledgement of e2 being written
+        Assert.assertEquals(1, w1.getLedgerMetadata().getAllEnsembles().size());
+        Assert.assertEquals(2, writeResult.get());
+        Assert.assertEquals(1L, w1.getLastAddConfirmed());
+
+        // w2 closed the ledger with only the original entries, not the third one,
+        // i.e. there is no divergence between w1, w2 and the metadata
+        Assert.assertEquals(1, w2.getLedgerMetadata().getAllEnsembles().size());
+        Assert.assertEquals(1L, w2.getLastAddConfirmed());
+    }
+
+    private void stepBlock(CompletableFuture<Void> reachedStepFuture) {
+        try {
+            reachedStepFuture.get();
+        } catch (Exception e) {}
+    }
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockClientContext.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockClientContext.java
index 68d0502..ae3475e 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockClientContext.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockClientContext.java
@@ -35,6 +35,7 @@
 import org.apache.bookkeeper.meta.MockLedgerManager;
 import org.apache.bookkeeper.proto.BookieClient;
 import org.apache.bookkeeper.proto.MockBookieClient;
+import org.apache.bookkeeper.proto.MockBookies;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.mockito.Mockito;
 
@@ -55,7 +56,7 @@
     private MockRegistrationClient regClient;
     private ByteBufAllocator allocator;
 
-    static MockClientContext create() throws Exception {
+    static MockClientContext create(MockBookies mockBookies) throws Exception {
         ClientConfiguration conf = new ClientConfiguration();
         OrderedScheduler scheduler = OrderedScheduler.newSchedulerBuilder().name("mock-executor").numThreads(1).build();
         MockRegistrationClient regClient = new MockRegistrationClient();
@@ -67,17 +68,22 @@
         bookieWatcherImpl.initialBlockingBookieRead();
 
         return new MockClientContext()
-            .setConf(ClientInternalConf.fromConfig(conf))
-            .setLedgerManager(new MockLedgerManager())
-            .setBookieWatcher(bookieWatcherImpl)
-            .setPlacementPolicy(placementPolicy)
-            .setRegistrationClient(regClient)
-            .setBookieClient(new MockBookieClient(scheduler))
-            .setByteBufAllocator(UnpooledByteBufAllocator.DEFAULT)
-            .setMainWorkerPool(scheduler)
-            .setScheduler(scheduler)
-            .setClientStats(BookKeeperClientStats.newInstance(NullStatsLogger.INSTANCE))
-            .setIsClientClosed(() -> false);
+                .setConf(ClientInternalConf.fromConfig(conf))
+                .setLedgerManager(new MockLedgerManager())
+                .setBookieWatcher(bookieWatcherImpl)
+                .setPlacementPolicy(placementPolicy)
+                .setRegistrationClient(regClient)
+                .setBookieClient(new MockBookieClient(scheduler, mockBookies))
+                .setByteBufAllocator(UnpooledByteBufAllocator.DEFAULT)
+                .setMainWorkerPool(scheduler)
+                .setScheduler(scheduler)
+                .setClientStats(BookKeeperClientStats.newInstance(NullStatsLogger.INSTANCE))
+                .setIsClientClosed(() -> false);
+    }
+
+    static MockClientContext create() throws Exception {
+        MockBookies mockBookies = new MockBookies();
+        return create(mockBookies);
     }
 
     static MockClientContext copyOf(ClientContext other) {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java
index fa3d1ba..f655328 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java
@@ -20,23 +20,20 @@
  */
 package org.apache.bookkeeper.proto;
 
+import static org.apache.bookkeeper.proto.BookieProtocol.FLAG_RECOVERY_ADD;
 import static org.apache.bookkeeper.util.SafeRunnable.safeRun;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
-import io.netty.buffer.UnpooledByteBufAllocator;
 
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
-import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 
-
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.api.WriteFlag;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
@@ -49,14 +46,13 @@
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadLacCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteLacCallback;
-import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.DigestType;
-import org.apache.bookkeeper.proto.checksum.DigestManager;
 import org.apache.bookkeeper.util.AvailabilityOfEntriesOfLedger;
 import org.apache.bookkeeper.util.ByteBufList;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 /**
  * Mock implementation of BookieClient.
  */
@@ -64,9 +60,9 @@
     static final Logger LOG = LoggerFactory.getLogger(MockBookieClient.class);
 
     final OrderedExecutor executor;
-    final ConcurrentHashMap<BookieId, ConcurrentHashMap<Long, LedgerData>> data = new ConcurrentHashMap<>();
+    final MockBookies mockBookies;
     final Set<BookieId> errorBookies =
-        Collections.newSetFromMap(new ConcurrentHashMap<BookieId, Boolean>());
+            Collections.newSetFromMap(new ConcurrentHashMap<>());
 
     /**
      * Runs before or after an operation. Can stall the operation or error it.
@@ -82,6 +78,13 @@
 
     public MockBookieClient(OrderedExecutor executor) {
         this.executor = executor;
+        this.mockBookies = new MockBookies();
+    }
+
+    public MockBookieClient(OrderedExecutor executor,
+                            MockBookies mockBookies) {
+        this.executor = executor;
+        this.mockBookies = mockBookies;
     }
 
     public void setPreReadHook(Hook hook) {
@@ -110,14 +113,12 @@
         }
     }
 
-    public void seedEntries(BookieId bookie, long ledgerId, long entryId, long lac) throws Exception {
-        DigestManager digestManager = DigestManager.instantiate(ledgerId, new byte[0], DigestType.CRC32C,
-                UnpooledByteBufAllocator.DEFAULT, false);
-        ByteBuf entry = ByteBufList.coalesce(digestManager.computeDigestAndPackageForSending(
-                                                     entryId, lac, 0, Unpooled.buffer(10)));
+    public boolean isErrored(BookieId bookieId) {
+        return errorBookies.contains(bookieId);
+    }
 
-        LedgerData ledger = getBookieData(bookie).computeIfAbsent(ledgerId, LedgerData::new);
-        ledger.addEntry(entryId, entry);
+    public MockBookies getMockBookies() {
+        return mockBookies;
     }
 
     @Override
@@ -140,9 +141,9 @@
                             ForceLedgerCallback cb, Object ctx) {
         executor.executeOrdered(ledgerId,
                 safeRun(() -> {
-                        cb.forceLedgerComplete(BKException.Code.IllegalOpException,
-                                               ledgerId, addr, ctx);
-                    }));
+                    cb.forceLedgerComplete(BKException.Code.IllegalOpException,
+                            ledgerId, addr, ctx);
+                }));
     }
 
     @Override
@@ -150,9 +151,9 @@
                          long lac, ByteBufList toSend, WriteLacCallback cb, Object ctx) {
         executor.executeOrdered(ledgerId,
                 safeRun(() -> {
-                        cb.writeLacComplete(BKException.Code.IllegalOpException,
-                                               ledgerId, addr, ctx);
-                    }));
+                    cb.writeLacComplete(BKException.Code.IllegalOpException,
+                            ledgerId, addr, ctx);
+                }));
     }
 
     @Override
@@ -161,23 +162,33 @@
                          int options, boolean allowFastFail, EnumSet<WriteFlag> writeFlags) {
         toSend.retain();
         preWriteHook.runHook(addr, ledgerId, entryId)
-            .thenComposeAsync(
-                (ignore) -> {
-                    LOG.info("[{};L{}] write entry {}", addr, ledgerId, entryId);
-                    if (errorBookies.contains(addr)) {
-                        LOG.warn("[{};L{}] erroring write {}", addr, ledgerId, entryId);
-                        return FutureUtils.exception(new BKException.BKWriteException());
-                    }
-                    LedgerData ledger = getBookieData(addr).computeIfAbsent(ledgerId, LedgerData::new);
-                    ledger.addEntry(entryId, copyData(toSend));
-                    toSend.release();
-                    return FutureUtils.value(null);
-                }, executor.chooseThread(ledgerId))
-            .thenCompose((res) -> postWriteHook.runHook(addr, ledgerId, entryId))
-            .whenCompleteAsync((res, ex) -> {
+                .thenComposeAsync(
+                        (ignore) -> {
+                            LOG.info("[{};L{}] write entry {}", addr, ledgerId, entryId);
+                            if (isErrored(addr)) {
+                                LOG.warn("[{};L{}] erroring write {}", addr, ledgerId, entryId);
+                                return FutureUtils.exception(new BKException.BKWriteException());
+                            }
+
+                            try {
+                                if ((options & FLAG_RECOVERY_ADD) == FLAG_RECOVERY_ADD) {
+                                    mockBookies.recoveryAddEntry(addr, ledgerId, entryId, copyData(toSend));
+                                } else {
+                                    mockBookies.addEntry(addr, ledgerId, entryId, copyData(toSend));
+                                }
+                            } catch (BKException bke) {
+                                return FutureUtils.exception(bke);
+                            } finally {
+                                toSend.release();
+                            }
+
+                            return FutureUtils.value(null);
+                        }, executor.chooseThread(ledgerId))
+                .thenCompose((res) -> postWriteHook.runHook(addr, ledgerId, entryId))
+                .whenCompleteAsync((res, ex) -> {
                     if (ex != null) {
                         cb.writeComplete(BKException.getExceptionCode(ex, BKException.Code.WriteException),
-                                         ledgerId, entryId, addr, ctx);
+                                ledgerId, entryId, addr, ctx);
                     } else {
                         cb.writeComplete(BKException.Code.OK, ledgerId, entryId, addr, ctx);
                     }
@@ -188,9 +199,9 @@
     public void readLac(BookieId addr, long ledgerId, ReadLacCallback cb, Object ctx) {
         executor.executeOrdered(ledgerId,
                 safeRun(() -> {
-                        cb.readLacComplete(BKException.Code.IllegalOpException,
-                                           ledgerId, null, null, ctx);
-                    }));
+                    cb.readLacComplete(BKException.Code.IllegalOpException,
+                            ledgerId, null, null, ctx);
+                }));
     }
 
     @Override
@@ -198,35 +209,28 @@
                           ReadEntryCallback cb, Object ctx, int flags, byte[] masterKey,
                           boolean allowFastFail) {
         preReadHook.runHook(addr, ledgerId, entryId)
-            .thenComposeAsync((res) -> {
+                .thenComposeAsync((res) -> {
                     LOG.info("[{};L{}] read entry {}", addr, ledgerId, entryId);
-                    if (errorBookies.contains(addr)) {
+                    if (isErrored(addr)) {
                         LOG.warn("[{};L{}] erroring read {}", addr, ledgerId, entryId);
                         return FutureUtils.exception(new BKException.BKReadException());
                     }
 
-                    LedgerData ledger = getBookieData(addr).get(ledgerId);
-                    if (ledger == null) {
-                        LOG.warn("[{};L{}] ledger not found", addr, ledgerId);
-                        return FutureUtils.exception(new BKException.BKNoSuchLedgerExistsException());
+                    try {
+                        ByteBuf entry = mockBookies.readEntry(addr, flags, ledgerId, entryId);
+                        return FutureUtils.value(entry);
+                    } catch (BKException bke) {
+                        return FutureUtils.exception(bke);
                     }
-
-                    ByteBuf entry = ledger.getEntry(entryId);
-                    if (entry == null) {
-                        LOG.warn("[{};L{}] entry({}) not found", addr, ledgerId, entryId);
-                        return FutureUtils.exception(new BKException.BKNoSuchEntryException());
-                    }
-
-                    return FutureUtils.value(entry);
                 }, executor.chooseThread(ledgerId))
-            .thenCompose((buf) -> postReadHook.runHook(addr, ledgerId, entryId).thenApply((res) -> buf))
-            .whenCompleteAsync((res, ex) -> {
+                .thenCompose((buf) -> postReadHook.runHook(addr, ledgerId, entryId).thenApply((res) -> buf))
+                .whenCompleteAsync((res, ex) -> {
                     if (ex != null) {
                         cb.readEntryComplete(BKException.getExceptionCode(ex, BKException.Code.ReadException),
-                                             ledgerId, entryId, null, ctx);
+                                ledgerId, entryId, null, ctx);
                     } else {
                         cb.readEntryComplete(BKException.Code.OK,
-                                             ledgerId, entryId, res.slice(), ctx);
+                                ledgerId, entryId, res.slice(), ctx);
                     }
                 }, executor.chooseThread(ledgerId));
     }
@@ -242,9 +246,9 @@
                                           Object ctx) {
         executor.executeOrdered(ledgerId,
                 safeRun(() -> {
-                        cb.readEntryComplete(BKException.Code.IllegalOpException,
-                                             ledgerId, entryId, null, ctx);
-                    }));
+                    cb.readEntryComplete(BKException.Code.IllegalOpException,
+                            ledgerId, entryId, null, ctx);
+                }));
     }
 
     @Override
@@ -252,14 +256,14 @@
                               GetBookieInfoCallback cb, Object ctx) {
         executor.executeOrdered(addr,
                 safeRun(() -> {
-                        cb.getBookieInfoComplete(BKException.Code.IllegalOpException,
-                                                 null, ctx);
-                    }));
+                    cb.getBookieInfoComplete(BKException.Code.IllegalOpException,
+                            null, ctx);
+                }));
     }
 
     @Override
     public CompletableFuture<AvailabilityOfEntriesOfLedger> getListOfEntriesOfLedger(BookieId address,
-            long ledgerId) {
+                                                                                     long ledgerId) {
         FutureGetListOfEntriesOfLedger futureResult = new FutureGetListOfEntriesOfLedger(ledgerId);
         executor.executeOrdered(address, safeRun(() -> {
             futureResult
@@ -277,10 +281,6 @@
     public void close() {
     }
 
-    private ConcurrentHashMap<Long, LedgerData> getBookieData(BookieId addr) {
-        return data.computeIfAbsent(addr, (key) -> new ConcurrentHashMap<>());
-    }
-
     private static ByteBuf copyData(ByteBufList list) {
         ByteBuf buf = Unpooled.buffer(list.readableBytes());
         for (int i = 0; i < list.size(); i++) {
@@ -288,29 +288,4 @@
         }
         return buf;
     }
-
-    private static class LedgerData {
-        final long ledgerId;
-        private TreeMap<Long, ByteBuf> entries = new TreeMap<>();
-        LedgerData(long ledgerId) {
-            this.ledgerId = ledgerId;
-        }
-
-        void addEntry(long entryId, ByteBuf entry) {
-            entries.put(entryId, entry);
-        }
-
-        ByteBuf getEntry(long entryId) {
-            if (entryId == BookieProtocol.LAST_ADD_CONFIRMED) {
-                Map.Entry<Long, ByteBuf> lastEntry = entries.lastEntry();
-                if (lastEntry != null) {
-                    return lastEntry.getValue();
-                } else {
-                    return null;
-                }
-            } else {
-                return entries.get(entryId);
-            }
-        }
-    }
-}
+}
\ No newline at end of file
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookies.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookies.java
new file mode 100644
index 0000000..c670e87
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookies.java
@@ -0,0 +1,186 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.proto;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiPredicate;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.DistributionSchedule;
+import org.apache.bookkeeper.client.RoundRobinDistributionSchedule;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.proto.checksum.DigestManager;
+import org.apache.bookkeeper.util.ByteBufList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Mocks an ensemble of bookies and can be shared between more than one MockBookieClient
+ * so that it can be used to check two writers accessing the same ledger.
+ *
+ * <p>Because several clients may operate on the same {@link MockLedgerData} from different threads,
+ * every read or write of a ledger's state below happens while holding that ledger's monitor. This also
+ * makes "check the fence flag, then add" a single atomic step.
+ */
+public class MockBookies {
+    static final Logger LOG = LoggerFactory.getLogger(MockBookies.class);
+    // bookie -> (ledger id -> per-ledger state on that bookie)
+    final ConcurrentHashMap<BookieId, ConcurrentHashMap<Long, MockLedgerData>> data = new ConcurrentHashMap<>();
+
+    /**
+     * Seeds the entries described by {@code metadata}, writing only the replicas that belong to
+     * {@code bookieId}.
+     */
+    public void seedLedgerForBookie(BookieId bookieId, long ledgerId,
+                                    LedgerMetadata metadata) throws Exception {
+        seedLedgerBase(ledgerId, metadata, (bookie, entry) -> bookie.equals(bookieId));
+    }
+
+    /**
+     * Seeds the entries described by {@code metadata} on every bookie of each entry's write set.
+     */
+    public void seedLedger(long ledgerId, LedgerMetadata metadata) throws Exception {
+        seedLedgerBase(ledgerId, metadata, (bookie, entry) -> true);
+    }
+
+    /**
+     * Writes generated entries of {@code ledgerId} to the bookies in each entry's write set.
+     *
+     * <p>A closed ledger is seeded up to {@code metadata.getLastEntryId()}; an open one up to the entry
+     * just below the key of its last ensemble. Each entry is generated with the previous entry id as its
+     * LAC ({@code -1} for entry 0).
+     * {@code shouldSeed} decides, per (bookie, entry id) pair, whether that replica is written.
+     */
+    public void seedLedgerBase(long ledgerId, LedgerMetadata metadata,
+                               BiPredicate<BookieId, Long> shouldSeed) throws Exception {
+        DistributionSchedule schedule = new RoundRobinDistributionSchedule(metadata.getWriteQuorumSize(),
+                metadata.getAckQuorumSize(),
+                metadata.getEnsembleSize());
+        long lastEntry = metadata.isClosed()
+                ? metadata.getLastEntryId() : metadata.getAllEnsembles().lastEntry().getKey() - 1;
+        long lac = -1;
+        for (long e = 0; e <= lastEntry; e++) {
+            List<BookieId> ensemble = metadata.getEnsembleAt(e);
+            DistributionSchedule.WriteSet ws = schedule.getWriteSet(e);
+            for (int i = 0; i < ws.size(); i++) {
+                BookieId bookieId = ensemble.get(ws.get(i));
+                if (shouldSeed.test(bookieId, e)) {
+                    seedEntries(bookieId, ledgerId, e, lac);
+                }
+            }
+            lac = e;
+        }
+    }
+
+    /**
+     * Stores one generated entry on {@code bookieId}, bypassing the fence check.
+     */
+    public void seedEntries(BookieId bookieId, long ledgerId, long entryId, long lac) throws Exception {
+        ByteBuf entry = generateEntry(ledgerId, entryId, lac);
+        MockLedgerData ledger = getBookieData(bookieId).computeIfAbsent(ledgerId, MockLedgerData::new);
+        synchronized (ledger) {
+            ledger.addEntry(entryId, entry);
+        }
+    }
+
+    /**
+     * Builds a CRC32C-digested entry with an empty payload for the given ledger, entry id and LAC.
+     */
+    public ByteBuf generateEntry(long ledgerId, long entryId, long lac) throws Exception {
+        DigestManager digestManager = DigestManager.instantiate(ledgerId, new byte[0],
+                DataFormats.LedgerMetadataFormat.DigestType.CRC32C,
+                UnpooledByteBufAllocator.DEFAULT, false);
+        return ByteBufList.coalesce(digestManager.computeDigestAndPackageForSending(
+                entryId, lac, 0, Unpooled.buffer(10)));
+    }
+
+    /**
+     * Handles a normal add, which is rejected once the ledger is fenced on {@code bookieId}.
+     *
+     * @throws BKException.BKLedgerFencedException if the ledger has been fenced on this bookie
+     */
+    public void addEntry(BookieId bookieId, long ledgerId, long entryId, ByteBuf entry) throws BKException {
+        MockLedgerData ledger = getBookieData(bookieId).computeIfAbsent(ledgerId, MockLedgerData::new);
+        // Check and add under one lock: otherwise a fencing read from another client could land between
+        // the check and the add, and the add would succeed on an already-fenced ledger.
+        synchronized (ledger) {
+            if (ledger.isFenced()) {
+                throw new BKException.BKLedgerFencedException();
+            }
+            ledger.addEntry(entryId, entry);
+        }
+    }
+
+    /**
+     * Handles a recovery add, which is accepted even if the ledger is fenced.
+     */
+    public void recoveryAddEntry(BookieId bookieId, long ledgerId, long entryId, ByteBuf entry) throws BKException {
+        MockLedgerData ledger = getBookieData(bookieId).computeIfAbsent(ledgerId, MockLedgerData::new);
+        synchronized (ledger) {
+            ledger.addEntry(entryId, entry);
+        }
+    }
+
+    /**
+     * Reads an entry, first fencing the ledger if {@link BookieProtocol#FLAG_DO_FENCING} is set in
+     * {@code flags}.
+     *
+     * @throws BKException.BKNoSuchLedgerExistsException if {@code bookieId} has no data for the ledger
+     * @throws BKException.BKNoSuchEntryException if the ledger is known but the entry is not
+     */
+    public ByteBuf readEntry(BookieId bookieId, int flags, long ledgerId, long entryId) throws BKException {
+        MockLedgerData ledger = getBookieData(bookieId).get(ledgerId);
+
+        if (ledger == null) {
+            // NOTE(review): no ledger state is created here, so a fencing read on a bookie that has never
+            // seen this ledger leaves it unfenced; confirm this matches what the tests expect.
+            LOG.warn("[{};L{}] ledger not found", bookieId, ledgerId);
+            throw new BKException.BKNoSuchLedgerExistsException();
+        }
+
+        ByteBuf entry;
+        // The entry map is shared with adds from other clients, so fence and look up under its monitor.
+        synchronized (ledger) {
+            if ((flags & BookieProtocol.FLAG_DO_FENCING) == BookieProtocol.FLAG_DO_FENCING) {
+                ledger.fence();
+            }
+            entry = ledger.getEntry(entryId);
+        }
+        if (entry == null) {
+            LOG.warn("[{};L{}] entry({}) not found", bookieId, ledgerId, entryId);
+            throw new BKException.BKNoSuchEntryException();
+        }
+
+        return entry;
+    }
+
+    /**
+     * Returns the ledger map of {@code bookieId}, creating an empty one on first use.
+     */
+    public ConcurrentHashMap<Long, MockLedgerData> getBookieData(BookieId bookieId) {
+        return data.computeIfAbsent(bookieId, (key) -> new ConcurrentHashMap<>());
+    }
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockLedgerData.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockLedgerData.java
new file mode 100644
index 0000000..05447f0
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockLedgerData.java
@@ -0,0 +1,81 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.proto;
+
+import io.netty.buffer.ByteBuf;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * In-memory state of one ledger on one mock bookie: its entries and its fence flag.
+ *
+ * <p>Instances live inside {@link MockBookies}, which can be shared by more than one
+ * MockBookieClient, so the same instance may be accessed from several threads. Every
+ * accessor is therefore synchronized on this object; callers needing a compound action
+ * (for example "add unless fenced") can hold the same monitor with {@code synchronized (ledger)}.
+ */
+public class MockLedgerData {
+    final long ledgerId;
+    // volatile because the field is package-private and may be read directly, outside the accessors.
+    volatile boolean isFenced;
+    private final TreeMap<Long, ByteBuf> entries = new TreeMap<>();
+
+    MockLedgerData(long ledgerId) {
+        this.ledgerId = ledgerId;
+    }
+
+    /**
+     * Returns whether this ledger has been fenced.
+     */
+    synchronized boolean isFenced() {
+        return isFenced;
+    }
+
+    /**
+     * Marks this ledger as fenced. Nothing in this class ever clears the flag.
+     */
+    synchronized void fence() {
+        isFenced = true;
+    }
+
+    /**
+     * Stores {@code entry} under {@code entryId}, replacing any previous entry with that id.
+     * The fence flag is not consulted here.
+     */
+    synchronized void addEntry(long entryId, ByteBuf entry) {
+        entries.put(entryId, entry);
+    }
+
+    /**
+     * Looks up a stored entry.
+     *
+     * @param entryId the entry id, or {@link BookieProtocol#LAST_ADD_CONFIRMED} to fetch the entry
+     *                with the highest id stored so far
+     * @return the stored buffer, or {@code null} if there is no matching entry
+     */
+    synchronized ByteBuf getEntry(long entryId) {
+        if (entryId == BookieProtocol.LAST_ADD_CONFIRMED) {
+            Map.Entry<Long, ByteBuf> lastEntry = entries.lastEntry();
+            return lastEntry != null ? lastEntry.getValue() : null;
+        }
+        return entries.get(entryId);
+    }
+}