blob: e39bf60237f14b331f5b9b81449d9546a9029a4b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.bookkeeper.client;
import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import java.util.List;
import org.apache.bookkeeper.client.BKException.BKDigestMatchException;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieClient;
import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
import org.apache.bookkeeper.proto.checksum.DigestManager;
import org.apache.bookkeeper.proto.checksum.DigestManager.RecoveryData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class encapsulated the read last confirmed operation.
*
*/
class ReadLastConfirmedOp implements ReadEntryCallback {
static final Logger LOG = LoggerFactory.getLogger(ReadLastConfirmedOp.class);
private final long ledgerId;
private final byte[] ledgerKey;
private final BookieClient bookieClient;
private final DigestManager digestManager;
private int numResponsesPending;
private RecoveryData maxRecoveredData;
private volatile boolean completed = false;
private int lastSeenError = BKException.Code.ReadException;
private final LastConfirmedDataCallback cb;
private final DistributionSchedule.QuorumCoverageSet coverageSet;
private final List<BookieSocketAddress> currentEnsemble;
/**
* Wrapper to get all recovered data from the request.
*/
interface LastConfirmedDataCallback {
void readLastConfirmedDataComplete(int rc, RecoveryData data);
}
public ReadLastConfirmedOp(BookieClient bookieClient,
DistributionSchedule schedule,
DigestManager digestManager,
long ledgerId,
List<BookieSocketAddress> ensemble,
byte[] ledgerKey,
LastConfirmedDataCallback cb) {
this.cb = cb;
this.bookieClient = bookieClient;
this.maxRecoveredData = new RecoveryData(LedgerHandle.INVALID_ENTRY_ID, 0);
this.numResponsesPending = ensemble.size();
this.coverageSet = schedule.getCoverageSet();
this.currentEnsemble = ensemble;
this.ledgerId = ledgerId;
this.ledgerKey = ledgerKey;
this.digestManager = digestManager;
}
public void initiate() {
for (int i = 0; i < currentEnsemble.size(); i++) {
bookieClient.readEntry(currentEnsemble.get(i),
ledgerId,
BookieProtocol.LAST_ADD_CONFIRMED,
this, i, BookieProtocol.FLAG_NONE);
}
}
public void initiateWithFencing() {
for (int i = 0; i < currentEnsemble.size(); i++) {
bookieClient.readEntry(currentEnsemble.get(i),
ledgerId,
BookieProtocol.LAST_ADD_CONFIRMED,
this, i, BookieProtocol.FLAG_DO_FENCING,
ledgerKey);
}
}
public synchronized void readEntryComplete(final int rc, final long ledgerId, final long entryId,
final ByteBuf buffer, final Object ctx) {
int bookieIndex = (Integer) ctx;
// add the response to coverage set
coverageSet.addBookie(bookieIndex, rc);
numResponsesPending--;
boolean heardValidResponse = false;
if (rc == BKException.Code.OK) {
try {
RecoveryData recoveryData = digestManager.verifyDigestAndReturnLastConfirmed(buffer);
if (recoveryData.getLastAddConfirmed() > maxRecoveredData.getLastAddConfirmed()) {
maxRecoveredData = recoveryData;
}
heardValidResponse = true;
} catch (BKDigestMatchException e) {
// Too bad, this bookie didn't give us a valid answer, we
// still might be able to recover though so continue
LOG.error("Mac mismatch for ledger: " + ledgerId + ", entry: " + entryId
+ " while reading last entry from bookie: "
+ currentEnsemble.get(bookieIndex));
}
}
if (rc == BKException.Code.NoSuchLedgerExistsException || rc == BKException.Code.NoSuchEntryException) {
// this still counts as a valid response, e.g., if the client crashed without writing any entry
heardValidResponse = true;
}
if (rc == BKException.Code.UnauthorizedAccessException && !completed) {
cb.readLastConfirmedDataComplete(rc, maxRecoveredData);
completed = true;
}
if (!heardValidResponse && BKException.Code.OK != rc) {
lastSeenError = rc;
}
// other return codes dont count as valid responses
if (heardValidResponse
&& coverageSet.checkCovered()
&& !completed) {
completed = true;
if (LOG.isDebugEnabled()) {
LOG.debug("Read Complete with enough validResponses for ledger: {}, entry: {}",
ledgerId, entryId);
}
cb.readLastConfirmedDataComplete(BKException.Code.OK, maxRecoveredData);
return;
}
if (numResponsesPending == 0 && !completed) {
LOG.error("While readLastConfirmed ledger: {} did not hear success responses from all quorums, {}",
ledgerId, coverageSet);
cb.readLastConfirmedDataComplete(lastSeenError, maxRecoveredData);
}
}
@VisibleForTesting
synchronized int getNumResponsesPending() {
return numResponsesPending;
}
}