| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.jackrabbit.oak.plugins.document.mongo; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.Date; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.LinkedHashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Set; |
| import java.util.TreeMap; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.locks.Lock; |
| |
| import com.google.common.base.Stopwatch; |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.ImmutableMap; |
| import com.google.common.collect.Iterables; |
| import com.google.common.collect.Iterators; |
| import com.google.common.collect.Lists; |
| import com.google.common.util.concurrent.UncheckedExecutionException; |
| import com.mongodb.Block; |
| import com.mongodb.DBObject; |
| import com.mongodb.MongoBulkWriteException; |
| import com.mongodb.MongoClient; |
| import com.mongodb.MongoClientURI; |
| import com.mongodb.ReadPreference; |
| |
| import org.apache.jackrabbit.oak.cache.CacheStats; |
| import org.apache.jackrabbit.oak.cache.CacheValue; |
| import org.apache.jackrabbit.oak.plugins.document.Collection; |
| import org.apache.jackrabbit.oak.plugins.document.Document; |
| import org.apache.jackrabbit.oak.plugins.document.DocumentStore; |
| import org.apache.jackrabbit.oak.plugins.document.DocumentStoreException; |
| import org.apache.jackrabbit.oak.plugins.document.DocumentStoreStatsCollector; |
| import org.apache.jackrabbit.oak.plugins.document.JournalEntry; |
| import org.apache.jackrabbit.oak.plugins.document.NodeDocument; |
| import org.apache.jackrabbit.oak.plugins.document.Revision; |
| import org.apache.jackrabbit.oak.plugins.document.StableRevisionComparator; |
| import org.apache.jackrabbit.oak.plugins.document.UpdateOp; |
| import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Condition; |
| import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Key; |
| import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Operation; |
| import org.apache.jackrabbit.oak.plugins.document.UpdateUtils; |
| import org.apache.jackrabbit.oak.plugins.document.cache.CacheChangesTracker; |
| import org.apache.jackrabbit.oak.plugins.document.cache.CacheInvalidationStats; |
| import org.apache.jackrabbit.oak.plugins.document.cache.ModificationStamp; |
| import org.apache.jackrabbit.oak.plugins.document.cache.NodeDocumentCache; |
| import org.apache.jackrabbit.oak.plugins.document.locks.NodeDocumentLocks; |
| import org.apache.jackrabbit.oak.plugins.document.locks.StripedNodeDocumentLocks; |
| import org.apache.jackrabbit.oak.plugins.document.util.Utils; |
| import org.apache.jackrabbit.oak.stats.Clock; |
| import org.apache.jackrabbit.oak.commons.PerfLogger; |
| import org.bson.conversions.Bson; |
| import org.jetbrains.annotations.NotNull; |
| import org.jetbrains.annotations.Nullable; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.common.base.Function; |
| import com.google.common.collect.Maps; |
| import com.mongodb.BasicDBObject; |
| import com.mongodb.MongoException; |
| import com.mongodb.WriteConcern; |
| import com.mongodb.bulk.BulkWriteError; |
| import com.mongodb.bulk.BulkWriteResult; |
| import com.mongodb.bulk.BulkWriteUpsert; |
| import com.mongodb.client.ClientSession; |
| import com.mongodb.client.FindIterable; |
| import com.mongodb.client.MongoCollection; |
| import com.mongodb.client.MongoCursor; |
| import com.mongodb.client.MongoDatabase; |
| import com.mongodb.client.model.BulkWriteOptions; |
| import com.mongodb.client.model.Filters; |
| import com.mongodb.client.model.FindOneAndUpdateOptions; |
| import com.mongodb.client.model.ReturnDocument; |
| import com.mongodb.client.model.UpdateOneModel; |
| import com.mongodb.client.model.UpdateOptions; |
| import com.mongodb.client.model.WriteModel; |
| import com.mongodb.client.result.DeleteResult; |
| import com.mongodb.client.result.UpdateResult; |
| |
| import static com.google.common.base.Predicates.in; |
| import static com.google.common.base.Predicates.not; |
| import static com.google.common.collect.Iterables.filter; |
| import static com.google.common.collect.Maps.filterKeys; |
| import static com.google.common.collect.Sets.difference; |
| import static org.apache.jackrabbit.oak.plugins.document.DocumentStoreException.asDocumentStoreException; |
| import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.DELETED_ONCE; |
| import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.MODIFIED_IN_SECS; |
| import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.SD_MAX_REV_TIME_IN_SECS; |
| import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.SD_TYPE; |
| import static org.apache.jackrabbit.oak.plugins.document.UpdateOp.Condition.newEqualsCondition; |
| import static org.apache.jackrabbit.oak.plugins.document.mongo.MongoUtils.createIndex; |
| import static org.apache.jackrabbit.oak.plugins.document.mongo.MongoUtils.createPartialIndex; |
| import static org.apache.jackrabbit.oak.plugins.document.mongo.MongoUtils.getDocumentStoreExceptionTypeFor; |
| import static org.apache.jackrabbit.oak.plugins.document.mongo.MongoUtils.hasIndex; |
| |
| /** |
| * A document store that uses MongoDB as the backend. |
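| * <p> |
| * Minimal construction sketch (hedged; the host, database name and builder |
| * factory are illustrative assumptions, not a recommended setup): |
| * <pre>{@code |
| * MongoClient client = new MongoClient(new MongoClientURI("mongodb://localhost:27017")); |
| * MongoDatabase db = client.getDatabase("oak"); |
| * MongoDocumentStore store = new MongoDocumentStore(client, db, |
| *         MongoDocumentNodeStoreBuilder.newMongoDocumentNodeStoreBuilder()); |
| * }</pre> |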
| */ |
| public class MongoDocumentStore implements DocumentStore { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(MongoDocumentStore.class); |
| private static final PerfLogger PERFLOG = new PerfLogger( |
| LoggerFactory.getLogger(MongoDocumentStore.class.getName() |
| + ".perf")); |
| |
| private static final Bson BY_ID_ASC = new BasicDBObject(Document.ID, 1); |
| |
| enum DocumentReadPreference { |
| PRIMARY, |
| PREFER_PRIMARY, |
| PREFER_SECONDARY, |
| PREFER_SECONDARY_IF_OLD_ENOUGH |
| } |
| |
| public static final int IN_CLAUSE_BATCH_SIZE = 500; |
| |
| /** |
| * A conflicting ID assignment on insert. Used by |
| * {@link #sendBulkUpdate(Collection, java.util.Collection, Map)} for |
| * {@code UpdateOp} with {@code isNew == false}. An upsert with this |
| * operation will always fail when there is no existing document. |
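| * <p> |
| * Illustrative sketch (an assumption about the resulting BSON and server |
| * behavior, not taken from the source): the duplicate {@code _id} assignment |
| * collapses to |
| * <pre>{@code |
| * { "$setOnInsert" : { "_id" : "b" } } |
| * }</pre> |
| * which conflicts with the {@code _id} derived from the query filter when an |
| * upsert has to insert a new document, so the write is rejected. |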
| */ |
| private static final Map CONFLICT_ON_INSERT = new BasicDBObject( |
| "$setOnInsert", |
| new BasicDBObject(Document.ID, "a").append(Document.ID, "b") |
| ).toMap(); |
| |
| private MongoCollection<BasicDBObject> nodes; |
| private final MongoCollection<BasicDBObject> clusterNodes; |
| private final MongoCollection<BasicDBObject> settings; |
| private final MongoCollection<BasicDBObject> journal; |
| |
| private final MongoClient client; |
| private final MongoStatus status; |
| private final MongoSessionFactory sessionFactory; |
| private final MongoDatabase db; |
| |
| private final NodeDocumentCache nodesCache; |
| |
| private final NodeDocumentLocks nodeLocks; |
| |
| private Clock clock = Clock.SIMPLE; |
| |
| private final long maxReplicationLagMillis; |
| |
| /** |
| * Duration in seconds under which queries will use the index on the _modified field. |
| * If set to -1, the index on _modified is not used. |
| * <p> |
| * Default is 60 seconds. |
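| * <p> |
| * The value is read from a JVM system property; an illustrative override |
| * (the value 120 is an assumption, not a recommendation): |
| * <pre>{@code |
| * -Doak.mongo.maxDeltaForModTimeIdxSecs=120 |
| * }</pre> |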
| */ |
| private final long maxDeltaForModTimeIdxSecs = |
| Long.getLong("oak.mongo.maxDeltaForModTimeIdxSecs", 60); |
| |
| /** |
| * Disables the index hint sent to MongoDB. |
| * This overrides {@link #maxDeltaForModTimeIdxSecs}. |
| */ |
| private final boolean disableIndexHint = |
| Boolean.getBoolean("oak.mongo.disableIndexHint"); |
| |
| /** |
| * Duration in milliseconds after which a mongo query will be terminated. |
| * <p> |
| * If this value is -1, no timeout is set at all; if it is 1 or greater, it |
| * translates to MongoDB's maxTimeMS being set accordingly. |
| * <p> |
| * Default is 60'000 (one minute). |
| * See: http://mongodb.github.io/node-mongodb-native/driver-articles/anintroductionto1_4_and_2_6.html#maxtimems |
| */ |
| private final long maxQueryTimeMS = |
| Long.getLong("oak.mongo.maxQueryTimeMS", TimeUnit.MINUTES.toMillis(1)); |
| |
| /** |
| * The number of documents to put into one bulk update. |
| * <p> |
| * Default is 30. |
| */ |
| private int bulkSize = |
| Integer.getInteger("oak.mongo.bulkSize", 30); |
| |
| /** |
| * How many times a bulk update request should be retried in case of |
| * a conflict. |
| * <p> |
| * Default is 0 (no retries). |
| */ |
| private int bulkRetries = |
| Integer.getInteger("oak.mongo.bulkRetries", 0); |
| |
| /** |
| * How many times a query to MongoDB should be retried when it fails with a |
| * MongoException. |
| */ |
| private final int queryRetries = |
| Integer.getInteger("oak.mongo.queryRetries", 2); |
| |
| /** |
| * Acceptable replication lag of secondaries in milliseconds. Reads are |
| * directed to the primary if the estimated replication lag is higher than |
| * this value. |
| */ |
| private final int acceptableLagMillis = |
| Integer.getInteger("oak.mongo.acceptableLagMillis", 5000); |
| |
| /** |
| * Feature flag for use of MongoDB client sessions. |
| */ |
| private final boolean useClientSession; |
| |
| private String lastReadWriteMode; |
| |
| private final Map<String, String> metadata; |
| |
| private DocumentStoreStatsCollector stats; |
| |
| private boolean hasModifiedIdCompoundIndex = true; |
| |
| private static final Key KEY_MODIFIED = new Key(MODIFIED_IN_SECS, null); |
| |
| private final boolean readOnly; |
| |
| public MongoDocumentStore(MongoClient client, MongoDatabase db, |
| MongoDocumentNodeStoreBuilderBase<?> builder) { |
| this.readOnly = builder.getReadOnlyMode(); |
| MongoStatus mongoStatus = builder.getMongoStatus(); |
| if (mongoStatus == null) { |
| mongoStatus = new MongoStatus(client, db.getName()); |
| } |
| mongoStatus.checkVersion(); |
| metadata = ImmutableMap.<String,String>builder() |
| .put("type", "mongo") |
| .put("version", mongoStatus.getVersion()) |
| .build(); |
| |
| this.client = client; |
| this.status = mongoStatus; |
| this.sessionFactory = new MongoSessionFactory(client); |
| this.db = db; |
| stats = builder.getDocumentStoreStatsCollector(); |
| nodes = db.getCollection(Collection.NODES.toString(), BasicDBObject.class); |
| clusterNodes = db.getCollection(Collection.CLUSTER_NODES.toString(), BasicDBObject.class); |
| settings = db.getCollection(Collection.SETTINGS.toString(), BasicDBObject.class); |
| journal = db.getCollection(Collection.JOURNAL.toString(), BasicDBObject.class); |
| |
| maxReplicationLagMillis = builder.getMaxReplicationLagMillis(); |
| |
| useClientSession = !builder.isClientSessionDisabled() |
| && Boolean.parseBoolean(System.getProperty("oak.mongo.clientSession", "true")); |
| |
| if (!readOnly) { |
| ensureIndexes(mongoStatus); |
| } |
| |
| this.nodeLocks = new StripedNodeDocumentLocks(); |
| this.nodesCache = builder.buildNodeDocumentCache(this, nodeLocks); |
| |
| LOG.info("Connected to MongoDB {} with maxReplicationLagMillis {}, " + |
| "maxDeltaForModTimeIdxSecs {}, disableIndexHint {}, " + |
| "clientSessionSupported {}, clientSessionInUse {}, {}, " + |
| "serverStatus {}", |
| mongoStatus.getVersion(), maxReplicationLagMillis, |
| maxDeltaForModTimeIdxSecs, disableIndexHint, |
| status.isClientSessionSupported(), useClientSession, |
| db.getWriteConcern(), mongoStatus.getServerDetails()); |
| } |
| |
| private void ensureIndexes(@NotNull MongoStatus mongoStatus) { |
| // reading documents in the nodes collection and checking |
| // existing indexes is performed against the MongoDB primary |
| // this ensures the information is up-to-date and accurate |
| boolean emptyNodesCollection = execute(session -> MongoUtils.isCollectionEmpty(nodes, session)); |
| |
| // compound index on _modified and _id |
| if (emptyNodesCollection) { |
| // this is an empty store, create a compound index |
| // on _modified and _id (OAK-3071) |
| createIndex(nodes, new String[]{NodeDocument.MODIFIED_IN_SECS, Document.ID}, |
| new boolean[]{true, true}, false, false); |
| } else if (!hasIndex(nodes.withReadPreference(ReadPreference.primary()), |
| NodeDocument.MODIFIED_IN_SECS, Document.ID)) { |
| hasModifiedIdCompoundIndex = false; |
| LOG.warn("Detected an upgrade from Oak version <= 1.2. For optimal " + |
| "performance it is recommended to create a compound index " + |
| "for the 'nodes' collection on {_modified:1, _id:1}."); |
| } |
| |
| // index on the _bin flag for faster access to nodes with binaries during GC |
| createIndex(nodes, NodeDocument.HAS_BINARY_FLAG, true, false, true); |
| |
| // index on _deletedOnce for fast lookup of potential garbage |
| // depending on the MongoDB version, create a partial index |
| if (emptyNodesCollection) { |
| if (mongoStatus.isVersion(3, 2)) { |
| createPartialIndex(nodes, new String[]{DELETED_ONCE, MODIFIED_IN_SECS}, |
| new boolean[]{true, true}, "{" + DELETED_ONCE + ":true}"); |
| } else { |
| createIndex(nodes, NodeDocument.DELETED_ONCE, true, false, true); |
| } |
| } else if (!hasIndex(nodes.withReadPreference(ReadPreference.primary()), |
| DELETED_ONCE, MODIFIED_IN_SECS)) { |
| LOG.warn("Detected an upgrade from Oak version <= 1.6. For optimal " + |
| "Revision GC performance it is recommended to create a " + |
| "partial index for the 'nodes' collection on " + |
| "{_deletedOnce:1, _modified:1} with a partialFilterExpression " + |
| "{_deletedOnce:true}. Partial indexes require MongoDB 3.2 " + |
| "or higher."); |
| } |
| |
| // compound index on _sdType and _sdMaxRevTime |
| if (emptyNodesCollection) { |
| // this is an empty store, create compound index |
| // on _sdType and _sdMaxRevTime (OAK-6129) |
| createIndex(nodes, new String[]{SD_TYPE, SD_MAX_REV_TIME_IN_SECS}, |
| new boolean[]{true, true}, false, true); |
| } else if (!hasIndex(nodes.withReadPreference(ReadPreference.primary()), |
| SD_TYPE, SD_MAX_REV_TIME_IN_SECS)) { |
| LOG.warn("Detected an upgrade from Oak version <= 1.6. For optimal " + |
| "Revision GC performance it is recommended to create a " + |
| "sparse compound index for the 'nodes' collection on " + |
| "{_sdType:1, _sdMaxRevTime:1}."); |
| } |
| |
| // index on _modified for journal entries |
| createIndex(journal, JournalEntry.MODIFIED, true, false, false); |
| } |
| |
| public boolean isReadOnly() { |
| return readOnly; |
| } |
| |
| @Override |
| public void finalize() throws Throwable { |
| super.finalize(); |
| // TODO should not be needed, but it seems |
| // oak-jcr doesn't call dispose() |
| dispose(); |
| } |
| |
| @Override |
| public CacheInvalidationStats invalidateCache() { |
| InvalidationResult result = new InvalidationResult(); |
| for (CacheValue key : nodesCache.keys()) { |
| result.invalidationCount++; |
| invalidateCache(Collection.NODES, key.toString()); |
| } |
| return result; |
| } |
| |
| @Override |
| public CacheInvalidationStats invalidateCache(Iterable<String> keys) { |
| LOG.debug("invalidateCache: start"); |
| final InvalidationResult result = new InvalidationResult(); |
| int size = 0; |
| |
| final Iterator<String> it = keys.iterator(); |
| while(it.hasNext()) { |
| // read chunks of documents only |
| final List<String> ids = new ArrayList<String>(IN_CLAUSE_BATCH_SIZE); |
| while(it.hasNext() && ids.size() < IN_CLAUSE_BATCH_SIZE) { |
| final String id = it.next(); |
| if (nodesCache.getIfPresent(id) != null) { |
| // only add those that we actually do have cached |
| ids.add(id); |
| } |
| } |
| size += ids.size(); |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("invalidateCache: batch size: {} of total so far {}", |
| ids.size(), size); |
| } |
| |
| Map<String, ModificationStamp> modStamps = getModStamps(ids); |
| result.queryCount++; |
| |
| int invalidated = nodesCache.invalidateOutdated(modStamps); |
| for (String id : filter(ids, not(in(modStamps.keySet())))) { |
| nodesCache.invalidate(id); |
| invalidated++; |
| } |
| result.cacheEntriesProcessedCount += ids.size(); |
| result.invalidationCount += invalidated; |
| result.upToDateCount += ids.size() - invalidated; |
| } |
| |
| result.cacheSize = size; |
| LOG.trace("invalidateCache: end. total: {}", size); |
| return result; |
| } |
| |
| @Override |
| public <T extends Document> void invalidateCache(Collection<T> collection, String key) { |
| if (collection == Collection.NODES) { |
| nodesCache.invalidate(key); |
| } |
| } |
| |
| @Override |
| public <T extends Document> T find(Collection<T> collection, String key) { |
| final long start = PERFLOG.start(); |
| final T result = find(collection, key, true, -1); |
| PERFLOG.end(start, 1, "find: preferCached=true, key={}", key); |
| return result; |
| } |
| |
| @Override |
| public <T extends Document> T find(final Collection<T> collection, |
| final String key, |
| int maxCacheAge) { |
| final long start = PERFLOG.start(); |
| final T result = find(collection, key, false, maxCacheAge); |
| PERFLOG.end(start, 1, "find: preferCached=false, key={}", key); |
| return result; |
| } |
| |
| @SuppressWarnings("unchecked") |
| private <T extends Document> T find(final Collection<T> collection, |
| final String key, |
| boolean preferCached, |
| final int maxCacheAge) { |
| if (collection != Collection.NODES) { |
| DocumentReadPreference readPref = DocumentReadPreference.PRIMARY; |
| if (withClientSession()) { |
| readPref = getDefaultReadPreference(collection); |
| } |
| return findUncachedWithRetry(collection, key, readPref); |
| } |
| NodeDocument doc; |
| if (maxCacheAge > 0 || preferCached) { |
| // first try without lock |
| doc = nodesCache.getIfPresent(key); |
| if (doc != null) { |
| if (preferCached || |
| getTime() - doc.getCreated() < maxCacheAge) { |
| stats.doneFindCached(collection, key); |
| if (doc == NodeDocument.NULL) { |
| return null; |
| } |
| return (T) doc; |
| } |
| } |
| } |
| Throwable t; |
| try { |
| Lock lock = nodeLocks.acquire(key); |
| try { |
| if (maxCacheAge > 0 || preferCached) { |
| // try again; some other thread may have populated |
| // the cache by now |
| doc = nodesCache.getIfPresent(key); |
| if (doc != null) { |
| if (preferCached || |
| getTime() - doc.getCreated() < maxCacheAge) { |
| stats.doneFindCached(collection, key); |
| if (doc == NodeDocument.NULL) { |
| return null; |
| } |
| return (T) doc; |
| } |
| } |
| } |
| final NodeDocument d = (NodeDocument) findUncachedWithRetry( |
| collection, key, |
| getReadPreference(maxCacheAge)); |
| invalidateCache(collection, key); |
| doc = nodesCache.get(key, new Callable<NodeDocument>() { |
| @Override |
| public NodeDocument call() throws Exception { |
| return d == null ? NodeDocument.NULL : d; |
| } |
| }); |
| } finally { |
| lock.unlock(); |
| } |
| if (doc == NodeDocument.NULL) { |
| return null; |
| } else { |
| return (T) doc; |
| } |
| } catch (UncheckedExecutionException e) { |
| t = e.getCause(); |
| } catch (ExecutionException e) { |
| t = e.getCause(); |
| } catch (RuntimeException e) { |
| t = e; |
| } |
| throw handleException(t, collection, key); |
| } |
| |
| /** |
| * Finds a document and performs a number of retries if the read fails with |
| * an exception. |
| * |
| * @param collection the collection to read from. |
| * @param key the key of the document to find. |
| * @param docReadPref the read preference. |
| * @param <T> the document type of the given collection. |
| * @return the document or {@code null} if the document doesn't exist. |
| */ |
| @Nullable |
| private <T extends Document> T findUncachedWithRetry( |
| Collection<T> collection, String key, |
| DocumentReadPreference docReadPref) { |
| if (key.equals("0:/")) { |
| LOG.trace("root node"); |
| } |
| int numAttempts = queryRetries + 1; |
| MongoException ex = null; |
| for (int i = 0; i < numAttempts; i++) { |
| if (i > 0) { |
| LOG.warn("Retrying read of " + key); |
| } |
| try { |
| return findUncached(collection, key, docReadPref); |
| } catch (MongoException e) { |
| ex = e; |
| } |
| } |
| if (ex != null) { |
| throw handleException(ex, collection, key); |
| } else { |
| // impossible to get here |
| throw new IllegalStateException(); |
| } |
| } |
| |
| @Nullable |
| protected <T extends Document> T findUncached(Collection<T> collection, String key, DocumentReadPreference docReadPref) { |
| log("findUncached", key, docReadPref); |
| final Stopwatch watch = startWatch(); |
| boolean isSlaveOk = false; |
| boolean docFound = true; |
| try { |
| ReadPreference readPreference = getMongoReadPreference(collection, null, docReadPref); |
| MongoCollection<BasicDBObject> dbCollection = getDBCollection(collection, readPreference); |
| |
| if(readPreference.isSlaveOk()){ |
| LOG.trace("Routing call to secondary for fetching [{}]", key); |
| isSlaveOk = true; |
| } |
| |
| List<BasicDBObject> result = new ArrayList<>(1); |
| execute(session -> { |
| if (session != null) { |
| dbCollection.find(session, getByKeyQuery(key)).into(result); |
| } else { |
| dbCollection.find(getByKeyQuery(key)).into(result); |
| } |
| return null; |
| }); |
| |
| if(result.isEmpty()) { |
| docFound = false; |
| return null; |
| } |
| T doc = convertFromDBObject(collection, result.get(0)); |
| if (doc != null) { |
| doc.seal(); |
| } |
| return doc; |
| } finally { |
| stats.doneFindUncached(watch.elapsed(TimeUnit.NANOSECONDS), collection, key, docFound, isSlaveOk); |
| } |
| } |
| |
| @NotNull |
| @Override |
| public <T extends Document> List<T> query(Collection<T> collection, |
| String fromKey, |
| String toKey, |
| int limit) { |
| return query(collection, fromKey, toKey, null, 0, limit); |
| } |
| |
| @NotNull |
| @Override |
| public <T extends Document> List<T> query(Collection<T> collection, |
| String fromKey, |
| String toKey, |
| String indexedProperty, |
| long startValue, |
| int limit) { |
| return queryWithRetry(collection, fromKey, toKey, indexedProperty, |
| startValue, limit, maxQueryTimeMS); |
| } |
| |
| /** |
| * Queries for documents and performs a number of retries if the read fails |
| * with an exception. |
| */ |
| @NotNull |
| private <T extends Document> List<T> queryWithRetry(Collection<T> collection, |
| String fromKey, |
| String toKey, |
| String indexedProperty, |
| long startValue, |
| int limit, |
| long maxQueryTime) { |
| int numAttempts = queryRetries + 1; |
| MongoException ex = null; |
| for (int i = 0; i < numAttempts; i++) { |
| if (i > 0) { |
| LOG.warn("Retrying query, fromKey={}, toKey={}", fromKey, toKey); |
| } |
| try { |
| return queryInternal(collection, fromKey, toKey, |
| indexedProperty, startValue, limit, maxQueryTime); |
| } catch (MongoException e) { |
| ex = e; |
| } |
| } |
| if (ex != null) { |
| throw handleException(ex, collection, Lists.newArrayList(fromKey, toKey)); |
| } else { |
| // impossible to get here |
| throw new IllegalStateException(); |
| } |
| } |
| |
| @SuppressWarnings("unchecked") |
| @NotNull |
| protected <T extends Document> List<T> queryInternal(Collection<T> collection, |
| String fromKey, |
| String toKey, |
| String indexedProperty, |
| long startValue, |
| int limit, |
| long maxQueryTime) { |
| log("query", fromKey, toKey, indexedProperty, startValue, limit); |
| |
| List<Bson> clauses = new ArrayList<>(); |
| clauses.add(Filters.gt(Document.ID, fromKey)); |
| clauses.add(Filters.lt(Document.ID, toKey)); |
| |
| Bson hint; |
| if (NodeDocument.MODIFIED_IN_SECS.equals(indexedProperty) |
| && canUseModifiedTimeIdx(startValue)) { |
| hint = new BasicDBObject(NodeDocument.MODIFIED_IN_SECS, 1); |
| } else { |
| hint = new BasicDBObject(NodeDocument.ID, 1); |
| } |
| |
| if (indexedProperty != null) { |
| if (NodeDocument.DELETED_ONCE.equals(indexedProperty)) { |
| if (startValue != 1) { |
| throw new DocumentStoreException( |
| "unsupported value for property " + |
| NodeDocument.DELETED_ONCE); |
| } |
| clauses.add(Filters.eq(indexedProperty, true)); |
| } else { |
| clauses.add(Filters.gte(indexedProperty, startValue)); |
| } |
| } |
| Bson query = Filters.and(clauses); |
| String parentId = Utils.getParentIdFromLowerLimit(fromKey); |
| long lockTime = -1; |
| final Stopwatch watch = startWatch(); |
| |
| boolean isSlaveOk = false; |
| int resultSize = 0; |
| CacheChangesTracker cacheChangesTracker = null; |
| if (parentId != null && collection == Collection.NODES) { |
| cacheChangesTracker = nodesCache.registerTracker(fromKey, toKey); |
| } |
| try { |
| ReadPreference readPreference = |
| getMongoReadPreference(collection, parentId, getDefaultReadPreference(collection)); |
| |
| if(readPreference.isSlaveOk()){ |
| isSlaveOk = true; |
| LOG.trace("Routing call to secondary for fetching children from [{}] to [{}]", fromKey, toKey); |
| } |
| |
| List<T> list = new ArrayList<T>(); |
| MongoCollection<BasicDBObject> dbCollection = getDBCollection(collection, readPreference); |
| execute(session -> { |
| FindIterable<BasicDBObject> result; |
| if (session != null) { |
| result = dbCollection.find(session, query); |
| } else { |
| result = dbCollection.find(query); |
| } |
| result.sort(BY_ID_ASC); |
| if (limit >= 0) { |
| result.limit(limit); |
| } |
| if (!disableIndexHint && !hasModifiedIdCompoundIndex) { |
| result.hint(hint); |
| } |
| if (maxQueryTime > 0) { |
| // OAK-2614: set maxTime if maxQueryTimeMS > 0 |
| result.maxTime(maxQueryTime, TimeUnit.MILLISECONDS); |
| } |
| |
| try (MongoCursor<BasicDBObject> cursor = result.iterator()) { |
| for (int i = 0; i < limit && cursor.hasNext(); i++) { |
| BasicDBObject o = cursor.next(); |
| T doc = convertFromDBObject(collection, o); |
| list.add(doc); |
| } |
| } |
| return null; |
| }); |
| resultSize = list.size(); |
| |
| if (cacheChangesTracker != null) { |
| nodesCache.putNonConflictingDocs(cacheChangesTracker, (List<NodeDocument>) list); |
| } |
| |
| return list; |
| } finally { |
| if (cacheChangesTracker != null) { |
| cacheChangesTracker.close(); |
| } |
| stats.doneQuery(watch.elapsed(TimeUnit.NANOSECONDS), collection, fromKey, toKey, |
| indexedProperty != null , resultSize, lockTime, isSlaveOk); |
| } |
| } |
| |
| boolean canUseModifiedTimeIdx(long modifiedTimeInSecs) { |
| if (maxDeltaForModTimeIdxSecs < 0) { |
| return false; |
| } |
| return (NodeDocument.getModifiedInSecs(getTime()) - modifiedTimeInSecs) <= maxDeltaForModTimeIdxSecs; |
| } |
| |
| @Override |
| public <T extends Document> void remove(Collection<T> collection, String key) { |
| log("remove", key); |
| MongoCollection<BasicDBObject> dbCollection = getDBCollection(collection); |
| Stopwatch watch = startWatch(); |
| try { |
| execute(session -> { |
| Bson filter = getByKeyQuery(key); |
| if (session != null) { |
| dbCollection.deleteOne(session, filter); |
| } else { |
| dbCollection.deleteOne(filter); |
| } |
| return null; |
| }); |
| } catch (Exception e) { |
| throw DocumentStoreException.convert(e, "Remove failed for " + key); |
| } finally { |
| invalidateCache(collection, key); |
| stats.doneRemove(watch.elapsed(TimeUnit.NANOSECONDS), collection, 1); |
| } |
| } |
| |
| @Override |
| public <T extends Document> void remove(Collection<T> collection, List<String> keys) { |
| log("remove", keys); |
| MongoCollection<BasicDBObject> dbCollection = getDBCollection(collection); |
| Stopwatch watch = startWatch(); |
| try { |
| for(List<String> keyBatch : Lists.partition(keys, IN_CLAUSE_BATCH_SIZE)){ |
| Bson query = Filters.in(Document.ID, keyBatch); |
| try { |
| execute(session -> { |
| if (session != null) { |
| dbCollection.deleteMany(session, query); |
| } else { |
| dbCollection.deleteMany(query); |
| } |
| return null; |
| }); |
| } catch (Exception e) { |
| throw DocumentStoreException.convert(e, "Remove failed for " + keyBatch); |
| } finally { |
| if (collection == Collection.NODES) { |
| for (String key : keyBatch) { |
| invalidateCache(collection, key); |
| } |
| } |
| } |
| } |
| } finally { |
| stats.doneRemove(watch.elapsed(TimeUnit.NANOSECONDS), collection, keys.size()); |
| } |
| } |
| |
| @Override |
| public <T extends Document> int remove(Collection<T> collection, Map<String, Long> toRemove) { |
| log("remove", toRemove); |
| int num = 0; |
| MongoCollection<BasicDBObject> dbCollection = getDBCollection(collection); |
| Stopwatch watch = startWatch(); |
| try { |
| List<String> batchIds = Lists.newArrayList(); |
| List<Bson> batch = Lists.newArrayList(); |
| Iterator<Entry<String, Long>> it = toRemove.entrySet().iterator(); |
| while (it.hasNext()) { |
| Entry<String, Long> entry = it.next(); |
| Condition c = newEqualsCondition(entry.getValue()); |
| Bson clause = createQueryForUpdate(entry.getKey(), |
| Collections.singletonMap(KEY_MODIFIED, c)); |
| batchIds.add(entry.getKey()); |
| batch.add(clause); |
| if (!it.hasNext() || batch.size() == IN_CLAUSE_BATCH_SIZE) { |
| Bson query = Filters.or(batch); |
| try { |
| num += execute(session -> { |
| DeleteResult result; |
| if (session != null) { |
| result = dbCollection.deleteMany(session, query); |
| } else { |
| result = dbCollection.deleteMany(query); |
| } |
| return result.getDeletedCount(); |
| }); |
| } catch (Exception e) { |
| throw DocumentStoreException.convert(e, "Remove failed for " + batch); |
| } finally { |
| if (collection == Collection.NODES) { |
| invalidateCache(batchIds); |
| } |
| } |
| batchIds.clear(); |
| batch.clear(); |
| } |
| } |
| } finally { |
| stats.doneRemove(watch.elapsed(TimeUnit.NANOSECONDS), collection, num); |
| } |
| return num; |
| } |
| |
| @Override |
| public <T extends Document> int remove(Collection<T> collection, |
| String indexedProperty, long startValue, long endValue) |
| throws DocumentStoreException { |
| log("remove", collection, indexedProperty, startValue, endValue); |
| int num = 0; |
| MongoCollection<BasicDBObject> dbCollection = getDBCollection(collection); |
| Stopwatch watch = startWatch(); |
| try { |
| Bson query = Filters.and( |
| Filters.gt(indexedProperty, startValue), |
| Filters.lt(indexedProperty, endValue) |
| ); |
| try { |
| num = (int) Math.min(execute((DocumentStoreCallable<Long>) session -> { |
| DeleteResult result; |
| if (session != null) { |
| result = dbCollection.deleteMany(session, query); |
| } else { |
| result = dbCollection.deleteMany(query); |
| } |
| return result.getDeletedCount(); |
| }), Integer.MAX_VALUE); |
| } catch (Exception e) { |
| throw DocumentStoreException.convert(e, "Remove failed for " + collection + ": " + |
| indexedProperty + " in (" + startValue + ", " + endValue + ")"); |
| } finally { |
| if (collection == Collection.NODES) { |
| // this method is currently only used for the journal collection during GC. |
| // To keep the API contract intact, we must acknowledge that the nodes |
| // collection could have been used as well. However, with this signature |
| // there is no efficient way to invalidate individual cache entries, |
| // so we invalidate the entire cache. |
| invalidateCache(); |
| } |
| } |
| } finally { |
| stats.doneRemove(watch.elapsed(TimeUnit.NANOSECONDS), collection, num); |
| } |
| return num; |
| } |
| |
| @SuppressWarnings("unchecked") |
| @Nullable |
| private <T extends Document> T findAndModify(Collection<T> collection, |
| UpdateOp updateOp, |
| boolean upsert, |
| boolean checkConditions) { |
| MongoCollection<BasicDBObject> dbCollection = getDBCollection(collection); |
| // make sure we don't modify the original updateOp |
| updateOp = updateOp.copy(); |
| Bson update = createUpdate(updateOp, !upsert); |
| |
| Lock lock = null; |
| if (collection == Collection.NODES) { |
| lock = nodeLocks.acquire(updateOp.getId()); |
| } |
| final Stopwatch watch = startWatch(); |
| boolean newEntry = false; |
| try { |
| // get modCount of cached document |
| Long modCount = null; |
| T cachedDoc = null; |
| if (collection == Collection.NODES) { |
| cachedDoc = (T) nodesCache.getIfPresent(updateOp.getId()); |
| if (cachedDoc != null) { |
| modCount = cachedDoc.getModCount(); |
| } |
| } |
| |
| // perform a conditional update with limited result |
| // if we have a matching modCount |
| if (modCount != null) { |
| // only perform the conditional update when conditions need not be |
| // checked or the check against the cached document succeeds. this |
| // avoids an unnecessary call when the conditions do not match |
| if (!checkConditions || UpdateUtils.checkConditions(cachedDoc, updateOp.getConditions())) { |
| // below condition may overwrite a user supplied condition |
| // on _modCount. This is fine, because the conditions were |
| // already checked against the cached document with the |
| // matching _modCount value. There is no need to check the |
| // user supplied condition on _modCount again on the server |
| Bson query = Filters.and( |
| createQueryForUpdate(updateOp.getId(), updateOp.getConditions()), |
| Filters.eq(Document.MOD_COUNT, modCount) |
| ); |
| |
| UpdateResult result = execute(session -> { |
| if (session != null) { |
| return dbCollection.updateOne(session, query, update); |
| } else { |
| return dbCollection.updateOne(query, update); |
| } |
| }); |
| if (result.getModifiedCount() > 0) { |
| // success, update cached document |
| if (collection == Collection.NODES) { |
| NodeDocument newDoc = (NodeDocument) applyChanges(collection, cachedDoc, updateOp); |
| nodesCache.put(newDoc); |
| } |
| // return previously cached document |
| return cachedDoc; |
| } |
| } |
| } |
| |
| // conditional update failed or not possible |
| // perform operation and get complete document |
| Bson query = createQueryForUpdate(updateOp.getId(), updateOp.getConditions()); |
| FindOneAndUpdateOptions options = new FindOneAndUpdateOptions() |
| .returnDocument(ReturnDocument.BEFORE).upsert(upsert); |
| BasicDBObject oldNode = execute(session -> { |
| if (session != null) { |
| return dbCollection.findOneAndUpdate(session, query, update, options); |
| } else { |
| return dbCollection.findOneAndUpdate(query, update, options); |
| } |
| }); |
| |
| if (oldNode == null && upsert) { |
| newEntry = true; |
| } |
| |
| if (checkConditions && oldNode == null) { |
| return null; |
| } |
| T oldDoc = convertFromDBObject(collection, oldNode); |
| if (oldDoc != null) { |
| if (collection == Collection.NODES) { |
| NodeDocument newDoc = (NodeDocument) applyChanges(collection, oldDoc, updateOp); |
| nodesCache.put(newDoc); |
| } |
| oldDoc.seal(); |
| } else if (upsert) { |
| if (collection == Collection.NODES) { |
| NodeDocument doc = (NodeDocument) collection.newDocument(this); |
| UpdateUtils.applyChanges(doc, updateOp); |
| nodesCache.putIfAbsent(doc); |
| } |
| } else { |
| // updateOp without conditions and not an upsert |
| // this means the document does not exist |
| if (collection == Collection.NODES) { |
| nodesCache.invalidate(updateOp.getId()); |
| } |
| } |
| return oldDoc; |
| } catch (Exception e) { |
| throw handleException(e, collection, updateOp.getId()); |
| } finally { |
| if (lock != null) { |
| lock.unlock(); |
| } |
| stats.doneFindAndModify(watch.elapsed(TimeUnit.NANOSECONDS), collection, updateOp.getId(), |
| newEntry, true, 0); |
| } |
| } |
| |
| @Nullable |
| @Override |
| public <T extends Document> T createOrUpdate(Collection<T> collection, UpdateOp update) |
| throws DocumentStoreException { |
| log("createOrUpdate", update); |
| UpdateUtils.assertUnconditional(update); |
| T doc = findAndModify(collection, update, update.isNew(), false); |
| log("createOrUpdate returns ", doc); |
| return doc; |
| } |
| |
| /** |
| * Try to apply all the {@link UpdateOp}s with as few MongoDB requests as |
| * possible. The return value is the list of the old documents (before |
| * applying changes). The mechanism is as follows: |
| * |
| * <ol> |
| * <li>For each UpdateOp try to read the assigned document from the cache. |
| * Add them to {@code oldDocs}.</li> |
| * <li>Prepare a list of all UpdateOps that don't have their documents yet and |
| * read them in one find() call. Add the results to {@code oldDocs}.</li> |
| * <li>Prepare a bulk update. For each remaining UpdateOp add the following |
| * operation: |
| * <ul> |
| * <li>Find document with the same id and the same mod_count as in the |
| * {@code oldDocs}.</li> |
| * <li>Apply changes from the UpdateOps.</li> |
| * </ul> |
| * </li> |
| * <li>Execute the bulk update.</li> |
| * </ol> |
| * |
| * If some other process modifies the target documents between points 2 and |
| * 3, the mod_count will be increased as well and the bulk update will fail |
| * for the concurrently modified docs. The method will then remove the |
| * failed documents from {@code oldDocs} and restart the process from |
| * point 2. It stops after the 3rd iteration. |
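| * <p> |
| * Hedged usage sketch (the ids, property name, and the {@code store} variable |
| * are illustrative assumptions): |
| * <pre>{@code |
| * UpdateOp op1 = new UpdateOp("1:/foo", true); |
| * op1.set("p", "v"); |
| * UpdateOp op2 = new UpdateOp("1:/bar", true); |
| * op2.set("p", "v"); |
| * // returns the previous documents; entries are null for newly created ones |
| * List<NodeDocument> before = |
| *         store.createOrUpdate(Collection.NODES, Arrays.asList(op1, op2)); |
| * }</pre> |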
| */ |
| @SuppressWarnings("unchecked") |
| @Nullable |
| @Override |
| public <T extends Document> List<T> createOrUpdate(Collection<T> collection, |
| List<UpdateOp> updateOps) { |
| log("createOrUpdate", updateOps); |
| |
| Map<String, UpdateOp> operationsToCover = new LinkedHashMap<String, UpdateOp>(); |
| List<UpdateOp> duplicates = new ArrayList<UpdateOp>(); |
| Map<UpdateOp, T> results = new LinkedHashMap<UpdateOp, T>(); |
| |
| final Stopwatch watch = startWatch(); |
| try { |
| for (UpdateOp updateOp : updateOps) { |
| UpdateUtils.assertUnconditional(updateOp); |
| UpdateOp clone = updateOp.copy(); |
| if (operationsToCover.containsKey(updateOp.getId())) { |
| duplicates.add(clone); |
| } else { |
| operationsToCover.put(updateOp.getId(), clone); |
| } |
| results.put(clone, null); |
| } |
| |
| Map<String, T> oldDocs = new HashMap<String, T>(); |
| if (collection == Collection.NODES) { |
| oldDocs.putAll((Map<String, T>) getCachedNodes(operationsToCover.keySet())); |
| } |
| |
| for (int i = 0; i <= bulkRetries; i++) { |
| if (operationsToCover.size() <= 2) { |
| // bulkUpdate() method invokes Mongo twice, so sending 2 updates |
| // in bulk mode wouldn't result in any performance gain |
| break; |
| } |
| for (List<UpdateOp> partition : Lists.partition(Lists.newArrayList(operationsToCover.values()), bulkSize)) { |
| Map<UpdateOp, T> successfulUpdates = bulkUpdate(collection, partition, oldDocs); |
| results.putAll(successfulUpdates); |
| operationsToCover.values().removeAll(successfulUpdates.keySet()); |
| } |
| } |
| |
| // if there are some changes left, we'll apply them one after another |
| Iterator<UpdateOp> it = Iterators.concat(operationsToCover.values().iterator(), duplicates.iterator()); |
| while (it.hasNext()) { |
| UpdateOp op = it.next(); |
| it.remove(); |
| T oldDoc = createOrUpdate(collection, op); |
| if (oldDoc != null) { |
| results.put(op, oldDoc); |
| } |
| } |
| } catch (MongoException e) { |
| throw handleException(e, collection, Iterables.transform(updateOps, |
| new Function<UpdateOp, String>() { |
| @Override |
| public String apply(UpdateOp input) { |
| return input.getId(); |
| } |
| })); |
| } finally { |
| stats.doneCreateOrUpdate(watch.elapsed(TimeUnit.NANOSECONDS), |
| collection, Lists.transform(updateOps, new Function<UpdateOp, String>() { |
| @Override |
| public String apply(UpdateOp input) { |
| return input.getId(); |
| } |
| })); |
| } |
| List<T> resultList = new ArrayList<T>(results.values()); |
| log("createOrUpdate returns", resultList); |
| return resultList; |
| } |
| |
| private Map<String, NodeDocument> getCachedNodes(Set<String> keys) { |
| Map<String, NodeDocument> nodes = new HashMap<String, NodeDocument>(); |
| for (String key : keys) { |
| NodeDocument cached = nodesCache.getIfPresent(key); |
| if (cached != null) { |
| nodes.put(key, cached); |
| } |
| } |
| return nodes; |
| } |
| |
| private <T extends Document> Map<UpdateOp, T> bulkUpdate(Collection<T> collection, |
| List<UpdateOp> updateOperations, |
| Map<String, T> oldDocs) { |
| Map<String, UpdateOp> bulkOperations = createMap(updateOperations); |
| Set<String> lackingDocs = difference(bulkOperations.keySet(), oldDocs.keySet()); |
| oldDocs.putAll(findDocuments(collection, lackingDocs)); |
| |
| CacheChangesTracker tracker = null; |
| if (collection == Collection.NODES) { |
| tracker = nodesCache.registerTracker(bulkOperations.keySet()); |
| } |
| |
| try { |
| BulkUpdateResult bulkResult = sendBulkUpdate(collection, bulkOperations.values(), oldDocs); |
| |
| if (collection == Collection.NODES) { |
| List<NodeDocument> docsToCache = new ArrayList<NodeDocument>(); |
| for (UpdateOp op : filterKeys(bulkOperations, in(bulkResult.upserts)).values()) { |
| NodeDocument doc = Collection.NODES.newDocument(this); |
| UpdateUtils.applyChanges(doc, op); |
| docsToCache.add(doc); |
| } |
| |
| for (String key : difference(bulkOperations.keySet(), bulkResult.failedUpdates)) { |
| T oldDoc = oldDocs.get(key); |
| if (oldDoc != null && oldDoc != NodeDocument.NULL) { |
| NodeDocument newDoc = (NodeDocument) applyChanges(collection, oldDoc, bulkOperations.get(key)); |
| docsToCache.add(newDoc); |
| } |
| } |
| |
| nodesCache.putNonConflictingDocs(tracker, docsToCache); |
| } |
| oldDocs.keySet().removeAll(bulkResult.failedUpdates); |
| |
| Map<UpdateOp, T> result = new HashMap<UpdateOp, T>(); |
| for (Entry<String, UpdateOp> entry : bulkOperations.entrySet()) { |
| if (bulkResult.failedUpdates.contains(entry.getKey())) { |
| continue; |
| } else if (bulkResult.upserts.contains(entry.getKey())) { |
| result.put(entry.getValue(), null); |
| } else { |
| result.put(entry.getValue(), oldDocs.get(entry.getKey())); |
| } |
| } |
| return result; |
| } finally { |
| if (tracker != null) { |
| tracker.close(); |
| } |
| } |
| } |
| |
| private static Map<String, UpdateOp> createMap(List<UpdateOp> updateOps) { |
| return Maps.uniqueIndex(updateOps, new Function<UpdateOp, String>() { |
| @Override |
| public String apply(UpdateOp input) { |
| return input.getId(); |
| } |
| }); |
| } |
| |
| private <T extends Document> Map<String, T> findDocuments(Collection<T> collection, Set<String> keys) { |
| Map<String, T> docs = new HashMap<String, T>(); |
| if (!keys.isEmpty()) { |
| List<Bson> conditions = new ArrayList<>(keys.size()); |
| for (String key : keys) { |
| conditions.add(getByKeyQuery(key)); |
| } |
| MongoCollection<BasicDBObject> dbCollection; |
| if (secondariesWithinAcceptableLag()) { |
| dbCollection = getDBCollection(collection); |
| } else { |
| lagTooHigh(); |
| dbCollection = getDBCollection(collection).withReadPreference(ReadPreference.primary()); |
| } |
| execute(session -> { |
| FindIterable<BasicDBObject> cursor; |
| if (session != null) { |
| cursor = dbCollection.find(session, Filters.or(conditions)); |
| } else { |
| cursor = dbCollection.find(Filters.or(conditions)); |
| } |
| for (BasicDBObject doc : cursor) { |
| T foundDoc = convertFromDBObject(collection, doc); |
| docs.put(foundDoc.getId(), foundDoc); |
| } |
| return null; |
| }); |
| } |
| return docs; |
| } |
| |
| private <T extends Document> BulkUpdateResult sendBulkUpdate(Collection<T> collection, |
| java.util.Collection<UpdateOp> updateOps, Map<String, T> oldDocs) { |
| MongoCollection<BasicDBObject> dbCollection = getDBCollection(collection); |
| List<WriteModel<BasicDBObject>> writes = new ArrayList<>(updateOps.size()); |
| String[] bulkIds = new String[updateOps.size()]; |
| int i = 0; |
| for (UpdateOp updateOp : updateOps) { |
| String id = updateOp.getId(); |
| Bson query = createQueryForUpdate(id, updateOp.getConditions()); |
| // fail on insert when isNew == false |
| boolean failInsert = !updateOp.isNew(); |
| T oldDoc = oldDocs.get(id); |
| if (oldDoc == null || oldDoc == NodeDocument.NULL) { |
| query = Filters.and(query, Filters.exists(Document.MOD_COUNT, false)); |
| } else { |
| query = Filters.and(query, Filters.eq(Document.MOD_COUNT, oldDoc.getModCount())); |
| } |
| writes.add(new UpdateOneModel<>(query, |
| createUpdate(updateOp, failInsert), |
| new UpdateOptions().upsert(true)) |
| ); |
| bulkIds[i++] = id; |
| } |
| |
| BulkWriteResult bulkResult; |
| Set<String> failedUpdates = new HashSet<String>(); |
| Set<String> upserts = new HashSet<String>(); |
| BulkWriteOptions options = new BulkWriteOptions().ordered(false); |
| try { |
| bulkResult = execute(session -> { |
| if (session != null) { |
| return dbCollection.bulkWrite(session, writes, options); |
| } else { |
| return dbCollection.bulkWrite(writes, options); |
| } |
| }); |
| } catch (MongoBulkWriteException e) { |
| bulkResult = e.getWriteResult(); |
| for (BulkWriteError err : e.getWriteErrors()) { |
| failedUpdates.add(bulkIds[err.getIndex()]); |
| } |
| } |
| for (BulkWriteUpsert upsert : bulkResult.getUpserts()) { |
| upserts.add(bulkIds[upsert.getIndex()]); |
| } |
| return new BulkUpdateResult(failedUpdates, upserts); |
| } |
| |
| @Override |
| public <T extends Document> T findAndUpdate(Collection<T> collection, UpdateOp update) |
| throws DocumentStoreException { |
| log("findAndUpdate", update); |
| T doc = findAndModify(collection, update, false, true); |
| log("findAndUpdate returns ", doc); |
| return doc; |
| } |
| |
| @Override |
| public <T extends Document> boolean create(Collection<T> collection, List<UpdateOp> updateOps) { |
| log("create", updateOps); |
| List<T> docs = new ArrayList<T>(); |
| List<BasicDBObject> inserts = new ArrayList<>(updateOps.size()); |
| List<String> ids = new ArrayList<>(updateOps.size()); |
| |
| for (UpdateOp update : updateOps) { |
| BasicDBObject doc = new BasicDBObject(); |
| inserts.add(doc); |
| doc.put(Document.ID, update.getId()); |
| UpdateUtils.assertUnconditional(update); |
| T target = collection.newDocument(this); |
| UpdateUtils.applyChanges(target, update); |
| docs.add(target); |
| ids.add(update.getId()); |
| for (Entry<Key, Operation> entry : update.getChanges().entrySet()) { |
| Key k = entry.getKey(); |
| Operation op = entry.getValue(); |
| switch (op.type) { |
| case SET: |
| case MAX: |
| case INCREMENT: { |
| doc.put(k.toString(), op.value); |
| break; |
| } |
| case SET_MAP_ENTRY: { |
| Revision r = k.getRevision(); |
| if (r == null) { |
| throw new IllegalStateException("SET_MAP_ENTRY must not have null revision"); |
| } |
| BasicDBObject value = (BasicDBObject) doc.get(k.getName()); |
| if (value == null) { |
| value = new BasicDBObject(); |
| doc.put(k.getName(), value); |
| } |
| value.put(r.toString(), op.value); |
| break; |
| } |
| case REMOVE: |
| case REMOVE_MAP_ENTRY: |
| // nothing to do for new entries |
| break; |
| } |
| } |
| if (!doc.containsField(Document.MOD_COUNT)) { |
| doc.put(Document.MOD_COUNT, 1L); |
| target.put(Document.MOD_COUNT, 1L); |
| } |
| } |
| |
| MongoCollection<BasicDBObject> dbCollection = getDBCollection(collection); |
| final Stopwatch watch = startWatch(); |
| boolean insertSuccess = false; |
| try { |
| try { |
| execute(session -> { |
| if (session != null) { |
| dbCollection.insertMany(session, inserts); |
| } else { |
| dbCollection.insertMany(inserts); |
| } |
| return null; |
| }); |
| if (collection == Collection.NODES) { |
| for (T doc : docs) { |
| nodesCache.putIfAbsent((NodeDocument) doc); |
| } |
| } |
| insertSuccess = true; |
| return true; |
| } catch (MongoException e) { |
| return false; |
| } |
| } finally { |
| stats.doneCreate(watch.elapsed(TimeUnit.NANOSECONDS), collection, ids, insertSuccess); |
| } |
| } |
| |
| /** |
| * Returns the {@link Document#MOD_COUNT} and |
| * {@link NodeDocument#MODIFIED_IN_SECS} values of the documents with the |
| * given {@code keys}. The returned map will only contain entries for |
| * existing documents. The default value is -1 if the document does not have |
| * a modCount field. The same applies to the modified field. |
| * |
| * @param keys the keys of the documents. |
| * @return map with key to modification stamp mapping. |
| * @throws MongoException if the call fails |
| */ |
| @NotNull |
| private Map<String, ModificationStamp> getModStamps(Iterable<String> keys) |
| throws MongoException { |
| // Fetch only the id, modCount and modified fields |
| final BasicDBObject fields = new BasicDBObject(Document.ID, 1); |
| fields.put(Document.MOD_COUNT, 1); |
| fields.put(NodeDocument.MODIFIED_IN_SECS, 1); |
| |
| Map<String, ModificationStamp> modCounts = Maps.newHashMap(); |
| |
| nodes.withReadPreference(ReadPreference.primary()) |
| .find(Filters.in(Document.ID, keys)).projection(fields) |
| .forEach((Block<BasicDBObject>) obj -> { |
| String id = (String) obj.get(Document.ID); |
| Long modCount = Utils.asLong((Number) obj.get(Document.MOD_COUNT)); |
| if (modCount == null) { |
| modCount = -1L; |
| } |
| Long modified = Utils.asLong((Number) obj.get(NodeDocument.MODIFIED_IN_SECS)); |
| if (modified == null) { |
| modified = -1L; |
| } |
| modCounts.put(id, new ModificationStamp(modCount, modified)); |
| }); |
| return modCounts; |
| } |
| |
| DocumentReadPreference getReadPreference(int maxCacheAge) { |
| if (withClientSession()) { |
| return DocumentReadPreference.PREFER_SECONDARY; |
| } else if(maxCacheAge >= 0 && maxCacheAge < maxReplicationLagMillis) { |
| return DocumentReadPreference.PRIMARY; |
| } else if(maxCacheAge == Integer.MAX_VALUE){ |
| return DocumentReadPreference.PREFER_SECONDARY; |
| } else { |
| return DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH; |
| } |
| } |
| |
| DocumentReadPreference getDefaultReadPreference(Collection col) { |
| DocumentReadPreference preference = DocumentReadPreference.PRIMARY; |
| if (withClientSession()) { |
| preference = DocumentReadPreference.PREFER_SECONDARY; |
| } else if (col == Collection.NODES) { |
| preference = DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH; |
| } |
| return preference; |
| } |
| |
| <T extends Document> ReadPreference getMongoReadPreference(@NotNull Collection<T> collection, |
| @Nullable String parentId, |
| @NotNull DocumentReadPreference preference) { |
| switch(preference){ |
| case PRIMARY: |
| return ReadPreference.primary(); |
| case PREFER_PRIMARY : |
| return ReadPreference.primaryPreferred(); |
| case PREFER_SECONDARY : |
| if (!withClientSession() || secondariesWithinAcceptableLag()) { |
| return getConfiguredReadPreference(collection); |
| } else { |
| lagTooHigh(); |
| return ReadPreference.primary(); |
| } |
| case PREFER_SECONDARY_IF_OLD_ENOUGH: |
| if(collection != Collection.NODES){ |
| return ReadPreference.primary(); |
| } |
| |
| boolean secondarySafe; |
| if (withClientSession() && secondariesWithinAcceptableLag()) { |
| secondarySafe = true; |
| } else { |
| // This is not quite accurate, because ancestors |
| // are updated in a background thread (_lastRev). We |
| // will need to revise this for low maxReplicationLagMillis |
| // values |
| long replicationSafeLimit = getTime() - maxReplicationLagMillis; |
| |
| if (parentId == null) { |
| secondarySafe = false; |
| } else { |
| //If the parent was modified a long time ago, its children have likely |
| //not been modified either. In that case we can read from a secondary. |
| NodeDocument cachedDoc = getIfCached(Collection.NODES, parentId); |
| secondarySafe = cachedDoc != null && !cachedDoc.hasBeenModifiedSince(replicationSafeLimit); |
| } |
| } |
| |
| ReadPreference readPreference; |
| if (secondarySafe) { |
| readPreference = getConfiguredReadPreference(collection); |
| } else { |
| readPreference = ReadPreference.primary(); |
| } |
| |
| return readPreference; |
| default: |
| throw new IllegalArgumentException("Unsupported usage " + preference); |
| } |
| } |
| |
| /** |
| * Retrieves the ReadPreference configured for the MongoDB deployment in use, |
| * irrespective of the DBCollection. Depending on the deployment, the user can |
| * tweak the default read preference to read from secondaries and also tag them. |
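| * <p> |
| * Illustrative connection string (hosts, database, and tag are assumptions): |
| * <pre>{@code |
| * mongodb://host1:27017,host2:27017/oak?readPreference=secondaryPreferred&readPreferenceTags=dc:east |
| * }</pre> |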
| * |
| * @return db level ReadPreference |
| */ |
| <T extends Document> ReadPreference getConfiguredReadPreference(Collection<T> collection){ |
| return getDBCollection(collection).getReadPreference(); |
| } |
| |
| @Nullable |
| protected <T extends Document> T convertFromDBObject(@NotNull Collection<T> collection, |
| @Nullable DBObject n) { |
| T copy = null; |
| if (n != null) { |
| copy = collection.newDocument(this); |
| for (String key : n.keySet()) { |
| Object o = n.get(key); |
| if (o instanceof String) { |
| copy.put(key, o); |
| } else if (o instanceof Number && |
| (NodeDocument.MODIFIED_IN_SECS.equals(key) || Document.MOD_COUNT.equals(key))) { |
| copy.put(key, Utils.asLong((Number) o)); |
| } else if (o instanceof Long) { |
| copy.put(key, o); |
| } else if (o instanceof Integer) { |
| copy.put(key, o); |
| } else if (o instanceof Boolean) { |
| copy.put(key, o); |
| } else if (o instanceof BasicDBObject) { |
| copy.put(key, convertMongoMap((BasicDBObject) o)); |
| } |
| } |
| } |
| return copy; |
| } |
| |
| @NotNull |
| private Map<Revision, Object> convertMongoMap(@NotNull BasicDBObject obj) { |
| Map<Revision, Object> map = new TreeMap<Revision, Object>(StableRevisionComparator.REVERSE); |
| for (Map.Entry<String, Object> entry : obj.entrySet()) { |
| map.put(Revision.fromString(entry.getKey()), entry.getValue()); |
| } |
| return map; |
| } |
| |
| <T extends Document> MongoCollection<BasicDBObject> getDBCollection(Collection<T> collection) { |
| if (collection == Collection.NODES) { |
| return nodes; |
| } else if (collection == Collection.CLUSTER_NODES) { |
| return clusterNodes; |
| } else if (collection == Collection.SETTINGS) { |
| return settings; |
| } else if (collection == Collection.JOURNAL) { |
| return journal; |
| } else { |
| throw new IllegalArgumentException( |
| "Unknown collection: " + collection.toString()); |
| } |
| } |
| |
| <T extends Document> MongoCollection<BasicDBObject> getDBCollection(Collection<T> collection, |
| ReadPreference readPreference) { |
| return getDBCollection(collection).withReadPreference(readPreference); |
| } |
| |
| MongoDatabase getDatabase() { |
| return db; |
| } |
| |
| MongoClient getClient() { |
| return client; |
| } |
| |
| private static Bson getByKeyQuery(String key) { |
| return Filters.eq(Document.ID, key); |
| } |
| |
| @Override |
| public void dispose() { |
| client.close(); |
| try { |
| nodesCache.close(); |
| } catch (IOException e) { |
| LOG.warn("Error occurred while closing nodes cache", e); |
| } |
| } |
| |
| @Override |
| public Iterable<CacheStats> getCacheStats() { |
| return nodesCache.getCacheStats(); |
| } |
| |
| @Override |
| public Map<String, String> getMetadata() { |
| return metadata; |
| } |
| |
| @NotNull |
| @Override |
| public Map<String, String> getStats() { |
| ImmutableMap.Builder<String, String> builder = ImmutableMap.builder(); |
| List<MongoCollection<?>> all = ImmutableList.of(nodes, clusterNodes, settings, journal); |
| all.forEach(c -> toMapBuilder(builder, |
| db.runCommand( |
| new BasicDBObject("collStats", c.getNamespace().getCollectionName()), |
| BasicDBObject.class), |
| c.getNamespace().getCollectionName())); |
| return builder.build(); |
| } |
| |
| long getMaxDeltaForModTimeIdxSecs() { |
| return maxDeltaForModTimeIdxSecs; |
| } |
| |
| boolean getDisableIndexHint() { |
| return disableIndexHint; |
| } |
| |
| private static void log(String message, Object... args) { |
| if (LOG.isDebugEnabled()) { |
| String argList = Arrays.toString(args); |
| if (argList.length() > 10000) { |
| argList = argList.length() + ": " + argList; |
| } |
| LOG.debug(message + argList); |
| } |
| } |
| |
| @Override |
| public <T extends Document> T getIfCached(Collection<T> collection, String key) { |
| if (collection != Collection.NODES) { |
| return null; |
| } |
| @SuppressWarnings("unchecked") |
| T doc = (T) nodesCache.getIfPresent(key); |
| if (doc == NodeDocument.NULL) { |
| doc = null; |
| } |
| return doc; |
| } |
| |
| @NotNull |
| private static Bson createQueryForUpdate(String key, |
| Map<Key, Condition> conditions) { |
| Bson query = getByKeyQuery(key); |
| if (conditions.isEmpty()) { |
| // special case when there are no conditions |
| return query; |
| } |
| List<Bson> conditionList = new ArrayList<>(conditions.size() + 1); |
| conditionList.add(query); |
| for (Entry<Key, Condition> entry : conditions.entrySet()) { |
| Key k = entry.getKey(); |
| Condition c = entry.getValue(); |
| switch (c.type) { |
| case EXISTS: |
| conditionList.add(Filters.exists(k.toString(), Boolean.TRUE.equals(c.value))); |
| break; |
| case EQUALS: |
| conditionList.add(Filters.eq(k.toString(), c.value)); |
| break; |
| case NOTEQUALS: |
| conditionList.add(Filters.ne(k.toString(), c.value)); |
| break; |
| } |
| } |
| return Filters.and(conditionList); |
| } |
| |
| /** |
| * Creates a MongoDB update object from the given UpdateOp. |
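* Illustrative example: an update op that sets property "p" and removes
* property "q" is translated into an update document along the lines of
* {$set: {p: ...}, $unset: {q: "1"}, $inc: {_modCount: 1}} (the property
* names are chosen for illustration only).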
| * |
| * @param updateOp the update op. |
| * @param failOnInsert whether to create an update that will fail on insert. |
| * @return the DBObject. |
| */ |
| private static BasicDBObject createUpdate(UpdateOp updateOp, |
| boolean failOnInsert) { |
| BasicDBObject setUpdates = new BasicDBObject(); |
| BasicDBObject maxUpdates = new BasicDBObject(); |
| BasicDBObject incUpdates = new BasicDBObject(); |
| BasicDBObject unsetUpdates = new BasicDBObject(); |
| |
| // always increment modCount |
| updateOp.increment(Document.MOD_COUNT, 1); |
| |
| // other updates |
| for (Entry<Key, Operation> entry : updateOp.getChanges().entrySet()) { |
| Key k = entry.getKey(); |
| Operation op = entry.getValue(); |
| switch (op.type) { |
| case SET: |
| case SET_MAP_ENTRY: { |
| setUpdates.append(k.toString(), op.value); |
| break; |
| } |
| case MAX: { |
| maxUpdates.append(k.toString(), op.value); |
| break; |
| } |
| case INCREMENT: { |
| incUpdates.append(k.toString(), op.value); |
| break; |
| } |
| case REMOVE: |
| case REMOVE_MAP_ENTRY: { |
| unsetUpdates.append(k.toString(), "1"); |
| break; |
| } |
| } |
| } |
| |
| BasicDBObject update = new BasicDBObject(); |
| if (!setUpdates.isEmpty()) { |
| update.append("$set", setUpdates); |
| } |
| if (!maxUpdates.isEmpty()) { |
| update.append("$max", maxUpdates); |
| } |
| if (!incUpdates.isEmpty()) { |
| update.append("$inc", incUpdates); |
| } |
| if (!unsetUpdates.isEmpty()) { |
| update.append("$unset", unsetUpdates); |
| } |
| |
| if (failOnInsert) { |
| update.putAll(CONFLICT_ON_INSERT); |
| } |
| |
| return update; |
| } |
| |
| @NotNull |
| private <T extends Document> T applyChanges(Collection<T> collection, T oldDoc, UpdateOp update) { |
| T doc = collection.newDocument(this); |
| oldDoc.deepCopy(doc); |
| UpdateUtils.applyChanges(doc, update); |
| doc.seal(); |
| return doc; |
| } |
| |
| private Stopwatch startWatch() { |
| return Stopwatch.createStarted(); |
| } |
| |
| |
| @Override |
| public void setReadWriteMode(String readWriteMode) { |
| if (readWriteMode == null || readWriteMode.equals(lastReadWriteMode)) { |
| return; |
| } |
| lastReadWriteMode = readWriteMode; |
| try { |
| String rwModeUri = readWriteMode; |
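// a plain option string (e.g. "readPreference=secondary", illustrative) is
// wrapped into a connection string so MongoClientURI can parse the options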
if (!readWriteMode.startsWith("mongodb://")) {
| rwModeUri = String.format("mongodb://localhost/?%s", readWriteMode); |
| } |
| MongoClientURI uri = new MongoClientURI(rwModeUri); |
| ReadPreference readPref = uri.getOptions().getReadPreference(); |
| |
| if (!readPref.equals(nodes.getReadPreference())) { |
| nodes = nodes.withReadPreference(readPref); |
| LOG.info("Using ReadPreference {} ", readPref); |
| } |
| |
| WriteConcern writeConcern = uri.getOptions().getWriteConcern(); |
| if (!writeConcern.equals(nodes.getWriteConcern())) { |
| nodes = nodes.withWriteConcern(writeConcern); |
| LOG.info("Using WriteConcern " + writeConcern); |
| } |
| } catch (Exception e) { |
| LOG.error("Error setting readWriteMode " + readWriteMode, e); |
| } |
| } |
| |
| private long getTime() { |
| return clock.getTime(); |
| } |
| |
| void setClock(Clock clock) { |
| this.clock = clock; |
| } |
| |
| NodeDocumentCache getNodeDocumentCache() { |
| return nodesCache; |
| } |
| |
| public void setStatsCollector(DocumentStoreStatsCollector stats) { |
| this.stats = stats; |
| } |
| |
| @Override |
| public long determineServerTimeDifferenceMillis() { |
// the assumption is that the network delay from this instance
// to the server and from the server back to this instance
// is (more or less) the same in both directions.
// taking this assumption into account allows us to remove
// the network delay from the picture: the difference
// between end and start time is exactly this network
// delay (plus some server time, which is neglected).
// so if the clocks are in perfect sync and the above
// assumption holds, the server time should be exactly at
// the midPoint between start and end.
// this should give a more accurate picture of the diff.
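// illustrative numbers: start=1000, end=1040 -> midPoint=1020; a server
// localTime of 1015 then yields diff=+5ms, i.e. the local clock is ahead.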
| final long start = System.currentTimeMillis(); |
// assumption here: the server returns UTC - i.e. the returned
// date object correctly takes care of time zones.
| final BasicDBObject isMaster; |
| try { |
| isMaster = db.runCommand(new BasicDBObject("isMaster", 1), BasicDBObject.class); |
| if (isMaster == null) { |
| // OAK-4107 / OAK-4515 : extra safety |
| LOG.warn("determineServerTimeDifferenceMillis: db.isMaster returned null - cannot determine time difference - assuming 0ms."); |
| return 0; |
| } |
| final Date serverLocalTime = isMaster.getDate("localTime"); |
| if (serverLocalTime == null) { |
// OAK-4107 / OAK-4515 : looks like this can happen - at least
// it has been seen once on MongoDB 3.0.9
// let's handle this gently and issue a log.warn
// instead of throwing an NPE
| LOG.warn("determineServerTimeDifferenceMillis: db.isMaster.localTime returned null - cannot determine time difference - assuming 0ms."); |
| return 0; |
| } |
| final long end = System.currentTimeMillis(); |
| |
| final long midPoint = (start + end) / 2; |
| final long serverLocalTimeMillis = serverLocalTime.getTime(); |
| |
| // the difference should be |
| // * positive when local instance is ahead |
| // * and negative when the local instance is behind |
| final long diff = midPoint - serverLocalTimeMillis; |
| |
| return diff; |
| } catch (Exception e) { |
| LOG.warn("determineServerTimeDifferenceMillis: db.isMaster failed with exception - assuming 0ms. " |
| + "(Result details: server exception=" + e + ", server error message=" + e.getMessage() + ")", e); |
| } |
| return 0; |
| } |
| |
| private <T extends Document> DocumentStoreException handleException(Throwable ex, |
| Collection<T> collection, |
| Iterable<String> ids) { |
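// the failed operation may still have modified documents on the server;
// drop potentially stale NODES entries from the cache before reporting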
| if (collection == Collection.NODES) { |
| for (String id : ids) { |
| invalidateCache(collection, id); |
| } |
| } |
| return asDocumentStoreException(ex.getMessage(), ex, |
| getDocumentStoreExceptionTypeFor(ex), ids); |
| } |
| |
| private <T extends Document> DocumentStoreException handleException(Throwable ex, |
| Collection<T> collection, |
| String id) { |
| return handleException(ex, collection, Collections.singleton(id)); |
| } |
| |
| private static void toMapBuilder(ImmutableMap.Builder<String, String> builder, |
| BasicDBObject stats, |
| String prefix) { |
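// flatten nested stats documents into dot-separated keys prefixed with the
// collection name, e.g. a hypothetical "nodes.count" for the "count" entry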
| stats.forEach((k, v) -> { |
| // exclude some verbose internals and status |
| if (!k.equals("wiredTiger") && !k.equals("indexDetails") && !k.equals("ok")) { |
| String key = prefix + "." + k; |
| if (v instanceof BasicDBObject) { |
| toMapBuilder(builder, (BasicDBObject) v, key); |
| } else { |
| builder.put(key, String.valueOf(v)); |
| } |
| } |
| }); |
| } |
| |
| private boolean withClientSession() { |
| return status.isClientSessionSupported() && useClientSession; |
| } |
| |
| private boolean secondariesWithinAcceptableLag() { |
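// secondary reads are acceptable when there is no replica set at all or
// the estimated replication lag is below the configured threshold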
| return client.getReplicaSetStatus() == null |
| || status.getReplicaSetLagEstimate() < acceptableLagMillis; |
| } |
| |
| private void lagTooHigh() { |
| LOG.debug("Read from secondary is preferred but replication lag is too high. Directing read to primary."); |
| } |
| |
| /** |
| * Execute a callable with an optional {@link ClientSession}. A client |
| * session is passed to {@link DocumentStoreCallable#call(ClientSession)} if |
| * the connected MongoDB servers support client sessions, otherwise the |
| * session is {@code null}. The client session must only be used within |
| * the scope of the {@link DocumentStoreCallable#call(ClientSession)}. |
| * |
| * @param callable the callable. |
| * @param <T> the return type of the callable. |
| * @return the result of the callable. |
| * @throws DocumentStoreException if the callable throws an exception. |
| */ |
| private <T> T execute(DocumentStoreCallable<T> callable) |
| throws DocumentStoreException { |
| T result; |
| if (withClientSession()) { |
| try (ClientSession session = sessionFactory.createClientSession()) { |
| result = callable.call(session); |
| } |
| } else { |
| result = callable.call(null); |
| } |
| return result; |
| } |
| |
| interface DocumentStoreCallable<T> { |
| |
| T call(@Nullable ClientSession session) throws DocumentStoreException; |
| } |
| |
| private static class BulkUpdateResult { |
| |
| private final Set<String> failedUpdates; |
| |
| private final Set<String> upserts; |
| |
| private BulkUpdateResult(Set<String> failedUpdates, Set<String> upserts) { |
| this.failedUpdates = failedUpdates; |
| this.upserts = upserts; |
| } |
| } |
| |
| private static class InvalidationResult implements CacheInvalidationStats { |
| int invalidationCount; |
| int upToDateCount; |
| int cacheSize; |
| int queryCount; |
| int cacheEntriesProcessedCount; |
| |
| @Override |
| public String toString() { |
| return "InvalidationResult{" + |
| "invalidationCount=" + invalidationCount + |
| ", upToDateCount=" + upToDateCount + |
| ", cacheSize=" + cacheSize + |
| ", queryCount=" + queryCount + |
| ", cacheEntriesProcessedCount=" + cacheEntriesProcessedCount + |
| '}'; |
| } |
| |
| @Override |
| public String summaryReport() { |
| return toString(); |
| } |
| } |
| } |