blob: 3224bce6c37345e5b48907d20a96424eadafa006 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.tools.cli.commands.bookie;
import static org.apache.bookkeeper.meta.MetadataDrivers.runFunctionWithLedgerManagerFactory;
import com.beust.jcommander.Parameter;
import com.google.common.util.concurrent.UncheckedExecutionException;
import java.net.UnknownHostException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Setter;
import lombok.experimental.Accessors;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeperAdmin;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.exceptions.MetadataException;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bookkeeper.tools.cli.commands.bookie.ListLedgersCommand.ListLedgersFlags;
import org.apache.bookkeeper.tools.cli.helpers.BookieCommand;
import org.apache.bookkeeper.tools.framework.CliFlags;
import org.apache.bookkeeper.tools.framework.CliSpec;
import org.apache.bookkeeper.util.LedgerIdFormatter;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Command for list all ledgers on the cluster.
*/
public class ListLedgersCommand extends BookieCommand<ListLedgersFlags> {
private static final Logger LOG = LoggerFactory.getLogger(ListLedgersCommand.class);
private static final String NAME = "listledgers";
private static final String DESC = "List all ledgers on the cluster (this may take a long time).";
private static final String DEFAULT = "";
private LedgerIdFormatter ledgerIdFormatter;
public ListLedgersCommand() {
this(new ListLedgersFlags());
}
public ListLedgersCommand(LedgerIdFormatter ledgerIdFormatter) {
this(new ListLedgersFlags());
this.ledgerIdFormatter = ledgerIdFormatter;
}
public ListLedgersCommand(ListLedgersFlags flags) {
super(CliSpec.<ListLedgersFlags>newBuilder()
.withName(NAME)
.withDescription(DESC)
.withFlags(flags)
.build());
}
/**
* Flags for ListLedgers command.
*/
@Accessors(fluent = true)
@Setter
public static class ListLedgersFlags extends CliFlags{
@Parameter(names = {"-m", "--meta"}, description = "Print metadata")
private boolean meta;
@Parameter(names = { "-id", "--bookieid" }, description = "List ledgers residing in this bookie")
private String bookieId;
@Parameter(names = { "-l", "--ledgerIdFormatter" }, description = "Set ledger id formatter")
private String ledgerIdFormatter = DEFAULT;
}
@Override
public boolean apply(ServerConfiguration conf, ListLedgersFlags cmdFlags) {
initLedgerFrommat(conf, cmdFlags);
try {
handler(conf, cmdFlags);
} catch (UnknownHostException e) {
LOG.error("Bookie id error");
return false;
} catch (MetadataException | ExecutionException e) {
throw new UncheckedExecutionException(e.getMessage(), e);
}
return true;
}
private void initLedgerFrommat(ServerConfiguration conf, ListLedgersFlags cmdFlags) {
if (ledgerIdFormatter != null) {
return;
}
if (!cmdFlags.ledgerIdFormatter.equals(DEFAULT)) {
this.ledgerIdFormatter = LedgerIdFormatter.newLedgerIdFormatter(cmdFlags.ledgerIdFormatter, conf);
} else {
this.ledgerIdFormatter = LedgerIdFormatter.newLedgerIdFormatter(conf);
}
}
public boolean handler(ServerConfiguration conf, ListLedgersFlags flags)
throws UnknownHostException, MetadataException, ExecutionException {
final BookieId bookieAddress = StringUtils.isBlank(flags.bookieId) ? null :
BookieId.parse(flags.bookieId);
runFunctionWithLedgerManagerFactory(conf, mFactory -> {
try (LedgerManager ledgerManager = mFactory.newLedgerManager()) {
final AtomicInteger returnCode = new AtomicInteger(BKException.Code.OK);
final CountDownLatch processDone = new CountDownLatch(1);
BookkeeperInternalCallbacks.Processor<Long> ledgerProcessor = (ledgerId, cb) -> {
if (!flags.meta && (bookieAddress == null)) {
printLedgerMetadata(ledgerId, null, false);
cb.processResult(BKException.Code.OK, null, null);
} else {
ledgerManager.readLedgerMetadata(ledgerId).whenComplete((metadata, exception) -> {
if (exception == null) {
if ((bookieAddress == null)
|| BookKeeperAdmin.areEntriesOfLedgerStoredInTheBookie
(ledgerId, bookieAddress, metadata.getValue())) {
/*
* the print method has to be in
* synchronized scope, otherwise
* output of printLedgerMetadata
* could interleave since this
* callback for different
* ledgers can happen in
* different threads.
*/
synchronized (ListLedgersCommand.this) {
printLedgerMetadata(ledgerId, metadata.getValue(), flags.meta);
}
}
cb.processResult(BKException.Code.OK, null, null);
} else if (BKException.getExceptionCode(exception)
== BKException.Code.NoSuchLedgerExistsException) {
cb.processResult(BKException.Code.OK, null, null);
} else {
LOG.error("Unable to read the ledger: {} information", ledgerId);
cb.processResult(BKException.getExceptionCode(exception), null, null);
}
});
}
};
ledgerManager.asyncProcessLedgers(ledgerProcessor, (rc, s, obj) -> {
returnCode.set(rc);
processDone.countDown();
}, null, BKException.Code.OK, BKException.Code.ReadException);
processDone.await();
if (returnCode.get() != BKException.Code.OK) {
LOG.error("Received error return value while processing ledgers: {}", returnCode.get());
throw BKException.create(returnCode.get());
}
} catch (Exception ioe) {
LOG.error("Received Exception while processing ledgers", ioe);
throw new UncheckedExecutionException(ioe);
}
return null;
});
return true;
}
private void printLedgerMetadata(long ledgerId, LedgerMetadata md, boolean printMeta) {
LOG.info("ledgerID: " + ledgerIdFormatter.formatLedgerId(ledgerId));
if (printMeta) {
LOG.info(md.toString());
}
}
}