blob: a19c55de7142c17869cf1867e9e671431fe5491b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jackrabbit.oak.plugins.document;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.Iterables.partition;
import static com.google.common.collect.Iterables.transform;
import static com.google.common.collect.Lists.reverse;
import static java.util.Collections.singletonList;
import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.jackrabbit.oak.api.CommitFailedException.OAK;
import static org.apache.jackrabbit.oak.plugins.document.Collection.JOURNAL;
import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES;
import static org.apache.jackrabbit.oak.plugins.document.DocumentNodeStoreBuilder.MANY_CHILDREN_THRESHOLD;
import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.MODIFIED_IN_SECS_RESOLUTION;
import static org.apache.jackrabbit.oak.plugins.document.Path.ROOT;
import static org.apache.jackrabbit.oak.plugins.document.UpdateOp.Key;
import static org.apache.jackrabbit.oak.plugins.document.UpdateOp.Operation;
import static org.apache.jackrabbit.oak.plugins.document.util.Utils.alignWithExternalRevisions;
import static org.apache.jackrabbit.oak.plugins.document.util.Utils.getIdFromPath;
import static org.apache.jackrabbit.oak.plugins.document.util.Utils.getModuleVersion;
import static org.apache.jackrabbit.oak.plugins.document.util.Utils.pathToId;
import static org.apache.jackrabbit.oak.spi.observation.ChangeSet.COMMIT_CONTEXT_OBSERVATION_CHANGESET;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.jcr.PropertyType;
import javax.management.NotCompliantMBeanException;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.base.Stopwatch;
import com.google.common.base.Strings;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.cache.Cache;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.UncheckedExecutionException;
import org.apache.jackrabbit.oak.api.PropertyState;
import org.apache.jackrabbit.oak.plugins.blob.BlobStoreBlob;
import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector;
import org.apache.jackrabbit.oak.plugins.blob.ReferencedBlob;
import org.apache.jackrabbit.oak.plugins.document.Branch.BranchCommit;
import org.apache.jackrabbit.oak.plugins.document.bundlor.BundledDocumentDiffer;
import org.apache.jackrabbit.oak.plugins.document.bundlor.BundlingConfigHandler;
import org.apache.jackrabbit.oak.plugins.document.bundlor.DocumentBundlor;
import org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCache;
import org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.DynamicBroadcastConfig;
import org.apache.jackrabbit.oak.plugins.document.util.ReadOnlyDocumentStoreWrapperFactory;
import org.apache.jackrabbit.oak.plugins.document.util.SystemPropertySupplier;
import org.apache.jackrabbit.oak.spi.blob.BlobStore;
import org.apache.jackrabbit.oak.commons.json.JsopStream;
import org.apache.jackrabbit.oak.commons.json.JsopWriter;
import org.apache.jackrabbit.oak.api.Blob;
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.cache.CacheStats;
import org.apache.jackrabbit.oak.json.BlobSerializer;
import org.apache.jackrabbit.oak.plugins.document.util.LeaseCheckDocumentStoreWrapper;
import org.apache.jackrabbit.oak.plugins.document.util.LoggingDocumentStoreWrapper;
import org.apache.jackrabbit.oak.plugins.document.util.TimingDocumentStoreWrapper;
import org.apache.jackrabbit.oak.plugins.document.util.Utils;
import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
import org.apache.jackrabbit.oak.spi.commit.ChangeDispatcher;
import org.apache.jackrabbit.oak.spi.commit.CommitContext;
import org.apache.jackrabbit.oak.spi.commit.CommitHook;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.commit.Observable;
import org.apache.jackrabbit.oak.spi.commit.Observer;
import org.apache.jackrabbit.oak.spi.commit.SimpleCommitContext;
import org.apache.jackrabbit.oak.spi.observation.ChangeSet;
import org.apache.jackrabbit.oak.spi.observation.ChangeSetBuilder;
import org.apache.jackrabbit.oak.spi.state.AbstractNodeState;
import org.apache.jackrabbit.oak.spi.state.Clusterable;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.apache.jackrabbit.oak.spi.state.NodeStateDiff;
import org.apache.jackrabbit.oak.spi.state.NodeStore;
import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
import org.apache.jackrabbit.oak.stats.Clock;
import org.apache.jackrabbit.oak.commons.PerfLogger;
import org.apache.jackrabbit.oak.stats.StatisticsProvider;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Implementation of a NodeStore on {@link DocumentStore}.
*/
public final class DocumentNodeStore
implements NodeStore, RevisionContext, Observable, Clusterable, NodeStateDiffer {
/**
* Logger for this class.
*/
private static final Logger LOG = LoggerFactory.getLogger(DocumentNodeStore.class);
/**
* Performance logger. It wraps a logger whose name is the name of this class
* followed by ".perf".
*/
private static final PerfLogger PERFLOG = new PerfLogger(
LoggerFactory.getLogger(DocumentNodeStore.class.getName() + ".perf"));
/**
* Number of milliseconds in one minute.
*/
private static final long ONE_MINUTE_MS = TimeUnit.MINUTES.toMillis(1);
/**
* The {@link FormatVersion} constant published by this class.
* NOTE(review): presumably the format version this implementation reads and
* writes - confirm against the version check done in the constructor.
*/
public static final FormatVersion VERSION = FormatVersion.V1_8;
/**
* List of meta properties which are created by DocumentNodeStore and which need to be
* retained in any cloned copy of DocumentNodeState.
*/
public static final List<String> META_PROP_NAMES = ImmutableList.of(
DocumentBundlor.META_PROP_PATTERN,
DocumentBundlor.META_PROP_BUNDLING_PATH,
DocumentBundlor.META_PROP_NON_BUNDLED_CHILD,
DocumentBundlor.META_PROP_BUNDLED_CHILD
);
/**
* Enable fast diff operations. Initialized once, when the class is
* initialized, through a {@link SystemPropertySupplier} created for
* {@code oak.documentMK.fastDiff} with {@code Boolean.TRUE} (presumably the
* value used when the system property is not set).
*/
private static final boolean FAST_DIFF = SystemPropertySupplier.create("oak.documentMK.fastDiff", Boolean.TRUE).loggingTo(LOG)
.get();
/**
* Feature flag to enable concurrent add/remove operations of hidden empty
* nodes. See OAK-2673. Initialized from {@code oak.enableConcurrentAddRemove}.
*/
private boolean enableConcurrentAddRemove = SystemPropertySupplier.create("oak.enableConcurrentAddRemove", Boolean.FALSE)
.loggingTo(LOG).get();
/**
* Use fair mode for background operation lock. Initialized from
* {@code oak.fairBackgroundOperationLock}. This field must stay declared
* before {@link #backgroundOperationLock}, whose initializer reads it.
*/
private boolean fairBackgroundOperationLock = SystemPropertySupplier.create("oak.fairBackgroundOperationLock", Boolean.TRUE)
.loggingTo(LOG).get();
/**
* Name of the system property used to initialize {@link #disableJournalDiff}.
*/
public static final String SYS_PROP_DISABLE_JOURNAL = "oak.disableJournalDiff";
/**
* Feature flag to disable the journal diff mechanism. See OAK-4528.
*/
private boolean disableJournalDiff = SystemPropertySupplier.create(SYS_PROP_DISABLE_JOURNAL, Boolean.FALSE).loggingTo(LOG)
.get();
/**
* Threshold for number of paths in journal entry to require a force push during commit
* (instead of at background write). Initialized from
* {@code oak.journalPushThreshold}.
*/
private int journalPushThreshold = SystemPropertySupplier.create("oak.journalPushThreshold", 100000).loggingTo(LOG).get();
/**
* How many collision entries to collect in a single call. Initialized from
* {@code oak.documentMK.collisionGarbageBatchSize}.
*/
private int collisionGarbageBatchSize = SystemPropertySupplier.create("oak.documentMK.collisionGarbageBatchSize", 1000)
.loggingTo(LOG).get();
/**
* The number of updates to batch with a single call to
* {@link DocumentStore#createOrUpdate(Collection, List)}. Initialized from
* {@code oak.documentMK.createOrUpdateBatchSize}.
*/
private final int createOrUpdateBatchSize = SystemPropertySupplier.create("oak.documentMK.createOrUpdateBatchSize", 1000)
.loggingTo(LOG).get();
/**
* Name of the system property used to initialize {@link #disableSweep2}.
*/
public static final String SYS_PROP_DISABLE_SWEEP2 = "oak.documentMK.disableSweep2";
/**
* Flag to bypass the sweep2 background task. When set, the constructor does
* not try to acquire a sweep2 lock and instead stores the sweep2 status as
* "swept" in the repository if it is not already marked as such.
*/
private boolean disableSweep2 = SystemPropertySupplier.create(SYS_PROP_DISABLE_SWEEP2, Boolean.FALSE).loggingTo(LOG)
.get();
/**
* The document store without the (potential) lease checking wrapper. It may
* still be wrapped by the timing, logging and read-only wrappers applied in
* the constructor.
*/
private final DocumentStore nonLeaseCheckingStore;
/**
* The document store (might be used by multiple node stores). Wrapped with a
* lease checking wrapper in the constructor unless the lease check mode is
* {@code LeaseCheckMode.DISABLED}.
*/
private final DocumentStore store;
/**
* Marker node, indicating a node does not exist at a given revision.
*/
private final DocumentNodeState missing;
/**
* The commit queue to coordinate the commits.
*/
protected final CommitQueue commitQueue;
/**
* Commit queue for batch updates.
*/
private final BatchCommitQueue batchCommitQueue;
/**
* The change dispatcher for this node store.
*/
private final ChangeDispatcher dispatcher;
/**
* The delay for asynchronous operations (delayed commit propagation and
* cache update). The initial value of 1000 is replaced in the constructor
* by the value configured on the builder.
*/
private int asyncDelay = 1000;
/**
* The maximum back off time in milliseconds when merges are retried. The
* default value is twice the {@link #asyncDelay}.
* <p>
* Note: this initializer runs before the constructor body, so the default
* is computed from the initial {@code asyncDelay} value of 1000 declared
* above and not from the async delay configured on the builder.
*/
private int maxBackOffMillis = SystemPropertySupplier.create("oak.maxBackOffMS", asyncDelay * 2).loggingTo(LOG).get();
/**
* Value of {@code oak.document.changeSet.maxItems}; logged by the constructor
* as the "maxItems" size of the ChangeSetBuilder.
*/
private int changeSetMaxItems = SystemPropertySupplier.create("oak.document.changeSet.maxItems", 50).loggingTo(LOG).get();
/**
* Value of {@code oak.document.changeSet.maxDepth}; logged by the constructor
* as the "maxDepth" size of the ChangeSetBuilder.
*/
private int changeSetMaxDepth = SystemPropertySupplier.create("oak.document.changeSet.maxDepth", 9).loggingTo(LOG).get();
/**
* Whether this instance is disposed. The object also serves as the monitor
* that {@link #dispose()} notifies to wake up background threads.
*/
private final AtomicBoolean isDisposed = new AtomicBoolean();
/**
* Whether the lease update thread shall be stopped. The object also serves
* as the monitor that {@link #dispose()} notifies to wake up the lease
* update thread.
*/
private final AtomicBoolean stopLeaseUpdateThread = new AtomicBoolean();
/**
* The cluster instance info.
*/
@NotNull
private final ClusterNodeInfo clusterNodeInfo;
/**
* The unique cluster id, similar to the unique machine id in MongoDB.
* Taken from {@link #clusterNodeInfo} in the constructor.
*/
private final int clusterId;
/**
* Map of known cluster nodes and the last known state updated
* by {@link #updateClusterState()}.
* Key: clusterId, value: ClusterNodeInfoDocument
*/
private final ConcurrentMap<Integer, ClusterNodeInfoDocument> clusterNodes
= Maps.newConcurrentMap();
/**
* Unmerged branches of this DocumentNodeStore instance.
*/
private final UnmergedBranches branches;
/**
* The unsaved last revisions. This contains the parents of all changed
* nodes, once those nodes are committed but the parent node itself wasn't
* committed yet. The parents are not immediately persisted as this would
* cause each commit to change all parents (including the root node), which
* would limit write scalability.
*
* Key: path, value: revision.
*/
private final UnsavedModifications unsavedLastRevisions = new UnsavedModifications();
/**
* IDs for documents that may need to be split. Held in a concurrent map
* rather than a set.
* NOTE(review): presumably key and value are both the document ID - confirm
* where entries are added.
*/
private final Map<String, String> splitCandidates = Maps.newConcurrentMap();
/**
* Summary of changes done by this cluster node to persist by the background
* update thread.
*/
private JournalEntry changes;
/**
* The current root node state.
*/
private volatile DocumentNodeState root;
/**
* Background thread performing the background read operation. Created and
* started in the constructor.
*/
private Thread backgroundReadThread;
/**
* Monitor object to synchronize background reads.
*/
private final Object backgroundReadMonitor = new Object();
/**
* Background thread performing updates of _lastRev entries. Only started
* when this node store is not in read-only mode.
*/
private Thread backgroundUpdateThread;
/**
* Monitor object to synchronize background writes.
*/
private final Object backgroundWriteMonitor = new Object();
/**
* Background thread performing the clusterId lease renew. Only started
* when this node store is not in read-only mode.
*/
@NotNull
private Thread leaseUpdateThread;
/**
* Background thread performing the cluster update
*/
@NotNull
private Thread clusterUpdateThread;
/**
* Monitor object to synchronize background sweeps.
*/
private final Object backgroundSweepMonitor = new Object();
/**
* Sweep thread cleaning up uncommitted changes. Only started when this
* node store is not in read-only mode.
*/
private Thread backgroundSweepThread;
/**
* Extra, one-off sweep2 background task for fixing OAK-9176 ie missing _bc
* on parents and root. Remains {@code null} unless the constructor
* acquired a sweep2 lock.
*/
private Thread backgroundSweep2Thread;
/**
* The sweep revision vector. Revisions for trunk commits older than this
* can safely be considered committed without looking up the commit value
* on the commit root document.
*/
private RevisionVector sweepRevisions = new RevisionVector();
/**
* Read/Write lock for background operations. Regular commits will acquire
* a shared lock, while a background write acquires an exclusive lock.
* Fairness is taken from {@link #fairBackgroundOperationLock}, which
* therefore has to be initialized before this field.
*/
private final ReadWriteLock backgroundOperationLock =
new ReentrantReadWriteLock(fairBackgroundOperationLock);
/**
* Read/Write lock to coordinate merges. In most cases merges acquire a
* shared read lock and can proceed concurrently. An exclusive write lock
* is acquired when the merge fails even after some retries and a final
* retry cycle is done.
* See {@link DocumentNodeStoreBranch#merge(CommitHook, CommitInfo)}.
*/
private final ReadWriteLock mergeLock = new ReentrantReadWriteLock();
/**
* Enable using simple revisions (just a counter). This feature is useful
* for testing. The counter is only created when the builder requests
* simple revisions; otherwise this field stays {@code null}.
*/
private AtomicInteger simpleRevisionCounter;
/**
* The node cache.
*
* Key: PathRev, value: DocumentNodeState
*/
private final Cache<PathRev, DocumentNodeState> nodeCache;
/**
* Statistics for {@link #nodeCache}, registered under the name
* "Document-NodeState".
*/
private final CacheStats nodeCacheStats;
/**
* Child node cache.
*
* Key: NamePathRev, value: Children
*/
private final Cache<NamePathRev, DocumentNodeState.Children> nodeChildrenCache;
/**
* Statistics for {@link #nodeChildrenCache}, registered under the name
* "Document-NodeChildren".
*/
private final CacheStats nodeChildrenCacheStats;
/**
* The change log to keep track of commits for diff operations.
*/
private final DiffCache diffCache;
/**
* The commit value resolver for this node store.
*/
private final CommitValueResolver commitValueResolver;
/**
* The blob store.
*/
private final BlobStore blobStore;
/**
* The clusterStateChangeListener is invoked on any noticed change in the
* clusterNodes collection.
* <p>
* Note that there is no synchronization between setting this one and using
* it, but arguably that is not necessary since it will be set at startup
* time and then never be changed.
*/
private ClusterStateChangeListener clusterStateChangeListener;
/**
 * The BlobSerializer. Turns a {@link Blob} into a blob id, trying the
 * cheapest option first:
 * <ol>
 * <li>a {@link BlobStoreBlob} that belongs to this node store's blob store
 * already carries its id,</li>
 * <li>otherwise the blob's reference (if any) is looked up in this node
 * store's blob store,</li>
 * <li>otherwise a new blob is created from the blob's stream.</li>
 * </ol>
 */
private final BlobSerializer blobSerializer = new BlobSerializer() {
    @Override
    public String serialize(Blob blob) {
        if (blob instanceof BlobStoreBlob) {
            BlobStoreBlob storeBlob = (BlobStoreBlob) blob;
            BlobStore origin = storeBlob.getBlobStore();
            // only reuse the id when the blob was created by our own store
            if (origin != null && origin.equals(blobStore)) {
                return storeBlob.getBlobId();
            }
        }

        String reference = blob.getReference();
        String knownId = (reference == null) ? null : blobStore.getBlobId(reference);
        if (knownId != null) {
            return knownId;
        }

        // no shortcut applies: copy the content into our blob store
        try {
            return createBlob(blob.getNewStream()).getBlobId();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }
};
/**
 * A function, which takes a String and returns the {@code Long} computed by
 * {@code getBinarySize(String)} for it.
 * NOTE(review): the input is presumably a serialized binary value of a
 * {@link DocumentPropertyState} and a malformed String presumably results in
 * an IllegalArgumentException - confirm against {@code getBinarySize}.
 */
private final Function<String, Long> binarySize = input -> getBinarySize(input);
/** The clock provided by the builder. */
private final Clock clock;
/** The checkpoints of this node store; created in the constructor. */
private final Checkpoints checkpoints;
/** The version garbage collector for this node store. */
private final VersionGarbageCollector versionGarbageCollector;
/** The journal garbage collector, created with the builder's journal GC max age. */
private final JournalGarbageCollector journalGarbageCollector;
/** The referenced blobs as created by the builder for this node store. */
private final Iterable<ReferencedBlob> referencedBlobs;
/** The executor provided by the builder. */
private final Executor executor;
/** The seeker for missing _lastRev entries, created by the builder. */
private final MissingLastRevSeeker lastRevSeeker;
/** The agent performing _lastRev recovery. */
private final LastRevRecoveryAgent lastRevRecoveryAgent;
/** Whether branches are disabled, as configured on the builder. */
private final boolean disableBranches;
/** The persistent cache from the builder, may be {@code null}; closed on dispose. */
private PersistentCache persistentCache;
/** The journal cache from the builder, may be {@code null}; closed on dispose. */
private PersistentCache journalCache;
/** The MBean created for this node store at the end of the constructor. */
private final DocumentNodeStoreMBean mbean;
/** Whether this node store was created in read-only mode. */
private final boolean readOnlyMode;
/** The node state cache; defaults to the no-op implementation. */
private DocumentNodeStateCache nodeStateCache = DocumentNodeStateCache.NOOP;
/** The statistics collector provided by the builder. */
private final DocumentNodeStoreStatsCollector nodeStoreStatsCollector;
/** Handler for the bundling configuration; initialized in the constructor unless bundling is disabled. */
private final BundlingConfigHandler bundlingConfigHandler = new BundlingConfigHandler();
/** Differ for bundled documents, bound to this node store. */
private final BundledDocumentDiffer bundledDocDiffer = new BundledDocumentDiffer(this);
/** The journal property handler factory provided by the builder. */
private final JournalPropertyHandlerFactory journalPropertyHandlerFactory;
/** The update limit provided by the builder. */
private final int updateLimit;
/**
* A set of non-branch commit revisions that are currently in progress. A
* revision is added to this set when {@link #newTrunkCommit(Changes, RevisionVector)}
* is called and removed when the commit either:
* <ul>
* <li>Succeeds with {@link #done(Commit, boolean, CommitInfo)}</li>
* <li>Fails with {@link #canceled(Commit)} and the commit was
* successfully rolled back.</li>
* </ul>
* The {@link NodeDocumentSweeper} periodically goes through this set and
* reverts changes done by commits in the set that are older than the
* current head revision.
*/
private final Set<Revision> inDoubtTrunkCommits = Sets.newConcurrentHashSet();
/** The node cache path predicate provided by the builder. */
private final Predicate<Path> nodeCachePredicate;
/**
* Creates a DocumentNodeStore from the given builder and starts its
* background threads.
* <p>
* The constructor performs, in this order:
* <ol>
* <li>wraps the builder's {@link DocumentStore} with the optional timing,
* logging and read-only wrappers and checks the format version,</li>
* <li>obtains the {@link ClusterNodeInfo}, optionally adds the lease check
* wrapper and (unless read-only) starts the lease update thread,</li>
* <li>creates collaborators and caches,</li>
* <li>creates the root document if it is missing, or initializes the root
* state from the existing root document,</li>
* <li>starts the cluster update and background read threads and, unless
* read-only, runs an initial background update and sweep before starting
* the background update, sweep and (if needed) sweep2 threads.</li>
* </ol>
* Statement order matters throughout: later steps rely on fields assigned
* and threads started by earlier ones.
*
* @param builder the builder carrying the configuration for this node store.
* @throws DocumentStoreException if the node store is read-only and the
* nodes collection does not have a root document.
* @throws IllegalStateException if creating the root document runs into a
* conflict, if the root document does not exist after it was
* created, or if the root revision has no revision for the
* clusterId of this node store.
*/
public DocumentNodeStore(DocumentNodeStoreBuilder<?> builder) {
this.nodeCachePredicate = builder.getNodeCachePathPredicate();
this.updateLimit = builder.getUpdateLimit();
this.commitValueResolver = new CachingCommitValueResolver(
builder.getCommitValueCacheSize(), this::getSweepRevisions)
.withEmptyCommitValueCache(
builder.getCacheEmptyCommitValue() && builder.getReadOnlyMode(),
builder.getClock(), builder.getJournalGCMaxAge());
this.blobStore = builder.getBlobStore();
this.nodeStoreStatsCollector = builder.getNodeStoreStatsCollector();
if (builder.isUseSimpleRevision()) {
this.simpleRevisionCounter = new AtomicInteger(0);
}
// wrap the document store as configured on the builder
DocumentStore s = builder.getDocumentStore();
if (builder.getTiming()) {
s = new TimingDocumentStoreWrapper(s);
}
if (builder.getLogging()) {
if (builder.getLoggingPrefix() != null) {
s = new LoggingDocumentStoreWrapper(s, builder.getLoggingPrefix());
} else {
s = new LoggingDocumentStoreWrapper(s);
}
}
if (builder.getReadOnlyMode()) {
s = ReadOnlyDocumentStoreWrapperFactory.getInstance(s);
readOnlyMode = true;
} else {
readOnlyMode = false;
}
checkVersion(s, readOnlyMode);
// keep a reference to the store before the lease check wrapper is added
this.nonLeaseCheckingStore = s;
this.executor = builder.getExecutor();
this.lastRevSeeker = builder.createMissingLastRevSeeker();
this.clock = builder.getClock();
// the clusterId from the builder is passed through the
// oak.documentMK.clusterId system property supplier
int cid = builder.getClusterId();
cid = SystemPropertySupplier.create("oak.documentMK.clusterId", cid).loggingTo(LOG).get();
if (readOnlyMode) {
clusterNodeInfo = ClusterNodeInfo.getReadOnlyInstance(nonLeaseCheckingStore);
} else {
clusterNodeInfo = ClusterNodeInfo.getInstance(nonLeaseCheckingStore,
new RecoveryHandlerImpl(nonLeaseCheckingStore, clock, lastRevSeeker),
null, null, cid, builder.isClusterInvisible());
checkRevisionAge(nonLeaseCheckingStore, clusterNodeInfo, clock);
}
this.clusterId = clusterNodeInfo.getId();
clusterNodeInfo.setLeaseCheckMode(builder.getLeaseCheckMode());
if (builder.getLeaseCheckMode() != LeaseCheckMode.DISABLED) {
s = new LeaseCheckDocumentStoreWrapper(s, clusterNodeInfo);
clusterNodeInfo.setLeaseFailureHandler(builder.getLeaseFailureHandler());
}
String threadNamePostfix = "(" + clusterId + ")";
// the lease update thread is always created, but only started when
// this node store is not read-only
leaseUpdateThread = new Thread(new BackgroundLeaseUpdate(this, stopLeaseUpdateThread),
"DocumentNodeStore lease update thread " + threadNamePostfix);
leaseUpdateThread.setDaemon(true);
if (!readOnlyMode) {
// OAK-3398 : make lease updating more robust by ensuring it
// has higher likelihood of succeeding than other threads
// on a very busy machine - so as to prevent lease timeout.
leaseUpdateThread.setPriority(Thread.MAX_PRIORITY);
leaseUpdateThread.start();
}
this.journalPropertyHandlerFactory = builder.getJournalPropertyHandlerFactory();
this.store = s;
this.changes = newJournalEntry();
this.branches = new UnmergedBranches();
this.asyncDelay = builder.getAsyncDelay();
this.versionGarbageCollector = new VersionGarbageCollector(
this, builder.createVersionGCSupport());
this.versionGarbageCollector.setStatisticsProvider(builder.getStatisticsProvider());
this.versionGarbageCollector.setGCMonitor(builder.getGCMonitor());
this.journalGarbageCollector = new JournalGarbageCollector(
this, builder.getJournalGCMaxAge());
this.referencedBlobs =
builder.createReferencedBlobs(this);
this.lastRevRecoveryAgent = new LastRevRecoveryAgent(store, this,
lastRevSeeker, clusterId -> this.signalClusterStateChange());
this.disableBranches = builder.isDisableBranches();
this.missing = new DocumentNodeState(this, new Path("missing"),
new RevisionVector(new Revision(0, 0, 0))) {
@Override
public int getMemory() {
return 8;
}
};
//TODO Make stats collection configurable as it add slight overhead
nodeCache = builder.buildNodeCache(this);
nodeCacheStats = new CacheStats(nodeCache, "Document-NodeState",
builder.getWeigher(), builder.getNodeCacheSize());
nodeChildrenCache = builder.buildChildrenCache(this);
nodeChildrenCacheStats = new CacheStats(nodeChildrenCache, "Document-NodeChildren",
builder.getWeigher(), builder.getChildrenCacheSize());
diffCache = builder.getDiffCache(this.clusterId);
// check if root node exists
NodeDocument rootDoc = store.find(NODES, Utils.getIdFromPath(ROOT));
if (rootDoc == null) {
if (readOnlyMode) {
throw new DocumentStoreException("Unable to initialize a " +
"read-only DocumentNodeStore. The DocumentStore nodes " +
"collection does not have a root document.");
}
// root node is missing: repository is not initialized
Revision commitRev = newRevision();
RevisionVector head = new RevisionVector(commitRev);
Commit commit = new CommitBuilder(this, commitRev, null)
.addNode(ROOT)
.build();
try {
commit.applyToDocumentStore();
} catch (ConflictException e) {
commit.rollback();
throw new IllegalStateException("Conflict while creating root document", e);
}
unsavedLastRevisions.put(ROOT, commitRev);
sweepRevisions = sweepRevisions.pmax(head);
setRoot(head);
// make sure _lastRev is written back to store
backgroundWrite();
rootDoc = store.find(NODES, Utils.getIdFromPath(ROOT));
// at this point the root document must exist
if (rootDoc == null) {
throw new IllegalStateException("Root document does not exist");
}
} else {
sweepRevisions = sweepRevisions.pmax(rootDoc.getSweepRevisions());
initializeRootState(rootDoc);
// check if _lastRev for our clusterId exists
if (!rootDoc.getLastRev().containsKey(clusterId)) {
RevisionVector rootRev = getRoot().getRootRevision();
Revision initialRev = rootRev.getRevision(clusterId);
if (initialRev == null) {
throw new IllegalStateException(
"missing revision for clusterId " + clusterId +
": " + rootRev);
}
unsavedLastRevisions.put(ROOT, initialRev);
// set initial sweep revision
sweepRevisions = sweepRevisions.pmax(new RevisionVector(initialRev));
if (!readOnlyMode) {
backgroundWrite();
}
}
}
checkpoints = new Checkpoints(this);
// initialize branchCommits
branches.init(store, this);
dispatcher = builder.isPrefetchExternalChanges() ?
new PrefetchDispatcher(getRoot(), executor) :
new ChangeDispatcher(getRoot());
commitQueue = new CommitQueue(this);
commitQueue.setStatisticsCollector(nodeStoreStatsCollector);
batchCommitQueue = new BatchCommitQueue(store);
// prepare background threads
backgroundReadThread = new Thread(
new BackgroundReadOperation(this, isDisposed),
"DocumentNodeStore background read thread " + threadNamePostfix);
backgroundReadThread.setDaemon(true);
backgroundUpdateThread = new Thread(
new BackgroundUpdateOperation(this, isDisposed),
"DocumentNodeStore background update thread " + threadNamePostfix);
backgroundUpdateThread.setDaemon(true);
backgroundSweepThread = new Thread(
new BackgroundSweepOperation(this, isDisposed),
"DocumentNodeStore background sweep thread " + threadNamePostfix);
backgroundSweepThread.setDaemon(true);
clusterUpdateThread = new Thread(new BackgroundClusterUpdate(this, isDisposed),
"DocumentNodeStore cluster update thread " + threadNamePostfix);
clusterUpdateThread.setDaemon(true);
// now start the background threads
clusterUpdateThread.start();
backgroundReadThread.start();
if (!readOnlyMode) {
// OAK-8466 - background sweep may take a long time if there is no
// sweep revision for this clusterId. When this process is suddenly
// stopped while performing the sweep, a recovery will be needed
// starting at the timestamp of _lastRev for this clusterId, which
// is potentially old and the recovery will be expensive. Hence
// triggering below function to update _lastRev, just before
// triggering sweep
runBackgroundUpdateOperations();
// check if we need a sweep2 *before* doing a backgroundSweep.
// this enables us to detect a direct Oak <= 1.6 upgrade situation,
// where a sweep2 is *not* needed.
// there are 3 different cases with sweep[1]/sweep2:
// 1) Oak <= 1.6 direct upgrade:
// -> no sweep2 is needed as a sweep1 is needed anyway and sweep2
// from now on happens as part of it (with the OAK-9176 fix)
// 2) Oak >= 1.8 which never did an Oak <= 1.6 upgrade:
// -> no sweep2 is needed as OAK-9176 doesn't apply (the repository
// never ran <= 1.6)
// 3) Oak >= 1.8 which was previously doing an Oak <= 1.6 upgrade:
// -> A (full) sweep2 is needed. This is the main case of OAK-9176.
// In case 3 there is a valid, recent "_sweepRev" - and
// we can go ahead and do a "quick" backgroundSweep() here
// before continuing, to unblock the startup.
// After that, an async/background task is started for sweep2.
// which of cases 1-3 we have is determined via 'sweep2LockIfNecessary'
// and recorded in the settings collection.
// except for this case detection (which acquires a "sweep2 lock" if needed)
// we can otherwise continue normally. That means, a sweep1 can
// be considered as usual.
// Note that for case 3, doing this normal sweep1 can now also
// fix some "_bc" - which before OAK-9176 were missing
// and which sweep2 would separately fix as well - but this is not a problem.
// Note that by setting the SYS_PROP_DISABLE_SWEEP2 system property
// the sweep2 is bypassed and the sweep2 status is explicitly stored as "swept".
final long sweep2Lock;
if (disableSweep2) {
try {
final Sweep2StatusDocument sweep2Status = Sweep2StatusDocument.readFrom(store);
if (sweep2Status == null || !sweep2Status.isSwept()) {
// setting the disableSweep2 flag stores this in the repository
Sweep2StatusDocument.forceReleaseSweep2LockAndMarkSwept(store, clusterId);
}
} catch(Exception e) {
// best effort: a failure to store the sweep2 status must not
// prevent startup, hence only a warning is logged.
// NOTE(review): "diabled" in the message below is a typo for
// "disabled"; left unchanged here because it is runtime text.
LOG.warn("<init> sweep2 is diabled as instructed by system property ("
+ SYS_PROP_DISABLE_SWEEP2 + "=true) - however, got an Exception"
+ " while storing sweep2 status in the settings collection: " + e, e);
}
// a negative value means: no sweep2 background task is started below
sweep2Lock = -1;
} else {
// So: unless sweep2 is disabled: acquire sweep2 if one is (maybe) necessary
sweep2Lock = Sweep2Helper.acquireSweep2LockIfNecessary(store, clusterId);
}
// perform an initial document sweep if needed
// this may be long running if there is no sweep revision
// for this clusterId (upgrade from Oak <= 1.6).
// it is therefore important the lease thread is running already.
backgroundSweep();
backgroundUpdateThread.start();
backgroundSweepThread.start();
if (sweep2Lock >= 0) {
// sweep2 is necessary - so start a sweep2 background task
backgroundSweep2Thread = new Thread(
new BackgroundSweep2Operation(this, isDisposed, sweep2Lock),
"DocumentNodeStore background sweep2 thread " + threadNamePostfix);
backgroundSweep2Thread.setDaemon(true);
backgroundSweep2Thread.start();
}
}
persistentCache = builder.getPersistentCache();
if (!readOnlyMode && persistentCache != null) {
DynamicBroadcastConfig broadcastConfig = new DocumentBroadcastConfig(this);
persistentCache.setBroadcastConfig(broadcastConfig);
}
journalCache = builder.getJournalCache();
this.mbean = createMBean(builder);
LOG.info("ChangeSetBuilder enabled and size set to maxItems: {}, maxDepth: {}", changeSetMaxItems, changeSetMaxDepth);
LOG.info("Initialized DocumentNodeStore with clusterNodeId: {}, updateLimit: {} ({})",
clusterId, updateLimit,
getClusterNodeInfoDisplayString());
if (!builder.isBundlingDisabled()) {
bundlingConfigHandler.initialize(this, executor);
}
}
/**
 * Disposes this DocumentNodeStore: stops the background threads, performs
 * a final round of background operations (unless read-only), stops the
 * lease update, and closes the document store, the blob store and the
 * persistent caches.
 * <p>
 * Only the first call does the actual work; any further call returns
 * right after logging the disposal attempt.
 * <p>
 * A {@link DocumentStoreException} raised by the tombstone commit or by
 * the final background operations does not abort the disposal. The
 * remaining resources are still released, but the cluster node info is
 * then <em>not</em> disposed, and the (last) exception is rethrown at the
 * very end.
 *
 * @throws DocumentStoreException if the tombstone commit or the final
 *          background operations failed.
 */
public void dispose() {
    LOG.info("Starting disposal of DocumentNodeStore with clusterNodeId: {} ({})", clusterId,
            getClusterNodeInfoDisplayString());
    if (isDisposed.getAndSet(true)) {
        // only dispose once
        return;
    }
    // notify background threads waiting on isDisposed
    synchronized (isDisposed) {
        isDisposed.notifyAll();
    }

    Utils.joinQuietly(backgroundReadThread,
            backgroundUpdateThread,
            backgroundSweepThread,
            backgroundSweep2Thread);

    DocumentStoreException ex = null;

    // create a tombstone commit revision after isDisposed is set to true.
    // commits created earlier than this revision will be able to finish
    // and we'll get a headOfQueue() callback when they are done
    // after this call there are no more pending trunk commits and
    // the commit queue is empty and stays empty
    if (!readOnlyMode) {
        try {
            Revision tombstone = commitQueue.createRevision();
            commitQueue.done(tombstone, new CommitQueue.Callback() {
                @Override
                public void headOfQueue(@NotNull Revision revision) {
                    setRoot(getHeadRevision().update(revision));
                    unsavedLastRevisions.put(ROOT, revision);
                }
            });
        } catch (DocumentStoreException e) {
            LOG.error("dispose: a DocumentStoreException happened during dispose's attempt to commit a tombstone: " + e, e);
            ex = e;
        }
    }

    try {
        bundlingConfigHandler.close();
    } catch (IOException e) {
        // The message has no placeholder, so only the exception is passed;
        // an additional argument would be silently dropped by the logger.
        LOG.warn("Error closing bundlingConfigHandler", e);
    }

    // do a final round of background operations after
    // the background thread stopped
    if (!readOnlyMode) {
        try {
            // force a final background sweep to ensure there are
            // no uncommitted changes up to the current head revision
            internalRunBackgroundSweepOperation();

            // push final changes to store, after this there must not
            // be any more changes to documents in the nodes collection
            internalRunBackgroundUpdateOperations();
        } catch (DocumentStoreException e) {
            // OAK-3250 : when a lease check fails, subsequent modifying requests
            // to the DocumentStore will throw an DocumentStoreException. Since as a result
            // of a failing lease check a bundle.stop is done and thus a dispose of the
            // DocumentNodeStore happens, it is very likely that in that case
            // you run into an Exception. We should still continue with disposing
            // though - thus catching and logging..
            LOG.error("dispose: a DocumentStoreException happened during dispose's last background ops: " + e, e);
            ex = e;
        }
    }

    Utils.joinQuietly(clusterUpdateThread);

    // Stop lease update thread once no further document store operations
    // are required
    LOG.debug("Stopping LeaseUpdate thread...");
    stopLeaseUpdateThread.set(true);
    synchronized (stopLeaseUpdateThread) {
        stopLeaseUpdateThread.notifyAll();
    }
    Utils.joinQuietly(leaseUpdateThread);
    LOG.debug("Stopped LeaseUpdate thread");

    // now mark this cluster node as inactive by disposing the
    // clusterNodeInfo, but only if final background operations
    // were successful
    if (ex == null) {
        clusterNodeInfo.dispose();
    }

    store.dispose();

    try {
        blobStore.close();
    } catch (Exception e) {
        LOG.debug("Error closing blob store " + blobStore, e);
    }

    if (persistentCache != null) {
        persistentCache.close();
    }
    if (journalCache != null) {
        journalCache.close();
    }

    String result = "(successful)";
    if (ex != null) {
        result = "(with exception: " + ex.toString() + ")";
    }
    LOG.info("Disposed DocumentNodeStore with clusterNodeId: {}, {}",
            clusterId, result);

    // surface the failure only after all resources have been released
    if (ex != null) {
        throw ex;
    }
}
/**
 * Builds a single-line display string for the cluster node info. The
 * string is prefixed with a read-only marker when this store is in
 * read-only mode.
 *
 * @return the cluster node info with CR, LF and TAB characters replaced by
 *         spaces and surrounding whitespace trimmed, optionally prefixed
 *         with {@code "readOnly:true, "}.
 */
private String getClusterNodeInfoDisplayString() {
    String prefix = readOnlyMode ? "readOnly:true, " : "";
    String info = clusterNodeInfo.toString()
            .replaceAll("[\r\n\t]", " ")
            .trim();
    return prefix + info;
}
/**
 * Replaces the current root node state with the root state at the given
 * head revision.
 *
 * @param newHead the new head revision; must not be a branch revision.
 * @throws IllegalArgumentException if {@code newHead} is a branch revision.
 */
void setRoot(@NotNull RevisionVector newHead) {
    boolean trunkRevision = !newHead.isBranch();
    checkArgument(trunkRevision);
    this.root = getRoot(newHead);
}
/**
 * @return the {@link DocumentStore} instance held by this node store.
 */
@NotNull
public DocumentStore getDocumentStore() {
    return this.store;
}
/**
 * Creates a new commit on top of the given base revision. Every commit
 * returned by this method must be acknowledged by the caller, either with
 * {@link #done(Commit, boolean, CommitInfo)} or with
 * {@link #canceled(Commit)}, depending on the outcome of the commit.
 *
 * @param changes the changes to commit.
 * @param base the base revision for the commit or <code>null</code> if the
 *          commit should use the current head revision as base.
 * @param branch the branch instance if this is a branch commit. The life
 *          time of this branch commit is controlled by the
 *          reachability of this parameter. Once {@code branch} is
 *          weakly reachable, the document store implementation is
 *          free to remove the commits associated with the branch.
 * @return a new commit; a branch commit when the effective base is a
 *          branch revision, a trunk commit otherwise.
 */
@NotNull
Commit newCommit(@NotNull Changes changes,
                 @Nullable RevisionVector base,
                 @Nullable DocumentNodeStoreBranch branch) {
    // fall back to the current head when no explicit base was given
    RevisionVector effectiveBase = base != null ? base : getHeadRevision();
    if (effectiveBase.isBranch()) {
        return newBranchCommit(changes, effectiveBase, branch);
    }
    return newTrunkCommit(changes, effectiveBase);
}
/**
 * Creates a new merge commit. Every merge commit returned by this method
 * must be acknowledged by the caller, either with
 * {@link #done(Commit, boolean, CommitInfo)} or with
 * {@link #canceled(Commit)}, depending on the outcome of the commit.
 * <p>
 * The read lock of the background operation lock is acquired here and is
 * still held when this method returns normally. It is released right away
 * only when creating the commit fails.
 *
 * @param base the base revision for the commit.
 * @param numBranchCommits the number of branch commits to merge.
 * @return a new merge commit.
 */
@NotNull
private MergeCommit newMergeCommit(@NotNull RevisionVector base, int numBranchCommits) {
    checkNotNull(base);
    backgroundOperationLock.readLock().lock();
    MergeCommit mergeCommit = null;
    try {
        checkOpen();
        mergeCommit = new MergeCommit(this, base,
                commitQueue.createRevisions(numBranchCommits));
        return mergeCommit;
    } finally {
        // a null reference means creation failed: give the lock back
        if (mergeCommit == null) {
            backgroundOperationLock.readLock().unlock();
        }
    }
}
/**
 * Acknowledges the given commit.
 * <p>
 * When the commit revision is known to the commit queue, the commit is
 * completed through the queue: the callback applies the in-memory changes
 * (lastRev updates, journal tracking, cache updates), then always sets the
 * new root, signals the head revision change and dispatches the content
 * change. The background operation read lock is released afterwards in
 * any case.
 * <p>
 * Otherwise the commit is handled as a branch commit: lastRev updates and
 * cache changes are applied directly and the read lock is released only
 * when branches are disabled.
 *
 * @param c the commit to acknowledge.
 * @param isBranch passed on to the lastRev and cache updates of a commit
 *          that is not in the commit queue.
 * @param info the commit info reported to the journal entry and to the
 *          change dispatcher.
 * @return the new head revision when the commit went through the commit
 *          queue, otherwise the base revision of the commit updated with
 *          the commit revision as branch revision.
 */
RevisionVector done(final @NotNull Commit c, boolean isBranch, final @NotNull CommitInfo info) {
// the revision is tracked by the commit queue
if (commitQueue.contains(c.getRevision())) {
try {
inDoubtTrunkCommits.remove(c.getRevision());
// single element array, so the anonymous callback can hand back the new head
final RevisionVector[] newHead = new RevisionVector[1];
commitQueue.done(c.getRevision(), new CommitQueue.Callback() {
@Override
public void headOfQueue(@NotNull Revision revision) {
// remember before revision
RevisionVector before = getHeadRevision();
// update head revision
Revision r = c.getRevision();
newHead[0] = before.update(r);
// the two flags record how far the try block got, they only
// select the log message in the catch block below
boolean success = false;
boolean cacheUpdated = false;
try {
// apply lastRev updates
c.applyLastRevUpdates(false);
// track modified paths
changes.modified(c.getModifiedPaths());
changes.readFrom(info);
changes.addChangeSet(getChangeSet(info));
// if we get here all required in-memory changes
// have been applied. The following operations in
// the try block may fail and the commit can still
// be considered successful
success = true;
// apply changes to cache, based on before revision
c.applyToCache(before, false);
cacheUpdated = true;
if (changes.getNumChangedNodes() >= journalPushThreshold) {
LOG.info("Pushing journal entry at {} as number of changes ({}) have reached threshold of {}",
r, changes.getNumChangedNodes(), journalPushThreshold);
pushJournalEntry(r);
}
} catch (Throwable e) {
// failures are only logged, nothing is re-thrown from the callback
if (success) {
if (cacheUpdated) {
LOG.warn("Pushing journal entry at {} failed", revision, e);
} else {
LOG.warn("Updating caches at {} failed", revision, e);
}
} else {
LOG.error("Applying in-memory changes at {} failed", revision, e);
}
} finally {
// runs regardless of the outcome above: publish the new head
setRoot(newHead[0]);
commitQueue.headRevisionChanged();
dispatcher.contentChanged(getRoot(), info);
}
}
});
return newHead[0];
} finally {
backgroundOperationLock.readLock().unlock();
}
} else {
// branch commit
try {
c.applyLastRevUpdates(isBranch);
c.applyToCache(c.getBaseRevision(), isBranch);
return c.getBaseRevision().update(c.getRevision().asBranchRevision());
} finally {
// the read lock is only released here when branches are disabled
if (isDisableBranches()) {
backgroundOperationLock.readLock().unlock();
}
}
}
}
/**
 * Acknowledges that the given commit was canceled and rolls it back.
 * <p>
 * A commit whose revision is known to the commit queue is removed from the
 * queue and, if the rollback succeeds, also from the in-doubt trunk
 * commits; the background operation read lock is released in any case.
 * Any other commit is rolled back and removed from its branch (if the
 * branch is known); the read lock is then released only when branches are
 * disabled.
 *
 * @param c the canceled commit.
 */
void canceled(Commit c) {
    if (!commitQueue.contains(c.getRevision())) {
        // not in the commit queue: treat as branch commit
        try {
            c.rollback();
            Branch owner = branches.getBranch(c.getBaseRevision());
            if (owner != null) {
                owner.removeCommit(c.getRevision().asBranchRevision());
            }
        } finally {
            if (isDisableBranches()) {
                backgroundOperationLock.readLock().unlock();
            }
        }
        return;
    }
    try {
        commitQueue.canceled(c.getRevision());
        boolean rolledBack = c.rollback();
        if (rolledBack) {
            // rollback was successful
            inDoubtTrunkCommits.remove(c.getRevision());
        }
    } finally {
        backgroundOperationLock.readLock().unlock();
    }
}
/**
 * Sets the {@code asyncDelay} of this node store.
 *
 * @param delay the new value.
 */
public void setAsyncDelay(int delay) {
    asyncDelay = delay;
}
/**
 * @return the current {@code asyncDelay} value, as set with
 *         {@link #setAsyncDelay(int)}.
 */
public int getAsyncDelay() {
    return this.asyncDelay;
}
/**
 * Sets the {@code maxBackOffMillis} of this node store.
 *
 * @param time the new value.
 */
public void setMaxBackOffMillis(int time) {
    this.maxBackOffMillis = time;
}
/**
 * @return the current {@code maxBackOffMillis} value, as set with
 *         {@link #setMaxBackOffMillis(int)}.
 */
public int getMaxBackOffMillis() {
    return this.maxBackOffMillis;
}
/**
 * @return the current {@code changeSetMaxItems} value, as set with
 *         {@link #setChangeSetMaxItems(int)}.
 */
public int getChangeSetMaxItems() {
    return this.changeSetMaxItems;
}
/**
 * Sets the {@code changeSetMaxItems} of this node store.
 *
 * @param changeSetMaxItems the new value.
 */
public void setChangeSetMaxItems(int changeSetMaxItems) {
    this.changeSetMaxItems = changeSetMaxItems;
}
/**
 * @return the current {@code changeSetMaxDepth} value, as set with
 *         {@link #setChangeSetMaxDepth(int)}.
 */
public int getChangeSetMaxDepth() {
    return this.changeSetMaxDepth;
}
/**
 * Sets the {@code changeSetMaxDepth} of this node store.
 *
 * @param changeSetMaxDepth the new value.
 */
public void setChangeSetMaxDepth(int changeSetMaxDepth) {
    this.changeSetMaxDepth = changeSetMaxDepth;
}
/**
 * Sets the {@code enableConcurrentAddRemove} flag of this node store.
 *
 * @param b the new value of the flag.
 */
void setEnableConcurrentAddRemove(boolean b) {
    this.enableConcurrentAddRemove = b;
}
/**
 * @return the current value of the {@code enableConcurrentAddRemove}
 *         flag, as set with {@link #setEnableConcurrentAddRemove(boolean)}.
 */
boolean getEnableConcurrentAddRemove() {
    return this.enableConcurrentAddRemove;
}
/**
 * @return the current {@code journalPushThreshold} value, as set with
 *         {@link #setJournalPushThreshold(int)}.
 */
int getJournalPushThreshold() {
    return this.journalPushThreshold;
}
/**
 * Sets the {@code journalPushThreshold} of this node store.
 *
 * @param journalPushThreshold the new value.
 */
void setJournalPushThreshold(int journalPushThreshold) {
    this.journalPushThreshold = journalPushThreshold;
}
/**
 * @return the {@link ClusterNodeInfo} held by this node store.
 */
@NotNull
public ClusterNodeInfo getClusterInfo() {
    return this.clusterNodeInfo;
}
/**
 * @return the {@code nodeCacheStats} instance held by this node store.
 */
public CacheStats getNodeCacheStats() {
    return this.nodeCacheStats;
}
/**
 * @return the {@code nodeChildrenCacheStats} instance held by this node
 *         store.
 */
public CacheStats getNodeChildrenCacheStats() {
    return this.nodeChildrenCacheStats;
}
/**
 * @return the statistics reported by the diff cache.
 */
@NotNull
public Iterable<CacheStats> getDiffCacheStats() {
    return this.diffCache.getStats();
}
/**
 * @return the cache of node states, keyed by path and revision.
 */
public Cache<PathRev, DocumentNodeState> getNodeCache() {
    return this.nodeCache;
}
/**
 * @return the cache of child node lists, keyed by name, path and revision.
 */
public Cache<NamePathRev, DocumentNodeState.Children> getNodeChildrenCache() {
    return this.nodeChildrenCache;
}
/**
 * @return the {@code nodeCachePredicate} held by this node store.
 */
public Predicate<Path> getNodeCachePredicate() {
    return this.nodeCachePredicate;
}
/**
 * Returns the journal entry that will be stored in the journal with the
 * next background update.
 *
 * @return the current journal entry.
 */
JournalEntry getCurrentJournalEntry() {
    return this.changes;
}
/**
 * Invalidates all entries of the node children cache.
 */
void invalidateNodeChildrenCache() {
    this.nodeChildrenCache.invalidateAll();
}
/**
 * Invalidates the node cache entry for the given path and revision.
 *
 * @param path the string representation of the node path.
 * @param revision the revision of the cache entry.
 */
void invalidateNodeCache(String path, RevisionVector revision) {
    PathRev key = new PathRev(Path.fromString(path), revision);
    nodeCache.invalidate(key);
}
/**
 * @return the number of paths currently tracked in the unsaved last
 *         revisions.
 */
public int getPendingWriteCount() {
    return this.unsavedLastRevisions.getPaths().size();
}
/**
 * @return the value of the {@code disableBranches} flag.
 */
public boolean isDisableBranches() {
    return this.disableBranches;
}
/**
 * Registers the document with the given id as a split candidate. The id
 * is used both as key and as value of the candidates map.
 *
 * @param id the id of the document to check if it needs to be split.
 */
void addSplitCandidate(String id) {
    this.splitCandidates.put(id, id);
}
/**
 * Looks up a node state in the node state cache.
 *
 * @param path the path of the node.
 * @param rootRevision the root revision passed on to the cache lookup.
 * @param rev the revision passed on to the cache lookup.
 * @return whatever the node state cache returns for the given arguments;
 *         may be {@code null}.
 */
@Nullable
AbstractDocumentNodeState getSecondaryNodeState(@NotNull final Path path,
                                                @NotNull final RevisionVector rootRevision,
                                                @NotNull final RevisionVector rev) {
    // consult the secondary cache first
    AbstractDocumentNodeState cached =
            nodeStateCache.getDocumentNodeState(path, rootRevision, rev);
    return cached;
}
/**
 * Creates a property state bound to this node store.
 *
 * @param name the name of the property.
 * @param value the value of the property; must not be {@code null}.
 * @return a new {@link DocumentPropertyState}.
 * @throws NullPointerException if {@code value} is {@code null}.
 */
@NotNull
public PropertyState createPropertyState(String name, String value) {
    String nonNullValue = checkNotNull(value);
    return new DocumentPropertyState(this, name, nonNullValue);
}
/**
 * Returns the node at the given path and revision, going through the node
 * cache. On a cache miss the node is loaded from the store, unless the
 * children cache already tells that the node does not exist. Non-existing
 * nodes are cached with the {@code missing} marker and reported as
 * {@code null}. The returned object might not be modified directly.
 *
 * @param path the path of the node.
 * @param rev the read revision.
 * @return the node or <code>null</code> if the node does not exist at the
 *          given revision.
 * @throws DocumentStoreException if loading the node into the cache fails.
 */
@Nullable
public DocumentNodeState getNode(@NotNull final Path path,
                                 @NotNull final RevisionVector rev) {
    checkNotNull(rev);
    checkNotNull(path);
    final long start = PERFLOG.start();
    final Callable<DocumentNodeState> loader = new Callable<DocumentNodeState>() {
        @Override
        public DocumentNodeState call() throws Exception {
            if (checkNodeNotExistsFromChildrenCache(path, rev)) {
                return missing;
            }
            DocumentNodeState loaded = readNode(path, rev);
            return loaded != null ? loaded : missing;
        }
    };
    try {
        DocumentNodeState cached = nodeCache.get(new PathRev(path, rev), loader);
        // translate the 'missing' marker back into null for the caller
        boolean isMissing = cached == missing || cached.equals(missing);
        final DocumentNodeState result = isMissing ? null : cached;
        PERFLOG.end(start, 1, "getNode: path={}, rev={}", path, rev);
        return result;
    } catch (UncheckedExecutionException | ExecutionException e) {
        throw DocumentStoreException.convert(e.getCause());
    }
}
/**
 * Returns the children of the given parent, starting after {@code name},
 * going through the node children cache. When the cached entry holds
 * fewer than {@code limit} names but indicates that there are more, the
 * children are read again and the cache entry is replaced.
 *
 * @param parent the parent node state.
 * @param name the lower bound child name passed on to
 *          {@link #readChildren(AbstractDocumentNodeState, String, int)}.
 * @param limit the maximum number of children requested.
 * @return the children of {@code parent}.
 * @throws DocumentStoreException if loading the children into the cache
 *          fails.
 */
@NotNull
DocumentNodeState.Children getChildren(@NotNull final AbstractDocumentNodeState parent,
                                       @NotNull final String name,
                                       final int limit)
        throws DocumentStoreException {
    checkNotNull(parent);
    if (parent.hasNoChildren()) {
        return DocumentNodeState.NO_CHILDREN;
    }
    final Path path = parent.getPath();
    final RevisionVector readRevision = parent.getLastRevision();
    final Callable<DocumentNodeState.Children> loader =
            new Callable<DocumentNodeState.Children>() {
                @Override
                public DocumentNodeState.Children call() throws Exception {
                    return readChildren(parent, name, limit);
                }
            };
    try {
        NamePathRev key = childNodeCacheKey(path, readRevision, name);
        DocumentNodeState.Children cached = nodeChildrenCache.get(key, loader);
        boolean tooFew = cached.hasMore && cached.children.size() < limit;
        if (!tooFew) {
            return cached;
        }
        // not enough children loaded - load more,
        // and put that in the cache
        // (not using nodeChildrenCache.invalidate, because
        // the generational persistent cache doesn't support that)
        DocumentNodeState.Children reloaded = readChildren(parent, name, limit);
        nodeChildrenCache.put(key, reloaded);
        return reloaded;
    } catch (UncheckedExecutionException | ExecutionException e) {
        throw DocumentStoreException.convert(e.getCause(),
                "Error occurred while fetching children for path "
                        + path);
    }
}
/**
 * Reads the children of the given parent node state, starting at the child
 * node with {@code name}. The given {@code name} is exclusive and will not
 * appear in the list of children. Child documents whose node does not
 * exist at the parent's last revision are skipped. The collected names are
 * sorted explicitly only when the read started without a lower bound and
 * reached the end of the child documents.
 *
 * @param parent the parent node.
 * @param name the name of the lower bound child node (exclusive) or the
 *              empty {@code String} if no lower bound is given.
 * @param limit the maximum number of child nodes to return.
 * @return the children of {@code parent}; {@code hasMore} is set when an
 *              additional existing child was seen after {@code limit}
 *              names were collected.
 */
DocumentNodeState.Children readChildren(@NotNull AbstractDocumentNodeState parent,
                                        @NotNull String name, int limit) {
    final String initialName = name;
    final Path parentPath = parent.getPath();
    final RevisionVector readRev = parent.getLastRevision();
    LOG.trace("Reading children for [{}] at rev [{}]", parentPath, readRev);
    final DocumentNodeState.Children result = new DocumentNodeState.Children();
    // ask for one document more than requested, this allows to detect
    // whether there are more child nodes than the limit
    final int rawLimit = (int) Math.min(Integer.MAX_VALUE, ((long) limit) + 1);
    // exclusive lower bound for the next batch of child documents
    String lowerBound = name;
    while (true) {
        int fetched = 0;
        for (NodeDocument doc : readChildDocs(parentPath, lowerBound, rawLimit)) {
            fetched++;
            Path childPath = doc.getPath();
            // the last document seen becomes the lower bound of a
            // potential next round of readChildDocs()
            lowerBound = childPath.getName();
            // skip children that do not exist at the read revision
            if (getNode(childPath, readRev) == null) {
                continue;
            }
            if (result.children.size() >= limit) {
                // enough collected and we know there are more
                result.hasMore = true;
                return result;
            }
            result.children.add(childPath.getName());
        }
        // at this point at most 'limit' children have been collected
        if (fetched < rawLimit) {
            // the store returned fewer documents than requested,
            // hence there are no more documents
            result.hasMore = false;
            if (initialName.isEmpty()) {
                // started from the top and reached the end:
                // the list is complete and can be sorted
                Collections.sort(result.children);
            }
            return result;
        }
    }
}
/**
 * Queries the store for child documents of the document at {@code path}
 * and returns up to {@code limit} of them. The returned child documents
 * are sorted in ascending child node name order. If a {@code name} is
 * passed, the first child document returned is after the given name. That
 * is, the name is the lower exclusive bound.
 *
 * @param path the path of the parent document.
 * @param name the name of the lower bound child node (exclusive) or the
 *              empty {@code String} if no lower bound is given.
 * @param limit the maximum number of child documents to return.
 * @return the child documents.
 */
@NotNull
private Iterable<NodeDocument> readChildDocs(@NotNull final Path path,
                                             @NotNull String name,
                                             final int limit) {
    checkNotNull(path);
    final String upperKey = Utils.getKeyUpperLimit(path);
    final String lowerKey = name.isEmpty()
            ? Utils.getKeyLowerLimit(path)
            : Utils.getIdFromPath(new Path(path, name));
    return store.query(Collection.NODES, lowerKey, upperKey, limit);
}
/**
 * Returns up to {@code limit} child nodes, starting at the given
 * {@code name} (exclusive). The child names are resolved to node states
 * by the transformation applied to the returned iterable; a child that is
 * listed but has no node state makes that transformation invalidate the
 * affected cache entries and throw a {@link DocumentStoreException}.
 *
 * @param parent the parent node.
 * @param name the name of the lower bound child node (exclusive) or the
 *              empty {@code String}, if the method should start with the
 *              first known child node.
 * @param limit the maximum number of child nodes to return.
 * @return the child nodes.
 */
@NotNull
Iterable<DocumentNodeState> getChildNodes(@NotNull final DocumentNodeState parent,
                                          @NotNull final String name,
                                          final int limit) {
    // Preemptive check: nothing to do when the parent is known to
    // have no children
    checkNotNull(parent);
    if (parent.hasNoChildren()) {
        return Collections.emptyList();
    }
    final RevisionVector readRevision = parent.getLastRevision();
    final Function<String, DocumentNodeState> toNodeState =
            new Function<String, DocumentNodeState>() {
        @Override
        public DocumentNodeState apply(String input) {
            Path childPath = new Path(parent.getPath(), input);
            DocumentNodeState child = getNode(childPath, readRevision);
            if (child != null) {
                return child.withRootRevision(parent.getRootRevision(),
                        parent.isFromExternalChange());
            }
            // Very unexpected: the child list of the parent declares
            // the child to exist, but there is no node state for it.
            // Collect diagnostics, invalidate the affected cache
            // entries and abort.
            String id = Utils.getIdFromPath(childPath);
            String cachedDocStr = docAsString(id, true);
            String uncachedDocStr = docAsString(id, false);
            nodeCache.invalidate(new PathRev(childPath, readRevision));
            nodeChildrenCache.invalidate(childNodeCacheKey(
                    parent.getPath(), readRevision, name));
            String exceptionMsg = String.format(
                    "Aborting getChildNodes() - DocumentNodeState is null for %s at %s " +
                            "{\"cachedDoc\":{%s}, \"uncachedDoc\":{%s}}",
                    readRevision, childPath, cachedDocStr, uncachedDocStr);
            throw new DocumentStoreException(exceptionMsg);
        }

        // renders the document with the given id for the exception
        // message; 'cached' selects between the two find() variants
        private String docAsString(String id, boolean cached) {
            try {
                NodeDocument doc = cached
                        ? store.find(Collection.NODES, id)
                        : store.find(Collection.NODES, id, 0);
                return doc == null ? "<null>" : doc.asString();
            } catch (DocumentStoreException e) {
                return e.toString();
            }
        }
    };
    return transform(getChildren(parent, name, limit).children, toNodeState);
}
/**
 * Reads the node at the given path and revision from the document store.
 * The pending modification for the path is looked up before the document
 * is fetched and is passed on when resolving the node at the read
 * revision.
 *
 * @param path the path of the node.
 * @param readRevision the read revision.
 * @return the node state, or {@code null} if there is no document for the
 *         path or the document yields no node at the read revision.
 */
@Nullable
private DocumentNodeState readNode(Path path, RevisionVector readRevision) {
    final long start = PERFLOG.start();
    final String docId = Utils.getIdFromPath(path);
    final Revision pendingLastRev = getPendingModifications().get(path);
    final NodeDocument doc = store.find(Collection.NODES, docId);
    if (doc != null) {
        final DocumentNodeState state =
                doc.getNodeAtRevision(this, readRevision, pendingLastRev);
        PERFLOG.end(start, 1, "readNode: path={}, readRevision={}", path, readRevision);
        return state;
    }
    PERFLOG.end(start, 1,
            "readNode: (document not found) path={}, readRevision={}",
            path, readRevision);
    return null;
}
/**
 * @return the {@link BundlingConfigHandler} held by this node store.
 */
public BundlingConfigHandler getBundlingConfigHandler() {
    return this.bundlingConfigHandler;
}
/**
 * Apply the changes of a node to the cache.
 * <p>
 * For a new node, a children cache entry is written from the list of
 * added child nodes (or {@code NO_CHILDREN} for a leaf whose cached
 * document has the children flag set). For an existing node, the new
 * children cache entry is derived from the cached before state, if both
 * the before state and its children entry are available in the caches.
 *
 * @param before the before revision (old head)
 * @param after the after revision (new head)
 * @param rev the commit revision
 * @param path the path
 * @param isNew whether this is a new node
 * @param added the list of added child nodes
 * @param removed the list of removed child nodes
 * @param changed the list of changed child nodes (not read by this method)
 *
 */
void applyChanges(RevisionVector before, RevisionVector after,
Revision rev, Path path,
boolean isNew, List<Path> added,
List<Path> removed, List<Path> changed) {
if (isNew) {
// determine the revision for the nodeChildrenCache entry when
// the node is new. Fallback to after revision in case document
// is not in the cache. (OAK-4715)
NodeDocument doc = store.getIfCached(NODES, getIdFromPath(path));
RevisionVector afterLastRev = after;
if (doc != null) {
afterLastRev = new RevisionVector(doc.getLastRev().values());
afterLastRev = afterLastRev.update(rev);
}
if (added.isEmpty()) {
// this is a leaf node.
// check if it has the children flag set
if (doc != null && doc.hasChildren()) {
NamePathRev key = childNodeCacheKey(path, afterLastRev, "");
LOG.debug("nodeChildrenCache.put({},{})", key, "NO_CHILDREN");
nodeChildrenCache.put(key, DocumentNodeState.NO_CHILDREN);
}
} else {
DocumentNodeState.Children c = new DocumentNodeState.Children();
// tree set: names end up sorted and without duplicates
Set<String> set = Sets.newTreeSet();
for (Path p : added) {
set.add(p.getName());
}
c.children.addAll(set);
NamePathRev key = childNodeCacheKey(path, afterLastRev, "");
LOG.debug("nodeChildrenCache.put({},{})", key, c);
nodeChildrenCache.put(key, c);
}
} else {
// existed before
DocumentNodeState beforeState = getRoot(before);
// do we have a cached before state that can be used
// to calculate the new children?
// walk from the root down to path, one ancestor per iteration,
// using only entries already present in the node cache
int depth = path.getDepth();
for (int i = 1; i <= depth && beforeState != null; i++) {
Path p = path.getAncestor(depth - i);
RevisionVector lastRev = beforeState.getLastRevision();
PathRev key = new PathRev(p, lastRev);
beforeState = nodeCache.getIfPresent(key);
if (missing.equals(beforeState)) {
// This is unexpected. The before state should exist.
// Invalidate the relevant cache entries. (OAK-6294)
LOG.warn("Before state is missing {}. Invalidating " +
"affected cache entries.", key);
store.invalidateCache(NODES, Utils.getIdFromPath(p));
nodeCache.invalidate(key);
// NOTE(review): this key is built from 'path', not from the
// ancestor 'p' used for the two invalidations above - confirm
// that this is intended
nodeChildrenCache.invalidate(childNodeCacheKey(path, lastRev, ""));
beforeState = null;
}
}
DocumentNodeState.Children children = null;
if (beforeState != null) {
if (beforeState.hasNoChildren()) {
children = DocumentNodeState.NO_CHILDREN;
} else {
NamePathRev key = childNodeCacheKey(path, beforeState.getLastRevision(), "");
children = nodeChildrenCache.getIfPresent(key);
}
}
// children is only non-null when beforeState is non-null as well
if (children != null) {
NamePathRev afterKey = childNodeCacheKey(path, beforeState.getLastRevision().update(rev), "");
// are there any added or removed children?
if (added.isEmpty() && removed.isEmpty()) {
// simply use the same list
LOG.debug("nodeChildrenCache.put({},{})", afterKey, children);
nodeChildrenCache.put(afterKey, children);
} else if (!children.hasMore){
// list is complete. use before children as basis
Set<String> afterChildren = Sets.newTreeSet(children.children);
for (Path p : added) {
afterChildren.add(p.getName());
}
for (Path p : removed) {
afterChildren.remove(p.getName());
}
DocumentNodeState.Children c = new DocumentNodeState.Children();
c.children.addAll(afterChildren);
if (c.children.size() <= DocumentNodeState.MAX_FETCH_SIZE) {
LOG.debug("nodeChildrenCache.put({},{})", afterKey, c);
nodeChildrenCache.put(afterKey, c);
} else {
LOG.info("not caching more than {} child names for {}",
DocumentNodeState.MAX_FETCH_SIZE, path);
}
} else if (added.isEmpty()) {
// incomplete list, but we only removed nodes
// use linked hash set to retain order
Set<String> afterChildren = Sets.newLinkedHashSet(children.children);
for (Path p : removed) {
afterChildren.remove(p.getName());
}
DocumentNodeState.Children c = new DocumentNodeState.Children();
c.children.addAll(afterChildren);
c.hasMore = true;
LOG.debug("nodeChildrenCache.put({},{})", afterKey, c);
nodeChildrenCache.put(afterKey, c);
}
// remaining case: incomplete list with added children,
// no cache entry is written for the after revision
}
}
}
/**
 * Called when a branch is merged. The revisions are recorded as branch
 * commits in the current journal entry.
 *
 * @param revisions the revisions of the merged branch commits.
 */
void revisionsMerged(@NotNull Iterable<Revision> revisions) {
    this.changes.branchCommit(revisions);
}
/**
 * Updates a commit root document. The update goes through the batch
 * commit queue when it consists only of revision entries and modified
 * timestamp updates; otherwise it is applied directly on the store. When
 * the update fails with a {@link DocumentStoreException}, the method
 * verifies whether the change reached the store nevertheless.
 *
 * @param commit the updates to apply on the commit root document.
 * @param commitRev the commit revision.
 * @return the document before the update was applied or <code>null</code>
 *      if the update failed because of a collision.
 * @throws DocumentStoreException if the update fails with an error.
 */
@Nullable
NodeDocument updateCommitRoot(UpdateOp commit, Revision commitRev)
        throws DocumentStoreException {
    // batching is only possible for revision and modified updates
    boolean batch = true;
    for (Key key : commit.getChanges().keySet()) {
        String name = key.getName();
        boolean batchable = NodeDocument.isRevisionsEntry(name)
                || NodeDocument.MODIFIED_IN_SECS.equals(name);
        if (!batchable) {
            batch = false;
            break;
        }
    }
    try {
        if (!batch) {
            return store.findAndUpdate(NODES, commit);
        }
        return batchUpdateCommitRoot(commit);
    } catch (DocumentStoreException e) {
        return verifyCommitRootUpdateApplied(commit, commitRev, e);
    }
}
/**
 * Verifies if the {@code commit} update on the commit root was applied by
 * reading the affected document and checking if the {@code commitRev} is
 * set in the revisions map. Reading the document is attempted up to ten
 * times, but only a failed read triggers another attempt; any successful
 * read decides the outcome.
 *
 * @param commit the update operation on the commit root document.
 * @param commitRev the commit revision.
 * @param e the exception that will be thrown when this method determines
 *          that the update was not applied.
 * @return the before document, reconstructed by applying the reverse
 *          operation of {@code commit} to a copy of the current document.
 * @throws DocumentStoreException the exception passed to this method
 *          in case the commit update was not applied.
 */
private NodeDocument verifyCommitRootUpdateApplied(UpdateOp commit,
                                                   Revision commitRev,
                                                   DocumentStoreException e)
        throws DocumentStoreException {
    LOG.info("Update of commit root failed with exception", e);
    final int maxAttempts = 10;
    int attempt = 0;
    while (attempt < maxAttempts) {
        attempt++;
        LOG.info("Checking if change made it to the DocumentStore anyway {}/{} ...",
                attempt, maxAttempts);
        NodeDocument current;
        try {
            current = store.find(NODES, commit.getId(), 0);
        } catch (Exception ex) {
            LOG.info("Failed to read commit root document", ex);
            continue;
        }
        if (current == null) {
            LOG.info("Commit root document missing for {}", commit.getId());
            break;
        }
        if (!current.getLocalRevisions().containsKey(commitRev)) {
            // document was read, but does not contain the commit revision
            break;
        }
        LOG.info("Update made it to the store even though the call " +
                "failed with an exception. Previous exception will " +
                "be suppressed. {}", commit);
        NodeDocument before = NODES.newDocument(store);
        current.deepCopy(before);
        UpdateUtils.applyChanges(before, commit.getReverseOperation());
        return before;
    }
    LOG.info("Update didn't make it to the store. Re-throwing the exception");
    throw e;
}
/**
 * Applies the given commit root update through the batch commit queue and
 * waits for its result.
 *
 * @param commit the update operation on the commit root document.
 * @return the result of the queued update.
 * @throws DocumentStoreException if the calling thread is interrupted or
 *          the update fails for any other reason.
 */
private NodeDocument batchUpdateCommitRoot(UpdateOp commit)
        throws DocumentStoreException {
    final NodeDocument before;
    try {
        before = batchCommitQueue.updateDocument(commit).call();
    } catch (InterruptedException e) {
        // NOTE(review): the interrupt status of the thread is not restored
        // here - confirm whether this is intentional
        throw DocumentStoreException.convert(e,
                "Interrupted while updating commit root document");
    } catch (Exception e) {
        throw DocumentStoreException.convert(e,
                "Update of commit root document failed");
    }
    return before;
}
/**
 * Returns the root node state at the given revision.
 *
 * @param revision a revision.
 * @return the root node state at the given revision.
 * @throws IllegalStateException if there is no root node at the given
 *          revision.
 */
@NotNull
DocumentNodeState getRoot(@NotNull RevisionVector revision) {
    DocumentNodeState state = getNode(ROOT, revision);
    if (state != null) {
        return state;
    }
    throw new IllegalStateException(
            "root node does not exist at revision " + revision);
}
/**
 * Creates a new branch on top of the given base state, using the merge
 * lock of this node store.
 *
 * @param base the base node state of the branch.
 * @return a new {@link DocumentNodeStoreBranch}.
 */
@NotNull
DocumentNodeStoreBranch createBranch(DocumentNodeState base) {
    return new DocumentNodeStoreBranch(this, base, this.mergeLock);
}
/**
 * Rebases the branch with the given head onto the given base revision.
 *
 * @param branchHead the head revision of the branch.
 * @param base the new base revision.
 * @return {@code branchHead} when branches are disabled or the branch is
 *         already based on {@code base}; {@code base} as branch revision
 *         when the branch is not known (empty); otherwise {@code base}
 *         updated with a newly created branch revision.
 */
@NotNull
RevisionVector rebase(@NotNull RevisionVector branchHead,
                      @NotNull RevisionVector base) {
    checkNotNull(branchHead);
    checkNotNull(base);
    if (disableBranches) {
        return branchHead;
    }
    // TODO conflict handling
    Branch branch = getBranches().getBranch(branchHead);
    if (branch == null) {
        // empty branch
        return base.asBranchRevision(getClusterId());
    }
    RevisionVector currentBase = branch.getBase(branchHead.getBranchRevision());
    if (currentBase.equals(base)) {
        return branchHead;
    }
    // a pseudo commit guarantees that the current head of the branch
    // has a higher revision than the base of the branch
    Revision rebaseRev = newRevision().asBranchRevision();
    branch.rebase(rebaseRev, base);
    return base.update(rebaseRev);
}
/**
 * Resets the branch with the given head back to the given ancestor
 * revision. The branch commits after the ancestor are undone in reverse
 * order: for each of them the collision, revision and branch commit
 * entries are removed from the root document and, unless the commit is a
 * rebase, the changes relative to the preceding revision are reverted in
 * the store. Finally the undone commits are removed from the in-memory
 * branch.
 *
 * @param branchHead the current head revision of the branch.
 * @param ancestor the revision to reset to; either a commit of the branch
 *          or the base of the branch as branch revision.
 * @return the {@code ancestor} revision.
 * @throws DocumentStoreException if the branch is not known (empty), if
 *          {@code branchHead} is not the head of the branch or if
 *          {@code ancestor} is not an ancestor of {@code branchHead}.
 */
@NotNull
RevisionVector reset(@NotNull RevisionVector branchHead,
@NotNull RevisionVector ancestor) {
checkNotNull(branchHead);
checkNotNull(ancestor);
Branch b = getBranches().getBranch(branchHead);
if (b == null) {
throw new DocumentStoreException("Empty branch cannot be reset");
}
if (!b.getCommits().last().equals(branchHead.getRevision(getClusterId()))) {
throw new DocumentStoreException(branchHead + " is not the head " +
"of a branch");
}
Revision ancestorRev = ancestor.getBranchRevision();
if (!b.containsCommit(ancestorRev)
&& !b.getBase().asBranchRevision(getClusterId()).equals(ancestor)) {
throw new DocumentStoreException(ancestor + " is not " +
"an ancestor revision of " + branchHead);
}
// tailSet is inclusive and will contain the ancestorRev, unless
// the ancestorRev is the base revision of the branch
List<Revision> revs = new ArrayList<>();
if (!b.containsCommit(ancestorRev)) {
// add before all other branch revisions
revs.add(ancestorRev);
}
revs.addAll(b.getCommits().tailSet(ancestorRev));
// collects the changes on the root document for all undone commits,
// applied once after the loop
UpdateOp rootOp = new UpdateOp(Utils.getIdFromPath(ROOT), false);
// reset each branch commit in reverse order
Map<Path, UpdateOp> operations = Maps.newHashMap();
AtomicReference<Revision> currentRev = new AtomicReference<>();
for (Revision r : reverse(revs)) {
operations.clear();
// 'previous' is the revision of the preceding iteration, that is
// the commit to undo; 'r' is the revision it is reset to
Revision previous = currentRev.getAndSet(r);
if (previous == null) {
// first iteration: nothing to undo yet
continue;
}
NodeDocument.removeCollision(rootOp, previous.asTrunkRevision());
NodeDocument.removeRevision(rootOp, previous.asTrunkRevision());
NodeDocument.removeBranchCommit(rootOp, previous.asTrunkRevision());
BranchCommit bc = b.getCommit(previous);
if (bc.isRebase()) {
// no node changes are reverted for a rebase commit
continue;
}
DocumentNodeState branchState = getRoot(bc.getBase().update(previous));
DocumentNodeState baseState = getRoot(bc.getBase().update(r));
LOG.debug("reset: comparing branch {} with base {}",
branchState.getRootRevision(), baseState.getRootRevision());
branchState.compareAgainstBaseState(baseState,
new ResetDiff(previous.asTrunkRevision(), operations));
LOG.debug("reset: applying {} operations", operations.size());
// apply reset operations
for (List<UpdateOp> ops : partition(operations.values(), getCreateOrUpdateBatchSize())) {
store.createOrUpdate(NODES, ops);
}
}
store.findAndUpdate(NODES, rootOp);
// clean up in-memory branch data
for (Revision r : revs) {
if (!r.equals(ancestorRev)) {
b.removeCommit(r);
}
}
return ancestor;
}
/**
 * Merges the branch with the given head into trunk. For a known branch
 * the branch commits are made visible with a single update on the root
 * document; when that update does not go through, the merge fails with a
 * {@link ConflictException}. A merge commit that does not complete
 * successfully is canceled.
 *
 * @param branchHead the head revision of the branch to merge.
 * @param info the commit info.
 * @return the new head revision after the merge.
 * @throws ConflictException if the update on the root document failed
 *          because of a conflicting concurrent change.
 * @throws CommitFailedException declared for failures raised while
 *          preparing the merge, e.g. by the branch age check.
 */
@NotNull
RevisionVector merge(@NotNull RevisionVector branchHead,
                     @NotNull CommitInfo info)
        throws ConflictException, CommitFailedException {
    final Branch branch = getBranches().getBranch(branchHead);
    final RevisionVector base;
    final int numBranchCommits;
    if (branch == null) {
        base = branchHead;
        numBranchCommits = 1;
    } else {
        base = branch.getBase(branchHead.getBranchRevision());
        numBranchCommits = branch.getCommits().size();
    }
    final MergeCommit commit = newMergeCommit(base, numBranchCommits);
    boolean merged = false;
    try {
        // make branch commits visible
        UpdateOp op = new UpdateOp(Utils.getIdFromPath(ROOT), false);
        NodeDocument.setModified(op, commit.getRevision());
        // an unknown branch has no commits, in that case nothing
        // needs to be written
        if (branch != null) {
            // fail the commit if the first branch commit is too old
            checkBranchAge(branch);
            commit.addBranchCommits(branch);
            Iterator<Revision> mergeRevs = commit.getMergeRevisions().iterator();
            for (Revision branchRev : branch.getCommits()) {
                Revision trunkRev = branchRev.asTrunkRevision();
                NodeDocument.setRevision(op, trunkRev, "c-" + mergeRevs.next());
                op.containsMapEntry(NodeDocument.COLLISIONS, trunkRev, false);
            }
            if (store.findAndUpdate(Collection.NODES, op) == null) {
                NodeDocument root = Utils.getRootDocument(store);
                Set<Revision> conflictRevs = root.getConflictsFor(branch.getCommits());
                String msg = "Conflicting concurrent change. Update operation failed: " + op;
                throw new ConflictException(msg, conflictRevs);
            }
            // remove from branchCommits map after successful update
            branch.applyTo(getPendingModifications(), commit.getRevision());
            getBranches().remove(branch);
        }
        RevisionVector newHead = done(commit, false, info);
        merged = true;
        return newHead;
    } finally {
        if (!merged) {
            canceled(commit);
        }
    }
}
/**
 * Compares the given {@code node} against the {@code base} state and
 * reports the differences to the {@link NodeStateDiff}. Properties are
 * compared first; child node differences are then obtained through the
 * diff cache, which falls back to computing the diff when it has no
 * entry.
 *
 * @param node the node to compare.
 * @param base the base node to compare against.
 * @param diff handler of node state differences
 * @return {@code true} if the full diff was performed, or
 *         {@code false} if it was aborted as requested by the handler
 *         (see the {@link NodeStateDiff} contract for more details)
 */
@Override
public boolean compare(@NotNull final AbstractDocumentNodeState node,
                       @NotNull final AbstractDocumentNodeState base,
                       @NotNull NodeStateDiff diff) {
    boolean propertiesDone =
            AbstractNodeState.comparePropertiesAgainstBaseState(node, base, diff);
    if (!propertiesDone) {
        return false;
    }
    // neither side has children, so there is nothing left to compare
    if (node.hasNoChildren() && base.hasNoChildren()) {
        return true;
    }
    final DiffCache.Loader loader = new DiffCache.Loader() {
        @Override
        public String call() {
            return diffImpl(base, node);
        }
    };
    return new JsopNodeStateDiffer(
            diffCache.getChanges(base.getRootRevision(),
                    node.getRootRevision(), node.getPath(), loader))
            .withoutPropertyChanges()
            .compare(node, base, diff);
}
/**
 * Creates a tracker for the given commit revision. For a branch commit
 * (with branches enabled) the tracker is the branch commit itself;
 * otherwise the tracker records tracked paths in the unsaved last
 * revisions.
 *
 * @param r a commit revision.
 * @param isBranchCommit whether this is a branch commit.
 * @return a _lastRev tracker for the given commit revision.
 */
LastRevTracker createTracker(final @NotNull Revision r,
                             final boolean isBranchCommit) {
    if (!isBranchCommit || disableBranches) {
        return new LastRevTracker() {
            @Override
            public void track(Path path) {
                unsavedLastRevisions.put(path, r);
            }
        };
    }
    return branches.getBranchCommit(r.asBranchRevision());
}
/**
 * Suspends until all given revisions are either visible from the current
 * headRevision or canceled from the commit queue.
 *
 * Only revisions from the local cluster node will be considered if the async
 * delay is set to 0.
 *
 * @param conflictRevisions the revision to become visible.
 * @return the suspend time in milliseconds.
 */
long suspendUntilAll(@NotNull Set<Revision> conflictRevisions) {
    final long start = clock.getTime();
    Set<Revision> toAwait = conflictRevisions;
    // with background read disabled, revisions from other cluster
    // nodes must not suspend the caller
    if (getAsyncDelay() == 0) {
        Set<Revision> onlyLocal = new HashSet<Revision>(conflictRevisions.size());
        for (Revision candidate : conflictRevisions) {
            if (candidate.getClusterId() == getClusterId()) {
                onlyLocal.add(candidate);
            }
        }
        toAwait = onlyLocal;
    }
    commitQueue.suspendUntilAll(toAwait);
    return clock.getTime() - start;
}
//------------------------< Observable >------------------------------------
/**
 * Registers the given observer with the change dispatcher.
 *
 * @param observer the observer to register.
 * @return the {@link Closeable} returned by the dispatcher for this
 *         registration.
 */
@Override
public Closeable addObserver(Observer observer) {
    return this.dispatcher.addObserver(observer);
}
//-------------------------< NodeStore >------------------------------------
/**
 * @return the current root node state of this node store.
 */
@NotNull
@Override
public DocumentNodeState getRoot() {
    return this.root;
}
/**
 * Merges the changes of the given builder by delegating to the document
 * root builder obtained from it.
 *
 * @param builder the builder holding the changes.
 * @param commitHook the commit hook passed on to the builder.
 * @param info the commit info passed on to the builder.
 * @return the node state returned by the builder's merge.
 * @throws CommitFailedException if the merge fails.
 */
@NotNull
@Override
public NodeState merge(@NotNull NodeBuilder builder,
                       @NotNull CommitHook commitHook,
                       @NotNull CommitInfo info)
        throws CommitFailedException {
    return asDocumentRootBuilder(builder)
            .merge(commitHook, info);
}
/**
 * Rebases the given builder by delegating to the document root builder
 * obtained from it.
 *
 * @param builder the builder to rebase.
 * @return the node state returned by the builder's rebase.
 */
@NotNull
@Override
public NodeState rebase(@NotNull NodeBuilder builder) {
    return asDocumentRootBuilder(builder)
            .rebase();
}
/**
* Resets the given builder. The builder is converted with
* {@code asDocumentRootBuilder} and the reset is delegated to it.
*
* @param builder the builder to reset.
* @return the node state returned by the delegated reset.
*/
@Override
public NodeState reset(@NotNull NodeBuilder builder) {
return asDocumentRootBuilder(builder).reset();
}
/**
 * Writes the content of the given stream to the blob store and wraps the
 * resulting blob id into a {@link BlobStoreBlob}.
 *
 * @param inputStream the stream with the blob content.
 * @return a blob backed by the blob store of this node store.
 * @throws IOException if writing to the blob store fails.
 */
@Override
@NotNull
public BlobStoreBlob createBlob(InputStream inputStream) throws IOException {
    final String blobId = blobStore.writeBlob(inputStream);
    return new BlobStoreBlob(blobStore, blobId);
}
/**
 * Looks up the {@link Blob} for the given reference. The reference is the
 * secure reference obtained from Blob#reference, which is different from
 * the blobId.
 *
 * @param reference the reference of the blob.
 * @return the blob, or {@code null} if the blob store does not know a
 *         blobId for the reference.
 */
@Override
public Blob getBlob(@NotNull String reference) {
    final String blobId = blobStore.getBlobId(reference);
    if (blobId == null) {
        LOG.debug("No blobId found matching reference [{}]", reference);
        return null;
    }
    return new BlobStoreBlob(blobStore, blobId);
}
/**
* Returns the {@link Blob} with the given blobId. The blob is created
* directly from the passed id; this method itself does not look up the id
* in the blob store.
*
* @param blobId the blobId of the blob.
* @return a {@link BlobStoreBlob} for the given blobId.
*/
public Blob getBlobFromBlobId(String blobId){
return new BlobStoreBlob(blobStore, blobId);
}
/**
* Creates a checkpoint with the given lifetime and properties. Calls
* {@code checkOpen()} before the checkpoint is created.
*
* @param lifetime the lifetime passed on to {@code checkpoints.create}.
* @param properties the properties passed on to {@code checkpoints.create}.
* @return the string representation of the value returned by
*         {@code checkpoints.create}.
*/
@NotNull
@Override
public String checkpoint(long lifetime, @NotNull Map<String, String> properties) {
checkOpen();
return checkpoints.create(lifetime, properties).toString();
}
/**
 * Creates a checkpoint with the given lifetime and no properties.
 *
 * @param lifetime the lifetime of the checkpoint.
 * @return the checkpoint string as returned by
 *         {@link #checkpoint(long, Map)}.
 */
@NotNull
@Override
public String checkpoint(long lifetime) {
    checkOpen();
    return checkpoint(lifetime, Collections.<String, String>emptyMap());
}
/**
 * Returns the properties stored with the given checkpoint.
 *
 * @param checkpoint the checkpoint, parsed with {@code Revision.fromString}.
 * @return the info map of the checkpoint, or an empty map if no such
 *         checkpoint exists.
 */
@NotNull
@Override
public Map<String, String> checkpointInfo(@NotNull String checkpoint) {
    checkOpen();
    final Revision checkpointRev = Revision.fromString(checkpoint);
    final Checkpoints.Info found = checkpoints.getCheckpoints().get(checkpointRev);
    if (found != null) {
        return found.get();
    }
    // checkpoint does not exist
    return Collections.emptyMap();
}
/**
 * Returns the string representations of all checkpoints whose expiry
 * time lies after the current clock time.
 *
 * @return the checkpoints that have not expired yet.
 */
@NotNull
@Override
public Iterable<String> checkpoints() {
    checkOpen();
    final long now = clock.getTime();
    // keep only the entries that expire after 'now' ...
    Iterable<Map.Entry<Revision, Checkpoints.Info>> alive = Iterables.filter(
            checkpoints.getCheckpoints().entrySet(),
            cp -> cp.getValue().getExpiryTime() > now);
    // ... and expose each of them as the string form of its revision
    return Iterables.transform(alive, cp -> cp.getKey().toString());
}
/**
 * Returns the root node state for the given checkpoint.
 *
 * @param checkpoint the checkpoint to look up.
 * @return the root state at the revision of the checkpoint, or
 *         {@code null} if the checkpoint cannot be retrieved.
 */
@Nullable
@Override
public NodeState retrieve(@NotNull String checkpoint) {
    checkOpen();
    final RevisionVector checkpointRev = getCheckpoints().retrieve(checkpoint);
    if (checkpointRev != null) {
        // make sure all changes up to checkpoint are visible
        suspendUntilAll(Sets.newHashSet(checkpointRev));
        return getRoot(checkpointRev);
    }
    return null;
}
/**
* Releases the given checkpoint. Calls {@code checkOpen()} first and then
* delegates to {@code checkpoints.release}.
*
* @param checkpoint the checkpoint to release.
* @return always {@code true}.
*/
@Override
public boolean release(@NotNull String checkpoint) {
checkOpen();
checkpoints.release(checkpoint);
return true;
}
//------------------------< RevisionContext >-------------------------------
/**
* @return the unmerged branches tracked by this node store.
*/
@Override
public UnmergedBranches getBranches() {
return branches;
}
/**
* @return the unsaved {@code _lastRev} modifications of this node store.
*/
@Override
public UnsavedModifications getPendingModifications() {
return unsavedLastRevisions;
}
/**
* @return the clusterId of this node store instance.
*/
@Override
public int getClusterId() {
return clusterId;
}
/**
* @return the root revision of the current {@code root} state.
*/
@Override
@NotNull
public RevisionVector getHeadRevision() {
return root.getRootRevision();
}
/**
 * Creates a new revision for the local clusterId. If a simple revision
 * counter is set, the revision is derived from that counter instead.
 *
 * @return the new revision.
 * @throws DocumentStoreException if the lease check mode is
 *         {@code STRICT} and the timestamp of the new revision is not
 *         before the lease end time.
 */
@Override
@NotNull
public Revision newRevision() {
    if (simpleRevisionCounter != null) {
        return new Revision(simpleRevisionCounter.getAndIncrement(), 0, clusterId);
    }
    final Revision created = Revision.newRevision(clusterId);
    if (clusterNodeInfo.getLeaseCheckMode() != LeaseCheckMode.STRICT) {
        return created;
    }
    // verify revision timestamp is within lease period
    final long leaseEnd = clusterNodeInfo.getLeaseEndTime();
    final long timestamp = created.getTimestamp();
    if (timestamp < leaseEnd) {
        return created;
    }
    throw new DocumentStoreException(String.format(
            "Cannot use new revision %s " +
            "with timestamp %s >= lease end %s",
            created, timestamp, leaseEnd));
}
/**
* @return the clock used by this node store.
*/
@Override
@NotNull
public Clock getClock() {
return clock;
}
/**
* Resolves the commit value for a change revision by delegating to the
* {@code commitValueResolver} of this node store.
*
* @param changeRevision the revision of the change.
* @param doc the document passed on to the resolver.
* @return the value returned by the resolver.
*/
@Override
public String getCommitValue(@NotNull Revision changeRevision,
@NotNull NodeDocument doc) {
return commitValueResolver.resolve(changeRevision, doc);
}
//----------------------< background operations >---------------------------
/**
* Used for testing only. Runs the background sweep, update and read
* operations once, in this order.
*/
public void runBackgroundOperations() {
runBackgroundSweepOperation();
runBackgroundUpdateOperations();
runBackgroundReadOperations();
}
/**
 * Runs the background update operations unless the store is in read-only
 * mode or disposed. A failure is logged and rethrown, except when the
 * store was disposed in the meantime.
 *
 * Note: made package-protected for testing purpose, would otherwise be
 * private.
 */
void runBackgroundUpdateOperations() {
    final boolean skip = readOnlyMode || isDisposed.get();
    if (skip) {
        return;
    }
    try {
        internalRunBackgroundUpdateOperations();
    } catch (RuntimeException e) {
        // failures after dispose are expected and silently dropped
        if (!isDisposed.get()) {
            LOG.warn("Background update operation failed (will be retried with next run): " + e.toString(), e);
            throw e;
        }
    }
}
/**
* Runs the background update steps while holding
* {@code backgroundWriteMonitor}: cleanup of orphaned branches and root
* collisions, document split, head revision refresh and finally the write
* of pending {@code _lastRev} updates. The time spent in the individual
* steps is recorded in the statistics, which are handed to the stats
* collector after the monitor was released.
*/
private void internalRunBackgroundUpdateOperations() {
BackgroundWriteStats stats;
synchronized (backgroundWriteMonitor) {
long start = clock.getTime();
long time = start;
// clean orphaned branches and collisions
cleanOrphanedBranches();
cleanRootCollisions();
long cleanTime = clock.getTime() - time;
time = clock.getTime();
// split documents (does not create new revisions)
backgroundSplit();
long splitTime = clock.getTime() - time;
time = clock.getTime();
// refresh the head revision if it was not updated recently
maybeRefreshHeadRevision();
long refreshTime = clock.getTime() - time;
// write back pending updates to _lastRev
stats = backgroundWrite();
// the stats object only exists now, copy the earlier timings into it
stats.refresh = refreshTime;
stats.split = splitTime;
stats.clean = cleanTime;
stats.totalWriteTime = clock.getTime() - start;
String msg = "Background operations stats ({})";
logBackgroundOperation(start, msg, stats);
}
//Push stats outside of sync block
nodeStoreStatsCollector.doneBackgroundUpdate(stats);
}
//----------------------< background read operations >----------------------
/**
 * Runs the background read operations unless the store is disposed. A
 * failure is logged and rethrown, except when the store was disposed in
 * the meantime.
 *
 * Note: made package-protected for testing purpose, would otherwise be
 * private.
 */
void runBackgroundReadOperations() {
    if (isDisposed.get()) {
        return;
    }
    try {
        internalRunBackgroundReadOperations();
    } catch (RuntimeException e) {
        // failures after dispose are expected and silently dropped
        if (!isDisposed.get()) {
            LOG.warn("Background read operation failed: " + e.toString(), e);
            throw e;
        }
    }
}
/**
 * OAK-2624 : background read operations are split from background update
 * ops. Performs the background read while holding
 * {@code backgroundReadMonitor} and reports the statistics to the stats
 * collector after the monitor was released.
 */
private void internalRunBackgroundReadOperations() {
    final BackgroundReadStats readStats;
    synchronized (backgroundReadMonitor) {
        final long begin = clock.getTime();
        // pull in changes from other cluster nodes
        readStats = backgroundRead();
        readStats.totalReadTime = clock.getTime() - begin;
        logBackgroundOperation(begin,
                "Background read operations stats (read:{} {})",
                readStats.totalReadTime, readStats);
    }
    nodeStoreStatsCollector.doneBackgroundRead(readStats);
}
//----------------------< background sweep operation >----------------------
/**
 * Runs the background sweep operation unless the store is in read-only
 * mode or disposed. A failure is logged and rethrown, except when the
 * store was disposed in the meantime.
 */
void runBackgroundSweepOperation() {
    final boolean skip = readOnlyMode || isDisposed.get();
    if (skip) {
        return;
    }
    try {
        internalRunBackgroundSweepOperation();
    } catch (RuntimeException e) {
        // failures after dispose are expected and silently dropped
        if (!isDisposed.get()) {
            LOG.warn("Background sweep operation failed (will be retried with next run): " + e.toString(), e);
            throw e;
        }
    }
}
/**
 * Performs the background sweep while holding
 * {@code backgroundSweepMonitor} and reports the number of updates and
 * the sweep time to the stats collector after the monitor was released.
 *
 * @throws DocumentStoreException if the sweep fails.
 */
private void internalRunBackgroundSweepOperation()
        throws DocumentStoreException {
    final BackgroundWriteStats sweepStats = new BackgroundWriteStats();
    synchronized (backgroundSweepMonitor) {
        final long begin = clock.getTime();
        sweepStats.num = backgroundSweep();
        sweepStats.sweep = clock.getTime() - begin;
        logBackgroundOperation(begin,
                "Background sweep operation stats ({})", sweepStats);
    }
    nodeStoreStatsCollector.doneBackgroundUpdate(sweepStats);
}
/**
 * Logs the outcome of a background operation. The {@code message} goes to
 * INFO when more than 10 seconds elapsed since {@code startTime} and to
 * DEBUG otherwise.
 *
 * @param startTime time when the background operation started.
 * @param message the log message.
 * @param arguments the arguments for the log message.
 */
private void logBackgroundOperation(long startTime,
                                    String message,
                                    Object... arguments) {
    final long elapsed = clock.getTime() - startTime;
    final boolean slow = elapsed > SECONDS.toMillis(10);
    if (!slow) {
        LOG.debug(message, arguments);
        return;
    }
    // log as info if it took more than 10 seconds
    LOG.info(message, arguments);
}
/**
 * Asks the cluster node info to renew the lease. When the lease was
 * renewed, the elapsed time in microseconds is reported to the stats
 * collector.
 *
 * @return {@code true} if the lease was renewed; {@code false} otherwise.
 */
boolean renewClusterIdLease() {
    final Stopwatch timer = Stopwatch.createStarted();
    if (!clusterNodeInfo.renewLease()) {
        return false;
    }
    nodeStoreStatsCollector.doneLeaseUpdate(timer.elapsed(MICROSECONDS));
    return true;
}
/**
 * Refreshes {@link #clusterNodes} from the cluster node info documents
 * found in the store and drops entries for clusterIds that are no longer
 * present.
 *
 * @return true if the cluster state has changed, false if the cluster state
 *          remained unchanged
 */
boolean updateClusterState() {
    boolean changed = false;
    final Set<Integer> seenIds = Sets.newHashSet();
    for (ClusterNodeInfoDocument current : ClusterNodeInfoDocument.all(nonLeaseCheckingStore)) {
        final int id = current.getClusterId();
        seenIds.add(id);
        final ClusterNodeInfoDocument previous = clusterNodes.get(id);
        // do not replace document for inactive cluster node
        // in order to keep the created timestamp of the document
        // for the time when the cluster node was first seen inactive
        final boolean stillInactive = previous != null
                && !previous.isActive() && !current.isActive();
        if (!stillInactive) {
            clusterNodes.put(id, current);
            if (previous == null || previous.isActive() != current.isActive()) {
                changed = true;
            }
        }
    }
    // forget cluster nodes that no longer have a document
    if (clusterNodes.keySet().retainAll(seenIds)) {
        changed = true;
    }
    return changed;
}
/**
 * Computes the start revisions of all known cluster nodes and removes
 * the entry of the local clusterId.
 *
 * @return the minimum revisions of foreign cluster nodes since they were
 *          started. The revision is derived from the start time of the
 *          cluster node.
 */
@NotNull
private RevisionVector getMinExternalRevisions() {
    final RevisionVector startRevisions = Utils.getStartRevisions(clusterNodes.values());
    return startRevisions.remove(getClusterId());
}
/**
* Perform a background read and make external changes visible. The work
* is done by an anonymous {@code ExternalChange} whose callbacks invalidate
* the document store cache and update the head revision of this store.
*
* @return the statistics returned by {@code ExternalChange.process()}.
*/
private BackgroundReadStats backgroundRead() {
return new ExternalChange(this) {
@Override
void invalidateCache(@NotNull Iterable<String> paths) {
// invalidate only the documents of the given paths
stats.cacheStats = store.invalidateCache(pathToId(paths));
}
@Override
void invalidateCache() {
// invalidate the cache without restricting it to specific documents
stats.cacheStats = store.invalidateCache();
}
@Override
void updateHead(@NotNull Set<Revision> externalChanges,
@NotNull RevisionVector sweepRevs,
@Nullable Iterable<String> changedPaths) {
long time = clock.getTime();
// make sure no local commit is in progress
backgroundOperationLock.writeLock().lock();
try {
// time spent waiting for the write lock
stats.lock = clock.getTime() - time;
RevisionVector oldHead = getHeadRevision();
RevisionVector newHead = oldHead;
// fold every external revision into the new head
for (Revision r : externalChanges) {
newHead = newHead.update(r);
}
setRoot(newHead);
// update sweep revisions
sweepRevisions = sweepRevisions.pmax(sweepRevs);
commitQueue.headRevisionChanged();
time = clock.getTime();
if (changedPaths != null) {
// then there were external changes and reading them
// was successful -> apply them to the diff cache
try {
JournalEntry.applyTo(changedPaths, diffCache,
ROOT, oldHead, newHead);
} catch (Exception e1) {
// a failure here is only logged; the head was already updated above
LOG.error("backgroundRead: Exception while processing external changes from journal: " + e1, e1);
}
}
stats.populateDiffCache = clock.getTime() - time;
time = clock.getTime();
ChangeSet changeSet = getChangeSetBuilder().build();
LOG.debug("Dispatching external change with ChangeSet {}", changeSet);
dispatcher.contentChanged(getRoot().fromExternalChange(),
newCommitInfo(changeSet, getJournalPropertyHandler()));
} finally {
backgroundOperationLock.writeLock().unlock();
}
// measured after the lock was released, i.e. includes the unlock
stats.dispatchChanges = clock.getTime() - time;
}
}.process();
}
/**
 * Builds the commit info used when dispatching an external change. The
 * info carries a commit context holding the given change set and whatever
 * the journal property handler adds to it.
 *
 * @param changeSet the change set to expose through the commit context.
 * @param journalPropertyHandler the handler that contributes to the commit context.
 * @return a new commit info with {@code OAK_UNKNOWN} session id and user id.
 */
private static CommitInfo newCommitInfo(@NotNull ChangeSet changeSet, JournalPropertyHandler journalPropertyHandler) {
    final CommitContext context = new SimpleCommitContext();
    context.set(COMMIT_CONTEXT_OBSERVATION_CHANGESET, changeSet);
    journalPropertyHandler.addTo(context);
    return new CommitInfo(CommitInfo.OAK_UNKNOWN, CommitInfo.OAK_UNKNOWN,
            ImmutableMap.<String, Object>of(CommitContext.NAME, context), true);
}
/**
 * Extracts the change set from the commit context of the given commit
 * info.
 *
 * @param info the commit info to inspect.
 * @return the change set stored in the commit context, or {@code null}
 *         if the info has no commit context.
 */
private static ChangeSet getChangeSet(CommitInfo info) {
    final Object context = info.getInfo().get(CommitContext.NAME);
    if (context == null) {
        return null;
    }
    return (ChangeSet) ((CommitContext) context).get(COMMIT_CONTEXT_OBSERVATION_CHANGESET);
}
/**
* @return a new {@code ChangeSetBuilder} created with the
*         {@code changeSetMaxItems} and {@code changeSetMaxDepth} settings
*         of this node store.
*/
private ChangeSetBuilder newChangeSetBuilder() {
return new ChangeSetBuilder(changeSetMaxItems, changeSetMaxDepth);
}
/**
 * Polls all orphaned branches and, for each of them, removes the revision
 * and branch commit entries of its commits from the root document.
 */
private void cleanOrphanedBranches() {
    for (Branch orphan = branches.pollOrphanedBranch();
            orphan != null;
            orphan = branches.pollOrphanedBranch()) {
        LOG.debug("Cleaning up orphaned branch with base revision: {}, " +
                "commits: {}", orphan.getBase(), orphan.getCommits());
        final UpdateOp cleanup = new UpdateOp(Utils.getIdFromPath(ROOT), false);
        for (Revision branchRev : orphan.getCommits()) {
            // the entries are removed under the trunk form of the revision
            final Revision trunkRev = branchRev.asTrunkRevision();
            NodeDocument.removeRevision(cleanup, trunkRev);
            NodeDocument.removeBranchCommit(cleanup, trunkRev);
        }
        store.findAndUpdate(NODES, cleanup);
    }
}
/**
 * Cleans the collision markers on the root document, without limiting
 * the number of removed markers. Does nothing if the root document cannot
 * be found.
 */
private void cleanRootCollisions() {
    final String rootId = Utils.getIdFromPath(ROOT);
    final NodeDocument rootDoc = store.find(NODES, rootId);
    if (rootDoc == null) {
        return;
    }
    cleanCollisions(rootDoc, Integer.MAX_VALUE);
}
/**
 * Removes collision markers of the local cluster node from the given
 * document. A marker is removed only if there is no branch commit for
 * its revision and the revision is not newer than the current head.
 *
 * @param doc the document to clean.
 * @param limit the maximum number of markers to remove; iteration stops
 *              once the limit is used up.
 */
private void cleanCollisions(NodeDocument doc, int limit) {
    final RevisionVector head = getHeadRevision();
    final Map<Revision, String> collisions = doc.getLocalMap(NodeDocument.COLLISIONS);
    final UpdateOp removal = new UpdateOp(doc.getId(), false);
    int remaining = limit;
    for (Revision rev : collisions.keySet()) {
        if (rev.getClusterId() != clusterId) {
            continue;
        }
        // keep the collision if there is an active branch with this
        // revision or the revision is not before the current head, as
        // it may then be related to a commit which is in progress
        if (branches.getBranchCommit(rev) != null || head.isRevisionNewer(rev)) {
            continue;
        }
        NodeDocument.removeCollision(removal, rev);
        if (--remaining <= 0) {
            break;
        }
    }
    if (removal.hasChanges()) {
        LOG.debug("Removing collisions {} on {}",
                removal.getChanges().keySet(), doc.getId());
        store.findAndUpdate(NODES, removal);
    }
}
/**
 * Splits the documents registered as split candidates. Split operations
 * are collected and applied in batches of at most
 * {@link #getCreateOrUpdateBatchSize()} operations per phase. Before a
 * batch is applied, a journal entry is created for the affected paths so
 * that caches get invalidated.
 * <p>
 * Every candidate that was looked at is removed from the split candidates
 * once its batch was applied. This includes candidates for which
 * {@code split} produced no operations, so they are not re-read on every
 * run. Candidates whose document cannot be found are skipped and left in
 * place. If applying a batch fails with an exception, the candidates of
 * that batch are kept and picked up again with the next run.
 */
private void backgroundSplit() {
    final int batchSize = getCreateOrUpdateBatchSize();
    final int initialCapacity = batchSize + 4;
    Set<Path> invalidatedPaths = new HashSet<>(initialCapacity);
    Set<Path> pathsToInvalidate = new HashSet<>(initialCapacity);
    RevisionVector head = getHeadRevision();
    // OAK-9149 : With backgroundSplit being done in batches, the
    // updateOps must be executed in "phases".
    // Reason being that the (DocumentStore) batch calls
    // are not atomic. That means they could potentially
    // be partially executed only - without any guarantees on
    // which part is executed and which not.
    // The split algorithm, however, requires that
    // a part of the operations, namely intermediate/garbage/split ops,
    // are executed *before* the main document is updated.
    // In order to reflect this necessity in the batch variant,
    // all those intermediate/garbage/split updateOps are grouped
    // into a first phase - and the main document updateOps in a second phase.
    // That way, if the first phase fails, partially, the main documents
    // are not yet touched.
    // TODO but if the split fails, we create actual garbage that cannot
    // be cleaned up later, since there is no "pointer" to it. That's
    // something to look at/consider at some point.
    // phase1 therefore only contains intermediate/garbage/split updateOps
    List<UpdateOp> splitOpsPhase1 = new ArrayList<>(initialCapacity);
    // phase2 contains main document updateOps.
    List<UpdateOp> splitOpsPhase2 = new ArrayList<>(initialCapacity);
    List<String> removeCandidates = new ArrayList<>(initialCapacity);
    for (String id : splitCandidates.keySet()) {
        NodeDocument doc = store.find(Collection.NODES, id);
        if (doc == null) {
            continue;
        }
        cleanCollisions(doc, collisionGarbageBatchSize);
        Iterator<UpdateOp> it = doc.split(this, head, binarySize).iterator();
        while (it.hasNext()) {
            UpdateOp op = it.next();
            Path path = doc.getPath();
            // add an invalidation journal entry, unless the path
            // already has a pending _lastRev update or an invalidation
            // entry was already added in this backgroundSplit() call
            if (unsavedLastRevisions.get(path) == null && !invalidatedPaths.contains(path)) {
                pathsToInvalidate.add(path);
            }
            // the last entry is the main document update
            // (as per updated NodeDocument.split documentation).
            if (it.hasNext()) {
                splitOpsPhase1.add(op);
            } else {
                splitOpsPhase2.add(op);
            }
        }
        removeCandidates.add(id);
        if (splitOpsPhase1.size() >= batchSize
                || splitOpsPhase2.size() >= batchSize) {
            invalidatePaths(pathsToInvalidate);
            batchSplit(splitOpsPhase1);
            batchSplit(splitOpsPhase2);
            invalidatedPaths.addAll(pathsToInvalidate);
            pathsToInvalidate.clear();
            splitOpsPhase1.clear();
            splitOpsPhase2.clear();
            splitCandidates.keySet().removeAll(removeCandidates);
            removeCandidates.clear();
        }
    }
    if (!splitOpsPhase1.isEmpty() || !splitOpsPhase2.isEmpty()) {
        invalidatePaths(pathsToInvalidate);
        batchSplit(splitOpsPhase1);
        batchSplit(splitOpsPhase2);
    }
    // remove the remaining processed candidates even when the trailing
    // batch had no split operations; otherwise candidates that do not
    // need a split would never leave the candidate set
    splitCandidates.keySet().removeAll(removeCandidates);
}
/**
 * Creates a journal entry for the given paths so that caches get
 * invalidated, and registers the entry's revision with the current
 * {@code changes}.
 *
 * @param pathsToInvalidate the paths to invalidate; nothing happens if
 *                          the set is empty.
 * @throws DocumentStoreException if the journal entry cannot be created.
 */
private void invalidatePaths(@NotNull Set<Path> pathsToInvalidate) {
    if (pathsToInvalidate.isEmpty()) {
        // nothing to do
        return;
    }
    // create journal entry for cache invalidation
    final JournalEntry invalidation = JOURNAL.newDocument(getDocumentStore());
    invalidation.modified(pathsToInvalidate);
    final Revision journalRev = newRevision().asBranchRevision();
    final UpdateOp journalOp = invalidation.asUpdateOp(journalRev);
    final boolean created = store.create(JOURNAL, singletonList(journalOp));
    if (!created) {
        throw new DocumentStoreException("Unable to create journal entry " +
                journalOp.getId() + " for document invalidation. " +
                "Will be retried with next background split " +
                "operation.");
    }
    changes.invalidate(singletonList(journalRev));
    LOG.debug("Journal entry {} created for split of document(s) {}",
            journalOp.getId(), pathsToInvalidate);
}
/**
 * Applies the given split operations to the NODES collection in a single
 * batch call. When debug logging is enabled, the outcome of every
 * operation is logged.
 *
 * @param splitOps the operations to apply; nothing happens if the list
 *                 is empty.
 */
private void batchSplit(@NotNull List<UpdateOp> splitOps) {
    if (splitOps.isEmpty()) {
        // nothing to do
        return;
    }
    // apply the split operations
    final List<NodeDocument> previous = store.createOrUpdate(Collection.NODES, splitOps);
    if (!LOG.isDebugEnabled()) {
        return;
    }
    // this is rather expensive: every updated document is read again just
    // to log its size, which partly negates the benefit of batching
    int index = 0;
    for (UpdateOp op : splitOps) {
        final NodeDocument before = index < previous.size() ? previous.get(index) : null;
        index++;
        if (before == null) {
            LOG.debug("Split operation created {}", op.getId());
            continue;
        }
        final NodeDocument after = store.find(Collection.NODES, op.getId());
        if (after != null) {
            LOG.debug("Split operation on {}. Size before: {}, after: {}",
                    op.getId(), before.getMemory(), after.getMemory());
        }
    }
}
/**
* @return an unmodifiable view on the ids of the current split candidates.
*/
@NotNull
Set<String> getSplitCandidates() {
return Collections.unmodifiableSet(splitCandidates.keySet());
}
/**
* @return the sweep revisions currently known to this node store.
*/
@NotNull
RevisionVector getSweepRevisions() {
return sweepRevisions;
}
/**
* @return the configured batch size; used by the background split to
*         decide when a batch of split operations is applied.
*/
int getCreateOrUpdateBatchSize() {
return createOrUpdateBatchSize;
}
//-----------------------------< internal >---------------------------------
/**
 * Persists the pending {@code _lastRev} modifications. The local sweep
 * revision is supplied on demand and the journal entry is pushed from the
 * snapshot callback of the persist operation.
 *
 * @return the statistics returned by the persist operation.
 */
private BackgroundWriteStats backgroundWrite() {
    return unsavedLastRevisions.persist(
            getDocumentStore(),
            // sweep revision of the local cluster node, resolved lazily
            () -> getSweepRevisions().getRevision(getClusterId()),
            // push the journal entry for the most recent revision
            this::pushJournalEntry,
            backgroundOperationLock.writeLock());
}
/**
 * Creates an empty commit on the root path when the local head revision
 * or the local last revision is older than one minute. Does nothing when
 * the store is disposed or branches are disabled.
 */
private void maybeRefreshHeadRevision() {
    if (isDisposed.get() || isDisableBranches()) {
        return;
    }
    final DocumentNodeState currentRoot = getRoot();
    // check if local head revision is outdated and needs an update
    // this ensures the head and sweep revisions are recent and the
    // revision garbage collector can remove old documents
    final Revision head = currentRoot.getRootRevision().getRevision(clusterId);
    final Revision lastRev = currentRoot.getLastRevision().getRevision(clusterId);
    final long oneMinuteAgo = clock.getTime() - ONE_MINUTE_MS;
    final boolean headOutdated = head != null && head.getTimestamp() < oneMinuteAgo;
    final boolean lastRevOutdated = lastRev != null && lastRev.getTimestamp() < oneMinuteAgo;
    if (!headOutdated && !lastRevOutdated) {
        return;
    }
    // head was not updated for more than a minute
    // create an empty commit that updates the head
    final Commit emptyCommit = newTrunkCommit(nop -> {}, getHeadRevision());
    boolean success = false;
    try {
        emptyCommit.markChanged(ROOT);
        done(emptyCommit, false, CommitInfo.EMPTY);
        success = true;
    } finally {
        if (!success) {
            canceled(emptyCommit);
        }
    }
}
/**
* Executes the sweep2 (from within a background thread). The method first
* makes sure the local instance holds the sweep2 lock, then checks whether
* a sweep2 is necessary at all, switches the lock status to 'sweeping',
* runs the sweep2 for the clusterIds found in the sweep revisions and
* finally marks the sweep2 status as 'swept'.
*
* @param sweep2Lock the lock value originally acquired, or 0 if the lock
*                   still has to be acquired by this method
* @return true if sweep2 is done or no longer needed, false otherwise (in which case it should be retried)
* @throws DocumentStoreException if an operation on the document store fails
*/
boolean backgroundSweep2(long sweep2Lock) throws DocumentStoreException {
if (sweep2Lock == 0) {
sweep2Lock = Sweep2Helper.acquireSweep2LockIfNecessary(store, clusterId);
if (sweep2Lock == 0) {
// still not well defined, retry in a minute (done in BackgroundSweep2Operation)
return false;
}
if (sweep2Lock == -1) {
// then we're done
return true;
}
}
// sweep2Lock > 0, the local instance holds the lock
Sweep2StatusDocument statusDoc = Sweep2StatusDocument.readFrom(store);
if (statusDoc != null /*should never be null as we hold the lock, but let's play safe anyway .. */
&& statusDoc.isChecking()) {
// very likely 'isSweep2Necessary' has not been done yet, let's do it now
LOG.info("backgroundSweep2: checking whether sweep2 is necessary...");
if (!Sweep2Helper.isSweep2Necessary(store)) {
LOG.info("backgroundSweep2: sweep2 check determined a sweep2 is NOT necessary. Marking sweep2 status as 'swept'.");
if (!Sweep2StatusDocument.forceReleaseSweep2LockAndMarkSwept(store, clusterId)) {
// NOTE(review): the message says "Aborting sweep2 for now", yet the
// method still returns true (= done) below - confirm this is intended
LOG.error("backgroundSweep2 : failed to update the sweep2 status to 'swept'. Aborting sweep2 for now.");
}
return true;
}
LOG.info("backgroundSweep2: sweep2 check determined a sweep2 IS necessary. Marking sweep2 status as 'sweeping'.");
}
// update the lock status one more time to ensure no check can conclude sweep2 is not necessary anymore
sweep2Lock = Sweep2StatusDocument.acquireOrUpdateSweep2Lock(store, clusterId,
true /* this true here is what's relevant: locks-in the 'sweeping' status */);
if (sweep2Lock == 0) {
// something came in between, retry later
LOG.info("backgroundSweep2: could not update the sweep2 lock to sweeping, someone got in between. Retry later.");
return false;
} else if (sweep2Lock == -1) {
// odd, someone else concluded we're done
LOG.info("backgroundSweep2: meanwhile, someone else concluded sweep2 is done.");
return true;
}
// compile the list of clusterIds for which sweep2 should be done.
// in an ideal situation sweep(1) would have been done for all clusterIds,
// and in that case "_sweepRev" will contain all clusterIds ever used.
// However, it is a supported situation that sweep(1) was not run for all clusterIds,
// and in this case sweep2 must also not be done for all, but only for those clusterIds.
List<Integer> includedClusterIds = new LinkedList<>();
for (Revision aSweepRev : sweepRevisions) {
includedClusterIds.add(aSweepRev.getClusterId());
}
// at this point we did properly acquire a lock and can go ahead doing sweep2
LOG.info("backgroundSweep2: starting sweep2 (includedClusterIds={})", includedClusterIds);
int num = forceBackgroundSweep2(includedClusterIds);
LOG.info("backgroundSweep2: finished sweep2, num swept=" + num);
// release the lock.
// Note that in theory someone else could have released our lock, or that
// the sweep2 status was deleted - that actually doesn't matter:
// we just went through a full, successful sweep2 and we want to record it
// that way - irrespective of any interference with the status
// -> hence the 'force' aspect of releasing here
if (!Sweep2StatusDocument.forceReleaseSweep2LockAndMarkSwept(store, clusterId)) {
LOG.error("backgroundSweep2 : sweep2 finished but we failed to update the sweep2 status accordingly");
}
return true;
}
/**
* Executes a sweep2 either only for the provided or for all clusterIds otherwise
* (which case applies depends on the status of sweep(1) in the repository).
* The candidate documents are obtained from {@code lastRevSeeker} starting
* at timestamp 0 and are closed again when the sweep is finished or fails.
* @param includedClusterIds restrict sweep2 to only these clusterIds - or do it for
* all clusterIds if this list is empty or null.
* @return number of documents swept
* @throws DocumentStoreException if writing the journal entry or the updates fails
*/
int forceBackgroundSweep2(List<Integer> includedClusterIds) throws DocumentStoreException {
// sweep2 resolves commit values against an empty sweep revision vector
final RevisionVector emptySweepRevision = new RevisionVector();
CommitValueResolver cvr = new CachingCommitValueResolver(
0 /* disable caching for sweep2 as caching has a risk of propagating wrong values */,
() -> emptySweepRevision);
MissingBcSweeper2 sweeper = new MissingBcSweeper2(this, cvr, includedClusterIds, isDisposed);
LOG.info("Starting document sweep2. Head: {}, starting at 0", getHeadRevision());
Iterable<NodeDocument> docs = lastRevSeeker.getCandidates(0);
try {
final AtomicInteger numUpdates = new AtomicInteger();
sweeper.sweep2(docs, new NodeDocumentSweepListener() {
@Override
public void sweepUpdate(final Map<Path, UpdateOp> updates)
throws DocumentStoreException {
// create a synthetic commit. this commit does not have any
// changes, we just use it to create a journal entry for
// cache invalidation and apply the sweep updates
backgroundOperationLock.readLock().lock();
try {
boolean success = false;
Revision r = commitQueue.createRevision();
try {
commitQueue.done(r, new CommitQueue.Callback() {
@Override
public void headOfQueue(@NotNull Revision revision) {
writeUpdates(updates, revision);
}
});
success = true;
} finally {
// cancel the revision if it is still in the queue after a failure
if (!success && commitQueue.contains(r)) {
commitQueue.canceled(r);
}
}
} finally {
backgroundOperationLock.readLock().unlock();
}
}
private void writeUpdates(Map<Path, UpdateOp> updates,
Revision revision)
throws DocumentStoreException {
// create journal entry
JournalEntry entry = JOURNAL.newDocument(getDocumentStore());
entry.modified(updates.keySet());
Revision r = newRevision().asBranchRevision();
if (!store.create(JOURNAL, singletonList(entry.asUpdateOp(r)))) {
String msg = "Unable to create journal entry for " +
"document invalidation. Will be retried with " +
"next background sweep2 operation.";
throw new DocumentStoreException(msg);
}
changes.invalidate(Collections.singleton(r));
// record the synthetic commit on the root and advance the head
// before the sweep updates are written
unsavedLastRevisions.put(ROOT, revision);
RevisionVector newHead = getHeadRevision().update(revision);
setRoot(newHead);
commitQueue.headRevisionChanged();
store.createOrUpdate(NODES, Lists.newArrayList(updates.values()));
numUpdates.addAndGet(updates.size());
LOG.debug("Background sweep2 updated {}", updates.keySet());
}
});
return numUpdates.get();
} finally {
Utils.closeIfCloseable(docs);
}
}
/**
 * Decides whether a sweep is needed and runs it if so. A sweep runs when
 * there are in-doubt trunk commits older than the local head revision, or
 * when there is no sweep revision for the local clusterId. Otherwise only
 * the sweep revision is moved to the local head revision.
 *
 * @return the number of documents updated by the sweep; 0 if no sweep ran.
 * @throws DocumentStoreException if the sweep fails.
 */
private int backgroundSweep() throws DocumentStoreException {
    // decide if it needs to run
    final Revision head = getHeadRevision().getRevision(clusterId);
    if (head == null) {
        // should never happen because the head revision vector
        // always contains an element for the current clusterId,
        // unless we are in read-only mode
        return 0;
    }
    // collect in-doubt commit revisions that are older than
    // the current head revision
    final SortedSet<Revision> garbage = Sets.newTreeSet(StableRevisionComparator.INSTANCE);
    for (Revision inDoubt : inDoubtTrunkCommits) {
        if (inDoubt.compareRevisionTime(head) < 0) {
            garbage.add(inDoubt);
        }
    }
    final boolean hasGarbage = !garbage.isEmpty();
    // skip if there is no garbage and we have at least some sweep
    // revision for the local clusterId. A sweep is needed even
    // without garbage when an upgrade happened and no sweep revision
    // exists for the local clusterId
    final Revision sweepRev = sweepRevisions.getRevision(clusterId);
    if (!hasGarbage && sweepRev != null) {
        updateSweepRevision(head);
        return 0;
    }
    // start at the oldest garbage revision, or at the very beginning
    final Revision startRev = hasGarbage ? garbage.first() : new Revision(0, 0, clusterId);
    final StringBuilder reason = new StringBuilder();
    if (hasGarbage) {
        reason.append(garbage.size()).append(" garbage revision(s)");
    }
    if (sweepRev == null) {
        if (reason.length() > 0) {
            reason.append(", ");
        }
        reason.append("no sweepRevision for ").append(clusterId);
    }
    final int swept = forceBackgroundSweep(startRev, reason.toString());
    inDoubtTrunkCommits.removeAll(garbage);
    return swept;
}
/**
* Runs a document sweep over the candidates returned by
* {@code lastRevSeeker} for the timestamp of {@code startRev}. Each batch of
* sweep updates is applied within a synthetic commit. When the sweeper
* returns a new sweep revision, the local sweep revision is updated with it.
*
* @param startRev the revision whose timestamp selects the candidate documents.
* @param reason the reason for the sweep; only used for logging.
* @return the number of updates applied by the sweep.
* @throws DocumentStoreException if writing the journal entry or the updates fails.
*/
private int forceBackgroundSweep(Revision startRev, String reason) throws DocumentStoreException {
NodeDocumentSweeper sweeper = new NodeDocumentSweeper(this, false);
LOG.info("Starting document sweep. Head: {}, starting at {} (reason: {})",
sweeper.getHeadRevision(), startRev, reason);
Iterable<NodeDocument> docs = lastRevSeeker.getCandidates(startRev.getTimestamp());
try {
final AtomicInteger numUpdates = new AtomicInteger();
Revision newSweepRev = sweeper.sweep(docs, new NodeDocumentSweepListener() {
@Override
public void sweepUpdate(final Map<Path, UpdateOp> updates)
throws DocumentStoreException {
// create a synthetic commit. this commit does not have any
// changes, we just use it to create a journal entry for
// cache invalidation and apply the sweep updates
backgroundOperationLock.readLock().lock();
try {
boolean success = false;
Revision r = commitQueue.createRevision();
try {
commitQueue.done(r, new CommitQueue.Callback() {
@Override
public void headOfQueue(@NotNull Revision revision) {
writeUpdates(updates, revision);
}
});
success = true;
} finally {
// cancel the revision if it is still in the queue after a failure
if (!success && commitQueue.contains(r)) {
commitQueue.canceled(r);
}
}
} finally {
backgroundOperationLock.readLock().unlock();
}
}
private void writeUpdates(Map<Path, UpdateOp> updates,
Revision revision)
throws DocumentStoreException {
// create journal entry
JournalEntry entry = JOURNAL.newDocument(getDocumentStore());
entry.modified(updates.keySet());
Revision r = newRevision().asBranchRevision();
if (!store.create(JOURNAL, singletonList(entry.asUpdateOp(r)))) {
String msg = "Unable to create journal entry for " +
"document invalidation. Will be retried with " +
"next background sweep operation.";
throw new DocumentStoreException(msg);
}
changes.invalidate(Collections.singleton(r));
// record the synthetic commit on the root and advance the head
// before the sweep updates are written
unsavedLastRevisions.put(ROOT, revision);
RevisionVector newHead = getHeadRevision().update(revision);
setRoot(newHead);
commitQueue.headRevisionChanged();
store.createOrUpdate(NODES, Lists.newArrayList(updates.values()));
numUpdates.addAndGet(updates.size());
LOG.debug("Background sweep updated {}", updates.keySet());
}
});
// the sweeper may return null, in which case the sweep revision is left as is
if (newSweepRev != null) {
updateSweepRevision(newSweepRev);
}
return numUpdates.get();
} finally {
Utils.closeIfCloseable(docs);
}
}
/**
 * Merges the given revision into the local sweep revisions using
 * {@code pmax}, while holding the read lock of the background operation
 * lock.
 *
 * @param newSweepRevision the new sweep revision for this document node
 *                         store instance.
 */
private void updateSweepRevision(Revision newSweepRevision) {
    backgroundOperationLock.readLock().lock();
    try {
        final RevisionVector update = new RevisionVector(newSweepRevision);
        sweepRevisions = sweepRevisions.pmax(update);
    } finally {
        backgroundOperationLock.readLock().unlock();
    }
}
/**
 * Verifies that this node store version is able to work with the data in
 * the given document store. In read-write mode a store without version
 * gets the current version written, while a store with a different
 * version is rejected.
 *
 * @param store the document store.
 * @param readOnlyMode whether this node store is in read-only mode.
 * @throws DocumentStoreException if the versions are incompatible given the
 *      access mode (read-write vs. read-only).
 */
private static void checkVersion(DocumentStore store, boolean readOnlyMode)
        throws DocumentStoreException {
    final FormatVersion storeVersion = FormatVersion.versionOf(store);
    if (!VERSION.canRead(storeVersion)) {
        throw new DocumentStoreException("Cannot open DocumentNodeStore. " +
                "Existing data in DocumentStore was written with more " +
                "recent version. Store version: " + storeVersion +
                ", this version: " + VERSION);
    }
    if (readOnlyMode) {
        // being able to read the data is sufficient
        return;
    }
    if (storeVersion == FormatVersion.V0) {
        // no version present. set to current version
        VERSION.writeTo(store);
        LOG.info("FormatVersion set to {}", VERSION);
        return;
    }
    if (VERSION.equals(storeVersion)) {
        return;
    }
    // version does not match. fail the check and
    // require a manual upgrade first
    throw new DocumentStoreException("Cannot open DocumentNodeStore " +
            "in read-write mode. Existing data in DocumentStore " +
            "was written with older version. Store version: " +
            storeVersion + ", this version: " + VERSION + ". Use " +
            "the oak-run-" + getModuleVersion() + ".jar tool " +
            "with the unlockUpgrade command first.");
}
/**
 * Runs the revision age check defined in
 * {@link Utils#checkRevisionAge(DocumentStore, ClusterNodeInfo, Clock)}.
 * If the check fails, the passed cluster node {@code info} is disposed
 * before the failure is propagated.
 *
 * @param store the document store from where to read the root document.
 * @param info the cluster node info with the clusterId.
 * @param clock the clock to get the current time.
 * @throws DocumentStoreException if the check fails.
 */
private static void checkRevisionAge(DocumentStore store,
                                     ClusterNodeInfo info,
                                     Clock clock)
        throws DocumentStoreException {
    try {
        Utils.checkRevisionAge(store, info, clock);
    } catch (RuntimeException | Error e) {
        // release the cluster node info on any failure, then rethrow
        info.dispose();
        throw e;
    }
}
/**
 * Writes the accumulated {@code changes} as a journal entry for the given
 * revision. On success a new, empty journal entry replaces the current
 * one. On failure the changes are kept so they can be written later.
 *
 * @param r the revision of the journal entry.
 * @throws DocumentStoreException if the document store operation fails.
 */
private void pushJournalEntry(Revision r) throws DocumentStoreException {
    if (!changes.hasChanges()) {
        LOG.debug("Not pushing journal as there are no changes");
        return;
    }
    final boolean written = store.create(JOURNAL, singletonList(changes.asUpdateOp(r)));
    if (!written) {
        // fail: log and keep the changes
        LOG.error("Failed to write to journal({}), accumulating changes for future write (~{} bytes, {} paths)",
                r, changes.getMemory(), changes.getNumChangedNodes());
        return;
    }
    // success: start with a new document
    changes = newJournalEntry();
}
/**
 * Computes the size of the binary value(s) referenced by a property
 * value given in its JSON form.
 *
 * @param json the property value, may be {@code null}.
 * @return the summed size of the referenced binary value(s), or
 *         {@code -1} if {@code json} is {@code null} or the property is
 *         not of type binary.
 */
private long getBinarySize(@Nullable String json) {
    if (json == null) {
        return -1;
    }
    PropertyState property = new DocumentPropertyState(
            DocumentNodeStore.this, "p", json);
    if (property.getType().tag() != PropertyType.BINARY) {
        return -1;
    }
    if (!property.isArray()) {
        // single value
        return property.size();
    }
    // multi-valued: add up the size of every value
    long total = 0;
    for (int idx = 0; idx < property.count(); idx++) {
        total += property.size(idx);
    }
    return total;
}
/**
 * Creates a new, empty journal entry backed by this node store's
 * document store, with a new change set builder and a new journal
 * property handler.
 *
 * @return the new journal entry.
 */
private JournalEntry newJournalEntry() {
    return new JournalEntry(
            store,
            true,
            newChangeSetBuilder(),
            journalPropertyHandlerFactory.newHandler());
}
/**
 * Performs an initial read of the _lastRevs on the root document and sets
 * the root state.
 * <p>
 * Must only be called while the root state is still unset. If the calling
 * thread is interrupted while aligning with the external revisions, the
 * thread's interrupt status is restored and a
 * {@link DocumentStoreException} carrying the original
 * {@link InterruptedException} as its cause is thrown.
 *
 * @param rootDoc the current root document.
 * @throws IllegalStateException if the root state is already set.
 * @throws DocumentStoreException if interrupted while aligning with the
 *          external revisions.
 */
private void initializeRootState(NodeDocument rootDoc) {
    checkState(root == null);

    try {
        alignWithExternalRevisions(rootDoc, clock, clusterId);
    } catch (InterruptedException e) {
        // restore the interrupt status so callers further up can see it
        Thread.currentThread().interrupt();
        // keep the original exception as cause instead of dropping it
        throw new DocumentStoreException("Interrupted while aligning with " +
                "external revision: " + rootDoc.getLastRev(), e);
    }
    // head revision: the lastRevs of the root document, updated with a
    // new local revision
    RevisionVector headRevision = new RevisionVector(
            rootDoc.getLastRev().values()).update(newRevision());
    setRoot(headRevision);
}
/**
 * Creates a new commit on trunk for the given changes.
 * <p>
 * The read lock of the background operation lock is acquired by this
 * method. It is released again here only when creating the commit fails;
 * on success the lock is still held when this method returns.
 *
 * @param changes the changes to apply to the commit builder.
 * @param base the base revision, must not be a branch revision.
 * @return the new commit.
 * @throws IllegalArgumentException if {@code base} is a branch revision.
 * @throws IllegalStateException if this store is disposed.
 */
private Commit newTrunkCommit(@NotNull Changes changes,
                              @NotNull RevisionVector base) {
    checkArgument(!checkNotNull(base).isBranch(),
            "base must not be a branch revision: " + base);

    // build commit before revision is created by the commit queue (OAK-7869)
    CommitBuilder builder = newCommitBuilder(base, null);
    changes.with(builder);

    Commit commit;
    boolean registered = false;
    backgroundOperationLock.readLock().lock();
    try {
        checkOpen();
        commit = builder.build(commitQueue.createRevision());
        inDoubtTrunkCommits.add(commit.getRevision());
        registered = true;
    } finally {
        // only give the lock back on failure
        if (!registered) {
            backgroundOperationLock.readLock().unlock();
        }
    }
    return commit;
}
/**
 * Creates a new branch commit for the given changes.
 *
 * @param changes the changes to apply to the commit builder.
 * @param base the base revision, must be a branch revision.
 * @param branch the branch passed on when a new branch gets created,
 *               may be {@code null}.
 * @return the new commit.
 * @throws IllegalArgumentException if {@code base} is not a branch revision.
 * @throws IllegalStateException if this store is disposed.
 */
@NotNull
private Commit newBranchCommit(@NotNull Changes changes,
                               @NotNull RevisionVector base,
                               @Nullable DocumentNodeStoreBranch branch) {
    checkArgument(checkNotNull(base).isBranch(),
            "base must be a branch revision: " + base);

    checkOpen();
    Revision commitRevision = newRevision();
    CommitBuilder builder = newCommitBuilder(base, commitRevision);
    changes.with(builder);
    if (!isDisableBranches()) {
        Revision branchRev = commitRevision.asBranchRevision();
        // remember branch commit
        Branch existing = getBranches().getBranch(base);
        if (existing != null) {
            existing.addCommit(branchRev);
        } else {
            // baseRev is marker for new branch
            getBranches().create(base.asTrunkRevision(), branchRev, branch);
            LOG.debug("Branch created with base revision {}", base);
            if (LOG.isTraceEnabled()) {
                LOG.trace("Branch created", new Exception());
            }
        }
    } else {
        // Regular branch commits do not need to acquire the background
        // operation lock because the head is not updated and no pending
        // lastRev updates are done on trunk. When branches are disabled,
        // a branch commit becomes a pseudo trunk commit and the lock
        // must be acquired.
        backgroundOperationLock.readLock().lock();
    }
    return builder.build();
}
/**
 * Creates a commit builder for the given base revision, configured with
 * the start revisions derived from the currently known cluster nodes.
 *
 * @param base the base revision of the commit.
 * @param commitRevision the revision of the commit or {@code null} if
 *                       the builder is created without one.
 * @return the commit builder.
 */
@NotNull
private CommitBuilder newCommitBuilder(@NotNull RevisionVector base,
                                       @Nullable Revision commitRevision) {
    CommitBuilder builder = commitRevision != null
            ? new CommitBuilder(this, commitRevision, base)
            : new CommitBuilder(this, base);
    return builder.withStartRevisions(
            Utils.getStartRevisions(clusterNodes.values()));
}
/**
 * Verifies that this store has not been disposed (and that no dispose is
 * in progress).
 *
 * @throws IllegalStateException if this store is disposed.
 */
private void checkOpen() throws IllegalStateException {
    if (!isDisposed.get()) {
        return;
    }
    throw new IllegalStateException("This DocumentNodeStore is disposed");
}
/**
 * Uses the children cache of the parent to determine whether the child
 * node denoted by {@code path} is known not to exist.
 *
 * @param path the path of the child node
 * @param rev revision at which check is performed
 * @return <code>true</code> if and only if the children cache entry for
 *         the parent path is complete and does not list the given child
 *         node. A <code>false</code> indicates that the node <i>might</i>
 *         exist
 */
private boolean checkNodeNotExistsFromChildrenCache(Path path,
                                                    RevisionVector rev) {
    final Path parentPath = path.getParent();
    if (parentPath == null) {
        return false;
    }

    // read first child cache entry
    NamePathRev cacheKey = childNodeCacheKey(parentPath, rev, "");
    DocumentNodeState.Children cached = nodeChildrenCache.getIfPresent(cacheKey);
    String childName = path.getName();

    // nothing cached, or the cached list is not complete:
    // cannot say for sure
    if (cached == null || cached.hasMore) {
        return false;
    }

    if (Collections.binarySearch(cached.children, childName) >= 0) {
        // child is listed
        return false;
    }
    // Node does not exist for sure
    LOG.trace("Child node as per path {} does not exist at revision {}", path, rev);
    return true;
}
/**
 * Computes the diff between the two given node states and returns it as
 * a string.
 * <p>
 * A journal based diff is attempted first when journal diff is not
 * disabled and the timestamp checks below pass. If that is not possible
 * or fails with a {@link RuntimeException}, the classic diff is used:
 * bundled documents are diffed first and, if required, the children are
 * compared with one of the child diff strategies.
 *
 * @param from the node state to diff from.
 * @param to the node state to diff to.
 * @return the diff.
 * @throws DocumentStoreException if thrown by the underlying store.
 */
private String diffImpl(AbstractDocumentNodeState from, AbstractDocumentNodeState to)
        throws DocumentStoreException {
    int max = MANY_CHILDREN_THRESHOLD;

    // timing information is only collected when debug logging is enabled
    final boolean debug = LOG.isDebugEnabled();
    final long start = debug ? now() : 0;
    long getChildrenDoneIn = start;

    String diff = null;
    String diffAlgo = null;
    RevisionVector fromRev = null;
    RevisionVector toRev = null;
    long minTimestamp = Utils.getMinTimestampForDiff(
            from.getRootRevision(), to.getRootRevision(),
            getMinExternalRevisions());
    long minJournalTimestamp = newRevision().getTimestamp() -
            journalGarbageCollector.getMaxRevisionAgeMillis() / 2;

    // use journal if possible
    Revision tailRev = journalGarbageCollector.getTailRevision();
    if (!disableJournalDiff
            && tailRev.getTimestamp() < minTimestamp
            && minJournalTimestamp < minTimestamp) {
        try {
            diff = new JournalDiffLoader(from, to, this).call();
            diffAlgo = "diffJournalChildren";
            fromRev = from.getRootRevision();
            toRev = to.getRootRevision();
        } catch (RuntimeException e) {
            LOG.warn("diffJournalChildren failed with " +
                    e.getClass().getSimpleName() +
                    ", falling back to classic diff", e);
        }
    }
    if (diff == null) {
        // fall back to classic diff
        fromRev = from.getLastRevision();
        toRev = to.getLastRevision();
        JsopWriter w = new JsopStream();
        boolean continueDiff = bundledDocDiffer.diff(from, to, w);

        if (continueDiff) {
            DocumentNodeState.Children fromChildren, toChildren;
            fromChildren = getChildren(from, "", max);
            toChildren = getChildren(to, "", max);
            getChildrenDoneIn = debug ? now() : 0;

            if (!fromChildren.hasMore && !toChildren.hasMore) {
                // both child lists were read completely
                diffAlgo = "diffFewChildren";
                diffFewChildren(w, from.getPath(), fromChildren,
                        fromRev, toChildren, toRev);
            } else {
                if (FAST_DIFF) {
                    diffAlgo = "diffManyChildren";
                    fromRev = from.getRootRevision();
                    toRev = to.getRootRevision();
                    diffManyChildren(w, from.getPath(), fromRev, toRev);
                } else {
                    // read all children and compare them one by one
                    diffAlgo = "diffAllChildren";
                    max = Integer.MAX_VALUE;
                    fromChildren = getChildren(from, "", max);
                    toChildren = getChildren(to, "", max);
                    diffFewChildren(w, from.getPath(), fromChildren,
                            fromRev, toChildren, toRev);
                }
            }
        } else {
            diffAlgo = "allBundledChildren";
        }
        diff = w.toString();
    }

    if (debug) {
        long end = now();
        // note: the closing quote after the 'external' placeholder was
        // missing in the original message
        LOG.debug("Diff performed via '{}' at [{}] between revisions [{}] => [{}] took {} ms ({} ms), diff '{}', external '{}'",
                diffAlgo, from.getPath(), fromRev, toRev,
                end - start, getChildrenDoneIn - start, diff,
                to.isFromExternalChange());
    }
    return diff;
}
/**
 * Writes the diff of the children below {@code path} between the two
 * revisions to the given writer.
 * <p>
 * Candidate child paths are collected from three sources: documents below
 * {@code path} returned by a query on the modified-in-seconds value,
 * pending modifications not yet stored, and paths modified in a branch
 * when one of the revisions is a branch revision. Each candidate is then
 * read at both revisions and reported as added ({@code +}), removed
 * ({@code -}) or changed ({@code ^}).
 *
 * @param w the writer to append the diff to.
 * @param path the path of the parent node.
 * @param fromRev the revision to diff from.
 * @param toRev the revision to diff to.
 */
private void diffManyChildren(JsopWriter w, Path path,
                              RevisionVector fromRev,
                              RevisionVector toRev) {
    long minTimestamp = Utils.getMinTimestampForDiff(
            fromRev, toRev, getMinExternalRevisions());
    // lower the minimum timestamp to the base of any involved branch
    for (RevisionVector r : new RevisionVector[]{fromRev, toRev}) {
        if (r.isBranch()) {
            Branch b = branches.getBranch(r);
            if (b != null) {
                minTimestamp = Math.min(b.getBase().getRevision(clusterId).getTimestamp(), minTimestamp);
            }
        }
    }
    long minValue = NodeDocument.getModifiedInSecs(minTimestamp);
    String fromKey = Utils.getKeyLowerLimit(path);
    String toKey = Utils.getKeyUpperLimit(path);
    Set<Path> paths = Sets.newHashSet();

    LOG.debug("diffManyChildren: path: {}, fromRev: {}, toRev: {}", path, fromRev, toRev);

    for (NodeDocument doc : store.query(Collection.NODES, fromKey, toKey,
            NodeDocument.MODIFIED_IN_SECS, minValue, Integer.MAX_VALUE)) {
        paths.add(doc.getPath());
    }

    LOG.debug("diffManyChildren: Affected paths: {}", paths.size());
    // also consider nodes with not yet stored modifications (OAK-1107)
    Revision minRev = new Revision(minTimestamp, 0, getClusterId());
    addPathsForDiff(path, paths, getPendingModifications().getPaths(minRev));
    for (RevisionVector rv : new RevisionVector[]{fromRev, toRev}) {
        if (rv.isBranch()) {
            Revision r = rv.getBranchRevision();
            Branch b = branches.getBranch(rv);
            if (b != null) {
                addPathsForDiff(path, paths, b.getModifiedPathsUntil(r));
            }
        }
    }
    for (Path p : paths) {
        DocumentNodeState fromNode = getNode(p, fromRev);
        DocumentNodeState toNode = getNode(p, toRev);
        String name = p.getName();

        // log the candidate child path, not the (constant) parent path
        LOG.trace("diffManyChildren: Changed Path {}", p);

        if (fromNode != null) {
            // exists in fromRev
            if (toNode != null) {
                // exists in both revisions
                // check if different
                RevisionVector a = fromNode.getLastRevision();
                RevisionVector b = toNode.getLastRevision();
                if (a == null && b == null) {
                    // ok
                } else if (a == null || b == null || !a.equals(b)) {
                    w.tag('^').key(name).object().endObject();
                }
            } else {
                // does not exist in toRev -> was removed
                w.tag('-').value(name);
            }
        } else {
            // does not exist in fromRev
            if (toNode != null) {
                // exists in toRev
                w.tag('+').key(name).object().endObject();
            } else {
                // does not exist in either revisions
                // -> do nothing
            }
        }
    }
}
/**
 * Adds those of the {@code modified} paths to {@code paths} that are
 * direct children of {@code path}. The root path is never added.
 *
 * @param path the parent path.
 * @param paths the set the matching paths are added to.
 * @param modified the modified paths to inspect.
 */
private static void addPathsForDiff(Path path,
                                    Set<Path> paths,
                                    Iterable<Path> modified) {
    for (Path candidate : modified) {
        // the root has no parent to compare with
        if (!candidate.isRoot() && path.equals(candidate.getParent())) {
            paths.add(candidate);
        }
    }
}
/**
 * Writes the diff between two child lists to the given writer by
 * comparing the child names and, for children present in both lists,
 * the last revision of the child node at each revision.
 *
 * @param w the writer to append the diff to.
 * @param parentPath the path of the parent node.
 * @param fromChildren the children at {@code fromRev}.
 * @param fromRev the revision to diff from.
 * @param toChildren the children at {@code toRev}.
 * @param toRev the revision to diff to.
 */
private void diffFewChildren(JsopWriter w, Path parentPath,
                             DocumentNodeState.Children fromChildren,
                             RevisionVector fromRev,
                             DocumentNodeState.Children toChildren,
                             RevisionVector toRev) {
    // pass 1: removed and changed children
    Set<String> toNames = Sets.newHashSet(toChildren.children);
    for (String name : fromChildren.children) {
        if (toNames.contains(name)) {
            Path childPath = new Path(parentPath, name);
            DocumentNodeState before = getNode(childPath, fromRev);
            DocumentNodeState after = getNode(childPath, toRev);
            // this is not fully correct:
            // a change is detected if the node changed recently,
            // even if the revisions are well in the past
            // if this is a problem it would need to be changed
            checkNotNull(before, "Node at [%s] not found for fromRev [%s]", childPath, fromRev);
            checkNotNull(after, "Node at [%s] not found for toRev [%s]", childPath, toRev);
            if (!before.getLastRevision().equals(after.getLastRevision())) {
                w.tag('^').key(name).object().endObject();
            }
        } else {
            w.tag('-').value(name);
        }
    }
    // pass 2: added children
    Set<String> fromNames = Sets.newHashSet(fromChildren.children);
    for (String name : toChildren.children) {
        if (!fromNames.contains(name)) {
            w.tag('+').key(name).object().endObject();
        }
    }
}
/**
 * Builds the key used for the children cache.
 *
 * @param path the path of the parent node.
 * @param readRevision the read revision.
 * @param name the child name the cache entry starts at.
 * @return the cache key.
 */
private static NamePathRev childNodeCacheKey(@NotNull Path path,
                                             @NotNull RevisionVector readRevision,
                                             @NotNull String name) {
    final NamePathRev key = new NamePathRev(name, path, readRevision);
    return key;
}
/**
 * Casts the given builder to a {@link DocumentRootBuilder}.
 *
 * @param builder the builder to cast.
 * @return the builder as a {@link DocumentRootBuilder}.
 * @throws IllegalArgumentException if the builder is not a
 *          {@link DocumentRootBuilder}.
 */
private static DocumentRootBuilder asDocumentRootBuilder(NodeBuilder builder)
        throws IllegalArgumentException {
    if (builder instanceof DocumentRootBuilder) {
        return (DocumentRootBuilder) builder;
    }
    throw new IllegalArgumentException("builder must be a " +
            DocumentRootBuilder.class.getName());
}
/**
 * @return the current system time in milliseconds, as reported by
 *         {@link System#currentTimeMillis()}.
 */
private static long now() {
    final long millis = System.currentTimeMillis();
    return millis;
}
/**
 * Checks whether the first commit of the given branch is too old to be
 * merged. The maximum branch age is half of the maximum revision age of
 * the journal garbage collector.
 *
 * @param b the branch to check.
 * @throws CommitFailedException if the branch is older than allowed.
 */
private void checkBranchAge(Branch b) throws CommitFailedException {
    // check if initial branch commit is too old to merge
    final long journalMaxAge = journalGarbageCollector.getMaxRevisionAgeMillis();
    final long branchMaxAge = journalMaxAge / 2;
    final long created = b.getCommits().first().getTimestamp();
    final long branchAge = newRevision().getTimestamp() - created;
    if (branchAge <= branchMaxAge) {
        return;
    }
    throw new CommitFailedException(OAK, 200,
            "Long running commit detected. Branch was created " +
            Utils.timestampToString(created) + ". Consider breaking " +
            "the commit down into smaller pieces or increasing the " +
            "'journalGCMaxAge' currently set to " + journalMaxAge +
            " ms (" + MILLISECONDS.toMinutes(journalMaxAge) + " min).");
}
/**
 * Creates and returns a MarkSweepGarbageCollector if the current BlobStore
 * is a {@link GarbageCollectableBlobStore}.
 *
 * @param blobGcMaxAgeInSecs the maximum blob age in seconds; converted to
 *                           milliseconds for the garbage collector.
 * @param repositoryId the repository id passed to the garbage collector.
 * @param whiteboard the whiteboard passed to the garbage collector.
 * @param statisticsProvider the statistics provider passed to the
 *                           garbage collector.
 * @return the garbage collector if the BlobStore supports GC, otherwise
 *         {@code null}.
 * @throws RuntimeException if creating the garbage collector fails with
 *          an {@link IOException}.
 */
@Nullable
public MarkSweepGarbageCollector createBlobGarbageCollector(long blobGcMaxAgeInSecs, String repositoryId,
        Whiteboard whiteboard, StatisticsProvider statisticsProvider) {
    if (!(blobStore instanceof GarbageCollectableBlobStore)) {
        // blob store does not support garbage collection
        return null;
    }
    try {
        return new MarkSweepGarbageCollector(
                new DocumentBlobReferenceRetriever(this),
                (GarbageCollectableBlobStore) blobStore,
                executor,
                SECONDS.toMillis(blobGcMaxAgeInSecs),
                repositoryId,
                whiteboard,
                statisticsProvider);
    } catch (IOException e) {
        throw new RuntimeException("Error occurred while initializing " +
                "the MarkSweepGarbageCollector", e);
    }
}
/**
 * Sets the listener that gets notified on cluster state changes.
 *
 * @param clusterStateChangeListener the listener to set.
 */
void setClusterStateChangeListener(
        ClusterStateChangeListener clusterStateChangeListener) {
    this.clusterStateChangeListener = clusterStateChangeListener;
}
/**
 * Notifies the cluster state change listener, if one is set.
 */
private void signalClusterStateChange() {
    // take a local copy: the field may be reassigned through
    // setClusterStateChangeListener() between the null check and the call
    final ClusterStateChangeListener listener = clusterStateChangeListener;
    if (listener != null) {
        listener.handleClusterStateChange();
    }
}
/**
 * @return the MBean of this node store.
 */
public DocumentNodeStoreMBean getMBean() {
    return this.mbean;
}
/**
 * Creates the MBean implementation for this node store, using the
 * statistics of the builder's statistics provider and the currently
 * known cluster nodes.
 *
 * @param builder the builder providing the statistics provider.
 * @return the MBean.
 * @throws IllegalStateException if the MBean is not compliant.
 */
private DocumentNodeStoreMBean createMBean(DocumentNodeStoreBuilder<?> builder) {
    try {
        return new DocumentNodeStoreMBeanImpl(
                this,
                builder.getStatisticsProvider().getStats(),
                clusterNodes.values());
    } catch (NotCompliantMBeanException notCompliant) {
        throw new IllegalStateException(notCompliant);
    }
}
/**
 * Base class for background tasks of a {@link DocumentNodeStore}.
 * <p>
 * The node store is only held through a {@link WeakReference}. The task
 * loop in {@link #run()} ends when the delay supplied is zero, when the
 * {@code isDisposed} flag is set, or when the node store reference has
 * been cleared.
 */
private static abstract class NodeStoreTask implements Runnable {

    final WeakReference<DocumentNodeStore> ref;
    private final AtomicBoolean isDisposed;
    private final Supplier<Integer> delaySupplier;
    /** whether the most recent execution failed */
    private boolean failing;

    /**
     * @param nodeStore the node store this task operates on.
     * @param isDisposed the flag checked (and waited on) between
     *                   executions.
     * @param delay supplies the wait time between executions; if
     *              {@code null} the async delay of the node store is
     *              used (or zero once the node store is gone).
     */
    NodeStoreTask(final DocumentNodeStore nodeStore,
                  final AtomicBoolean isDisposed,
                  Supplier<Integer> delay) {
        this.ref = new WeakReference<DocumentNodeStore>(nodeStore);
        this.isDisposed = isDisposed;
        this.delaySupplier = delay != null ? delay : new Supplier<Integer>() {
            @Override
            public Integer get() {
                DocumentNodeStore ns = ref.get();
                if (ns == null) {
                    return 0;
                }
                return ns.getAsyncDelay();
            }
        };
    }

    NodeStoreTask(final DocumentNodeStore nodeStore,
                  final AtomicBoolean isDisposed) {
        this(nodeStore, isDisposed, null);
    }

    /**
     * Performs one execution of the task.
     *
     * @param nodeStore the node store to operate on.
     */
    protected abstract void execute(@NotNull DocumentNodeStore nodeStore);

    @Override
    public void run() {
        int delay = delaySupplier.get();
        while (delay != 0 && !isDisposed.get()) {
            // wait on the flag for the current delay before executing
            synchronized (isDisposed) {
                try {
                    isDisposed.wait(delay);
                } catch (InterruptedException e) {
                    // ignore
                }
            }
            final DocumentNodeStore store = ref.get();
            if (store == null) {
                // node store not in use anymore
                break;
            }
            try {
                execute(store);
                if (failing) {
                    LOG.info("Background operation {} successful again",
                            getClass().getSimpleName());
                    failing = false;
                }
            } catch (Throwable t) {
                failing = true;
                LOG.warn("Background operation failed: " + t.toString(), t);
            }
            // the delay is re-read after every execution
            delay = delaySupplier.get();
        }
    }
}
/**
 * Task that runs the background update operations of a node store.
 */
static class BackgroundUpdateOperation extends NodeStoreTask {

    BackgroundUpdateOperation(DocumentNodeStore nodeStore,
                              AtomicBoolean isDisposed) {
        super(nodeStore, isDisposed);
    }

    /**
     * Delegates to {@code runBackgroundUpdateOperations()} of the node store.
     */
    @Override
    protected void execute(@NotNull DocumentNodeStore nodeStore) {
        nodeStore.runBackgroundUpdateOperations();
    }
}
/**
 * Task that runs the background read operations of a node store.
 */
static class BackgroundReadOperation extends NodeStoreTask {

    BackgroundReadOperation(DocumentNodeStore nodeStore,
                            AtomicBoolean isDisposed) {
        super(nodeStore, isDisposed);
    }

    /**
     * Delegates to {@code runBackgroundReadOperations()} of the node store.
     */
    @Override
    protected void execute(@NotNull DocumentNodeStore nodeStore) {
        nodeStore.runBackgroundReadOperations();
    }
}
/**
 * Task that runs the background sweep of a node store.
 */
private static class BackgroundSweepOperation extends NodeStoreTask {

    BackgroundSweepOperation(DocumentNodeStore nodeStore,
                             AtomicBoolean isDisposed) {
        super(nodeStore, isDisposed, getDelay(nodeStore));
    }

    @Override
    protected void execute(@NotNull DocumentNodeStore nodeStore) {
        nodeStore.backgroundSweep();
    }

    /**
     * Returns a fixed delay supplier: zero when the async delay of the
     * node store is zero, otherwise {@code MODIFIED_IN_SECS_RESOLUTION}
     * seconds expressed in milliseconds.
     */
    private static Supplier<Integer> getDelay(DocumentNodeStore ns) {
        int millis = ns.getAsyncDelay() != 0
                ? (int) SECONDS.toMillis(MODIFIED_IN_SECS_RESOLUTION)
                : 0;
        return Suppliers.ofInstance(millis);
    }
}
/**
 * Background sweep2 operation (also see OAK-9176 for details/context).
 * <p>
 * The task retries until {@code backgroundSweep2} of the node store
 * returns {@code true}, at which point it clears its node store reference.
 */
private static class BackgroundSweep2Operation extends NodeStoreTask {

    private final long sweep2Lock;

    /** number of executions that did not complete the sweep2 */
    private int retryCount = 0;

    /**
     * @param nodeStore the node store to sweep.
     * @param isDisposed the dispose flag of the node store.
     * @param sweep2Lock the sweep2 lock value, must not be negative.
     * @throws IllegalArgumentException if {@code sweep2Lock} is negative.
     */
    BackgroundSweep2Operation(DocumentNodeStore nodeStore,
                              AtomicBoolean isDisposed,
                              long sweep2Lock) {
        // default asyncDelay is 1000ms == 1sec
        // the sweep2 is fine to run every 60sec by default as it is not time critical
        // to achieve this we're doing a Math.min(60sec, 60 * getAsyncDelay())
        super(nodeStore, isDisposed,
                Suppliers.ofInstance(Math.min(60000, 60 * nodeStore.getAsyncDelay())));
        if (sweep2Lock < 0) {
            throw new IllegalArgumentException("sweep2Lock must not be negative");
        }
        this.sweep2Lock = sweep2Lock;
    }

    @Override
    protected void execute(@NotNull DocumentNodeStore nodeStore) {
        if (retryCount > 0) {
            LOG.info("BackgroundSweep2Operation.execute: retrying sweep2. retryCount=" + retryCount);
        }
        if (nodeStore.backgroundSweep2(sweep2Lock)) {
            done();
            // nothing left to wait for: do not fall through to the
            // "another instance is still executing" message below
            return;
        }
        // log the fact that we noticed an ongoing sweep2 - is only logged on every
        // 5th unsuccessful attempt (once every 5min with the default 60sec delay)
        // until either the other instance finishes or crashes, at which point we or
        // another instance will pick up
        if (++retryCount % 5 == 0) {
            LOG.info("BackgroundSweep2Operation.execute: another instance is currently (still) executing sweep2. "
                    + "Waiting for its outcome. retryCount=" + retryCount);
        }
    }

    /**
     * Clears the node store reference of this task.
     */
    private void done() {
        ref.clear();
    }
}
/**
 * Task that renews the cluster id lease of a node store and warns when
 * the time between two renewals is longer than expected.
 */
private static class BackgroundLeaseUpdate extends NodeStoreTask {

    /** OAK-4859 : log if time between two renewClusterIdLease calls is too long **/
    private long lastRenewClusterIdLeaseCall = -1;

    /** elapsed time for previous update operation **/
    private long elapsedForPreviousRenewal = -1;

    private static int INTERVAL_MS = 1000;

    private static int TOLERANCE_FOR_WARNING_MS = 2000;

    BackgroundLeaseUpdate(DocumentNodeStore nodeStore,
                          AtomicBoolean isDisposed) {
        super(nodeStore, isDisposed, Suppliers.ofInstance(INTERVAL_MS));
    }

    @Override
    protected void execute(@NotNull DocumentNodeStore nodeStore) {
        // OAK-4859 : keep track of invocation time of renewClusterIdLease
        // and warn if time since last call is longer than
        // INTERVAL_MS + TOLERANCE_FOR_WARNING_MS
        Clock clock = nodeStore.getClock();
        final long now = clock.getTime();
        final long previousCall = lastRenewClusterIdLeaseCall;
        if (previousCall >= 0
                && now - previousCall > INTERVAL_MS + TOLERANCE_FOR_WARNING_MS) {
            final String renewTimeMessage;
            if (elapsedForPreviousRenewal > 0) {
                renewTimeMessage = String.format(
                        " (of which the last update operation took %dms)", elapsedForPreviousRenewal);
            } else {
                renewTimeMessage = "";
            }
            LOG.warn(
                    "BackgroundLeaseUpdate.execute: time since last renewClusterIdLease() call longer than expected: {}ms{} (expected ~{}ms)",
                    now - previousCall, renewTimeMessage, INTERVAL_MS);
        }
        lastRenewClusterIdLeaseCall = now;

        nodeStore.renewClusterIdLease();
        // remember how long the renewal itself took for the next warning
        elapsedForPreviousRenewal = clock.getTime() - now;
    }
}
/**
 * Task that updates the cluster state of a node store with a fixed
 * delay of 1000 and signals a cluster state change when the update
 * reports one.
 */
private static class BackgroundClusterUpdate extends NodeStoreTask {

    BackgroundClusterUpdate(DocumentNodeStore nodeStore,
                            AtomicBoolean isDisposed) {
        super(nodeStore, isDisposed, Suppliers.ofInstance(1000));
    }

    @Override
    protected void execute(@NotNull DocumentNodeStore nodeStore) {
        final boolean changed = nodeStore.updateClusterState();
        if (changed) {
            nodeStore.signalClusterStateChange();
        }
    }
}
/**
 * @return the blob store used by this node store.
 */
public BlobStore getBlobStore() {
    return this.blobStore;
}
/**
 * @return the blob serializer of this node store.
 */
BlobSerializer getBlobSerializer() {
    return this.blobSerializer;
}
/**
 * Returns an iterator over all the blobs present in the store.
 *
 * <p>The returned iterator might implement {@link java.io.Closeable}.
 * Callers should check for that and close such an iterator when done.
 *
 * @return an iterator for all the blobs
 */
public Iterator<ReferencedBlob> getReferencedBlobsIterator() {
    return this.referencedBlobs.iterator();
}
/**
 * @return the diff cache of this node store.
 */
public DiffCache getDiffCache() {
    return this.diffCache;
}
/**
 * @return the checkpoints of this node store.
 */
public Checkpoints getCheckpoints() {
    return this.checkpoints;
}
/**
 * @return the version garbage collector of this node store.
 */
@NotNull
public VersionGarbageCollector getVersionGarbageCollector() {
    return this.versionGarbageCollector;
}
/**
 * @return the journal garbage collector of this node store.
 */
@NotNull
public JournalGarbageCollector getJournalGarbageCollector() {
    return this.journalGarbageCollector;
}
/**
 * @return the lastRev recovery agent of this node store.
 */
@NotNull
public LastRevRecoveryAgent getLastRevRecoveryAgent() {
    return this.lastRevRecoveryAgent;
}
/**
 * @return the cluster id of this node store as a string.
 */
@Override
public String getInstanceId() {
    final String instanceId = String.valueOf(getClusterId());
    return instanceId;
}
/**
 * @return the string form of the root revision of the current root
 *         state, or an empty string if no root state is set.
 */
@Override
public String getVisibilityToken() {
    // work on a local copy of the field
    final DocumentNodeState current = root;
    // a missing root is unlikely but handled for paranoia reasons...
    return current == null ? "" : current.getRootRevision().asString();
}
/**
 * Checks whether the root revision of the current root state is greater
 * than or equal to the given revision vector.
 *
 * @param rv the revision vector to check.
 * @return {@code true} if so; {@code false} otherwise or if no root
 *         state is set.
 */
private boolean isVisible(RevisionVector rv) {
    // do not synchronize, take a local copy instead
    final DocumentNodeState current = root;
    // a missing root is unlikely but handled for paranoia reasons...
    return current != null
            && Utils.isGreaterOrEquals(current.getRootRevision(), rv);
}
/**
 * Checks whether the revisions denoted by the given visibility token are
 * visible, waiting up to {@code maxWaitMillis} for them to become
 * visible if they are not yet.
 *
 * @param visibilityToken the visibility token, must not be null or empty.
 * @param maxWaitMillis the maximum time to wait in milliseconds.
 * @return whether the revisions of the token are visible after the
 *         (optional) wait.
 * @throws IllegalArgumentException if the token is null or empty.
 * @throws InterruptedException if interrupted while waiting.
 */
@Override
public boolean isVisible(@NotNull String visibilityToken, long maxWaitMillis) throws InterruptedException {
    if (Strings.isNullOrEmpty(visibilityToken)) {
        // we've asked for @Nonnull..
        // hence throwing an exception
        throw new IllegalArgumentException("visibilityToken must not be null or empty");
    }
    // 'fromString' would throw a RuntimeException if it can't parse
    // that would be re thrown automatically
    final RevisionVector tokenRevisions = RevisionVector.fromString(visibilityToken);

    boolean visible = isVisible(tokenRevisions);
    if (!visible) {
        // not the simple case: wait until the visibility token's
        // revisions all become visible (or maxWaitMillis has passed)
        commitQueue.suspendUntilAll(Sets.newHashSet(tokenRevisions), maxWaitMillis);

        // if we got interrupted above would throw InterruptedException
        // otherwise, we don't know why suspendUntilAll returned, so
        // check the final isVisible state and return it
        visible = isVisible(tokenRevisions);
    }
    return visible;
}
/**
 * @return the stats collector of this node store.
 */
public DocumentNodeStoreStatsCollector getStatsCollector() {
    return this.nodeStoreStatsCollector;
}
/**
 * @return the node state cache currently set on this node store.
 */
public DocumentNodeStateCache getNodeStateCache() {
    return this.nodeStateCache;
}
/**
 * Replaces the node state cache of this node store.
 *
 * @param nodeStateCache the node state cache to use.
 */
public void setNodeStateCache(
        DocumentNodeStateCache nodeStateCache) {
    this.nodeStateCache = nodeStateCache;
}
/**
 * @return the journal property handler factory of this node store.
 */
public JournalPropertyHandlerFactory getJournalPropertyHandlerFactory() {
    return this.journalPropertyHandlerFactory;
}
/**
 * @return the update limit configured for this node store.
 */
int getUpdateLimit() {
    return this.updateLimit;
}
/**
 * @return the read-only mode flag of this node store.
 */
boolean isReadOnlyMode() {
    return this.readOnlyMode;
}
}