blob: cd9d00f243fd4b15da6c8a7d6e129ef14da2fcba [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.jackrabbit.oak.plugins.blob.datastore;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import org.apache.jackrabbit.core.data.DataStoreException;
import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static com.google.common.collect.Lists.newArrayList;
import static com.google.common.collect.Sets.newHashSet;
import static com.google.common.io.Closeables.close;
import static java.lang.String.valueOf;
import static java.util.UUID.randomUUID;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.jackrabbit.oak.commons.FileIOUtils.writeStrings;
import static org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreUtils.getBlobStore;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assume.assumeNoException;
import static org.junit.Assume.assumeThat;
/**
 * Test for BlobIdTracker simulating a cluster and a shared data store scenarios
 * to test addition, retrieval and removal of blob ids.
 */
public class BlobIdTrackerClusterSharedTest {
    private static final Logger log = LoggerFactory.getLogger(BlobIdTrackerClusterSharedTest.class);

    /** Root folder backing the shared data store used by both simulated nodes. */
    File root;
    Cluster cluster1;
    Cluster cluster2;

    @Rule
    public TemporaryFolder folder = new TemporaryFolder(new File("target"));

    @BeforeClass
    public static void setUpBeforeClass() throws Exception {
        try {
            // These tests only make sense against a SharedDataStore; skip the
            // whole class otherwise (or when the store cannot be created).
            assumeThat(getBlobStore(), instanceOf(SharedDataStore.class));
        } catch (Exception e) {
            assumeNoException(e);
        }
    }

    @Before
    public void setup() throws Exception {
        this.root = folder.newFolder();
    }

    /**
     * Test simulating add, remove, retrieve scenarios for blobId tracker on a 2 node cluster
     * (both nodes share the same repository id).
     *
     * @throws Exception on unexpected test failure
     */
    @Test
    public void addRetrieveCluster() throws Exception {
        // Same repository id on both instances simulates a 2-node cluster.
        String clusterRepoId = randomUUID().toString();
        cluster1 = new Cluster(clusterRepoId,
            folder.newFolder("cluster1").getAbsolutePath(), folder);
        cluster2 = new Cluster(clusterRepoId,
            folder.newFolder("cluster2").getAbsolutePath(), folder);

        Set<String> adds = newHashSet();

        // Add some on cluster 2 & simulate snapshot
        adds.addAll(cluster2.doAdd(range(5, 9)));
        cluster2.forceSnapshot();
        log.info("Done force snapshot for cluster2");

        // Add some on cluster1 & simulate snapshot
        adds.addAll(cluster1.doAdd(range(0, 4)));
        cluster1.forceSnapshot();
        log.info("Done force snapshot for cluster1");

        // Get on cluster 1; should see ids added on both nodes
        Set<String> retrieves = cluster1.doRetrieve();
        assertEquals("Retrieves not correct", adds, retrieves);
        log.info("Done retrieve on cluster1");

        // Remove ids "4" and "5" (doRemove also drops them from `adds`)
        cluster1.doRemove(adds, range(4, 5));
        log.info("Done remove on cluster1");

        retrieves = cluster1.doRetrieve();
        log.info("Done retrieve on cluster1 again");
        assertEquals("Retrieves not correct after remove", adds, retrieves);
    }

    /**
     * Test simulating add, remove, retrieve scenarios for blobId tracker on a 2 node shared
     * repository (distinct repository ids sharing one data store).
     *
     * @throws Exception on unexpected test failure
     */
    @Test
    public void addRetrieveShared() throws Exception {
        // Distinct repository ids simulate 2 independent repositories
        // sharing the same data store.
        cluster1 = new Cluster(randomUUID().toString(),
            folder.newFolder("cluster1").getAbsolutePath(), folder);
        cluster2 = new Cluster(randomUUID().toString(),
            folder.newFolder("cluster2").getAbsolutePath(), folder);

        Set<String> adds = newHashSet();

        // Add some on cluster1 & simulate snapshot
        adds.addAll(cluster1.doAdd(range(0, 4)));
        cluster1.forceSnapshot();
        log.info("Done force snapshot for cluster1");

        // Add some on cluster 2 & simulate snapshot
        adds.addAll(cluster2.doAdd(range(5, 9)));
        cluster2.forceSnapshot();
        log.info("Done force snapshot for cluster2");

        // Get on cluster 1; should see ids added by both repositories
        Set<String> retrieves = cluster1.doRetrieve();
        assertEquals("Retrieves not correct",
            adds, retrieves);
        log.info("Done retrieve on cluster1");

        // Remove ids "4" and "5" (doRemove also drops them from `adds`)
        cluster1.doRemove(adds, range(4, 5));
        log.info("Done remove on cluster1");

        retrieves = cluster1.doRetrieve();
        log.info("Done retrieve on cluster1 again");
        assertEquals("Retrieves not correct after remove",
            adds, retrieves);
    }

    /**
     * Logical instance: one tracker + data store handle + scheduler, standing in
     * for a single cluster node (or a single repository on a shared store).
     */
    class Cluster implements Closeable {
        ScheduledExecutorService scheduler;
        BlobIdTracker tracker;
        TemporaryFolder folder;
        SharedDataStore dataStore;

        Cluster(String repoId, String path, TemporaryFolder folder) throws Exception {
            // All instances share the same `root`, i.e. the same physical store.
            this.dataStore = getBlobStore(root);
            this.tracker = new BlobIdTracker(path, repoId, 100 * 60, dataStore);
            this.scheduler = newSingleThreadScheduledExecutor();
            this.folder = folder;
        }

        /** Adds the given ids to this node's tracker and returns them as a set. */
        Set<String> doAdd(List<String> ints) throws IOException {
            return add(tracker, ints);
        }

        /** Removes the given ids from the tracker and from the expected set. */
        void doRemove(Set<String> adds, List<String> removes) throws IOException {
            remove(tracker, folder.newFile(), adds, removes);
        }

        /**
         * Runs the tracker's SnapshotJob immediately and waits for it, simulating
         * the periodic snapshot that publishes local ids to the shared store.
         */
        void forceSnapshot() {
            try {
                ScheduledFuture<?> scheduledFuture =
                    scheduler.schedule(tracker.new SnapshotJob(), 0, MILLISECONDS);
                scheduledFuture.get();
            } catch (Exception e) {
                // NOTE(review): failure is only logged; the subsequent
                // assertEquals in the tests will surface a missed snapshot.
                log.error("Error in snapshot", e);
            }
        }

        /** Retrieves all blob ids visible to this node. */
        Set<String> doRetrieve() throws IOException {
            return retrieve(tracker);
        }

        @Override
        public void close() throws IOException {
            new ExecutorCloser(scheduler).close();
            tracker.close();
            try {
                ((DataStoreBlobStore) dataStore).close();
            } catch (DataStoreException e) {
                log.warn("Error closing blobstore", e);
            }
        }
    }

    @After
    public void tearDown() throws IOException {
        // Null-guard: a test may fail (or be skipped) before both clusters are
        // constructed; an NPE here would mask the original failure.
        if (cluster1 != null) {
            cluster1.close();
        }
        if (cluster2 != null) {
            cluster2.close();
        }
        folder.delete();
    }

    /** Adds each id to the tracker; returns the ids as a set for later comparison. */
    private static Set<String> add(BlobTracker tracker, List<String> ints) throws IOException {
        Set<String> s = newHashSet();
        for (String rec : ints) {
            tracker.add(rec);
            s.add(rec);
        }
        return s;
    }

    /** Drains the tracker's id iterator into a set, closing it if closeable. */
    private static Set<String> retrieve(BlobTracker tracker) throws IOException {
        Set<String> retrieved = newHashSet();
        Iterator<String> iter = tracker.get();
        log.info("retrieving blob ids");
        while (iter.hasNext()) {
            retrieved.add(iter.next());
        }
        if (iter instanceof Closeable) {
            // swallowIOException=true: best-effort close must not fail the test
            close((Closeable) iter, true);
        }
        return retrieved;
    }

    /**
     * Removes the given ids via the tracker's file-based remove API and mirrors
     * the removal in {@code initAdd} so it reflects the expected remaining ids.
     */
    private static void remove(BlobIdTracker tracker, File temp, Set<String> initAdd,
            List<String> ints) throws IOException {
        writeStrings(ints.iterator(), temp, false);
        initAdd.removeAll(ints);
        tracker.remove(temp);
    }

    /** Returns the inclusive range [min, max] as a list of decimal strings. */
    private static List<String> range(int min, int max) {
        List<String> list = newArrayList();
        for (int i = min; i <= max; i++) {
            list.add(valueOf(i));
        }
        return list;
    }
}