blob: 368eac1c279ee66cc6abe23caed76bcfd200fc32 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.client;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
import java.util.Map;
import org.apache.bookkeeper.proto.BookieClient;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
import org.jboss.netty.buffer.ChannelBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicInteger;
/**
*Checks the complete ledger and finds the UnderReplicated fragments if any
*/
public class LedgerChecker {
private static Logger LOG = LoggerFactory.getLogger(LedgerChecker.class);
public final BookieClient bookieClient;
static class InvalidFragmentException extends Exception {
private static final long serialVersionUID = 1467201276417062353L;
}
/**
* This will collect all the entry read call backs and finally it will give
* call back to previous call back API which is waiting for it once it meets
* the expected call backs from down
*/
private static class ReadManyEntriesCallback implements ReadEntryCallback {
AtomicBoolean completed = new AtomicBoolean(false);
final AtomicLong numEntries;
final LedgerFragment fragment;
final GenericCallback<LedgerFragment> cb;
ReadManyEntriesCallback(long numEntries, LedgerFragment fragment,
GenericCallback<LedgerFragment> cb) {
this.numEntries = new AtomicLong(numEntries);
this.fragment = fragment;
this.cb = cb;
}
public void readEntryComplete(int rc, long ledgerId, long entryId,
ChannelBuffer buffer, Object ctx) {
if (rc == BKException.Code.OK) {
if (numEntries.decrementAndGet() == 0
&& !completed.getAndSet(true)) {
cb.operationComplete(rc, fragment);
}
} else if (!completed.getAndSet(true)) {
cb.operationComplete(rc, fragment);
}
}
}
public LedgerChecker(BookKeeper bkc) {
bookieClient = bkc.getBookieClient();
}
private void verifyLedgerFragment(LedgerFragment fragment,
GenericCallback<LedgerFragment> cb) throws InvalidFragmentException {
long firstStored = fragment.getFirstStoredEntryId();
long lastStored = fragment.getLastStoredEntryId();
if (firstStored == LedgerHandle.INVALID_ENTRY_ID) {
if (lastStored != LedgerHandle.INVALID_ENTRY_ID) {
throw new InvalidFragmentException();
}
cb.operationComplete(BKException.Code.OK, fragment);
return;
}
if (firstStored == lastStored) {
ReadManyEntriesCallback manycb = new ReadManyEntriesCallback(1,
fragment, cb);
bookieClient.readEntry(fragment.getAddress(), fragment
.getLedgerId(), firstStored, manycb, null);
} else {
ReadManyEntriesCallback manycb = new ReadManyEntriesCallback(2,
fragment, cb);
bookieClient.readEntry(fragment.getAddress(), fragment
.getLedgerId(), firstStored, manycb, null);
bookieClient.readEntry(fragment.getAddress(), fragment
.getLedgerId(), lastStored, manycb, null);
}
}
/**
* Callback for checking whether an entry exists or not.
* It is used to differentiate the cases where it has been written
* but now cannot be read, and where it never has been written.
*/
private static class EntryExistsCallback implements ReadEntryCallback {
AtomicBoolean entryMayExist = new AtomicBoolean(false);
final AtomicInteger numReads;
final GenericCallback<Boolean> cb;
EntryExistsCallback(int numReads,
GenericCallback<Boolean> cb) {
this.numReads = new AtomicInteger(numReads);
this.cb = cb;
}
public void readEntryComplete(int rc, long ledgerId, long entryId,
ChannelBuffer buffer, Object ctx) {
if (rc != BKException.Code.NoSuchEntryException) {
entryMayExist.set(true);
}
if (numReads.decrementAndGet() == 0) {
cb.operationComplete(rc, entryMayExist.get());
}
}
}
/**
* This will collect all the fragment read call backs and finally it will
* give call back to above call back API which is waiting for it once it
* meets the expected call backs from down
*/
private static class FullLedgerCallback implements
GenericCallback<LedgerFragment> {
final Set<LedgerFragment> badFragments;
final AtomicLong numFragments;
final GenericCallback<Set<LedgerFragment>> cb;
FullLedgerCallback(long numFragments,
GenericCallback<Set<LedgerFragment>> cb) {
badFragments = new HashSet<LedgerFragment>();
this.numFragments = new AtomicLong(numFragments);
this.cb = cb;
}
public void operationComplete(int rc, LedgerFragment result) {
if (rc != BKException.Code.OK) {
badFragments.add(result);
}
if (numFragments.decrementAndGet() == 0) {
cb.operationComplete(BKException.Code.OK, badFragments);
}
}
}
/**
* Check that all the fragments in the passed in ledger, and report those
* which are missing.
*/
public void checkLedger(LedgerHandle lh,
final GenericCallback<Set<LedgerFragment>> cb) {
// build a set of all fragment replicas
final Set<LedgerFragment> fragments = new HashSet<LedgerFragment>();
Long curEntryId = null;
ArrayList<InetSocketAddress> curEnsemble = null;
for (Map.Entry<Long, ArrayList<InetSocketAddress>> e : lh
.getLedgerMetadata().getEnsembles().entrySet()) {
if (curEntryId != null) {
for (int i = 0; i < curEnsemble.size(); i++) {
fragments.add(new LedgerFragment(lh, curEntryId,
e.getKey() - 1, i));
}
}
curEntryId = e.getKey();
curEnsemble = e.getValue();
}
/* Checking the last segment of the ledger can be complicated in some cases.
* In the case that the ledger is closed, we can just check the fragments of
* the segment as normal, except in the case that no entry was ever written,
* to the ledger, in which case we check no fragments.
* In the case that the ledger is open, but enough entries have been written,
* for lastAddConfirmed to be set above the start entry of the segment, we
* can also check as normal.
* However, if lastAddConfirmed cannot be trusted, such as when it's lower than
* the first entry id, or not set at all, we cannot be sure if there has been
* data written to the segment. For this reason, we have to send a read request
* to the bookies which should have the first entry. If they respond with
* NoSuchEntry we can assume it was never written. If they respond with anything
* else, we must assume the entry has been written, so we run the check.
*/
if (curEntryId != null && !(lh.getLedgerMetadata().isClosed() && lh.getLastAddConfirmed() < curEntryId)) {
long lastEntry = lh.getLastAddConfirmed();
if (lastEntry < curEntryId) {
lastEntry = curEntryId;
}
final Set<LedgerFragment> finalSegmentFragments = new HashSet<LedgerFragment>();
for (int i = 0; i < curEnsemble.size(); i++) {
finalSegmentFragments.add(new LedgerFragment(lh, curEntryId,
lastEntry, i));
}
// Check for the case that no last confirmed entry has
// been set.
if (curEntryId == lastEntry) {
final long entryToRead = curEntryId;
EntryExistsCallback eecb
= new EntryExistsCallback(lh.getLedgerMetadata().getWriteQuorumSize(),
new GenericCallback<Boolean>() {
public void operationComplete(int rc, Boolean result) {
if (result) {
fragments.addAll(finalSegmentFragments);
}
checkFragments(fragments, cb);
}
});
for (int bi : lh.getDistributionSchedule().getWriteSet(entryToRead)) {
InetSocketAddress addr = curEnsemble.get(bi);
bookieClient.readEntry(addr, lh.getId(),
entryToRead, eecb, null);
}
return;
} else {
fragments.addAll(finalSegmentFragments);
}
}
checkFragments(fragments, cb);
}
private void checkFragments(Set<LedgerFragment> fragments,
GenericCallback<Set<LedgerFragment>> cb) {
if (fragments.size() == 0) { // no fragments to verify
cb.operationComplete(BKException.Code.OK, fragments);
return;
}
// verify all the collected fragment replicas
FullLedgerCallback allFragmentsCb = new FullLedgerCallback(fragments
.size(), cb);
for (LedgerFragment r : fragments) {
LOG.debug("Checking fragment {}", r);
try {
verifyLedgerFragment(r, allFragmentsCb);
} catch (InvalidFragmentException ife) {
LOG.error("Invalid fragment found : {}", r);
allFragmentsCb.operationComplete(
BKException.Code.IncorrectParameterException, r);
}
}
}
}