[BOOKIE-SHELL] Add cmd to change bookie-address into ledger metadata
### Motivation
In a large bookie cluster environment, we frequently require a utility to change bookie-ip with a different bookie-ip in ledger's metadata in different scenarios such as:
1. Host Re-IP: requires updating ledger metadata and reuse the host without losing/copying data
2. Backup-restore usecase while doing cluster level data migration.
Therefore, we frequently need a tool to update bookie-ip with a new bookie-ip in existing ledgers' ensemble metadata.
### Modification
Add CLI command to update bookie-id in ledger metadata.
Reviewers: Enrico Olivelli <eolivelli@gmail.com>, Sijie Guo <None>, Jia Zhai <zhaijia@apache.org>
This closes #2321 from rdhabalia/reip
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index 0ba984c..73edab9 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -63,6 +63,7 @@
import org.apache.bookkeeper.tools.cli.commands.bookie.RebuildDBLedgerLocationsIndexCommand;
import org.apache.bookkeeper.tools.cli.commands.bookie.RegenerateInterleavedStorageIndexFileCommand;
import org.apache.bookkeeper.tools.cli.commands.bookie.SanityTestCommand;
+import org.apache.bookkeeper.tools.cli.commands.bookie.UpdateBookieInLedgerCommand;
import org.apache.bookkeeper.tools.cli.commands.bookies.DecommissionCommand;
import org.apache.bookkeeper.tools.cli.commands.bookies.EndpointInfoCommand;
import org.apache.bookkeeper.tools.cli.commands.bookies.InfoCommand;
@@ -135,6 +136,7 @@
static final String CMD_LISTFILESONDISC = "listfilesondisc";
static final String CMD_UPDATECOOKIE = "updatecookie";
static final String CMD_UPDATELEDGER = "updateledgers";
+ static final String CMD_UPDATE_BOOKIE_IN_LEDGER = "updateBookieInLedger";
static final String CMD_DELETELEDGER = "deleteledger";
static final String CMD_BOOKIEINFO = "bookieinfo";
static final String CMD_DECOMMISSIONBOOKIE = "decommissionbookie";
@@ -1562,6 +1564,85 @@
}
/**
+ * Update bookie into ledger command.
+ */
+ class UpdateBookieInLedgerCmd extends MyCommand {
+ private final Options opts = new Options();
+
+ UpdateBookieInLedgerCmd() {
+ super(CMD_UPDATE_BOOKIE_IN_LEDGER);
+ opts.addOption("sb", "srcBookie", true, "Source bookie which needs to be replaced by destination bookie.");
+ opts.addOption("db", "destBookie", true, "Destination bookie which replaces source bookie.");
+ opts.addOption("s", "updatespersec", true, "Number of ledgers updating per second (default: 5 per sec)");
+ opts.addOption("r", "maxOutstandingReads", true, "Max outstanding reads (default: 5 * updatespersec)");
+ opts.addOption("l", "limit", true, "Maximum number of ledgers to update (default: no limit)");
+ opts.addOption("v", "verbose", true, "Print status of the ledger updation (default: false)");
+ opts.addOption("p", "printprogress", true,
+ "Print messages on every configured seconds if verbose turned on (default: 10 secs)");
+ }
+
+ @Override
+ Options getOptions() {
+ return opts;
+ }
+
+ @Override
+ String getDescription() {
+ return "Replace bookie in ledger metadata. (useful when re-ip of host) "
+ + "replace srcBookie with destBookie. (this may take a long time).";
+ }
+
+ @Override
+ String getUsage() {
+ return "updateBookieInLedger -srcBookie <source bookie> -destBookie <destination bookie> "
+ + "[-updatespersec N] [-maxOutstandingReads N] [-limit N] [-verbose true/false] [-printprogress N]";
+ }
+
+ @Override
+ int runCmd(CommandLine cmdLine) throws Exception {
+ UpdateBookieInLedgerCommand cmd = new UpdateBookieInLedgerCommand();
+ UpdateBookieInLedgerCommand.UpdateBookieInLedgerFlags flags =
+ new UpdateBookieInLedgerCommand.UpdateBookieInLedgerFlags();
+
+ final String srcBookie = cmdLine.getOptionValue("srcBookie");
+ final String destBookie = cmdLine.getOptionValue("destBookie");
+ if (StringUtils.isBlank(srcBookie) || StringUtils.isBlank(destBookie)) {
+ LOG.error("Invalid argument list (srcBookie and destBookie must be provided)!");
+ this.printUsage();
+ return -1;
+ }
+ if (StringUtils.equals(srcBookie, destBookie)) {
+ LOG.error("srcBookie and destBookie can't be the same.");
+ return -1;
+ }
+ final int rate = getOptionIntValue(cmdLine, "updatespersec", 5);
+ final int maxOutstandingReads = getOptionIntValue(cmdLine, "maxOutstandingReads", (rate * 5));
+ final int limit = getOptionIntValue(cmdLine, "limit", Integer.MIN_VALUE);
+ final boolean verbose = getOptionBooleanValue(cmdLine, "verbose", false);
+ final long printprogress;
+ if (!verbose) {
+ if (cmdLine.hasOption("printprogress")) {
+ LOG.warn("Ignoring option 'printprogress', this is applicable when 'verbose' is true");
+ }
+ printprogress = Integer.MIN_VALUE;
+ } else {
+ // defaulting to 10 seconds
+ printprogress = getOptionLongValue(cmdLine, "printprogress", 10);
+ }
+ flags.srcBookie(srcBookie);
+ flags.destBookie(destBookie);
+ flags.printProgress(printprogress);
+ flags.limit(limit);
+ flags.updatePerSec(rate);
+ flags.maxOutstandingReads(maxOutstandingReads);
+ flags.verbose(verbose);
+
+ boolean result = cmd.apply(bkConf, flags);
+ return (result) ? 0 : -1;
+ }
+ }
+
+ /**
* Command to delete a given ledger.
*/
class DeleteLedgerCmd extends MyCommand {
@@ -1950,6 +2031,7 @@
commands.put(CMD_LISTFILESONDISC, new ListDiskFilesCmd());
commands.put(CMD_UPDATECOOKIE, new UpdateCookieCmd());
commands.put(CMD_UPDATELEDGER, new UpdateLedgerCmd());
+ commands.put(CMD_UPDATE_BOOKIE_IN_LEDGER, new UpdateBookieInLedgerCmd());
commands.put(CMD_DELETELEDGER, new DeleteLedgerCmd());
commands.put(CMD_BOOKIEINFO, new BookieInfoCmd());
commands.put(CMD_DECOMMISSIONBOOKIE, new DecommissionBookieCmd());
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/UpdateBookieInLedgerCommand.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/UpdateBookieInLedgerCommand.java
new file mode 100644
index 0000000..88d1b4d
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/UpdateBookieInLedgerCommand.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.tools.cli.commands.bookie;
+
+import com.beust.jcommander.Parameter;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import lombok.Setter;
+import lombok.experimental.Accessors;
+import org.apache.bookkeeper.bookie.BookieShell;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.BookKeeperAdmin;
+import org.apache.bookkeeper.client.UpdateLedgerOp;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.tools.cli.helpers.BookieCommand;
+import org.apache.bookkeeper.tools.framework.CliFlags;
+import org.apache.bookkeeper.tools.framework.CliSpec;
+import org.apache.bookkeeper.util.MathUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Command to update ledger command.
+ */
+public class UpdateBookieInLedgerCommand extends BookieCommand<UpdateBookieInLedgerCommand.UpdateBookieInLedgerFlags> {
+
+ static final Logger LOG = LoggerFactory.getLogger(UpdateBookieInLedgerCommand.class);
+
+ private static final String NAME = "update-bookie-ledger-cmd";
+ private static final String DESC = "Update bookie in ledgers metadata (this may take a long time).";
+
+ public UpdateBookieInLedgerCommand() {
+ this(new UpdateBookieInLedgerFlags());
+ }
+
+ private UpdateBookieInLedgerCommand(UpdateBookieInLedgerFlags flags) {
+ super(CliSpec.<UpdateBookieInLedgerFlags>newBuilder()
+ .withName(NAME)
+ .withDescription(DESC)
+ .withFlags(flags)
+ .build());
+ }
+
+ /**
+ * Flags for update bookie in ledger command.
+ */
+ @Accessors(fluent = true)
+ @Setter
+ public static class UpdateBookieInLedgerFlags extends CliFlags {
+
+ @Parameter(names = { "-sb", "--srcBookie" },
+ description = "Source bookie which needs to be replaced by destination bookie. <bk-address:port>")
+ private String srcBookie;
+
+ @Parameter(names = { "-db", "--destBookie" },
+ description = "Destination bookie which replaces source bookie. <bk-address:port>")
+ private String destBookie;
+
+ @Parameter(names = { "-s", "--updatepersec" },
+ description = "Number of ledgers updating per second (default: 5 per sec)")
+ private int updatePerSec = 5;
+
+ @Parameter(names = { "-r",
+ "--maxOutstandingReads" }, description = "Max outstanding reads (default: 5 * updatespersec)")
+ private int maxOutstandingReads = updatePerSec * 5;
+
+ @Parameter(names = {"-l", "--limit"},
+ description = "Maximum number of ledgers of ledgers to update (default: no limit)")
+ private int limit = Integer.MIN_VALUE;
+
+ @Parameter(names = { "-v", "--verbose" }, description = "Print status of the ledger updation (default: false)")
+ private boolean verbose;
+
+ @Parameter(names = { "-p", "--printprogress" },
+ description = "Print messages on every configured seconds if verbose turned on (default: 10 secs)")
+ private long printProgress = 10;
+ }
+
+ @Override
+ public boolean apply(ServerConfiguration conf, UpdateBookieInLedgerFlags cmdFlags) {
+ try {
+ return updateLedger(conf, cmdFlags);
+ } catch (Exception e) {
+ throw new UncheckedExecutionException(e.getMessage(), e);
+ }
+ }
+
+ private boolean updateLedger(ServerConfiguration conf, UpdateBookieInLedgerFlags flags)
+ throws InterruptedException, BKException, IOException {
+
+ BookieSocketAddress srcBookieAddress;
+ BookieSocketAddress destBookieAddress;
+ try {
+ String[] bookieAddress = flags.srcBookie.split(":");
+ srcBookieAddress = new BookieSocketAddress(bookieAddress[0], Integer.parseInt(bookieAddress[1]));
+ bookieAddress = flags.destBookie.split(":");
+ destBookieAddress = new BookieSocketAddress(bookieAddress[0], Integer.parseInt(bookieAddress[1]));
+ } catch (Exception e) {
+ LOG.error("Bookie address must in <address>:<port> format");
+ return false;
+ }
+
+ final int rate = flags.updatePerSec;
+ if (rate <= 0) {
+ LOG.error("Invalid updatespersec {}, should be > 0", rate);
+ return false;
+ }
+
+ final int maxOutstandingReads = flags.maxOutstandingReads;
+ if (maxOutstandingReads <= 0) {
+ LOG.error("Invalid maxOutstandingReads {}, should be > 0", maxOutstandingReads);
+ return false;
+ }
+
+ final int limit = flags.limit;
+ if (limit <= 0 && limit != Integer.MIN_VALUE) {
+ LOG.error("Invalid limit {}, should be > 0", limit);
+ return false;
+ }
+
+ final long printProgress;
+ if (flags.verbose) {
+ printProgress = 10;
+ } else {
+ printProgress = flags.printProgress;
+ }
+
+ final ClientConfiguration clientConfiguration = new ClientConfiguration();
+ clientConfiguration.addConfiguration(conf);
+ final BookKeeper bk = new BookKeeper(clientConfiguration);
+ final BookKeeperAdmin admin = new BookKeeperAdmin(bk);
+ if (admin.getAvailableBookies().contains(srcBookieAddress)
+ || admin.getReadOnlyBookies().contains(srcBookieAddress)) {
+ bk.close();
+ admin.close();
+ LOG.error("Source bookie {} can't be active", srcBookieAddress);
+ return false;
+ }
+ final UpdateLedgerOp updateLedgerOp = new UpdateLedgerOp(bk, admin);
+
+ BookieShell.UpdateLedgerNotifier progressable = new BookieShell.UpdateLedgerNotifier() {
+ long lastReport = System.nanoTime();
+
+ @Override
+ public void progress(long updated, long issued) {
+ if (printProgress <= 0) {
+ return; // disabled
+ }
+ if (TimeUnit.MILLISECONDS.toSeconds(MathUtils.elapsedMSec(lastReport)) >= printProgress) {
+ LOG.info("Number of ledgers issued={}, updated={}", issued, updated);
+ lastReport = MathUtils.nowInNano();
+ }
+ }
+ };
+
+ try {
+ updateLedgerOp.updateBookieIdInLedgers(srcBookieAddress, destBookieAddress, rate, maxOutstandingReads,
+ limit, progressable);
+ } catch (IOException e) {
+ LOG.error("Failed to update ledger metadata", e);
+ return false;
+ }
+
+ return true;
+ }
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/UpdateLedgerCmdTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/UpdateLedgerCmdTest.java
index 4e8d892..f74adc6 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/UpdateLedgerCmdTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/UpdateLedgerCmdTest.java
@@ -83,6 +83,32 @@
assertEquals("Failed to update the ledger metadata to use bookie host name", 40, updatedLedgersCount);
}
+ /**
+ * replace bookie address in ledger.
+ */
+ @Test
+ public void testUpdateBookieInLedger() throws Exception {
+ BookKeeper bk = new BookKeeper(baseClientConf, zkc);
+ LOG.info("Create ledger and add entries to it");
+ List<LedgerHandle> ledgers = new ArrayList<LedgerHandle>();
+ LedgerHandle lh1 = createLedgerWithEntries(bk, 0);
+ ledgers.add(lh1);
+ for (int i = 1; i < 40; i++) {
+ ledgers.add(createLedgerWithEntries(bk, 0));
+ }
+ BookieSocketAddress srcBookie = bs.get(0).getLocalAddress();
+ BookieSocketAddress destBookie = new BookieSocketAddress("1.1.1.1", 2181);
+ String[] argv = new String[] { "updateBookieInLedger", "-sb", srcBookie.toString(), "-db",
+ destBookie.toString(), "-v", "true", "-p", "2" };
+ final ServerConfiguration conf = bsConfs.get(0);
+ bs.get(0).shutdown();
+ updateLedgerCmd(argv, 0, conf);
+ int updatedLedgersCount = getUpdatedLedgersCount(bk, ledgers, srcBookie);
+ assertEquals("Failed to update the ledger metadata with new bookie-address", 0, updatedLedgersCount);
+ updatedLedgersCount = getUpdatedLedgersCount(bk, ledgers, destBookie);
+ assertEquals("Failed to update the ledger metadata with new bookie-address", 40, updatedLedgersCount);
+ }
+
private void updateLedgerCmd(String[] argv, int exitCode, ServerConfiguration conf) throws KeeperException,
InterruptedException, IOException, UnknownHostException, Exception {
LOG.info("Perform updateledgers command");
diff --git a/site/_data/cli/shell.yaml b/site/_data/cli/shell.yaml
index 071fb34..585947e 100644
--- a/site/_data/cli/shell.yaml
+++ b/site/_data/cli/shell.yaml
@@ -180,6 +180,25 @@
description: Print status of the ledger updation (default false)
- flag: -printprogress N
description: Print messages on every configured seconds if verbose turned on (default 10 secs)
+- name: updateBookieInLedger
+ description: |
+ Replace srcBookie with destBookie in ledger metadata. (this may take a long time).
+ Useful when Host-reip or data-migration. In that case, shutdown bookie process in src-bookie,
+ use this command to update ledger metadata by replacing src-bookie to dest-bookie where data has been copied/moved.
+ Start the bookie process on dest-bookie and dest-bookie will serve copied ledger data from src-bookie.
+ options:
+ - flag: -srcBookie <bookie-id>
+ description: Source Bookie Id
+ - flag: -destBookie <bookie-id>
+ description: Destination Bookie Id
+ - flag: -updatespersec N
+ description: Number of ledgers updating per second (default 5 per sec)
+ - flag: -limit N
+ description: Maximum number of ledgers to update (default no limit)
+ - flag: -verbose
+ description: Print status of the ledger updation (default false)
+ - flag: -printprogress N
+ description: Print messages on every configured seconds if verbose turned on (default 10 secs)
- name: whoisauditor
description: Print the node which holds the auditor lock
- name: whatisinstanceid