blob: 4e6e3fb3f9adcdce76a6150ba06ff9247c37453f [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.replication;
import com.google.common.collect.Sets;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeperAdmin;
import org.apache.bookkeeper.client.BookiesListener;
import org.apache.bookkeeper.client.LedgerChecker;
import org.apache.bookkeeper.client.LedgerFragment;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
import org.apache.bookkeeper.replication.ReplicationException.BKAuditException;
import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase;
import org.apache.commons.collections.CollectionUtils;
import org.apache.zookeeper.AsyncCallback;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Auditor is a single entity in the entire Bookie cluster and will be watching
* all the bookies under 'ledgerrootpath/available' zkpath. When any of the
* bookie failed or disconnected from zk, he will start initiating the
* re-replication activities by keeping all the corresponding ledgers of the
* failed bookie as underreplicated znode in zk.
*/
public class Auditor implements BookiesListener {
private static final Logger LOG = LoggerFactory.getLogger(Auditor.class);
private final ServerConfiguration conf;
private BookKeeper bkc;
private BookKeeperAdmin admin;
private BookieLedgerIndexer bookieLedgerIndexer;
private LedgerManager ledgerManager;
private LedgerUnderreplicationManager ledgerUnderreplicationManager;
private final ScheduledExecutorService executor;
private List<String> knownBookies = new ArrayList<String>();
private final String bookieIdentifier;
public Auditor(final String bookieIdentifier, ServerConfiguration conf,
ZooKeeper zkc) throws UnavailableException {
this.conf = conf;
this.bookieIdentifier = bookieIdentifier;
initialize(conf, zkc);
executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "AuditorBookie-" + bookieIdentifier);
t.setDaemon(true);
return t;
}
});
}
private void initialize(ServerConfiguration conf, ZooKeeper zkc)
throws UnavailableException {
try {
LedgerManagerFactory ledgerManagerFactory = LedgerManagerFactory
.newLedgerManagerFactory(conf, zkc);
ledgerManager = ledgerManagerFactory.newLedgerManager();
this.bookieLedgerIndexer = new BookieLedgerIndexer(ledgerManager);
this.ledgerUnderreplicationManager = ledgerManagerFactory
.newLedgerUnderreplicationManager();
this.bkc = new BookKeeper(new ClientConfiguration(conf), zkc);
this.admin = new BookKeeperAdmin(bkc);
} catch (CompatibilityException ce) {
throw new UnavailableException(
"CompatibilityException while initializing Auditor", ce);
} catch (IOException ioe) {
throw new UnavailableException(
"IOException while initializing Auditor", ioe);
} catch (KeeperException ke) {
throw new UnavailableException(
"KeeperException while initializing Auditor", ke);
} catch (InterruptedException ie) {
throw new UnavailableException(
"Interrupted while initializing Auditor", ie);
}
}
private void submitShutdownTask() {
synchronized (this) {
if (executor.isShutdown()) {
return;
}
executor.submit(new Runnable() {
public void run() {
synchronized (Auditor.this) {
executor.shutdown();
}
}
});
}
}
@VisibleForTesting
synchronized Future<?> submitAuditTask() {
if (executor.isShutdown()) {
SettableFuture<Void> f = SettableFuture.<Void>create();
f.setException(new BKAuditException("Auditor shutting down"));
return f;
}
return executor.submit(new Runnable() {
@SuppressWarnings("unchecked")
public void run() {
try {
waitIfLedgerReplicationDisabled();
List<String> availableBookies = getAvailableBookies();
// casting to String, as knownBookies and availableBookies
// contains only String values
// find new bookies(if any) and update the known bookie list
Collection<String> newBookies = CollectionUtils.subtract(
availableBookies, knownBookies);
knownBookies.addAll(newBookies);
// find lost bookies(if any)
Collection<String> lostBookies = CollectionUtils.subtract(
knownBookies, availableBookies);
if (lostBookies.size() > 0) {
knownBookies.removeAll(lostBookies);
auditBookies();
}
} catch (BKException bke) {
LOG.error("Exception getting bookie list", bke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
LOG.error("Interrupted while watching available bookies ", ie);
} catch (BKAuditException bke) {
LOG.error("Exception while watching available bookies", bke);
} catch (UnavailableException ue) {
LOG.error("Exception while watching available bookies", ue);
} catch (KeeperException ke) {
LOG.error("Exception reading bookie list", ke);
}
}
});
}
public void start() {
LOG.info("I'm starting as Auditor Bookie. ID: {}", bookieIdentifier);
// on startup watching available bookie and based on the
// available bookies determining the bookie failures.
synchronized (this) {
if (executor.isShutdown()) {
return;
}
long interval = conf.getAuditorPeriodicCheckInterval();
if (interval > 0) {
LOG.info("Auditor periodic ledger checking enabled"
+ " 'auditorPeriodicCheckInterval' {} seconds", interval);
executor.scheduleAtFixedRate(new Runnable() {
public void run() {
LOG.info("Running periodic check");
try {
if (!ledgerUnderreplicationManager.isLedgerReplicationEnabled()) {
LOG.info("Ledger replication disabled, skipping");
return;
}
checkAllLedgers();
} catch (KeeperException ke) {
LOG.error("Exception while running periodic check", ke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
LOG.error("Interrupted while running periodic check", ie);
} catch (BKAuditException bkae) {
LOG.error("Exception while running periodic check", bkae);
} catch (BKException bke) {
LOG.error("Exception running periodic check", bke);
} catch (IOException ioe) {
LOG.error("I/O exception running periodic check", ioe);
} catch (ReplicationException.UnavailableException ue) {
LOG.error("Underreplication manager unavailable "
+"running periodic check", ue);
}
}
}, interval, interval, TimeUnit.SECONDS);
} else {
LOG.info("Periodic checking disabled");
}
try {
knownBookies = getAvailableBookies();
} catch (BKException bke) {
LOG.error("Couldn't get bookie list, exiting", bke);
submitShutdownTask();
}
long bookieCheckInterval = conf.getAuditorPeriodicBookieCheckInterval();
if (bookieCheckInterval == 0) {
LOG.info("Auditor periodic bookie checking disabled, running once check now anyhow");
executor.submit(BOOKIE_CHECK);
} else {
LOG.info("Auditor periodic bookie checking enabled"
+ " 'auditorPeriodicBookieCheckInterval' {} seconds", bookieCheckInterval);
executor.scheduleAtFixedRate(BOOKIE_CHECK, 0, bookieCheckInterval, TimeUnit.SECONDS);
}
}
}
private void waitIfLedgerReplicationDisabled() throws UnavailableException,
InterruptedException {
ReplicationEnableCb cb = new ReplicationEnableCb();
if (!ledgerUnderreplicationManager.isLedgerReplicationEnabled()) {
ledgerUnderreplicationManager.notifyLedgerReplicationEnabled(cb);
cb.await();
}
}
private List<String> getAvailableBookies() throws BKException {
// Get the available bookies, also watch for further changes
// Watching on only available bookies is sufficient, as changes in readonly bookies also changes in available
// bookies
admin.notifyBookiesChanged(this);
Collection<BookieSocketAddress> availableBkAddresses = admin.getAvailableBookies();
Collection<BookieSocketAddress> readOnlyBkAddresses = admin.getReadOnlyBookies();
availableBkAddresses.addAll(readOnlyBkAddresses);
List<String> availableBookies = new ArrayList<String>();
for (BookieSocketAddress addr : availableBkAddresses) {
availableBookies.add(addr.toString());
}
return availableBookies;
}
@SuppressWarnings("unchecked")
private void auditBookies()
throws BKAuditException, KeeperException,
InterruptedException, BKException {
try {
waitIfLedgerReplicationDisabled();
} catch (UnavailableException ue) {
LOG.error("Underreplication unavailable, skipping audit."
+ "Will retry after a period");
return;
}
// put exit cases here
Map<String, Set<Long>> ledgerDetails = generateBookie2LedgersIndex();
try {
if (!ledgerUnderreplicationManager.isLedgerReplicationEnabled()) {
// has been disabled while we were generating the index
// discard this run, and schedule a new one
executor.submit(BOOKIE_CHECK);
return;
}
} catch (UnavailableException ue) {
LOG.error("Underreplication unavailable, skipping audit."
+ "Will retry after a period");
return;
}
List<String> availableBookies = getAvailableBookies();
// find lost bookies
Set<String> knownBookies = ledgerDetails.keySet();
Collection<String> lostBookies = CollectionUtils.subtract(knownBookies,
availableBookies);
if (lostBookies.size() > 0)
handleLostBookies(lostBookies, ledgerDetails);
}
private Map<String, Set<Long>> generateBookie2LedgersIndex()
throws BKAuditException {
return bookieLedgerIndexer.getBookieToLedgerIndex();
}
private void handleLostBookies(Collection<String> lostBookies,
Map<String, Set<Long>> ledgerDetails) throws BKAuditException {
LOG.info("Following are the failed bookies: " + lostBookies
+ " and searching its ledgers for re-replication");
for (String bookieIP : lostBookies) {
// identify all the ledgers in bookieIP and publishing these ledgers
// as under-replicated.
publishSuspectedLedgers(bookieIP, ledgerDetails.get(bookieIP));
}
}
private void publishSuspectedLedgers(String bookieIP, Set<Long> ledgers)
throws BKAuditException {
if (null == ledgers || ledgers.size() == 0) {
// there is no ledgers available for this bookie and just
// ignoring the bookie failures
LOG.info("There is no ledgers for the failed bookie: " + bookieIP);
return;
}
LOG.info("Following ledgers: " + ledgers + " of bookie: " + bookieIP
+ " are identified as underreplicated");
for (Long ledgerId : ledgers) {
try {
ledgerUnderreplicationManager.markLedgerUnderreplicated(
ledgerId, bookieIP);
} catch (UnavailableException ue) {
throw new BKAuditException(
"Failed to publish underreplicated ledger: " + ledgerId
+ " of bookie: " + bookieIP, ue);
}
}
}
/**
* Process the result returned from checking a ledger
*/
private class ProcessLostFragmentsCb implements GenericCallback<Set<LedgerFragment>> {
final LedgerHandle lh;
final AsyncCallback.VoidCallback callback;
ProcessLostFragmentsCb(LedgerHandle lh, AsyncCallback.VoidCallback callback) {
this.lh = lh;
this.callback = callback;
}
public void operationComplete(int rc, Set<LedgerFragment> fragments) {
try {
if (rc == BKException.Code.OK) {
Set<BookieSocketAddress> bookies = Sets.newHashSet();
for (LedgerFragment f : fragments) {
bookies.add(f.getAddress());
}
for (BookieSocketAddress bookie : bookies) {
publishSuspectedLedgers(bookie.toString(), Sets.newHashSet(lh.getId()));
}
}
lh.close();
} catch (BKException bke) {
LOG.error("Error closing lh", bke);
if (rc == BKException.Code.OK) {
rc = BKException.Code.ReplicationException;
}
} catch (InterruptedException ie) {
LOG.error("Interrupted publishing suspected ledger", ie);
Thread.currentThread().interrupt();
if (rc == BKException.Code.OK) {
rc = BKException.Code.InterruptedException;
}
} catch (BKAuditException bkae) {
LOG.error("Auditor exception publishing suspected ledger", bkae);
if (rc == BKException.Code.OK) {
rc = BKException.Code.ReplicationException;
}
}
callback.processResult(rc, null, null);
}
}
/**
* List all the ledgers and check them individually. This should not
* be run very often.
*/
void checkAllLedgers() throws BKAuditException, BKException,
IOException, InterruptedException, KeeperException {
ZooKeeperWatcherBase w = new ZooKeeperWatcherBase(conf.getZkTimeout());
ZooKeeper newzk = ZkUtils.createConnectedZookeeperClient(conf.getZkServers(), w);
final BookKeeper client = new BookKeeper(new ClientConfiguration(conf),
newzk);
final BookKeeperAdmin admin = new BookKeeperAdmin(client);
try {
final LedgerChecker checker = new LedgerChecker(client);
final AtomicInteger returnCode = new AtomicInteger(BKException.Code.OK);
final CountDownLatch processDone = new CountDownLatch(1);
Processor<Long> checkLedgersProcessor = new Processor<Long>() {
@Override
public void process(final Long ledgerId,
final AsyncCallback.VoidCallback callback) {
try {
if (!ledgerUnderreplicationManager.isLedgerReplicationEnabled()) {
LOG.info("Ledger rereplication has been disabled, aborting periodic check");
processDone.countDown();
return;
}
} catch (ReplicationException.UnavailableException ue) {
LOG.error("Underreplication manager unavailable "
+"running periodic check", ue);
processDone.countDown();
return;
}
LedgerHandle lh = null;
try {
lh = admin.openLedgerNoRecovery(ledgerId);
checker.checkLedger(lh, new ProcessLostFragmentsCb(lh, callback));
} catch (BKException.BKNoSuchLedgerExistsException bknsle) {
LOG.debug("Ledger was deleted before we could check it", bknsle);
callback.processResult(BKException.Code.OK,
null, null);
return;
} catch (BKException bke) {
LOG.error("Couldn't open ledger " + ledgerId, bke);
callback.processResult(BKException.Code.BookieHandleNotAvailableException,
null, null);
return;
} catch (InterruptedException ie) {
LOG.error("Interrupted opening ledger", ie);
Thread.currentThread().interrupt();
callback.processResult(BKException.Code.InterruptedException, null, null);
return;
} finally {
if (lh != null) {
try {
lh.close();
} catch (BKException bke) {
LOG.warn("Couldn't close ledger " + ledgerId, bke);
} catch (InterruptedException ie) {
LOG.warn("Interrupted closing ledger " + ledgerId, ie);
Thread.currentThread().interrupt();
}
}
}
}
};
ledgerManager.asyncProcessLedgers(checkLedgersProcessor,
new AsyncCallback.VoidCallback() {
@Override
public void processResult(int rc, String s, Object obj) {
returnCode.set(rc);
processDone.countDown();
}
}, null, BKException.Code.OK, BKException.Code.ReadException);
try {
processDone.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new BKAuditException(
"Exception while checking ledgers", e);
}
if (returnCode.get() != BKException.Code.OK) {
throw BKException.create(returnCode.get());
}
} finally {
admin.close();
client.close();
newzk.close();
}
}
@Override
public void availableBookiesChanged() {
submitAuditTask();
}
/**
* Shutdown the auditor
*/
public void shutdown() {
LOG.info("Shutting down auditor");
submitShutdownTask();
try {
while (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
LOG.warn("Executor not shutting down, interrupting");
executor.shutdownNow();
}
admin.close();
bkc.close();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
LOG.warn("Interrupted while shutting down auditor bookie", ie);
} catch (BKException bke) {
LOG.warn("Exception while shutting down auditor bookie", bke);
}
}
/**
* Return true if auditor is running otherwise return false
*
* @return auditor status
*/
public boolean isRunning() {
return !executor.isShutdown();
}
private final Runnable BOOKIE_CHECK = new Runnable() {
public void run() {
try {
auditBookies();
} catch (BKException bke) {
LOG.error("Couldn't get bookie list, exiting", bke);
submitShutdownTask();
} catch (KeeperException ke) {
LOG.error("Exception while watching available bookies", ke);
submitShutdownTask();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
LOG.error("Interrupted while watching available bookies ", ie);
submitShutdownTask();
} catch (BKAuditException bke) {
LOG.error("Exception while watching available bookies", bke);
submitShutdownTask();
}
}
};
}