blob: df31bf218e1fd6e665d733e0a795ba1925fa284f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.bookkeeper.bookie;
import static com.google.common.base.Charsets.UTF_8;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Enumeration;
import java.util.Formatter;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.bookie.BookieException.InvalidCookieException;
import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner;
import org.apache.bookkeeper.bookie.Journal.JournalScanner;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.BookKeeperAdmin;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.LedgerMetadata;
import org.apache.bookkeeper.client.UpdateLedgerOp;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.LedgerManager.LedgerRange;
import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.replication.AuditorElector;
import org.apache.bookkeeper.util.EntryFormatter;
import org.apache.bookkeeper.util.MathUtils;
import org.apache.bookkeeper.util.Tool;
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.MissingArgumentException;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.configuration.CompositeConfiguration;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.io.HexDump;
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.commons.lang.StringUtils;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.util.concurrent.AbstractFuture;
/**
* Bookie Shell is to provide utilities for users to administer a bookkeeper cluster.
*/
public class BookieShell implements Tool {
static final Logger LOG = LoggerFactory.getLogger(BookieShell.class);
static final String ENTRY_FORMATTER_CLASS = "entryFormatterClass";
static final String CMD_METAFORMAT = "metaformat";
static final String CMD_BOOKIEFORMAT = "bookieformat";
static final String CMD_RECOVER = "recover";
static final String CMD_LEDGER = "ledger";
static final String CMD_READ_LEDGER_ENTRIES = "readledger";
static final String CMD_LISTLEDGERS = "listledgers";
static final String CMD_LEDGERMETADATA = "ledgermetadata";
static final String CMD_LISTUNDERREPLICATED = "listunderreplicated";
static final String CMD_WHOISAUDITOR = "whoisauditor";
static final String CMD_SIMPLETEST = "simpletest";
static final String CMD_BOOKIESANITYTEST = "bookiesanity";
static final String CMD_READLOG = "readlog";
static final String CMD_READJOURNAL = "readjournal";
static final String CMD_LASTMARK = "lastmark";
static final String CMD_AUTORECOVERY = "autorecovery";
static final String CMD_LISTBOOKIES = "listbookies";
static final String CMD_UPDATECOOKIE = "updatecookie";
static final String CMD_UPDATELEDGER = "updateledgers";
static final String CMD_HELP = "help";
final ServerConfiguration bkConf = new ServerConfiguration();
File[] indexDirectories;
File[] ledgerDirectories;
File journalDirectory;
EntryLogger entryLogger = null;
Journal journal = null;
EntryFormatter formatter;
int pageSize;
int entriesPerPage;
interface Command {
public int runCmd(String[] args) throws Exception;
public void printUsage();
}
abstract class MyCommand implements Command {
abstract Options getOptions();
abstract String getDescription();
abstract String getUsage();
abstract int runCmd(CommandLine cmdLine) throws Exception;
String cmdName;
MyCommand(String cmdName) {
this.cmdName = cmdName;
}
@Override
public int runCmd(String[] args) throws Exception {
try {
BasicParser parser = new BasicParser();
CommandLine cmdLine = parser.parse(getOptions(), args);
return runCmd(cmdLine);
} catch (ParseException e) {
LOG.error("Error parsing command line arguments : ", e);
printUsage();
return -1;
}
}
@Override
public void printUsage() {
HelpFormatter hf = new HelpFormatter();
System.err.println(cmdName + ": " + getDescription());
hf.printHelp(getUsage(), getOptions());
}
}
/**
* Format the bookkeeper metadata present in zookeeper
*/
class MetaFormatCmd extends MyCommand {
Options opts = new Options();
MetaFormatCmd() {
super(CMD_METAFORMAT);
opts.addOption("n", "nonInteractive", false,
"Whether to confirm if old data exists..?");
opts.addOption("f", "force", false,
"If [nonInteractive] is specified, then whether"
+ " to force delete the old data without prompt.");
}
@Override
Options getOptions() {
return opts;
}
@Override
String getDescription() {
return "Format bookkeeper metadata in zookeeper";
}
@Override
String getUsage() {
return "metaformat [-nonInteractive] [-force]";
}
@Override
int runCmd(CommandLine cmdLine) throws Exception {
boolean interactive = (!cmdLine.hasOption("n"));
boolean force = cmdLine.hasOption("f");
ClientConfiguration adminConf = new ClientConfiguration(bkConf);
boolean result = BookKeeperAdmin.format(adminConf, interactive,
force);
return (result) ? 0 : 1;
}
}
/**
* Formats the local data present in current bookie server
*/
class BookieFormatCmd extends MyCommand {
Options opts = new Options();
public BookieFormatCmd() {
super(CMD_BOOKIEFORMAT);
opts.addOption("n", "nonInteractive", false,
"Whether to confirm if old data exists..?");
opts.addOption("f", "force", false,
"If [nonInteractive] is specified, then whether"
+ " to force delete the old data without prompt..?");
opts.addOption("d", "deleteCookie", false, "Delete its cookie on zookeeper");
}
@Override
Options getOptions() {
return opts;
}
@Override
String getDescription() {
return "Format the current server contents";
}
@Override
String getUsage() {
return "bookieformat [-nonInteractive] [-force] [-deleteCookie]";
}
@Override
int runCmd(CommandLine cmdLine) throws Exception {
boolean interactive = (!cmdLine.hasOption("n"));
boolean force = cmdLine.hasOption("f");
ServerConfiguration conf = new ServerConfiguration(bkConf);
boolean result = Bookie.format(conf, interactive, force);
// delete cookie
if (cmdLine.hasOption("d")) {
ZooKeeperClient zkc =
ZooKeeperClient.newBuilder()
.connectString(conf.getZkServers())
.sessionTimeoutMs(conf.getZkTimeout())
.build();
try {
Versioned<Cookie> cookie = Cookie.readFromZooKeeper(zkc, conf);
cookie.getValue().deleteFromZooKeeper(zkc, conf, cookie.getVersion());
} catch (KeeperException.NoNodeException nne) {
LOG.warn("No cookie to remove : ", nne);
} finally {
zkc.close();
}
}
return (result) ? 0 : 1;
}
}
/**
* Recover command for ledger data recovery for failed bookie
*/
class RecoverCmd extends MyCommand {
Options opts = new Options();
public RecoverCmd() {
super(CMD_RECOVER);
opts.addOption("d", "deleteCookie", false, "Delete cookie node for the bookie.");
}
@Override
Options getOptions() {
return opts;
}
@Override
String getDescription() {
return "Recover the ledger data for failed bookie";
}
@Override
String getUsage() {
return "recover [-deleteCookie] <bookieSrc> [bookieDest]";
}
@Override
int runCmd(CommandLine cmdLine) throws Exception {
String[] args = cmdLine.getArgs();
if (args.length < 1) {
throw new MissingArgumentException(
"'bookieSrc' argument required");
}
ClientConfiguration adminConf = new ClientConfiguration(bkConf);
BookKeeperAdmin admin = new BookKeeperAdmin(adminConf);
try {
return bkRecovery(adminConf, admin, args, cmdLine.hasOption("d"));
} finally {
admin.close();
}
}
private int bkRecovery(ClientConfiguration conf, BookKeeperAdmin bkAdmin,
String[] args, boolean deleteCookie)
throws InterruptedException, BKException, KeeperException, IOException {
final String bookieSrcString[] = args[0].split(":");
if (bookieSrcString.length != 2) {
System.err.println("BookieSrc inputted has invalid format"
+ "(host:port expected): " + args[0]);
return -1;
}
final BookieSocketAddress bookieSrc = new BookieSocketAddress(
bookieSrcString[0], Integer.parseInt(bookieSrcString[1]));
BookieSocketAddress bookieDest = null;
if (args.length >= 2) {
final String bookieDestString[] = args[1].split(":");
if (bookieDestString.length < 2) {
System.err.println("BookieDest inputted has invalid format"
+ "(host:port expected): " + args[1]);
return -1;
}
bookieDest = new BookieSocketAddress(bookieDestString[0],
Integer.parseInt(bookieDestString[1]));
}
bkAdmin.recoverBookieData(bookieSrc, bookieDest);
if (deleteCookie) {
try {
Versioned<Cookie> cookie = Cookie.readFromZooKeeper(bkAdmin.getZooKeeper(), conf, bookieSrc);
cookie.getValue().deleteFromZooKeeper(bkAdmin.getZooKeeper(), conf, bookieSrc, cookie.getVersion());
} catch (KeeperException.NoNodeException nne) {
LOG.warn("No cookie to remove for {} : ", bookieSrc, nne);
}
}
return 0;
}
}
/**
* Ledger Command Handles ledger related operations
*/
class LedgerCmd extends MyCommand {
Options lOpts = new Options();
LedgerCmd() {
super(CMD_LEDGER);
lOpts.addOption("m", "meta", false, "Print meta information");
}
@Override
public int runCmd(CommandLine cmdLine) throws Exception {
String[] leftArgs = cmdLine.getArgs();
if (leftArgs.length <= 0) {
System.err.println("ERROR: missing ledger id");
printUsage();
return -1;
}
boolean printMeta = false;
if (cmdLine.hasOption("m")) {
printMeta = true;
}
long ledgerId;
try {
ledgerId = Long.parseLong(leftArgs[0]);
} catch (NumberFormatException nfe) {
System.err.println("ERROR: invalid ledger id " + leftArgs[0]);
printUsage();
return -1;
}
if (printMeta) {
// print meta
readLedgerMeta(ledgerId);
}
// dump ledger info
readLedgerIndexEntries(ledgerId);
return 0;
}
@Override
String getDescription() {
return "Dump ledger index entries into readable format.";
}
@Override
String getUsage() {
return "ledger [-m] <ledger_id>";
}
@Override
Options getOptions() {
return lOpts;
}
}
/**
* Command for reading ledger entries
*/
class ReadLedgerEntriesCmd extends MyCommand {
Options lOpts = new Options();
ReadLedgerEntriesCmd() {
super(CMD_READ_LEDGER_ENTRIES);
}
@Override
Options getOptions() {
return lOpts;
}
@Override
String getDescription() {
return "Read a range of entries from a ledger";
}
@Override
String getUsage() {
return "readledger <ledger_id> [<start_entry_id> [<end_entry_id>]]";
}
@Override
int runCmd(CommandLine cmdLine) throws Exception {
String[] leftArgs = cmdLine.getArgs();
if (leftArgs.length <= 0) {
System.err.println("ERROR: missing ledger id");
printUsage();
return -1;
}
long ledgerId;
long firstEntry = 0;
long lastEntry = -1;
try {
ledgerId = Long.parseLong(leftArgs[0]);
if (leftArgs.length >= 2) {
firstEntry = Long.parseLong(leftArgs[1]);
}
if (leftArgs.length >= 3) {
lastEntry = Long.parseLong(leftArgs[2]);
}
} catch (NumberFormatException nfe) {
System.err.println("ERROR: invalid number " + nfe.getMessage());
printUsage();
return -1;
}
ClientConfiguration conf = new ClientConfiguration();
conf.addConfiguration(bkConf);
BookKeeperAdmin bk = null;
ByteArrayOutputStream out = new ByteArrayOutputStream();
try {
bk = new BookKeeperAdmin(conf);
Iterator<LedgerEntry> entries = bk.readEntries(ledgerId, firstEntry, lastEntry).iterator();
while (entries.hasNext()) {
LedgerEntry entry = entries.next();
HexDump.dump(entry.getEntry(), 0, out, 0);
System.out.println(
"Entry Id: " + entry.getEntryId() + ", Data: " + new String(out.toByteArray(), UTF_8));
out.reset();
}
} catch (Exception e) {
LOG.error("Error reading entries from ledger {}", ledgerId, e.getCause());
return -1;
} finally {
out.close();
if (bk != null) {
bk.close();
}
}
return 0;
}
}
/**
* Command for listing underreplicated ledgers
*/
class ListUnderreplicatedCmd extends MyCommand {
Options opts = new Options();
public ListUnderreplicatedCmd() {
super(CMD_LISTUNDERREPLICATED);
}
@Override
Options getOptions() {
return opts;
}
@Override
String getDescription() {
return "List ledgers marked as underreplicated";
}
@Override
String getUsage() {
return "listunderreplicated";
}
@Override
int runCmd(CommandLine cmdLine) throws Exception {
ZooKeeper zk = null;
try {
zk = ZooKeeperClient.newBuilder()
.connectString(bkConf.getZkServers())
.sessionTimeoutMs(bkConf.getZkTimeout())
.build();
LedgerManagerFactory mFactory = LedgerManagerFactory.newLedgerManagerFactory(bkConf, zk);
LedgerUnderreplicationManager underreplicationManager = mFactory.newLedgerUnderreplicationManager();
Iterator<Long> iter = underreplicationManager.listLedgersToRereplicate();
while (iter.hasNext()) {
System.out.println(iter.next());
}
} finally {
if (zk != null) {
zk.close();
}
}
return 0;
}
}
final static int LIST_BATCH_SIZE = 1000;
/**
* Command to list all ledgers in the cluster
*/
class ListLedgersCmd extends MyCommand {
Options lOpts = new Options();
ListLedgersCmd() {
super(CMD_LISTLEDGERS);
lOpts.addOption("m", "meta", false, "Print metadata");
}
@Override
public int runCmd(CommandLine cmdLine) throws Exception {
ZooKeeper zk = null;
LedgerManagerFactory mFactory = null;
LedgerManager m = null;
try {
zk = ZooKeeperClient.newBuilder()
.connectString(bkConf.getZkServers())
.sessionTimeoutMs(bkConf.getZkTimeout())
.build();
mFactory = LedgerManagerFactory.newLedgerManagerFactory(bkConf, zk);
m = mFactory.newLedgerManager();
LedgerRangeIterator iter = m.getLedgerRanges();
if (cmdLine.hasOption("m")) {
List<ReadMetadataCallback> futures
= new ArrayList<ReadMetadataCallback>(LIST_BATCH_SIZE);
while (iter.hasNext()) {
LedgerRange r = iter.next();
for (Long lid : r.getLedgers()) {
ReadMetadataCallback cb = new ReadMetadataCallback(lid);
m.readLedgerMetadata(lid, cb);
futures.add(cb);
}
if (futures.size() >= LIST_BATCH_SIZE) {
while (futures.size() > 0) {
ReadMetadataCallback cb = futures.remove(0);
printLedgerMetadata(cb);
}
}
}
while (futures.size() > 0) {
ReadMetadataCallback cb = futures.remove(0);
printLedgerMetadata(cb);
}
} else {
while (iter.hasNext()) {
LedgerRange r = iter.next();
for (Long lid : r.getLedgers()) {
System.out.println(Long.toString(lid));
}
}
}
} finally {
if (m != null) {
try {
m.close();
mFactory.uninitialize();
} catch (IOException ioe) {
LOG.error("Failed to close ledger manager : ", ioe);
}
}
if (zk != null) {
zk.close();
}
}
return 0;
}
@Override
String getDescription() {
return "List all ledgers on the cluster (this may take a long time)";
}
@Override
String getUsage() {
return "listledgers [-meta]";
}
@Override
Options getOptions() {
return lOpts;
}
}
static void printLedgerMetadata(ReadMetadataCallback cb) throws Exception {
LedgerMetadata md = cb.get();
System.out.println("ledgerID: " + cb.getLedgerId());
System.out.println(new String(md.serialize(), UTF_8));
}
static class ReadMetadataCallback extends AbstractFuture<LedgerMetadata>
implements GenericCallback<LedgerMetadata> {
final long ledgerId;
ReadMetadataCallback(long ledgerId) {
this.ledgerId = ledgerId;
}
long getLedgerId() {
return ledgerId;
}
public void operationComplete(int rc, LedgerMetadata result) {
if (rc != 0) {
setException(BKException.create(rc));
} else {
set(result);
}
}
}
/**
* Print the metadata for a ledger
*/
class LedgerMetadataCmd extends MyCommand {
Options lOpts = new Options();
LedgerMetadataCmd() {
super(CMD_LEDGERMETADATA);
lOpts.addOption("l", "ledgerid", true, "Ledger ID");
}
@Override
public int runCmd(CommandLine cmdLine) throws Exception {
final long lid = getOptionLongValue(cmdLine, "ledgerid", -1);
if (lid == -1) {
System.err.println("Must specify a ledger id");
return -1;
}
ZooKeeper zk = null;
LedgerManagerFactory mFactory = null;
LedgerManager m = null;
try {
zk = ZooKeeperClient.newBuilder()
.connectString(bkConf.getZkServers())
.sessionTimeoutMs(bkConf.getZkTimeout())
.build();
mFactory = LedgerManagerFactory.newLedgerManagerFactory(bkConf, zk);
m = mFactory.newLedgerManager();
ReadMetadataCallback cb = new ReadMetadataCallback(lid);
m.readLedgerMetadata(lid, cb);
printLedgerMetadata(cb);
} finally {
if (m != null) {
try {
m.close();
mFactory.uninitialize();
} catch (IOException ioe) {
LOG.error("Failed to close ledger manager : ", ioe);
}
}
if (zk != null) {
zk.close();
}
}
return 0;
}
@Override
String getDescription() {
return "Print the metadata for a ledger";
}
@Override
String getUsage() {
return "ledgermetadata -ledgerid <ledgerid>";
}
@Override
Options getOptions() {
return lOpts;
}
}
/**
* Simple test to create a ledger and write to it
*/
class SimpleTestCmd extends MyCommand {
Options lOpts = new Options();
SimpleTestCmd() {
super(CMD_SIMPLETEST);
lOpts.addOption("e", "ensemble", true, "Ensemble size (default 3)");
lOpts.addOption("w", "writeQuorum", true, "Write quorum size (default 2)");
lOpts.addOption("a", "ackQuorum", true, "Ack quorum size (default 2)");
lOpts.addOption("n", "numEntries", true, "Entries to write (default 1000)");
}
@Override
public int runCmd(CommandLine cmdLine) throws Exception {
byte[] data = new byte[100]; // test data
int ensemble = getOptionIntValue(cmdLine, "ensemble", 3);
int writeQuorum = getOptionIntValue(cmdLine, "writeQuorum", 2);
int ackQuorum = getOptionIntValue(cmdLine, "ackQuorum", 2);
int numEntries = getOptionIntValue(cmdLine, "numEntries", 1000);
ClientConfiguration conf = new ClientConfiguration();
conf.addConfiguration(bkConf);
BookKeeper bk = new BookKeeper(conf);
LedgerHandle lh = bk.createLedger(ensemble, writeQuorum, ackQuorum,
BookKeeper.DigestType.MAC, new byte[0]);
System.out.println("Ledger ID: " + lh.getId());
long lastReport = System.nanoTime();
for (int i = 0; i < numEntries; i++) {
lh.addEntry(data);
if (TimeUnit.SECONDS.convert(System.nanoTime() - lastReport,
TimeUnit.NANOSECONDS) > 1) {
System.out.println(i + " entries written");
lastReport = System.nanoTime();
}
}
lh.close();
bk.close();
System.out.println(numEntries + " entries written to ledger " + lh.getId());
return 0;
}
@Override
String getDescription() {
return "Simple test to create a ledger and write entries to it";
}
@Override
String getUsage() {
return "simpletest [-ensemble N] [-writeQuorum N] [-ackQuorum N] [-numEntries N]";
}
@Override
Options getOptions() {
return lOpts;
}
}
/**
* Command to run a bookie sanity test
*/
class BookieSanityTestCmd extends MyCommand {
Options lOpts = new Options();
BookieSanityTestCmd() {
super(CMD_BOOKIESANITYTEST);
lOpts.addOption("e", "entries", true, "Total entries to be added for the test (default 10)");
lOpts.addOption("t", "timeout", true, "Timeout for write/read operations in seconds (default 1)");
}
@Override
Options getOptions() {
return lOpts;
}
@Override
String getDescription() {
return "Sanity test for local bookie. Create ledger and write/reads entries on local bookie.";
}
@Override
String getUsage() {
return "bookiesanity [-entries N] [-timeout N]";
}
@Override
int runCmd(CommandLine cmdLine) throws Exception {
int numberOfEntries = getOptionIntValue(cmdLine, "entries", 10);
int timeoutSecs= getOptionIntValue(cmdLine, "timeout", 1);
ClientConfiguration conf = new ClientConfiguration();
conf.addConfiguration(bkConf);
conf.setEnsemblePlacementPolicy(LocalBookieEnsemblePlacementPolicy.class);
conf.setAddEntryTimeout(timeoutSecs);
conf.setReadEntryTimeout(timeoutSecs);
BookKeeper bk = new BookKeeper(conf);
LedgerHandle lh = null;
try {
lh = bk.createLedger(1, 1, DigestType.MAC, new byte[0]);
LOG.info("Created ledger {}", lh.getId());
for (int i = 0; i < numberOfEntries; i++) {
String content = "entry-" + i;
lh.addEntry(content.getBytes(UTF_8));
}
LOG.info("Written {} entries in ledger {}", numberOfEntries, lh.getId());
// Reopen the ledger and read entries
lh = bk.openLedger(lh.getId(), DigestType.MAC, new byte[0]);
if (lh.getLastAddConfirmed() != (numberOfEntries - 1)) {
throw new Exception("Invalid last entry found on ledger. expecting: " + (numberOfEntries - 1)
+ " -- found: " + lh.getLastAddConfirmed());
}
Enumeration<LedgerEntry> entries = lh.readEntries(0, numberOfEntries - 1);
int i = 0;
while (entries.hasMoreElements()) {
LedgerEntry entry = entries.nextElement();
String actualMsg = new String(entry.getEntry(), UTF_8);
String expectedMsg = "entry-" + (i++);
if (!expectedMsg.equals(actualMsg)) {
throw new Exception("Failed validation of received message - Expected: " + expectedMsg
+ ", Actual: " + actualMsg);
}
}
LOG.info("Read {} entries from ledger {}", entries, lh.getId());
} catch (Exception e) {
LOG.warn("Error in bookie sanity test", e);
return -1;
} finally {
if (lh != null) {
bk.deleteLedger(lh.getId());
LOG.info("Deleted ledger {}", lh.getId());
}
bk.close();
}
LOG.info("Bookie sanity test succeeded");
return 0;
}
}
/**
* Command to read entry log files.
*/
class ReadLogCmd extends MyCommand {
Options rlOpts = new Options();
ReadLogCmd() {
super(CMD_READLOG);
rlOpts.addOption("m", "msg", false, "Print message body");
}
@Override
public int runCmd(CommandLine cmdLine) throws Exception {
String[] leftArgs = cmdLine.getArgs();
if (leftArgs.length <= 0) {
System.err.println("ERROR: missing entry log id or entry log file name");
printUsage();
return -1;
}
boolean printMsg = false;
if (cmdLine.hasOption("m")) {
printMsg = true;
}
long logId;
try {
logId = Long.parseLong(leftArgs[0]);
} catch (NumberFormatException nfe) {
// not a entry log id
File f = new File(leftArgs[0]);
String name = f.getName();
if (!name.endsWith(".log")) {
// not a log file
System.err.println("ERROR: invalid entry log file name " + leftArgs[0]);
printUsage();
return -1;
}
String idString = name.split("\\.")[0];
logId = Long.parseLong(idString, 16);
}
// scan entry log
scanEntryLog(logId, printMsg);
return 0;
}
@Override
String getDescription() {
return "Scan an entry file and format the entries into readable format.";
}
@Override
String getUsage() {
return "readlog [-msg] <entry_log_id | entry_log_file_name>";
}
@Override
Options getOptions() {
return rlOpts;
}
}
/**
* Command to read journal files
*/
class ReadJournalCmd extends MyCommand {
Options rjOpts = new Options();
ReadJournalCmd() {
super(CMD_READJOURNAL);
rjOpts.addOption("m", "msg", false, "Print message body");
}
@Override
public int runCmd(CommandLine cmdLine) throws Exception {
String[] leftArgs = cmdLine.getArgs();
if (leftArgs.length <= 0) {
System.err.println("ERROR: missing journal id or journal file name");
printUsage();
return -1;
}
boolean printMsg = false;
if (cmdLine.hasOption("m")) {
printMsg = true;
}
long journalId;
try {
journalId = Long.parseLong(leftArgs[0]);
} catch (NumberFormatException nfe) {
// not a journal id
File f = new File(leftArgs[0]);
String name = f.getName();
if (!name.endsWith(".txn")) {
// not a journal file
System.err.println("ERROR: invalid journal file name " + leftArgs[0]);
printUsage();
return -1;
}
String idString = name.split("\\.")[0];
journalId = Long.parseLong(idString, 16);
}
// scan journal
scanJournal(journalId, printMsg);
return 0;
}
@Override
String getDescription() {
return "Scan a journal file and format the entries into readable format.";
}
@Override
String getUsage() {
return "readjournal [-msg] <journal_id | journal_file_name>";
}
@Override
Options getOptions() {
return rjOpts;
}
}
/**
* Command to print last log mark
*/
class LastMarkCmd extends MyCommand {
LastMarkCmd() {
super(CMD_LASTMARK);
}
@Override
public int runCmd(CommandLine c) throws Exception {
printLastLogMark();
return 0;
}
@Override
String getDescription() {
return "Print last log marker.";
}
@Override
String getUsage() {
return "lastmark";
}
@Override
Options getOptions() {
return new Options();
}
}
/**
* List available bookies
*/
class ListBookiesCmd extends MyCommand {
Options opts = new Options();
ListBookiesCmd() {
super(CMD_LISTBOOKIES);
opts.addOption("rw", "readwrite", false, "Print readwrite bookies");
opts.addOption("ro", "readonly", false, "Print readonly bookies");
opts.addOption("h", "hostnames", false,
"Also print hostname of the bookie");
}
@Override
public int runCmd(CommandLine cmdLine) throws Exception {
boolean readwrite = cmdLine.hasOption("rw");
boolean readonly = cmdLine.hasOption("ro");
if ((!readwrite && !readonly) || (readwrite && readonly)) {
LOG.error("One and only one of -readwrite and -readonly must be specified");
printUsage();
return 1;
}
ClientConfiguration clientconf = new ClientConfiguration(bkConf)
.setZkServers(bkConf.getZkServers());
BookKeeperAdmin bka = new BookKeeperAdmin(clientconf);
int count = 0;
Collection<BookieSocketAddress> bookies = new ArrayList<BookieSocketAddress>();
if (cmdLine.hasOption("rw")) {
Collection<BookieSocketAddress> availableBookies = bka
.getAvailableBookies();
bookies.addAll(availableBookies);
} else if (cmdLine.hasOption("ro")) {
Collection<BookieSocketAddress> roBookies = bka
.getReadOnlyBookies();
bookies.addAll(roBookies);
}
for (BookieSocketAddress b : bookies) {
System.out.print(b);
if (cmdLine.hasOption("h")) {
System.out.print("\t" + b.getSocketAddress().getHostName());
}
System.out.println("");
count++;
}
if (count == 0) {
System.err.println("No bookie exists!");
return 1;
}
return 0;
}
@Override
String getDescription() {
return "List the bookies, which are running as either readwrite or readonly mode.";
}
@Override
String getUsage() {
return "listbookies [-readwrite|-readonly] [-hostnames]";
}
@Override
Options getOptions() {
return opts;
}
}
/**
* Command to print help message
*/
class HelpCmd extends MyCommand {
HelpCmd() {
super(CMD_HELP);
}
@Override
public int runCmd(CommandLine cmdLine) throws Exception {
String[] args = cmdLine.getArgs();
if (args.length == 0) {
printShellUsage();
return 0;
}
String cmdName = args[0];
Command cmd = commands.get(cmdName);
if (null == cmd) {
System.err.println("Unknown command " + cmdName);
printShellUsage();
return -1;
}
cmd.printUsage();
return 0;
}
@Override
String getDescription() {
return "Describe the usage of this program or its subcommands.";
}
@Override
String getUsage() {
return "help [COMMAND]";
}
@Override
Options getOptions() {
return new Options();
}
}
/**
* Command for administration of autorecovery
*/
class AutoRecoveryCmd extends MyCommand {
Options opts = new Options();
public AutoRecoveryCmd() {
super(CMD_AUTORECOVERY);
opts.addOption("e", "enable", false,
"Enable auto recovery of underreplicated ledgers");
opts.addOption("d", "disable", false,
"Disable auto recovery of underreplicated ledgers");
}
@Override
Options getOptions() {
return opts;
}
@Override
String getDescription() {
return "Enable or disable autorecovery in the cluster.";
}
@Override
String getUsage() {
return "autorecovery [-enable|-disable]";
}
@Override
int runCmd(CommandLine cmdLine) throws Exception {
boolean disable = cmdLine.hasOption("d");
boolean enable = cmdLine.hasOption("e");
if ((!disable && !enable)
|| (enable && disable)) {
LOG.error("One and only one of -enable and -disable must be specified");
printUsage();
return 1;
}
ZooKeeper zk = null;
try {
zk = ZooKeeperClient.newBuilder()
.connectString(bkConf.getZkServers())
.sessionTimeoutMs(bkConf.getZkTimeout())
.build();
LedgerManagerFactory mFactory = LedgerManagerFactory.newLedgerManagerFactory(bkConf, zk);
LedgerUnderreplicationManager underreplicationManager = mFactory.newLedgerUnderreplicationManager();
if (enable) {
if (underreplicationManager.isLedgerReplicationEnabled()) {
LOG.warn("Autorecovery already enabled. Doing nothing");
} else {
LOG.info("Enabling autorecovery");
underreplicationManager.enableLedgerReplication();
}
} else {
if (!underreplicationManager.isLedgerReplicationEnabled()) {
LOG.warn("Autorecovery already disabled. Doing nothing");
} else {
LOG.info("Disabling autorecovery");
underreplicationManager.disableLedgerReplication();
}
}
} finally {
if (zk != null) {
zk.close();
}
}
return 0;
}
}
/**
* Print which node has the auditor lock
*/
class WhoIsAuditorCmd extends MyCommand {
Options opts = new Options();
public WhoIsAuditorCmd() {
super(CMD_WHOISAUDITOR);
}
@Override
Options getOptions() {
return opts;
}
@Override
String getDescription() {
return "Print the node which holds the auditor lock";
}
@Override
String getUsage() {
return "whoisauditor";
}
@Override
int runCmd(CommandLine cmdLine) throws Exception {
ZooKeeper zk = null;
try {
zk = ZooKeeperClient.newBuilder()
.connectString(bkConf.getZkServers())
.sessionTimeoutMs(bkConf.getZkTimeout())
.build();
BookieSocketAddress bookieId = AuditorElector.getCurrentAuditor(bkConf, zk);
if (bookieId == null) {
LOG.info("No auditor elected");
return -1;
}
LOG.info("Auditor: {}/{}:{}",
new Object[] {
bookieId.getSocketAddress().getAddress().getCanonicalHostName(),
bookieId.getSocketAddress().getAddress().getHostAddress(),
bookieId.getSocketAddress().getPort() });
} finally {
if (zk != null) {
zk.close();
}
}
return 0;
}
}
/**
* Update cookie command
*/
class UpdateCookieCmd extends MyCommand {
Options opts = new Options();
UpdateCookieCmd() {
super(CMD_UPDATECOOKIE);
opts.addOption("b", "bookieId", true, "Bookie Id");
}
@Override
Options getOptions() {
return opts;
}
@Override
String getDescription() {
return "Update bookie id in cookie";
}
@Override
String getUsage() {
return "updatecookie -bookieId <hostname|ip>";
}
@Override
int runCmd(CommandLine cmdLine) throws Exception {
final String bookieId = cmdLine.getOptionValue("bookieId");
if (StringUtils.isBlank(bookieId)) {
LOG.error("Invalid argument list!");
this.printUsage();
return -1;
}
if (!StringUtils.equals(bookieId, "hostname") && !StringUtils.equals(bookieId, "ip")) {
LOG.error("Invalid option value:" + bookieId);
this.printUsage();
return -1;
}
boolean useHostName = getOptionalValue(bookieId, "hostname");
if (!bkConf.getUseHostNameAsBookieID() && useHostName) {
LOG.error("Expects configuration useHostNameAsBookieID=true as the option value passed is 'hostname'");
return -1;
} else if (bkConf.getUseHostNameAsBookieID() && !useHostName) {
LOG.error("Expects configuration useHostNameAsBookieID=false as the option value passed is 'ip'");
return -1;
}
return updateBookieIdInCookie(bookieId, useHostName);
}
private int updateBookieIdInCookie(final String bookieId, final boolean useHostname) throws IOException,
InterruptedException {
ZooKeeper zk = null;
try {
zk = ZooKeeperClient.newBuilder()
.connectString(bkConf.getZkServers())
.sessionTimeoutMs(bkConf.getZkTimeout())
.build();
ServerConfiguration conf = new ServerConfiguration(bkConf);
String newBookieId = Bookie.getBookieAddress(conf).toString();
// read oldcookie
Versioned<Cookie> oldCookie = null;
try {
conf.setUseHostNameAsBookieID(!useHostname);
oldCookie = Cookie.readFromZooKeeper(zk, conf);
} catch (KeeperException.NoNodeException nne) {
LOG.error("Either cookie already updated with UseHostNameAsBookieID={} or no cookie exists!",
useHostname, nne);
return -1;
}
Cookie newCookie = Cookie.newBuilder(oldCookie.getValue()).setBookieHost(newBookieId).build();
boolean hasCookieUpdatedInDirs = verifyCookie(newCookie, journalDirectory);
for (File dir : ledgerDirectories) {
hasCookieUpdatedInDirs &= verifyCookie(newCookie, dir);
}
if (indexDirectories != ledgerDirectories) {
for (File dir : indexDirectories) {
hasCookieUpdatedInDirs &= verifyCookie(newCookie, dir);
}
}
if (hasCookieUpdatedInDirs) {
try {
conf.setUseHostNameAsBookieID(useHostname);
Cookie.readFromZooKeeper(zk, conf);
// since newcookie exists, just do cleanup of oldcookie and return
conf.setUseHostNameAsBookieID(!useHostname);
oldCookie.getValue().deleteFromZooKeeper(zk, conf, oldCookie.getVersion());
return 0;
} catch (KeeperException.NoNodeException nne) {
LOG.debug("Ignoring, cookie will be written to zookeeper");
}
} else {
// writes newcookie to local dirs
newCookie.writeToDirectory(journalDirectory);
LOG.info("Updated cookie file present in journalDirectory {}", journalDirectory);
for (File dir : ledgerDirectories) {
newCookie.writeToDirectory(dir);
}
LOG.info("Updated cookie file present in ledgerDirectories {}", ledgerDirectories);
if (ledgerDirectories != indexDirectories) {
for (File dir : indexDirectories) {
newCookie.writeToDirectory(dir);
}
LOG.info("Updated cookie file present in indexDirectories {}", indexDirectories);
}
}
// writes newcookie to zookeeper
conf.setUseHostNameAsBookieID(useHostname);
newCookie.writeToZooKeeper(zk, conf, Version.NEW);
// delete oldcookie
conf.setUseHostNameAsBookieID(!useHostname);
oldCookie.getValue().deleteFromZooKeeper(zk, conf, oldCookie.getVersion());
} catch (KeeperException ke) {
LOG.error("KeeperException during cookie updation!", ke);
return -1;
} catch (IOException ioe) {
LOG.error("IOException during cookie updation!", ioe);
return -1;
} finally {
if (zk != null) {
zk.close();
}
}
return 0;
}
private boolean verifyCookie(Cookie oldCookie, File dir) throws IOException {
try {
Cookie cookie = Cookie.readFromDirectory(dir);
cookie.verify(oldCookie);
} catch (InvalidCookieException e) {
return false;
}
return true;
}
}
/**
* Update ledger command
*/
class UpdateLedgerCmd extends MyCommand {
private final Options opts = new Options();
UpdateLedgerCmd() {
super(CMD_UPDATELEDGER);
opts.addOption("b", "bookieId", true, "Bookie Id");
opts.addOption("s", "updatespersec", true, "Number of ledgers updating per second (default: 5 per sec)");
opts.addOption("l", "limit", true, "Maximum number of ledgers to update (default: no limit)");
opts.addOption("v", "verbose", true, "Print status of the ledger updation (default: false)");
opts.addOption("p", "printprogress", true,
"Print messages on every configured seconds if verbose turned on (default: 10 secs)");
}
@Override
Options getOptions() {
return opts;
}
@Override
String getDescription() {
return "Update bookie id in ledgers (this may take a long time)";
}
@Override
String getUsage() {
return "updateledger -bookieId <hostname|ip> [-updatespersec N] [-limit N] [-verbose true/false] [-printprogress N]";
}
@Override
int runCmd(CommandLine cmdLine) throws Exception {
final String bookieId = cmdLine.getOptionValue("bookieId");
if (StringUtils.isBlank(bookieId)) {
LOG.error("Invalid argument list!");
this.printUsage();
return -1;
}
if (!StringUtils.equals(bookieId, "hostname") && !StringUtils.equals(bookieId, "ip")) {
LOG.error("Invalid option value {} for bookieId, expected hostname/ip", bookieId);
this.printUsage();
return -1;
}
boolean useHostName = getOptionalValue(bookieId, "hostname");
if (!bkConf.getUseHostNameAsBookieID() && useHostName) {
LOG.error("Expects configuration useHostNameAsBookieID=true as the option value passed is 'hostname'");
return -1;
} else if (bkConf.getUseHostNameAsBookieID() && !useHostName) {
LOG.error("Expects configuration useHostNameAsBookieID=false as the option value passed is 'ip'");
return -1;
}
final int rate = getOptionIntValue(cmdLine, "updatespersec", 5);
if (rate <= 0) {
LOG.error("Invalid updatespersec {}, should be > 0", rate);
return -1;
}
final int limit = getOptionIntValue(cmdLine, "limit", Integer.MIN_VALUE);
if (limit <= 0 && limit != Integer.MIN_VALUE) {
LOG.error("Invalid limit {}, should be > 0", limit);
return -1;
}
final boolean verbose = getOptionBooleanValue(cmdLine, "verbose", false);
final long printprogress;
if (!verbose) {
if (cmdLine.hasOption("printprogress")) {
LOG.warn("Ignoring option 'printprogress', this is applicable when 'verbose' is true");
}
printprogress = Integer.MIN_VALUE;
} else {
// defaulting to 10 seconds
printprogress = getOptionLongValue(cmdLine, "printprogress", 10);
}
final ClientConfiguration conf = new ClientConfiguration();
conf.addConfiguration(bkConf);
final BookKeeper bk = new BookKeeper(conf);
final BookKeeperAdmin admin = new BookKeeperAdmin(conf);
final UpdateLedgerOp updateLedgerOp = new UpdateLedgerOp(bk, admin);
final ServerConfiguration serverConf = new ServerConfiguration(bkConf);
final BookieSocketAddress newBookieId = Bookie.getBookieAddress(serverConf);
serverConf.setUseHostNameAsBookieID(!useHostName);
final BookieSocketAddress oldBookieId = Bookie.getBookieAddress(serverConf);
UpdateLedgerNotifier progressable = new UpdateLedgerNotifier() {
long lastReport = System.nanoTime();
@Override
public void progress(long updated, long issued) {
if (printprogress <= 0) {
return; // disabled
}
if (TimeUnit.MILLISECONDS.toSeconds(MathUtils.elapsedMSec(lastReport)) >= printprogress) {
LOG.info("Number of ledgers issued={}, updated={}", issued, updated);
lastReport = MathUtils.nowInNano();
}
}
};
try {
updateLedgerOp.updateBookieIdInLedgers(oldBookieId, newBookieId, rate, limit, progressable);
} catch (BKException | IOException e) {
LOG.error("Failed to update ledger metadata", e);
return -1;
}
return 0;
}
}
/**
* A facility for reporting update ledger progress.
*/
public interface UpdateLedgerNotifier {
void progress(long updated, long issued);
}
final Map<String, MyCommand> commands = new HashMap<String, MyCommand>();
{
commands.put(CMD_METAFORMAT, new MetaFormatCmd());
commands.put(CMD_BOOKIEFORMAT, new BookieFormatCmd());
commands.put(CMD_RECOVER, new RecoverCmd());
commands.put(CMD_LEDGER, new LedgerCmd());
commands.put(CMD_READ_LEDGER_ENTRIES, new ReadLedgerEntriesCmd());
commands.put(CMD_LISTLEDGERS, new ListLedgersCmd());
commands.put(CMD_LISTUNDERREPLICATED, new ListUnderreplicatedCmd());
commands.put(CMD_WHOISAUDITOR, new WhoIsAuditorCmd());
commands.put(CMD_LEDGERMETADATA, new LedgerMetadataCmd());
commands.put(CMD_SIMPLETEST, new SimpleTestCmd());
commands.put(CMD_BOOKIESANITYTEST, new BookieSanityTestCmd());
commands.put(CMD_READLOG, new ReadLogCmd());
commands.put(CMD_READJOURNAL, new ReadJournalCmd());
commands.put(CMD_LASTMARK, new LastMarkCmd());
commands.put(CMD_AUTORECOVERY, new AutoRecoveryCmd());
commands.put(CMD_LISTBOOKIES, new ListBookiesCmd());
commands.put(CMD_UPDATECOOKIE, new UpdateCookieCmd());
commands.put(CMD_UPDATELEDGER, new UpdateLedgerCmd());
commands.put(CMD_HELP, new HelpCmd());
}
@Override
public void setConf(Configuration conf) throws Exception {
bkConf.loadConf(conf);
journalDirectory = Bookie.getCurrentDirectory(bkConf.getJournalDir());
ledgerDirectories = Bookie.getCurrentDirectories(bkConf.getLedgerDirs());
if (null == bkConf.getIndexDirs()) {
indexDirectories = ledgerDirectories;
} else {
indexDirectories = Bookie.getCurrentDirectories(bkConf.getIndexDirs());
}
formatter = EntryFormatter.newEntryFormatter(bkConf, ENTRY_FORMATTER_CLASS);
LOG.debug("Using entry formatter {}", formatter.getClass().getName());
pageSize = bkConf.getPageSize();
entriesPerPage = pageSize / 8;
}
private void printShellUsage() {
System.err.println("Usage: BookieShell [-conf configuration] <command>");
System.err.println();
List<String> commandNames = new ArrayList<String>();
for (MyCommand c : commands.values()) {
commandNames.add(" " + c.getUsage());
}
Collections.sort(commandNames);
for (String s : commandNames) {
System.err.println(s);
}
}
@Override
public int run(String[] args) throws Exception {
if (args.length <= 0) {
printShellUsage();
return -1;
}
String cmdName = args[0];
Command cmd = commands.get(cmdName);
if (null == cmd) {
System.err.println("ERROR: Unknown command " + cmdName);
printShellUsage();
return -1;
}
// prepare new args
String[] newArgs = new String[args.length - 1];
System.arraycopy(args, 1, newArgs, 0, newArgs.length);
return cmd.runCmd(newArgs);
}
public static void main(String argv[]) throws Exception {
BookieShell shell = new BookieShell();
if (argv.length <= 0) {
shell.printShellUsage();
System.exit(-1);
}
CompositeConfiguration conf = new CompositeConfiguration();
// load configuration
if ("-conf".equals(argv[0])) {
if (argv.length <= 1) {
shell.printShellUsage();
System.exit(-1);
}
conf.addConfiguration(new PropertiesConfiguration(
new File(argv[1]).toURI().toURL()));
String[] newArgv = new String[argv.length - 2];
System.arraycopy(argv, 2, newArgv, 0, newArgv.length);
argv = newArgv;
}
shell.setConf(conf);
int res = shell.run(argv);
System.exit(res);
}
///
/// Bookie File Operations
///
/**
* Get the ledger file of a specified ledger.
*
* @param ledgerId
* Ledger Id
*
* @return file object.
*/
private File getLedgerFile(long ledgerId) {
String ledgerName = IndexPersistenceMgr.getLedgerName(ledgerId);
File lf = null;
for (File d : indexDirectories) {
lf = new File(d, ledgerName);
if (lf.exists()) {
break;
}
lf = null;
}
return lf;
}
/**
* Get FileInfo for a specified ledger.
*
* @param ledgerId
* Ledger Id
* @return read only file info instance
*/
ReadOnlyFileInfo getFileInfo(long ledgerId) throws IOException {
File ledgerFile = getLedgerFile(ledgerId);
if (null == ledgerFile) {
throw new FileNotFoundException("No index file found for ledger " + ledgerId + ". It may be not flushed yet.");
}
ReadOnlyFileInfo fi = new ReadOnlyFileInfo(ledgerFile, null);
fi.readHeader();
return fi;
}
private synchronized void initEntryLogger() throws IOException {
if (null == entryLogger) {
// provide read only entry logger
entryLogger = new ReadOnlyEntryLogger(bkConf);
}
}
/**
* scan over entry log
*
* @param logId
* Entry Log Id
* @param scanner
* Entry Log Scanner
*/
protected void scanEntryLog(long logId, EntryLogScanner scanner) throws IOException {
initEntryLogger();
entryLogger.scanEntryLog(logId, scanner);
}
private synchronized Journal getJournal() throws IOException {
if (null == journal) {
journal = new Journal(bkConf, new LedgerDirsManager(bkConf, bkConf.getLedgerDirs()));
}
return journal;
}
/**
* Scan journal file
*
* @param journalId
* Journal File Id
* @param scanner
* Journal File Scanner
*/
protected void scanJournal(long journalId, JournalScanner scanner) throws IOException {
getJournal().scanJournal(journalId, 0L, scanner);
}
///
/// Bookie Shell Commands
///
/**
* Read ledger meta
*
* @param ledgerId
* Ledger Id
*/
protected void readLedgerMeta(long ledgerId) throws Exception {
System.out.println("===== LEDGER: " + ledgerId + " =====");
FileInfo fi = getFileInfo(ledgerId);
byte[] masterKey = fi.getMasterKey();
if (null == masterKey) {
System.out.println("master key : NULL");
} else {
System.out.println("master key : " + bytes2Hex(fi.getMasterKey()));
}
long size = fi.size();
if (size % 8 == 0) {
System.out.println("size : " + size);
} else {
System.out.println("size : " + size + " (not aligned with 8, may be corrupted or under flushing now)");
}
System.out.println("entries : " + (size / 8));
System.out.println("isFenced : " + fi.isFenced());
}
/**
* Read ledger index entires
*
* @param ledgerId
* Ledger Id
* @throws IOException
*/
protected void readLedgerIndexEntries(long ledgerId) throws IOException {
System.out.println("===== LEDGER: " + ledgerId + " =====");
FileInfo fi = getFileInfo(ledgerId);
long size = fi.size();
System.out.println("size : " + size);
long curSize = 0;
long curEntry = 0;
LedgerEntryPage lep = new LedgerEntryPage(pageSize, entriesPerPage);
lep.usePage();
try {
while (curSize < size) {
lep.setLedgerAndFirstEntry(ledgerId, curEntry);
lep.readPage(fi);
// process a page
for (int i=0; i<entriesPerPage; i++) {
long offset = lep.getOffset(i * 8);
if (0 == offset) {
System.out.println("entry " + curEntry + "\t:\tN/A");
} else {
long entryLogId = offset >> 32L;
long pos = offset & 0xffffffffL;
System.out.println("entry " + curEntry + "\t:\t(log:" + entryLogId + ", pos: " + pos + ")");
}
++curEntry;
}
curSize += pageSize;
}
} catch (IOException ie) {
LOG.error("Failed to read index page : ", ie);
if (curSize + pageSize < size) {
System.out.println("Failed to read index page @ " + curSize + ", the index file may be corrupted : " + ie.getMessage());
} else {
System.out.println("Failed to read last index page @ " + curSize
+ ", the index file may be corrupted or last index page is not fully flushed yet : " + ie.getMessage());
}
}
}
/**
* Scan over an entry log file.
*
* @param logId
* Entry Log File id.
* @param printMsg
* Whether printing the entry data.
*/
protected void scanEntryLog(long logId, final boolean printMsg) throws Exception {
System.out.println("Scan entry log " + logId + " (" + Long.toHexString(logId) + ".log)");
scanEntryLog(logId, new EntryLogScanner() {
@Override
public boolean accept(long ledgerId) {
return true;
}
@Override
public void process(long ledgerId, long startPos, ByteBuffer entry) {
formatEntry(startPos, entry, printMsg);
}
});
}
/**
* Scan a journal file
*
* @param journalId
* Journal File Id
* @param printMsg
* Whether printing the entry data.
*/
protected void scanJournal(long journalId, final boolean printMsg) throws Exception {
System.out.println("Scan journal " + journalId + " (" + Long.toHexString(journalId) + ".txn)");
scanJournal(journalId, new JournalScanner() {
boolean printJournalVersion = false;
@Override
public void process(int journalVersion, long offset, ByteBuffer entry) throws IOException {
if (!printJournalVersion) {
System.out.println("Journal Version : " + journalVersion);
printJournalVersion = true;
}
formatEntry(offset, entry, printMsg);
}
});
}
/**
* Print last log mark
*/
protected void printLastLogMark() throws IOException {
LogMark lastLogMark = getJournal().getLastLogMark().getCurMark();
System.out.println("LastLogMark: Journal Id - " + lastLogMark.getLogFileId() + "("
+ Long.toHexString(lastLogMark.getLogFileId()) + ".txn), Pos - "
+ lastLogMark.getLogFileOffset());
}
/**
* Format the message into a readable format.
*
* @param pos
* File offset of the message stored in entry log file
* @param recBuff
* Entry Data
* @param printMsg
* Whether printing the message body
*/
private void formatEntry(long pos, ByteBuffer recBuff, boolean printMsg) {
long ledgerId = recBuff.getLong();
long entryId = recBuff.getLong();
int entrySize = recBuff.limit();
System.out.println("--------- Lid=" + ledgerId + ", Eid=" + entryId
+ ", ByteOffset=" + pos + ", EntrySize=" + entrySize + " ---------");
if (entryId == Bookie.METAENTRY_ID_LEDGER_KEY) {
int masterKeyLen = recBuff.getInt();
byte[] masterKey = new byte[masterKeyLen];
recBuff.get(masterKey);
System.out.println("Type: META");
System.out.println("MasterKey: " + bytes2Hex(masterKey));
System.out.println();
return;
}
if (entryId == Bookie.METAENTRY_ID_FENCE_KEY) {
System.out.println("Type: META");
System.out.println("Fenced");
System.out.println();
return;
}
// process a data entry
long lastAddConfirmed = recBuff.getLong();
System.out.println("Type: DATA");
System.out.println("LastConfirmed: " + lastAddConfirmed);
if (!printMsg) {
System.out.println();
return;
}
// skip digest checking
recBuff.position(32 + 8);
System.out.println("Data:");
System.out.println();
try {
byte[] ret = new byte[recBuff.remaining()];
recBuff.get(ret);
formatter.formatEntry(ret);
} catch (Exception e) {
System.out.println("N/A. Corrupted.");
}
System.out.println();
}
static String bytes2Hex(byte[] data) {
StringBuilder sb = new StringBuilder(data.length * 2);
Formatter formatter = new Formatter(sb);
for (byte b : data) {
formatter.format("%02x", b);
}
formatter.close();
return sb.toString();
}
private static int getOptionIntValue(CommandLine cmdLine, String option, int defaultVal) {
if (cmdLine.hasOption(option)) {
String val = cmdLine.getOptionValue(option);
try {
return Integer.parseInt(val);
} catch (NumberFormatException nfe) {
System.err.println("ERROR: invalid value for option " + option + " : " + val);
return defaultVal;
}
}
return defaultVal;
}
private static long getOptionLongValue(CommandLine cmdLine, String option, long defaultVal) {
if (cmdLine.hasOption(option)) {
String val = cmdLine.getOptionValue(option);
try {
return Long.parseLong(val);
} catch (NumberFormatException nfe) {
System.err.println("ERROR: invalid value for option " + option + " : " + val);
return defaultVal;
}
}
return defaultVal;
}
private static boolean getOptionBooleanValue(CommandLine cmdLine, String option, boolean defaultVal) {
if (cmdLine.hasOption(option)) {
String val = cmdLine.getOptionValue(option);
return Boolean.parseBoolean(val);
}
return defaultVal;
}
private static boolean getOptionalValue(String optValue, String optName) {
if (StringUtils.equals(optValue, optName)) {
return true;
}
return false;
}
}