blob: 8a3d10f3d95af3afb9af5d034230fbaee6d66712 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jackrabbit.oak.plugins.blob.datastore;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import javax.jcr.RepositoryException;
import com.google.common.collect.Sets;
import org.apache.jackrabbit.core.data.DataIdentifier;
import org.apache.jackrabbit.core.data.DataRecord;
import org.apache.jackrabbit.core.data.DataStore;
import org.apache.jackrabbit.core.data.DataStoreException;
import org.apache.jackrabbit.core.data.MultiDataStoreAware;
import org.apache.jackrabbit.core.data.RandomInputStream;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
* Test base class for {@link DataStore} which covers all scenarios.
* Copied from {@link org.apache.jackrabbit.core.data.TestCaseBase}.
*/
public abstract class AbstractDataStoreTest {
/**
* Logger
*/
protected static final Logger LOG = LoggerFactory.getLogger(AbstractDataStoreTest.class);
@Rule
public TemporaryFolder folder = new TemporaryFolder(new File("target"));
/**
* length of record to be added
*/
protected int dataLength = 123456;
/**
* datastore directory path
*/
protected String dataStoreDir;
protected DataStore ds;
/**
* Random number generator to populate data
*/
protected Random randomGen = new Random();
/**
* Delete temporary directory.
*/
@Before
public void setUp() throws Exception {
dataStoreDir = folder.newFolder().getAbsolutePath();
ds = createDataStore();
}
@After
public void tearDown() {
try {
ds.close();
} catch (DataStoreException e) {
LOG.info("Error in close ds", e);
}
}
/**
* Testcase to validate {@link DataStore#addRecord(InputStream)} API.
*/
@Test
public void testAddRecord() {
try {
long start = System.currentTimeMillis();
LOG.info("Testcase: " + this.getClass().getName()
+ "#addRecord, testDir=" + dataStoreDir);
doAddRecordTest();
LOG.info("Testcase: " + this.getClass().getName()
+ "#addRecord finished, time taken = ["
+ (System.currentTimeMillis() - start) + "]ms");
} catch (Exception e) {
LOG.error("error:", e);
fail(e.getMessage());
}
}
/**
* Testcase to validate {@link DataStore#getRecord(DataIdentifier)} API.
*/
@Test
public void testGetRecord() {
try {
long start = System.currentTimeMillis();
LOG.info("Testcase: " + this.getClass().getName()
+ "#testGetRecord, testDir=" + dataStoreDir);
doGetRecordTest();
LOG.info("Testcase: " + this.getClass().getName()
+ "#testGetRecord finished, time taken = ["
+ (System.currentTimeMillis() - start) + "]ms");
} catch (Exception e) {
LOG.error("error:", e);
}
}
/**
* Testcase to validate {@link DataStore#getAllIdentifiers()} API.
*/
@Test
public void testGetAllIdentifiers() {
try {
long start = System.currentTimeMillis();
LOG.info("Testcase: " + this.getClass().getName()
+ "#testGetAllIdentifiers, testDir=" + dataStoreDir);
doGetAllIdentifiersTest();
LOG.info("Testcase: " + this.getClass().getName()
+ "#testGetAllIdentifiers finished, time taken = ["
+ (System.currentTimeMillis() - start) + "]ms");
} catch (Exception e) {
LOG.error("error:", e);
fail(e.getMessage());
}
}
/**
* Testcase to validate {@link DataStore#updateModifiedDateOnAccess(long)}
* API.
*/
@Test
public void testUpdateLastModifiedOnAccess() {
try {
long start = System.currentTimeMillis();
LOG.info("Testcase: " + this.getClass().getName()
+ "#testUpdateLastModifiedOnAccess, testDir=" + dataStoreDir);
doUpdateLastModifiedOnAccessTest();
LOG.info("Testcase: " + this.getClass().getName()
+ "#testUpdateLastModifiedOnAccess finished, time taken = ["
+ (System.currentTimeMillis() - start) + "]ms");
} catch (Exception e) {
LOG.error("error:", e);
fail(e.getMessage());
}
}
/**
* Testcase to validate
* {@link MultiDataStoreAware#deleteRecord(DataIdentifier)}.API.
*/
@Test
public void testDeleteRecord() {
try {
long start = System.currentTimeMillis();
LOG.info("Testcase: " + this.getClass().getName()
+ "#testDeleteRecord, testDir=" + dataStoreDir);
doDeleteRecordTest();
LOG.info("Testcase: " + this.getClass().getName()
+ "#testDeleteRecord finished, time taken = ["
+ (System.currentTimeMillis() - start) + "]ms");
} catch (Exception e) {
LOG.error("error:", e);
fail(e.getMessage());
}
}
/**
* Testcase to validate {@link DataStore#deleteAllOlderThan(long)} API.
*/
@Test
public void testDeleteAllOlderThan() {
try {
long start = System.currentTimeMillis();
LOG.info("Testcase: " + this.getClass().getName()
+ "#testDeleteAllOlderThan, testDir=" + dataStoreDir);
doDeleteAllOlderThan();
LOG.info("Testcase: " + this.getClass().getName()
+ "#testDeleteAllOlderThan finished, time taken = ["
+ (System.currentTimeMillis() - start) + "]ms");
} catch (Exception e) {
LOG.error("error:", e);
fail(e.getMessage());
}
}
/**
* Testcase to validate {@link DataStore#getRecordFromReference(String)}
*/
@Test
public void testReference() {
try {
long start = System.currentTimeMillis();
LOG.info("Testcase: " + this.getClass().getName()
+ "#testReference, testDir=" + dataStoreDir);
doReferenceTest();
LOG.info("Testcase: " + this.getClass().getName()
+ "#testReference finished, time taken = ["
+ (System.currentTimeMillis() - start) + "]ms");
} catch (Exception e) {
LOG.error("error:", e);
fail(e.getMessage());
}
}
/**
* Testcase to validate mixed scenario use of {@link DataStore}.
*/
@Test
public void testSingleThread() {
try {
long start = System.currentTimeMillis();
LOG.info("Testcase: " + this.getClass().getName()
+ "#testSingleThread, testDir=" + dataStoreDir);
doTestSingleThread();
LOG.info("Testcase: " + this.getClass().getName()
+ "#testSingleThread finished, time taken = ["
+ (System.currentTimeMillis() - start) + "]ms");
} catch (Exception e) {
LOG.error("error:", e);
fail(e.getMessage());
}
}
/**
* Testcase to validate mixed scenario use of {@link DataStore} in
* multi-threaded concurrent environment.
*/
@Test
public void testMultiThreaded() {
try {
long start = System.currentTimeMillis();
LOG.info("Testcase: " + this.getClass().getName()
+ "#testMultiThreaded, testDir=" + dataStoreDir);
doTestMultiThreaded();
LOG.info("Testcase: " + this.getClass().getName()
+ "#testMultiThreaded finished, time taken = ["
+ (System.currentTimeMillis() - start) + "]ms");
} catch (Exception e) {
LOG.error("error:", e);
fail(e.getMessage());
}
}
protected abstract DataStore createDataStore() throws RepositoryException ;
/**
* Test {@link DataStore#addRecord(InputStream)} and assert length of added
* record.
*/
protected void doAddRecordTest() throws Exception {
byte[] data = new byte[dataLength];
randomGen.nextBytes(data);
DataRecord rec = ds.addRecord(new ByteArrayInputStream(data));
Assert.assertEquals(data.length, rec.getLength());
assertRecord(data, rec);
}
/**
* Test {@link DataStore#getRecord(DataIdentifier)} and assert length and
* inputstream.
*/
protected void doGetRecordTest() throws Exception {
byte[] data = new byte[dataLength];
randomGen.nextBytes(data);
DataRecord rec = ds.addRecord(new ByteArrayInputStream(data));
rec = ds.getRecord(rec.getIdentifier());
Assert.assertEquals(data.length, rec.getLength());
assertRecord(data, rec);
}
/**
* Test {@link MultiDataStoreAware#deleteRecord(DataIdentifier)}.
*/
protected void doDeleteRecordTest() throws Exception {
Random random = randomGen;
byte[] data1 = new byte[dataLength];
random.nextBytes(data1);
DataRecord rec1 = ds.addRecord(new ByteArrayInputStream(data1));
byte[] data2 = new byte[dataLength];
random.nextBytes(data2);
DataRecord rec2 = ds.addRecord(new ByteArrayInputStream(data2));
byte[] data3 = new byte[dataLength];
random.nextBytes(data3);
DataRecord rec3 = ds.addRecord(new ByteArrayInputStream(data3));
((MultiDataStoreAware)ds).deleteRecord(rec1.getIdentifier());
assertNull("rec1 should be null",
ds.getRecordIfStored(rec1.getIdentifier()));
assertEquals(new ByteArrayInputStream(data2),
ds.getRecord(rec2.getIdentifier()).getStream());
assertEquals(new ByteArrayInputStream(data3),
ds.getRecord(rec3.getIdentifier()).getStream());
}
/**
* Test {@link DataStore#getAllIdentifiers()} and asserts all identifiers
* are returned.
*/
protected void doGetAllIdentifiersTest() throws Exception {
List<DataIdentifier> list = new ArrayList<DataIdentifier>();
Random random = randomGen;
byte[] data = new byte[dataLength];
random.nextBytes(data);
DataRecord rec = ds.addRecord(new ByteArrayInputStream(data));
list.add(rec.getIdentifier());
data = new byte[dataLength];
random.nextBytes(data);
rec = ds.addRecord(new ByteArrayInputStream(data));
list.add(rec.getIdentifier());
data = new byte[dataLength];
random.nextBytes(data);
rec = ds.addRecord(new ByteArrayInputStream(data));
list.add(rec.getIdentifier());
Iterator<DataIdentifier> itr = Sets.newHashSet(ds.getAllIdentifiers()).iterator();
while (itr.hasNext()) {
assertTrue("record found on list", list.remove(itr.next()));
}
Assert.assertEquals(0, list.size());
}
/**
* Asserts that timestamp of all records accessed after
* {@link DataStore#updateModifiedDateOnAccess(long)} invocation.
*/
protected void doUpdateLastModifiedOnAccessTest() throws Exception {
Random random = randomGen;
byte[] data = new byte[dataLength];
random.nextBytes(data);
DataRecord rec1 = ds.addRecord(new ByteArrayInputStream(data));
data = new byte[dataLength];
random.nextBytes(data);
DataRecord rec2 = ds.addRecord(new ByteArrayInputStream(data));
LOG.debug("rec2 timestamp=" + rec2.getLastModified());
// sleep for some time to ensure that async upload completes in backend.
sleep(6000);
long updateTime = System.currentTimeMillis();
LOG.debug("updateTime=" + updateTime);
ds.updateModifiedDateOnAccess(updateTime);
// sleep to workaround System.currentTimeMillis granularity.
sleep(3000);
data = new byte[dataLength];
random.nextBytes(data);
DataRecord rec3 = ds.addRecord(new ByteArrayInputStream(data));
data = new byte[dataLength];
random.nextBytes(data);
DataRecord rec4 = ds.addRecord(new ByteArrayInputStream(data));
rec1 = ds.getRecord(rec1.getIdentifier());
Assert.assertEquals("rec1 touched", true, rec1.getLastModified() > updateTime);
LOG.debug("rec2 timestamp=" + rec2.getLastModified());
Assert.assertEquals("rec2 not touched", true,
rec2.getLastModified() < updateTime);
Assert.assertEquals("rec3 touched", true, rec3.getLastModified() > updateTime);
Assert.assertEquals("rec4 touched", true, rec4.getLastModified() > updateTime);
}
/**
* Asserts that {@link DataStore#deleteAllOlderThan(long)} only deleted
* records older than argument passed.
*/
protected void doDeleteAllOlderThan() throws Exception {
Random random = randomGen;
byte[] data = new byte[dataLength];
random.nextBytes(data);
DataRecord rec1 = ds.addRecord(new ByteArrayInputStream(data));
data = new byte[dataLength];
random.nextBytes(data);
DataRecord rec2 = ds.addRecord(new ByteArrayInputStream(data));
// sleep for some time to ensure that async upload completes in backend.
sleep(10000);
long updateTime = System.currentTimeMillis();
ds.updateModifiedDateOnAccess(updateTime);
// sleep to workaround System.currentTimeMillis granularity.
sleep(3000);
data = new byte[dataLength];
random.nextBytes(data);
DataRecord rec3 = ds.addRecord(new ByteArrayInputStream(data));
data = new byte[dataLength];
random.nextBytes(data);
DataRecord rec4 = ds.addRecord(new ByteArrayInputStream(data));
rec1 = ds.getRecord(rec1.getIdentifier());
ds.clearInUse();
Assert.assertEquals("only rec2 should be deleted", 1,
ds.deleteAllOlderThan(updateTime));
assertNull("rec2 should be null",
ds.getRecordIfStored(rec2.getIdentifier()));
Iterator<DataIdentifier> itr = ds.getAllIdentifiers();
List<DataIdentifier> list = new ArrayList<DataIdentifier>();
list.add(rec1.getIdentifier());
list.add(rec3.getIdentifier());
list.add(rec4.getIdentifier());
while (itr.hasNext()) {
assertTrue("record found on list", list.remove(itr.next()));
}
Assert.assertEquals("touched records found", 0, list.size());
Assert.assertEquals("rec1 touched", true, rec1.getLastModified() > updateTime);
Assert.assertEquals("rec3 touched", true, rec3.getLastModified() > updateTime);
Assert.assertEquals("rec4 touched", true, rec4.getLastModified() > updateTime);
}
/**
* Test if record can be accessed via
* {@link DataStore#getRecordFromReference(String)}
*/
protected void doReferenceTest() throws Exception {
byte[] data = new byte[dataLength];
randomGen.nextBytes(data);
String reference;
DataRecord record = ds.addRecord(new ByteArrayInputStream(data));
reference = record.getReference();
assertReference(data, reference, ds);
}
/**
* Method to validate mixed scenario use of {@link DataStore}.
*/
protected void doTestSingleThread() throws Exception {
doTestMultiThreaded(ds, 1);
}
/**
* Method to validate mixed scenario use of {@link DataStore} in
* multi-threaded concurrent environment.
*/
protected void doTestMultiThreaded() throws Exception {
doTestMultiThreaded(ds, 4);
}
/**
* Method to assert record with byte array.
*/
protected void assertRecord(byte[] expected, DataRecord record)
throws DataStoreException, IOException {
InputStream stream = record.getStream();
try {
for (int i = 0; i < expected.length; i++) {
Assert.assertEquals(expected[i] & 0xff, stream.read());
}
Assert.assertEquals(-1, stream.read());
} finally {
stream.close();
}
}
/**
* Method to run {@link AbstractDataStoreTest#doTest(DataStore, int)} in multiple
* concurrent threads.
*/
protected void doTestMultiThreaded(final DataStore ds, int threadCount)
throws Exception {
final Exception[] exception = new Exception[1];
Thread[] threads = new Thread[threadCount];
for (int i = 0; i < threadCount; i++) {
final int x = i;
Thread t = new Thread() {
public void run() {
try {
doTest(ds, x);
} catch (Exception e) {
exception[0] = e;
}
}
};
threads[i] = t;
t.start();
}
for (int i = 0; i < threadCount; i++) {
threads[i].join();
}
if (exception[0] != null) {
throw exception[0];
}
}
/**
* Assert randomly read stream from record.
*/
void doTest(DataStore ds, int offset) throws Exception {
ArrayList<DataRecord> list = new ArrayList<DataRecord>();
HashMap<DataRecord, Integer> map = new HashMap<DataRecord, Integer>();
for (int i = 0; i < 10; i++) {
int size = 100000 - (i * 100);
RandomInputStream in = new RandomInputStream(size + offset, size);
DataRecord rec = ds.addRecord(in);
list.add(rec);
map.put(rec, size);
}
Random random = new Random(1);
for (int i = 0; i < list.size(); i++) {
int pos = random.nextInt(list.size());
DataRecord rec = list.get(pos);
int size = map.get(rec);
rec = ds.getRecord(rec.getIdentifier());
Assert.assertEquals(size, rec.getLength());
RandomInputStream expected = new RandomInputStream(size + offset,
size);
InputStream in = rec.getStream();
// Workaround for race condition that can happen with low cache size relative to the test
// read immediately
byte[] buffer = new byte[1];
in.read(buffer);
in = new SequenceInputStream(new ByteArrayInputStream(buffer), in);
if (random.nextBoolean()) {
in = readInputStreamRandomly(in, random);
}
assertEquals(expected, in);
}
}
InputStream readInputStreamRandomly(InputStream in, Random random)
throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
byte[] buffer = new byte[8000];
while (true) {
if (random.nextBoolean()) {
int x = in.read();
if (x < 0) {
break;
}
out.write(x);
} else {
if (random.nextBoolean()) {
int l = in.read(buffer);
if (l < 0) {
break;
}
out.write(buffer, 0, l);
} else {
int offset = random.nextInt(buffer.length / 2);
int len = random.nextInt(buffer.length / 2);
int l = in.read(buffer, offset, len);
if (l < 0) {
break;
}
out.write(buffer, offset, l);
}
}
}
in.close();
return new ByteArrayInputStream(out.toByteArray());
}
/**
* Assert two inputstream
*/
protected void assertEquals(InputStream a, InputStream b)
throws IOException {
try {
assertTrue("binary not equal",
org.apache.commons.io.IOUtils.contentEquals(a, b));
} finally {
try {
a.close();
} catch (Exception ignore) {
}
try {
b.close();
} catch (Exception ignore) {
}
}
}
/**
* Assert inputstream read from reference.
*/
protected void assertReference(byte[] expected, String reference,
DataStore store) throws Exception {
DataRecord record = store.getRecordFromReference(reference);
assertNotNull(record);
Assert.assertEquals(expected.length, record.getLength());
InputStream stream = record.getStream();
try {
assertTrue("binary not equal",
org.apache.commons.io.IOUtils.contentEquals(
new ByteArrayInputStream(expected), stream));
} finally {
stream.close();
}
}
/**
* Utility method to stop execution for duration time.
*
* @param duration
* time in milli seconds
*/
protected void sleep(long duration) {
long expected = System.currentTimeMillis() + duration;
while (System.currentTimeMillis() < expected) {
try {
Thread.sleep(1);
} catch (InterruptedException ie) {
}
}
}
}