blob: cbf9b45c9544fa7be67386bea5176d15b2fbd9a7 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.replication;
import static org.apache.bookkeeper.replication.ReplicationStats.AUDITOR_SCOPE;
import static org.apache.bookkeeper.replication.ReplicationStats.AUDIT_BOOKIES_TIME;
import static org.apache.bookkeeper.replication.ReplicationStats.BOOKIE_TO_LEDGERS_MAP_CREATION_TIME;
import static org.apache.bookkeeper.replication.ReplicationStats.CHECK_ALL_LEDGERS_TIME;
import static org.apache.bookkeeper.replication.ReplicationStats.NUM_BOOKIES_PER_LEDGER;
import static org.apache.bookkeeper.replication.ReplicationStats.NUM_BOOKIE_AUDITS_DELAYED;
import static org.apache.bookkeeper.replication.ReplicationStats.NUM_DELAYED_BOOKIE_AUDITS_DELAYES_CANCELLED;
import static org.apache.bookkeeper.replication.ReplicationStats.NUM_FRAGMENTS_PER_LEDGER;
import static org.apache.bookkeeper.replication.ReplicationStats.NUM_LEDGERS_CHECKED;
import static org.apache.bookkeeper.replication.ReplicationStats.NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY;
import static org.apache.bookkeeper.replication.ReplicationStats.NUM_UNDER_REPLICATED_LEDGERS;
import static org.apache.bookkeeper.replication.ReplicationStats.PLACEMENT_POLICY_CHECK_TIME;
import static org.apache.bookkeeper.replication.ReplicationStats.URL_PUBLISH_TIME_FOR_LOST_BOOKIE;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.SettableFuture;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BKException.Code;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeperAdmin;
import org.apache.bookkeeper.client.LedgerChecker;
import org.apache.bookkeeper.client.LedgerFragment;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.AbstractZkLedgerManagerFactory;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
import org.apache.bookkeeper.replication.ReplicationException.BKAuditException;
import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.Gauge;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.stats.annotations.StatsDoc;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Auditor is a single entity in the entire Bookie cluster and will be watching
* all the bookies under 'ledgerrootpath/available' zkpath. When any of the
* bookie failed or disconnected from zk, he will start initiating the
* re-replication activities by keeping all the corresponding ledgers of the
* failed bookie as underreplicated znode in zk.
*
* <p>TODO: eliminate the direct usage of zookeeper here {@link https://github.com/apache/bookkeeper/issues/1332}
*/
@StatsDoc(
name = AUDITOR_SCOPE,
help = "Auditor related stats"
)
public class Auditor implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(Auditor.class);
private final ServerConfiguration conf;
private final BookKeeper bkc;
private final boolean ownBkc;
private BookKeeperAdmin admin;
private BookieLedgerIndexer bookieLedgerIndexer;
private LedgerManager ledgerManager;
private LedgerUnderreplicationManager ledgerUnderreplicationManager;
private final ScheduledExecutorService executor;
private List<String> knownBookies = new ArrayList<String>();
private final String bookieIdentifier;
private volatile Future<?> auditTask;
private Set<String> bookiesToBeAudited = Sets.newHashSet();
private volatile int lostBookieRecoveryDelayBeforeChange;
private final AtomicInteger ledgersNotAdheringToPlacementPolicyGuageValue;
private final AtomicInteger numOfLedgersFoundInPlacementPolicyCheck;
private final StatsLogger statsLogger;
@StatsDoc(
name = NUM_UNDER_REPLICATED_LEDGERS,
help = "the distribution of num under_replicated ledgers on each auditor run"
)
private final OpStatsLogger numUnderReplicatedLedger;
@StatsDoc(
name = URL_PUBLISH_TIME_FOR_LOST_BOOKIE,
help = "the latency distribution of publishing under replicated ledgers for lost bookies"
)
private final OpStatsLogger uRLPublishTimeForLostBookies;
@StatsDoc(
name = BOOKIE_TO_LEDGERS_MAP_CREATION_TIME,
help = "the latency distribution of creating bookies-to-ledgers map"
)
private final OpStatsLogger bookieToLedgersMapCreationTime;
@StatsDoc(
name = CHECK_ALL_LEDGERS_TIME,
help = "the latency distribution of checking all ledgers"
)
private final OpStatsLogger checkAllLedgersTime;
@StatsDoc(
name = PLACEMENT_POLICY_CHECK_TIME,
help = "the latency distribution of placementPolicy check"
)
private final OpStatsLogger placementPolicyCheckTime;
@StatsDoc(
name = AUDIT_BOOKIES_TIME,
help = "the latency distribution of auditing all the bookies"
)
private final OpStatsLogger auditBookiesTime;
@StatsDoc(
name = NUM_LEDGERS_CHECKED,
help = "the number of ledgers checked by the auditor"
)
private final Counter numLedgersChecked;
@StatsDoc(
name = NUM_FRAGMENTS_PER_LEDGER,
help = "the distribution of number of fragments per ledger"
)
private final OpStatsLogger numFragmentsPerLedger;
@StatsDoc(
name = NUM_BOOKIES_PER_LEDGER,
help = "the distribution of number of bookies per ledger"
)
private final OpStatsLogger numBookiesPerLedger;
@StatsDoc(
name = NUM_BOOKIE_AUDITS_DELAYED,
help = "the number of bookie-audits delayed"
)
private final Counter numBookieAuditsDelayed;
@StatsDoc(
name = NUM_DELAYED_BOOKIE_AUDITS_DELAYES_CANCELLED,
help = "the number of delayed-bookie-audits cancelled"
)
private final Counter numDelayedBookieAuditsCancelled;
@StatsDoc(
name = NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY,
help = "Gauge for number of ledgers not adhering to placement policy found in placement policy check"
)
private final Gauge<Integer> numLedgersNotAdheringToPlacementPolicy;
static BookKeeper createBookKeeperClient(ServerConfiguration conf) throws InterruptedException, IOException {
return createBookKeeperClient(conf, NullStatsLogger.INSTANCE);
}
static BookKeeper createBookKeeperClient(ServerConfiguration conf, StatsLogger statsLogger)
throws InterruptedException, IOException {
ClientConfiguration clientConfiguration = new ClientConfiguration(conf);
clientConfiguration.setClientRole(ClientConfiguration.CLIENT_ROLE_SYSTEM);
try {
return BookKeeper.forConfig(clientConfiguration).statsLogger(statsLogger).build();
} catch (BKException e) {
throw new IOException("Failed to create bookkeeper client", e);
}
}
static BookKeeper createBookKeeperClientThrowUnavailableException(ServerConfiguration conf)
throws UnavailableException {
try {
return createBookKeeperClient(conf);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new UnavailableException("Failed to create bookkeeper client", e);
} catch (IOException e) {
throw new UnavailableException("Failed to create bookkeeper client", e);
}
}
public Auditor(final String bookieIdentifier,
ServerConfiguration conf,
StatsLogger statsLogger)
throws UnavailableException {
this(
bookieIdentifier,
conf,
createBookKeeperClientThrowUnavailableException(conf),
true,
statsLogger);
}
public Auditor(final String bookieIdentifier,
ServerConfiguration conf,
BookKeeper bkc,
boolean ownBkc,
StatsLogger statsLogger)
throws UnavailableException {
this.conf = conf;
this.bookieIdentifier = bookieIdentifier;
this.statsLogger = statsLogger;
this.numOfLedgersFoundInPlacementPolicyCheck = new AtomicInteger(0);
this.ledgersNotAdheringToPlacementPolicyGuageValue = new AtomicInteger(0);
numUnderReplicatedLedger = this.statsLogger.getOpStatsLogger(ReplicationStats.NUM_UNDER_REPLICATED_LEDGERS);
uRLPublishTimeForLostBookies = this.statsLogger
.getOpStatsLogger(ReplicationStats.URL_PUBLISH_TIME_FOR_LOST_BOOKIE);
bookieToLedgersMapCreationTime = this.statsLogger
.getOpStatsLogger(ReplicationStats.BOOKIE_TO_LEDGERS_MAP_CREATION_TIME);
checkAllLedgersTime = this.statsLogger.getOpStatsLogger(ReplicationStats.CHECK_ALL_LEDGERS_TIME);
placementPolicyCheckTime = this.statsLogger.getOpStatsLogger(ReplicationStats.PLACEMENT_POLICY_CHECK_TIME);
auditBookiesTime = this.statsLogger.getOpStatsLogger(ReplicationStats.AUDIT_BOOKIES_TIME);
numLedgersChecked = this.statsLogger.getCounter(ReplicationStats.NUM_LEDGERS_CHECKED);
numFragmentsPerLedger = statsLogger.getOpStatsLogger(ReplicationStats.NUM_FRAGMENTS_PER_LEDGER);
numBookiesPerLedger = statsLogger.getOpStatsLogger(ReplicationStats.NUM_BOOKIES_PER_LEDGER);
numBookieAuditsDelayed = this.statsLogger.getCounter(ReplicationStats.NUM_BOOKIE_AUDITS_DELAYED);
numDelayedBookieAuditsCancelled = this.statsLogger
.getCounter(ReplicationStats.NUM_DELAYED_BOOKIE_AUDITS_DELAYES_CANCELLED);
numLedgersNotAdheringToPlacementPolicy = new Gauge<Integer>() {
@Override
public Integer getDefaultValue() {
return 0;
}
@Override
public Integer getSample() {
return ledgersNotAdheringToPlacementPolicyGuageValue.get();
}
};
this.statsLogger.registerGauge(ReplicationStats.NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY,
numLedgersNotAdheringToPlacementPolicy);
this.bkc = bkc;
this.ownBkc = ownBkc;
initialize(conf, bkc);
executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "AuditorBookie-" + bookieIdentifier);
t.setDaemon(true);
return t;
}
});
}
private void initialize(ServerConfiguration conf, BookKeeper bkc)
throws UnavailableException {
try {
LedgerManagerFactory ledgerManagerFactory = AbstractZkLedgerManagerFactory
.newLedgerManagerFactory(
conf,
bkc.getMetadataClientDriver().getLayoutManager());
ledgerManager = ledgerManagerFactory.newLedgerManager();
this.bookieLedgerIndexer = new BookieLedgerIndexer(ledgerManager);
this.ledgerUnderreplicationManager = ledgerManagerFactory
.newLedgerUnderreplicationManager();
this.admin = new BookKeeperAdmin(bkc, statsLogger);
LOG.info("AuthProvider used by the Auditor is {}",
admin.getConf().getClientAuthProviderFactoryClass());
if (this.ledgerUnderreplicationManager
.initializeLostBookieRecoveryDelay(conf.getLostBookieRecoveryDelay())) {
LOG.info("Initializing lostBookieRecoveryDelay zNode to the conif value: {}",
conf.getLostBookieRecoveryDelay());
} else {
LOG.info("Valid lostBookieRecoveryDelay zNode is available, so not creating "
+ "lostBookieRecoveryDelay zNode as part of Auditor initialization ");
}
lostBookieRecoveryDelayBeforeChange = this.ledgerUnderreplicationManager.getLostBookieRecoveryDelay();
} catch (CompatibilityException ce) {
throw new UnavailableException(
"CompatibilityException while initializing Auditor", ce);
} catch (IOException | KeeperException ioe) {
throw new UnavailableException(
"Exception while initializing Auditor", ioe);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new UnavailableException(
"Interrupted while initializing Auditor", ie);
}
}
private void submitShutdownTask() {
synchronized (this) {
LOG.info("Executing submitShutdownTask");
if (executor.isShutdown()) {
LOG.info("executor is already shutdown");
return;
}
executor.submit(new Runnable() {
public void run() {
synchronized (Auditor.this) {
LOG.info("Shutting down Auditor's Executor");
executor.shutdown();
}
}
});
}
}
@VisibleForTesting
synchronized Future<?> submitAuditTask() {
if (executor.isShutdown()) {
SettableFuture<Void> f = SettableFuture.<Void>create();
f.setException(new BKAuditException("Auditor shutting down"));
return f;
}
return executor.submit(new Runnable() {
@SuppressWarnings("unchecked")
public void run() {
try {
waitIfLedgerReplicationDisabled();
int lostBookieRecoveryDelay = Auditor.this.ledgerUnderreplicationManager
.getLostBookieRecoveryDelay();
List<String> availableBookies = getAvailableBookies();
// casting to String, as knownBookies and availableBookies
// contains only String values
// find new bookies(if any) and update the known bookie list
Collection<String> newBookies = CollectionUtils.subtract(
availableBookies, knownBookies);
knownBookies.addAll(newBookies);
if (!bookiesToBeAudited.isEmpty() && knownBookies.containsAll(bookiesToBeAudited)) {
// the bookie, which went down earlier and had an audit scheduled for,
// has come up. So let us stop tracking it and cancel the audit. Since
// we allow delaying of audit when there is only one failed bookie,
// bookiesToBeAudited should just have 1 element and hence containsAll
// check should be ok
if (auditTask != null && auditTask.cancel(false)) {
auditTask = null;
numDelayedBookieAuditsCancelled.inc();
}
bookiesToBeAudited.clear();
}
// find lost bookies(if any)
bookiesToBeAudited.addAll(CollectionUtils.subtract(knownBookies, availableBookies));
if (bookiesToBeAudited.size() == 0) {
return;
}
knownBookies.removeAll(bookiesToBeAudited);
if (lostBookieRecoveryDelay == 0) {
startAudit(false);
bookiesToBeAudited.clear();
return;
}
if (bookiesToBeAudited.size() > 1) {
// if more than one bookie is down, start the audit immediately;
LOG.info("Multiple bookie failure; not delaying bookie audit. "
+ "Bookies lost now: {}; All lost bookies: {}",
CollectionUtils.subtract(knownBookies, availableBookies),
bookiesToBeAudited);
if (auditTask != null && auditTask.cancel(false)) {
auditTask = null;
numDelayedBookieAuditsCancelled.inc();
}
startAudit(false);
bookiesToBeAudited.clear();
return;
}
if (auditTask == null) {
// if there is no scheduled audit, schedule one
auditTask = executor.schedule(new Runnable() {
public void run() {
startAudit(false);
auditTask = null;
bookiesToBeAudited.clear();
}
}, lostBookieRecoveryDelay, TimeUnit.SECONDS);
numBookieAuditsDelayed.inc();
LOG.info("Delaying bookie audit by {} secs for {}", lostBookieRecoveryDelay,
bookiesToBeAudited);
}
} catch (BKException bke) {
LOG.error("Exception getting bookie list", bke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
LOG.error("Interrupted while watching available bookies ", ie);
} catch (UnavailableException ue) {
LOG.error("Exception while watching available bookies", ue);
}
}
});
}
synchronized Future<?> submitLostBookieRecoveryDelayChangedEvent() {
if (executor.isShutdown()) {
SettableFuture<Void> f = SettableFuture.<Void> create();
f.setException(new BKAuditException("Auditor shutting down"));
return f;
}
return executor.submit(new Runnable() {
int lostBookieRecoveryDelay = -1;
public void run() {
try {
waitIfLedgerReplicationDisabled();
lostBookieRecoveryDelay = Auditor.this.ledgerUnderreplicationManager
.getLostBookieRecoveryDelay();
// if there is pending auditTask, cancel the task. So that it can be rescheduled
// after new lostBookieRecoveryDelay period
if (auditTask != null) {
LOG.info("lostBookieRecoveryDelay period has been changed so canceling the pending AuditTask");
auditTask.cancel(false);
numDelayedBookieAuditsCancelled.inc();
}
// if lostBookieRecoveryDelay is set to its previous value then consider it as
// signal to trigger the Audit immediately.
if ((lostBookieRecoveryDelay == 0)
|| (lostBookieRecoveryDelay == lostBookieRecoveryDelayBeforeChange)) {
LOG.info(
"lostBookieRecoveryDelay has been set to 0 or reset to its previous value, "
+ "so starting AuditTask. Current lostBookieRecoveryDelay: {}, "
+ "previous lostBookieRecoveryDelay: {}",
lostBookieRecoveryDelay, lostBookieRecoveryDelayBeforeChange);
startAudit(false);
auditTask = null;
bookiesToBeAudited.clear();
} else if (auditTask != null) {
LOG.info("lostBookieRecoveryDelay has been set to {}, so rescheduling AuditTask accordingly",
lostBookieRecoveryDelay);
auditTask = executor.schedule(new Runnable() {
public void run() {
startAudit(false);
auditTask = null;
bookiesToBeAudited.clear();
}
}, lostBookieRecoveryDelay, TimeUnit.SECONDS);
numBookieAuditsDelayed.inc();
}
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
LOG.error("Interrupted while for LedgersReplication to be enabled ", ie);
} catch (UnavailableException ue) {
LOG.error("Exception while reading from ZK", ue);
} finally {
if (lostBookieRecoveryDelay != -1) {
lostBookieRecoveryDelayBeforeChange = lostBookieRecoveryDelay;
}
}
}
});
}
public void start() {
LOG.info("I'm starting as Auditor Bookie. ID: {}", bookieIdentifier);
// on startup watching available bookie and based on the
// available bookies determining the bookie failures.
synchronized (this) {
if (executor.isShutdown()) {
return;
}
try {
watchBookieChanges();
knownBookies = getAvailableBookies();
} catch (BKException bke) {
LOG.error("Couldn't get bookie list, so exiting", bke);
submitShutdownTask();
}
try {
this.ledgerUnderreplicationManager
.notifyLostBookieRecoveryDelayChanged(new LostBookieRecoveryDelayChangedCb());
} catch (UnavailableException ue) {
LOG.error("Exception while registering for LostBookieRecoveryDelay change notification, so exiting",
ue);
submitShutdownTask();
}
scheduleBookieCheckTask();
scheduleCheckAllLedgersTask();
schedulePlacementPolicyCheckTask();
}
}
private void scheduleBookieCheckTask() {
long bookieCheckInterval = conf.getAuditorPeriodicBookieCheckInterval();
if (bookieCheckInterval == 0) {
LOG.info("Auditor periodic bookie checking disabled, running once check now anyhow");
executor.submit(bookieCheck);
} else {
LOG.info("Auditor periodic bookie checking enabled" + " 'auditorPeriodicBookieCheckInterval' {} seconds",
bookieCheckInterval);
executor.scheduleAtFixedRate(bookieCheck, 0, bookieCheckInterval, TimeUnit.SECONDS);
}
}
private void scheduleCheckAllLedgersTask(){
long interval = conf.getAuditorPeriodicCheckInterval();
if (interval > 0) {
LOG.info("Auditor periodic ledger checking enabled" + " 'auditorPeriodicCheckInterval' {} seconds",
interval);
long checkAllLedgersLastExecutedCTime;
long durationSinceLastExecutionInSecs;
long initialDelay;
try {
checkAllLedgersLastExecutedCTime = ledgerUnderreplicationManager.getCheckAllLedgersCTime();
} catch (UnavailableException ue) {
LOG.error("Got UnavailableException while trying to get checkAllLedgersCTime", ue);
checkAllLedgersLastExecutedCTime = -1;
}
if (checkAllLedgersLastExecutedCTime == -1) {
durationSinceLastExecutionInSecs = -1;
initialDelay = 0;
} else {
durationSinceLastExecutionInSecs = (System.currentTimeMillis() - checkAllLedgersLastExecutedCTime)
/ 1000;
if (durationSinceLastExecutionInSecs < 0) {
// this can happen if there is no strict time ordering
durationSinceLastExecutionInSecs = 0;
}
initialDelay = durationSinceLastExecutionInSecs > interval ? 0
: (interval - durationSinceLastExecutionInSecs);
}
LOG.info(
"checkAllLedgers scheduling info. checkAllLedgersLastExecutedCTime: {} "
+ "durationSinceLastExecutionInSecs: {} initialDelay: {} interval: {}",
checkAllLedgersLastExecutedCTime, durationSinceLastExecutionInSecs, initialDelay, interval);
executor.scheduleAtFixedRate(new Runnable() {
public void run() {
try {
if (!ledgerUnderreplicationManager.isLedgerReplicationEnabled()) {
LOG.info("Ledger replication disabled, skipping checkAllLedgers");
return;
}
Stopwatch stopwatch = Stopwatch.createStarted();
LOG.info("Starting checkAllLedgers");
checkAllLedgers();
long checkAllLedgersDuration = stopwatch.stop().elapsed(TimeUnit.MILLISECONDS);
LOG.info("Completed checkAllLedgers in {} milliSeconds", checkAllLedgersDuration);
checkAllLedgersTime.registerSuccessfulEvent(checkAllLedgersDuration, TimeUnit.MILLISECONDS);
} catch (KeeperException ke) {
LOG.error("Exception while running periodic check", ke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
LOG.error("Interrupted while running periodic check", ie);
} catch (BKException bke) {
LOG.error("Exception running periodic check", bke);
} catch (IOException ioe) {
LOG.error("I/O exception running periodic check", ioe);
} catch (ReplicationException.UnavailableException ue) {
LOG.error("Underreplication manager unavailable running periodic check", ue);
}
}
}, initialDelay, interval, TimeUnit.SECONDS);
} else {
LOG.info("Periodic checking disabled");
}
}
private void schedulePlacementPolicyCheckTask(){
long interval = conf.getAuditorPeriodicPlacementPolicyCheckInterval();
if (interval > 0) {
LOG.info("Auditor periodic placement policy check enabled"
+ " 'auditorPeriodicPlacementPolicyCheckInterval' {} seconds", interval);
long placementPolicyCheckLastExecutedCTime;
long durationSinceLastExecutionInSecs;
long initialDelay;
try {
placementPolicyCheckLastExecutedCTime = ledgerUnderreplicationManager.getPlacementPolicyCheckCTime();
} catch (UnavailableException ue) {
LOG.error("Got UnavailableException while trying to get placementPolicyCheckCTime", ue);
placementPolicyCheckLastExecutedCTime = -1;
}
if (placementPolicyCheckLastExecutedCTime == -1) {
durationSinceLastExecutionInSecs = -1;
initialDelay = 0;
} else {
durationSinceLastExecutionInSecs = (System.currentTimeMillis() - placementPolicyCheckLastExecutedCTime)
/ 1000;
if (durationSinceLastExecutionInSecs < 0) {
// this can happen if there is no strict time ordering
durationSinceLastExecutionInSecs = 0;
}
initialDelay = durationSinceLastExecutionInSecs > interval ? 0
: (interval - durationSinceLastExecutionInSecs);
}
LOG.info(
"placementPolicyCheck scheduling info. placementPolicyCheckLastExecutedCTime: {} "
+ "durationSinceLastExecutionInSecs: {} initialDelay: {} interval: {}",
placementPolicyCheckLastExecutedCTime, durationSinceLastExecutionInSecs, initialDelay, interval);
executor.scheduleAtFixedRate(new Runnable() {
public void run() {
try {
Stopwatch stopwatch = Stopwatch.createStarted();
LOG.info("Starting PlacementPolicyCheck");
placementPolicyCheck();
int numOfLedgersFoundInPlacementPolicyCheckValue = numOfLedgersFoundInPlacementPolicyCheck
.get();
long placementPolicyCheckDuration = stopwatch.stop().elapsed(TimeUnit.MILLISECONDS);
LOG.info(
"Completed placementPolicyCheck in {} milliSeconds."
+ " numOfLedgersNotAdheringToPlacementPolicy {}",
placementPolicyCheckDuration, numOfLedgersFoundInPlacementPolicyCheckValue);
ledgersNotAdheringToPlacementPolicyGuageValue
.set(numOfLedgersFoundInPlacementPolicyCheckValue);
placementPolicyCheckTime.registerSuccessfulEvent(placementPolicyCheckDuration,
TimeUnit.MILLISECONDS);
} catch (BKAuditException e) {
int numOfLedgersFoundInPlacementPolicyCheckValue = numOfLedgersFoundInPlacementPolicyCheck
.get();
if (numOfLedgersFoundInPlacementPolicyCheckValue > 0) {
/*
* Though there is BKAuditException while doing
* placementPolicyCheck, it found few ledgers not
* adhering to placement policy. So reporting it.
*/
ledgersNotAdheringToPlacementPolicyGuageValue
.set(numOfLedgersFoundInPlacementPolicyCheckValue);
}
LOG.error(
"BKAuditException running periodic placementPolicy check."
+ "numOfLedgersNotAdheringToPlacementPolicy {}",
numOfLedgersFoundInPlacementPolicyCheckValue, e);
}
}
}, initialDelay, interval, TimeUnit.SECONDS);
} else {
LOG.info("Periodic placementPolicy check disabled");
}
}
private class LostBookieRecoveryDelayChangedCb implements GenericCallback<Void> {
@Override
public void operationComplete(int rc, Void result) {
try {
Auditor.this.ledgerUnderreplicationManager
.notifyLostBookieRecoveryDelayChanged(LostBookieRecoveryDelayChangedCb.this);
} catch (UnavailableException ae) {
LOG.error("Exception while registering for a LostBookieRecoveryDelay notification", ae);
}
Auditor.this.submitLostBookieRecoveryDelayChangedEvent();
}
}
private void waitIfLedgerReplicationDisabled() throws UnavailableException,
InterruptedException {
ReplicationEnableCb cb = new ReplicationEnableCb();
if (!ledgerUnderreplicationManager.isLedgerReplicationEnabled()) {
LOG.info("LedgerReplication is disabled externally through Zookeeper, "
+ "since DISABLE_NODE ZNode is created, so waiting untill it is enabled");
ledgerUnderreplicationManager.notifyLedgerReplicationEnabled(cb);
cb.await();
}
}
private List<String> getAvailableBookies() throws BKException {
// Get the available bookies
Collection<BookieSocketAddress> availableBkAddresses = admin.getAvailableBookies();
Collection<BookieSocketAddress> readOnlyBkAddresses = admin.getReadOnlyBookies();
availableBkAddresses.addAll(readOnlyBkAddresses);
List<String> availableBookies = new ArrayList<String>();
for (BookieSocketAddress addr : availableBkAddresses) {
availableBookies.add(addr.toString());
}
return availableBookies;
}
private void watchBookieChanges() throws BKException {
admin.watchWritableBookiesChanged(bookies -> submitAuditTask());
admin.watchReadOnlyBookiesChanged(bookies -> submitAuditTask());
}
/**
* Start running the actual audit task.
*
* @param shutDownTask
* A boolean that indicates whether or not to schedule shutdown task on any failure
*/
private void startAudit(boolean shutDownTask) {
try {
auditBookies();
shutDownTask = false;
} catch (BKException bke) {
LOG.error("Exception getting bookie list", bke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
LOG.error("Interrupted while watching available bookies ", ie);
} catch (BKAuditException bke) {
LOG.error("Exception while watching available bookies", bke);
}
if (shutDownTask) {
submitShutdownTask();
}
}
@SuppressWarnings("unchecked")
private void auditBookies()
throws BKAuditException, InterruptedException, BKException {
try {
waitIfLedgerReplicationDisabled();
} catch (UnavailableException ue) {
LOG.error("Underreplication unavailable, skipping audit."
+ "Will retry after a period");
return;
}
LOG.info("Starting auditBookies");
Stopwatch stopwatch = Stopwatch.createStarted();
// put exit cases here
Map<String, Set<Long>> ledgerDetails = generateBookie2LedgersIndex();
try {
if (!ledgerUnderreplicationManager.isLedgerReplicationEnabled()) {
// has been disabled while we were generating the index
// discard this run, and schedule a new one
executor.submit(bookieCheck);
return;
}
} catch (UnavailableException ue) {
LOG.error("Underreplication unavailable, skipping audit."
+ "Will retry after a period");
return;
}
List<String> availableBookies = getAvailableBookies();
// find lost bookies
Set<String> knownBookies = ledgerDetails.keySet();
Collection<String> lostBookies = CollectionUtils.subtract(knownBookies,
availableBookies);
bookieToLedgersMapCreationTime.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MILLISECONDS),
TimeUnit.MILLISECONDS);
if (lostBookies.size() > 0) {
try {
FutureUtils.result(
handleLostBookiesAsync(lostBookies, ledgerDetails), ReplicationException.EXCEPTION_HANDLER);
} catch (ReplicationException e) {
throw new BKAuditException(e.getMessage(), e.getCause());
}
uRLPublishTimeForLostBookies.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MILLISECONDS),
TimeUnit.MILLISECONDS);
}
LOG.info("Completed auditBookies");
auditBookiesTime.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MILLISECONDS),
TimeUnit.MILLISECONDS);
}
private Map<String, Set<Long>> generateBookie2LedgersIndex()
throws BKAuditException {
return bookieLedgerIndexer.getBookieToLedgerIndex();
}
private CompletableFuture<?> handleLostBookiesAsync(Collection<String> lostBookies,
Map<String, Set<Long>> ledgerDetails) {
LOG.info("Following are the failed bookies: {},"
+ " and searching its ledgers for re-replication", lostBookies);
return FutureUtils.processList(
Lists.newArrayList(lostBookies),
bookieIP -> publishSuspectedLedgersAsync(
Lists.newArrayList(bookieIP), ledgerDetails.get(bookieIP)),
null
);
}
private CompletableFuture<?> publishSuspectedLedgersAsync(Collection<String> missingBookies, Set<Long> ledgers) {
if (null == ledgers || ledgers.size() == 0) {
// there is no ledgers available for this bookie and just
// ignoring the bookie failures
LOG.info("There is no ledgers for the failed bookie: {}", missingBookies);
return FutureUtils.Void();
}
LOG.info("Following ledgers: {} of bookie: {} are identified as underreplicated", ledgers, missingBookies);
numUnderReplicatedLedger.registerSuccessfulValue(ledgers.size());
return FutureUtils.processList(
Lists.newArrayList(ledgers),
ledgerId -> ledgerUnderreplicationManager.markLedgerUnderreplicatedAsync(ledgerId, missingBookies),
null
);
}
/**
* Process the result returned from checking a ledger.
*/
private class ProcessLostFragmentsCb implements GenericCallback<Set<LedgerFragment>> {
final LedgerHandle lh;
final AsyncCallback.VoidCallback callback;
ProcessLostFragmentsCb(LedgerHandle lh, AsyncCallback.VoidCallback callback) {
this.lh = lh;
this.callback = callback;
}
public void operationComplete(int rc, Set<LedgerFragment> fragments) {
if (rc == BKException.Code.OK) {
Set<BookieSocketAddress> bookies = Sets.newHashSet();
for (LedgerFragment f : fragments) {
bookies.addAll(f.getAddresses());
}
if (bookies.isEmpty()) {
// no missing fragments
callback.processResult(Code.OK, null, null);
return;
}
publishSuspectedLedgersAsync(
bookies.stream().map(BookieSocketAddress::toString).collect(Collectors.toList()),
Sets.newHashSet(lh.getId())
).whenComplete((result, cause) -> {
if (null != cause) {
LOG.error("Auditor exception publishing suspected ledger {} with lost bookies {}",
lh.getId(), bookies, cause);
callback.processResult(Code.ReplicationException, null, null);
} else {
callback.processResult(Code.OK, null, null);
}
});
} else {
callback.processResult(rc, null, null);
}
lh.closeAsync().whenComplete((result, cause) -> {
if (null != cause) {
LOG.warn("Error closing ledger {} : {}", lh.getId(), cause.getMessage());
}
});
}
}
/**
* List all the ledgers and check them individually. This should not
* be run very often.
*/
void checkAllLedgers() throws BKException, IOException, InterruptedException, KeeperException {
final BookKeeper localClient = createBookKeeperClient(conf);
final BookKeeperAdmin localAdmin = new BookKeeperAdmin(localClient, statsLogger);
try {
final LedgerChecker checker = new LedgerChecker(localClient);
final CompletableFuture<Void> processFuture = new CompletableFuture<>();
Processor<Long> checkLedgersProcessor = (ledgerId, callback) -> {
try {
if (!ledgerUnderreplicationManager.isLedgerReplicationEnabled()) {
LOG.info("Ledger rereplication has been disabled, aborting periodic check");
FutureUtils.complete(processFuture, null);
return;
}
} catch (UnavailableException ue) {
LOG.error("Underreplication manager unavailable running periodic check", ue);
FutureUtils.complete(processFuture, null);
return;
}
localAdmin.asyncOpenLedgerNoRecovery(ledgerId, (rc, lh, ctx) -> {
if (Code.OK == rc) {
checker.checkLedger(lh,
// the ledger handle will be closed after checkLedger is done.
new ProcessLostFragmentsCb(lh, callback),
conf.getAuditorLedgerVerificationPercentage());
// we collect the following stats to get a measure of the
// distribution of a single ledger within the bk cluster
// the higher the number of fragments/bookies, the more distributed it is
numFragmentsPerLedger.registerSuccessfulValue(lh.getNumFragments());
numBookiesPerLedger.registerSuccessfulValue(lh.getNumBookies());
numLedgersChecked.inc();
} else if (Code.NoSuchLedgerExistsOnMetadataServerException == rc) {
if (LOG.isDebugEnabled()) {
LOG.debug("Ledger {} was deleted before we could check it", ledgerId);
}
callback.processResult(Code.OK, null, null);
} else {
LOG.error("Couldn't open ledger {} to check : {}", ledgerId, BKException.getMessage(rc));
callback.processResult(rc, null, null);
}
}, null);
};
ledgerManager.asyncProcessLedgers(checkLedgersProcessor,
(rc, path, ctx) -> {
if (Code.OK == rc) {
FutureUtils.complete(processFuture, null);
} else {
FutureUtils.completeExceptionally(processFuture, BKException.create(rc));
}
}, null, BKException.Code.OK, BKException.Code.ReadException);
FutureUtils.result(processFuture, BKException.HANDLER);
try {
ledgerUnderreplicationManager.setCheckAllLedgersCTime(System.currentTimeMillis());
} catch (UnavailableException ue) {
LOG.error("Got exception while trying to set checkAllLedgersCTime", ue);
}
} finally {
localAdmin.close();
localClient.close();
}
}
void placementPolicyCheck() throws BKAuditException {
final CountDownLatch placementPolicyCheckLatch = new CountDownLatch(1);
this.numOfLedgersFoundInPlacementPolicyCheck.set(0);
Processor<Long> ledgerProcessor = new Processor<Long>() {
@Override
public void process(Long ledgerId, AsyncCallback.VoidCallback iterCallback) {
ledgerManager.readLedgerMetadata(ledgerId).whenComplete((metadataVer, exception) -> {
if (exception == null) {
LedgerMetadata metadata = metadataVer.getValue();
int writeQuorumSize = metadata.getWriteQuorumSize();
int ackQuorumSize = metadata.getAckQuorumSize();
if (metadata.isClosed()) {
boolean foundSegmentNotAdheringToPlacementPolicy = false;
for (Map.Entry<Long, ? extends List<BookieSocketAddress>> ensemble : metadata
.getAllEnsembles().entrySet()) {
long startEntryIdOfSegment = ensemble.getKey();
List<BookieSocketAddress> ensembleOfSegment = ensemble.getValue();
boolean segmentAdheringToPlacementPolicy = admin.isEnsembleAdheringToPlacementPolicy(
ensembleOfSegment, writeQuorumSize, ackQuorumSize);
if (!segmentAdheringToPlacementPolicy) {
foundSegmentNotAdheringToPlacementPolicy = true;
LOG.warn(
"For ledger: {}, Segment starting at entry: {}, with ensemble: {} having "
+ "writeQuorumSize: {} and ackQuorumSize: {} is not adhering to "
+ "EnsemblePlacementPolicy",
ledgerId, startEntryIdOfSegment, ensembleOfSegment, writeQuorumSize,
ackQuorumSize);
}
}
if (foundSegmentNotAdheringToPlacementPolicy) {
numOfLedgersFoundInPlacementPolicyCheck.incrementAndGet();
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Ledger: {} is not yet closed, so skipping the placementPolicy"
+ "check analysis for now", ledgerId);
}
}
iterCallback.processResult(BKException.Code.OK, null, null);
} else if (BKException.getExceptionCode(exception)
== BKException.Code.NoSuchLedgerExistsOnMetadataServerException) {
LOG.debug("Ignoring replication of already deleted ledger {}", ledgerId);
iterCallback.processResult(BKException.Code.OK, null, null);
} else {
LOG.warn("Unable to read the ledger: {} information", ledgerId);
iterCallback.processResult(BKException.getExceptionCode(exception), null, null);
}
});
}
};
// Reading the result after processing all the ledgers
final List<Integer> resultCode = new ArrayList<Integer>(1);
ledgerManager.asyncProcessLedgers(ledgerProcessor, new AsyncCallback.VoidCallback() {
@Override
public void processResult(int rc, String s, Object obj) {
resultCode.add(rc);
placementPolicyCheckLatch.countDown();
}
}, null, BKException.Code.OK, BKException.Code.ReadException);
try {
placementPolicyCheckLatch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new BKAuditException("Exception while doing placementPolicy check", e);
}
if (!resultCode.contains(BKException.Code.OK)) {
throw new BKAuditException("Exception while doing placementPolicy check",
BKException.create(resultCode.get(0)));
}
try {
ledgerUnderreplicationManager.setPlacementPolicyCheckCTime(System.currentTimeMillis());
} catch (UnavailableException ue) {
LOG.error("Got exception while trying to set PlacementPolicyCheckCTime", ue);
}
}
/**
* Shutdown the auditor.
*/
public void shutdown() {
LOG.info("Shutting down auditor");
executor.shutdown();
try {
while (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
LOG.warn("Executor not shutting down, interrupting");
executor.shutdownNow();
}
admin.close();
if (ownBkc) {
bkc.close();
}
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
LOG.warn("Interrupted while shutting down auditor bookie", ie);
} catch (BKException bke) {
LOG.warn("Exception while shutting down auditor bookie", bke);
}
}
@Override
public void close() {
shutdown();
}
/**
* Return true if auditor is running otherwise return false.
*
* @return auditor status
*/
public boolean isRunning() {
return !executor.isShutdown();
}
private final Runnable bookieCheck = new Runnable() {
public void run() {
if (auditTask == null) {
startAudit(true);
} else {
// if due to a lost bookie an audit task was scheduled,
// let us not run this periodic bookie check now, if we
// went ahead, we'll report under replication and the user
// wanted to avoid that(with lostBookieRecoveryDelay option)
LOG.info("Audit already scheduled; skipping periodic bookie check");
}
}
};
int getLostBookieRecoveryDelayBeforeChange() {
return lostBookieRecoveryDelayBeforeChange;
}
Future<?> getAuditTask() {
return auditTask;
}
}