blob: bd19a6b5fabe63beabdc78a3aeccefefde4c22bd [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.bookkeeper.client;
import com.google.common.annotations.VisibleForTesting;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryListener;
import org.apache.bookkeeper.proto.checksum.DigestManager.RecoveryData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class encapsulated the ledger recovery operation. It first does a read
* with entry-id of -1 (BookieProtocol.LAST_ADD_CONFIRMED) to all bookies. Then
* starting from the last confirmed entry (from hints in the ledger entries),
* it reads forward until it is not able to find a particular entry.
*/
class LedgerRecoveryOp implements ReadEntryListener, AddCallback {
static final Logger LOG = LoggerFactory.getLogger(LedgerRecoveryOp.class);
final LedgerHandle lh;
final ClientContext clientCtx;
final CompletableFuture<LedgerHandle> promise;
final AtomicLong readCount, writeCount;
volatile boolean readDone;
volatile long startEntryToRead;
volatile long endEntryToRead;
// keep a copy of metadata for recovery.
LedgerMetadata metadataForRecovery;
// EntryListener Hook
@VisibleForTesting
ReadEntryListener entryListener = null;
class RecoveryReadOp extends ListenerBasedPendingReadOp {
RecoveryReadOp(LedgerHandle lh,
ClientContext clientCtx,
long startEntryId, long endEntryId,
ReadEntryListener cb, Object ctx) {
super(lh, clientCtx, startEntryId, endEntryId, cb, ctx, true);
}
@Override
protected LedgerMetadata getLedgerMetadata() {
return metadataForRecovery;
}
}
public LedgerRecoveryOp(LedgerHandle lh, ClientContext clientCtx) {
readCount = new AtomicLong(0);
writeCount = new AtomicLong(0);
readDone = false;
this.promise = new CompletableFuture<>();
this.lh = lh;
this.clientCtx = clientCtx;
}
/**
* Set an entry listener to listen on individual recovery reads during recovery procedure.
*
* @param entryListener
* entry listener
* @return ledger recovery operation
*/
@VisibleForTesting
LedgerRecoveryOp setEntryListener(ReadEntryListener entryListener) {
this.entryListener = entryListener;
return this;
}
public CompletableFuture<LedgerHandle> initiate() {
ReadLastConfirmedOp rlcop = new ReadLastConfirmedOp(clientCtx.getBookieClient(),
lh.distributionSchedule,
lh.macManager,
lh.ledgerId,
lh.getCurrentEnsemble(),
lh.ledgerKey,
new ReadLastConfirmedOp.LastConfirmedDataCallback() {
public void readLastConfirmedDataComplete(int rc, RecoveryData data) {
if (rc == BKException.Code.OK) {
synchronized (lh) {
lh.lastAddPushed = lh.lastAddConfirmed = data.getLastAddConfirmed();
lh.length = data.getLength();
lh.pendingAddsSequenceHead = lh.lastAddConfirmed;
startEntryToRead = endEntryToRead = lh.lastAddConfirmed;
}
// keep a copy of ledger metadata before proceeding
// ledger recovery
metadataForRecovery = lh.getLedgerMetadata();
doRecoveryRead();
} else if (rc == BKException.Code.UnauthorizedAccessException) {
submitCallback(rc);
} else {
submitCallback(BKException.Code.ReadException);
}
}
});
/**
* Enable fencing on this op. When the read request reaches the bookies
* server it will fence off the ledger, stopping any subsequent operation
* from writing to it.
*/
rlcop.initiateWithFencing();
return promise;
}
private void submitCallback(int rc) {
if (BKException.Code.OK == rc) {
clientCtx.getClientStats().getRecoverAddCountLogger().registerSuccessfulValue(writeCount.get());
clientCtx.getClientStats().getRecoverReadCountLogger().registerSuccessfulValue(readCount.get());
promise.complete(lh);
} else {
clientCtx.getClientStats().getRecoverAddCountLogger().registerFailedValue(writeCount.get());
clientCtx.getClientStats().getRecoverReadCountLogger().registerFailedValue(readCount.get());
promise.completeExceptionally(BKException.create(rc));
}
}
/**
* Try to read past the last confirmed.
*/
private void doRecoveryRead() {
if (!promise.isDone()) {
startEntryToRead = endEntryToRead + 1;
endEntryToRead = endEntryToRead + clientCtx.getConf().recoveryReadBatchSize;
new RecoveryReadOp(lh, clientCtx, startEntryToRead, endEntryToRead, this, null)
.initiate();
}
}
@Override
public void onEntryComplete(int rc, LedgerHandle lh, LedgerEntry entry, Object ctx) {
// notify entry listener on individual entries being read during ledger recovery.
ReadEntryListener listener = entryListener;
if (null != listener) {
listener.onEntryComplete(rc, lh, entry, ctx);
}
// we only trigger recovery add an entry when readDone == false && callbackDone == false
if (!promise.isDone() && !readDone && rc == BKException.Code.OK) {
readCount.incrementAndGet();
byte[] data = entry.getEntry();
/*
* We will add this entry again to make sure it is written to enough
* replicas. We subtract the length of the data itself, since it will
* be added again when processing the call to add it.
*/
synchronized (lh) {
lh.length = entry.getLength() - (long) data.length;
// check whether entry id is expected, so we won't overwritten any entries by mistake
if (entry.getEntryId() != lh.lastAddPushed + 1) {
LOG.error("Unexpected to recovery add entry {} as entry {} for ledger {}.",
entry.getEntryId(), (lh.lastAddPushed + 1), lh.getId());
rc = BKException.Code.UnexpectedConditionException;
}
}
if (BKException.Code.OK == rc) {
lh.asyncRecoveryAddEntry(data, 0, data.length, this, null);
if (entry.getEntryId() == endEntryToRead) {
// trigger next batch read
doRecoveryRead();
}
return;
}
}
// no entry found. stop recovery procedure but wait until recovery add finished.
if (rc == BKException.Code.NoSuchEntryException || rc == BKException.Code.NoSuchLedgerExistsException) {
readDone = true;
if (readCount.get() == writeCount.get()) {
submitCallback(BKException.Code.OK);
}
return;
}
// otherwise, some other error, we can't handle
if (BKException.Code.OK != rc && !promise.isDone()) {
LOG.error("Failure {} while reading entries: ({} - {}), ledger: {} while recovering ledger",
BKException.getMessage(rc), startEntryToRead, endEntryToRead, lh.getId());
submitCallback(rc);
} else if (BKException.Code.OK == rc) {
// we are here is because we successfully read an entry but readDone was already set to true.
// this would happen on recovery a ledger than has gaps in the tail.
LOG.warn("Successfully read entry {} for ledger {}, but readDone is already {}",
entry.getEntryId(), lh.getId(), readDone);
}
return;
}
@Override
public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
if (rc != BKException.Code.OK) {
LOG.error("Failure {} while writing entry: {} while recovering ledger: {}",
BKException.codeLogger(rc), entryId + 1, lh.ledgerId);
submitCallback(rc);
return;
}
long numAdd = writeCount.incrementAndGet();
if (readDone && readCount.get() == numAdd) {
submitCallback(rc);
}
}
}