blob: b2dff8f923ff4b8370c813dd59abba5e595f902a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.tools.cli.commands.bookie;
import com.beust.jcommander.Parameter;
import com.google.common.util.concurrent.UncheckedExecutionException;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.LongStream;
import lombok.Setter;
import lombok.experimental.Accessors;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeperAdmin;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.proto.BookieClient;
import org.apache.bookkeeper.proto.BookieClientImpl;
import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.tools.cli.helpers.BookieCommand;
import org.apache.bookkeeper.tools.framework.CliFlags;
import org.apache.bookkeeper.tools.framework.CliSpec;
import org.apache.bookkeeper.util.EntryFormatter;
import org.apache.bookkeeper.util.LedgerIdFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Command to read ledger entries.
 *
 * <p>Entries are read either through a {@link BookKeeperAdmin} client (the default) or, when a
 * bookie address is given with {@code -b}, directly from that single bookie through a
 * {@link BookieClient}. In both modes the first and last entry ids are treated as an inclusive range.
 */
public class ReadLedgerCommand extends BookieCommand<ReadLedgerCommand.ReadLedgerFlags> {

    private static final Logger LOG = LoggerFactory.getLogger(ReadLedgerCommand.class);

    private static final String NAME = "readledger";
    private static final String DESC = "Read a range of entries from a ledger.";

    // Formatters may be injected through the two-argument constructor; otherwise they are
    // created lazily in apply() from the command flags / server configuration.
    EntryFormatter entryFormatter;
    LedgerIdFormatter ledgerIdFormatter;

    /**
     * Creates the command with default flags; formatters are resolved when the command is applied.
     */
    public ReadLedgerCommand() {
        this(new ReadLedgerFlags());
    }

    /**
     * Creates the command with pre-built formatters.
     *
     * @param entryFormatter
     *          formatter used to print entry bodies
     * @param ledgerIdFormatter
     *          formatter used to print ledger ids
     */
    public ReadLedgerCommand(EntryFormatter entryFormatter, LedgerIdFormatter ledgerIdFormatter) {
        this(new ReadLedgerFlags());
        this.ledgerIdFormatter = ledgerIdFormatter;
        this.entryFormatter = entryFormatter;
    }

    private ReadLedgerCommand(ReadLedgerFlags flags) {
        super(CliSpec.<ReadLedgerFlags>newBuilder()
            .withName(NAME)
            .withDescription(DESC)
            .withFlags(flags)
            .build());
    }

    /**
     * Flags for read ledger command.
     */
    @Accessors(fluent = true)
    @Setter
    public static class ReadLedgerFlags extends CliFlags {

        @Parameter(names = { "-m", "--msg" }, description = "Print message body")
        private boolean msg;

        @Parameter(names = { "-l", "--ledgerid" }, description = "Ledger ID")
        private long ledgerId = -1;

        @Parameter(names = { "-fe", "--firstentryid" }, description = "First Entry ID")
        private long firstEntryId = -1;

        @Parameter(names = { "-le", "--lastentryid" }, description = "Last Entry ID")
        private long lastEntryId = -1;

        @Parameter(names = { "-r", "--force-recovery" },
            description = "Ensure the ledger is properly closed before reading")
        private boolean forceRecovery;

        // NOTE: the field name (with three 's') is part of the generated fluent setter API,
        // so it is intentionally left unchanged.
        @Parameter(names = { "-b", "--bookie" }, description = "Only read from a specific bookie")
        private String bookieAddresss;

        @Parameter(names = { "-lf", "--ledgeridformatter" }, description = "Set ledger id formatter")
        private String ledgerIdFormatter;

        @Parameter(names = { "-ef", "--entryformatter" }, description = "Set entry formatter")
        private String entryFormatter;
    }

    /**
     * Resolves the formatters (unless they were injected) and reads the requested entries.
     *
     * @param conf
     *          server configuration used to build the client configuration and default formatters
     * @param cmdFlags
     *          parsed command flags
     * @return {@code true} when the command ran to completion
     * @throws UncheckedExecutionException
     *          wrapping any exception raised while reading the ledger
     */
    @Override
    public boolean apply(ServerConfiguration conf, ReadLedgerFlags cmdFlags) {
        // An injected formatter always wins; otherwise prefer the one named on the command line.
        if (ledgerIdFormatter == null) {
            this.ledgerIdFormatter = cmdFlags.ledgerIdFormatter != null
                ? LedgerIdFormatter.newLedgerIdFormatter(cmdFlags.ledgerIdFormatter, conf)
                : LedgerIdFormatter.newLedgerIdFormatter(conf);
        }
        if (entryFormatter == null) {
            this.entryFormatter = cmdFlags.entryFormatter != null
                ? EntryFormatter.newEntryFormatter(cmdFlags.entryFormatter, conf)
                : EntryFormatter.newEntryFormatter(conf);
        }

        try {
            return readledger(conf, cmdFlags);
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new UncheckedExecutionException(e.getMessage(), e);
        } catch (Exception e) {
            throw new UncheckedExecutionException(e.getMessage(), e);
        }
    }

