blob: e548f3d456a2a1d9d0ace577a870c8a99bb65912 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.apache.bookkeeper.client;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
import org.apache.bookkeeper.client.BKException.BKDigestMatchException;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.util.MathUtils;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBufferInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* Sequence of entries of a ledger that represents a pending read operation.
* When all the data read has come back, the application callback is called.
* This class could be improved because we could start pushing data to the
* application as soon as it arrives rather than waiting for the whole thing.
class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
private final static Logger LOG = LoggerFactory.getLogger(PendingReadOp.class);
final int speculativeReadTimeout;
final private ScheduledExecutorService scheduler;
private ScheduledFuture<?> speculativeTask = null;
Queue<LedgerEntryRequest> seq;
Set<BookieSocketAddress> heardFromHosts;
ReadCallback cb;
Object ctx;
LedgerHandle lh;
long numPendingEntries;
long startEntryId;
long endEntryId;
long requestTimeMillis;
OpStatsLogger readOpLogger;
final int maxMissedReadsAllowed;
class LedgerEntryRequest extends LedgerEntry {
final static int NOT_FOUND = -1;
int nextReplicaIndexToReadFrom = 0;
AtomicBoolean complete = new AtomicBoolean(false);
int firstError = BKException.Code.OK;
int numMissedEntryReads = 0;
final ArrayList<BookieSocketAddress> ensemble;
final List<Integer> writeSet;
final BitSet sentReplicas;
final BitSet erroredReplicas;
LedgerEntryRequest(ArrayList<BookieSocketAddress> ensemble, long lId, long eId) {
super(lId, eId);
this.ensemble = ensemble;
this.writeSet = lh.distributionSchedule.getWriteSet(entryId);
this.sentReplicas = new BitSet(lh.getLedgerMetadata().getWriteQuorumSize());
this.erroredReplicas = new BitSet(lh.getLedgerMetadata().getWriteQuorumSize());
private int getReplicaIndex(BookieSocketAddress host) {
int bookieIndex = ensemble.indexOf(host);
if (bookieIndex == -1) {
return NOT_FOUND;
return writeSet.indexOf(bookieIndex);
private BitSet getSentToBitSet() {
BitSet b = new BitSet(ensemble.size());
for (int i = 0; i < sentReplicas.length(); i++) {
if (sentReplicas.get(i)) {
return b;
private BitSet getHeardFromBitSet(Set<BookieSocketAddress> heardFromHosts) {
BitSet b = new BitSet(ensemble.size());
for (BookieSocketAddress i : heardFromHosts) {
int index = ensemble.indexOf(i);
if (index != -1) {
return b;
private boolean readsOutstanding() {
return (sentReplicas.cardinality() - erroredReplicas.cardinality()) > 0;
* Send to next replica speculatively, if required and possible.
* This returns the host we may have sent to for unit testing.
* @return host we sent to if we sent. null otherwise.
synchronized BookieSocketAddress maybeSendSpeculativeRead(Set<BookieSocketAddress> heardFromHosts) {
if (nextReplicaIndexToReadFrom >= getLedgerMetadata().getWriteQuorumSize()) {
return null;
BitSet sentTo = getSentToBitSet();
BitSet heardFrom = getHeardFromBitSet(heardFromHosts);
// only send another read, if we have had no response at all (even for other entries)
// from any of the other bookies we have sent the request to
if (sentTo.cardinality() == 0) {
return sendNextRead();
} else {
return null;
synchronized BookieSocketAddress sendNextRead() {
if (nextReplicaIndexToReadFrom >= getLedgerMetadata().getWriteQuorumSize()) {
// we are done, the read has failed from all replicas, just fail the
// read
// Do it a bit pessimistically, only when finished trying all replicas
// to check whether we received more missed reads than maxMissedReadsAllowed
if (BKException.Code.BookieHandleNotAvailableException == firstError &&
numMissedEntryReads > maxMissedReadsAllowed) {
firstError = BKException.Code.NoSuchEntryException;
return null;
int replica = nextReplicaIndexToReadFrom;
int bookieIndex = lh.distributionSchedule.getWriteSet(entryId).get(nextReplicaIndexToReadFrom);
try {
BookieSocketAddress to = ensemble.get(bookieIndex);
sendReadTo(to, this);
return to;
} catch (InterruptedException ie) {
LOG.error("Interrupted reading entry " + this, ie);
return null;
synchronized void logErrorAndReattemptRead(BookieSocketAddress host, String errMsg, int rc) {
if (BKException.Code.OK == firstError ||
BKException.Code.NoSuchEntryException == firstError ||
BKException.Code.NoSuchLedgerExistsException == firstError) {
firstError = rc;
} else if (BKException.Code.BookieHandleNotAvailableException == firstError &&
BKException.Code.NoSuchEntryException != rc &&
BKException.Code.NoSuchLedgerExistsException != rc) {
// if other exception rather than NoSuchEntryException or NoSuchLedgerExistsException is
// returned we need to update firstError to indicate that it might be a valid read but just
// failed.
firstError = rc;
if (BKException.Code.NoSuchEntryException == rc ||
BKException.Code.NoSuchLedgerExistsException == rc) {
LOG.debug("No such entry found on bookie. L{} E{} bookie: {}",
new Object[] { lh.ledgerId, entryId, host });
} else {
LOG.debug(errMsg + " while reading L{} E{} from bookie: {}",
new Object[] { lh.ledgerId, entryId, host });
int replica = getReplicaIndex(host);
if (replica == NOT_FOUND) {
LOG.error("Received error from a host which is not in the ensemble {} {}.", host, ensemble);
if (!readsOutstanding()) {
// return true if we managed to complete the entry
// return false if the read entry is not complete or it is already completed before
boolean complete(BookieSocketAddress host, final ChannelBuffer buffer) {
ChannelBufferInputStream is;
try {
is = lh.macManager.verifyDigestAndReturnData(entryId, buffer);
} catch (BKDigestMatchException e) {
logErrorAndReattemptRead(host, "Mac mismatch", BKException.Code.DigestMatchException);
return false;
if (!complete.getAndSet(true)) {
entryDataStream = is;
* The length is a long and it is the last field of the metadata of an entry.
* Consequently, we have to subtract 8 from METADATA_LENGTH to get the length.
length = buffer.getLong(DigestManager.METADATA_LENGTH - 8);
return true;
} else {
return false;
boolean isComplete() {
return complete.get();
public String toString() {
return String.format("L%d-E%d", ledgerId, entryId);
PendingReadOp(LedgerHandle lh, ScheduledExecutorService scheduler,
long startEntryId, long endEntryId, ReadCallback cb, Object ctx) {
seq = new ArrayBlockingQueue<LedgerEntryRequest>((int) ((endEntryId + 1) - startEntryId));
this.cb = cb;
this.ctx = ctx;
this.lh = lh;
this.startEntryId = startEntryId;
this.endEntryId = endEntryId;
this.scheduler = scheduler;
numPendingEntries = endEntryId - startEntryId + 1;
maxMissedReadsAllowed = getLedgerMetadata().getWriteQuorumSize()
- getLedgerMetadata().getAckQuorumSize();
speculativeReadTimeout = lh.bk.getConf().getSpeculativeReadTimeout();
heardFromHosts = new HashSet<BookieSocketAddress>();
readOpLogger = lh.bk.getReadOpLogger();
protected LedgerMetadata getLedgerMetadata() {
return lh.metadata;
private void cancelSpeculativeTask(boolean mayInterruptIfRunning) {
if (speculativeTask != null) {
speculativeTask = null;
public void initiate() throws InterruptedException {
long nextEnsembleChange = startEntryId, i = startEntryId;
this.requestTimeMillis =;
ArrayList<BookieSocketAddress> ensemble = null;
if (speculativeReadTimeout > 0) {
Runnable readTask = new Runnable() {
public void run() {
int x = 0;
for (LedgerEntryRequest r : seq) {
if (!r.isComplete()) {
if (null == r.maybeSendSpeculativeRead(heardFromHosts)) {
// Subsequent speculative read will not materialize anyway
} else {
LOG.debug("Send speculative read for {}. Hosts heard are {}.",
r, heardFromHosts);
if (x > 0) {
LOG.debug("Send {} speculative reads for ledger {} ({}, {}). Hosts heard are {}.",
new Object[] { x, lh.getId(), startEntryId, endEntryId, heardFromHosts });
try {
speculativeTask = scheduler.scheduleWithFixedDelay(readTask,
speculativeReadTimeout, speculativeReadTimeout, TimeUnit.MILLISECONDS);
} catch (RejectedExecutionException re) {
LOG.debug("Failed to schedule speculative reads for ledger {} ({}, {}) : ",
new Object[] { lh.getId(), startEntryId, endEntryId, re });
do {
if (i == nextEnsembleChange) {
ensemble = getLedgerMetadata().getEnsemble(i);
nextEnsembleChange = getLedgerMetadata().getNextEnsembleChange(i);
LedgerEntryRequest entry = new LedgerEntryRequest(ensemble, lh.ledgerId, i);
} while (i <= endEntryId);
private static class ReadContext {
final BookieSocketAddress to;
final LedgerEntryRequest entry;
ReadContext(BookieSocketAddress to, LedgerEntryRequest entry) { = to;
this.entry = entry;
void sendReadTo(BookieSocketAddress to, LedgerEntryRequest entry) throws InterruptedException {
lh.bk.bookieClient.readEntry(to, lh.ledgerId, entry.entryId,
this, new ReadContext(to, entry));
public void readEntryComplete(int rc, long ledgerId, final long entryId, final ChannelBuffer buffer, Object ctx) {
final ReadContext rctx = (ReadContext)ctx;
final LedgerEntryRequest entry = rctx.entry;
if (rc != BKException.Code.OK) {
entry.logErrorAndReattemptRead(, "Error: " + BKException.getMessage(rc), rc);
if (entry.complete(, buffer)) {
if (numPendingEntries == 0) {
if(numPendingEntries < 0)
LOG.error("Read too many values");
private void submitCallback(int code) {
long latencyMillis = - requestTimeMillis;
if (code != BKException.Code.OK) {
long firstUnread = LedgerHandle.INVALID_ENTRY_ID;
for (LedgerEntryRequest req : seq) {
if (!req.isComplete()) {
firstUnread = req.getEntryId();
LOG.error("Read of ledger entry failed: L{} E{}-E{}, Heard from {}. First unread entry is {}",
new Object[] { lh.getId(), startEntryId, endEntryId, heardFromHosts, firstUnread });
} else {
cb.readComplete(code, lh, PendingReadOp.this, PendingReadOp.this.ctx);
public boolean hasMoreElements() {
return !seq.isEmpty();
public LedgerEntry nextElement() throws NoSuchElementException {
return seq.remove();
public int size() {
return seq.size();