/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.bookie;

import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;

import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.List;
import java.util.NavigableSet;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.LedgerManager.LedgerRange;
import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator;
import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager;
import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Garbage collector implementation using scan and compare.
*
* <p>
 * Garbage collection is processed as follows:
 * <ul>
 * <li> fetch all existing ledgers from zookeeper or the metastore, according to
 *      the LedgerManager; call this set <b>globalActiveLedgers</b>
 * <li> fetch all active ledgers from the bookie server; call this set <b>bkActiveLedgers</b>
 * <li> loop over <b>bkActiveLedgers</b> and garbage collect every ledger that is not
 *      in <b>globalActiveLedgers</b>
 * </ul>
*
 * <p>TODO: eliminate the direct usage of zookeeper here
 * (see <a href="https://github.com/apache/bookkeeper/issues/1331">apache/bookkeeper#1331</a>)
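 *
 * <p>A minimal usage sketch (illustrative only; assumes a configured
 * {@link LedgerManager} and {@link CompactableLedgerStorage} plus a
 * {@code GarbageCleaner} callback are already available):
 * <pre>{@code
 * ScanAndCompareGarbageCollector gc = new ScanAndCompareGarbageCollector(
 *         ledgerManager, ledgerStorage, conf, statsLogger);
 * // the cleaner decides what deletion means; this one only logs the candidate
 * gc.gc(ledgerId -> LOG.info("garbage ledger {}", ledgerId));
 * }</pre>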
*/
public class ScanAndCompareGarbageCollector implements GarbageCollector {
static final Logger LOG = LoggerFactory.getLogger(ScanAndCompareGarbageCollector.class);
static final int MAX_CONCURRENT_ZK_REQUESTS = 1000;
private final LedgerManager ledgerManager;
private final CompactableLedgerStorage ledgerStorage;
private final ServerConfiguration conf;
private final BookieId selfBookieAddress;
private ZooKeeper zk = null;
private boolean enableGcOverReplicatedLedger;
private final long gcOverReplicatedLedgerIntervalMillis;
private long lastOverReplicatedLedgerGcTimeMillis;
private final String zkServers;
private final String zkLedgersRootPath;
private final boolean verifyMetadataOnGc;
    private int activeLedgerCounter;

public ScanAndCompareGarbageCollector(LedgerManager ledgerManager, CompactableLedgerStorage ledgerStorage,
ServerConfiguration conf, StatsLogger statsLogger) throws IOException {
this.ledgerManager = ledgerManager;
this.ledgerStorage = ledgerStorage;
this.conf = conf;
this.selfBookieAddress = BookieImpl.getBookieId(conf);
this.gcOverReplicatedLedgerIntervalMillis = conf.getGcOverreplicatedLedgerWaitTimeMillis();
this.lastOverReplicatedLedgerGcTimeMillis = System.currentTimeMillis();
if (gcOverReplicatedLedgerIntervalMillis > 0) {
this.enableGcOverReplicatedLedger = true;
}
this.zkServers = ZKMetadataDriverBase.resolveZkServers(conf);
this.zkLedgersRootPath = ZKMetadataDriverBase.resolveZkLedgersRootPath(conf);
LOG.info("Over Replicated Ledger Deletion : enabled=" + enableGcOverReplicatedLedger + ", interval="
+ gcOverReplicatedLedgerIntervalMillis);
verifyMetadataOnGc = conf.getVerifyMetadataOnGC();
this.activeLedgerCounter = 0;
}
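
    /**
     * Returns the number of ledgers that were active on this bookie at the start
     * of the most recent {@code gc} run.
     */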
public int getNumActiveLedgers() {
return activeLedgerCounter;
    }

@Override
public void gc(GarbageCleaner garbageCleaner) {
if (null == ledgerManager) {
            // if the ledger manager is null, the bookie was not started with a
            // connection to the metadata store, so skip garbage collection
return;
}
try {
// Get a set of all ledgers on the bookie
NavigableSet<Long> bkActiveLedgers = Sets.newTreeSet(ledgerStorage.getActiveLedgersInRange(0,
Long.MAX_VALUE));
this.activeLedgerCounter = bkActiveLedgers.size();
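            // over-replicated ledger GC is comparatively expensive (it reads metadata
            // for each locally active ledger), so it only runs once per configured interval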
long curTime = System.currentTimeMillis();
boolean checkOverreplicatedLedgers = (enableGcOverReplicatedLedger && curTime
- lastOverReplicatedLedgerGcTimeMillis > gcOverReplicatedLedgerIntervalMillis);
if (checkOverreplicatedLedgers) {
zk = ZooKeeperClient.newBuilder().connectString(zkServers)
.sessionTimeoutMs(conf.getZkTimeout()).build();
// remove all the overreplicated ledgers from the local bookie
                Set<Long> overReplicatedLedgers = removeOverReplicatedLedgers(bkActiveLedgers, garbageCleaner);
if (overReplicatedLedgers.isEmpty()) {
LOG.info("No over-replicated ledgers found.");
} else {
LOG.info("Removed over-replicated ledgers: {}", overReplicatedLedgers);
}
lastOverReplicatedLedgerGcTimeMillis = System.currentTimeMillis();
}
            // Iterate over all the ledgers on the metadata store
long zkOpTimeoutMs = this.conf.getZkTimeout() * 2;
LedgerRangeIterator ledgerRangeIterator = ledgerManager
.getLedgerRanges(zkOpTimeoutMs);
Set<Long> ledgersInMetadata = null;
long start;
long end = -1;
boolean done = false;
AtomicBoolean isBookieInEnsembles = new AtomicBoolean(false);
Versioned<LedgerMetadata> metadata = null;
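            // Walk the metadata store in sorted ledger ranges: for each range [start, end],
            // compare the bookie's local ledgers in that window against the ledgers known to
            // the metadata store. A final catch-all window up to Long.MAX_VALUE covers local
            // ledgers beyond the last range returned by the iterator.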
while (!done) {
start = end + 1;
if (ledgerRangeIterator.hasNext()) {
LedgerRange lRange = ledgerRangeIterator.next();
ledgersInMetadata = lRange.getLedgers();
end = lRange.end();
} else {
ledgersInMetadata = new TreeSet<>();
end = Long.MAX_VALUE;
done = true;
}
Iterable<Long> subBkActiveLedgers = bkActiveLedgers.subSet(start, true, end, true);
if (LOG.isDebugEnabled()) {
LOG.debug("Active in metadata {}, Active in bookie {}", ledgersInMetadata, subBkActiveLedgers);
}
for (Long bkLid : subBkActiveLedgers) {
if (!ledgersInMetadata.contains(bkLid)) {
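                        // optionally double-check with a direct metadata read before
                        // deleting, to avoid acting on a stale or incomplete range scan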
if (verifyMetadataOnGc) {
isBookieInEnsembles.set(false);
metadata = null;
int rc = BKException.Code.OK;
try {
metadata = result(ledgerManager.readLedgerMetadata(bkLid), zkOpTimeoutMs,
TimeUnit.MILLISECONDS);
} catch (BKException | TimeoutException e) {
if (e instanceof BKException) {
rc = ((BKException) e).getCode();
} else {
LOG.warn("Time-out while fetching metadata for Ledger {} : {}.", bkLid,
e.getMessage());
continue;
}
}
                            // the bookie should appear in at least one ensemble segment;
                            // otherwise the ledger should be deleted from local storage
if (metadata != null && metadata.getValue() != null) {
metadata.getValue().getAllEnsembles().forEach((entryId, ensembles) -> {
if (ensembles != null && ensembles.contains(selfBookieAddress)) {
isBookieInEnsembles.set(true);
}
});
if (isBookieInEnsembles.get()) {
continue;
}
} else if (rc != BKException.Code.NoSuchLedgerExistsOnMetadataServerException) {
LOG.warn("Ledger {} Missing in metadata list, but ledgerManager returned rc: {}.",
bkLid, rc);
continue;
}
}
garbageCleaner.clean(bkLid);
}
}
}
} catch (Throwable t) {
            // ignore the exception; garbage will be collected on the next run
LOG.warn("Exception when iterating over the metadata", t);
} finally {
if (zk != null) {
try {
zk.close();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.error("Error closing zk session", e);
}
zk = null;
}
}
}
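
    /**
     * Scans the given set of locally active ledgers and garbage collects every closed
     * ledger whose metadata lists this bookie in none of its ensembles (i.e. the ledger
     * is over-replicated on this bookie). Collected ledger ids are removed from
     * {@code bkActiveLedgers} and returned.
     */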
    private Set<Long> removeOverReplicatedLedgers(Set<Long> bkActiveLedgers, final GarbageCleaner garbageCleaner)
            throws InterruptedException, KeeperException {
final List<ACL> zkAcls = ZkUtils.getACLs(conf);
final Set<Long> overReplicatedLedgers = Sets.newHashSet();
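        // bound the number of in-flight metadata reads, and wait for every ledger's
        // asynchronous callback (or early skip) before returning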
final Semaphore semaphore = new Semaphore(MAX_CONCURRENT_ZK_REQUESTS);
        final CountDownLatch latch = new CountDownLatch(bkActiveLedgers.size());
        for (final Long ledgerId : bkActiveLedgers) {
try {
// check if the ledger is being replicated already by the replication worker
if (ZkLedgerUnderreplicationManager.isLedgerBeingReplicated(zk, zkLedgersRootPath, ledgerId)) {
latch.countDown();
continue;
}
                // acquire the under-replicated ledger lock so the replication worker does not
                // start replicating a ledger that is being checked for deletion; replication
                // could change the ledger ensemble to include the current bookie again, in
                // which case we could not remove the ledger from local storage
ZkLedgerUnderreplicationManager.acquireUnderreplicatedLedgerLock(zk, zkLedgersRootPath, ledgerId,
zkAcls);
semaphore.acquire();
ledgerManager.readLedgerMetadata(ledgerId)
.whenComplete((metadata, exception) -> {
try {
if (exception == null) {
// do not delete a ledger that is not closed, since the ensemble might
// change again and include the current bookie while we are deleting it
if (!metadata.getValue().isClosed()) {
return;
}
SortedMap<Long, ? extends List<BookieId>> ensembles =
metadata.getValue().getAllEnsembles();
for (List<BookieId> ensemble : ensembles.values()) {
// check if this bookie is supposed to have this ledger
if (ensemble.contains(selfBookieAddress)) {
return;
}
}
// this bookie is not supposed to have this ledger,
// thus we can delete this ledger now
overReplicatedLedgers.add(ledgerId);
garbageCleaner.clean(ledgerId);
}
} finally {
semaphore.release();
latch.countDown();
try {
ZkLedgerUnderreplicationManager.releaseUnderreplicatedLedgerLock(
zk, zkLedgersRootPath, ledgerId);
} catch (Throwable t) {
LOG.error("Exception when removing underreplicated lock for ledger {}",
ledgerId, t);
}
}
});
} catch (Throwable t) {
LOG.error("Exception when iterating through the ledgers to check for over-replication", t);
latch.countDown();
}
}
latch.await();
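        // drop the collected ledgers from the caller's active set so the subsequent
        // scan-and-compare pass does not examine them again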
        bkActiveLedgers.removeAll(overReplicatedLedgers);
return overReplicatedLedgers;
}
}