| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.jackrabbit.oak.jcr.binary; |
| |
| import static org.apache.jackrabbit.oak.jcr.binary.util.BinaryAccessTestUtils.getBinary; |
| import static org.apache.jackrabbit.oak.jcr.binary.util.BinaryAccessTestUtils.httpGet; |
| import static org.apache.jackrabbit.oak.jcr.binary.util.BinaryAccessTestUtils.isSuccessfulHttpPut; |
| import static org.apache.jackrabbit.oak.jcr.binary.util.BinaryAccessTestUtils.putBinary; |
| import static org.apache.jackrabbit.oak.jcr.binary.util.BinaryAccessTestUtils.storeBinaryAndRetrieve; |
| import static org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils.SharedStoreRecordType.REPOSITORY; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertNull; |
| import static org.junit.Assert.assertTrue; |
| import java.io.ByteArrayInputStream; |
| import java.io.File; |
| import java.io.IOException; |
| import java.net.URI; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.ThreadPoolExecutor; |
| import javax.jcr.Binary; |
| import javax.jcr.Node; |
| import javax.jcr.NodeIterator; |
| import javax.jcr.RepositoryException; |
| import javax.jcr.Session; |
| |
| import org.apache.jackrabbit.api.JackrabbitValueFactory; |
| import org.apache.jackrabbit.api.binary.BinaryDownload; |
| import org.apache.jackrabbit.api.binary.BinaryDownloadOptions; |
| import org.apache.jackrabbit.api.binary.BinaryUpload; |
| import org.apache.jackrabbit.core.data.DataStoreException; |
| import org.apache.jackrabbit.oak.fixture.NodeStoreFixture; |
| import org.apache.jackrabbit.oak.jcr.binary.fixtures.datastore.AzureDataStoreFixture; |
| import org.apache.jackrabbit.oak.jcr.binary.fixtures.datastore.S3DataStoreFixture; |
| import org.apache.jackrabbit.oak.jcr.binary.fixtures.nodestore.SegmentMemoryNodeStoreFixture; |
| import org.apache.jackrabbit.oak.jcr.binary.util.Content; |
| import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector; |
| import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore; |
| import org.apache.jackrabbit.oak.segment.SegmentBlobReferenceRetriever; |
| import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions; |
| import org.apache.jackrabbit.oak.segment.file.FileStore; |
| import org.apache.jackrabbit.oak.spi.blob.BlobStore; |
| import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore; |
| import org.apache.jackrabbit.oak.spi.cluster.ClusterRepositoryInfo; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.junit.runners.Parameterized; |
| |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Maps; |
| import com.google.common.collect.Sets; |
| |
| public class BinaryAccessDSGCIT extends AbstractBinaryAccessIT { |
| private static final String TEST_ROOT = "testroot"; |
| private static final long BINARY_SIZE = 1024*1024; |
| |
| private static final String TRADITIONAL_UPLOAD_1 = "tu1"; |
| private static final String TRADITIONAL_UPLOAD_2 = "tu2"; |
| private static final String DIRECT_UPLOAD_1 = "du1"; |
| private static final String DIRECT_UPLOAD_2 = "du2"; |
| |
| @Parameterized.Parameters(name = "{0}") |
| public static Iterable<?> dataStoreFixtures() { |
| Collection<NodeStoreFixture> fixtures = new ArrayList<>(); |
| fixtures.add(new SegmentMemoryNodeStoreFixture(new S3DataStoreFixture())); |
| fixtures.add(new SegmentMemoryNodeStoreFixture(new AzureDataStoreFixture())); |
| return fixtures; |
| } |
| |
| public BinaryAccessDSGCIT(NodeStoreFixture fixture) { |
| // reuse NodeStore (and DataStore) across all tests in this class |
| super(fixture, true); |
| } |
| |
| private Session session; |
| private JackrabbitValueFactory directUploader; |
| |
| private MarkSweepGarbageCollector garbageCollector = null; |
| private ThreadPoolExecutor executor = null; |
| |
| @Before |
| public void setup() throws RepositoryException { |
| session = getAdminSession(); |
| directUploader = (JackrabbitValueFactory) session.getValueFactory(); |
| |
| if (session.getNode("/").hasNode(TEST_ROOT)) { |
| session.getNode("/" + TEST_ROOT).remove(); |
| session.save(); |
| } |
| session.getNode("/").addNode(TEST_ROOT); |
| |
| getConfigurableHttpDataRecordProvider().setDirectUploadURIExpirySeconds(60*5); |
| getConfigurableHttpDataRecordProvider().setDirectDownloadURIExpirySeconds(60*5); |
| } |
| |
| // For debugging. |
| private void printTree(Node root) throws RepositoryException { |
| printTree(root, 0); |
| } |
| |
| // For debugging. |
| private void printTree(Node root, int level) throws RepositoryException { |
| for (int i=0; i<level; i++) { |
| System.out.print(" "); |
| } |
| System.out.println(root.getName()); |
| |
| NodeIterator iter = root.getNodes(); |
| while (iter.hasNext()) { |
| printTree(iter.nextNode(), level+1); |
| } |
| } |
| |
| private String toAbsolutePath(String leaf) { |
| return "/" + TEST_ROOT + "/" + leaf; |
| } |
| |
| private Binary createDirectBinary(String path, Content content) throws RepositoryException, IOException { |
| BinaryUpload upload = directUploader.initiateBinaryUpload(content.size(), 1); // multi-part not needed for this |
| assertNotNull(upload); |
| |
| int code = content.httpPUT(upload.getUploadURIs().iterator().next()); |
| assertTrue(isSuccessfulHttpPut(code, getConfigurableHttpDataRecordProvider())); |
| |
| Binary binary = directUploader.completeBinaryUpload(upload.getUploadToken()); |
| putBinary(session, path, binary); |
| |
| return getBinary(session, path); |
| } |
| |
| private void verifyBinariesExistViaSession(Session session, |
| Map<String, Binary> binaries, |
| Map<String, Content> binaryContent) |
| throws RepositoryException, IOException { |
| for (Map.Entry<String, Binary> entry : binaries.entrySet()) { |
| Binary b = getBinary(session, toAbsolutePath(entry.getKey())); |
| assertEquals(b, entry.getValue()); |
| binaryContent.get(entry.getKey()).assertEqualsWith(b.getStream()); |
| } |
| } |
| |
| private void verifyBinariesExistDirectly(Map<String, Binary> binaries, Map<String, Content> binaryContent) |
| throws RepositoryException, IOException { |
| for (Map.Entry<String, Binary> entry : binaries.entrySet()) { |
| assertTrue(entry.getValue() instanceof BinaryDownload); |
| URI uri = ((BinaryDownload) entry.getValue()).getURI(BinaryDownloadOptions.DEFAULT); |
| binaryContent.get(entry.getKey()).assertEqualsWith(httpGet(uri)); |
| } |
| } |
| |
| private void verifyBinariesDoNotExistDirectly(Map<String, Binary> deletedBinaries) throws RepositoryException { |
| for (Map.Entry<String, Binary> entry : deletedBinaries.entrySet()) { |
| assertTrue(entry.getValue() instanceof BinaryDownload); |
| URI uri = ((BinaryDownload) entry.getValue()).getURI(BinaryDownloadOptions.DEFAULT); |
| assertNull(uri); |
| } |
| } |
| |
| private void compactFileStore() { |
| FileStore fileStore = getNodeStoreComponent(FileStore.class); |
| for (int i=0; i<SegmentGCOptions.defaultGCOptions().getRetainedGenerations(); i++) { |
| fileStore.compactFull(); |
| } |
| } |
| |
| private MarkSweepGarbageCollector getGarbageCollector() |
| throws DataStoreException, IOException { |
| DataStoreBlobStore blobStore = (DataStoreBlobStore) getNodeStoreComponent(BlobStore.class); |
| FileStore fileStore = getNodeStoreComponent(FileStore.class); |
| File fileStoreRoot = getNodeStoreComponent(FileStore.class.getName() + ":root"); |
| |
| if (null == garbageCollector) { |
| String repoId = ClusterRepositoryInfo.getOrCreateId(getNodeStore()); |
| blobStore.addMetadataRecord(new ByteArrayInputStream(new byte[0]), |
| REPOSITORY.getNameFromId(repoId)); |
| if (null == executor) { |
| executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10); |
| } |
| garbageCollector = new MarkSweepGarbageCollector( |
| new SegmentBlobReferenceRetriever(fileStore), |
| blobStore, |
| executor, |
| fileStoreRoot.getAbsolutePath(), |
| 2048, |
| 0, |
| repoId |
| ); |
| } |
| return garbageCollector; |
| } |
| |
| private int getBlobCount() throws Exception { |
| GarbageCollectableBlobStore ds = (GarbageCollectableBlobStore) getNodeStoreComponent(BlobStore.class); |
| Set<String> chunks = Sets.newHashSet(); |
| Iterator<String> chunkIds = ds.getAllChunkIds(0); |
| while (chunkIds.hasNext()) { |
| chunks.add(chunkIds.next()); |
| } |
| return chunks.size(); |
| } |
| |
| @Test |
| public void testGC() throws Exception { |
| Map<String, Content> binaryContent = Maps.newHashMap(); |
| Map<String, Binary> binaries = Maps.newHashMap(); |
| |
| for (String key : Lists.newArrayList(TRADITIONAL_UPLOAD_1, TRADITIONAL_UPLOAD_2)) { |
| Content content = Content.createRandom(BINARY_SIZE); |
| binaryContent.put(key, content); |
| binaries.put(key, storeBinaryAndRetrieve(session, toAbsolutePath(key), content)); |
| } |
| for (String key : Lists.newArrayList(DIRECT_UPLOAD_1, DIRECT_UPLOAD_2)) { |
| Content content = Content.createRandom(BINARY_SIZE); |
| binaryContent.put(key, content); |
| binaries.put(key, createDirectBinary(toAbsolutePath(key), content)); |
| } |
| session.save(); |
| |
| // Test that all four binaries can be accessed |
| assertEquals(4, getBlobCount()); |
| // - Download all four via repo |
| verifyBinariesExistViaSession(session, binaries, binaryContent); |
| // - Download directly |
| verifyBinariesExistDirectly(binaries, binaryContent); |
| |
| // Delete one of the binaries uploaded via repo and one uploaded directly |
| Node testRoot = session.getNode("/" + TEST_ROOT); |
| List<String> deletedBinaryPaths = Lists.newArrayList(TRADITIONAL_UPLOAD_2, DIRECT_UPLOAD_2); |
| for (String path : deletedBinaryPaths) { |
| Node toRemove = testRoot.getNode(path); |
| toRemove.remove(); |
| } |
| session.save(); |
| |
| |
| // Verify that they are deleted from repo |
| for (String path : deletedBinaryPaths) { |
| assertFalse(session.nodeExists(toAbsolutePath(path))); |
| } |
| |
| // Verify that all four binaries are still in data store |
| assertEquals(4, getBlobCount()); |
| |
| |
| // Run DSGC |
| compactFileStore(); |
| MarkSweepGarbageCollector garbageCollector = getGarbageCollector(); |
| garbageCollector.collectGarbage(false); |
| |
| // Verify that only two binaries remain in data store |
| assertEquals(2, getBlobCount()); |
| |
| // Verify that the two binaries remaining can still be accessed |
| Map<String, Binary> deletedBinaries = Maps.newHashMap(); |
| for (String deletedPath : Lists.newArrayList(TRADITIONAL_UPLOAD_2, DIRECT_UPLOAD_2)) { |
| deletedBinaries.put(deletedPath, binaries.get(deletedPath)); |
| binaries.remove(deletedPath); |
| binaryContent.remove(deletedPath); |
| } |
| |
| verifyBinariesExistViaSession(session, binaries, binaryContent); |
| verifyBinariesExistDirectly(binaries, binaryContent); |
| |
| verifyBinariesDoNotExistDirectly(deletedBinaries); |
| } |
| } |