BOOKKEEPER-938 ledger digestType autodetection on open

Currently digestType verification in LedgerOpenOp seems to be treated as part of security logic.
Since it is checked after password and error explicitly states that digestType mismatched,
all that evil hacker has to do is to change digest type to another one. There are only two of them after all.

here is the scenario significantly affected by current behavior:

1. user rolls out clients with digestType set to MAC and creates lots of ledgers.
2. user notices that MAC is slower than CRC32 and decides to change digestType.
3. more ledgers created with CRC32.
4. user tries to read old and new ledgers
-> now old ledgers cannot be read because of the digest type mismatch.

Author: Andrey Yegorov <ayegorov@salesforce.com>

Reviewers: Sijie Guo <sijie@apache.org>

Closes #52 from dlg99/fix/BOOKKEEPER-938-digest-autodetect
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
index cc97866..6cf2721 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
@@ -48,18 +48,20 @@
     final Object ctx;
     LedgerHandle lh;
     final byte[] passwd;
-    final DigestType digestType;
     boolean doRecovery = true;
     boolean administrativeOpen = false;
     long startTime;
     OpStatsLogger openOpLogger;
+    
+    final DigestType suggestedDigestType;
+    final boolean enableDigestAutodetection;
 
     /**
      * Constructor.
      *
      * @param bk
      * @param ledgerId
-     * @param digestType
+     * @param digestType. Ignored if conf.getEnableDigestTypeAutodetection() is true
      * @param passwd
      * @param cb
      * @param ctx
@@ -71,7 +73,8 @@
         this.passwd = passwd;
         this.cb = cb;
         this.ctx = ctx;
-        this.digestType = digestType;
+        this.enableDigestAutodetection = bk.conf.getEnableDigestTypeAutodetection();
+        this.suggestedDigestType = digestType;
     }
 
     public LedgerOpenOp(BookKeeper bk, long ledgerId, OpenCallback cb, Object ctx) {
@@ -81,8 +84,9 @@
         this.ctx = ctx;
 
         this.passwd = bk.getConf().getBookieRecoveryPasswd();
-        this.digestType = bk.getConf().getBookieRecoveryDigestType();
         this.administrativeOpen = true;
+        this.enableDigestAutodetection = false;
+        this.suggestedDigestType = bk.conf.getBookieRecoveryDigestType();
     }
 
     /**
@@ -119,8 +123,10 @@
         }
 
         final byte[] passwd;
-        final DigestType digestType;
-
+        DigestType digestType = enableDigestAutodetection 
+                                    ? metadata.getDigestType() 
+                                    : suggestedDigestType;
+										
         /* For an administrative open, the default passwords
          * are read from the configuration, but if the metadata
          * already contains passwords, use these instead. */
