blob: d5866a5020c425db74a6a6c3d6c3d6c7d956826d [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.meta;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.bookie.CompactableLedgerStorage;
import org.apache.bookkeeper.bookie.GarbageCollector;
import org.apache.bookkeeper.bookie.ScanAndCompareGarbageCollector;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.LedgerMetadata;
import org.apache.bookkeeper.meta.LedgerManager.LedgerRange;
import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.versioning.Version;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Tests garbage collection of ledgers through the ledger manager.
 *
 * <p>Each test creates ledger metadata through the ledger manager under test,
 * removes some of it again, and then runs a {@link ScanAndCompareGarbageCollector}
 * to check which ledgers are handed to the {@link GarbageCollector.GarbageCleaner}.
 */
public class GcLedgersTest extends LedgerManagerTestCase {
    static final Logger LOG = LoggerFactory.getLogger(GcLedgersTest.class);

    public GcLedgersTest(Class<? extends LedgerManagerFactory> lmFactoryCls) {
        super(lmFactoryCls);
    }

    /**
     * Creates ledgers and blocks until every asynchronous creation has completed.
     *
     * <p>For each ledger a new id is generated and ledger metadata is created for it.
     * Ledgers whose metadata was created successfully are registered in
     * {@code activeLedgers} and added to {@code createdLedgers}; failed creations are
     * skipped, so fewer than {@code numLedgers} ids may end up in the set.
     *
     * @param numLedgers number of ledgers to attempt to create
     * @param createdLedgers set receiving the ids of the successfully created ledgers;
     *        it is modified from the callbacks
     * @throws IOException if the calling thread is interrupted while waiting for the
     *         creations to complete (the interrupt flag is restored)
     */
    private void createLedgers(int numLedgers, final Set<Long> createdLedgers) throws IOException {
        // One count per requested ledger; every callback path counts down exactly once.
        final CountDownLatch pending = new CountDownLatch(Math.max(0, numLedgers));
        for (int i = 0; i < numLedgers; i++) {
            getLedgerIdGenerator().generateLedgerId(new GenericCallback<Long>() {
                @Override
                public void operationComplete(int rc, final Long ledgerId) {
                    if (BKException.Code.OK != rc) {
                        // Id generation failed: nothing to create for this slot.
                        pending.countDown();
                        return;
                    }
                    getLedgerManager().createLedgerMetadata(ledgerId,
                            new LedgerMetadata(1, 1, 1, DigestType.MAC, "".getBytes()),
                            new GenericCallback<Void>() {
                                @Override
                                public void operationComplete(int rc, Void result) {
                                    if (rc == BKException.Code.OK) {
                                        activeLedgers.put(ledgerId, true);
                                        createdLedgers.add(ledgerId);
                                    }
                                    // Count down after the bookkeeping so that the waiter
                                    // observes the updated collections once await() returns.
                                    pending.countDown();
                                }
                            });
                }
            });
        }
        try {
            pending.await();
        } catch (InterruptedException ie) {
            // Do not pretend the ledgers were created: restore the flag and report it.
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted while waiting for ledger creation to complete", ie);
        }
    }

    /**
     * Removes the metadata of a ledger and asserts that the removal succeeded.
     *
     * @param ledgerId id of the ledger whose metadata is removed
     * @throws Exception if the calling thread is interrupted while waiting
     */
    private void removeLedger(long ledgerId) throws Exception {
        final AtomicInteger rc = new AtomicInteger(0);
        final CountDownLatch latch = new CountDownLatch(1);
        getLedgerManager().removeLedgerMetadata(ledgerId, Version.ANY,
                new GenericCallback<Void>() {
                    @Override
                    public void operationComplete(int rc2, Void result) {
                        rc.set(rc2);
                        latch.countDown();
                    }
                });
        assertTrue("Remove should have completed in time", latch.await(10, TimeUnit.SECONDS));
        assertEquals("Remove should have succeeded", 0, rc.get());
    }

    /**
     * Runs a garbage collection concurrently with the creation of new ledgers.
     *
     * <p>The GC thread pauses inside the first {@code clean} call until the create
     * thread has created 10 more ledgers. Afterwards no removed ledger may remain in
     * {@code activeLedgers}, and every ledger that still has metadata must remain.
     */
    @Test(timeout = 60000)
    public void testGarbageCollectLedgers() throws Exception {
        int numLedgers = 100;
        int numRemovedLedgers = 10;

        // Modified from creation callbacks, hence synchronized like in the other tests.
        final Set<Long> createdLedgers = Collections.synchronizedSet(new HashSet<Long>());
        final Set<Long> removedLedgers = new HashSet<Long>();

        // create 100 ledgers
        createLedgers(numLedgers, createdLedgers);

        Random r = new Random(System.currentTimeMillis());
        final List<Long> tmpList = new ArrayList<Long>(createdLedgers);
        Collections.shuffle(tmpList, r);

        // randomly remove several ledgers; removeLedger() waits for and checks the result
        for (int i = 0; i < numRemovedLedgers; i++) {
            long ledgerId = tmpList.get(i);
            removeLedger(ledgerId);
            removedLedgers.add(ledgerId);
            createdLedgers.remove(ledgerId);
        }

        final CountDownLatch inGcProgress = new CountDownLatch(1);
        final CountDownLatch createLatch = new CountDownLatch(1);
        final CountDownLatch endLatch = new CountDownLatch(2);
        final CompactableLedgerStorage mockLedgerStorage = new MockLedgerStorage();
        final GarbageCollector garbageCollector = new ScanAndCompareGarbageCollector(getLedgerManager(),
                mockLedgerStorage, baseConf);

        Thread gcThread = new Thread() {
            @Override
            public void run() {
                try {
                    garbageCollector.gc(new GarbageCollector.GarbageCleaner() {
                        // Only the first cleaned ledger pauses the collection.
                        boolean paused = false;

                        @Override
                        public void clean(long ledgerId) {
                            try {
                                mockLedgerStorage.deleteLedger(ledgerId);
                            } catch (IOException e) {
                                LOG.error("Failed to delete ledger " + ledgerId, e);
                                return;
                            }
                            if (!paused) {
                                // Signal that GC is in progress and wait for the new ledgers.
                                inGcProgress.countDown();
                                try {
                                    createLatch.await();
                                } catch (InterruptedException ie) {
                                    Thread.currentThread().interrupt();
                                }
                                paused = true;
                            }
                            LOG.info("Garbage Collected ledger {}", ledgerId);
                        }
                    });
                } finally {
                    // Release the create thread even if nothing was cleaned or gc failed,
                    // so the test fails on its assertions instead of hanging until timeout.
                    inGcProgress.countDown();
                    LOG.info("Gc Thread quits.");
                    endLatch.countDown();
                }
            }
        };

        Thread createThread = new Thread() {
            @Override
            public void run() {
                try {
                    inGcProgress.await();
                    // create 10 more ledgers
                    createLedgers(10, createdLedgers);
                    LOG.info("Finished creating 10 more ledgers.");
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    LOG.error("Interrupted while waiting for GC to start", ie);
                } catch (Exception e) {
                    LOG.error("Failed to create ledgers while GC is in progress", e);
                } finally {
                    // Always unblock the GC thread, even when creation failed.
                    createLatch.countDown();
                    LOG.info("Create Thread quits.");
                    endLatch.countDown();
                }
            }
        };

        createThread.start();
        gcThread.start();
        endLatch.await();

        // test ledgers
        for (Long ledger : removedLedgers) {
            assertFalse(activeLedgers.containsKey(ledger));
        }
        // Iteration over a synchronized set must hold its lock.
        synchronized (createdLedgers) {
            for (Long ledger : createdLedgers) {
                assertTrue(activeLedgers.containsKey(ledger));
            }
        }
    }

    /**
     * Checks that ledgers at both ends of the created id range are collected.
     *
     * <p>A first GC run must clean nothing. After the metadata of the last, and then
     * of the first, created ledger is removed, each following GC run must clean
     * exactly that ledger first.
     */
    @Test(timeout = 60000)
    public void testGcLedgersOutsideRange() throws Exception {
        final SortedSet<Long> createdLedgers = Collections.synchronizedSortedSet(new TreeSet<Long>());
        final Queue<Long> cleaned = new LinkedList<Long>();
        int numLedgers = 100;

        createLedgers(numLedgers, createdLedgers);

        final GarbageCollector garbageCollector = new ScanAndCompareGarbageCollector(getLedgerManager(),
                new MockLedgerStorage(), baseConf);
        // Records every ledger id passed to clean() instead of deleting anything.
        GarbageCollector.GarbageCleaner cleaner = new GarbageCollector.GarbageCleaner() {
            @Override
            public void clean(long ledgerId) {
                LOG.info("Cleaned {}", ledgerId);
                cleaned.add(ledgerId);
            }
        };

        garbageCollector.gc(cleaner);
        assertNull("Should have cleaned nothing", cleaned.poll());

        long last = createdLedgers.last();
        removeLedger(last);
        garbageCollector.gc(cleaner);
        assertNotNull("Should have cleaned something", cleaned.peek());
        assertEquals("Should have cleaned last ledger " + last, (long) last, (long) cleaned.poll());

        long first = createdLedgers.first();
        removeLedger(first);
        garbageCollector.gc(cleaner);
        assertNotNull("Should have cleaned something", cleaned.peek());
        assertEquals("Should have cleaned first ledger " + first, (long) first, (long) cleaned.poll());
    }

    /**
     * Checks that a removed ledger is collected when it is not in the last ledger range.
     *
     * <p>After creating the ledgers, the ids returned by the ledger range iterator must
     * equal the created ids. A first GC run must clean nothing; after the metadata of
     * the first created ledger is removed, the next GC run must clean only that ledger.
     */
    @Test(timeout = 60000)
    public void testGcLedgersNotLast() throws Exception {
        final SortedSet<Long> createdLedgers = Collections.synchronizedSortedSet(new TreeSet<Long>());
        final List<Long> cleaned = new ArrayList<Long>();

        // Create enough ledgers to span over 4 ranges in the hierarchical ledger manager implementation
        final int numLedgers = 30001;

        createLedgers(numLedgers, createdLedgers);

        final GarbageCollector garbageCollector = new ScanAndCompareGarbageCollector(getLedgerManager(),
                new MockLedgerStorage(), baseConf);
        // Records every ledger id passed to clean() instead of deleting anything.
        GarbageCollector.GarbageCleaner cleaner = new GarbageCollector.GarbageCleaner() {
            @Override
            public void clean(long ledgerId) {
                LOG.info("Cleaned {}", ledgerId);
                cleaned.add(ledgerId);
            }
        };

        // Sanity check: scanning all ledger ranges yields exactly the created ledgers.
        SortedSet<Long> scannedLedgers = new TreeSet<Long>();
        LedgerRangeIterator iterator = getLedgerManager().getLedgerRanges();
        while (iterator.hasNext()) {
            LedgerRange ledgerRange = iterator.next();
            scannedLedgers.addAll(ledgerRange.getLedgers());
        }
        assertEquals(createdLedgers, scannedLedgers);

        garbageCollector.gc(cleaner);
        assertTrue("Should have cleaned nothing", cleaned.isEmpty());

        long first = createdLedgers.first();
        removeLedger(first);
        garbageCollector.gc(cleaner);
        assertEquals("Should have cleaned something", 1, cleaned.size());
        assertEquals("Should have cleaned first ledger " + first, (long) first, (long) cleaned.get(0));
    }
}