    /**
     * Reads the entries selected by {@code flags} and logs them.
     *
     * @param serverConf
     *          server configuration copied into the client configuration
     * @param flags
     *          parsed command flags
     * @return always {@code true}
     */
    private boolean readledger(ServerConfiguration serverConf, ReadLedgerFlags flags)
            throws InterruptedException, BKException, IOException {
        long lastEntry = flags.lastEntryId;

        // null means "no particular bookie was specified".
        final BookieId bookie = flags.bookieAddresss != null ? BookieId.parse(flags.bookieAddresss) : null;

        ClientConfiguration conf = new ClientConfiguration();
        conf.addConfiguration(serverConf);

        try (BookKeeperAdmin bk = new BookKeeperAdmin(conf)) {
            if (flags.forceRecovery) {
                // Force the opening of the ledger to trigger recovery
                try (LedgerHandle lh = bk.openLedger(flags.ledgerId)) {
                    // Clamp the requested upper bound to what the ledger actually contains.
                    if (lastEntry == -1 || lastEntry > lh.getLastAddConfirmed()) {
                        lastEntry = lh.getLastAddConfirmed();
                    }
                }
            }

            if (bookie == null) {
                // No bookie was specified, use normal bk client
                Iterator<LedgerEntry> entries = bk.readEntries(flags.ledgerId, flags.firstEntryId, lastEntry)
                    .iterator();
                while (entries.hasNext()) {
                    formatEntry(entries.next(), flags.msg);
                }
            } else {
                readEntriesFromBookie(bk, conf, bookie, flags, lastEntry);
            }
        }
        return true;
    }

    /**
     * Reads entries {@code flags.firstEntryId..lastEntry} (inclusive) directly from one bookie.
     *
     * <p>All client-side resources created here (event loop, executors, bookie client) are released
     * even when client construction or a read fails.
     */
    private void readEntriesFromBookie(BookKeeperAdmin bk, ClientConfiguration conf, BookieId bookie,
                                       ReadLedgerFlags flags, long lastEntry) throws IOException {
        // Use BookieClient to target a specific bookie
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        OrderedExecutor executor = OrderedExecutor.newBuilder()
            .numThreads(1)
            .name("BookieClientScheduler")
            .build();
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(
            new DefaultThreadFactory("BookKeeperClientSchedulerPool"));
        try {
            BookieClient bookieClient = new BookieClientImpl(conf, eventLoopGroup, UnpooledByteBufAllocator.DEFAULT,
                executor, scheduler, NullStatsLogger.INSTANCE,
                bk.getBookieAddressResolver());
            try {
                if (lastEntry < 0) {
                    // Without a known upper bound there is no range to iterate; a negative id must
                    // not be sent to the bookie as if it were a regular entry id.
                    LOG.warn("No last entry id known for ledger {}; nothing to read from bookie {}",
                        flags.ledgerId, bookie);
                } else {
                    // rangeClosed: the last entry id is inclusive, consistent with the range
                    // handed to readEntries in the non-bookie path.
                    LongStream.rangeClosed(flags.firstEntryId, lastEntry)
                        .forEach(entryId -> readEntryFromBookie(bookieClient, bookie, flags, entryId));
                }
            } finally {
                // Close the client before tearing down the threads it runs on.
                bookieClient.close();
            }
        } finally {
            scheduler.shutdown();
            executor.shutdown();
            eventLoopGroup.shutdownGracefully();
        }
    }

    /**
     * Reads a single entry from {@code bookie}, logs it and waits for the read to finish.
     *
     * <p>Failures are logged and do not abort the overall command, so that the remaining entries
     * are still attempted. Once the calling thread has been interrupted, further reads are skipped.
     */
    private void readEntryFromBookie(BookieClient bookieClient, BookieId bookie, ReadLedgerFlags flags,
                                     long entryId) {
        if (Thread.currentThread().isInterrupted()) {
            // A previous wait was interrupted; do not issue any more reads.
            return;
        }

        CompletableFuture<Void> future = new CompletableFuture<>();
        bookieClient.readEntry(bookie, flags.ledgerId, entryId,
            (rc, ledgerId1, entryId1, buffer, ctx) -> {
                if (rc != BKException.Code.OK) {
                    LOG.error("Failed to read entry {} -- {}", entryId1,
                        BKException.getMessage(rc));
                    future.completeExceptionally(BKException.create(rc));
                    return;
                }
                LOG.info("--------- Lid={}, Eid={} ---------",
                    ledgerIdFormatter.formatLedgerId(flags.ledgerId), entryId);
                if (flags.msg) {
                    LOG.info("Data: {}", ByteBufUtil.prettyHexDump(buffer));
                }
                future.complete(null);
            }, null, BookieProtocol.FLAG_NONE);

        try {
            // Reads are issued one at a time so the output stays in entry order.
            future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.error("Error future.get while reading entries from ledger {}", flags.ledgerId, e);
        } catch (Exception e) {
            LOG.error("Error future.get while reading entries from ledger {}", flags.ledgerId, e);
        }
    }

    /**
     * Format the entry into a readable format.
     *
     * @param entry
     *          ledgerentry to print
     * @param printMsg
     *          Whether printing the message body
     */
    private void formatEntry(LedgerEntry entry, boolean printMsg) {
        long ledgerId = entry.getLedgerId();
        long entryId = entry.getEntryId();
        long entrySize = entry.getLength();
        LOG.info("--------- Lid={}, Eid={}, EntrySize={} ---------",
            ledgerIdFormatter.formatLedgerId(ledgerId), entryId, entrySize);
        if (printMsg) {
            entryFormatter.formatEntry(entry.getEntry());
        }
    }
}