blob: b2c63f4991d68b31a8d6d8d33b41c1bc11035396 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.tools.cli.commands.bookie;
import com.beust.jcommander.Parameter;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.UncheckedExecutionException;
import io.netty.buffer.Unpooled;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import lombok.Setter;
import lombok.experimental.Accessors;
import org.apache.bookkeeper.bookie.Journal;
import org.apache.bookkeeper.bookie.LedgerDirsManager;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.tools.cli.helpers.BookieCommand;
import org.apache.bookkeeper.tools.framework.CliFlags;
import org.apache.bookkeeper.tools.framework.CliSpec;
import org.apache.bookkeeper.util.BookKeeperConstants;
import org.apache.bookkeeper.util.DiskChecker;
import org.apache.bookkeeper.util.EntryFormatter;
import org.apache.bookkeeper.util.LedgerIdFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Command to scan a journal file and format the entries into readable format.
*/
public class ReadJournalCommand extends BookieCommand<ReadJournalCommand.ReadJournalFlags> {
private static final String NAME = "readjournal";
private static final String DESC = "Scan a journal file and format the entries into readable format.";
private static final long DEFAULT_JOURNALID = -1L;
private static final String DEFAULT = "";
private LedgerIdFormatter ledgerIdFormatter;
private EntryFormatter entryFormatter;
private static final Logger LOG = LoggerFactory.getLogger(ReadJournalCommand.class);
List<Journal> journals = null;
public ReadJournalCommand() {
this(new ReadJournalFlags());
}
public ReadJournalCommand(LedgerIdFormatter idFormatter, EntryFormatter entryFormatter) {
this(new ReadJournalFlags());
this.ledgerIdFormatter = idFormatter;
this.entryFormatter = entryFormatter;
}
ReadJournalCommand(ReadJournalFlags flags) {
super(CliSpec.<ReadJournalFlags>newBuilder().withName(NAME).withDescription(DESC).withFlags(flags).build());
}
/**
* Flag for read journal command.
*/
@Accessors(fluent = true)
@Setter
public static class ReadJournalFlags extends CliFlags {
@Parameter(names = {"-m", "--msg"}, description = "Print message body")
private boolean msg;
@Parameter(names = { "-d", "--dir" }, description = "Journal directory (needed if more than one journal "
+ "configured)")
private String dir = DEFAULT;
@Parameter(names = {"-id", "--journalid"}, description = "Journal Id")
private long journalId = DEFAULT_JOURNALID;
@Parameter(names = {"-f", "--filename"}, description = "Journal file name")
private String fileName = DEFAULT;
@Parameter(names = {"-l", "--ledgerIdFormatter"}, description = "Set ledger id formatter")
private String ledgerIdFormatter = DEFAULT;
@Parameter(names = {"-e", "--entryformatter"}, description = "set entry formatter")
private String entryFormatter = DEFAULT;
}
@Override
public boolean apply(ServerConfiguration conf, ReadJournalFlags cmdFlags) {
initTools(conf, cmdFlags);
if (!checkArgs(cmdFlags)) {
return false;
}
try {
return handler(conf, cmdFlags);
} catch (IOException e) {
throw new UncheckedExecutionException(e.getMessage(), e);
}
}
private void initTools(ServerConfiguration conf, ReadJournalFlags flags) {
if (!flags.ledgerIdFormatter.equals(DEFAULT)) {
ledgerIdFormatter = LedgerIdFormatter.newLedgerIdFormatter(flags.ledgerIdFormatter, conf);
} else {
ledgerIdFormatter = LedgerIdFormatter.newLedgerIdFormatter(conf);
}
if (!flags.entryFormatter.equals(DEFAULT)) {
entryFormatter = EntryFormatter.newEntryFormatter(flags.entryFormatter, conf);
} else {
entryFormatter = EntryFormatter.newEntryFormatter(conf);
}
}
private boolean handler(ServerConfiguration conf, ReadJournalFlags cmd) throws IOException {
Journal journal = null;
if (getJournals(conf).size() > 1) {
if (cmd.dir.equals(DEFAULT)) {
LOG.error("ERROR: invalid or missing journal directory");
usage();
return false;
}
File journalDirectory = new File(cmd.dir);
for (Journal j : getJournals(conf)) {
if (j.getJournalDirectory().equals(journalDirectory)) {
journal = j;
break;
}
}
if (journal == null) {
LOG.error("ERROR: journal directory not found");
usage();
return false;
}
} else {
journal = getJournals(conf).get(0);
}
long journalId = cmd.journalId;
if (cmd.journalId == DEFAULT_JOURNALID && !cmd.fileName.equals(DEFAULT)) {
File f = new File(cmd.fileName);
String name = f.getName();
if (!name.endsWith(".txn")) {
LOG.error("ERROR: invalid journal file name " + cmd.fileName);
usage();
return false;
}
String idString = name.split("\\.")[0];
journalId = Long.parseLong(idString, 16);
}
scanJournal(journal, journalId, cmd.msg);
return true;
}
private boolean checkArgs(ReadJournalFlags flags) {
if ((flags.fileName.equals(DEFAULT) && flags.journalId == DEFAULT_JOURNALID)) {
LOG.info("ERROR: You should figure jounalId or journal filename");
return false;
}
return true;
}
private synchronized List<Journal> getJournals(ServerConfiguration conf) {
if (null == journals) {
journals = Lists.newArrayListWithCapacity(conf.getJournalDirs().length);
int idx = 0;
for (File journalDir : conf.getJournalDirs()) {
journals.add(new Journal(idx++, new File(journalDir, BookKeeperConstants.CURRENT_DIR), conf,
new LedgerDirsManager(conf, conf.getLedgerDirs(),
new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()))));
}
}
return journals;
}
/**
* Scan a journal file.
*
* @param journalId Journal File Id
* @param printMsg Whether printing the entry data.
*/
private void scanJournal(Journal journal, long journalId, final boolean printMsg) throws IOException {
LOG.info("Scan journal " + journalId + " (" + Long.toHexString(journalId) + ".txn)");
scanJournal(journal, journalId, new Journal.JournalScanner() {
boolean printJournalVersion = false;
@Override
public void process(int journalVersion, long offset, ByteBuffer entry) throws IOException {
if (!printJournalVersion) {
LOG.info("Journal Version : " + journalVersion);
printJournalVersion = true;
}
FormatUtil
.formatEntry(offset, Unpooled.wrappedBuffer(entry), printMsg, ledgerIdFormatter, entryFormatter);
}
});
}
private void scanJournal(Journal journal, long journalId, Journal.JournalScanner scanner) throws IOException {
journal.scanJournal(journalId, 0L, scanner);
}
}