blob: 2dc54a3ebd589e7c4de25930ae1c28e824500e3f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.jackrabbit.oak.plugins.blob;
import static org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector.GarbageCollectionOperationStats.CONSISTENCY_NAME;
import static org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector.GarbageCollectionOperationStats.FINISH_FAILURE;
import static org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector.GarbageCollectionOperationStats.NAME;
import static org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector.GarbageCollectionOperationStats.NUM_BLOBS_DELETED;
import static org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector.GarbageCollectionOperationStats.NUM_CANDIDATES;
import static org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector.GarbageCollectionOperationStats.START;
import static org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector.GarbageCollectionOperationStats.TOTAL_SIZE_DELETED;
import static org.apache.jackrabbit.oak.plugins.blob.OperationsStatsMBean.TYPE;
import static org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreUtils.randomStream;
import static org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils.SharedStoreRecordType.REPOSITORY;
import static org.apache.jackrabbit.oak.stats.StatsOptions.METRICS_ONLY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicReference;
import ch.qos.logback.classic.Level;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.io.Closer;
import org.apache.jackrabbit.oak.api.Blob;
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.api.PropertyState;
import org.apache.jackrabbit.oak.api.Type;
import org.apache.jackrabbit.oak.api.blob.BlobAccessProvider;
import org.apache.jackrabbit.oak.api.blob.BlobUpload;
import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
import org.apache.jackrabbit.oak.commons.junit.LogCustomizer;
import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils;
import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
import org.apache.jackrabbit.oak.spi.cluster.ClusterRepositoryInfo;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeStore;
import org.apache.jackrabbit.oak.spi.whiteboard.DefaultWhiteboard;
import org.apache.jackrabbit.oak.spi.whiteboard.Registration;
import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
import org.apache.jackrabbit.oak.stats.Clock;
import org.apache.jackrabbit.oak.stats.DefaultStatisticsProvider;
import org.apache.jackrabbit.oak.stats.StatisticsProvider;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.internal.util.collections.Iterables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Generic class for BlobGC tests which uses custom MemoryNodeStore as well as a memory NodeStore.
*/
public class BlobGCTest {
protected static final Logger log = LoggerFactory.getLogger(BlobGCTest.class);
@Rule
public TemporaryFolder folder = new TemporaryFolder(new File("target"));
protected Whiteboard wb;
protected Closer closer;
protected Cluster cluster;
protected Clock clock;
@Before
public void before() throws Exception {
closer = Closer.create();
clock = getClock();
// add whiteboard
final AtomicReference<Map<?, ?>> props = new AtomicReference<Map<?, ?>>();
wb = new DefaultWhiteboard(){
@Override
public <T> Registration register(Class<T> type, T service, Map<?, ?> properties) {
props.set(properties);
return super.register(type, service, properties);
}
};
TimeLapsedDataStore dataStore = new TimeLapsedDataStore(clock);
DataStoreBlobStore blobStore = new DataStoreBlobStore(dataStore);
MemoryBlobStoreNodeStore nodeStore = new MemoryBlobStoreNodeStore(blobStore);
cluster = new Cluster(folder.newFolder(), blobStore, nodeStore, 0);
closer.register(cluster);
}
@After
public void after() {
try {
closer.close();
} catch (IOException e) {
log.error("Error closing cluster instances", e);
}
}
protected Clock getClock() {
return new Clock.Virtual();
}
class Cluster implements Closeable {
protected final BlobStoreState blobStoreState;
private final File root;
private final Clock clock;
String repoId;
protected final TimeLapsedDataStore dataStore;
protected final GarbageCollectableBlobStore blobStore;
protected final NodeStore nodeStore;
private MarkSweepGarbageCollector collector;
protected BlobReferenceRetriever referenceRetriever;
protected ScheduledExecutorService scheduledExecutor;
protected ThreadPoolExecutor executor;
protected DefaultStatisticsProvider statsProvider;
protected long startReferenceTime;
public Cluster(File root, GarbageCollectableBlobStore blobStore, NodeStore nodeStore, int seed) throws Exception {
this.root = root;
this.nodeStore = nodeStore;
this.dataStore = (TimeLapsedDataStore) ((DataStoreBlobStore) blobStore).getDataStore();
this.blobStore = blobStore;
this.clock = dataStore.getClock();
if (SharedDataStoreUtils.isShared(blobStore)) {
repoId = ClusterRepositoryInfo.getOrCreateId(nodeStore);
((SharedDataStore) blobStore).setRepositoryId(repoId);
}
referenceRetriever = ((MemoryBlobStoreNodeStore) nodeStore).getBlobReferenceRetriever();
startReferenceTime = clock.getTime();
log.info("Reference time {}", startReferenceTime);
scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
statsProvider = new DefaultStatisticsProvider(scheduledExecutor);
blobStoreState = setUp(nodeStore, blobStore, 10, 5, 100, seed);
}
public void setRepoId(String id) {
this.repoId = id;
}
public MarkSweepGarbageCollector getCollector(long blobGcMaxAgeInSecs) throws Exception {
return getCollector(blobGcMaxAgeInSecs, false, false);
}
public MarkSweepGarbageCollector getCollector(long blobGcMaxAgeInSecs, boolean checkConsistency,
boolean sweepIfRefsPastRetention) throws Exception {
collector =
new MarkSweepGarbageCollector(referenceRetriever, blobStore, executor, root.getAbsolutePath(), 2048,
blobGcMaxAgeInSecs, checkConsistency, sweepIfRefsPastRetention, repoId, wb, statsProvider);
collector.setClock(clock);
return collector;
}
@Override public void close() throws IOException {
new ExecutorCloser(scheduledExecutor).close();
new ExecutorCloser(executor).close();
}
}
@Test
public void sharedGC() throws Exception {
log.info("Staring sharedGC()");
// Setup a different cluster/repository sharing the blob store
MemoryBlobStoreNodeStore secondClusterNodeStore = new MemoryBlobStoreNodeStore(cluster.blobStore);
Cluster secondCluster = new Cluster(folder.newFolder(), cluster.blobStore, secondClusterNodeStore, 100);
closer.register(secondCluster);
Sets.SetView<String> totalPresent =
Sets.union(cluster.blobStoreState.blobsPresent, secondCluster.blobStoreState.blobsPresent);
Sets.SetView<String> totalAdded =
Sets.union(cluster.blobStoreState.blobsAdded, secondCluster.blobStoreState.blobsAdded);
// Execute mark on the default cluster
executeGarbageCollection(cluster, cluster.getCollector(0), true);
Set<String> existingAfterGC = executeGarbageCollection(secondCluster, secondCluster.getCollector(0), false);
assertTrue(Sets.symmetricDifference(totalPresent, existingAfterGC).isEmpty());
assertStats(secondCluster.statsProvider, 1, 0, totalAdded.size() - totalPresent.size(),
totalAdded.size() - totalPresent.size(), NAME);
}
@Test
public void noSharedGC() throws Exception {
log.info("Starting noSharedGC()");
// Setup a different cluster/repository sharing the blob store
MemoryBlobStoreNodeStore secondClusterNodeStore = new MemoryBlobStoreNodeStore(cluster.blobStore);
Cluster secondCluster = new Cluster(folder.newFolder(), cluster.blobStore, secondClusterNodeStore, 100);
closer.register(secondCluster);
Sets.SetView<String> totalAdded =
Sets.union(cluster.blobStoreState.blobsAdded, secondCluster.blobStoreState.blobsAdded);
Set<String> existingAfterGC = executeGarbageCollection(secondCluster, secondCluster.getCollector(0), false);
assertEquals(totalAdded, existingAfterGC);
assertStats(secondCluster.statsProvider, 1, 1, 0, 0, NAME);
}
@Test
public void sharedGCRepositoryCloned() throws Exception {
log.debug("Starting sharedGCRepoCloned()");
// Setup a different cluster/repository sharing the blob store and the repository id
MemoryBlobStoreNodeStore secondClusterNodeStore = new MemoryBlobStoreNodeStore(cluster.blobStore);
Cluster secondCluster = new Cluster(folder.newFolder(), cluster.blobStore, secondClusterNodeStore, 100);
closer.register(secondCluster);
((SharedDataStore) secondCluster.blobStore).deleteMetadataRecord(REPOSITORY.getNameFromId(secondCluster.repoId));
secondCluster.setRepoId(cluster.repoId);
Sets.SetView<String> totalPresent =
Sets.union(cluster.blobStoreState.blobsPresent, secondCluster.blobStoreState.blobsPresent);
// Execute mark on the default cluster
executeGarbageCollection(cluster, cluster.getCollector(0), true);
Set<String> existingAfterGC = executeGarbageCollection(secondCluster, secondCluster.getCollector(0), false);
assertTrue(Sets.symmetricDifference(totalPresent, existingAfterGC).isEmpty());
}
@Test
public void sharedGCRefsOld() throws Exception {
log.info("Staring sharedGCRefsOld()");
// Setup a different cluster/repository sharing the blob store
MemoryBlobStoreNodeStore secondClusterNodeStore = new MemoryBlobStoreNodeStore(cluster.blobStore);
Cluster secondCluster = new Cluster(folder.newFolder(), cluster.blobStore, secondClusterNodeStore, 100);
closer.register(secondCluster);
Sets.SetView<String> totalPresent =
Sets.union(cluster.blobStoreState.blobsPresent, secondCluster.blobStoreState.blobsPresent);
Sets.SetView<String> totalAdded =
Sets.union(cluster.blobStoreState.blobsAdded, secondCluster.blobStoreState.blobsAdded);
clock.waitUntil(clock.getTime() + 5);
// Execute mark on the default cluster
executeGarbageCollection(cluster, cluster.getCollector(5), true);
executeGarbageCollection(secondCluster, secondCluster.getCollector(5), true);
clock.waitUntil(clock.getTime() + 5);
Set<String> existingAfterGC = executeGarbageCollection(secondCluster, secondCluster.getCollector(5, false, true), false);
assertTrue(Sets.symmetricDifference(totalPresent, existingAfterGC).isEmpty());
assertStats(secondCluster.statsProvider, 2, 0, totalAdded.size() - totalPresent.size(),
totalAdded.size() - totalPresent.size(), NAME);
}
@Test
public void sharedGCRefsNotOld() throws Exception {
log.info("Staring sharedGCRefsNotOld()");
// Setup a different cluster/repository sharing the blob store
MemoryBlobStoreNodeStore secondClusterNodeStore = new MemoryBlobStoreNodeStore(cluster.blobStore);
Cluster secondCluster = new Cluster(folder.newFolder(), cluster.blobStore, secondClusterNodeStore, 100);
closer.register(secondCluster);
Sets.SetView<String> totalPresent =
Sets.union(cluster.blobStoreState.blobsPresent, secondCluster.blobStoreState.blobsPresent);
Sets.SetView<String> totalAdded =
Sets.union(cluster.blobStoreState.blobsAdded, secondCluster.blobStoreState.blobsAdded);
// Execute mark on the default cluster
executeGarbageCollection(cluster, cluster.getCollector(5), true);
// Let the second cluster one not pass retention old time
clock.waitUntil(clock.getTime() + 5);
executeGarbageCollection(secondCluster, secondCluster.getCollector(5), true);
Set<String> existingAfterGC = executeGarbageCollection(secondCluster, secondCluster.getCollector(6, false, true), false);
assertTrue(Sets.symmetricDifference(totalAdded, existingAfterGC).isEmpty());
assertStats(secondCluster.statsProvider, 2, 1, 0,0, NAME);
}
@Test
public void gc() throws Exception {
log.info("Starting gc()");
Set<String> existingAfterGC = executeGarbageCollection(cluster, cluster.getCollector(0), false);
assertTrue(Sets.symmetricDifference(cluster.blobStoreState.blobsPresent, existingAfterGC).isEmpty());
assertStats(cluster.statsProvider, 1, 0,
cluster.blobStoreState.blobsAdded.size() - cluster.blobStoreState.blobsPresent.size(),
cluster.blobStoreState.blobsAdded.size() - cluster.blobStoreState.blobsPresent.size(), NAME);
}
@Test
public void gcWithConsistencyCheck() throws Exception {
log.info("Starting gcWithConsistencyCheck()");
((MemoryBlobStoreNodeStore) cluster.nodeStore).getReferencedBlobs().add("SPURIOUS");
MarkSweepGarbageCollector collector = cluster.getCollector(0, true, false);
Set<String> existingAfterGC = executeGarbageCollection(cluster, collector, false);
assertFalse(Sets.symmetricDifference(cluster.blobStoreState.blobsPresent, existingAfterGC).isEmpty());
assertStats(cluster.statsProvider, 1, 0,
cluster.blobStoreState.blobsAdded.size() - cluster.blobStoreState.blobsPresent.size() + 1,
cluster.blobStoreState.blobsAdded.size() - cluster.blobStoreState.blobsPresent.size() + 1, NAME);
assertStatsBean(collector.getConsistencyOperationStats(), 1, 1, 1);
}
@Test
public void gcWithNoDeleteDirectBinary() throws Exception {
log.info("Starting gcWithNoDeleteDirectBinary()");
setupDirectBinary(1, 0);
Set<String> existingAfterGC = executeGarbageCollection(cluster, cluster.getCollector(0), false);
assertTrue(Sets.symmetricDifference(cluster.blobStoreState.blobsPresent, existingAfterGC).isEmpty());
assertStats(cluster.statsProvider, 1, 0,
cluster.blobStoreState.blobsAdded.size() - cluster.blobStoreState.blobsPresent.size(),
cluster.blobStoreState.blobsAdded.size() - cluster.blobStoreState.blobsPresent.size(), NAME);
}
@Test
public void gcWithDeleteDirectBinary() throws Exception {
log.info("Starting gcWithNoDeleteDirectBinary()");
setupDirectBinary(5, 2);
Set<String> existingAfterGC = executeGarbageCollection(cluster, cluster.getCollector(0), false);
assertTrue(Sets.symmetricDifference(cluster.blobStoreState.blobsPresent, existingAfterGC).isEmpty());
assertStats(cluster.statsProvider, 1, 0,
cluster.blobStoreState.blobsAdded.size() - cluster.blobStoreState.blobsPresent.size(),
cluster.blobStoreState.blobsAdded.size() - cluster.blobStoreState.blobsPresent.size(), NAME);
}
@Test
public void noGc() throws Exception {
log.info("Starting noGc()");
long afterSetupTime = clock.getTime();
log.info("after setup time {}", afterSetupTime);
Set<String> existingAfterGC =
executeGarbageCollection(cluster, cluster.getCollector(afterSetupTime - cluster.startReferenceTime + 2),
false);
assertTrue(Sets.symmetricDifference(cluster.blobStoreState.blobsAdded, existingAfterGC).isEmpty());
assertStats(cluster.statsProvider, 1, 0, 0,
cluster.blobStoreState.blobsAdded.size() - cluster.blobStoreState.blobsPresent.size(), NAME);
}
@Test
public void checkConsistency() throws Exception {
log.info("Starting checkConsistency()");
long afterSetupTime = clock.getTime();
log.info("after setup time {}", afterSetupTime);
MarkSweepGarbageCollector collector = cluster.getCollector(0);
long missing = collector.checkConsistency();
assertEquals(0, missing);
assertStats(cluster.statsProvider, 1, 0, 0, 0, CONSISTENCY_NAME);
assertStatsBean(collector.getConsistencyOperationStats(), 1, 0, 0);
}
@Test
public void checkConsistencyFailure() throws Exception {
log.info("Starting checkConsistencyFailure()");
long afterSetupTime = clock.getTime();
log.info("after setup time {}", afterSetupTime);
cluster.blobStore
.countDeleteChunks(Lists.newArrayList(Iterators.getLast(cluster.blobStoreState.blobsPresent.iterator())),
0);
MarkSweepGarbageCollector collector = cluster.getCollector(0);
long missing = collector.checkConsistency();
assertEquals(1, missing);
assertStats(cluster.statsProvider, 1, 1, 1, 0, CONSISTENCY_NAME);
assertStatsBean(collector.getConsistencyOperationStats(), 1, 1, 1);
}
@Test
public void checkConsistencyGlobal() throws Exception {
log.info("Staring checkConsistencyGlobal()");
// Setup a different cluster/repository sharing the blob store
MemoryBlobStoreNodeStore secondClusterNodeStore = new MemoryBlobStoreNodeStore(cluster.blobStore, true);
Cluster secondCluster = new Cluster(folder.newFolder(), cluster.blobStore, secondClusterNodeStore, 100);
closer.register(secondCluster);
secondCluster.blobStoreState.blobsPresent.add(Iterables.firstOf(cluster.blobStoreState.blobsPresent));
// Execute mark on the default cluster
executeGarbageCollection(cluster, cluster.getCollector(0), true);
MarkSweepGarbageCollector globalCollector = secondCluster.getCollector(0, true, false);
long missing = globalCollector.checkConsistency();
assertEquals(0, missing);
assertStats(secondCluster.statsProvider, 1, 0, 0, 0, CONSISTENCY_NAME);
assertStatsBean(globalCollector.getConsistencyOperationStats(), 1, 0, 0);
}
@Test
public void checkConsistencyGlobalFailureOther() throws Exception {
log.info("Staring checkConsistencyGlobalFailureOther()");
// Setup a different cluster/repository sharing the blob store
MemoryBlobStoreNodeStore secondClusterNodeStore = new MemoryBlobStoreNodeStore(cluster.blobStore);
Cluster secondCluster = new Cluster(folder.newFolder(), cluster.blobStore, secondClusterNodeStore, 100);
closer.register(secondCluster);
cluster.blobStore
.countDeleteChunks(Lists.newArrayList(Iterators.getLast(cluster.blobStoreState.blobsPresent.iterator())),
0);
// Execute mark on the default cluster
executeGarbageCollection(cluster, cluster.getCollector(0), true);
MarkSweepGarbageCollector globalCollector = secondCluster.getCollector(0, true, false);
long missing = globalCollector.checkConsistency();
assertEquals(1, missing);
assertStats(secondCluster.statsProvider, 1, 1, 1, 0, CONSISTENCY_NAME);
assertStatsBean(globalCollector.getConsistencyOperationStats(), 1, 1, 1);
}
@Test
public void checkConsistencyGlobalFailure() throws Exception {
log.info("Staring checkConsistencyGlobalFailureOther()");
// Setup a different cluster/repository sharing the blob store
MemoryBlobStoreNodeStore secondClusterNodeStore = new MemoryBlobStoreNodeStore(cluster.blobStore);
Cluster secondCluster = new Cluster(folder.newFolder(), cluster.blobStore, secondClusterNodeStore, 100);
closer.register(secondCluster);
secondCluster.blobStore
.countDeleteChunks(Lists.newArrayList(Iterators.getLast(secondCluster.blobStoreState.blobsPresent.iterator())),
0);
// Execute mark on the default cluster
executeGarbageCollection(cluster, cluster.getCollector(0), true);
MarkSweepGarbageCollector globalCollector = secondCluster.getCollector(0, true, false);
long missing = globalCollector.checkConsistency();
assertEquals(1, missing);
assertStats(secondCluster.statsProvider, 1, 1, 1, 0, CONSISTENCY_NAME);
assertStatsBean(globalCollector.getConsistencyOperationStats(), 1, 1, 1);
}
@Test
public void gcCheckDeletedSize() throws Exception {
log.info("Starting gcCheckDeletedSize()");
// Capture logs for the second round of gc
LogCustomizer customLogs = LogCustomizer
.forLogger(MarkSweepGarbageCollector.class.getName())
.enable(Level.INFO)
.filter(Level.INFO)
.contains("Estimated size recovered for")
.create();
customLogs.starting();
Set<String> existingAfterGC =
executeGarbageCollection(cluster, cluster.getCollector(0),false);
assertEquals(1, customLogs.getLogs().size());
long deletedSize = (cluster.blobStoreState.blobsAdded.size() - cluster.blobStoreState.blobsPresent.size()) * 100;
assertTrue(customLogs.getLogs().get(0).contains(String.valueOf(deletedSize)));
assertStats(cluster.statsProvider, 1, 0,
cluster.blobStoreState.blobsAdded.size() - cluster.blobStoreState.blobsPresent.size(),
cluster.blobStoreState.blobsAdded.size() - cluster.blobStoreState.blobsPresent.size(), NAME);
assertEquals(deletedSize, getStatCount(cluster.statsProvider, NAME, TOTAL_SIZE_DELETED));
customLogs.finished();
assertTrue(Sets.symmetricDifference(cluster.blobStoreState.blobsPresent, existingAfterGC).isEmpty());
}
@Test
public void gcMarkOnly() throws Exception {
log.info("Starting gcMarkOnly()");
Set<String> existingAfterGC =
executeGarbageCollection(cluster, cluster.getCollector(0),true);
assertTrue(Sets.symmetricDifference(cluster.blobStoreState.blobsAdded, existingAfterGC).isEmpty());
assertStats(cluster.statsProvider, 1, 0, 0, 0, NAME);
}
protected Set<String> executeGarbageCollection(Cluster cluster, MarkSweepGarbageCollector collector, boolean markOnly)
throws Exception {
collector.collectGarbage(markOnly);
assertEquals(0, cluster.executor.getTaskCount());
Set<String> existingAfterGC = iterate(cluster.blobStore);
log.info("{} blobs existing after gc : {}", existingAfterGC.size(), existingAfterGC);
return existingAfterGC;
}
private void assertStats(StatisticsProvider statsProvider, int start, int failure, long deleted, long candidates,
String typeName) {
assertEquals("Start counter mismatch", start, getStatCount(statsProvider, typeName, START));
assertEquals("Finish error mismatch", failure, getStatCount(statsProvider, typeName, FINISH_FAILURE));
assertEquals("Num deleted mismatch", deleted, getStatCount(statsProvider, typeName, NUM_BLOBS_DELETED));
assertEquals("Num candidates mismatch", candidates, getStatCount(statsProvider, typeName, NUM_CANDIDATES));
}
private void assertStatsBean(OperationsStatsMBean mbean, int start, int failure, long deleted) {
assertEquals("Start counter mismatch", start, mbean.getStartCount());
assertEquals("Finish error mismatch", failure, mbean.getFailureCount());
assertEquals("Num deleted mismatch", deleted, mbean.numDeleted());
}
private long getStatCount(StatisticsProvider statsProvider, String typeName, String name) {
return statsProvider.getCounterStats(
TYPE + "." + typeName + "." + name, METRICS_ONLY).getCount();
}
protected Set<String> iterate(GarbageCollectableBlobStore blobStore) throws Exception {
Iterator<String> cur = blobStore.getAllChunkIds(0);
Set<String> existing = Sets.newHashSet();
while (cur.hasNext()) {
existing.add(cur.next());
}
return existing;
}
public BlobStoreState setUp (NodeStore nodeStore,
GarbageCollectableBlobStore blobStore,
int count,
int deletions,
int blobSize,
int seed) throws Exception {
preSetup();
NodeBuilder a = nodeStore.getRoot().builder();
/* Create and delete nodes with blobs stored in DS*/
int maxDeleted = deletions;
int numBlobs = count;
List<Integer> toBeDeleted = Lists.newArrayList();
Random rand = new Random();
for (int i = 0; i < maxDeleted; i++) {
int n = rand.nextInt(numBlobs);
if (!toBeDeleted.contains(n)) {
toBeDeleted.add(n);
}
}
BlobStoreState state = new BlobStoreState();
for (int i = 0; i < numBlobs; i++) {
Blob b = nodeStore.createBlob(
randomStream(Integer.parseInt(String.valueOf(seed) + String.valueOf(i)), blobSize));
Iterator<String> idIter = blobStore.resolveChunks(b.getContentIdentity());
while (idIter.hasNext()) {
String chunk = idIter.next();
state.blobsAdded.add(chunk);
if (!toBeDeleted.contains(i)) {
state.blobsPresent.add(chunk);
}
}
a.child("c" + i).setProperty("x", b);
}
nodeStore.merge(a, EmptyHook.INSTANCE, CommitInfo.EMPTY);
log.info("Created blobs : {}", state.blobsAdded.size());
for (int id : toBeDeleted) {
delete("c" + id, nodeStore);
}
log.info("Deleted nodes : {}", toBeDeleted.size());
// Sleep a little to make eligible for cleanup
clock.waitUntil(5);
postSetup(nodeStore, state);
log.info("{} blobs added : {}", state.blobsAdded.size(), state.blobsAdded);
log.info("{} blobs remaining : {}", state.blobsPresent.size(), state.blobsPresent);
return state;
}
protected void setupDirectBinary(int numCreate, int numDelete) throws CommitFailedException {
for (int i = 0; i < numCreate; i++) {
BlobUpload blobUpload = ((BlobAccessProvider) cluster.blobStore).initiateBlobUpload(100, 1);
Blob blob = ((BlobAccessProvider) cluster.blobStore).completeBlobUpload(blobUpload.getUploadToken());
cluster.blobStoreState.blobsAdded.add(blob.getContentIdentity());
cluster.blobStoreState.blobsPresent.add(blob.getContentIdentity());
NodeBuilder builder = cluster.nodeStore.getRoot().builder();
builder.child("dbu" + i).setProperty("x", blob);
cluster.nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
PropertyState property = cluster.nodeStore.getRoot().getChildNode("dbu" + i).getProperty("x");
Blob blobReturned = property.getValue(Type.BINARY);
((MemoryBlobStoreNodeStore) cluster.nodeStore).getReferencedBlobs().add(blobReturned.getContentIdentity());
}
for (int i = 0; i < Math.min(numCreate, numDelete); i++) {
PropertyState property = cluster.nodeStore.getRoot().getChildNode("dbu" + i).getProperty("x");
String blobId = property.getValue(Type.BINARY).getContentIdentity();
delete("dbu" + i, cluster.nodeStore);
((MemoryBlobStoreNodeStore) cluster.nodeStore).getReferencedBlobs().remove(blobId);
cluster.blobStoreState.blobsPresent.remove(blobId);
}
}
protected Set<String> createBlobs(GarbageCollectableBlobStore blobStore, int count, int size) throws Exception {
HashSet<String> blobSet = new HashSet<String>();
for (int i = 0; i < count; i++) {
String id = blobStore.writeBlob(randomStream(10 + i, size));
Iterator<String> idIter = blobStore.resolveChunks(id);
while (idIter.hasNext()) {
String chunk = idIter.next();
blobSet.add(chunk);
}
}
log.info("{} Additional created {}", blobSet.size(), blobSet);
return blobSet;
}
void preSetup() {}
protected void postSetup(NodeStore nodeStore, BlobStoreState state) {
((MemoryBlobStoreNodeStore) nodeStore).setReferencedBlobs(state.blobsPresent);
}
protected void delete(String nodeId, NodeStore nodeStore) throws CommitFailedException {
NodeBuilder builder = nodeStore.getRoot().builder();
builder.child(nodeId).remove();
nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
}
/**
* Represents state of the blobs after setup
*/
class BlobStoreState {
Set<String> blobsAdded = Sets.newHashSet();
Set<String> blobsPresent = Sets.newHashSet();
}
}