blob: a46384a09585acc10671076d230b30919dad52c4 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.jackrabbit.oak.plugins.document.mongo;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import com.mongodb.ReadPreference;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.result.UpdateResult;
import org.apache.jackrabbit.oak.commons.StringUtils;
import org.apache.jackrabbit.oak.plugins.blob.CachingBlobStore;
import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStoreBuilder;
import org.bson.Document;
import org.bson.codecs.configuration.CodecRegistry;
import org.bson.conversions.Bson;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.jackrabbit.guava.common.collect.AbstractIterator;
import com.mongodb.BasicDBObject;
import com.mongodb.MongoClient;
import com.mongodb.MongoException;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import static com.mongodb.ReadPreference.primary;
import static;
import static org.bson.codecs.configuration.CodecRegistries.fromCodecs;
import static org.bson.codecs.configuration.CodecRegistries.fromRegistries;
* Implementation of blob store for the MongoDB extending from
* {@link CachingBlobStore}. It saves blobs into a separate collection in
* MongoDB (not using GridFS) and it supports basic garbage collection.
* FIXME: -Do we need to create commands for retry etc.? -Not sure if this is
* going to work for multiple MKs talking to same MongoDB?
public class MongoBlobStore extends CachingBlobStore {
public static final String COLLECTION_BLOBS = "blobs";
private static final Logger LOG = LoggerFactory.getLogger(MongoBlobStore.class);
private static final int DUPLICATE_KEY_ERROR_CODE = 11000;
private static final CodecRegistry CODEC_REGISTRY = fromRegistries(
fromCodecs(new MongoBlobCodec()),
private final ReadPreference defaultReadPreference;
private final MongoCollection<MongoBlob> blobCollection;
private long minLastModified;
private final boolean readOnly;
* Constructs a new {@code MongoBlobStore}
* @param db the database
public MongoBlobStore(MongoDatabase db) {
this(db, DEFAULT_CACHE_SIZE, null);
* Constructs a new {@code MongoBlobStore}
* @param db the database
* @param cacheSize the cache size
public MongoBlobStore(MongoDatabase db, long cacheSize) {
this(db, cacheSize, null);
* Constructs a new {@code MongoBlobStore}
* @param db the database
* @param cacheSize the cache size
* @param builder {@link DocumentNodeStoreBuilder}, supplying further options
public MongoBlobStore(@NotNull MongoDatabase db, long cacheSize, @Nullable DocumentNodeStoreBuilder<?> builder) {
readOnly = builder == null ? false : builder.getReadOnlyMode();
// use a block size of 2 MB - 1 KB, because MongoDB rounds up the
// space allocated for a record to the next power of two
// (there is an overhead per record, let's assume it is 1 KB at most)
setBlockSize(2 * 1024 * 1024 - 1024);
defaultReadPreference = db.getReadPreference();
blobCollection = initBlobCollection(db, readOnly);
protected void storeBlock(byte[] digest, int level, byte[] data) throws IOException {
String id = StringUtils.convertBytesToHex(digest);
cache.put(id, data);
// Create the mongo blob object
BasicDBObject mongoBlob = new BasicDBObject(MongoBlob.KEY_ID, id);
mongoBlob.append(MongoBlob.KEY_DATA, data);
mongoBlob.append(MongoBlob.KEY_LEVEL, level);
// If update only the lastMod needs to be modified
BasicDBObject updateBlob =new BasicDBObject(MongoBlob.KEY_LAST_MOD, System.currentTimeMillis());
BasicDBObject upsert = new BasicDBObject();
upsert.append("$setOnInsert", mongoBlob)
.append("$set", updateBlob);
try {
Bson query = getBlobQuery(id, -1);
UpdateOptions options = new UpdateOptions().upsert(true);
UpdateResult result = getBlobCollection().updateOne(query, upsert, options);
if (result != null && result.getUpsertedId() == null) {
LOG.trace("Block with id [{}] updated", id);
} else {
LOG.trace("Block with id [{}] created", id);
} catch (MongoException e) {
throw new IOException(e.getMessage(), e);
protected byte[] readBlockFromBackend(BlockId blockId) throws Exception {
String id = StringUtils.convertBytesToHex(blockId.getDigest());
byte[] data = cache.get(id);
if (data == null) {
long start = System.nanoTime();
MongoBlob blobMongo = getBlob(id, 0);
if (blobMongo == null) {
String message = "Did not find block " + id;
throw new IOException(message);
data = blobMongo.getData();
getStatsCollector().downloaded(id, System.nanoTime() - start, TimeUnit.NANOSECONDS, data.length);
cache.put(id, data);
if (blockId.getPos() == 0) {
return data;
int len = (int) (data.length - blockId.getPos());
if (len < 0) {
return new byte[0];
byte[] d2 = new byte[len];
System.arraycopy(data, (int) blockId.getPos(), d2, 0, len);
return d2;
public void startMark() throws IOException {
minLastModified = System.currentTimeMillis();
protected boolean isMarkEnabled() {
return minLastModified != 0;
protected void mark(BlockId blockId) throws Exception {
if (minLastModified == 0) {
String id = StringUtils.convertBytesToHex(blockId.getDigest());
Bson query = getBlobQuery(id, minLastModified);
Bson update = new BasicDBObject("$set",
new BasicDBObject(MongoBlob.KEY_LAST_MOD, System.currentTimeMillis()));
getBlobCollection().updateOne(query, update);
public int sweep() throws IOException {
Bson query = getBlobQuery(null, minLastModified);
long num = getBlobCollection().deleteMany(query).getDeletedCount();
minLastModified = 0;
return (int) num;
private MongoCollection<MongoBlob> initBlobCollection(MongoDatabase db, boolean readOnly) {
if (stream(db.listCollectionNames().spliterator(), false)
.noneMatch(COLLECTION_BLOBS::equals)) {
if (readOnly) {
throw new RuntimeException(
"MongoBlobStore instantiated read-only, but collection " + COLLECTION_BLOBS + " not present");
// override the read preference configured with the MongoDB URI
// and use the primary as default. Reading a blob will still
// try a secondary first and then fallback to the primary.
return db.getCollection(COLLECTION_BLOBS, MongoBlob.class)
private MongoCollection<MongoBlob> getBlobCollection() {
return this.blobCollection;
private MongoBlob getBlob(String id, long lastMod) {
Bson query = getBlobQuery(id, lastMod);
Bson fields = new BasicDBObject(MongoBlob.KEY_DATA, 1);
// try with default read preference first, may be from secondary
List<MongoBlob> result = new ArrayList<>(1);
if (result.isEmpty()) {
// not found in the secondary: try the primary
return result.isEmpty() ? null : result.get(0);
private static Bson getBlobQuery(String id, long lastMod) {
List<Bson> clauses = new ArrayList<>(2);
if (id != null) {
clauses.add(Filters.eq(MongoBlob.KEY_ID, id));
if (lastMod > 0) {
clauses.add(, lastMod));
if (clauses.size() == 1) {
return clauses.get(0);
} else {
return Filters.and(clauses);
public long countDeleteChunks(List<String> chunkIds, long maxLastModifiedTime) throws Exception {
Bson query = new Document();
if (chunkIds != null) {
query =, chunkIds);
if (maxLastModifiedTime > 0) {
query = Filters.and(
query,, maxLastModifiedTime)
return getBlobCollection().deleteMany(query).getDeletedCount();
public Iterator<String> getAllChunkIds(long maxLastModifiedTime) throws Exception {
Bson fields = new BasicDBObject(MongoBlob.KEY_ID, 1);
Bson query = new Document();
if (maxLastModifiedTime != 0 && maxLastModifiedTime != -1) {
query = Filters.lte(MongoBlob.KEY_LAST_MOD, maxLastModifiedTime);
final MongoCursor<MongoBlob> cur = getBlobCollection().find(query)
//TODO The cursor needs to be closed
return new AbstractIterator<String>() {
protected String computeNext() {
if (cur.hasNext()) {
MongoBlob blob =;
if (blob != null) {
return blob.getId();
return endOfData();