blob: 02154e580512d2e3e3577ba3722a6925d4b98bdb [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.replication;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.SortedMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import com.google.common.base.Stopwatch;
import org.apache.bookkeeper.bookie.BookieThread;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BKException.BKBookieHandleNotAvailableException;
import org.apache.bookkeeper.client.BKException.BKNoSuchLedgerExistsException;
import org.apache.bookkeeper.client.BKException.BKReadException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeperAdmin;
import org.apache.bookkeeper.client.LedgerChecker;
import org.apache.bookkeeper.client.LedgerFragment;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.LedgerMetadata;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.bookkeeper.replication.ReplicationStats.BK_CLIENT_SCOPE;
import static org.apache.bookkeeper.replication.ReplicationStats.REREPLICATE_OP;
/**
* ReplicationWorker will take the fragments one by one from
* ZKLedgerUnderreplicationManager and replicates to it.
*/
public class ReplicationWorker implements Runnable {
private final static Logger LOG = LoggerFactory
.getLogger(ReplicationWorker.class);
final private LedgerUnderreplicationManager underreplicationManager;
private final ServerConfiguration conf;
private final ZooKeeper zkc;
private volatile boolean workerRunning = false;
final private BookKeeperAdmin admin;
private final LedgerChecker ledgerChecker;
private final BookieSocketAddress targetBookie;
private final BookKeeper bkc;
private final Thread workerThread;
private final long openLedgerRereplicationGracePeriod;
private final Timer pendingReplicationTimer;
// Expose Stats
private final OpStatsLogger rereplicateOpStats;
/**
* Replication worker for replicating the ledger fragments from
* UnderReplicationManager to the targetBookie. This target bookie will be a
* local bookie.
*
* @param zkc
* - ZK instance
* @param conf
* - configurations
* @param targetBKAddr
* - to where replication should happen. Ideally this will be
* local Bookie address.
*/
public ReplicationWorker(final ZooKeeper zkc,
final ServerConfiguration conf, BookieSocketAddress targetBKAddr)
throws CompatibilityException, KeeperException,
InterruptedException, IOException {
this(zkc, conf, targetBKAddr, NullStatsLogger.INSTANCE);
}
/**
* Replication worker for replicating the ledger fragments from
* UnderReplicationManager to the targetBookie. This target bookie will be a
* local bookie.
*
* @param zkc
* - ZK instance
* @param conf
* - configurations
* @param targetBKAddr
* - to where replication should happen. Ideally this will be
* local Bookie address.
*/
public ReplicationWorker(final ZooKeeper zkc,
final ServerConfiguration conf, BookieSocketAddress targetBKAddr,
StatsLogger statsLogger)
throws CompatibilityException, KeeperException,
InterruptedException, IOException {
this.zkc = zkc;
this.conf = conf;
this.targetBookie = targetBKAddr;
LedgerManagerFactory mFactory = LedgerManagerFactory
.newLedgerManagerFactory(this.conf, this.zkc);
this.underreplicationManager = mFactory
.newLedgerUnderreplicationManager();
this.bkc = BookKeeper.forConfig(new ClientConfiguration(conf))
.setZookeeper(zkc)
.setStatsLogger(statsLogger.scope(BK_CLIENT_SCOPE))
.build();
this.admin = new BookKeeperAdmin(bkc);
this.ledgerChecker = new LedgerChecker(bkc);
this.workerThread = new BookieThread(this, "ReplicationWorker");
this.openLedgerRereplicationGracePeriod = conf
.getOpenLedgerRereplicationGracePeriod();
this.pendingReplicationTimer = new Timer("PendingReplicationTimer");
// Expose Stats
this.rereplicateOpStats = statsLogger.getOpStatsLogger(REREPLICATE_OP);
}
/** Start the replication worker */
public void start() {
this.workerThread.start();
}
@Override
public void run() {
workerRunning = true;
while (workerRunning) {
try {
rereplicate();
} catch (InterruptedException e) {
shutdown();
Thread.currentThread().interrupt();
LOG.info("InterruptedException "
+ "while replicating fragments", e);
return;
} catch (BKException e) {
shutdown();
LOG.error("BKException while replicating fragments", e);
return;
} catch (UnavailableException e) {
shutdown();
LOG.error("UnavailableException "
+ "while replicating fragments", e);
return;
}
}
LOG.info("ReplicationWorker exited loop!");
}
/**
* Replicates the under replicated fragments from failed bookie ledger to
* targetBookie
*/
private void rereplicate() throws InterruptedException, BKException,
UnavailableException {
long ledgerIdToReplicate = underreplicationManager
.getLedgerToRereplicate();
Stopwatch stopwatch = new Stopwatch().start();
boolean success = false;
try {
success = rereplicate(ledgerIdToReplicate);
} finally {
long latencyMillis = stopwatch.stop().elapsedMillis();
if (success) {
rereplicateOpStats.registerSuccessfulEvent(latencyMillis);
} else {
rereplicateOpStats.registerFailedEvent(latencyMillis);
}
}
}
private boolean rereplicate(long ledgerIdToReplicate) throws InterruptedException, BKException,
UnavailableException {
LOG.debug("Going to replicate the fragments of the ledger: {}", ledgerIdToReplicate);
LedgerHandle lh;
try {
lh = admin.openLedgerNoRecovery(ledgerIdToReplicate);
} catch (BKNoSuchLedgerExistsException e) {
// Ledger might have been deleted by user
LOG.info("BKNoSuchLedgerExistsException while opening "
+ "ledger for replication. Other clients "
+ "might have deleted the ledger. "
+ "So, no harm to continue");
underreplicationManager.markLedgerReplicated(ledgerIdToReplicate);
return false;
} catch (BKReadException e) {
LOG.info("BKReadException while"
+ " opening ledger for replication."
+ " Enough Bookies might not have available"
+ "So, no harm to continue");
underreplicationManager
.releaseUnderreplicatedLedger(ledgerIdToReplicate);
return false;
} catch (BKBookieHandleNotAvailableException e) {
LOG.info("BKBookieHandleNotAvailableException while"
+ " opening ledger for replication."
+ " Enough Bookies might not have available"
+ "So, no harm to continue");
underreplicationManager
.releaseUnderreplicatedLedger(ledgerIdToReplicate);
return false;
}
Set<LedgerFragment> fragments = getUnderreplicatedFragments(lh);
LOG.debug("Founds fragments {} for replication from ledger: {}", fragments, ledgerIdToReplicate);
boolean foundOpenFragments = false;
for (LedgerFragment ledgerFragment : fragments) {
if (!ledgerFragment.isClosed()) {
foundOpenFragments = true;
continue;
} else if (isTargetBookieExistsInFragmentEnsemble(lh,
ledgerFragment)) {
LOG.debug("Target Bookie[{}] found in the fragment ensemble: {}", targetBookie,
ledgerFragment.getEnsemble());
continue;
}
try {
admin.replicateLedgerFragment(lh, ledgerFragment, targetBookie);
} catch (BKException.BKBookieHandleNotAvailableException e) {
LOG.warn("BKBookieHandleNotAvailableException "
+ "while replicating the fragment", e);
} catch (BKException.BKLedgerRecoveryException e) {
LOG.warn("BKLedgerRecoveryException "
+ "while replicating the fragment", e);
if (admin.getReadOnlyBookies().contains(targetBookie)) {
throw new BKException.BKWriteOnReadOnlyBookieException();
}
}
}
if (foundOpenFragments || isLastSegmentOpenAndMissingBookies(lh)) {
deferLedgerLockRelease(ledgerIdToReplicate);
return false;
}
fragments = getUnderreplicatedFragments(lh);
if (fragments.size() == 0) {
LOG.info("Ledger replicated successfully. ledger id is: "
+ ledgerIdToReplicate);
underreplicationManager.markLedgerReplicated(ledgerIdToReplicate);
return true;
} else {
// Releasing the underReplication ledger lock and compete
// for the replication again for the pending fragments
underreplicationManager
.releaseUnderreplicatedLedger(ledgerIdToReplicate);
return false;
}
}
/**
* When checking the fragments of a ledger, there is a corner case
* where if the last segment/ensemble is open, but nothing has been written to
* some of the quorums in the ensemble, bookies can fail without any action being
* taken. This is fine, until enough bookies fail to cause a quorum to become
* unavailable, by which time the ledger is unrecoverable.
*
* For example, if in a E3Q2, only 1 entry is written and the last bookie
* in the ensemble fails, nothing has been written to it, so nothing needs to be
* recovered. But if the second to last bookie fails, we've now lost quorum for
* the second entry, so it's impossible to see if the second has been written or
* not.
*
* To avoid this situation, we need to check if bookies in the final open ensemble
* are unavailable, and take action if so. The action to take is to close the ledger,
* after a grace period as the writting client may replace the faulty bookie on its
* own.
*
* Missing bookies in closed ledgers are fine, as we know the last confirmed add, so
* we can tell which entries are supposed to exist and rereplicate them if necessary.
*/
private boolean isLastSegmentOpenAndMissingBookies(LedgerHandle lh) throws BKException {
LedgerMetadata md = admin.getLedgerMetadata(lh);
if (md.isClosed()) {
return false;
}
SortedMap<Long, ArrayList<BookieSocketAddress>> ensembles
= admin.getLedgerMetadata(lh).getEnsembles();
ArrayList<BookieSocketAddress> finalEnsemble = ensembles.get(ensembles.lastKey());
Collection<BookieSocketAddress> available = admin.getAvailableBookies();
for (BookieSocketAddress b : finalEnsemble) {
if (!available.contains(b)) {
return true;
}
}
return false;
}
/** Gets the under replicated fragments */
private Set<LedgerFragment> getUnderreplicatedFragments(LedgerHandle lh)
throws InterruptedException {
CheckerCallback checkerCb = new CheckerCallback();
ledgerChecker.checkLedger(lh, checkerCb);
Set<LedgerFragment> fragments = checkerCb.waitAndGetResult();
return fragments;
}
/**
* Schedules a timer task for releasing the lock which will be scheduled
* after open ledger fragment replication time. Ledger will be fenced if it
* is still in open state when timer task fired
*/
private void deferLedgerLockRelease(final long ledgerId) {
long gracePeriod = this.openLedgerRereplicationGracePeriod;
TimerTask timerTask = new TimerTask() {
@Override
public void run() {
LedgerHandle lh = null;
try {
lh = admin.openLedgerNoRecovery(ledgerId);
if (isLastSegmentOpenAndMissingBookies(lh)) {
lh = admin.openLedger(ledgerId);
}
Set<LedgerFragment> fragments = getUnderreplicatedFragments(lh);
for (LedgerFragment fragment : fragments) {
if (!fragment.isClosed()) {
lh = admin.openLedger(ledgerId);
break;
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.info("InterruptedException "
+ "while replicating fragments", e);
} catch (BKNoSuchLedgerExistsException bknsle) {
LOG.debug("Ledger was deleted, safe to continue", bknsle);
} catch (BKException e) {
LOG.error("BKException while fencing the ledger"
+ " for rereplication of postponed ledgers", e);
} finally {
try {
if (lh != null) {
lh.close();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.info("InterruptedException while closing "
+ "ledger", e);
} catch (BKException e) {
// Lets go ahead and release the lock. Catch actual
// exception in normal replication flow and take
// action.
LOG.warn("BKException while closing ledger ", e);
} finally {
try {
underreplicationManager
.releaseUnderreplicatedLedger(ledgerId);
} catch (UnavailableException e) {
shutdown();
LOG.error("UnavailableException "
+ "while replicating fragments", e);
}
}
}
}
};
pendingReplicationTimer.schedule(timerTask, gracePeriod);
}
/**
* Stop the replication worker service
*/
public void shutdown() {
synchronized (this) {
if (!workerRunning) {
return;
}
workerRunning = false;
}
LOG.info("Shutting down ReplicationWorker");
this.pendingReplicationTimer.cancel();
try {
this.workerThread.interrupt();
this.workerThread.join();
} catch (InterruptedException e) {
LOG.error("Interrupted during shutting down replication worker : ",
e);
Thread.currentThread().interrupt();
}
try {
bkc.close();
} catch (InterruptedException e) {
LOG.warn("Interrupted while closing the Bookie client", e);
Thread.currentThread().interrupt();
} catch (BKException e) {
LOG.warn("Exception while closing the Bookie client", e);
}
try {
underreplicationManager.close();
} catch (UnavailableException e) {
LOG.warn("Exception while closing the "
+ "ZkLedgerUnderrepliationManager", e);
}
}
/**
* Gives the running status of ReplicationWorker
*/
boolean isRunning() {
return workerRunning && workerThread.isAlive();
}
private boolean isTargetBookieExistsInFragmentEnsemble(LedgerHandle lh,
LedgerFragment ledgerFragment) {
List<BookieSocketAddress> ensemble = ledgerFragment.getEnsemble();
for (BookieSocketAddress bkAddr : ensemble) {
if (targetBookie.equals(bkAddr)) {
return true;
}
}
return false;
}
/** Ledger checker call back */
private static class CheckerCallback implements
GenericCallback<Set<LedgerFragment>> {
private Set<LedgerFragment> result = null;
private CountDownLatch latch = new CountDownLatch(1);
@Override
public void operationComplete(int rc, Set<LedgerFragment> result) {
this.result = result;
latch.countDown();
}
/**
* Wait until operation complete call back comes and return the ledger
* fragments set
*/
Set<LedgerFragment> waitAndGetResult() throws InterruptedException {
latch.await();
return result;
}
}
}