blob: ed1bece65f27d70d465e869d2df8909d43385aee [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.client;
import com.google.common.util.concurrent.ListenableFuture;
import io.netty.buffer.ByteBuf;
import java.util.BitSet;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bookkeeper.proto.ReadLastConfirmedAndEntryContext;
import org.apache.bookkeeper.proto.checksum.DigestManager;
import org.apache.bookkeeper.util.MathUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Long poll read operation.
*/
class ReadLastConfirmedAndEntryOp implements BookkeeperInternalCallbacks.ReadEntryCallback,
SpeculativeRequestExecutor {
static final Logger LOG = LoggerFactory.getLogger(ReadLastConfirmedAndEntryOp.class);
ReadLACAndEntryRequest request;
final BitSet heardFromHostsBitSet;
final BitSet emptyResponsesFromHostsBitSet;
final int maxMissedReadsAllowed;
boolean parallelRead = false;
final AtomicBoolean requestComplete = new AtomicBoolean(false);
final long requestTimeNano;
private final LedgerHandle lh;
private final ClientContext clientCtx;
private final LastConfirmedAndEntryCallback cb;
private int numResponsesPending;
private final int numEmptyResponsesAllowed;
private volatile boolean hasValidResponse = false;
private final long prevEntryId;
private long lastAddConfirmed;
private long timeOutInMillis;
private final List<BookieSocketAddress> currentEnsemble;
private ScheduledFuture<?> speculativeTask = null;
abstract class ReadLACAndEntryRequest implements AutoCloseable {
final AtomicBoolean complete = new AtomicBoolean(false);
int rc = BKException.Code.OK;
int firstError = BKException.Code.OK;
int numMissedEntryReads = 0;
final List<BookieSocketAddress> ensemble;
final DistributionSchedule.WriteSet writeSet;
final DistributionSchedule.WriteSet orderedEnsemble;
final LedgerEntryImpl entryImpl;
ReadLACAndEntryRequest(List<BookieSocketAddress> ensemble, long lId, long eId) {
this.entryImpl = LedgerEntryImpl.create(lId, eId);
this.ensemble = ensemble;
this.writeSet = lh.getDistributionSchedule().getEnsembleSet(eId);
if (clientCtx.getConf().enableReorderReadSequence) {
this.orderedEnsemble = clientCtx.getPlacementPolicy().reorderReadLACSequence(ensemble,
lh.getBookiesHealthInfo(), writeSet.copy());
} else {
this.orderedEnsemble = writeSet.copy();
}
}
public void close() {
entryImpl.close();
}
synchronized int getFirstError() {
return firstError;
}
/**
* Execute the read request.
*/
abstract void read();
/**
* Complete the read request from <i>host</i>.
*
* @param bookieIndex
* bookie index
* @param host
* host that respond the read
* @param buffer
* the data buffer
* @return return true if we managed to complete the entry;
* otherwise return false if the read entry is not complete or it is already completed before
*/
boolean complete(int bookieIndex, BookieSocketAddress host, final ByteBuf buffer, long entryId) {
ByteBuf content;
try {
content = lh.getDigestManager().verifyDigestAndReturnData(entryId, buffer);
} catch (BKException.BKDigestMatchException e) {
logErrorAndReattemptRead(bookieIndex, host, "Mac mismatch", BKException.Code.DigestMatchException);
return false;
}
if (!complete.getAndSet(true)) {
writeSet.recycle();
orderedEnsemble.recycle();
rc = BKException.Code.OK;
/*
* The length is a long and it is the last field of the metadata of an entry.
* Consequently, we have to subtract 8 from METADATA_LENGTH to get the length.
*/
entryImpl.setLength(buffer.getLong(DigestManager.METADATA_LENGTH - 8));
entryImpl.setEntryBuf(content);
return true;
} else {
return false;
}
}
/**
* Fail the request with given result code <i>rc</i>.
*
* @param rc
* result code to fail the request.
* @return true if we managed to fail the entry; otherwise return false if it already failed or completed.
*/
boolean fail(int rc) {
if (complete.compareAndSet(false, true)) {
writeSet.recycle();
orderedEnsemble.recycle();
this.rc = rc;
translateAndSetFirstError(rc);
completeRequest();
return true;
} else {
return false;
}
}
private synchronized void translateAndSetFirstError(int rc) {
if (BKException.Code.OK == firstError
|| BKException.Code.NoSuchEntryException == firstError
|| BKException.Code.NoSuchLedgerExistsException == firstError) {
firstError = rc;
} else if (BKException.Code.BookieHandleNotAvailableException == firstError
&& BKException.Code.NoSuchEntryException != rc
&& BKException.Code.NoSuchLedgerExistsException != rc) {
// if other exception rather than NoSuchEntryException is returned
// we need to update firstError to indicate that it might be a valid read but just failed.
firstError = rc;
}
}
/**
* Log error <i>errMsg</i> and reattempt read from <i>host</i>.
*
* @param bookieIndex
* bookie index
* @param host
* host that just respond
* @param errMsg
* error msg to log
* @param rc
* read result code
*/
synchronized void logErrorAndReattemptRead(int bookieIndex, BookieSocketAddress host, String errMsg, int rc) {
translateAndSetFirstError(rc);
if (BKException.Code.NoSuchEntryException == rc || BKException.Code.NoSuchLedgerExistsException == rc) {
// Since we send all long poll requests to every available node, we should only
// treat these errors as failures if the node from which we received this is part of
// the writeSet
if (this.writeSet.contains(bookieIndex)) {
lh.registerOperationFailureOnBookie(host, entryImpl.getEntryId());
}
++numMissedEntryReads;
}
if (LOG.isDebugEnabled()) {
LOG.debug("{} while reading entry: {} ledgerId: {} from bookie: {}", errMsg, entryImpl.getEntryId(),
lh.getId(), host);
}
}
/**
* Send to next replica speculatively, if required and possible.
* This returns the host we may have sent to for unit testing.
*
* @param heardFromHostsBitSet
* the set of hosts that we already received responses.
* @return host we sent to if we sent. null otherwise.
*/
abstract BookieSocketAddress maybeSendSpeculativeRead(BitSet heardFromHostsBitSet);
/**
* Whether the read request completed.
*
* @return true if the read request is completed.
*/
boolean isComplete() {
return complete.get();
}
/**
* Get result code of this entry.
*
* @return result code.
*/
int getRc() {
return rc;
}
@Override
public String toString() {
return String.format("L%d-E%d", entryImpl.getLedgerId(), entryImpl.getEntryId());
}
}
class ParallelReadRequest extends ReadLACAndEntryRequest {
int numPendings;
ParallelReadRequest(List<BookieSocketAddress> ensemble, long lId, long eId) {
super(ensemble, lId, eId);
numPendings = orderedEnsemble.size();
}
@Override
void read() {
for (int i = 0; i < orderedEnsemble.size(); i++) {
BookieSocketAddress to = ensemble.get(orderedEnsemble.get(i));
try {
sendReadTo(orderedEnsemble.get(i), to, this);
} catch (InterruptedException ie) {
LOG.error("Interrupted reading entry {} : ", this, ie);
Thread.currentThread().interrupt();
fail(BKException.Code.InterruptedException);
return;
}
}
}
@Override
synchronized void logErrorAndReattemptRead(int bookieIndex, BookieSocketAddress host, String errMsg, int rc) {
super.logErrorAndReattemptRead(bookieIndex, host, errMsg, rc);
--numPendings;
// if received all responses or this entry doesn't meet quorum write, complete the request.
if (numMissedEntryReads > maxMissedReadsAllowed || numPendings == 0) {
if (BKException.Code.BookieHandleNotAvailableException == firstError
&& numMissedEntryReads > maxMissedReadsAllowed) {
firstError = BKException.Code.NoSuchEntryException;
}
fail(firstError);
}
}
@Override
BookieSocketAddress maybeSendSpeculativeRead(BitSet heardFromHostsBitSet) {
// no speculative read
return null;
}
}
class SequenceReadRequest extends ReadLACAndEntryRequest {
static final int NOT_FOUND = -1;
int nextReplicaIndexToReadFrom = 0;
final BitSet sentReplicas;
final BitSet erroredReplicas;
final BitSet emptyResponseReplicas;
SequenceReadRequest(List<BookieSocketAddress> ensemble, long lId, long eId) {
super(ensemble, lId, eId);
this.sentReplicas = new BitSet(orderedEnsemble.size());
this.erroredReplicas = new BitSet(orderedEnsemble.size());
this.emptyResponseReplicas = new BitSet(orderedEnsemble.size());
}
private synchronized int getNextReplicaIndexToReadFrom() {
return nextReplicaIndexToReadFrom;
}
private int getReplicaIndex(int bookieIndex) {
return orderedEnsemble.indexOf(bookieIndex);
}
private BitSet getSentToBitSet() {
BitSet b = new BitSet(ensemble.size());
for (int i = 0; i < sentReplicas.length(); i++) {
if (sentReplicas.get(i)) {
b.set(orderedEnsemble.get(i));
}
}
return b;
}
private boolean readsOutstanding() {
return (sentReplicas.cardinality() - erroredReplicas.cardinality()
- emptyResponseReplicas.cardinality()) > 0;
}
/**
* Send to next replica speculatively, if required and possible.
* This returns the host we may have sent to for unit testing.
* @return host we sent to if we sent. null otherwise.
*/
@Override
synchronized BookieSocketAddress maybeSendSpeculativeRead(BitSet heardFrom) {
if (nextReplicaIndexToReadFrom >= getLedgerMetadata().getEnsembleSize()) {
return null;
}
BitSet sentTo = getSentToBitSet();
sentTo.and(heardFrom);
// only send another read, if we have had no response at all (even for other entries)
// from any of the other bookies we have sent the request to
if (sentTo.cardinality() == 0) {
return sendNextRead();
} else {
return null;
}
}
@Override
void read() {
sendNextRead();
}
synchronized BookieSocketAddress sendNextRead() {
if (nextReplicaIndexToReadFrom >= getLedgerMetadata().getEnsembleSize()) {
// we are done, the read has failed from all replicas, just fail the
// read
// Do it a bit pessimistically, only when finished trying all replicas
// to check whether we received more missed reads than requiredBookiesMissingEntryForRecovery
if (BKException.Code.BookieHandleNotAvailableException == firstError
&& numMissedEntryReads > maxMissedReadsAllowed) {
firstError = BKException.Code.NoSuchEntryException;
}
fail(firstError);
return null;
}
int replica = nextReplicaIndexToReadFrom;
int bookieIndex = orderedEnsemble.get(nextReplicaIndexToReadFrom);
nextReplicaIndexToReadFrom++;
try {
BookieSocketAddress to = ensemble.get(bookieIndex);
sendReadTo(bookieIndex, to, this);
sentReplicas.set(replica);
return to;
} catch (InterruptedException ie) {
LOG.error("Interrupted reading entry " + this, ie);
Thread.currentThread().interrupt();
fail(BKException.Code.InterruptedException);
return null;
}
}
@Override
synchronized void logErrorAndReattemptRead(int bookieIndex, BookieSocketAddress host, String errMsg, int rc) {
super.logErrorAndReattemptRead(bookieIndex, host, errMsg, rc);
int replica = getReplicaIndex(bookieIndex);
if (replica == NOT_FOUND) {
LOG.error("Received error from a host which is not in the ensemble {} {}.", host, ensemble);
return;
}
if (BKException.Code.OK == rc) {
emptyResponseReplicas.set(replica);
} else {
erroredReplicas.set(replica);
}
if (!readsOutstanding()) {
sendNextRead();
}
}
@Override
boolean complete(int bookieIndex, BookieSocketAddress host, ByteBuf buffer, long entryId) {
boolean completed = super.complete(bookieIndex, host, buffer, entryId);
if (completed) {
int numReplicasTried = getNextReplicaIndexToReadFrom();
// Check if any speculative reads were issued and mark any bookies before the
// first speculative read as slow
for (int i = 0; i < numReplicasTried; i++) {
int slowBookieIndex = orderedEnsemble.get(i);
BookieSocketAddress slowBookieSocketAddress = ensemble.get(slowBookieIndex);
clientCtx.getPlacementPolicy().registerSlowBookie(slowBookieSocketAddress, entryId);
}
}
return completed;
}
}
ReadLastConfirmedAndEntryOp(LedgerHandle lh,
ClientContext clientCtx,
List<BookieSocketAddress> ensemble,
LastConfirmedAndEntryCallback cb,
long prevEntryId,
long timeOutInMillis) {
this.lh = lh;
this.clientCtx = clientCtx;
this.cb = cb;
this.prevEntryId = prevEntryId;
this.lastAddConfirmed = lh.getLastAddConfirmed();
this.timeOutInMillis = timeOutInMillis;
this.numResponsesPending = 0;
this.currentEnsemble = ensemble;
// since long poll is effectively reading lac with waits, lac can be potentially
// be advanced in different write quorums, so we need to make sure to cover enough
// bookies before claiming lac is not advanced.
this.numEmptyResponsesAllowed = getLedgerMetadata().getEnsembleSize()
- getLedgerMetadata().getAckQuorumSize() + 1;
this.requestTimeNano = MathUtils.nowInNano();
maxMissedReadsAllowed = getLedgerMetadata().getEnsembleSize()
- getLedgerMetadata().getAckQuorumSize();
heardFromHostsBitSet = new BitSet(getLedgerMetadata().getEnsembleSize());
emptyResponsesFromHostsBitSet = new BitSet(getLedgerMetadata().getEnsembleSize());
}
protected LedgerMetadata getLedgerMetadata() {
return lh.getLedgerMetadata();
}
ReadLastConfirmedAndEntryOp parallelRead(boolean enabled) {
this.parallelRead = enabled;
return this;
}
protected void cancelSpeculativeTask(boolean mayInterruptIfRunning) {
if (speculativeTask != null) {
speculativeTask.cancel(mayInterruptIfRunning);
speculativeTask = null;
}
}
/**
* Speculative Read Logic.
*/
@Override
public ListenableFuture<Boolean> issueSpeculativeRequest() {
return clientCtx.getMainWorkerPool().submitOrdered(lh.getId(), new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
if (!requestComplete.get() && !request.isComplete()
&& (null != request.maybeSendSpeculativeRead(heardFromHostsBitSet))) {
if (LOG.isDebugEnabled()) {
LOG.debug("Send speculative ReadLAC {} for ledger {} (previousLAC: {}). Hosts heard are {}.",
request, lh.getId(), lastAddConfirmed, heardFromHostsBitSet);
}
return true;
}
return false;
}
});
}
public void initiate() {
if (parallelRead) {
request = new ParallelReadRequest(currentEnsemble, lh.getId(), prevEntryId + 1);
} else {
request = new SequenceReadRequest(currentEnsemble, lh.getId(), prevEntryId + 1);
}
request.read();
if (!parallelRead && clientCtx.getConf().readLACSpeculativeRequestPolicy.isPresent()) {
speculativeTask = clientCtx.getConf().readLACSpeculativeRequestPolicy.get()
.initiateSpeculativeRequest(clientCtx.getScheduler(), this);
}
}
void sendReadTo(int bookieIndex, BookieSocketAddress to, ReadLACAndEntryRequest entry) throws InterruptedException {
if (LOG.isDebugEnabled()) {
LOG.debug("Calling Read LAC and Entry with {} and long polling interval {} on Bookie {} - Parallel {}",
prevEntryId, timeOutInMillis, to, parallelRead);
}
clientCtx.getBookieClient().readEntryWaitForLACUpdate(to,
lh.getId(),
BookieProtocol.LAST_ADD_CONFIRMED,
prevEntryId,
timeOutInMillis,
true,
this, new ReadLastConfirmedAndEntryContext(bookieIndex, to));
this.numResponsesPending++;
}
/**
* Wrapper to get all recovered data from the request.
*/
interface LastConfirmedAndEntryCallback {
void readLastConfirmedAndEntryComplete(int rc, long lastAddConfirmed, LedgerEntry entry);
}
private void submitCallback(int rc) {
long latencyMicros = MathUtils.elapsedMicroSec(requestTimeNano);
LedgerEntry entry;
cancelSpeculativeTask(true);
if (BKException.Code.OK != rc) {
clientCtx.getClientStats().getReadLacAndEntryOpLogger()
.registerFailedEvent(latencyMicros, TimeUnit.MICROSECONDS);
entry = null;
} else {
// could received advanced lac, with no entry
clientCtx.getClientStats().getReadLacAndEntryOpLogger()
.registerSuccessfulEvent(latencyMicros, TimeUnit.MICROSECONDS);
if (request.entryImpl.getEntryBuffer() != null) {
entry = new LedgerEntry(request.entryImpl);
} else {
entry = null;
}
}
request.close();
cb.readLastConfirmedAndEntryComplete(rc, lastAddConfirmed, entry);
}
@Override
public void readEntryComplete(int rc, long ledgerId, long entryId, ByteBuf buffer, Object ctx) {
if (LOG.isTraceEnabled()) {
LOG.trace("{} received response for (lid={}, eid={}) : {}",
getClass().getName(), ledgerId, entryId, rc);
}
ReadLastConfirmedAndEntryContext rCtx = (ReadLastConfirmedAndEntryContext) ctx;
BookieSocketAddress bookie = rCtx.getBookieAddress();
numResponsesPending--;
if (BKException.Code.OK == rc) {
if (LOG.isTraceEnabled()) {
LOG.trace("Received lastAddConfirmed (lac={}) from bookie({}) for (lid={}).",
rCtx.getLastAddConfirmed(), bookie, ledgerId);
}
if (rCtx.getLastAddConfirmed() > lastAddConfirmed) {
lastAddConfirmed = rCtx.getLastAddConfirmed();
lh.updateLastConfirmed(rCtx.getLastAddConfirmed(), 0L);
}
hasValidResponse = true;
if (entryId != BookieProtocol.LAST_ADD_CONFIRMED) {
buffer.retain();
if (!requestComplete.get() && request.complete(rCtx.getBookieIndex(), bookie, buffer, entryId)) {
// callback immediately
if (rCtx.getLacUpdateTimestamp().isPresent()) {
long elapsedMicros = TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis()
- rCtx.getLacUpdateTimestamp().get());
elapsedMicros = Math.max(elapsedMicros, 0);
clientCtx.getClientStats().getReadLacAndEntryRespLogger()
.registerSuccessfulEvent(elapsedMicros, TimeUnit.MICROSECONDS);
}
// if the request has already completed, the buffer is not going to be used anymore, release it.
if (!completeRequest()) {
buffer.release();
}
heardFromHostsBitSet.set(rCtx.getBookieIndex(), true);
} else {
buffer.release();
}
} else {
emptyResponsesFromHostsBitSet.set(rCtx.getBookieIndex(), true);
if (lastAddConfirmed > prevEntryId) {
// received advanced lac
completeRequest();
} else if (emptyResponsesFromHostsBitSet.cardinality() >= numEmptyResponsesAllowed) {
if (LOG.isDebugEnabled()) {
LOG.debug("Completed readLACAndEntry(lid = {}, previousEntryId = {}) "
+ "after received {} empty responses ('{}').",
ledgerId, prevEntryId, emptyResponsesFromHostsBitSet.cardinality(),
emptyResponsesFromHostsBitSet);
}
completeRequest();
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Received empty response for readLACAndEntry(lid = {}, previousEntryId = {}) from"
+ " bookie {} @ {}, reattempting reading next bookie : lac = {}",
ledgerId, prevEntryId, rCtx.getBookieAddress(),
rCtx.getBookieAddress(), lastAddConfirmed);
}
request.logErrorAndReattemptRead(rCtx.getBookieIndex(), bookie, "Empty Response", rc);
}
return;
}
} else if (BKException.Code.UnauthorizedAccessException == rc && !requestComplete.get()) {
submitCallback(rc);
requestComplete.set(true);
} else {
request.logErrorAndReattemptRead(rCtx.getBookieIndex(), bookie, "Error: " + BKException.getMessage(rc), rc);
return;
}
if (numResponsesPending <= 0) {
completeRequest();
}
}
private boolean completeRequest() {
boolean requestCompleted = requestComplete.compareAndSet(false, true);
if (requestCompleted) {
if (!hasValidResponse) {
// no success called
submitCallback(request.getFirstError());
} else {
// callback
submitCallback(BKException.Code.OK);
}
}
return requestCompleted;
}
@Override
public String toString() {
return String.format("ReadLastConfirmedAndEntryOp(lid=%d, prevEntryId=%d])", lh.getId(), prevEntryId);
}
}