Add ForceAuditChecksCmd to reset auditor checks last run time (#2472)

Motivation
Reset the last run time of the auditor checks; rebooting the auditor leader afterwards triggers the checks immediately. This is mostly used for testing infrastructure and for getting an on-demand sanity check of a cluster.

Changes
Added a new shell command and tests
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index d17d74d..d989329 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -18,6 +18,7 @@
 
 package org.apache.bookkeeper.bookie;
 
+import static org.apache.bookkeeper.meta.MetadataDrivers.runFunctionWithLedgerManagerFactory;
 import com.google.common.annotations.VisibleForTesting;
 import java.io.File;
 import java.io.IOException;
@@ -27,6 +28,7 @@
 import java.nio.file.Paths;
 import java.nio.file.attribute.BasicFileAttributes;
 import java.nio.file.attribute.FileTime;
+import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -40,7 +42,9 @@
 import org.apache.bookkeeper.client.api.LedgerMetadata;
 import org.apache.bookkeeper.common.annotation.InterfaceAudience.Private;
 import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
 import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.replication.ReplicationException;
 import org.apache.bookkeeper.tools.cli.commands.autorecovery.ListUnderReplicatedCommand;
 import org.apache.bookkeeper.tools.cli.commands.autorecovery.LostBookieRecoveryDelayCommand;
 import org.apache.bookkeeper.tools.cli.commands.autorecovery.ToggleCommand;
@@ -99,6 +103,7 @@
 import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.StringUtils;
+import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -145,6 +150,7 @@
     static final String CMD_ENDPOINTINFO = "endpointinfo";
     static final String CMD_LOSTBOOKIERECOVERYDELAY = "lostbookierecoverydelay";
     static final String CMD_TRIGGERAUDIT = "triggeraudit";
+    static final String CMD_FORCEAUDITCHECKS = "forceauditchecks";
     static final String CMD_CONVERT_TO_DB_STORAGE = "convert-to-db-storage";
     static final String CMD_CONVERT_TO_INTERLEAVED_STORAGE = "convert-to-interleaved-storage";
     static final String CMD_REBUILD_DB_LEDGER_LOCATIONS_INDEX = "rebuild-db-ledger-locations-index";
@@ -1799,6 +1805,76 @@
         }
     }
 
