/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.replication;
import static org.apache.bookkeeper.replication.ReplicationStats.AUDITOR_SCOPE;
import static org.apache.bookkeeper.replication.ReplicationStats.AUDIT_BOOKIES_TIME;
import static org.apache.bookkeeper.replication.ReplicationStats.BOOKIE_TO_LEDGERS_MAP_CREATION_TIME;
import static org.apache.bookkeeper.replication.ReplicationStats.CHECK_ALL_LEDGERS_TIME;
import static org.apache.bookkeeper.replication.ReplicationStats.NUM_BOOKIES_PER_LEDGER;
import static org.apache.bookkeeper.replication.ReplicationStats.NUM_BOOKIE_AUDITS_DELAYED;
import static org.apache.bookkeeper.replication.ReplicationStats.NUM_DELAYED_BOOKIE_AUDITS_DELAYES_CANCELLED;
import static org.apache.bookkeeper.replication.ReplicationStats.NUM_FRAGMENTS_PER_LEDGER;
import static org.apache.bookkeeper.replication.ReplicationStats.NUM_LEDGERS_CHECKED;
import static org.apache.bookkeeper.replication.ReplicationStats.NUM_LEDGERS_HAVING_LESS_THAN_AQ_REPLICAS_OF_AN_ENTRY;
import static org.apache.bookkeeper.replication.ReplicationStats.NUM_LEDGERS_HAVING_LESS_THAN_WQ_REPLICAS_OF_AN_ENTRY;
import static org.apache.bookkeeper.replication.ReplicationStats.NUM_LEDGERS_HAVING_NO_REPLICA_OF_AN_ENTRY;
import static org.apache.bookkeeper.replication.ReplicationStats.NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY;
import static org.apache.bookkeeper.replication.ReplicationStats.NUM_LEDGERS_SOFTLY_ADHERING_TO_PLACEMENT_POLICY;
import static org.apache.bookkeeper.replication.ReplicationStats.NUM_UNDERREPLICATED_LEDGERS_ELAPSED_RECOVERY_GRACE_PERIOD;
import static org.apache.bookkeeper.replication.ReplicationStats.NUM_UNDER_REPLICATED_LEDGERS;
import static org.apache.bookkeeper.replication.ReplicationStats.PLACEMENT_POLICY_CHECK_TIME;
import static org.apache.bookkeeper.replication.ReplicationStats.REPLICAS_CHECK_TIME;
import static org.apache.bookkeeper.replication.ReplicationStats.URL_PUBLISH_TIME_FOR_LOST_BOOKIE;
import static org.apache.bookkeeper.util.SafeRunnable.safeRun;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.Lists;
import com.google.common.collect.Multiset;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.SettableFuture;
import java.io.IOException;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BKException.Code;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeperAdmin;
import org.apache.bookkeeper.client.EnsemblePlacementPolicy.PlacementPolicyAdherence;
import org.apache.bookkeeper.client.LedgerChecker;
import org.apache.bookkeeper.client.LedgerFragment;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.RoundRobinDistributionSchedule;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.AbstractZkLedgerManagerFactory;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.LedgerManager.LedgerRange;
import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
import org.apache.bookkeeper.meta.UnderreplicatedLedger;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.MultiCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
import org.apache.bookkeeper.replication.ReplicationException.BKAuditException;
import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.Gauge;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.stats.annotations.StatsDoc;
import org.apache.bookkeeper.util.AvailabilityOfEntriesOfLedger;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.AsyncCallback.VoidCallback;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Auditor is a single entity in the entire bookie cluster that watches all the
* bookies under the 'ledgerrootpath/available' zk path. When any bookie fails or
* disconnects from zk, the auditor initiates the re-replication activities by
* marking all the corresponding ledgers of the failed bookie as underreplicated
* znodes in zk.
*
* <p>TODO: eliminate the direct usage of zookeeper here {@link https://github.com/apache/bookkeeper/issues/1332}
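*
* <p>A minimal usage sketch (the bookie identifier below is hypothetical; in
* practice the Auditor is created and started by the auditor election logic):
* <pre>{@code
* ServerConfiguration conf = new ServerConfiguration();
* // "bookie-1:3181" is a hypothetical auditor/bookie identifier
* Auditor auditor = new Auditor("bookie-1:3181", conf, NullStatsLogger.INSTANCE);
* auditor.start();  // watches bookies and schedules the periodic checks
* // ...
* auditor.close();
* }</pre>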
*/
@StatsDoc(
name = AUDITOR_SCOPE,
help = "Auditor related stats"
)
public class Auditor implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(Auditor.class);
private static final int MAX_CONCURRENT_REPLICAS_CHECK_LEDGER_REQUESTS = 100;
private static final int REPLICAS_CHECK_TIMEOUT_IN_SECS = 120;
private static final BitSet EMPTY_BITSET = new BitSet();
private final ServerConfiguration conf;
private final BookKeeper bkc;
private final boolean ownBkc;
private final BookKeeperAdmin admin;
private final boolean ownAdmin;
private BookieLedgerIndexer bookieLedgerIndexer;
private LedgerManager ledgerManager;
private LedgerUnderreplicationManager ledgerUnderreplicationManager;
private final ScheduledExecutorService executor;
private List<String> knownBookies = new ArrayList<String>();
private final String bookieIdentifier;
private volatile Future<?> auditTask;
private Set<String> bookiesToBeAudited = Sets.newHashSet();
private volatile int lostBookieRecoveryDelayBeforeChange;
private final AtomicInteger ledgersNotAdheringToPlacementPolicyGuageValue;
private final AtomicInteger numOfLedgersFoundNotAdheringInPlacementPolicyCheck;
private final AtomicInteger ledgersSoftlyAdheringToPlacementPolicyGuageValue;
private final AtomicInteger numOfLedgersFoundSoftlyAdheringInPlacementPolicyCheck;
private final AtomicInteger numOfClosedLedgersAuditedInPlacementPolicyCheck;
private final AtomicInteger numOfURLedgersElapsedRecoveryGracePeriodGuageValue;
private final AtomicInteger numOfURLedgersElapsedRecoveryGracePeriod;
private final AtomicInteger numLedgersHavingNoReplicaOfAnEntryGuageValue;
private final AtomicInteger numLedgersFoundHavingNoReplicaOfAnEntry;
private final AtomicInteger numLedgersHavingLessThanAQReplicasOfAnEntryGuageValue;
private final AtomicInteger numLedgersFoundHavingLessThanAQReplicasOfAnEntry;
private final AtomicInteger numLedgersHavingLessThanWQReplicasOfAnEntryGuageValue;
private final AtomicInteger numLedgersFoundHavingLessThanWQReplicasOfAnEntry;
private final long underreplicatedLedgerRecoveryGracePeriod;
private final int zkOpTimeoutMs;
private final StatsLogger statsLogger;
@StatsDoc(
name = NUM_UNDER_REPLICATED_LEDGERS,
help = "the distribution of num under_replicated ledgers on each auditor run"
)
private final OpStatsLogger numUnderReplicatedLedger;
@StatsDoc(
name = URL_PUBLISH_TIME_FOR_LOST_BOOKIE,
help = "the latency distribution of publishing under replicated ledgers for lost bookies"
)
private final OpStatsLogger uRLPublishTimeForLostBookies;
@StatsDoc(
name = BOOKIE_TO_LEDGERS_MAP_CREATION_TIME,
help = "the latency distribution of creating bookies-to-ledgers map"
)
private final OpStatsLogger bookieToLedgersMapCreationTime;
@StatsDoc(
name = CHECK_ALL_LEDGERS_TIME,
help = "the latency distribution of checking all ledgers"
)
private final OpStatsLogger checkAllLedgersTime;
@StatsDoc(
name = PLACEMENT_POLICY_CHECK_TIME,
help = "the latency distribution of placementPolicy check"
)
private final OpStatsLogger placementPolicyCheckTime;
@StatsDoc(
name = REPLICAS_CHECK_TIME,
help = "the latency distribution of replicas check"
)
private final OpStatsLogger replicasCheckTime;
@StatsDoc(
name = AUDIT_BOOKIES_TIME,
help = "the latency distribution of auditing all the bookies"
)
private final OpStatsLogger auditBookiesTime;
@StatsDoc(
name = NUM_LEDGERS_CHECKED,
help = "the number of ledgers checked by the auditor"
)
private final Counter numLedgersChecked;
@StatsDoc(
name = NUM_FRAGMENTS_PER_LEDGER,
help = "the distribution of number of fragments per ledger"
)
private final OpStatsLogger numFragmentsPerLedger;
@StatsDoc(
name = NUM_BOOKIES_PER_LEDGER,
help = "the distribution of number of bookies per ledger"
)
private final OpStatsLogger numBookiesPerLedger;
@StatsDoc(
name = NUM_BOOKIE_AUDITS_DELAYED,
help = "the number of bookie-audits delayed"
)
private final Counter numBookieAuditsDelayed;
@StatsDoc(
name = NUM_DELAYED_BOOKIE_AUDITS_DELAYES_CANCELLED,
help = "the number of delayed-bookie-audits cancelled"
)
private final Counter numDelayedBookieAuditsCancelled;
@StatsDoc(
name = NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY,
help = "Gauge for number of ledgers not adhering to placement policy found in placement policy check"
)
private final Gauge<Integer> numLedgersNotAdheringToPlacementPolicy;
@StatsDoc(
name = NUM_LEDGERS_SOFTLY_ADHERING_TO_PLACEMENT_POLICY,
help = "Gauge for number of ledgers softly adhering to placement policy found in placement policy check"
)
private final Gauge<Integer> numLedgersSoftlyAdheringToPlacementPolicy;
@StatsDoc(
name = NUM_UNDERREPLICATED_LEDGERS_ELAPSED_RECOVERY_GRACE_PERIOD,
help = "Gauge for number of underreplicated ledgers elapsed recovery grace period"
)
private final Gauge<Integer> numUnderreplicatedLedgersElapsedRecoveryGracePeriod;
@StatsDoc(
name = NUM_LEDGERS_HAVING_NO_REPLICA_OF_AN_ENTRY,
help = "Gauge for number of ledgers having an entry with all the replicas missing"
)
private final Gauge<Integer> numLedgersHavingNoReplicaOfAnEntry;
@StatsDoc(
name = NUM_LEDGERS_HAVING_LESS_THAN_AQ_REPLICAS_OF_AN_ENTRY,
help = "Gauge for number of ledgers having an entry with less than AQ number of replicas"
+ ", this doesn't include ledgers counted towards numLedgersHavingNoReplicaOfAnEntry"
)
private final Gauge<Integer> numLedgersHavingLessThanAQReplicasOfAnEntry;
@StatsDoc(
name = NUM_LEDGERS_HAVING_LESS_THAN_WQ_REPLICAS_OF_AN_ENTRY,
help = "Gauge for number of ledgers having an entry with less than WQ number of replicas"
+ ", this doesn't include ledgers counted towards numLedgersHavingLessThanAQReplicasOfAnEntry"
)
private final Gauge<Integer> numLedgersHavingLessThanWQReplicasOfAnEntry;
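/**
 * Creates a BookKeeper client from the given server configuration, using the
 * system client role.
 */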
static BookKeeper createBookKeeperClient(ServerConfiguration conf) throws InterruptedException, IOException {
return createBookKeeperClient(conf, NullStatsLogger.INSTANCE);
}
static BookKeeper createBookKeeperClient(ServerConfiguration conf, StatsLogger statsLogger)
throws InterruptedException, IOException {
ClientConfiguration clientConfiguration = new ClientConfiguration(conf);
clientConfiguration.setClientRole(ClientConfiguration.CLIENT_ROLE_SYSTEM);
try {
return BookKeeper.forConfig(clientConfiguration).statsLogger(statsLogger).build();
} catch (BKException e) {
throw new IOException("Failed to create bookkeeper client", e);
}
}
static BookKeeper createBookKeeperClientThrowUnavailableException(ServerConfiguration conf)
throws UnavailableException {
try {
return createBookKeeperClient(conf);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new UnavailableException("Failed to create bookkeeper client", e);
} catch (IOException e) {
throw new UnavailableException("Failed to create bookkeeper client", e);
}
}
public Auditor(final String bookieIdentifier,
ServerConfiguration conf,
StatsLogger statsLogger)
throws UnavailableException {
this(
bookieIdentifier,
conf,
createBookKeeperClientThrowUnavailableException(conf),
true,
statsLogger);
}
public Auditor(final String bookieIdentifier,
ServerConfiguration conf,
BookKeeper bkc,
boolean ownBkc,
StatsLogger statsLogger)
throws UnavailableException {
this(bookieIdentifier,
conf,
bkc,
ownBkc,
new BookKeeperAdmin(bkc, statsLogger),
true,
statsLogger);
}
public Auditor(final String bookieIdentifier,
ServerConfiguration conf,
BookKeeper bkc,
boolean ownBkc,
BookKeeperAdmin admin,
boolean ownAdmin,
StatsLogger statsLogger)
throws UnavailableException {
this.conf = conf;
this.underreplicatedLedgerRecoveryGracePeriod = conf.getUnderreplicatedLedgerRecoveryGracePeriod();
this.zkOpTimeoutMs = conf.getZkTimeout() * 2;
this.bookieIdentifier = bookieIdentifier;
this.statsLogger = statsLogger;
this.numOfLedgersFoundNotAdheringInPlacementPolicyCheck = new AtomicInteger(0);
this.ledgersNotAdheringToPlacementPolicyGuageValue = new AtomicInteger(0);
this.numOfLedgersFoundSoftlyAdheringInPlacementPolicyCheck = new AtomicInteger(0);
this.ledgersSoftlyAdheringToPlacementPolicyGuageValue = new AtomicInteger(0);
this.numOfClosedLedgersAuditedInPlacementPolicyCheck = new AtomicInteger(0);
this.numOfURLedgersElapsedRecoveryGracePeriod = new AtomicInteger(0);
this.numOfURLedgersElapsedRecoveryGracePeriodGuageValue = new AtomicInteger(0);
this.numLedgersHavingNoReplicaOfAnEntryGuageValue = new AtomicInteger(0);
this.numLedgersFoundHavingNoReplicaOfAnEntry = new AtomicInteger(0);
this.numLedgersHavingLessThanAQReplicasOfAnEntryGuageValue = new AtomicInteger(0);
this.numLedgersFoundHavingLessThanAQReplicasOfAnEntry = new AtomicInteger(0);
this.numLedgersHavingLessThanWQReplicasOfAnEntryGuageValue = new AtomicInteger(0);
this.numLedgersFoundHavingLessThanWQReplicasOfAnEntry = new AtomicInteger(0);
numUnderReplicatedLedger = this.statsLogger.getOpStatsLogger(ReplicationStats.NUM_UNDER_REPLICATED_LEDGERS);
uRLPublishTimeForLostBookies = this.statsLogger
.getOpStatsLogger(ReplicationStats.URL_PUBLISH_TIME_FOR_LOST_BOOKIE);
bookieToLedgersMapCreationTime = this.statsLogger
.getOpStatsLogger(ReplicationStats.BOOKIE_TO_LEDGERS_MAP_CREATION_TIME);
checkAllLedgersTime = this.statsLogger.getOpStatsLogger(ReplicationStats.CHECK_ALL_LEDGERS_TIME);
placementPolicyCheckTime = this.statsLogger.getOpStatsLogger(ReplicationStats.PLACEMENT_POLICY_CHECK_TIME);
replicasCheckTime = this.statsLogger.getOpStatsLogger(ReplicationStats.REPLICAS_CHECK_TIME);
auditBookiesTime = this.statsLogger.getOpStatsLogger(ReplicationStats.AUDIT_BOOKIES_TIME);
numLedgersChecked = this.statsLogger.getCounter(ReplicationStats.NUM_LEDGERS_CHECKED);
numFragmentsPerLedger = statsLogger.getOpStatsLogger(ReplicationStats.NUM_FRAGMENTS_PER_LEDGER);
numBookiesPerLedger = statsLogger.getOpStatsLogger(ReplicationStats.NUM_BOOKIES_PER_LEDGER);
numBookieAuditsDelayed = this.statsLogger.getCounter(ReplicationStats.NUM_BOOKIE_AUDITS_DELAYED);
numDelayedBookieAuditsCancelled = this.statsLogger
.getCounter(ReplicationStats.NUM_DELAYED_BOOKIE_AUDITS_DELAYES_CANCELLED);
numLedgersNotAdheringToPlacementPolicy = new Gauge<Integer>() {
@Override
public Integer getDefaultValue() {
return 0;
}
@Override
public Integer getSample() {
return ledgersNotAdheringToPlacementPolicyGuageValue.get();
}
};
this.statsLogger.registerGauge(ReplicationStats.NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY,
numLedgersNotAdheringToPlacementPolicy);
numLedgersSoftlyAdheringToPlacementPolicy = new Gauge<Integer>() {
@Override
public Integer getDefaultValue() {
return 0;
}
@Override
public Integer getSample() {
return ledgersSoftlyAdheringToPlacementPolicyGuageValue.get();
}
};
this.statsLogger.registerGauge(ReplicationStats.NUM_LEDGERS_SOFTLY_ADHERING_TO_PLACEMENT_POLICY,
numLedgersSoftlyAdheringToPlacementPolicy);
numUnderreplicatedLedgersElapsedRecoveryGracePeriod = new Gauge<Integer>() {
@Override
public Integer getDefaultValue() {
return 0;
}
@Override
public Integer getSample() {
return numOfURLedgersElapsedRecoveryGracePeriodGuageValue.get();
}
};
this.statsLogger.registerGauge(ReplicationStats.NUM_UNDERREPLICATED_LEDGERS_ELAPSED_RECOVERY_GRACE_PERIOD,
numUnderreplicatedLedgersElapsedRecoveryGracePeriod);
numLedgersHavingNoReplicaOfAnEntry = new Gauge<Integer>() {
@Override
public Integer getDefaultValue() {
return 0;
}
@Override
public Integer getSample() {
return numLedgersHavingNoReplicaOfAnEntryGuageValue.get();
}
};
this.statsLogger.registerGauge(ReplicationStats.NUM_LEDGERS_HAVING_NO_REPLICA_OF_AN_ENTRY,
numLedgersHavingNoReplicaOfAnEntry);
numLedgersHavingLessThanAQReplicasOfAnEntry = new Gauge<Integer>() {
@Override
public Integer getDefaultValue() {
return 0;
}
@Override
public Integer getSample() {
return numLedgersHavingLessThanAQReplicasOfAnEntryGuageValue.get();
}
};
this.statsLogger.registerGauge(ReplicationStats.NUM_LEDGERS_HAVING_LESS_THAN_AQ_REPLICAS_OF_AN_ENTRY,
numLedgersHavingLessThanAQReplicasOfAnEntry);
numLedgersHavingLessThanWQReplicasOfAnEntry = new Gauge<Integer>() {
@Override
public Integer getDefaultValue() {
return 0;
}
@Override
public Integer getSample() {
return numLedgersHavingLessThanWQReplicasOfAnEntryGuageValue.get();
}
};
this.statsLogger.registerGauge(ReplicationStats.NUM_LEDGERS_HAVING_LESS_THAN_WQ_REPLICAS_OF_AN_ENTRY,
numLedgersHavingLessThanWQReplicasOfAnEntry);
this.bkc = bkc;
this.ownBkc = ownBkc;
this.admin = admin;
this.ownAdmin = ownAdmin;
initialize(conf, bkc);
executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "AuditorBookie-" + bookieIdentifier);
t.setDaemon(true);
return t;
}
});
}
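/**
 * Initializes the ledger manager, the bookie-ledger indexer and the ledger
 * underreplication manager, and seeds the lostBookieRecoveryDelay znode from
 * the configuration if a valid znode does not already exist.
 */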
private void initialize(ServerConfiguration conf, BookKeeper bkc)
throws UnavailableException {
try {
LedgerManagerFactory ledgerManagerFactory = AbstractZkLedgerManagerFactory
.newLedgerManagerFactory(
conf,
bkc.getMetadataClientDriver().getLayoutManager());
ledgerManager = ledgerManagerFactory.newLedgerManager();
this.bookieLedgerIndexer = new BookieLedgerIndexer(ledgerManager);
this.ledgerUnderreplicationManager = ledgerManagerFactory
.newLedgerUnderreplicationManager();
LOG.info("AuthProvider used by the Auditor is {}",
admin.getConf().getClientAuthProviderFactoryClass());
if (this.ledgerUnderreplicationManager
.initializeLostBookieRecoveryDelay(conf.getLostBookieRecoveryDelay())) {
LOG.info("Initializing lostBookieRecoveryDelay zNode to the conif value: {}",
conf.getLostBookieRecoveryDelay());
} else {
LOG.info("Valid lostBookieRecoveryDelay zNode is available, so not creating "
+ "lostBookieRecoveryDelay zNode as part of Auditor initialization ");
}
lostBookieRecoveryDelayBeforeChange = this.ledgerUnderreplicationManager.getLostBookieRecoveryDelay();
} catch (CompatibilityException ce) {
throw new UnavailableException(
"CompatibilityException while initializing Auditor", ce);
} catch (IOException | KeeperException ioe) {
throw new UnavailableException(
"Exception while initializing Auditor", ioe);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new UnavailableException(
"Interrupted while initializing Auditor", ie);
}
}
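/**
 * Submits a task that shuts down the auditor's executor, unless it has
 * already been shut down.
 */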
private void submitShutdownTask() {
synchronized (this) {
LOG.info("Executing submitShutdownTask");
if (executor.isShutdown()) {
LOG.info("executor is already shutdown");
return;
}
executor.submit(safeRun(new Runnable() {
@Override
public void run() {
synchronized (Auditor.this) {
LOG.info("Shutting down Auditor's Executor");
executor.shutdown();
}
}
}));
}
}
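/**
 * Submits an audit task that compares the known bookies against the currently
 * available bookies and, depending on lostBookieRecoveryDelay, either starts
 * the audit of lost bookies immediately or schedules a delayed audit.
 */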
@VisibleForTesting
synchronized Future<?> submitAuditTask() {
if (executor.isShutdown()) {
SettableFuture<Void> f = SettableFuture.<Void>create();
f.setException(new BKAuditException("Auditor shutting down"));
return f;
}
return executor.submit(safeRun(new Runnable() {
@Override
@SuppressWarnings("unchecked")
public void run() {
try {
waitIfLedgerReplicationDisabled();
int lostBookieRecoveryDelay = Auditor.this.ledgerUnderreplicationManager
.getLostBookieRecoveryDelay();
List<String> availableBookies = getAvailableBookies();
// casting to String, as knownBookies and availableBookies
// contain only String values
// find new bookies(if any) and update the known bookie list
Collection<String> newBookies = CollectionUtils.subtract(
availableBookies, knownBookies);
knownBookies.addAll(newBookies);
if (!bookiesToBeAudited.isEmpty() && knownBookies.containsAll(bookiesToBeAudited)) {
// the bookie which went down earlier, and for which an audit was
// scheduled, has come back up. So stop tracking it and cancel the audit.
// Since we only allow delaying the audit when there is a single failed
// bookie, bookiesToBeAudited should have just one element and hence the
// containsAll check is ok
if (auditTask != null && auditTask.cancel(false)) {
auditTask = null;
numDelayedBookieAuditsCancelled.inc();
}
bookiesToBeAudited.clear();
}
// find lost bookies(if any)
bookiesToBeAudited.addAll(CollectionUtils.subtract(knownBookies, availableBookies));
if (bookiesToBeAudited.size() == 0) {
return;
}
knownBookies.removeAll(bookiesToBeAudited);
if (lostBookieRecoveryDelay == 0) {
startAudit(false);
bookiesToBeAudited.clear();
return;
}
if (bookiesToBeAudited.size() > 1) {
// if more than one bookie is down, start the audit immediately;
LOG.info("Multiple bookie failure; not delaying bookie audit. "
+ "Bookies lost now: {}; All lost bookies: {}",
CollectionUtils.subtract(knownBookies, availableBookies),
bookiesToBeAudited);
if (auditTask != null && auditTask.cancel(false)) {
auditTask = null;
numDelayedBookieAuditsCancelled.inc();
}
startAudit(false);
bookiesToBeAudited.clear();
return;
}
if (auditTask == null) {
// if there is no scheduled audit, schedule one
auditTask = executor.schedule(safeRun(new Runnable() {
@Override
public void run() {
startAudit(false);
auditTask = null;
bookiesToBeAudited.clear();
}
}), lostBookieRecoveryDelay, TimeUnit.SECONDS);
numBookieAuditsDelayed.inc();
LOG.info("Delaying bookie audit by {} secs for {}", lostBookieRecoveryDelay,
bookiesToBeAudited);
}
} catch (BKException bke) {
LOG.error("Exception getting bookie list", bke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
LOG.error("Interrupted while watching available bookies ", ie);
} catch (UnavailableException ue) {
LOG.error("Exception while watching available bookies", ue);
}
}
}));
}
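/**
 * Submits a task to handle a change of the lostBookieRecoveryDelay value:
 * any pending delayed audit is cancelled and, depending on the new value,
 * the audit is either started immediately or rescheduled.
 */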
synchronized Future<?> submitLostBookieRecoveryDelayChangedEvent() {
if (executor.isShutdown()) {
SettableFuture<Void> f = SettableFuture.<Void> create();
f.setException(new BKAuditException("Auditor shutting down"));
return f;
}
return executor.submit(safeRun(new Runnable() {
int lostBookieRecoveryDelay = -1;
@Override
public void run() {
try {
waitIfLedgerReplicationDisabled();
lostBookieRecoveryDelay = Auditor.this.ledgerUnderreplicationManager
.getLostBookieRecoveryDelay();
// if there is a pending auditTask, cancel it so that it can be rescheduled
// after the new lostBookieRecoveryDelay period
if (auditTask != null) {
LOG.info("lostBookieRecoveryDelay period has been changed so canceling the pending AuditTask");
auditTask.cancel(false);
numDelayedBookieAuditsCancelled.inc();
}
// if lostBookieRecoveryDelay is set to its previous value, then consider it a
// signal to trigger the audit immediately.
if ((lostBookieRecoveryDelay == 0)
|| (lostBookieRecoveryDelay == lostBookieRecoveryDelayBeforeChange)) {
LOG.info(
"lostBookieRecoveryDelay has been set to 0 or reset to its previous value, "
+ "so starting AuditTask. Current lostBookieRecoveryDelay: {}, "
+ "previous lostBookieRecoveryDelay: {}",
lostBookieRecoveryDelay, lostBookieRecoveryDelayBeforeChange);
startAudit(false);
auditTask = null;
bookiesToBeAudited.clear();
} else if (auditTask != null) {
LOG.info("lostBookieRecoveryDelay has been set to {}, so rescheduling AuditTask accordingly",
lostBookieRecoveryDelay);
auditTask = executor.schedule(safeRun(new Runnable() {
@Override
public void run() {
startAudit(false);
auditTask = null;
bookiesToBeAudited.clear();
}
}), lostBookieRecoveryDelay, TimeUnit.SECONDS);
numBookieAuditsDelayed.inc();
}
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
LOG.error("Interrupted while for LedgersReplication to be enabled ", ie);
} catch (UnavailableException ue) {
LOG.error("Exception while reading from ZK", ue);
} finally {
if (lostBookieRecoveryDelay != -1) {
lostBookieRecoveryDelayBeforeChange = lostBookieRecoveryDelay;
}
}
}
}));
}
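/**
 * Starts the auditor: watches bookie membership changes, registers for
 * lostBookieRecoveryDelay change notifications, and schedules the periodic
 * bookie check, checkAllLedgers, placementPolicy check and replicas check tasks.
 */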
public void start() {
LOG.info("I'm starting as Auditor Bookie. ID: {}", bookieIdentifier);
// on startup, watch the available bookies and determine
// bookie failures based on them.
synchronized (this) {
if (executor.isShutdown()) {
return;
}
try {
watchBookieChanges();
knownBookies = getAvailableBookies();
} catch (BKException bke) {
LOG.error("Couldn't get bookie list, so exiting", bke);
submitShutdownTask();
}
try {
this.ledgerUnderreplicationManager
.notifyLostBookieRecoveryDelayChanged(new LostBookieRecoveryDelayChangedCb());
} catch (UnavailableException ue) {
LOG.error("Exception while registering for LostBookieRecoveryDelay change notification, so exiting",
ue);
submitShutdownTask();
}
scheduleBookieCheckTask();
scheduleCheckAllLedgersTask();
schedulePlacementPolicyCheckTask();
scheduleReplicasCheckTask();
}
}
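/**
 * Schedules the periodic bookie check, or runs a single check if the periodic
 * check interval is set to 0.
 */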
private void scheduleBookieCheckTask() {
long bookieCheckInterval = conf.getAuditorPeriodicBookieCheckInterval();
if (bookieCheckInterval == 0) {
LOG.info("Auditor periodic bookie checking disabled, running once check now anyhow");
executor.submit(safeRun(bookieCheck));
} else {
LOG.info("Auditor periodic bookie checking enabled" + " 'auditorPeriodicBookieCheckInterval' {} seconds",
bookieCheckInterval);
executor.scheduleAtFixedRate(safeRun(bookieCheck), 0, bookieCheckInterval, TimeUnit.SECONDS);
}
}
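/**
 * Schedules the periodic checkAllLedgers task, computing the initial delay
 * from the time the check was last executed.
 */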
private void scheduleCheckAllLedgersTask(){
long interval = conf.getAuditorPeriodicCheckInterval();
if (interval > 0) {
LOG.info("Auditor periodic ledger checking enabled" + " 'auditorPeriodicCheckInterval' {} seconds",
interval);
long checkAllLedgersLastExecutedCTime;
long durationSinceLastExecutionInSecs;
long initialDelay;
try {
checkAllLedgersLastExecutedCTime = ledgerUnderreplicationManager.getCheckAllLedgersCTime();
} catch (UnavailableException ue) {
LOG.error("Got UnavailableException while trying to get checkAllLedgersCTime", ue);
checkAllLedgersLastExecutedCTime = -1;
}
if (checkAllLedgersLastExecutedCTime == -1) {
durationSinceLastExecutionInSecs = -1;
initialDelay = 0;
} else {
durationSinceLastExecutionInSecs = (System.currentTimeMillis() - checkAllLedgersLastExecutedCTime)
/ 1000;
if (durationSinceLastExecutionInSecs < 0) {
// this can happen if there is no strict time ordering
durationSinceLastExecutionInSecs = 0;
}
initialDelay = durationSinceLastExecutionInSecs > interval ? 0
: (interval - durationSinceLastExecutionInSecs);
}
LOG.info(
"checkAllLedgers scheduling info. checkAllLedgersLastExecutedCTime: {} "
+ "durationSinceLastExecutionInSecs: {} initialDelay: {} interval: {}",
checkAllLedgersLastExecutedCTime, durationSinceLastExecutionInSecs, initialDelay, interval);
executor.scheduleAtFixedRate(safeRun(new Runnable() {
@Override
public void run() {
try {
if (!ledgerUnderreplicationManager.isLedgerReplicationEnabled()) {
LOG.info("Ledger replication disabled, skipping checkAllLedgers");
return;
}
Stopwatch stopwatch = Stopwatch.createStarted();
LOG.info("Starting checkAllLedgers");
checkAllLedgers();
long checkAllLedgersDuration = stopwatch.stop().elapsed(TimeUnit.MILLISECONDS);
LOG.info("Completed checkAllLedgers in {} milliSeconds", checkAllLedgersDuration);
checkAllLedgersTime.registerSuccessfulEvent(checkAllLedgersDuration, TimeUnit.MILLISECONDS);
} catch (KeeperException ke) {
LOG.error("Exception while running periodic check", ke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
LOG.error("Interrupted while running periodic check", ie);
} catch (BKException bke) {
LOG.error("Exception running periodic check", bke);
} catch (IOException ioe) {
LOG.error("I/O exception running periodic check", ioe);
} catch (ReplicationException.UnavailableException ue) {
LOG.error("Underreplication manager unavailable running periodic check", ue);
}
}
}), initialDelay, interval, TimeUnit.SECONDS);
} else {
LOG.info("Periodic checking disabled");
}
}
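/**
 * Schedules the periodic placement policy check, computing the initial delay
 * from the time the check was last executed.
 */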
private void schedulePlacementPolicyCheckTask(){
long interval = conf.getAuditorPeriodicPlacementPolicyCheckInterval();
if (interval > 0) {
LOG.info("Auditor periodic placement policy check enabled"
+ " 'auditorPeriodicPlacementPolicyCheckInterval' {} seconds", interval);
long placementPolicyCheckLastExecutedCTime;
long durationSinceLastExecutionInSecs;
long initialDelay;
try {
placementPolicyCheckLastExecutedCTime = ledgerUnderreplicationManager.getPlacementPolicyCheckCTime();
} catch (UnavailableException ue) {
LOG.error("Got UnavailableException while trying to get placementPolicyCheckCTime", ue);
placementPolicyCheckLastExecutedCTime = -1;
}
if (placementPolicyCheckLastExecutedCTime == -1) {
durationSinceLastExecutionInSecs = -1;
initialDelay = 0;
} else {
durationSinceLastExecutionInSecs = (System.currentTimeMillis() - placementPolicyCheckLastExecutedCTime)
/ 1000;
if (durationSinceLastExecutionInSecs < 0) {
// this can happen if there is no strict time ordering
durationSinceLastExecutionInSecs = 0;
}
initialDelay = durationSinceLastExecutionInSecs > interval ? 0
: (interval - durationSinceLastExecutionInSecs);
}
LOG.info(
"placementPolicyCheck scheduling info. placementPolicyCheckLastExecutedCTime: {} "
+ "durationSinceLastExecutionInSecs: {} initialDelay: {} interval: {}",
placementPolicyCheckLastExecutedCTime, durationSinceLastExecutionInSecs, initialDelay, interval);
executor.scheduleAtFixedRate(safeRun(new Runnable() {
@Override
public void run() {
try {
Stopwatch stopwatch = Stopwatch.createStarted();
LOG.info("Starting PlacementPolicyCheck");
placementPolicyCheck();
long placementPolicyCheckDuration = stopwatch.stop().elapsed(TimeUnit.MILLISECONDS);
int numOfLedgersFoundNotAdheringInPlacementPolicyCheckValue =
numOfLedgersFoundNotAdheringInPlacementPolicyCheck.get();
int numOfLedgersFoundSoftlyAdheringInPlacementPolicyCheckValue =
numOfLedgersFoundSoftlyAdheringInPlacementPolicyCheck.get();
int numOfClosedLedgersAuditedInPlacementPolicyCheckValue =
numOfClosedLedgersAuditedInPlacementPolicyCheck.get();
int numOfURLedgersElapsedRecoveryGracePeriodValue =
numOfURLedgersElapsedRecoveryGracePeriod.get();
LOG.info(
"Completed placementPolicyCheck in {} milliSeconds."
+ " numOfClosedLedgersAuditedInPlacementPolicyCheck {}"
+ " numOfLedgersNotAdheringToPlacementPolicy {}"
+ " numOfLedgersSoftlyAdheringToPlacementPolicy {}"
+ " numOfURLedgersElapsedRecoveryGracePeriod {}",
placementPolicyCheckDuration, numOfClosedLedgersAuditedInPlacementPolicyCheckValue,
numOfLedgersFoundNotAdheringInPlacementPolicyCheckValue,
numOfLedgersFoundSoftlyAdheringInPlacementPolicyCheckValue,
numOfURLedgersElapsedRecoveryGracePeriodValue);
ledgersNotAdheringToPlacementPolicyGuageValue
.set(numOfLedgersFoundNotAdheringInPlacementPolicyCheckValue);
ledgersSoftlyAdheringToPlacementPolicyGuageValue
.set(numOfLedgersFoundSoftlyAdheringInPlacementPolicyCheckValue);
numOfURLedgersElapsedRecoveryGracePeriodGuageValue
.set(numOfURLedgersElapsedRecoveryGracePeriodValue);
placementPolicyCheckTime.registerSuccessfulEvent(placementPolicyCheckDuration,
TimeUnit.MILLISECONDS);
} catch (BKAuditException e) {
int numOfLedgersFoundInPlacementPolicyCheckValue =
numOfLedgersFoundNotAdheringInPlacementPolicyCheck.get();
if (numOfLedgersFoundInPlacementPolicyCheckValue > 0) {
/*
* Though a BKAuditException occurred while doing
* placementPolicyCheck, it found a few ledgers not
* adhering to the placement policy. So report them.
*/
ledgersNotAdheringToPlacementPolicyGuageValue
.set(numOfLedgersFoundInPlacementPolicyCheckValue);
}
int numOfLedgersFoundSoftlyAdheringInPlacementPolicyCheckValue =
numOfLedgersFoundSoftlyAdheringInPlacementPolicyCheck.get();
if (numOfLedgersFoundSoftlyAdheringInPlacementPolicyCheckValue > 0) {
/*
* Though a BKAuditException occurred while doing
* placementPolicyCheck, it found a few ledgers softly
* adhering to the placement policy. So report them.
*/
ledgersSoftlyAdheringToPlacementPolicyGuageValue
.set(numOfLedgersFoundSoftlyAdheringInPlacementPolicyCheckValue);
}
int numOfURLedgersElapsedRecoveryGracePeriodValue =
numOfURLedgersElapsedRecoveryGracePeriod.get();
if (numOfURLedgersElapsedRecoveryGracePeriodValue > 0) {
/*
* Though a BKAuditException occurred while doing
* placementPolicyCheck, it found a few underreplicated ledgers
* that have elapsed the recovery grace period. So report them.
*/
numOfURLedgersElapsedRecoveryGracePeriodGuageValue
.set(numOfURLedgersElapsedRecoveryGracePeriodValue);
}
LOG.error(
"BKAuditException running periodic placementPolicy check."
+ "numOfLedgersNotAdheringToPlacementPolicy {}, "
+ "numOfLedgersSoftlyAdheringToPlacementPolicy {},"
+ "numOfURLedgersElapsedRecoveryGracePeriod {}",
numOfLedgersFoundInPlacementPolicyCheckValue,
numOfLedgersFoundSoftlyAdheringInPlacementPolicyCheckValue,
numOfURLedgersElapsedRecoveryGracePeriodValue, e);
}
}
}), initialDelay, interval, TimeUnit.SECONDS);
} else {
LOG.info("Periodic placementPolicy check disabled");
}
}
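/**
 * Schedules the periodic replicas check, computing the initial delay
 * from the time the check was last executed.
 */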
private void scheduleReplicasCheckTask() {
long interval = conf.getAuditorPeriodicReplicasCheckInterval();
if (interval <= 0) {
LOG.info("Periodic replicas check disabled");
return;
}
LOG.info("Auditor periodic replicas check enabled" + " 'auditorReplicasCheckInterval' {} seconds", interval);
long replicasCheckLastExecutedCTime;
long durationSinceLastExecutionInSecs;
long initialDelay;
try {
replicasCheckLastExecutedCTime = ledgerUnderreplicationManager.getReplicasCheckCTime();
} catch (UnavailableException ue) {
LOG.error("Got UnavailableException while trying to get replicasCheckCTime", ue);
replicasCheckLastExecutedCTime = -1;
}
if (replicasCheckLastExecutedCTime == -1) {
durationSinceLastExecutionInSecs = -1;
initialDelay = 0;
} else {
durationSinceLastExecutionInSecs = (System.currentTimeMillis() - replicasCheckLastExecutedCTime) / 1000;
if (durationSinceLastExecutionInSecs < 0) {
// this can happen if there is no strict time ordering
durationSinceLastExecutionInSecs = 0;
}
initialDelay = durationSinceLastExecutionInSecs > interval ? 0
: (interval - durationSinceLastExecutionInSecs);
}
LOG.info(
"replicasCheck scheduling info. replicasCheckLastExecutedCTime: {} "
+ "durationSinceLastExecutionInSecs: {} initialDelay: {} interval: {}",
replicasCheckLastExecutedCTime, durationSinceLastExecutionInSecs, initialDelay, interval);
executor.scheduleAtFixedRate(safeRun(new Runnable() {
@Override
public void run() {
try {
Stopwatch stopwatch = Stopwatch.createStarted();
LOG.info("Starting ReplicasCheck");
replicasCheck();
long replicasCheckDuration = stopwatch.stop().elapsed(TimeUnit.MILLISECONDS);
int numLedgersFoundHavingNoReplicaOfAnEntryValue = numLedgersFoundHavingNoReplicaOfAnEntry.get();
int numLedgersFoundHavingLessThanAQReplicasOfAnEntryValue =
numLedgersFoundHavingLessThanAQReplicasOfAnEntry.get();
int numLedgersFoundHavingLessThanWQReplicasOfAnEntryValue =
numLedgersFoundHavingLessThanWQReplicasOfAnEntry.get();
LOG.info(
"Completed ReplicasCheck in {} milliSeconds numLedgersFoundHavingNoReplicaOfAnEntry {}"
+ " numLedgersFoundHavingLessThanAQReplicasOfAnEntry {}"
+ " numLedgersFoundHavingLessThanWQReplicasOfAnEntry {}.",
replicasCheckDuration, numLedgersFoundHavingNoReplicaOfAnEntryValue,
numLedgersFoundHavingLessThanAQReplicasOfAnEntryValue,
numLedgersFoundHavingLessThanWQReplicasOfAnEntryValue);
numLedgersHavingNoReplicaOfAnEntryGuageValue.set(numLedgersFoundHavingNoReplicaOfAnEntryValue);
numLedgersHavingLessThanAQReplicasOfAnEntryGuageValue
.set(numLedgersFoundHavingLessThanAQReplicasOfAnEntryValue);
numLedgersHavingLessThanWQReplicasOfAnEntryGuageValue
.set(numLedgersFoundHavingLessThanWQReplicasOfAnEntryValue);
replicasCheckTime.registerSuccessfulEvent(replicasCheckDuration, TimeUnit.MILLISECONDS);
} catch (BKAuditException e) {
LOG.error("BKAuditException running periodic replicas check.", e);
int numLedgersFoundHavingNoReplicaOfAnEntryValue = numLedgersFoundHavingNoReplicaOfAnEntry.get();
if (numLedgersFoundHavingNoReplicaOfAnEntryValue > 0) {
/*
* Though a BKAuditException occurred while doing
* replicasCheck, it found a few ledgers having no replica
* of an entry. So report them.
*/
numLedgersHavingNoReplicaOfAnEntryGuageValue.set(numLedgersFoundHavingNoReplicaOfAnEntryValue);
}
int numLedgersFoundHavingLessThanAQReplicasOfAnEntryValue =
numLedgersFoundHavingLessThanAQReplicasOfAnEntry.get();
if (numLedgersFoundHavingLessThanAQReplicasOfAnEntryValue > 0) {
/*
* Though a BKAuditException occurred while doing
* replicasCheck, it found a few ledgers having an entry
* with less than AQ number of replicas. So report them.
*/
numLedgersHavingLessThanAQReplicasOfAnEntryGuageValue
.set(numLedgersFoundHavingLessThanAQReplicasOfAnEntryValue);
}
int numLedgersFoundHavingLessThanWQReplicasOfAnEntryValue =
numLedgersFoundHavingLessThanWQReplicasOfAnEntry.get();
if (numLedgersFoundHavingLessThanWQReplicasOfAnEntryValue > 0) {
/*
* Though a BKAuditException occurred while doing
* replicasCheck, it found a few ledgers having an entry
* with less than WQ number of replicas. So report them.
*/
numLedgersHavingLessThanWQReplicasOfAnEntryGuageValue
.set(numLedgersFoundHavingLessThanWQReplicasOfAnEntryValue);
}
}
}
}), initialDelay, interval, TimeUnit.SECONDS);
}
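/**
 * Callback invoked when the lostBookieRecoveryDelay znode changes: it
 * re-registers for further notifications and submits the corresponding
 * event to the auditor executor.
 */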
private class LostBookieRecoveryDelayChangedCb implements GenericCallback<Void> {
@Override
public void operationComplete(int rc, Void result) {
try {
Auditor.this.ledgerUnderreplicationManager
.notifyLostBookieRecoveryDelayChanged(LostBookieRecoveryDelayChangedCb.this);
} catch (UnavailableException ae) {
LOG.error("Exception while registering for a LostBookieRecoveryDelay notification", ae);
}
Auditor.this.submitLostBookieRecoveryDelayChangedEvent();
}
}
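/**
 * Blocks until ledger replication is re-enabled, if it is currently disabled.
 */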
private void waitIfLedgerReplicationDisabled() throws UnavailableException,
InterruptedException {
ReplicationEnableCb cb = new ReplicationEnableCb();
if (!ledgerUnderreplicationManager.isLedgerReplicationEnabled()) {
LOG.info("LedgerReplication is disabled externally through Zookeeper, "
+ "since DISABLE_NODE ZNode is created, so waiting untill it is enabled");
ledgerUnderreplicationManager.notifyLedgerReplicationEnabled(cb);
cb.await();
}
}
private List<String> getAvailableBookies() throws BKException {
// Get the available bookies
Collection<BookieSocketAddress> availableBkAddresses = admin.getAvailableBookies();
Collection<BookieSocketAddress> readOnlyBkAddresses = admin.getReadOnlyBookies();
availableBkAddresses.addAll(readOnlyBkAddresses);
List<String> availableBookies = new ArrayList<String>();
for (BookieSocketAddress addr : availableBkAddresses) {
availableBookies.add(addr.toString());
}
return availableBookies;
}
private void watchBookieChanges() throws BKException {
admin.watchWritableBookiesChanged(bookies -> submitAuditTask());
admin.watchReadOnlyBookiesChanged(bookies -> submitAuditTask());
}
/**
* Start running the actual audit task.
*
* @param shutDownTask
* A boolean that indicates whether or not to schedule shutdown task on any failure
*/
private void startAudit(boolean shutDownTask) {
try {
auditBookies();
shutDownTask = false;
} catch (BKException bke) {
LOG.error("Exception getting bookie list", bke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
LOG.error("Interrupted while watching available bookies ", ie);
} catch (BKAuditException bke) {
LOG.error("Exception while watching available bookies", bke);
}
if (shutDownTask) {
submitShutdownTask();
}
}
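/**
 * Runs a single bookie audit: builds the bookie-to-ledgers index, determines
 * which known bookies are no longer available, and publishes their ledgers as
 * underreplicated.
 */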
@SuppressWarnings("unchecked")
private void auditBookies()
throws BKAuditException, InterruptedException, BKException {
try {
waitIfLedgerReplicationDisabled();
} catch (UnavailableException ue) {
LOG.error("Underreplication unavailable, skipping audit."
+ "Will retry after a period");
return;
}
LOG.info("Starting auditBookies");
Stopwatch stopwatch = Stopwatch.createStarted();
// put exit cases here
Map<String, Set<Long>> ledgerDetails = generateBookie2LedgersIndex();
try {
if (!ledgerUnderreplicationManager.isLedgerReplicationEnabled()) {
// has been disabled while we were generating the index
// discard this run, and schedule a new one
executor.submit(safeRun(bookieCheck));
return;
}
} catch (UnavailableException ue) {
LOG.error("Underreplication unavailable, skipping audit."
+ "Will retry after a period");
return;
}
List<String> availableBookies = getAvailableBookies();
// find lost bookies
Set<String> knownBookies = ledgerDetails.keySet();
Collection<String> lostBookies = CollectionUtils.subtract(knownBookies,
availableBookies);
bookieToLedgersMapCreationTime.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MILLISECONDS),
TimeUnit.MILLISECONDS);
if (lostBookies.size() > 0) {
try {
FutureUtils.result(
handleLostBookiesAsync(lostBookies, ledgerDetails), ReplicationException.EXCEPTION_HANDLER);
} catch (ReplicationException e) {
throw new BKAuditException(e.getMessage(), e.getCause());
}
uRLPublishTimeForLostBookies.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MILLISECONDS),
TimeUnit.MILLISECONDS);
}
LOG.info("Completed auditBookies");
auditBookiesTime.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MILLISECONDS),
TimeUnit.MILLISECONDS);
}
private Map<String, Set<Long>> generateBookie2LedgersIndex()
throws BKAuditException {
return bookieLedgerIndexer.getBookieToLedgerIndex();
}
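/**
 * Publishes the ledgers stored on each of the lost bookies as suspected
 * (underreplicated) ledgers.
 */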
private CompletableFuture<?> handleLostBookiesAsync(Collection<String> lostBookies,
Map<String, Set<Long>> ledgerDetails) {
LOG.info("Following are the failed bookies: {},"
+ " and searching its ledgers for re-replication", lostBookies);
return FutureUtils.processList(
Lists.newArrayList(lostBookies),
bookieIP -> publishSuspectedLedgersAsync(
Lists.newArrayList(bookieIP), ledgerDetails.get(bookieIP)),
null
);
}
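/**
 * Marks the given ledgers as underreplicated in the underreplication manager,
 * recording the missing bookies.
 */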
private CompletableFuture<?> publishSuspectedLedgersAsync(Collection<String> missingBookies, Set<Long> ledgers) {
if (null == ledgers || ledgers.size() == 0) {
// there are no ledgers for this bookie, so just
// ignore the bookie failure
LOG.info("There are no ledgers for the failed bookie: {}", missingBookies);
return FutureUtils.Void();
}
LOG.info("Following ledgers: {} of bookie: {} are identified as underreplicated", ledgers, missingBookies);
numUnderReplicatedLedger.registerSuccessfulValue(ledgers.size());
return FutureUtils.processList(
Lists.newArrayList(ledgers),
ledgerId -> ledgerUnderreplicationManager.markLedgerUnderreplicatedAsync(ledgerId, missingBookies),
null
);
}
/**
* Process the result returned from checking a ledger.
*/
private class ProcessLostFragmentsCb implements GenericCallback<Set<LedgerFragment>> {
final LedgerHandle lh;
final AsyncCallback.VoidCallback callback;
ProcessLostFragmentsCb(LedgerHandle lh, AsyncCallback.VoidCallback callback) {
this.lh = lh;
this.callback = callback;
}
@Override
public void operationComplete(int rc, Set<LedgerFragment> fragments) {
if (rc == BKException.Code.OK) {
Set<BookieSocketAddress> bookies = Sets.newHashSet();
for (LedgerFragment f : fragments) {
bookies.addAll(f.getAddresses());
}
if (bookies.isEmpty()) {
// no missing fragments
callback.processResult(Code.OK, null, null);
return;
}
publishSuspectedLedgersAsync(
bookies.stream().map(BookieSocketAddress::toString).collect(Collectors.toList()),
Sets.newHashSet(lh.getId())
).whenComplete((result, cause) -> {
if (null != cause) {
LOG.error("Auditor exception publishing suspected ledger {} with lost bookies {}",
lh.getId(), bookies, cause);
callback.processResult(Code.ReplicationException, null, null);
} else {
callback.processResult(Code.OK, null, null);
}
});
} else {
callback.processResult(rc, null, null);
}
lh.closeAsync().whenComplete((result, cause) -> {
if (null != cause) {
LOG.warn("Error closing ledger {} : {}", lh.getId(), cause.getMessage());
}
});
}
}
/**
* List all the ledgers and check them individually. This should not
* be run very often.
*/
void checkAllLedgers() throws BKException, IOException, InterruptedException, KeeperException {
final BookKeeper localClient = createBookKeeperClient(conf);
final BookKeeperAdmin localAdmin = new BookKeeperAdmin(localClient, statsLogger);
try {
final LedgerChecker checker = new LedgerChecker(localClient);
final CompletableFuture<Void> processFuture = new CompletableFuture<>();
Processor<Long> checkLedgersProcessor = (ledgerId, callback) -> {
try {
if (!ledgerUnderreplicationManager.isLedgerReplicationEnabled()) {
LOG.info("Ledger rereplication has been disabled, aborting periodic check");
FutureUtils.complete(processFuture, null);
return;
}
} catch (UnavailableException ue) {
LOG.error("Underreplication manager unavailable running periodic check", ue);
FutureUtils.complete(processFuture, null);
return;
}
localAdmin.asyncOpenLedgerNoRecovery(ledgerId, (rc, lh, ctx) -> {
if (Code.OK == rc) {
checker.checkLedger(lh,
// the ledger handle will be closed after checkLedger is done.
new ProcessLostFragmentsCb(lh, callback),
conf.getAuditorLedgerVerificationPercentage());
// we collect the following stats to get a measure of the
// distribution of a single ledger within the bk cluster
// the higher the number of fragments/bookies, the more distributed it is
numFragmentsPerLedger.registerSuccessfulValue(lh.getNumFragments());
numBookiesPerLedger.registerSuccessfulValue(lh.getNumBookies());
numLedgersChecked.inc();
} else if (Code.NoSuchLedgerExistsOnMetadataServerException == rc) {
if (LOG.isDebugEnabled()) {
LOG.debug("Ledger {} was deleted before we could check it", ledgerId);
}
callback.processResult(Code.OK, null, null);
} else {
LOG.error("Couldn't open ledger {} to check : {}", ledgerId, BKException.getMessage(rc));
callback.processResult(rc, null, null);
}
}, null);
};
ledgerManager.asyncProcessLedgers(checkLedgersProcessor,
(rc, path, ctx) -> {
if (Code.OK == rc) {
FutureUtils.complete(processFuture, null);
} else {
FutureUtils.completeExceptionally(processFuture, BKException.create(rc));
}
}, null, BKException.Code.OK, BKException.Code.ReadException);
FutureUtils.result(processFuture, BKException.HANDLER);
try {
ledgerUnderreplicationManager.setCheckAllLedgersCTime(System.currentTimeMillis());
} catch (UnavailableException ue) {
LOG.error("Got exception while trying to set checkAllLedgersCTime", ue);
}
} finally {
localAdmin.close();
localClient.close();
}
}
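/**
 * Iterates over all ledgers and, for each closed ledger, checks whether every
 * segment's ensemble adheres to the ensemble placement policy, updating the
 * corresponding counters. If a recovery grace period is configured, it also
 * reports underreplicated ledgers whose grace period has elapsed.
 */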
void placementPolicyCheck() throws BKAuditException {
final CountDownLatch placementPolicyCheckLatch = new CountDownLatch(1);
this.numOfLedgersFoundNotAdheringInPlacementPolicyCheck.set(0);
this.numOfLedgersFoundSoftlyAdheringInPlacementPolicyCheck.set(0);
this.numOfClosedLedgersAuditedInPlacementPolicyCheck.set(0);
this.numOfURLedgersElapsedRecoveryGracePeriod.set(0);
if (this.underreplicatedLedgerRecoveryGracePeriod > 0) {
Iterator<UnderreplicatedLedger> underreplicatedLedgersInfo = ledgerUnderreplicationManager
.listLedgersToRereplicate(null);
List<Long> urLedgersElapsedRecoveryGracePeriod = new ArrayList<Long>();
while (underreplicatedLedgersInfo.hasNext()) {
UnderreplicatedLedger underreplicatedLedger = underreplicatedLedgersInfo.next();
long underreplicatedLedgerMarkTimeInMilSecs = underreplicatedLedger.getCtime();
if (underreplicatedLedgerMarkTimeInMilSecs != UnderreplicatedLedger.UNASSIGNED_CTIME) {
long elapsedTimeInSecs =
(System.currentTimeMillis() - underreplicatedLedgerMarkTimeInMilSecs) / 1000;
if (elapsedTimeInSecs > this.underreplicatedLedgerRecoveryGracePeriod) {
urLedgersElapsedRecoveryGracePeriod.add(underreplicatedLedger.getLedgerId());
numOfURLedgersElapsedRecoveryGracePeriod.incrementAndGet();
}
}
}
if (urLedgersElapsedRecoveryGracePeriod.isEmpty()) {
LOG.info("No Underreplicated ledger has elapsed recovery graceperiod: {}",
urLedgersElapsedRecoveryGracePeriod);
} else {
LOG.error("Following Underreplicated ledgers have elapsed recovery graceperiod: {}",
urLedgersElapsedRecoveryGracePeriod);
}
}
Processor<Long> ledgerProcessor = new Processor<Long>() {
@Override
public void process(Long ledgerId, AsyncCallback.VoidCallback iterCallback) {
ledgerManager.readLedgerMetadata(ledgerId).whenComplete((metadataVer, exception) -> {
if (exception == null) {
LedgerMetadata metadata = metadataVer.getValue();
int writeQuorumSize = metadata.getWriteQuorumSize();
int ackQuorumSize = metadata.getAckQuorumSize();
if (metadata.isClosed()) {
boolean foundSegmentNotAdheringToPlacementPolicy = false;
boolean foundSegmentSoftlyAdheringToPlacementPolicy = false;
for (Map.Entry<Long, ? extends List<BookieSocketAddress>> ensemble : metadata
.getAllEnsembles().entrySet()) {
long startEntryIdOfSegment = ensemble.getKey();
List<BookieSocketAddress> ensembleOfSegment = ensemble.getValue();
PlacementPolicyAdherence segmentAdheringToPlacementPolicy = admin
.isEnsembleAdheringToPlacementPolicy(ensembleOfSegment, writeQuorumSize,
ackQuorumSize);
if (segmentAdheringToPlacementPolicy == PlacementPolicyAdherence.FAIL) {
foundSegmentNotAdheringToPlacementPolicy = true;
LOG.warn(
"For ledger: {}, Segment starting at entry: {}, with ensemble: {} having "
+ "writeQuorumSize: {} and ackQuorumSize: {} is not adhering to "
+ "EnsemblePlacementPolicy",
ledgerId, startEntryIdOfSegment, ensembleOfSegment, writeQuorumSize,
ackQuorumSize);
} else if (segmentAdheringToPlacementPolicy == PlacementPolicyAdherence.MEETS_SOFT) {
foundSegmentSoftlyAdheringToPlacementPolicy = true;
if (LOG.isDebugEnabled()) {
LOG.debug(
"For ledger: {}, Segment starting at entry: {}, with ensemble: {}"
+ " having writeQuorumSize: {} and ackQuorumSize: {} is"
+ " softly adhering to EnsemblePlacementPolicy",
ledgerId, startEntryIdOfSegment, ensembleOfSegment, writeQuorumSize,
ackQuorumSize);
}
}
}
if (foundSegmentNotAdheringToPlacementPolicy) {
numOfLedgersFoundNotAdheringInPlacementPolicyCheck.incrementAndGet();
} else if (foundSegmentSoftlyAdheringToPlacementPolicy) {
numOfLedgersFoundSoftlyAdheringInPlacementPolicyCheck.incrementAndGet();
}
numOfClosedLedgersAuditedInPlacementPolicyCheck.incrementAndGet();
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Ledger: {} is not yet closed, so skipping the placementPolicy"
+ "check analysis for now", ledgerId);
}
}
iterCallback.processResult(BKException.Code.OK, null, null);
} else if (BKException.getExceptionCode(exception)
== BKException.Code.NoSuchLedgerExistsOnMetadataServerException) {
LOG.debug("Ignoring replication of already deleted ledger {}", ledgerId);
iterCallback.processResult(BKException.Code.OK, null, null);
} else {
LOG.warn("Unable to read the ledger: {} information", ledgerId);
iterCallback.processResult(BKException.getExceptionCode(exception), null, null);
}
});
}
};
// Reading the result after processing all the ledgers
final List<Integer> resultCode = new ArrayList<Integer>(1);
ledgerManager.asyncProcessLedgers(ledgerProcessor, new AsyncCallback.VoidCallback() {
@Override
public void processResult(int rc, String s, Object obj) {
resultCode.add(rc);
placementPolicyCheckLatch.countDown();
}
}, null, BKException.Code.OK, BKException.Code.ReadException);
try {
placementPolicyCheckLatch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new BKAuditException("Exception while doing placementPolicy check", e);
}
if (!resultCode.contains(BKException.Code.OK)) {
throw new BKAuditException("Exception while doing placementPolicy check",
BKException.create(resultCode.get(0)));
}
try {
ledgerUnderreplicationManager.setPlacementPolicyCheckCTime(System.currentTimeMillis());
} catch (UnavailableException ue) {
LOG.error("Got exception while trying to set PlacementPolicyCheckCTime", ue);
}
}
private static class MissingEntriesInfo {
// ledger id of missing entries
private final long ledgerId;
/*
* segment details, like start entryid of the segment and ensemble List.
*/
private final Entry<Long, ? extends List<BookieSocketAddress>> segmentEnsemble;
// bookie missing these entries
private final BookieSocketAddress bookieMissingEntries;
/*
* entries of this segment which are supposed to be stored on this bookie
* but are missing from it.
*/
private final List<Long> unavailableEntriesList;
private MissingEntriesInfo(long ledgerId, Entry<Long, ? extends List<BookieSocketAddress>> segmentEnsemble,
BookieSocketAddress bookieMissingEntries, List<Long> unavailableEntriesList) {
this.ledgerId = ledgerId;
this.segmentEnsemble = segmentEnsemble;
this.bookieMissingEntries = bookieMissingEntries;
this.unavailableEntriesList = unavailableEntriesList;
}
private long getLedgerId() {
return ledgerId;
}
private Entry<Long, ? extends List<BookieSocketAddress>> getSegmentEnsemble() {
return segmentEnsemble;
}
private BookieSocketAddress getBookieMissingEntries() {
return bookieMissingEntries;
}
private List<Long> getUnavailableEntriesList() {
return unavailableEntriesList;
}
}
private static class MissingEntriesInfoOfLedger {
private final long ledgerId;
private final int ensembleSize;
private final int writeQuorumSize;
private final int ackQuorumSize;
private final List<MissingEntriesInfo> missingEntriesInfoList;
private MissingEntriesInfoOfLedger(long ledgerId, int ensembleSize, int writeQuorumSize, int ackQuorumSize,
List<MissingEntriesInfo> missingEntriesInfoList) {
this.ledgerId = ledgerId;
this.ensembleSize = ensembleSize;
this.writeQuorumSize = writeQuorumSize;
this.ackQuorumSize = ackQuorumSize;
this.missingEntriesInfoList = missingEntriesInfoList;
}
private long getLedgerId() {
return ledgerId;
}
private int getEnsembleSize() {
return ensembleSize;
}
private int getWriteQuorumSize() {
return writeQuorumSize;
}
private int getAckQuorumSize() {
return ackQuorumSize;
}
private List<MissingEntriesInfo> getMissingEntriesInfoList() {
return missingEntriesInfoList;
}
}
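/**
 * Callback used by the replicas check: for each closed, non-empty ledger it
 * computes, per segment and per ensemble bookie, which entries that bookie is
 * expected to contain, before verifying them through getListOfEntriesOfLedger
 * calls.
 */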
private class ReadLedgerMetadataCallbackForReplicasCheck
implements BiConsumer<Versioned<LedgerMetadata>, Throwable> {
private final long ledgerInRange;
private final MultiCallback mcbForThisLedgerRange;
private final ConcurrentHashMap<Long, MissingEntriesInfoOfLedger> ledgersWithMissingEntries;
private final ConcurrentHashMap<Long, MissingEntriesInfoOfLedger> ledgersWithUnavailableBookies;
ReadLedgerMetadataCallbackForReplicasCheck(long ledgerInRange, MultiCallback mcbForThisLedgerRange,
ConcurrentHashMap<Long, MissingEntriesInfoOfLedger> ledgersWithMissingEntries,
ConcurrentHashMap<Long, MissingEntriesInfoOfLedger> ledgersWithUnavailableBookies) {
this.ledgerInRange = ledgerInRange;
this.mcbForThisLedgerRange = mcbForThisLedgerRange;
this.ledgersWithMissingEntries = ledgersWithMissingEntries;
this.ledgersWithUnavailableBookies = ledgersWithUnavailableBookies;
}
@Override
public void accept(Versioned<LedgerMetadata> metadataVer, Throwable exception) {
if (exception != null) {
if (BKException
.getExceptionCode(exception) == BKException.Code.NoSuchLedgerExistsOnMetadataServerException) {
LOG.debug("Ignoring replicas check of already deleted ledger {}", ledgerInRange);
mcbForThisLedgerRange.processResult(BKException.Code.OK, null, null);
return;
} else {
LOG.warn("Unable to read the ledger: {} information", ledgerInRange, exception);
mcbForThisLedgerRange.processResult(BKException.getExceptionCode(exception), null, null);
return;
}
}
LedgerMetadata metadata = metadataVer.getValue();
if (!metadata.isClosed()) {
LOG.debug("Ledger: {} is not yet closed, so skipping the replicas check analysis for now",
ledgerInRange);
mcbForThisLedgerRange.processResult(BKException.Code.OK, null, null);
return;
}
final long lastEntryId = metadata.getLastEntryId();
if (lastEntryId == -1) {
LOG.debug("Ledger: {} is closed but it doesn't has any entries, so skipping the replicas check",
ledgerInRange);
mcbForThisLedgerRange.processResult(BKException.Code.OK, null, null);
return;
}
int writeQuorumSize = metadata.getWriteQuorumSize();
int ackQuorumSize = metadata.getAckQuorumSize();
int ensembleSize = metadata.getEnsembleSize();
RoundRobinDistributionSchedule distributionSchedule = new RoundRobinDistributionSchedule(writeQuorumSize,
ackQuorumSize, ensembleSize);
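/*
* With the round-robin schedule, entry e of a segment is expected to be
* stored on the writeQuorumSize bookies at ensemble indices
* (e + i) % ensembleSize, for i in [0, writeQuorumSize).
*/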
List<Entry<Long, ? extends List<BookieSocketAddress>>> segments = new LinkedList<>(
metadata.getAllEnsembles().entrySet());
/*
* Since the ledger can have multiple segments, the MultiCallback has to
* be created for (ensembleSize * segments.size()) calls.
*/
MultiCallback mcbForThisLedger = new MultiCallback(ensembleSize * segments.size(), mcbForThisLedgerRange,
null, BKException.Code.OK, BKException.Code.ReadException);
HashMap<BookieSocketAddress, List<BookieExpectedToContainSegmentInfo>> bookiesSegmentInfoMap =
new HashMap<BookieSocketAddress, List<BookieExpectedToContainSegmentInfo>>();
for (int segmentNum = 0; segmentNum < segments.size(); segmentNum++) {
final Entry<Long, ? extends List<BookieSocketAddress>> segmentEnsemble = segments.get(segmentNum);
final List<BookieSocketAddress> ensembleOfSegment = segmentEnsemble.getValue();
final long startEntryIdOfSegment = segmentEnsemble.getKey();
final boolean lastSegment = (segmentNum == (segments.size() - 1));
final long lastEntryIdOfSegment = lastSegment ? lastEntryId
: segments.get(segmentNum + 1).getKey() - 1;
/*
* A segment can be empty. If the last segment is empty, then its
* startEntryIdOfSegment will be greater than the lastEntryId of the
* ledger. If a segment in the middle is empty, then its start entry id
* will be the same as the start entry id of the following segment.
*/
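/*
* For example (hypothetical values): with lastEntryId = 9 and segments
* starting at entry ids {0, 5, 5}, the segment starting at 5 is empty
* because its start entry id equals that of the following segment; with
* segments starting at {0, 5, 10}, the last segment is empty because 10 > 9.
*/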
final boolean emptySegment = lastSegment ? (startEntryIdOfSegment > lastEntryId)
: (startEntryIdOfSegment == segments.get(segmentNum + 1).getKey());
for (int bookieIndex = 0; bookieIndex < ensembleOfSegment.size(); bookieIndex++) {
final BookieSocketAddress bookieInEnsemble = ensembleOfSegment.get(bookieIndex);
final BitSet entriesStripedToThisBookie = emptySegment ? EMPTY_BITSET
: distributionSchedule.getEntriesStripedToTheBookie(bookieIndex, startEntryIdOfSegment,
lastEntryIdOfSegment);
if (entriesStripedToThisBookie.cardinality() == 0) {
/*
* If no entry is expected to be stored on this bookie, there is
* no point in making a getListOfEntriesOfLedger call for it. So
* instead call back with a success result.
*/
if (LOG.isDebugEnabled()) {
LOG.debug(
"For ledger: {}, in segment: {}, no entry is expected to be stored on"
+ " this bookie: {}. So skipping the getListOfEntriesOfLedger call",
ledgerInRange, segmentEnsemble, bookieInEnsemble);
}
mcbForThisLedger.processResult(BKException.Code.OK, null, null);
continue;
}
List<BookieExpectedToContainSegmentInfo> bookieSegmentInfoList = bookiesSegmentInfoMap
.computeIfAbsent(bookieInEnsemble, addr -> new ArrayList<>());
bookieSegmentInfoList.add(new BookieExpectedToContainSegmentInfo(startEntryIdOfSegment,
lastEntryIdOfSegment, segmentEnsemble, entriesStripedToThisBookie));
}
}
for (Entry<BookieSocketAddress, List<BookieExpectedToContainSegmentInfo>> bookiesSegmentInfoTuple :
bookiesSegmentInfoMap.entrySet()) {
final BookieSocketAddress bookieInEnsemble = bookiesSegmentInfoTuple.getKey();
final List<BookieExpectedToContainSegmentInfo> bookieSegmentInfoList = bookiesSegmentInfoTuple
.getValue();
admin.asyncGetListOfEntriesOfLedger(bookieInEnsemble, ledgerInRange)
.whenComplete(new GetListOfEntriesOfLedgerCallbackForReplicasCheck(ledgerInRange, ensembleSize,
writeQuorumSize, ackQuorumSize, bookieInEnsemble, bookieSegmentInfoList,
ledgersWithMissingEntries, ledgersWithUnavailableBookies, mcbForThisLedger));
}
}
}
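/**
* For a single bookie, the entry id range of a segment it belongs to, the
* segment's ensemble, and the BitSet of entries of that segment which are
* striped to this bookie.
*/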
private static class BookieExpectedToContainSegmentInfo {
private final long startEntryIdOfSegment;
private final long lastEntryIdOfSegment;
private final Entry<Long, ? extends List<BookieSocketAddress>> segmentEnsemble;
private final BitSet entriesOfSegmentStripedToThisBookie;
private BookieExpectedToContainSegmentInfo(long startEntryIdOfSegment, long lastEntryIdOfSegment,
Entry<Long, ? extends List<BookieSocketAddress>> segmentEnsemble,
BitSet entriesOfSegmentStripedToThisBookie) {
this.startEntryIdOfSegment = startEntryIdOfSegment;
this.lastEntryIdOfSegment = lastEntryIdOfSegment;
this.segmentEnsemble = segmentEnsemble;
this.entriesOfSegmentStripedToThisBookie = entriesOfSegmentStripedToThisBookie;
}
public long getStartEntryIdOfSegment() {
return startEntryIdOfSegment;
}
public long getLastEntryIdOfSegment() {
return lastEntryIdOfSegment;
}
public Entry<Long, ? extends List<BookieSocketAddress>> getSegmentEnsemble() {
return segmentEnsemble;
}
public BitSet getEntriesOfSegmentStripedToThisBookie() {
return entriesOfSegmentStripedToThisBookie;
}
}
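/**
* Callback for asyncGetListOfEntriesOfLedger issued against a single bookie.
* If the call fails (other than with NoSuchLedgerExistsException, which is
* treated as an empty list), the bookie is recorded in
* ledgersWithUnavailableBookies. Otherwise, for each segment the bookie is
* expected to hold, the entries striped to it but not reported as available
* are recorded in ledgersWithMissingEntries. In both cases mcbForThisLedger
* is completed with an OK result, since these findings are reported rather
* than treated as fatal.
*/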
private static class GetListOfEntriesOfLedgerCallbackForReplicasCheck
implements BiConsumer<AvailabilityOfEntriesOfLedger, Throwable> {
private final long ledgerInRange;
private final int ensembleSize;
private final int writeQuorumSize;
private final int ackQuorumSize;
private final BookieSocketAddress bookieInEnsemble;
private final List<BookieExpectedToContainSegmentInfo> bookieExpectedToContainSegmentInfoList;
private final ConcurrentHashMap<Long, MissingEntriesInfoOfLedger> ledgersWithMissingEntries;
private final ConcurrentHashMap<Long, MissingEntriesInfoOfLedger> ledgersWithUnavailableBookies;
private final MultiCallback mcbForThisLedger;
private GetListOfEntriesOfLedgerCallbackForReplicasCheck(long ledgerInRange, int ensembleSize,
int writeQuorumSize, int ackQuorumSize, BookieSocketAddress bookieInEnsemble,
List<BookieExpectedToContainSegmentInfo> bookieExpectedToContainSegmentInfoList,
ConcurrentHashMap<Long, MissingEntriesInfoOfLedger> ledgersWithMissingEntries,
ConcurrentHashMap<Long, MissingEntriesInfoOfLedger> ledgersWithUnavailableBookies,
MultiCallback mcbForThisLedger) {
this.ledgerInRange = ledgerInRange;
this.ensembleSize = ensembleSize;
this.writeQuorumSize = writeQuorumSize;
this.ackQuorumSize = ackQuorumSize;
this.bookieInEnsemble = bookieInEnsemble;
this.bookieExpectedToContainSegmentInfoList = bookieExpectedToContainSegmentInfoList;
this.ledgersWithMissingEntries = ledgersWithMissingEntries;
this.ledgersWithUnavailableBookies = ledgersWithUnavailableBookies;
this.mcbForThisLedger = mcbForThisLedger;
}
@Override
public void accept(AvailabilityOfEntriesOfLedger availabilityOfEntriesOfLedger,
Throwable listOfEntriesException) {
if (listOfEntriesException != null) {
if (BKException
.getExceptionCode(listOfEntriesException) == BKException.Code.NoSuchLedgerExistsException) {
LOG.debug("Got NoSuchLedgerExistsException for ledger: {} from bookie: {}", ledgerInRange,
bookieInEnsemble);
/*
* In the case of NoSuchLedgerExistsException, it should be
* treated as an empty AvailabilityOfEntriesOfLedger.
*/
availabilityOfEntriesOfLedger = AvailabilityOfEntriesOfLedger.EMPTY_AVAILABILITYOFENTRIESOFLEDGER;
} else {
LOG.warn("Unable to GetListOfEntriesOfLedger for ledger: {} from: {}", ledgerInRange,
bookieInEnsemble, listOfEntriesException);
MissingEntriesInfoOfLedger unavailableBookiesInfoOfThisLedger = ledgersWithUnavailableBookies
.computeIfAbsent(ledgerInRange,
id -> new MissingEntriesInfoOfLedger(id, ensembleSize, writeQuorumSize, ackQuorumSize,
Collections.synchronizedList(new ArrayList<MissingEntriesInfo>())));
List<MissingEntriesInfo> missingEntriesInfoList =
unavailableBookiesInfoOfThisLedger.getMissingEntriesInfoList();
for (BookieExpectedToContainSegmentInfo bookieExpectedToContainSegmentInfo
: bookieExpectedToContainSegmentInfoList) {
missingEntriesInfoList.add(new MissingEntriesInfo(ledgerInRange,
bookieExpectedToContainSegmentInfo.getSegmentEnsemble(), bookieInEnsemble, null));
/*
* Though GetListOfEntriesOfLedger has failed with an
* exception, mcbForThisLedger should be called back with an
* OK response, because this is not considered a fatal error
* in replicasCheck and replicasCheck should not exit just
* because of this issue. Instead, the state is maintained in
* ledgersWithUnavailableBookies, so that replicasCheck will
* report these ledgers/bookies appropriately.
*/
mcbForThisLedger.processResult(BKException.Code.OK, null, null);
}
return;
}
}
for (BookieExpectedToContainSegmentInfo bookieExpectedToContainSegmentInfo
: bookieExpectedToContainSegmentInfoList) {
final long startEntryIdOfSegment = bookieExpectedToContainSegmentInfo.getStartEntryIdOfSegment();
final long lastEntryIdOfSegment = bookieExpectedToContainSegmentInfo.getLastEntryIdOfSegment();
final BitSet entriesStripedToThisBookie = bookieExpectedToContainSegmentInfo
.getEntriesOfSegmentStripedToThisBookie();
final Entry<Long, ? extends List<BookieSocketAddress>> segmentEnsemble =
bookieExpectedToContainSegmentInfo.getSegmentEnsemble();
final List<Long> unavailableEntriesList = availabilityOfEntriesOfLedger
.getUnavailableEntries(startEntryIdOfSegment, lastEntryIdOfSegment, entriesStripedToThisBookie);
if ((unavailableEntriesList != null) && (!unavailableEntriesList.isEmpty())) {
MissingEntriesInfoOfLedger missingEntriesInfoOfThisLedger = ledgersWithMissingEntries
.computeIfAbsent(ledgerInRange,
id -> new MissingEntriesInfoOfLedger(id, ensembleSize, writeQuorumSize, ackQuorumSize,
Collections.synchronizedList(new ArrayList<MissingEntriesInfo>())));
missingEntriesInfoOfThisLedger.getMissingEntriesInfoList().add(new MissingEntriesInfo(ledgerInRange,
segmentEnsemble, bookieInEnsemble, unavailableEntriesList));
}
/*
* Here, even though unavailableEntriesList may not be empty,
* mcbForThisLedger should be called back with an OK response,
* because this is not considered a fatal error in replicasCheck
* and replicasCheck should not exit just because of this issue.
* Instead, the state is maintained in
* missingEntriesInfoOfThisLedger, so that replicasCheck will
* report these ledgers/bookies/missing entries appropriately.
*/
mcbForThisLedger.processResult(BKException.Code.OK, null, null);
}
}
}
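/**
* Final callback for a ledger range of the replicas check: records the result
* code and counts down the latch that replicasCheck() is waiting on.
*/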
private static class ReplicasCheckFinalCallback implements AsyncCallback.VoidCallback {
final AtomicInteger resultCode;
final CountDownLatch replicasCheckLatch;
private ReplicasCheckFinalCallback(AtomicInteger resultCode, CountDownLatch replicasCheckLatch) {
this.resultCode = resultCode;
this.replicasCheckLatch = replicasCheckLatch;
}
@Override
public void processResult(int rc, String s, Object obj) {
resultCode.set(rc);
replicasCheckLatch.countDown();
}
}
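/**
* Checks, for every closed ledger, that each bookie in each segment's ensemble
* actually holds the entries striped to it. Ledgers already marked
* under-replicated are skipped. Ledgers with missing entries or unresponsive
* bookies are logged and counted per ledger range. In-flight per-ledger checks
* are throttled by a semaphore and each range is bounded by
* REPLICAS_CHECK_TIMEOUT_IN_SECS.
*/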
void replicasCheck() throws BKAuditException {
ConcurrentHashMap<Long, MissingEntriesInfoOfLedger> ledgersWithMissingEntries =
new ConcurrentHashMap<Long, MissingEntriesInfoOfLedger>();
ConcurrentHashMap<Long, MissingEntriesInfoOfLedger> ledgersWithUnavailableBookies =
new ConcurrentHashMap<Long, MissingEntriesInfoOfLedger>();
LedgerRangeIterator ledgerRangeIterator = ledgerManager.getLedgerRanges(zkOpTimeoutMs);
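/*
* The semaphore throttles the number of in-flight per-ledger checks to
* MAX_CONCURRENT_REPLICAS_CHECK_LEDGER_REQUESTS; a permit is released in the
* overridden processResult of mcbForThisLedgerRange once a ledger's check
* completes.
*/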
final Semaphore maxConcurrentSemaphore = new Semaphore(MAX_CONCURRENT_REPLICAS_CHECK_LEDGER_REQUESTS);
while (true) {
LedgerRange ledgerRange = null;
try {
if (ledgerRangeIterator.hasNext()) {
ledgerRange = ledgerRangeIterator.next();
} else {
break;
}
} catch (IOException ioe) {
LOG.error("Got IOException while iterating LedgerRangeIterator", ioe);
throw new BKAuditException("Got IOException while iterating LedgerRangeIterator", ioe);
}
ledgersWithMissingEntries.clear();
ledgersWithUnavailableBookies.clear();
numLedgersFoundHavingNoReplicaOfAnEntry.set(0);
numLedgersFoundHavingLessThanAQReplicasOfAnEntry.set(0);
numLedgersFoundHavingLessThanWQReplicasOfAnEntry.set(0);
Set<Long> ledgersInRange = ledgerRange.getLedgers();
int numOfLedgersInRange = ledgersInRange.size();
// Final result after processing all the ledgers
final AtomicInteger resultCode = new AtomicInteger();
final CountDownLatch replicasCheckLatch = new CountDownLatch(1);
ReplicasCheckFinalCallback finalCB = new ReplicasCheckFinalCallback(resultCode, replicasCheckLatch);
MultiCallback mcbForThisLedgerRange = new MultiCallback(numOfLedgersInRange, finalCB, null,
BKException.Code.OK, BKException.Code.ReadException) {
@Override
public void processResult(int rc, String path, Object ctx) {
try {
super.processResult(rc, path, ctx);
} finally {
maxConcurrentSemaphore.release();
}
}
};
LOG.debug("Number of ledgers in the current LedgerRange : {}", numOfLedgersInRange);
for (Long ledgerInRange : ledgersInRange) {
try {
if (!maxConcurrentSemaphore.tryAcquire(REPLICAS_CHECK_TIMEOUT_IN_SECS, TimeUnit.SECONDS)) {
LOG.error("Timedout ({} secs) while waiting for acquiring semaphore",
REPLICAS_CHECK_TIMEOUT_IN_SECS);
throw new BKAuditException("Timedout while waiting for acquiring semaphore");
}
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
LOG.error("Got InterruptedException while acquiring semaphore for replicascheck", ie);
throw new BKAuditException("Got InterruptedException while acquiring semaphore for replicascheck",
ie);
}
if (checkUnderReplicationForReplicasCheck(ledgerInRange, mcbForThisLedgerRange)) {
/*
* if ledger is marked underreplicated, then ignore this
* ledger for replicascheck.
*/
continue;
}
ledgerManager.readLedgerMetadata(ledgerInRange)
.whenComplete(new ReadLedgerMetadataCallbackForReplicasCheck(ledgerInRange,
mcbForThisLedgerRange, ledgersWithMissingEntries, ledgersWithUnavailableBookies));
}
try {
/*
* If mcbForThisLedgerRange is not called back within
* REPLICAS_CHECK_TIMEOUT_IN_SECS seconds, then give up on this
* replicas check run, since there could be an issue and the
* single-threaded auditor executor thread should not be blocked
* for too long.
*/
if (!replicasCheckLatch.await(REPLICAS_CHECK_TIMEOUT_IN_SECS, TimeUnit.SECONDS)) {
LOG.error(
"For LedgerRange with {} ledgers, the replicas check did not complete"
+ " within {} secs, so giving up",
numOfLedgersInRange, REPLICAS_CHECK_TIMEOUT_IN_SECS);
throw new BKAuditException("Replicas check did not complete within the timeout");
}
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
LOG.error("Got InterruptedException while doing replicascheck", ie);
throw new BKAuditException("Got InterruptedException while doing replicascheck", ie);
}
reportLedgersWithMissingEntries(ledgersWithMissingEntries);
reportLedgersWithUnavailableBookies(ledgersWithUnavailableBookies);
int resultCodeIntValue = resultCode.get();
if (resultCodeIntValue != BKException.Code.OK) {
throw new BKAuditException("Exception while doing replicas check",
BKException.create(resultCodeIntValue));
}
}
try {
ledgerUnderreplicationManager.setReplicasCheckCTime(System.currentTimeMillis());
} catch (UnavailableException ue) {
LOG.error("Got exception while trying to set ReplicasCheckCTime", ue);
}
}
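/**
* Logs, for every ledger found with missing entries, which entries are missing
* from which bookie in which segment, and updates the counters for ledgers
* having no replica, fewer than ackQuorum replicas, or fewer than writeQuorum
* replicas of some entry.
*/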
private void reportLedgersWithMissingEntries(
ConcurrentHashMap<Long, MissingEntriesInfoOfLedger> ledgersWithMissingEntries) {
StringBuilder errMessage = new StringBuilder();
HashMultiset<Long> missingEntries = HashMultiset.create();
int writeQuorumSize;
int ackQuorumSize;
for (Map.Entry<Long, MissingEntriesInfoOfLedger> missingEntriesInfoOfLedgerEntry : ledgersWithMissingEntries
.entrySet()) {
missingEntries.clear();
errMessage.setLength(0);
long ledgerWithMissingEntries = missingEntriesInfoOfLedgerEntry.getKey();
MissingEntriesInfoOfLedger missingEntriesInfoOfLedger = missingEntriesInfoOfLedgerEntry.getValue();
List<MissingEntriesInfo> missingEntriesInfoList = missingEntriesInfoOfLedger.getMissingEntriesInfoList();
writeQuorumSize = missingEntriesInfoOfLedger.getWriteQuorumSize();
ackQuorumSize = missingEntriesInfoOfLedger.getAckQuorumSize();
errMessage.append("Ledger : " + ledgerWithMissingEntries + " has following missing entries : ");
for (int listInd = 0; listInd < missingEntriesInfoList.size(); listInd++) {
MissingEntriesInfo missingEntriesInfo = missingEntriesInfoList.get(listInd);
List<Long> unavailableEntriesList = missingEntriesInfo.getUnavailableEntriesList();
Entry<Long, ? extends List<BookieSocketAddress>> segmentEnsemble =
missingEntriesInfo.getSegmentEnsemble();
missingEntries.addAll(unavailableEntriesList);
errMessage.append("In segment starting at " + segmentEnsemble.getKey() + " with ensemble "
+ segmentEnsemble.getValue() + ", following entries " + unavailableEntriesList
+ " are missing in bookie: " + missingEntriesInfo.getBookieMissingEntries());
if (listInd < (missingEntriesInfoList.size() - 1)) {
errMessage.append(", ");
}
}
LOG.error(errMessage.toString());
Set<Multiset.Entry<Long>> missingEntriesSet = missingEntries.entrySet();
int maxNumOfMissingReplicas = 0;
long entryWithMaxNumOfMissingReplicas = -1L;
for (Multiset.Entry<Long> missingEntryWithCount : missingEntriesSet) {
if (missingEntryWithCount.getCount() > maxNumOfMissingReplicas) {
maxNumOfMissingReplicas = missingEntryWithCount.getCount();
entryWithMaxNumOfMissingReplicas = missingEntryWithCount.getElement();
}
}
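/*
* The entry missing from the most bookies bounds the minimum replication
* of the ledger. For example (hypothetical values): with
* writeQuorumSize = 3, an entry missing from 2 bookies has at most
* 3 - 2 = 1 remaining replica.
*/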
int leastNumOfReplicasOfAnEntry = writeQuorumSize - maxNumOfMissingReplicas;
if (leastNumOfReplicasOfAnEntry == 0) {
numLedgersFoundHavingNoReplicaOfAnEntry.incrementAndGet();
LOG.error("Ledger : {} entryId : {} is missing all replicas", ledgerWithMissingEntries,
entryWithMaxNumOfMissingReplicas);
} else if (leastNumOfReplicasOfAnEntry < ackQuorumSize) {
numLedgersFoundHavingLessThanAQReplicasOfAnEntry.incrementAndGet();
LOG.error("Ledger : {} entryId : {} is having: {} replicas, less than ackQuorum num of replicas : {}",
ledgerWithMissingEntries, entryWithMaxNumOfMissingReplicas, leastNumOfReplicasOfAnEntry,
ackQuorumSize);
} else if (leastNumOfReplicasOfAnEntry < writeQuorumSize) {
numLedgersFoundHavingLessThanWQReplicasOfAnEntry.incrementAndGet();
LOG.error("Ledger : {} entryId : {} is having: {} replicas, less than writeQuorum num of replicas : {}",
ledgerWithMissingEntries, entryWithMaxNumOfMissingReplicas, leastNumOfReplicasOfAnEntry,
writeQuorumSize);
}
}
}
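/**
* Logs, for every ledger that had unresponsive bookies during the replicas
* check, which bookie of which segment's ensemble did not respond to
* getListOfEntriesOfLedger.
*/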
private void reportLedgersWithUnavailableBookies(
ConcurrentHashMap<Long, MissingEntriesInfoOfLedger> ledgersWithUnavailableBookies) {
StringBuilder errMessage = new StringBuilder();
for (Map.Entry<Long, MissingEntriesInfoOfLedger> ledgerWithUnavailableBookiesInfo :
ledgersWithUnavailableBookies.entrySet()) {
errMessage.setLength(0);
long ledgerWithUnavailableBookies = ledgerWithUnavailableBookiesInfo.getKey();
List<MissingEntriesInfo> missingBookiesInfoList = ledgerWithUnavailableBookiesInfo.getValue()
.getMissingEntriesInfoList();
errMessage.append("Ledger : " + ledgerWithUnavailableBookies + " has following unavailable bookies : ");
for (int listInd = 0; listInd < missingBookiesInfoList.size(); listInd++) {
MissingEntriesInfo missingBookieInfo = missingBookiesInfoList.get(listInd);
Entry<Long, ? extends List<BookieSocketAddress>> segmentEnsemble =
missingBookieInfo.getSegmentEnsemble();
errMessage.append("In segment starting at " + segmentEnsemble.getKey() + " with ensemble "
+ segmentEnsemble.getValue() + ", following bookie has not responded "
+ missingBookieInfo.getBookieMissingEntries());
if (listInd < (missingBookiesInfoList.size() - 1)) {
errMessage.append(", ");
}
}
LOG.error(errMessage.toString());
}
}
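/**
* Returns true if the ledger is already marked under-replicated (or if the
* under-replication check itself failed), in which case the callback for this
* ledger is completed and the ledger is skipped by the replicas check;
* returns false otherwise.
*/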
boolean checkUnderReplicationForReplicasCheck(long ledgerInRange, VoidCallback mcbForThisLedgerRange) {
try {
if (ledgerUnderreplicationManager.getLedgerUnreplicationInfo(ledgerInRange) == null) {
return false;
}
/*
* this ledger is marked underreplicated, so ignore it for
* replicasCheck.
*/
LOG.debug("Ledger: {} is marked underrreplicated, ignore this ledger for replicasCheck",
ledgerInRange);
mcbForThisLedgerRange.processResult(BKException.Code.OK, null, null);
return true;
} catch (UnavailableException une) {
LOG.error("Got exception while trying to check if ledger: {} is underreplicated", ledgerInRange, une);
mcbForThisLedgerRange.processResult(BKException.getExceptionCode(une), null, null);
return true;
}
}
/**
* Shutdown the auditor.
*/
public void shutdown() {
LOG.info("Shutting down auditor");
executor.shutdown();
try {
while (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
LOG.warn("Executor not shutting down, interrupting");
executor.shutdownNow();
}
if (ownAdmin) {
admin.close();
}
if (ownBkc) {
bkc.close();
}
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
LOG.warn("Interrupted while shutting down auditor bookie", ie);
} catch (BKException bke) {
LOG.warn("Exception while shutting down auditor bookie", bke);
}
}
@Override
public void close() {
shutdown();
}
/**
* Return true if the auditor is running, otherwise return false.
*
* @return auditor status
*/
public boolean isRunning() {
return !executor.isShutdown();
}
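/**
* Periodic bookie check: triggers a full audit unless an audit task is already
* scheduled because of a lost bookie, in which case running the check now
* could prematurely report under replication.
*/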
private final Runnable bookieCheck = new Runnable() {
@Override
public void run() {
if (auditTask == null) {
startAudit(true);
} else {
// if an audit task was already scheduled due to a lost bookie,
// do not run this periodic bookie check now; if we went ahead,
// we would report under replication, which the user wanted to
// avoid (via the lostBookieRecoveryDelay option)
LOG.info("Audit already scheduled; skipping periodic bookie check");
}
}
};
int getLostBookieRecoveryDelayBeforeChange() {
return lostBookieRecoveryDelayBeforeChange;
}
Future<?> getAuditTask() {
return auditTask;
}
}