ISSUE #2615: Fix for invalid ensemble issue during ledger recovery

Ensures that only entries of the current ensemble are included in the ledger recovery process. This avoids a recovery failure in which recovery tries to append an ensemble whose first entry id is lower than that of the prior ensemble.

Descriptions of the changes in this PR:

This PR makes a small change to LedgerRecoveryOp that prevents ledger recovery from trying to create an invalid ensemble and failing as a result. Such a failure could leave the ledger's data unavailable for as long as the trigger conditions last.

During ledger recovery, only entries of the current ensemble can be included in the read and write-back phase. Prior ensembles, if any, are immutable. However, in a multi-ensemble ledger it is possible for the current ensemble to return an LAC of -1. Recovery then reads entries from prior ensembles and writes them back to the current ensemble. This does not cause any data loss, but it wastes both space and time. The main issue is that if an ensemble change occurs while writing back entries, recovery will try to create a new ensemble with a first entry id of 0. This causes an IllegalStateException, because a check before the CAS metadata op ensures that a new ensemble does not have a first entry id lower than that of an existing ensemble.
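
The ordering rule that recovery trips over can be pictured with a small standalone sketch. It is not BookKeeper's code: the class `EnsembleOrderSketch`, the method `putEnsemble`, and the use of plain strings for bookie ids are all hypothetical and only illustrate why an ensemble starting at entry 0 is rejected once an ensemble starting at entry 1 exists.

```java
import java.util.List;
import java.util.TreeMap;

// Hypothetical model of ledger metadata: first entry id -> bookies of that ensemble.
final class EnsembleOrderSketch {
    private final TreeMap<Long, List<String>> ensembles = new TreeMap<>();

    // Adds an ensemble, or replaces the current one when the first entry id matches.
    // Rejects any ensemble that would start before the current (last) ensemble.
    void putEnsemble(long firstEntryId, List<String> bookies) {
        if (!ensembles.isEmpty() && firstEntryId < ensembles.lastKey()) {
            throw new IllegalStateException("ensemble starting at " + firstEntryId
                    + " is lower than current ensemble starting at " + ensembles.lastKey());
        }
        ensembles.put(firstEntryId, bookies);
    }

    int size() {
        return ensembles.size();
    }

    public static void main(String[] args) {
        EnsembleOrderSketch md = new EnsembleOrderSketch();
        md.putEnsemble(0L, List.of("b1", "b2"));
        md.putEnsemble(1L, List.of("b1", "b3"));
        try {
            // What recovery effectively attempted when it started from LAC -1.
            md.putEnsemble(0L, List.of("b1", "b4"));
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

In this model, an ensemble change at the current first entry id replaces the last ensemble, which is why a single-ensemble ledger does not hit the problem.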

If a bookie of the current ensemble is down, the ledger remains unrecoverable until that bookie becomes available again.

The solution is to treat (first entry id of the current ensemble - 1) as the lowest safe LAC for recovery, and to use the higher of that value and the LAC returned by the current ensemble.
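
A minimal sketch of that rule, assuming the ensembles are held in a map keyed by first entry id. The class `RecoveryLacSketch` and the method `lowestSafeLac` are illustrative names, not part of BookKeeper.

```java
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

final class RecoveryLacSketch {
    // Returns the LAC that recovery should start from: never lower than
    // (first entry id of the current ensemble - 1).
    static long lowestSafeLac(long lacFromBookies, NavigableMap<Long, List<String>> ensembles) {
        long firstEntryIdOfCurrentEnsemble = ensembles.lastKey();
        return Math.max(lacFromBookies, firstEntryIdOfCurrentEnsemble - 1);
    }

    public static void main(String[] args) {
        NavigableMap<Long, List<String>> ensembles = new TreeMap<>();
        ensembles.put(0L, List.of("b1", "b2"));
        ensembles.put(1L, List.of("b1", "b3"));

        // The current ensemble reports LAC -1, so recovery starts from 0
        // and never re-reads entry 0 from the prior ensemble.
        System.out.println(lowestSafeLac(-1L, ensembles));
        // A higher LAC reported by the bookies is kept as-is.
        System.out.println(lowestSafeLac(5L, ensembles));
    }
}
```

For a ledger with a single ensemble starting at entry 0 the floor is -1, so the behavior for that case is unchanged.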

### Changes

Change to LedgerRecoveryOp as described above.
New unit test in LedgerRecovery2Test.

Master Issue: #2615


Reviewers: Andrey Yegorov, Enrico Olivelli <eolivelli@gmail.com>, Flavio Junqueira

This closes #2654 from Vanlightly/fix-invalid-ensemble-change, closes #2615
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
index 534ea83..08a9f2e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
@@ -104,7 +104,24 @@
                     public void readLastConfirmedDataComplete(int rc, RecoveryData data) {
                         if (rc == BKException.Code.OK) {
                             synchronized (lh) {
-                                lh.lastAddPushed = lh.lastAddConfirmed = data.getLastAddConfirmed();
+                                /*
+                                 * The lowest an LAC can be for use in recovery is the first entry id
+                                 * of the current ensemble - 1.
+                                 * All ensembles prior to the current one, if any, are confirmed and
+                                 * immutable (so are not part of the recovery process).
+                                 * So we take the highest of:
+                                 * - the LAC returned by the current bookie ensemble (could be -1)
+                                 * - the first entry id of the current ensemble - 1.
+                                 */
+                                long firstEntryIdOfCurrentEnsemble = lh.getVersionedLedgerMetadata()
+                                        .getValue()
+                                        .getAllEnsembles()
+                                        .lastEntry()
+                                        .getKey();
+
+                                lh.lastAddPushed = lh.lastAddConfirmed = Math.max(data.getLastAddConfirmed(),
+                                        (firstEntryIdOfCurrentEnsemble - 1));
+
                                 lh.length = data.getLength();
                                 lh.pendingAddsSequenceHead = lh.lastAddConfirmed;
                                 startEntryToRead = endEntryToRead = lh.lastAddConfirmed;
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecovery2Test.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecovery2Test.java
index 38e4879..95d93ef 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecovery2Test.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecovery2Test.java
@@ -59,14 +59,31 @@
     private static final BookieId b5 = new BookieSocketAddress("b5", 3181).toBookieId();
 
     private static Versioned<LedgerMetadata> setupLedger(ClientContext clientCtx, long ledgerId,
-                                              List<BookieId> bookies) throws Exception {
+                                                         List<BookieId> bookies) throws Exception {
         LedgerMetadata md = LedgerMetadataBuilder.create()
-            .withId(ledgerId)
-            .withPassword(PASSWD).withDigestType(DigestType.CRC32C)
-            .newEnsembleEntry(0, bookies).build();
-        return clientCtx.getLedgerManager().createLedgerMetadata(1L, md).get();
+                .withId(ledgerId)
+                .withPassword(PASSWD).withDigestType(DigestType.CRC32C)
+                .withWriteQuorumSize(bookies.size())
+                .newEnsembleEntry(0, bookies).build();
+        return clientCtx.getLedgerManager().createLedgerMetadata(ledgerId, md).get();
     }
 
+    private static Versioned<LedgerMetadata> setupLedger(ClientContext clientCtx, long ledgerId,
+                                                         List<BookieId> bookies,
+                                                         int ensembleSize,
+                                                         int writeQuorumSize,
+                                                         int ackQuorumSize) throws Exception {
+        LedgerMetadata md = LedgerMetadataBuilder.create()
+                .withId(ledgerId)
+                .withPassword(PASSWD).withDigestType(DigestType.CRC32C)
+                .withEnsembleSize(ensembleSize)
+                .withWriteQuorumSize(writeQuorumSize)
+                .withAckQuorumSize(ackQuorumSize)
+                .newEnsembleEntry(0, bookies).build();
+        return clientCtx.getLedgerManager().createLedgerMetadata(ledgerId, md).get();
+    }
+
+
     @Test
     public void testCantRecoverAllDown() throws Exception {
         MockClientContext clientCtx = MockClientContext.create();
@@ -491,4 +508,77 @@
             reachedStepFuture.get();
         } catch (Exception e) {}
     }
-}
+
+
+    /*
+     * This test verifies that an IllegalStateException does not occur during recovery because of an attempt
+     * to create a new ensemble that has a lower first entry id than an existing ensemble.
+     *
+     * To reproduce the original issue, revert the fix and run this test.
+     * The fix was to apply max(LAC from current ensemble, (first entry of current ensemble - 1)) as the LAC
+     * of the recovery phase rather than accept a value of -1 that might be returned by the LAC reads.
+     */
+    @Test
+    public void testRecoveryWhenSecondEnsembleReturnsLacMinusOne() throws Exception {
+        MockClientContext clientCtx = MockClientContext.create();
+        clientCtx.getMockRegistrationClient().addBookies(b4).get();
+
+        // at least two non-empty ensembles are required, as otherwise the first ensemble
+        // would simply be replaced, thus avoiding the issue.
+
+        // initial state: 2 ensembles due to a write failure of e1 to b2
+        // ensemble 1
+        Versioned<LedgerMetadata> md = setupLedger(clientCtx, 1, Lists.newArrayList(b1, b2), 2, 2, 2);
+        clientCtx.getMockBookieClient().getMockBookies().seedEntries(b1, 1L, 0L, -1L);
+        clientCtx.getMockBookieClient().getMockBookies().seedEntries(b2, 1L, 0L, -1L);
+        clientCtx.getMockBookieClient().getMockBookies().seedEntries(b1, 1L, 1L, -1L);
+        // write to b2 failed, causing ensemble change
+
+        // ensemble 2 - the write of e1 to b2 failed, so new ensemble with b3 created
+        ClientUtil.transformMetadata(clientCtx, 1L,
+                (metadata) -> LedgerMetadataBuilder.from(metadata).newEnsembleEntry(1L, Lists.newArrayList(b1, b3))
+                        .build());
+        clientCtx.getMockBookieClient().getMockBookies().seedEntries(b3, 1L, 1L, 0L);
+
+        ReadOnlyLedgerHandle lh = new ReadOnlyLedgerHandle(
+                clientCtx, 1L, md, BookKeeper.DigestType.CRC32C, PASSWD, false);
+
+        // however, any read or write to b3 fails, which will:
+        // 1. cause the LAC read to return -1 (b1 has -1)
+        // 2. cause an ensemble change during recovery write back phase
+        clientCtx.getMockBookieClient().setPreWriteHook(
+                (bookie, ledgerId, entryId) -> {
+                    if (bookie.equals(b3)) {
+                        return FutureUtils.exception(new BKException.BKWriteException());
+                    } else {
+                        return FutureUtils.value(null);
+                    }
+                });
+
+        clientCtx.getMockBookieClient().setPreReadHook(
+                (bookie, ledgerId, entryId) -> {
+                    if (bookie.equals(b3)) {
+                        return FutureUtils.exception(new BKException.BKTimeoutException());
+                    } else {
+                        return FutureUtils.value(null);
+                    }
+                });
+
+        // writer 2 starts recovery (the subject of this test)
+        // (either the writer failed or simply has not yet sent the pending writes to the new ensemble)
+        GenericCallbackFuture<Void> recoveryPromise = new GenericCallbackFuture<>();
+        lh.recover(recoveryPromise, null, false);
+        recoveryPromise.get();
+
+        // Recovery completes successfully, with the expected ensembles.
+        Assert.assertEquals(2, lh.getLedgerMetadata().getAllEnsembles().size());
+        Assert.assertTrue(lh.getLedgerMetadata().getAllEnsembles().get(0L).contains(b1));
+        Assert.assertTrue(lh.getLedgerMetadata().getAllEnsembles().get(0L).contains(b2));
+        Assert.assertTrue(lh.getLedgerMetadata().getAllEnsembles().get(1L).contains(b1));
+        Assert.assertTrue(lh.getLedgerMetadata().getAllEnsembles().get(1L).contains(b4));
+
+        // the ledger is closed with entry id 1
+        Assert.assertEquals(1L, lh.getLastAddConfirmed());
+        Assert.assertEquals(1L, lh.getLedgerMetadata().getLastEntryId());
+    }
+}
\ No newline at end of file