/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.jackrabbit.oak.plugins.blob.datastore;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.collect.Iterators.filter;
import static com.google.common.collect.Iterators.transform;
import static org.apache.commons.io.IOUtils.closeQuietly;
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.net.URI;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.jcr.RepositoryException;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.Weigher;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.io.ByteStreams;
import com.google.common.io.Closeables;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.jackrabbit.core.data.DataIdentifier;
import org.apache.jackrabbit.core.data.DataRecord;
import org.apache.jackrabbit.core.data.DataStore;
import org.apache.jackrabbit.core.data.DataStoreException;
import org.apache.jackrabbit.core.data.MultiDataStoreAware;
import org.apache.jackrabbit.oak.api.Blob;
import org.apache.jackrabbit.oak.api.blob.BlobAccessProvider;
import org.apache.jackrabbit.oak.api.blob.BlobDownloadOptions;
import org.apache.jackrabbit.oak.api.blob.BlobUpload;
import org.apache.jackrabbit.oak.cache.CacheLIRS;
import org.apache.jackrabbit.oak.cache.CacheStats;
import org.apache.jackrabbit.oak.commons.StringUtils;
import org.apache.jackrabbit.oak.plugins.blob.BlobStoreBlob;
import org.apache.jackrabbit.oak.plugins.blob.BlobTrackingStore;
import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.DataRecordAccessProvider;
import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.DataRecordUploadException;
import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.DataRecordDownloadOptions;
import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.DataRecordUpload;
import org.apache.jackrabbit.oak.spi.blob.BlobOptions;
import org.apache.jackrabbit.oak.spi.blob.BlobStore;
import org.apache.jackrabbit.oak.spi.blob.stats.StatsCollectingStreams;
import org.apache.jackrabbit.oak.spi.blob.stats.BlobStatsCollector;
import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * BlobStore wrapper for DataStore. Wraps a Jackrabbit 2 DataStore and exposes it as a BlobStore.
 * It also handles inlining binaries whose size is smaller than
 * {@link org.apache.jackrabbit.core.data.DataStore#getMinRecordLength()}.
 */
public class DataStoreBlobStore
implements DataStore, BlobStore, GarbageCollectableBlobStore, BlobTrackingStore, TypedDataStore,
BlobAccessProvider {
private final Logger log = LoggerFactory.getLogger(getClass());
protected final DataStore delegate;
protected BlobStatsCollector stats = BlobStatsCollector.NOOP;
private BlobTracker tracker;
    /**
     * If set to true, the blob length information is encoded as part of the blobId,
     * so no extra call needs to be made to the DataStore to determine the length.
     *
     * <b>Implementation note:</b> if enabled, the length is encoded in the blobId by
     * appending it at the end. This is done for the methods which are part of the
     * BlobStore and GarbageCollectableBlobStore interfaces.
     *
     * DataIdentifiers which are part of the DataStore are not affected by this, as the
     * DataStore interface is not used in Oak and all access is via the BlobStore interface.
     */
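    // Illustrative example: with encoding enabled, a record with identifier
    // "abc123" and length 1000 would be exposed as the blobId "abc123#1000"
    // (see BlobId#encodedValue()).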
private final boolean encodeLengthInId;
protected final LoadingCache<String, byte[]> cache;
    public static final int DEFAULT_CACHE_SIZE = 16; // in MB
    /**
     * Maximum size of a binary whose content is cached. Kept greater than the
     * Lucene blob size (OakDirectory#BLOB_SIZE) so that Lucene index blobs are cached.
     */
private int maxCachedBinarySize = 1024 * 1024;
private final Weigher<String, byte[]> weigher = new Weigher<String, byte[]>() {
@Override
public int weigh(@NotNull String key, @NotNull byte[] value) {
long weight = (long)StringUtils.estimateMemoryUsage(key) + value.length;
if (weight > Integer.MAX_VALUE) {
log.debug("Calculated weight larger than Integer.MAX_VALUE: {}.", weight);
weight = Integer.MAX_VALUE;
}
return (int) weight;
}
};
private final CacheStats cacheStats;
public static final String MEM_CACHE_NAME = "BlobStore-MemCache";
public DataStoreBlobStore(DataStore delegate) {
this(delegate, true, DEFAULT_CACHE_SIZE);
}
public DataStoreBlobStore(DataStore delegate, boolean encodeLengthInId) {
this(delegate, encodeLengthInId, DEFAULT_CACHE_SIZE);
}
public DataStoreBlobStore(DataStore delegate, boolean encodeLengthInId, int cacheSizeInMB) {
this.delegate = delegate;
this.encodeLengthInId = encodeLengthInId;
long cacheSize = (long) cacheSizeInMB * FileUtils.ONE_MB;
this.cache = CacheLIRS.<String, byte[]>newBuilder()
.module(MEM_CACHE_NAME)
.recordStats()
.maximumWeight(cacheSize)
.weigher(weigher)
.build();
this.cacheStats = new CacheStats(cache, MEM_CACHE_NAME, weigher, cacheSize);
}
//~----------------------------------< DataStore >
@Override
public DataRecord getRecordIfStored(DataIdentifier identifier) throws DataStoreException {
if (isInMemoryRecord(identifier)) {
return getDataRecord(identifier.toString());
}
return delegate.getRecordIfStored(identifier);
}
@Override
public DataRecord getRecord(DataIdentifier identifier) throws DataStoreException {
if (isInMemoryRecord(identifier)) {
return getDataRecord(identifier.toString());
}
return delegate.getRecord(identifier);
}
@Override
public DataRecord getRecordFromReference(String reference) throws DataStoreException {
return delegate.getRecordFromReference(reference);
}
@Override
public DataRecord addRecord(InputStream stream) throws DataStoreException {
try {
return writeStream(stream, new BlobOptions());
} catch (IOException e) {
throw new DataStoreException(e);
}
}
@Override
public void updateModifiedDateOnAccess(long before) {
delegate.updateModifiedDateOnAccess(before);
}
@Override
public int deleteAllOlderThan(long min) throws DataStoreException {
return delegate.deleteAllOlderThan(min);
}
@Override
public Iterator<DataIdentifier> getAllIdentifiers() throws DataStoreException {
return delegate.getAllIdentifiers();
}
@Override
public void init(String homeDir) throws RepositoryException {
throw new UnsupportedOperationException("DataStore cannot be initialized again");
}
@Override
public int getMinRecordLength() {
return delegate.getMinRecordLength();
}
@Override
public void close() throws DataStoreException {
delegate.close();
cache.invalidateAll();
closeQuietly(tracker);
}
//~-------------------------------------------< BlobStore >
@Override
public String writeBlob(InputStream stream) throws IOException {
return writeBlob(stream, new BlobOptions());
}
@Override
public String writeBlob(InputStream stream, BlobOptions options) throws IOException {
boolean threw = true;
try {
long start = System.nanoTime();
checkNotNull(stream);
DataRecord dr = writeStream(stream, options);
String id = getBlobId(dr);
if (tracker != null && !InMemoryDataRecord.isInstance(id)) {
try {
tracker.add(id);
log.trace("Tracked Id {}", id);
} catch (Exception e) {
log.warn("Could not add track id", e);
}
}
threw = false;
stats.uploaded(System.nanoTime() - start, TimeUnit.NANOSECONDS, dr.getLength());
stats.uploadCompleted(id);
return id;
} catch (DataStoreException e) {
throw new IOException(e);
} finally {
            //DataStore does not close the stream internally,
            //so close the stream explicitly
Closeables.close(stream, threw);
}
}
@Override
public int readBlob(String encodedBlobId, long pos, byte[] buff, int off, int length) throws IOException {
        //This is inefficient as repeated calls for the same blobId open a new stream
        //each time. Instead, clients should access the stream directly from the
        //DataRecord by special-casing a BlobStore which implements DataStore
InputStream stream = getInputStream(encodedBlobId);
boolean threw = true;
try {
ByteStreams.skipFully(stream, pos);
int readCount = stream.read(buff, off, length);
threw = false;
return readCount;
} finally {
Closeables.close(stream, threw);
}
}
@Override
public long getBlobLength(String encodedBlobId) throws IOException {
try {
checkNotNull(encodedBlobId, "BlobId must be specified");
BlobId id = BlobId.of(encodedBlobId);
if (encodeLengthInId && id.hasLengthInfo()) {
return id.length;
}
return getDataRecord(id.blobId).getLength();
} catch (DataStoreException e) {
throw new IOException(e);
}
}
@Override
public String getBlobId(@NotNull String reference) {
checkNotNull(reference);
DataRecord record;
try {
record = delegate.getRecordFromReference(reference);
if (record != null) {
return getBlobId(record);
}
} catch (DataStoreException e) {
log.warn("Unable to access the blobId for [{}]", reference, e);
}
return null;
}
@Override
public String getReference(@NotNull String encodedBlobId) {
checkNotNull(encodedBlobId);
String blobId = extractBlobId(encodedBlobId);
        //References are not created for in-memory records
if (InMemoryDataRecord.isInstance(blobId)) {
return null;
}
DataRecord record;
try {
record = delegate.getRecordIfStored(new DataIdentifier(blobId));
if (record != null) {
return record.getReference();
} else {
log.debug("No blob found for id [{}]", blobId);
}
} catch (DataStoreException e) {
log.warn("Unable to access the blobId for [{}]", blobId, e);
}
return null;
}
@Override
public InputStream getInputStream(final String encodedBlobId) throws IOException {
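        // If the length is encoded in the id and the binary is small enough,
        // serve it from the in-memory cache; otherwise stream from the delegate.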
final BlobId blobId = BlobId.of(encodedBlobId);
if (encodeLengthInId
&& blobId.hasLengthInfo()
&& blobId.length <= maxCachedBinarySize) {
try {
byte[] content = cache.get(blobId.blobId, new Callable<byte[]>() {
@Override
public byte[] call() throws Exception {
boolean threw = true;
InputStream stream = getStream(blobId.blobId);
try {
byte[] result = IOUtils.toByteArray(stream);
threw = false;
return result;
} finally {
Closeables.close(stream, threw);
}
}
});
return new ByteArrayInputStream(content);
} catch (ExecutionException e) {
log.warn("Error occurred while loading bytes from steam while fetching for id {}", encodedBlobId, e);
}
}
return getStream(blobId.blobId);
}
//~-------------------------------------------< GarbageCollectableBlobStore >
@Override
public void setBlockSize(int x) {
// nothing to do
}
@Override
public String writeBlob(String tempFileName) throws IOException {
File file = new File(tempFileName);
InputStream in = null;
try {
in = new FileInputStream(file);
return writeBlob(in);
} finally {
closeQuietly(in);
FileUtils.forceDelete(file);
}
}
@Override
public int sweep() throws IOException {
return 0;
}
@Override
public void startMark() throws IOException {
// nothing to do
}
@Override
public void clearInUse() {
delegate.clearInUse();
}
@Override
public void clearCache() {
// nothing to do
}
@Override
public long getBlockSizeMin() {
return 0;
}
@Override
public Iterator<String> getAllChunkIds(final long maxLastModifiedTime) throws Exception {
return transform(filter(getAllRecords(), new Predicate<DataRecord>() {
            @Override
            public boolean apply(@Nullable DataRecord input) {
                return input != null && (maxLastModifiedTime <= 0
                        || input.getLastModified() < maxLastModifiedTime);
            }
}), new Function<DataRecord, String>() {
@Override
public String apply(DataRecord input) {
if (encodeLengthInId) {
return BlobId.of(input).encodedValue();
}
return input.getIdentifier().toString();
}
});
}
@Override
public boolean deleteChunks(List<String> chunkIds, long maxLastModifiedTime) throws Exception {
return (chunkIds.size() == countDeleteChunks(chunkIds, maxLastModifiedTime));
}
@Override
public long countDeleteChunks(List<String> chunkIds, long maxLastModifiedTime) throws Exception {
int count = 0;
if (delegate instanceof MultiDataStoreAware) {
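            // Per-record deletion requires MultiDataStoreAware; deleted ids are
            // logged in batches of 512 to keep individual log lines bounded.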
List<String> deleted = Lists.newArrayListWithExpectedSize(512);
for (String chunkId : chunkIds) {
String blobId = extractBlobId(chunkId);
DataIdentifier identifier = new DataIdentifier(blobId);
DataRecord dataRecord = getRecordForId(identifier);
boolean success = (maxLastModifiedTime <= 0)
|| dataRecord.getLastModified() <= maxLastModifiedTime;
log.trace("Deleting blob [{}] with last modified date [{}] : [{}]", blobId,
dataRecord.getLastModified(), success);
if (success) {
((MultiDataStoreAware) delegate).deleteRecord(identifier);
deleted.add(blobId);
count++;
if (count % 512 == 0) {
log.info("Deleted blobs {}", deleted);
deleted.clear();
}
}
}
if (!deleted.isEmpty()) {
log.info("Deleted blobs {}", deleted);
}
}
return count;
}
@Override
public Iterator<String> resolveChunks(String blobId) throws IOException {
if (!InMemoryDataRecord.isInstance(blobId)) {
return Iterators.singletonIterator(blobId);
}
return Collections.emptyIterator();
}
@Override
public void addMetadataRecord(InputStream stream, String name) throws DataStoreException {
if (delegate instanceof SharedDataStore) {
((SharedDataStore) delegate).addMetadataRecord(stream, name);
}
}
@Override
public void addMetadataRecord(File f, String name) throws DataStoreException {
if (delegate instanceof SharedDataStore) {
((SharedDataStore) delegate).addMetadataRecord(f, name);
}
}
    @Override
    public DataRecord getMetadataRecord(String name) {
if (delegate instanceof SharedDataStore) {
return ((SharedDataStore) delegate).getMetadataRecord(name);
}
return null;
}
@Override
public boolean metadataRecordExists(String name) {
return delegate instanceof SharedDataStore && ((SharedDataStore) delegate).metadataRecordExists(name);
}
@Override
public List<DataRecord> getAllMetadataRecords(String prefix) {
if (delegate instanceof SharedDataStore) {
return ((SharedDataStore) delegate).getAllMetadataRecords(prefix);
}
return null;
}
@Override
public boolean deleteMetadataRecord(String name) {
return delegate instanceof SharedDataStore && ((SharedDataStore) delegate).deleteMetadataRecord(name);
}
@Override
public void deleteAllMetadataRecords(String prefix) {
if (delegate instanceof SharedDataStore) {
((SharedDataStore) delegate).deleteAllMetadataRecords(prefix);
}
}
@Override
public Iterator<DataRecord> getAllRecords() throws DataStoreException {
if (delegate instanceof SharedDataStore) {
return ((SharedDataStore) delegate).getAllRecords();
} else {
return Iterators.transform(delegate.getAllIdentifiers(),
new Function<DataIdentifier, DataRecord>() {
@Nullable
@Override
public DataRecord apply(@Nullable DataIdentifier input) {
try {
return delegate.getRecord(input);
} catch (DataStoreException e) {
log.warn("Error occurred while fetching DataRecord for identifier {}", input, e);
}
return null;
}
});
}
}
@Override
public DataRecord getRecordForId(DataIdentifier identifier) throws DataStoreException {
if (delegate instanceof SharedDataStore) {
return ((SharedDataStore) delegate).getRecordForId(identifier);
}
return delegate.getRecord(identifier);
}
@Override
public Type getType() {
if (delegate instanceof SharedDataStore) {
return Type.SHARED;
}
return Type.DEFAULT;
}
@Override
public DataRecord addRecord(InputStream input, BlobOptions options) throws DataStoreException {
if (delegate instanceof TypedDataStore) {
return ((TypedDataStore) delegate).addRecord(input, options);
}
return delegate.addRecord(input);
}
//~---------------------------------------------< Object >
@Override
public String toString() {
return String.format("DataStore backed BlobStore [%s]", delegate.getClass().getName());
}
//~---------------------------------------------< Properties >
public DataStore getDataStore() {
return delegate;
}
public CacheStats getCacheStats() {
return cacheStats;
}
public void setMaxCachedBinarySize(int maxCachedBinarySize) {
this.maxCachedBinarySize = maxCachedBinarySize;
}
public void setBlobStatsCollector(BlobStatsCollector stats) {
this.stats = stats;
}
@Override
public void addTracker(BlobTracker tracker) {
this.tracker = tracker;
}
@Override
@Nullable
public BlobTracker getTracker() {
return tracker;
}
//~---------------------------------------------< Internal >
protected InputStream getStream(String blobId) throws IOException {
try {
InputStream in = getDataRecord(blobId).getStream();
if (!(in instanceof BufferedInputStream)){
in = new BufferedInputStream(in);
}
return StatsCollectingStreams.wrap(stats, blobId, in);
} catch (DataStoreException e) {
throw new IOException(e);
}
}
    protected DataRecord getDataRecord(String blobId) throws DataStoreException {
        DataRecord record;
        if (InMemoryDataRecord.isInstance(blobId)) {
            record = InMemoryDataRecord.getInstance(blobId);
        } else {
            record = delegate.getRecord(new DataIdentifier(blobId));
        }
        checkNotNull(record, "No DataRecord found for blobId [%s]", blobId);
        return record;
    }
private static boolean isInMemoryRecord(DataIdentifier identifier) {
return InMemoryDataRecord.isInstance(identifier.toString());
}
    /**
     * Create a BLOB value from an input stream. Small objects are kept as an
     * in-memory record, while large objects are stored in the data store.
     *
     * @param in the input stream
     * @param options the options to use when writing the stream
     * @return the data record for the written value
     */
private DataRecord writeStream(InputStream in, BlobOptions options) throws IOException, DataStoreException {
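        // Read up to getMinRecordLength() + 1 bytes. If the stream ends before
        // the buffer is full, the binary is small enough to inline as an
        // InMemoryDataRecord; the one extra byte distinguishes "small enough to
        // inline" from "must be stored in the data store".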
int maxMemorySize = Math.max(0, delegate.getMinRecordLength() + 1);
byte[] buffer = new byte[maxMemorySize];
int pos = 0, len = maxMemorySize;
while (pos < maxMemorySize) {
int l = in.read(buffer, pos, len);
if (l < 0) {
break;
}
pos += l;
len -= l;
}
DataRecord record;
if (pos < maxMemorySize) {
// shrink the buffer
byte[] data = new byte[pos];
System.arraycopy(buffer, 0, data, 0, pos);
record = InMemoryDataRecord.getInstance(data);
} else {
// a few bytes are already read, need to re-build the input stream
in = new SequenceInputStream(new ByteArrayInputStream(buffer, 0, pos), in);
record = addRecord(in, options);
}
return record;
}
private String getBlobId(DataRecord dr) {
if (encodeLengthInId) {
return BlobId.of(dr).encodedValue();
}
return dr.getIdentifier().toString();
}
protected String extractBlobId(String encodedBlobId) {
if (encodeLengthInId) {
return BlobId.of(encodedBlobId).blobId;
}
return encodedBlobId;
}
// <--------------- BlobAccessProvider implementation - Direct binary access feature --------------->
@Nullable
@Override
public BlobUpload initiateBlobUpload(long maxUploadSizeInBytes, int maxNumberOfURIs)
throws IllegalArgumentException {
if (delegate instanceof DataRecordAccessProvider) {
try {
DataRecordAccessProvider provider = (DataRecordAccessProvider) this.delegate;
DataRecordUpload upload = provider.initiateDataRecordUpload(maxUploadSizeInBytes, maxNumberOfURIs);
if (upload == null) {
return null;
}
return new BlobUpload() {
@Override
@NotNull
public String getUploadToken() {
return upload.getUploadToken();
}
@Override
public long getMinPartSize() {
return upload.getMinPartSize();
}
@Override
public long getMaxPartSize() {
return upload.getMaxPartSize();
}
@Override
@NotNull
public Collection<URI> getUploadURIs() {
return upload.getUploadURIs();
}
};
}
catch (DataRecordUploadException e) {
log.warn("Unable to initiate direct upload", e);
}
}
return null;
}
@Nullable
@Override
public Blob completeBlobUpload(@NotNull String uploadToken) throws IllegalArgumentException {
if (delegate instanceof DataRecordAccessProvider) {
try {
DataRecord record = ((DataRecordAccessProvider) delegate).completeDataRecordUpload(uploadToken);
return new BlobStoreBlob(this, record.getIdentifier().toString());
}
catch (DataStoreException | DataRecordUploadException e) {
log.warn("Unable to complete direct upload for upload token {}", uploadToken, e);
}
}
return null;
}
@Nullable
@Override
public URI getDownloadURI(@NotNull Blob blob, @NotNull BlobDownloadOptions downloadOptions) {
if (delegate instanceof DataRecordAccessProvider) {
String blobId = blob.getContentIdentity();
if (blobId != null) {
return ((DataRecordAccessProvider) delegate).getDownloadURI(
new DataIdentifier(extractBlobId(blobId)),
DataRecordDownloadOptions.fromBlobDownloadOptions(downloadOptions)
);
}
}
return null;
}
public static class BlobId {
static final String SEP = "#";
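        // Encoded form is "<identifier>#<length>", e.g. "abc123#1000" (illustrative);
        // a blobId without a trailing length segment has length -1 (unknown).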
public String getBlobId() {
return blobId;
}
public long getLength() {
return length;
}
final String blobId;
final long length;
BlobId(String blobId, long length) {
this.blobId = blobId;
this.length = length;
}
BlobId(DataRecord dr) {
this.blobId = dr.getIdentifier().toString();
long len;
try {
len = dr.getLength();
} catch (DataStoreException e) {
//Cannot determine length
len = -1;
}
this.length = len;
}
BlobId(String encodedBlobId) {
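            // Split on the last separator so that only the trailing length
            // segment is stripped, even if the identifier itself contains '#'.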
int indexOfSep = encodedBlobId.lastIndexOf(SEP);
if (indexOfSep != -1) {
this.blobId = encodedBlobId.substring(0, indexOfSep);
                this.length = Long.parseLong(encodedBlobId.substring(indexOfSep + SEP.length()));
} else {
this.blobId = encodedBlobId;
this.length = -1;
}
}
String encodedValue() {
if (hasLengthInfo()) {
return blobId + SEP + String.valueOf(length);
} else {
return blobId;
}
}
boolean hasLengthInfo() {
return length != -1;
}
static boolean isEncoded(String encodedBlobId) {
return encodedBlobId.contains(SEP);
}
public static BlobId of(String encodedValue) {
return new BlobId(encodedValue);
}
static BlobId of(DataRecord dr) {
return new BlobId(dr);
}
}
}