blob: 03ba72bf073bb6c4706de591174fac2c5f3d41ad [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jackrabbit.oak.plugins.blob;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.security.DigestOutputStream;
import java.security.MessageDigest;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import com.google.common.base.Charsets;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.io.Closer;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.output.NullOutputStream;
import org.apache.jackrabbit.core.data.DataRecord;
import org.apache.jackrabbit.core.data.DataStoreException;
import org.apache.jackrabbit.oak.api.Blob;
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.commons.PathUtils;
import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeStore;
import org.apache.jackrabbit.oak.plugins.memory.MultiBinaryPropertyState;
import org.apache.jackrabbit.oak.spi.blob.AbstractSharedBackend;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeStore;
import org.apache.jackrabbit.oak.stats.DefaultStatisticsProvider;
import org.apache.jackrabbit.oak.stats.StatisticsProvider;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.commons.codec.binary.Hex.encodeHexString;
import static org.apache.commons.io.FileUtils.copyInputStreamToFile;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class ConsolidatedDataStoreStatsTest extends AbstractDataStoreCacheTest {
private static final Logger LOG = LoggerFactory.getLogger(ConsolidatedDataStoreStatsTest.class);
private static final String ID_PREFIX = "12345";
private static String testNodePathName = "test/node/path/name";
@Rule
public TemporaryFolder folder = new TemporaryFolder(new File("target"));
@Rule
public ExpectedException expectedEx = ExpectedException.none();
private final Closer closer = Closer.create();
private File root;
private File testFile;
private CountDownLatch taskLatch;
private CountDownLatch callbackLatch;
private CountDownLatch afterExecuteLatch;
private TestExecutor executor;
private StatisticsProvider statsProvider;
private ScheduledExecutorService scheduledExecutor;
private ConsolidatedDataStoreCacheStats stats;
private NodeStore nodeStore;
private AbstractSharedCachingDataStore dataStore;
private static Blob mockBlob;
@Before
public void setup() throws Exception {
root = folder.newFolder();
init(1);
}
private void init(int i) throws Exception {
testFile = folder.newFile();
copyInputStreamToFile(randomStream(0, 16384), testFile);
String testNodeId = getIdForInputStream(new FileInputStream(testFile));
mockBlob = mock(Blob.class);
when(mockBlob.getContentIdentity()).thenReturn(testNodeId);
nodeStore = initNodeStore(Optional.of(mockBlob),
Optional.<Blob>absent(),
Optional.<String>absent(),
Optional.<Integer>absent(),
Optional.<List<Blob>>absent());
// create executor
taskLatch = new CountDownLatch(1);
callbackLatch = new CountDownLatch(1);
afterExecuteLatch = new CountDownLatch(i);
executor = new TestExecutor(1, taskLatch, callbackLatch, afterExecuteLatch);
// stats
ScheduledExecutorService statsExecutor = Executors.newSingleThreadScheduledExecutor();
closer.register(new ExecutorCloser(statsExecutor, 500, TimeUnit.MILLISECONDS));
statsProvider = new DefaultStatisticsProvider(statsExecutor);
scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
closer.register(new ExecutorCloser(scheduledExecutor, 500, TimeUnit.MILLISECONDS));
final File datastoreRoot = folder.newFolder();
dataStore = new AbstractSharedCachingDataStore() {
@Override protected AbstractSharedBackend createBackend() {
return new TestMemoryBackend(datastoreRoot);
}
@Override public int getMinRecordLength() {
return 0;
}
};
dataStore.setStatisticsProvider(statsProvider);
dataStore.listeningExecutor = executor;
dataStore.schedulerExecutor = scheduledExecutor;
dataStore.init(root.getAbsolutePath());
stats = new ConsolidatedDataStoreCacheStats();
stats.nodeStore = nodeStore;
stats.cachingDataStore = dataStore;
}
@After
public void tear() throws Exception {
closer.close();
dataStore.close();
}
private String getIdForInputStream(final InputStream in)
throws Exception {
MessageDigest digest = MessageDigest.getInstance("SHA-256");
OutputStream output = new DigestOutputStream(new NullOutputStream(), digest);
try {
IOUtils.copyLarge(in, output);
} finally {
IOUtils.closeQuietly(output);
IOUtils.closeQuietly(in);
}
return encodeHexString(digest.digest());
}
private static NodeStore initNodeStore(final Optional<Blob> blobProp1,
final Optional<Blob> blobProp2,
final Optional<String> stringProp,
final Optional<Integer> intProp,
final Optional<List<Blob>> blobPropList)
throws CommitFailedException {
final NodeStore nodeStore = new MemoryNodeStore();
NodeBuilder rootBuilder = nodeStore.getRoot().builder();
NodeBuilder builder = initNodeBuilder(rootBuilder);
if (blobProp1.isPresent()) {
builder.setProperty("blobProp1", blobProp1.get());
}
if (blobProp2.isPresent()) {
builder.setProperty("blobProp2", blobProp2.get());
}
if (stringProp.isPresent()) {
builder.setProperty("stringProp", stringProp.get());
}
if (intProp.isPresent()) {
builder.setProperty("intProp", intProp.get());
}
if (blobPropList.isPresent()) {
builder.setProperty(MultiBinaryPropertyState
.binaryPropertyFromBlob("blobPropList", blobPropList.get()));
}
nodeStore.merge(rootBuilder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
return nodeStore;
}
private static NodeBuilder initNodeBuilder(final NodeBuilder rootBuilder) {
NodeBuilder builder = rootBuilder;
for (final String nodeName : PathUtils.elements(testNodePathName)) {
builder = builder.child(nodeName);
}
return builder;
}
@Test
public void noPath()
throws Exception {
assertFalse(stats.isFileSynced(testNodePathName));
}
@Test
public void nullString()
throws Exception {
assertFalse(stats.isFileSynced(null));
}
@Test
public void emptyString()
throws Exception {
assertFalse(stats.isFileSynced(""));
}
@Test
public void differentPaths() throws Exception {
init(4);
final NodeStore nodeStore = new MemoryNodeStore();
stats.nodeStore = nodeStore;
final String path1 = "path/to/node/1";
final String path2 = "path/to/node/2";
final String path3 = "shortpath";
final String path4 = "a/very/very/long/path/leads/to/node/4";
final List<String> paths = Lists.newArrayList(path1, path2, path3, path4);
final String leadingSlashPath = "/" + path1;
final List<String> blobContents = Lists.newArrayList("1", "2", "3", "4");
final List<Blob> blobs = Lists.newArrayList(
mock(Blob.class),
mock(Blob.class),
mock(Blob.class),
mock(Blob.class)
);
final List<String> blobIds = Lists.newArrayList(
getIdForInputStream(getStream(blobContents.get(0))),
getIdForInputStream(getStream(blobContents.get(1))),
getIdForInputStream(getStream(blobContents.get(2))),
getIdForInputStream(getStream(blobContents.get(3)))
);
when(blobs.get(0).getContentIdentity()).thenReturn(blobIds.get(0));
when(blobs.get(1).getContentIdentity()).thenReturn(blobIds.get(1));
when(blobs.get(2).getContentIdentity()).thenReturn(blobIds.get(2));
when(blobs.get(3).getContentIdentity()).thenReturn(blobIds.get(3));
final NodeBuilder rootBuilder = nodeStore.getRoot().builder();
final List<NodeBuilder> builders = Lists.newArrayList();
for (final String path : paths) {
NodeBuilder builder = rootBuilder;
for (final String nodeName : PathUtils.elements(path)) {
builder = builder.child(nodeName);
}
builders.add(builder);
}
builders.get(0).setProperty("blob1", blobs.get(0));
builders.get(1).setProperty("blob2", blobs.get(1));
builders.get(2).setProperty("blob3", blobs.get(2));
builders.get(3).setProperty("blob4", blobs.get(3));
nodeStore.merge(rootBuilder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
final List<DataRecord> records = Lists.newArrayList();
try {
for (final String s : blobContents) {
records.add(dataStore.addRecord(getStream(s)));
}
taskLatch.countDown();
callbackLatch.countDown();
waitFinish();
for (final String path : Lists
.newArrayList(path1, path2, path3, path4, leadingSlashPath)) {
assertTrue(stats.isFileSynced(path));
}
for (final String invalidPath : Lists
.newArrayList(path1 + "/", "/" + path1 + "/", "/path//to/node///1")) {
try {
stats.isFileSynced(invalidPath);
assertFalse(false); // shouldn't get here on an invalid path
} catch (AssertionError e) {
// expected
}
}
}
finally {
delete(dataStore, records);
}
}
@Test
public void multiplePropertiesAndBinarySynced() throws Exception {
NodeStore nodeStore = initNodeStore(Optional.of(mockBlob),
Optional.<Blob>absent(),
Optional.of("abc"),
Optional.of(123),
Optional.<List<Blob>>absent());
assertSyncedTrue(stats, dataStore, new FileInputStream(testFile));
}
@Test
public void multipleBinaryPropsAllSynced() throws Exception {
Blob mockBlob2 = mock(Blob.class);
final String id2 = getIdForInputStream(getStream("testContents2"));
when(mockBlob2.getContentIdentity()).thenReturn(id2);
NodeStore nodeStore = initNodeStore(Optional.of(mockBlob),
Optional.of(mockBlob2),
Optional.<String>absent(),
Optional.<Integer>absent(),
Optional.<List<Blob>>absent());
assertSyncedTrue(stats, dataStore, new FileInputStream(testFile),
getStream("testContents2"));
}
@Test
public void multipleBinaryPropsNotAllSynced() throws Exception {
Blob mockBlob2 = mock(Blob.class);
final String id2 = getIdForInputStream(getStream("testContents2"));
when(mockBlob2.getContentIdentity()).thenReturn(id2);
NodeStore nodeStore = initNodeStore(Optional.of(mockBlob),
Optional.of(mockBlob2),
Optional.<String>absent(),
Optional.<Integer>absent(),
Optional.<List<Blob>>absent());
assertSyncedFalse(stats, dataStore, new FileInputStream(testFile));
}
@Test
public void binaryPropSingle() throws Exception {
List<Blob> blobPropList = Lists.newArrayList(mockBlob);
NodeStore nodeStore = initNodeStore(Optional.<Blob>absent(),
Optional.<Blob>absent(),
Optional.<String>absent(),
Optional.<Integer>absent(),
Optional.of(blobPropList));
assertSyncedTrue(stats, dataStore, new FileInputStream(testFile));
}
@Test
public void binariesPropertyMultiple() throws Exception {
Blob mockBlob2 = mock(Blob.class);
final String id2 = getIdForInputStream(getStream("testContents2"));
when(mockBlob2.getContentIdentity()).thenReturn(id2);
List<Blob> blobPropList = Lists.newArrayList(mockBlob, mockBlob2);
NodeStore nodeStore = initNodeStore(Optional.<Blob>absent(),
Optional.<Blob>absent(),
Optional.<String>absent(),
Optional.<Integer>absent(),
Optional.of(blobPropList));
assertSyncedTrue(stats, dataStore, new FileInputStream(testFile),
getStream("testContents2"));
}
@Test
public void binariesPropertyNotAllSynced() throws Exception {
Blob mockBlob2 = mock(Blob.class);
final String id2 = getIdForInputStream(getStream("testContents2"));
when(mockBlob2.getContentIdentity()).thenReturn(id2);
List<Blob> blobPropList = Lists.newArrayList(mockBlob, mockBlob2);
NodeStore nodeStore = initNodeStore(Optional.<Blob>absent(),
Optional.<Blob>absent(),
Optional.<String>absent(),
Optional.<Integer>absent(),
Optional.of(blobPropList));
assertSyncedFalse(stats, dataStore, new FileInputStream(testFile));
}
@Test
public void binarySyncedAndBinariesNotSynced() throws Exception {
Blob mockBlob2 = mock(Blob.class);
final String id2 = getIdForInputStream(getStream("testContents2"));
when(mockBlob2.getContentIdentity()).thenReturn(id2);
Blob mockBlob3 = mock(Blob.class);
final String id3 = getIdForInputStream(getStream("testContents3"));
when(mockBlob2.getContentIdentity()).thenReturn(id3);
List<Blob> blobPropList = Lists.newArrayList(mockBlob2, mockBlob3);
NodeStore nodeStore = initNodeStore(Optional.of(mockBlob),
Optional.<Blob>absent(),
Optional.<String>absent(),
Optional.<Integer>absent(),
Optional.of(blobPropList));
assertSyncedFalse(stats, dataStore, new FileInputStream(testFile),
getStream("testContents2"));
}
@Test
public void binaryNotSyncedAndBinariesSynced() throws Exception {
Blob mockBlob2 = mock(Blob.class);
final String id2 = getIdForInputStream(getStream("testContents2"));
when(mockBlob2.getContentIdentity()).thenReturn(id2);
Blob mockBlob3 = mock(Blob.class);
final String id3 = getIdForInputStream(getStream("testContents3"));
when(mockBlob2.getContentIdentity()).thenReturn(id3);
List<Blob> blobPropList = Lists.newArrayList(mockBlob2, mockBlob3);
NodeStore nodeStore = initNodeStore(Optional.of(mockBlob),
Optional.<Blob>absent(),
Optional.<String>absent(),
Optional.<Integer>absent(),
Optional.of(blobPropList));
assertSyncedFalse(stats, dataStore, getStream("testContents2"),
getStream("testContents3"));
}
@Test
public void binaryAndBinariesSynced() throws Exception {
Blob mockBlob2 = mock(Blob.class);
final String id2 = getIdForInputStream(getStream("testContents2"));
when(mockBlob2.getContentIdentity()).thenReturn(id2);
Blob mockBlob3 = mock(Blob.class);
final String id3 = getIdForInputStream(getStream("testContents3"));
when(mockBlob3.getContentIdentity()).thenReturn(id3);
List<Blob> blobPropList = Lists.newArrayList(mockBlob2, mockBlob3);
NodeStore nodeStore = initNodeStore(Optional.of(mockBlob),
Optional.<Blob>absent(),
Optional.<String>absent(),
Optional.<Integer>absent(),
Optional.of(blobPropList));
assertSyncedFalse(stats, dataStore, new FileInputStream(testFile),
getStream("testContents2"), getStream("testContents3"));
}
private static void delete(AbstractSharedCachingDataStore s3ds, List<DataRecord> recs)
throws DataStoreException {
for (DataRecord rec : recs) {
if (null != rec) {
s3ds.deleteRecord(rec.getIdentifier());
}
}
}
private void assertSyncedFalse(ConsolidatedDataStoreCacheStats mBean,
AbstractSharedCachingDataStore s3ds, InputStream... streams) throws DataStoreException {
List<DataRecord> recs = Lists.newArrayList();
try {
for (InputStream is : streams) {
recs.add(s3ds.addRecord(is));
IOUtils.closeQuietly(is);
}
assertFalse(mBean.isFileSynced(testNodePathName));
taskLatch.countDown();
callbackLatch.countDown();
waitFinish();
} finally {
delete(s3ds, recs);
}
}
private void assertSyncedTrue(ConsolidatedDataStoreCacheStats mBean,
AbstractSharedCachingDataStore s3ds, InputStream... streams) throws DataStoreException {
taskLatch.countDown();
callbackLatch.countDown();
List<DataRecord> recs = Lists.newArrayList();
try {
for (InputStream is : streams) {
recs.add(s3ds.addRecord(is));
IOUtils.closeQuietly(is);
}
waitFinish();
assertTrue(mBean.isFileSynced(testNodePathName));
} finally {
delete(s3ds, recs);
}
}
private void waitFinish() {
try {
// wait for upload finish
afterExecuteLatch.await();
// Force execute removal from staging cache
ScheduledFuture<?> scheduledFuture = scheduledExecutor
.schedule(dataStore.getCache().getStagingCache().new RemoveJob(), 0,
TimeUnit.MILLISECONDS);
scheduledFuture.get();
LOG.info("After jobs completed");
} catch (Exception e) {
e.printStackTrace();
}
}
private static InputStream getStream(String str) {
return new ByteArrayInputStream(str.getBytes(Charsets.UTF_8));
}
}