@@ -129,7 +135,6 @@
             digestType = metadata.getDigestType();
         } else {
             passwd = this.passwd;
-            digestType = this.digestType;
 
             if (metadata.hasPassword()) {
                 if (!Arrays.equals(passwd, metadata.getPassword())) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index b8d738b..57c3790 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -43,6 +43,8 @@
 
     // Digest Type
     protected final static String DIGEST_TYPE = "digestType";
+    protected final static String ENABLE_DIGEST_TYPE_AUTODETECTION = "enableDigestTypeAutodetection";
+
     // Passwd
     protected final static String PASSWD = "passwd";
 
@@ -132,6 +134,29 @@
     }
 
     /**
+     * Get autodetection of digest type.
+     * Ignores provided digestType, if enabled and uses one from ledger metadata instead.
+     * Incompatible with ledger created by bookie versions < 4.2
+     *
+     * @return flag to enable/disable autodetection of digest type.
+     */
+    public boolean getEnableDigestTypeAutodetection() {
+        return getBoolean(ENABLE_DIGEST_TYPE_AUTODETECTION, false);
+    }
+
+    /**
+     * Enable autodetection of digest type.
+     * Ignores provided digestType, if enabled and uses one from ledger metadata instead.
+     * Incompatible with ledger created by bookie versions < 4.2
+     *
+     * @return client configuration.
+     */
+    public ClientConfiguration setEnableDigestTypeAutodetection(boolean enable) {
+        this.setProperty(ENABLE_DIGEST_TYPE_AUTODETECTION, enable);
+        return this;
+    }
+    
+    /**
      * Get digest type used in bookkeeper admin
      *
      * @return digest type
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgersWithDifferentDigestsTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgersWithDifferentDigestsTest.java
new file mode 100644
index 0000000..3b28db0
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgersWithDifferentDigestsTest.java
@@ -0,0 +1,201 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.Random;
+
+import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.test.MultiLedgerManagerMultiDigestTestCase;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.*;
+
+/**
+ * Verify reads from ledgers with different digest types.
+ * This can happen as result of clients using different settings
+ * yet reading each other data or configuration change roll out.
+ */
+public class BookieWriteLedgersWithDifferentDigestsTest extends
+        MultiLedgerManagerMultiDigestTestCase implements AddCallback {
+
+    private final static Logger LOG = LoggerFactory
+            .getLogger(BookieWriteLedgersWithDifferentDigestsTest.class);
+
+    byte[] ledgerPassword = "aaa".getBytes();
+    LedgerHandle lh, lh2;
+    Enumeration<LedgerEntry> ls;
+
+    // test related variables
+    final int numEntriesToWrite = 20;
+    int maxInt = Integer.MAX_VALUE;
+    Random rng;
+    ArrayList<byte[]> entries1; // generated entries
+    ArrayList<byte[]> entries2; // generated entries
+
+    DigestType digestType;
+
+    private static class SyncObj {
+        volatile int counter;
+        volatile int rc;
+
+        public SyncObj() {
+            counter = 0;
+        }
+    }
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        rng = new Random(System.currentTimeMillis()); // Initialize the Random
+        // Number Generator
+        entries1 = new ArrayList<byte[]>(); // initialize the entries list
+        entries2 = new ArrayList<byte[]>(); // initialize the entries list
+    }
+
+    public BookieWriteLedgersWithDifferentDigestsTest(String ledgerManagerFactory,
+            DigestType digestType) {
+        super(3);
+        this.digestType = digestType;
+        // set ledger manager
+        baseConf.setLedgerManagerFactoryClassName(ledgerManagerFactory);
+        baseClientConf.setLedgerManagerFactoryClassName(ledgerManagerFactory);
+    }
+
+    @Test(timeout = 60000)
+    public void testLedgersWithDifferentDigestTypesNoAutodetection() throws Exception {
+    	bkc.conf.setEnableDigestTypeAutodetection(false);
+        // Create ledgers
+        lh = bkc.createLedgerAdv(3, 2, 2, DigestType.MAC, ledgerPassword);
+        
+        final long id = lh.ledgerId;
+
+        LOG.info("Ledger ID-1: " + lh.getId());
+        SyncObj syncObj1 = new SyncObj();
+        for (int i = numEntriesToWrite - 1; i >= 0; i--) {
+            ByteBuffer entry = ByteBuffer.allocate(4);
+            entry.putInt(rng.nextInt(maxInt));
+            entry.position(0);
+            entries1.add(0, entry.array());
+            lh.asyncAddEntry(i, entry.array(), 0, entry.capacity(), this, syncObj1);
+        }
+
+        // Wait for all entries to be acknowledged
+        waitForEntriesAddition(syncObj1, numEntriesToWrite);
+
+        // Reads here work ok because ledger uses digest type set during create
+        readEntries(lh, entries1);
+        lh.close();
+        
+        try {
+	        bkc.openLedgerNoRecovery(id, DigestType.CRC32, ledgerPassword).close();
+	        fail("digest mismatch error is expected");
+        } catch (BKException bke) {
+        	// expected
+        }
+    }
+
+    @Test(timeout = 60000)
+    public void testLedgersWithDifferentDigestTypesWithAutodetection() throws Exception {
+    	bkc.conf.setEnableDigestTypeAutodetection(true);
+        // Create ledgers
+        lh = bkc.createLedgerAdv(3, 2, 2, DigestType.MAC, ledgerPassword);
+        lh2 = bkc.createLedgerAdv(3, 2, 2, DigestType.CRC32, ledgerPassword);
+        
+        final long id = lh.ledgerId;
+        final long id2 = lh2.ledgerId;
+
+        LOG.info("Ledger ID-1: " + lh.getId());
+        LOG.info("Ledger ID-2: " + lh2.getId());
+        SyncObj syncObj1 = new SyncObj();
+        SyncObj syncObj2 = new SyncObj();
+        for (int i = numEntriesToWrite - 1; i >= 0; i--) {
+            ByteBuffer entry = ByteBuffer.allocate(4);
+            entry.putInt(rng.nextInt(maxInt));
+            entry.position(0);
+            entries1.add(0, entry.array());
+            entries2.add(0, entry.array());
+            lh.asyncAddEntry(i, entry.array(), 0, entry.capacity(), this, syncObj1);
+            lh2.asyncAddEntry(i, entry.array(), 0, entry.capacity(), this, syncObj2);
+        }
+
+        // Wait for all entries to be acknowledged
+        waitForEntriesAddition(syncObj1, numEntriesToWrite);
+        waitForEntriesAddition(syncObj2, numEntriesToWrite);
+
+        // Reads here work ok because ledger uses digest type set during create
+        readEntries(lh, entries1);
+        readEntries(lh2, entries2);
+        lh.close();
+        lh2.close();
+        
+        // open here would fail if provided digest type is used
+        // it passes because ledger just uses digest type from its metadata/autodetects it
+        lh = bkc.openLedgerNoRecovery(id, DigestType.CRC32, ledgerPassword);
+        lh2 = bkc.openLedgerNoRecovery(id2, DigestType.MAC, ledgerPassword);
+        readEntries(lh, entries1);
+        readEntries(lh2, entries2);
+        lh.close();
+        lh2.close();
+    }
+    
+	private void waitForEntriesAddition(SyncObj syncObj, int numEntriesToWrite) throws InterruptedException {
+		synchronized (syncObj) {
+            while (syncObj.counter < numEntriesToWrite) {
+                syncObj.wait();
+            }
+            assertEquals(BKException.Code.OK, syncObj.rc);
+        }
+	}
+
+    private void readEntries(LedgerHandle lh, ArrayList<byte[]> entries) throws InterruptedException, BKException {
+        ls = lh.readEntries(0, numEntriesToWrite - 1);
+        int index = 0;
+        while (ls.hasMoreElements()) {
+            ByteBuffer origbb = ByteBuffer.wrap(entries.get(index++));
+            Integer origEntry = origbb.getInt();
+            ByteBuffer result = ByteBuffer.wrap(ls.nextElement().getEntry());
+            LOG.debug("Length of result: " + result.capacity());
+            LOG.debug("Original entry: " + origEntry);
+            Integer retrEntry = result.getInt();
+            LOG.debug("Retrieved entry: " + retrEntry);
+            assertTrue("Checking entry " + index + " for equality", origEntry
+                    .equals(retrEntry));
+        }
+    }
+
+    @Override
+    public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
+        SyncObj x = (SyncObj) ctx;
+        synchronized (x) {
+            x.rc = rc;
+            x.counter++;
+            x.notify();
+        }
+    }
+}