blob: 2d785ab6a17cb1617b487213e5d497f53f46da93 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.bookkeeper.client;
import static com.google.common.base.Preconditions.checkState;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieClient;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ForceLedgerCallback;
import org.apache.bookkeeper.util.SafeRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This represents a request to sync the ledger on every bookie.
*/
class ForceLedgerOp extends SafeRunnable implements ForceLedgerCallback {
private static final Logger LOG = LoggerFactory.getLogger(ForceLedgerOp.class);
final CompletableFuture<Void> cb;
DistributionSchedule.AckSet ackSet;
boolean completed = false;
boolean errored = false;
int lastSeenError = BKException.Code.WriteException;
final List<BookieSocketAddress> currentEnsemble;
long currentNonDurableLastAddConfirmed = LedgerHandle.INVALID_ENTRY_ID;
final LedgerHandle lh;
final BookieClient bookieClient;
ForceLedgerOp(LedgerHandle lh, BookieClient bookieClient,
List<BookieSocketAddress> ensemble,
CompletableFuture<Void> cb) {
this.lh = lh;
this.bookieClient = bookieClient;
this.cb = cb;
this.currentEnsemble = ensemble;
}
void sendForceLedgerRequest(int bookieIndex) {
bookieClient.forceLedger(currentEnsemble.get(bookieIndex), lh.ledgerId, this, bookieIndex);
}
@Override
public void safeRun() {
initiate();
}
void initiate() {
// capture currentNonDurableLastAddConfirmed
// remember that we are inside OrderedExecutor, this induces a strict ordering
// on the sequence of events
this.currentNonDurableLastAddConfirmed = lh.pendingAddsSequenceHead;
if (LOG.isDebugEnabled()) {
LOG.debug("force {} clientNonDurableLac {}", lh.ledgerId, currentNonDurableLastAddConfirmed);
}
// we need to send the request to every bookie in the ensamble
this.ackSet = lh.distributionSchedule.getEnsembleAckSet();
DistributionSchedule.WriteSet writeSet = lh.getDistributionSchedule()
.getEnsembleSet(currentNonDurableLastAddConfirmed);
try {
for (int i = 0; i < writeSet.size(); i++) {
sendForceLedgerRequest(writeSet.get(i));
}
} finally {
writeSet.recycle();
}
}
@Override
public void forceLedgerComplete(int rc, long ledgerId, BookieSocketAddress addr, Object ctx) {
int bookieIndex = (Integer) ctx;
checkState(!completed, "We are waiting for all the bookies, it is not expected an early exit");
if (errored) {
// already failed, do not fire error callbacks twice
return;
}
if (BKException.Code.OK != rc) {
lastSeenError = rc;
}
if (rc == BKException.Code.OK) {
if (ackSet.completeBookieAndCheck(bookieIndex)) {
completed = true;
// we are able to say that every bookie sync'd its own journal
// for every ackknowledged entry before issuing the force() call
if (LOG.isDebugEnabled()) {
LOG.debug("After force on ledger {} updating LastAddConfirmed to {} ",
ledgerId, currentNonDurableLastAddConfirmed);
}
lh.updateLastConfirmed(currentNonDurableLastAddConfirmed, lh.getLength());
FutureUtils.complete(cb, null);
}
} else {
// at least one bookie failed, as we are waiting for all the bookies
// we can fail immediately
LOG.info("ForceLedger did not succeed: Ledger {} on {}", ledgerId, addr);
errored = true;
// notify the failure
FutureUtils.completeExceptionally(cb, BKException.create(lastSeenError));
}
}
}