+    /** Shell command that back-dates the stored last-run time of the selected auditor checks. */
+    class ForceAuditorChecksCmd extends MyCommand {
+        Options opts = new Options();
+        ForceAuditorChecksCmd() {
+            super(CMD_FORCEAUDITCHECKS);
+            opts.addOption("calc", "checkallledgerscheck", false, "Force checkAllLedgers audit "
+                    + "upon next Auditor startup ");
+            opts.addOption("ppc", "placementpolicycheck", false, "Force placementPolicyCheck audit "
+                    + "upon next Auditor startup ");
+            opts.addOption("rc", "replicascheck", false, "Force replicasCheck audit "
+                    + "upon next Auditor startup ");
+        }
+
+        @Override
+        Options getOptions() {
+            return opts;
+        }
+
+        @Override
+        String getDescription() {
+            return "Reset the last run time of auditor checks "
+                    + "(checkallledgerscheck, placementpolicycheck, replicascheck) "
+                    + "The current auditor must be REBOOTED after this command is run.";
+        }
+
+        @Override
+        String getUsage() {
+            return "forceauditchecks [-checkallledgerscheck] [-placementpolicycheck] [-replicascheck]";
+        }
+
+        /** Returns 0 on success, -1 when no check is selected or the metadata update fails. */
+        @Override
+        int runCmd(CommandLine cmdLine) throws Exception {
+            boolean checkAllLedgersCheck = cmdLine.hasOption("calc");
+            boolean placementPolicyCheck = cmdLine.hasOption("ppc");
+            boolean replicasCheck = cmdLine.hasOption("rc");
+            if (!checkAllLedgersCheck && !placementPolicyCheck && !replicasCheck) {
+                LOG.error("Command line args must contain atleast one type of check. This was a no-op.");
+                return -1;
+            }
+            // Keep the lambda's exit code: it used to be discarded, so failures were reported as success.
+            Integer rc = runFunctionWithLedgerManagerFactory(bkConf, mFactory -> {
+                try (LedgerUnderreplicationManager urm = mFactory.newLedgerUnderreplicationManager()) {
+                    // Arbitrary value of 21 days chosen since current freq of all checks is less than 21 days
+                    long time = System.currentTimeMillis() - (21L * 24 * 60 * 60 * 1000); // long arithmetic
+                    if (checkAllLedgersCheck) {
+                        LOG.info("Resetting CheckAllLedgersCTime to : " + new Timestamp(time));
+                        urm.setCheckAllLedgersCTime(time);
+                    }
+                    if (placementPolicyCheck) {
+                        LOG.info("Resetting PlacementPolicyCheckCTime to : " + new Timestamp(time));
+                        urm.setPlacementPolicyCheckCTime(time);
+                    }
+                    if (replicasCheck) {
+                        LOG.info("Resetting ReplicasCheckCTime to : " + new Timestamp(time));
+                        urm.setReplicasCheckCTime(time);
+                    }
+                    return 0;
+                } catch (InterruptedException | KeeperException | ReplicationException e) {
+                    if (e instanceof InterruptedException) {
+                        Thread.currentThread().interrupt(); // keep the interrupt visible to callers
+                    }
+                    LOG.error("Exception while trying to reset last run time ", e);
+                    return -1;
+                }
+            });
+            return rc == null ? 0 : rc; // the lambda never yields null; treat null as success defensively
+        }
+    }
+
     /**
      * Command to trigger AuditTask by resetting lostBookieRecoveryDelay and
      * then make sure the ledgers stored in the bookie are properly replicated
@@ -2093,6 +2169,7 @@
         commands.put(CMD_HELP, new HelpCmd());
         commands.put(CMD_LOSTBOOKIERECOVERYDELAY, new LostBookieRecoveryDelayCmd());
         commands.put(CMD_TRIGGERAUDIT, new TriggerAuditCmd());
+        commands.put(CMD_FORCEAUDITCHECKS, new ForceAuditorChecksCmd());
         // cookie related commands
         commands.put(CMD_CREATE_COOKIE,
             new CreateCookieCommand().asShellCommand(CMD_CREATE_COOKIE, bkConf));
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieShellTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieShellTest.java
index 6fb759b..2c9eb81 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieShellTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieShellTest.java
@@ -426,4 +426,25 @@
         verify(mockListBookiesFlags, times(1)).all(true);
     }
 
+    @Test
+    public void testForceAuditChecksWithNoArgs() throws Exception {
+        // With no check option the command is expected to exit with -1.
+        final String[] argv = { "forceauditchecks" };
+        assertEquals(-1, shell.run(argv));
+    }
+
+    @Test
+    public void testForceAuditChecksWithSomeArgs() throws Exception {
+        // A single check option (-calc) is expected to yield exit code 0.
+        final String[] argv = { "forceauditchecks", "-calc" };
+        assertEquals(0, shell.run(argv));
+    }
+
+    @Test
+    public void testForceAuditChecksWithAllArgs() throws Exception {
+        // All three check options together are expected to yield exit code 0.
+        final String[] argv = { "forceauditchecks", "-calc", "-rc", "-ppc" };
+        assertEquals(0, shell.run(argv));
+    }
+
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/ForceAuditorChecksCmdTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/ForceAuditorChecksCmdTest.java
new file mode 100644
index 0000000..4c0479a
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/ForceAuditorChecksCmdTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.bookie;
+
+import static org.apache.bookkeeper.meta.MetadataDrivers.runFunctionWithLedgerManagerFactory;
+
+import com.google.common.util.concurrent.UncheckedExecutionException;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
+import org.apache.bookkeeper.replication.ReplicationException;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.zookeeper.KeeperException;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+/**
+ * Integration test of {@link BookieShell.ForceAuditorChecksCmd}.
+ */
+public class ForceAuditorChecksCmdTest extends BookKeeperClusterTestCase {
+
+    public ForceAuditorChecksCmdTest() {
+        super(1);
+        baseConf.setAuditorPeriodicPlacementPolicyCheckInterval(10000);
+        baseConf.setAuditorPeriodicReplicasCheckInterval(10000);
+    }
+
+    /**
+     * Verify that the auditor checks last execution time (stored in zookeeper) is reset to an older value
+     * when forceauditchecks command is run with certain parameters. Rebooting the auditor after this would
+     * result in immediate run of audit checks.
+     */
+    @Test
+    public void verifyAuditCTimeReset() throws Exception {
+        String[] argv = new String[] { "forceauditchecks", "-calc", "-ppc", "-rc" };
+        long curTime = System.currentTimeMillis();
+        // Newest timestamp that still counts as "reset": 20 days before this test started. The command
+        // is expected to store an even older value, so each CTime read back below must not exceed it.
+        // The long literal keeps the day-to-millisecond multiplication in long arithmetic.
+        long newestAllowed = curTime - (20L * 24 * 60 * 60 * 1000);
+
+        final ServerConfiguration conf = bsConfs.get(0);
+        BookieShell bkShell = new BookieShell();
+        bkShell.setConf(conf);
+
+        // Add dummy last execution time for audit checks
+        runFunctionWithLedgerManagerFactory(conf, mFactory -> {
+            try (LedgerUnderreplicationManager urM =
+                         mFactory.newLedgerUnderreplicationManager()) {
+                urM.setCheckAllLedgersCTime(curTime);
+                urM.setPlacementPolicyCheckCTime(curTime);
+                urM.setReplicasCheckCTime(curTime);
+            } catch (InterruptedException | KeeperException | ReplicationException e) {
+                // Rethrow unchecked so the failure is not swallowed inside the lambda.
+                throw new UncheckedExecutionException(e);
+            }
+            return null;
+        });
+
+        // Run the actual shell command
+        Assert.assertEquals("Failed to return exit code!", 0, bkShell.run(argv));
+
+        // Verify that the time has been reset to an older value (at least 20 days)
+        runFunctionWithLedgerManagerFactory(conf, mFactory -> {
+            try (LedgerUnderreplicationManager urm =
+                         mFactory.newLedgerUnderreplicationManager()) {
+                Assert.assertTrue("The checkAllLedgersCTime should have been reset to atleast 20 days old",
+                        urm.getCheckAllLedgersCTime() <= newestAllowed);
+                Assert.assertTrue("The placementPolicyCheckCTime should have been reset to atleast 20 days old",
+                        urm.getPlacementPolicyCheckCTime() <= newestAllowed);
+                Assert.assertTrue("The replicasCheckCTime should have been reset to atleast 20 days old",
+                        urm.getReplicasCheckCTime() <= newestAllowed);
+            } catch (InterruptedException | KeeperException | ReplicationException e) {
+                // Rethrow unchecked so the failure is not swallowed inside the lambda.
+                throw new UncheckedExecutionException(e);
+            }
+            return null;
+        });
+    }
+}