blob: 14119fe2bd141e83026873a30e7fa2a5692a0ecc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.jackrabbit.oak.plugins.blob.datastore;
import java.io.Closeable;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
import org.apache.jackrabbit.oak.plugins.blob.datastore.BlobIdTracker.BlobIdStore;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static com.google.common.collect.Lists.newArrayList;
import static com.google.common.collect.Sets.newHashSet;
import static com.google.common.collect.Sets.symmetricDifference;
import static java.lang.String.valueOf;
import static java.util.UUID.randomUUID;
import static org.apache.commons.io.IOUtils.closeQuietly;
import static org.apache.jackrabbit.oak.commons.FileIOUtils.readStringsAsSet;
import static org.apache.jackrabbit.oak.commons.FileIOUtils.writeStrings;
import static org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreUtils.getBlobStore;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeNoException;
import static org.junit.Assume.assumeThat;
/**
* Test for BlobIdTracker.BlobIdStore to test addition, retrieval and removal of blob ids.
*/
public class BlobIdTrackerStoreTest {
private static final Logger log = LoggerFactory.getLogger(BlobIdTrackerStoreTest.class);
File root;
SharedDataStore dataStore;
BlobIdTracker tracker;
@Rule
public TemporaryFolder folder = new TemporaryFolder(new File("target"));
private String repoId;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
try {
assumeThat(getBlobStore(), instanceOf(SharedDataStore.class));
} catch (Exception e) {
assumeNoException(e);
}
}
@Before
public void setup() throws Exception {
this.root = folder.newFolder();
if (dataStore == null) {
dataStore = getBlobStore(root);
}
this.repoId = randomUUID().toString();
this.tracker = initTracker();
}
private BlobIdTracker initTracker() throws IOException {
return new BlobIdTracker(root.getAbsolutePath(),
repoId, 5 * 60, dataStore);
}
@After
public void tearDown() throws IOException {
tracker.close();
folder.delete();
}
@Test
public void addSnapshot() throws Exception {
BlobIdStore store = tracker.store;
Set<String> initAdd = add(store, range(0, 10000));
store.snapshot();
Set<String> retrieved = retrieve(store);
assertEquals("Incorrect elements after add snapshot", initAdd, retrieved);
}
@Test
public void addSnapshotRetrieve() throws Exception {
BlobIdStore store = tracker.store;
Set<String> initAdd = add(store, range(0, 10000));
store.snapshot();
Set<String> retrieved = retrieveFile(store, folder);
assertEquals("Incorrect elements after add snapshot reading file", initAdd, retrieved);
}
@Test
public void addSnapshotAdd() throws Exception {
BlobIdStore store = tracker.store;
Set<String> initAdd = add(store, range(0, 10000));
store.snapshot();
initAdd.addAll(add(store, range(10001, 10003)));
Set<String> retrieved = retrieve(store);
assertTrue("Incorrect elements with add before snapshot",
symmetricDifference(initAdd, retrieved)
.containsAll(newHashSet("10001", "10002", "10003")));
}
@Test
public void addSnapshotAddSnapshot() throws Exception {
BlobIdStore store = tracker.store;
Set<String> initAdd = add(store, range(0, 10000));
store.snapshot();
initAdd.addAll(add(store, range(10001, 10003)));
store.snapshot();
Set<String> retrieved = retrieve(store);
assertEquals("Incorrect elements with snapshot after add", initAdd, retrieved);
}
@Test
public void addSnapshotRemove() throws Exception {
BlobIdStore store = tracker.store;
Set<String> initAdd = add(store, range(0, 10000));
store.snapshot();
remove(store, folder.newFile(), initAdd, range(2, 3));
Set<String> retrieved = retrieve(store);
assertEquals("Incorrect elements after remove", initAdd, retrieved);
}
@Test
public void addRestart() throws IOException {
BlobIdStore store = tracker.store;
Set<String> initAdd = add(store, range(0, 100000));
this.tracker = initTracker();
Set<String> retrieved = retrieve(store);
assertTrue("Extra elements retrieved", retrieved.isEmpty());
store = tracker.store;
store.snapshot();
retrieved = retrieve(store);
assertEquals("Incorrect elements after dirty restart", initAdd, retrieved);
}
@Test
public void addCloseRestart() throws IOException {
BlobIdStore store = tracker.store;
Set<String> initAdd = add(store, range(0, 10000));
store.close();
this.tracker = initTracker();
store = tracker.store;
store.snapshot();
Set<String> retrieved = retrieve(store);
assertEquals("Incorrect elements after safe restart", initAdd, retrieved);
}
@Test
public void addConcurrentSnapshot() throws IOException, InterruptedException {
final BlobIdStore store = tracker.store;
final CountDownLatch start = new CountDownLatch(1);
final CountDownLatch done = new CountDownLatch(2);
Thread addThread = addThread(store, start, done);
Thread snapshotThread = snapshotThread(store, start, done);
snapshotThread.start();
addThread.start();
start.countDown();
done.await();
// Do a snapshot to ensure that all the adds if snapshot finished first are collected
store.snapshot();
Set<String> retrieved = retrieve(store);
assertEquals("Incorrect elements after concurrent snapshot",
newHashSet(range(0, 100000)), retrieved);
}
@Test
public void addSnapshotConcurrentRetrieve() throws IOException, InterruptedException {
final BlobIdStore store = tracker.store;
final CountDownLatch start = new CountDownLatch(1);
final CountDownLatch done = new CountDownLatch(2);
Set<String> initAdd = add(store, range(0, 100000));
final Set<String> retrieves = newHashSet();
Thread retrieveThread = retrieveThread(store, retrieves, start, done);
Thread snapshotThread = snapshotThread(store, start, done);
snapshotThread.start();
retrieveThread.start();
start.countDown();
done.await();
if (retrieves.isEmpty()) {
// take a snapshot to ensure that all adds accounted if snapshot finished last
store.snapshot();
retrieves.addAll(retrieve(store));
}
assertEquals("Incorrect elements after concurrent snapshot/retrieve",
initAdd, retrieves);
}
@Test
public void snapshotConcurrentRemove() throws IOException, InterruptedException {
final BlobIdStore store = tracker.store;
final CountDownLatch start = new CountDownLatch(1);
final CountDownLatch done = new CountDownLatch(2);
final Set<String> initAdd = add(store, range(0, 100000));
store.snapshot();
Thread removeThread = removeThread(store, folder.newFile(), initAdd, start, done);
Thread snapshotThread = snapshotThread(store, start, done);
removeThread.start();
snapshotThread.start();
// add some more to check that snapshot is successfull
initAdd.addAll(add(store, range(10001, 10003)));
start.countDown();
done.await();
Set<String> retrieves = retrieve(store);
assertEquals("Incorrect elements after concurrent snapshot/remove",
initAdd, retrieves);
}
@Test
public void addBulkAdd() throws IOException {
final BlobIdStore store = tracker.store;
final Set<String> initAdd = add(store, range(0, 4));
// Add new ids from a file
File temp = folder.newFile();
List<String> newAdd = range(5, 9);
initAdd.addAll(newAdd);
writeStrings(newAdd.iterator(), temp, false);
store.addRecords(temp);
store.snapshot();
Set<String> retrieved = retrieve(store);
assertEquals("Incorrect elements after bulk add from file",
initAdd, retrieved);
newAdd = range(10, 14);
initAdd.addAll(newAdd);
store.addRecords(newAdd.iterator());
store.snapshot();
retrieved = retrieve(store);
assertEquals("Incorrect elements after bulk add from iterator",
initAdd, retrieved);
}
@Test
public void bulkAddConcurrentCompact() throws IOException, InterruptedException {
final BlobIdStore store = tracker.store;
final CountDownLatch start = new CountDownLatch(1);
final CountDownLatch done = new CountDownLatch(2);
Thread addThread = addThread(store, true, start, done);
Thread snapshotThread = snapshotThread(store, start, done);
snapshotThread.start();
addThread.start();
start.countDown();
done.await();
// Do a snapshot to ensure that all the adds if snapshot finished first are collected
store.snapshot();
Set<String> retrieved = retrieve(store);
assertEquals("Incorrect elements after concurrent snapshot",
newHashSet(range(0, 100000)), retrieved);
}
private static Thread addThread(
final BlobIdStore store, final CountDownLatch start, final CountDownLatch done) {
return addThread(store, false, start, done);
}
private static Thread addThread(
final BlobIdStore store, final boolean bulk, final CountDownLatch start, final CountDownLatch done) {
return new Thread("AddThread") {
@Override
public void run() {
try {
List<String> adds = range(0, 100000);
start.await();
if (!bulk) {
add(store, adds);
} else {
store.addRecords(adds.iterator());
}
done.countDown();
} catch (IOException e) {
log.info("Exception in add", e);
} catch (InterruptedException e) {
log.info("Interrupted in add", e);
}
}
};
}
private static Thread retrieveThread(
final BlobIdStore store, final Set<String> retrieves, final CountDownLatch start,
final CountDownLatch done) {
return new Thread("RetrieveThread") {
@Override
public void run() {
try {
start.await();
retrieves.addAll(retrieve(store));
done.countDown();
} catch (IOException e) {
log.info("Exception in retrieve", e);
} catch (InterruptedException e) {
log.info("Interrupted in retrieve", e);
}
}
};
}
private static Thread removeThread(final BlobIdStore store, final File temp,
final Set<String> adds, final CountDownLatch start, final CountDownLatch done) {
return new Thread("RemoveThread") {
@Override
public void run() {
try {
start.await();
remove(store, temp, adds, range(1, 3));
done.countDown();
} catch (IOException e) {
log.info("Exception in remove", e);
} catch (InterruptedException e) {
log.info("Interrupted in remove", e);
}
}
};
}
private static Thread snapshotThread(
final BlobIdStore store, final CountDownLatch start, final CountDownLatch done) {
return new Thread("SnapshotThread") {
@Override
public void run() {
try {
start.await();
store.snapshot();
done.countDown();
} catch (IOException e) {
log.info("Exception in snapshot", e);
} catch (InterruptedException e) {
log.info("Interrupted in snapshot", e);
}
}
};
}
private static Set<String> add(BlobIdStore store, List<String> ints) throws IOException {
Set<String> s = newHashSet();
for (String rec : ints) {
store.addRecord(rec);
s.add(rec);
}
return s;
}
private static Set<String> retrieve(BlobIdStore store) throws IOException {
Set<String> retrieved = newHashSet();
Iterator<String> iter = store.getRecords();
while(iter.hasNext()) {
retrieved.add(iter.next());
}
closeQuietly((Closeable)iter);
return retrieved;
}
private static Set<String> retrieveFile(BlobIdStore store, TemporaryFolder folder) throws IOException {
File f = folder.newFile();
Set<String> retrieved = readStringsAsSet(
new FileInputStream(store.getRecords(f.getAbsolutePath())), false);
return retrieved;
}
private static void remove(BlobIdStore store, File temp, Set<String> initAdd,
List<String> ints) throws IOException {
writeStrings(ints.iterator(), temp, false);
initAdd.removeAll(ints);
store.removeRecords(temp);
}
private static List<String> range(int min, int max) {
List<String> list = newArrayList();
for (int i = min; i <= max; i++) {
list.add(valueOf(i));
}
return list;
}
}