blob: fe6370b9926ccd8dc9ccbea5e80f0f1ed448e01b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.client;
import io.netty.buffer.ByteBuf;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.bookkeeper.client.BKException.Code;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieClient;
import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A utility class to check the complete ledger and finds the UnderReplicated fragments if any.
*
* <p>NOTE: This class is tended to be used by this project only. External users should not rely on it directly.
*/
public class LedgerChecker {
private static final Logger LOG = LoggerFactory.getLogger(LedgerChecker.class);
public final BookieClient bookieClient;
static class InvalidFragmentException extends Exception {
private static final long serialVersionUID = 1467201276417062353L;
}
/**
* This will collect all the entry read call backs and finally it will give
* call back to previous call back API which is waiting for it once it meets
* the expected call backs from down.
*/
private static class ReadManyEntriesCallback implements ReadEntryCallback {
AtomicBoolean completed = new AtomicBoolean(false);
final AtomicLong numEntries;
final LedgerFragment fragment;
final GenericCallback<LedgerFragment> cb;
ReadManyEntriesCallback(long numEntries, LedgerFragment fragment,
GenericCallback<LedgerFragment> cb) {
this.numEntries = new AtomicLong(numEntries);
this.fragment = fragment;
this.cb = cb;
}
public void readEntryComplete(int rc, long ledgerId, long entryId,
ByteBuf buffer, Object ctx) {
if (rc == BKException.Code.OK) {
if (numEntries.decrementAndGet() == 0
&& !completed.getAndSet(true)) {
cb.operationComplete(rc, fragment);
}
} else if (!completed.getAndSet(true)) {
cb.operationComplete(rc, fragment);
}
}
}
/**
* This will collect the bad bookies inside a ledger fragment.
*/
private static class LedgerFragmentCallback implements GenericCallback<LedgerFragment> {
private final LedgerFragment fragment;
private final int bookieIndex;
// bookie index -> return code
private final Map<Integer, Integer> badBookies;
private final AtomicInteger numBookies;
private final GenericCallback<LedgerFragment> cb;
LedgerFragmentCallback(LedgerFragment lf,
int bookieIndex,
GenericCallback<LedgerFragment> cb,
Map<Integer, Integer> badBookies,
AtomicInteger numBookies) {
this.fragment = lf;
this.bookieIndex = bookieIndex;
this.cb = cb;
this.badBookies = badBookies;
this.numBookies = numBookies;
}
@Override
public void operationComplete(int rc, LedgerFragment lf) {
if (BKException.Code.OK != rc) {
synchronized (badBookies) {
badBookies.put(bookieIndex, rc);
}
}
if (numBookies.decrementAndGet() == 0) {
if (badBookies.isEmpty()) {
cb.operationComplete(BKException.Code.OK, fragment);
} else {
int rcToReturn = BKException.Code.NoBookieAvailableException;
for (Map.Entry<Integer, Integer> entry : badBookies.entrySet()) {
rcToReturn = entry.getValue();
if (entry.getValue() == BKException.Code.ClientClosedException) {
break;
}
}
cb.operationComplete(rcToReturn,
fragment.subset(badBookies.keySet()));
}
}
}
}
public LedgerChecker(BookKeeper bkc) {
bookieClient = bkc.getBookieClient();
}
/**
* Verify a ledger fragment to collect bad bookies.
*
* @param fragment
* fragment to verify
* @param cb
* callback
* @throws InvalidFragmentException
*/
private void verifyLedgerFragment(LedgerFragment fragment,
GenericCallback<LedgerFragment> cb,
Long percentageOfLedgerFragmentToBeVerified)
throws InvalidFragmentException, BKException {
Set<Integer> bookiesToCheck = fragment.getBookiesIndexes();
if (bookiesToCheck.isEmpty()) {
cb.operationComplete(BKException.Code.OK, fragment);
return;
}
AtomicInteger numBookies = new AtomicInteger(bookiesToCheck.size());
Map<Integer, Integer> badBookies = new HashMap<Integer, Integer>();
for (Integer bookieIndex : bookiesToCheck) {
LedgerFragmentCallback lfCb = new LedgerFragmentCallback(
fragment, bookieIndex, cb, badBookies, numBookies);
verifyLedgerFragment(fragment, bookieIndex, lfCb, percentageOfLedgerFragmentToBeVerified);
}
}
/**
* Verify a bookie inside a ledger fragment.
*
* @param fragment
* ledger fragment
* @param bookieIndex
* bookie index in the fragment
* @param cb
* callback
* @throws InvalidFragmentException
*/
private void verifyLedgerFragment(LedgerFragment fragment,
int bookieIndex,
GenericCallback<LedgerFragment> cb,
long percentageOfLedgerFragmentToBeVerified)
throws InvalidFragmentException {
long firstStored = fragment.getFirstStoredEntryId(bookieIndex);
long lastStored = fragment.getLastStoredEntryId(bookieIndex);
BookieSocketAddress bookie = fragment.getAddress(bookieIndex);
if (null == bookie) {
throw new InvalidFragmentException();
}
if (firstStored == LedgerHandle.INVALID_ENTRY_ID) {
// this fragment is not on this bookie
if (lastStored != LedgerHandle.INVALID_ENTRY_ID) {
throw new InvalidFragmentException();
}
cb.operationComplete(BKException.Code.OK, fragment);
} else if (firstStored == lastStored) {
ReadManyEntriesCallback manycb = new ReadManyEntriesCallback(1,
fragment, cb);
bookieClient.readEntry(bookie, fragment.getLedgerId(), firstStored,
manycb, null, BookieProtocol.FLAG_NONE);
} else {
if (lastStored <= firstStored) {
cb.operationComplete(Code.IncorrectParameterException, null);
return;
}
long lengthOfLedgerFragment = lastStored - firstStored + 1;
int numberOfEntriesToBeVerified =
(int) (lengthOfLedgerFragment * (percentageOfLedgerFragmentToBeVerified / 100.0));
TreeSet<Long> entriesToBeVerified = new TreeSet<Long>();
if (numberOfEntriesToBeVerified < lengthOfLedgerFragment) {
// Evenly pick random entries over the length of the fragment
if (numberOfEntriesToBeVerified > 0) {
int lengthOfBucket = (int) (lengthOfLedgerFragment / numberOfEntriesToBeVerified);
for (long index = firstStored;
index < (lastStored - lengthOfBucket - 1);
index += lengthOfBucket) {
long potentialEntryId = ThreadLocalRandom.current().nextInt((lengthOfBucket)) + index;
if (fragment.isStoredEntryId(potentialEntryId, bookieIndex)) {
entriesToBeVerified.add(potentialEntryId);
}
}
}
entriesToBeVerified.add(firstStored);
entriesToBeVerified.add(lastStored);
} else {
// Verify the entire fragment
while (firstStored <= lastStored) {
if (fragment.isStoredEntryId(firstStored, bookieIndex)) {
entriesToBeVerified.add(firstStored);
}
firstStored++;
}
}
ReadManyEntriesCallback manycb = new ReadManyEntriesCallback(entriesToBeVerified.size(),
fragment, cb);
for (Long entryID: entriesToBeVerified) {
bookieClient.readEntry(bookie, fragment.getLedgerId(), entryID, manycb, null, BookieProtocol.FLAG_NONE);
}
}
}
/**
* Callback for checking whether an entry exists or not.
* It is used to differentiate the cases where it has been written
* but now cannot be read, and where it never has been written.
*/
private static class EntryExistsCallback implements ReadEntryCallback {
AtomicBoolean entryMayExist = new AtomicBoolean(false);
final AtomicInteger numReads;
final GenericCallback<Boolean> cb;
EntryExistsCallback(int numReads,
GenericCallback<Boolean> cb) {
this.numReads = new AtomicInteger(numReads);
this.cb = cb;
}
public void readEntryComplete(int rc, long ledgerId, long entryId,
ByteBuf buffer, Object ctx) {
if (BKException.Code.NoSuchEntryException != rc && BKException.Code.NoSuchLedgerExistsException != rc
&& BKException.Code.NoSuchLedgerExistsOnMetadataServerException != rc) {
entryMayExist.set(true);
}
if (numReads.decrementAndGet() == 0) {
cb.operationComplete(rc, entryMayExist.get());
}
}
}
/**
* This will collect all the fragment read call backs and finally it will
* give call back to above call back API which is waiting for it once it
* meets the expected call backs from down.
*/
private static class FullLedgerCallback implements
GenericCallback<LedgerFragment> {
final Set<LedgerFragment> badFragments;
final AtomicLong numFragments;
final GenericCallback<Set<LedgerFragment>> cb;
FullLedgerCallback(long numFragments,
GenericCallback<Set<LedgerFragment>> cb) {
badFragments = new HashSet<LedgerFragment>();
this.numFragments = new AtomicLong(numFragments);
this.cb = cb;
}
public void operationComplete(int rc, LedgerFragment result) {
if (rc == BKException.Code.ClientClosedException) {
cb.operationComplete(BKException.Code.ClientClosedException, badFragments);
return;
} else if (rc != BKException.Code.OK) {
badFragments.add(result);
}
if (numFragments.decrementAndGet() == 0) {
cb.operationComplete(BKException.Code.OK, badFragments);
}
}
}
/**
* Check that all the fragments in the passed in ledger, and report those
* which are missing.
*/
public void checkLedger(final LedgerHandle lh,
final GenericCallback<Set<LedgerFragment>> cb) {
checkLedger(lh, cb, 0L);
}
public void checkLedger(final LedgerHandle lh,
final GenericCallback<Set<LedgerFragment>> cb,
long percentageOfLedgerFragmentToBeVerified) {
// build a set of all fragment replicas
final Set<LedgerFragment> fragments = new HashSet<LedgerFragment>();
Long curEntryId = null;
List<BookieSocketAddress> curEnsemble = null;
for (Map.Entry<Long, ? extends List<BookieSocketAddress>> e : lh
.getLedgerMetadata().getAllEnsembles().entrySet()) {
if (curEntryId != null) {
Set<Integer> bookieIndexes = new HashSet<Integer>();
for (int i = 0; i < curEnsemble.size(); i++) {
bookieIndexes.add(i);
}
fragments.add(new LedgerFragment(lh, curEntryId,
e.getKey() - 1, bookieIndexes));
}
curEntryId = e.getKey();
curEnsemble = e.getValue();
}
/* Checking the last segment of the ledger can be complicated in some cases.
* In the case that the ledger is closed, we can just check the fragments of
* the segment as normal even if no data has ever been written to.
* In the case that the ledger is open, but enough entries have been written,
* for lastAddConfirmed to be set above the start entry of the segment, we
* can also check as normal.
* However, if ledger is open, sometimes lastAddConfirmed cannot be trusted,
* such as when it's lower than the first entry id, or not set at all,
* we cannot be sure if there has been data written to the segment.
* For this reason, we have to send a read request
* to the bookies which should have the first entry. If they respond with
* NoSuchEntry we can assume it was never written. If they respond with anything
* else, we must assume the entry has been written, so we run the check.
*/
if (curEntryId != null) {
long lastEntry = lh.getLastAddConfirmed();
if (!lh.isClosed() && lastEntry < curEntryId) {
lastEntry = curEntryId;
}
Set<Integer> bookieIndexes = new HashSet<Integer>();
for (int i = 0; i < curEnsemble.size(); i++) {
bookieIndexes.add(i);
}
final LedgerFragment lastLedgerFragment = new LedgerFragment(lh, curEntryId,
lastEntry, bookieIndexes);
// Check for the case that no last confirmed entry has been set
if (curEntryId == lastEntry) {
final long entryToRead = curEntryId;
final EntryExistsCallback eecb = new EntryExistsCallback(lh.getLedgerMetadata().getWriteQuorumSize(),
new GenericCallback<Boolean>() {
public void operationComplete(int rc, Boolean result) {
if (result) {
fragments.add(lastLedgerFragment);
}
checkFragments(fragments, cb,
percentageOfLedgerFragmentToBeVerified);
}
});
DistributionSchedule.WriteSet writeSet = lh.getDistributionSchedule().getWriteSet(entryToRead);
for (int i = 0; i < writeSet.size(); i++) {
BookieSocketAddress addr = curEnsemble.get(writeSet.get(i));
bookieClient.readEntry(addr, lh.getId(), entryToRead,
eecb, null, BookieProtocol.FLAG_NONE);
}
writeSet.recycle();
return;
} else {
fragments.add(lastLedgerFragment);
}
}
checkFragments(fragments, cb, percentageOfLedgerFragmentToBeVerified);
}
private void checkFragments(Set<LedgerFragment> fragments,
GenericCallback<Set<LedgerFragment>> cb,
long percentageOfLedgerFragmentToBeVerified) {
if (fragments.size() == 0) { // no fragments to verify
cb.operationComplete(BKException.Code.OK, fragments);
return;
}
// verify all the collected fragment replicas
FullLedgerCallback allFragmentsCb = new FullLedgerCallback(fragments
.size(), cb);
for (LedgerFragment r : fragments) {
LOG.debug("Checking fragment {}", r);
try {
verifyLedgerFragment(r, allFragmentsCb, percentageOfLedgerFragmentToBeVerified);
} catch (InvalidFragmentException ife) {
LOG.error("Invalid fragment found : {}", r);
allFragmentsCb.operationComplete(
BKException.Code.IncorrectParameterException, r);
} catch (BKException e) {
LOG.error("BKException when checking fragment : {}", r, e);
}
}
}
}