blob: f0e9cd56d294919258a2e224e5e0bde77248fbf4 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.jackrabbit.oak.plugins.document;
import static org.apache.jackrabbit.guava.common.base.Preconditions.checkNotNull;
import static org.apache.jackrabbit.guava.common.collect.Iterables.filter;
import static org.apache.jackrabbit.guava.common.collect.Iterables.partition;
import static org.apache.jackrabbit.guava.common.collect.Iterables.transform;
import static org.apache.jackrabbit.guava.common.collect.Maps.immutableEntry;
import static org.apache.jackrabbit.guava.common.collect.Maps.newHashMap;
import static org.apache.jackrabbit.oak.plugins.document.util.Utils.COMMITROOT_OR_REVISIONS;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.jackrabbit.oak.commons.TimeDurationFormatter;
import org.apache.jackrabbit.oak.plugins.document.util.Utils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.jackrabbit.guava.common.base.Function;
import org.apache.jackrabbit.guava.common.base.Predicate;
* The {@code MissingBcSweeper2} is used for the so-called sweep2, which is
* a repository traversal updating documents that have missing branch commit ("_bc")
* properties (details see OAK-9176).
* This class is similar to NodeDocumentSweeper as it is based on the same principles,
* with a few notable exceptions (like it only looks at _commitRoot and _revisions etc).
* And due to these exceptions the class is forked rather than modified/subclasses
* (also to enable later refactoring of the NodeDocumentSweeper itself).
* <p>
* This class is not thread-safe.
final class MissingBcSweeper2 {
private static final Logger LOG = LoggerFactory.getLogger(MissingBcSweeper2.class);
private static final int YIELD_SIZE = 500;
private static final int INVALIDATE_BATCH_SIZE = 100;
private static final long LOGINTERVALMS = TimeUnit.MINUTES.toMillis(1);
private final RevisionContext context;
private final CommitValueResolver commitValueResolver;
private final int executingClusterId;
private final List<Integer> includedClusterIds;
private final RevisionVector headRevision;
private final AtomicBoolean isDisposed;
private long totalCount;
private long lastCount;
private long startOfScan;
private long lastLog;
* Creates a new sweeper v2 for the given context..
* @param context the revision context.
MissingBcSweeper2(RevisionContext context,
CommitValueResolver commitValueResolver,
List<Integer> includedClusterIds,
AtomicBoolean isDisposed) {
this.context = checkNotNull(context);
this.commitValueResolver = checkNotNull(commitValueResolver);
this.executingClusterId = context.getClusterId();
this.includedClusterIds = includedClusterIds == null ? new LinkedList<>() : Collections.unmodifiableList(includedClusterIds);
this.headRevision= context.getHeadRevision();
this.isDisposed = isDisposed;
* Performs a sweep2 and reports the required updates to the given sweep
* listener.
* @param documents the documents to sweep
* @param listener the listener to receive required sweep update operations.
* @throws DocumentStoreException if reading from the store or writing to
* the store failed.
void sweep2(@NotNull Iterable<NodeDocument> documents,
@NotNull NodeDocumentSweepListener listener)
throws DocumentStoreException {
performSweep2(documents, checkNotNull(listener));
//----------------------------< internal >----------------------------------
private void performSweep2(Iterable<NodeDocument> documents,
NodeDocumentSweepListener listener)
throws DocumentStoreException {
totalCount = 0;
lastCount = 0;
startOfScan = context.getClock().getTime();
lastLog = startOfScan;
Iterable<Map.Entry<Path, UpdateOp>> ops = sweepOperations(documents);
for (List<Map.Entry<Path, UpdateOp>> batch : partition(ops, INVALIDATE_BATCH_SIZE)) {
Map<Path, UpdateOp> updates = newHashMap();
for (Map.Entry<Path, UpdateOp> entry : batch) {
updates.put(entry.getKey(), entry.getValue());
if (isDisposed.get()) {
throw new DocumentStoreException("sweep2 interrupted by shutdown");
LOG.debug("Document sweep2 finished");
private Iterable<Map.Entry<Path, UpdateOp>> sweepOperations(
final Iterable<NodeDocument> docs) {
return filter(transform(docs,
new Function<NodeDocument, Map.Entry<Path, UpdateOp>>() {
int yieldCnt = 0;
long lastYield = context.getClock().getTime();
public Map.Entry<Path, UpdateOp> apply(NodeDocument doc) {
if (++yieldCnt >= YIELD_SIZE) {
try {
final long now = context.getClock().getTime();
final long timeSinceLastYield = now - lastYield;
// wait the same amount of time that passed since last yield
// that corresponds to roughly 50% throttle (ignoring the min 1ms sleep)
final long waitUntil = now + Math.max(1, timeSinceLastYield);
} catch (InterruptedException e) {
// ignore
lastYield = context.getClock().getTime();
yieldCnt = 0;
return immutableEntry(doc.getPath(), sweepOne(doc));
}), new Predicate<Map.Entry<Path, UpdateOp>>() {
public boolean apply(Map.Entry<Path, UpdateOp> input) {
return input.getValue() != null;
private UpdateOp sweepOne(NodeDocument doc) throws DocumentStoreException {
UpdateOp op = null;
// the blueprint NodeDocumentSweeper.sweepOne goes through
// PROPERTY_OR_DELETED_OR_COMMITROOT here, as that's the new full sweep1.
// the sweep2 though, only goes through COMMITROOT_OR_REVISIONS,
// as that's what was left out by the original sweep1:
// - COMMITROOT : for new child (parent)
// - REVISIONS : for commit roots (root for branch commits)
for (String property : filter(doc.keySet(), COMMITROOT_OR_REVISIONS)) {
Map<Revision, String> valueMap = doc.getLocalMap(property);
for (Map.Entry<Revision, String> entry : valueMap.entrySet()) {
Revision rev = entry.getKey();
if (!includedClusterIds.isEmpty() && !includedClusterIds.contains(rev.getClusterId())) {
// sweep2 is only done for those clusterIds that went through sweep1 (before 1.8).
// includedClusterIds contains the latter list of clusterIds.
// For testing, the code also supports going through all clusterIds.
// (in this case the includedClusterIds list is empty).
// Note that sweep2 is never triggered if "_sweepRev" is empty,
// so the empty case here never applies for production.
Revision cRev = getCommitRevision(doc, rev);
if (cRev == null) {
// originally the uncommitted case
// however, sweep2 doesn't touch anything uncommitted
} else if (cRev.equals(rev)) {
// originally the committed case
// however, that was only logging and sweep2 is not about that,
// hence nothing to be done here
} else {
// this is what sweep2 is about : adding _bc for _commitRoot and _revisions
if (op == null) {
op = createUpdateOp(doc);
committedBranch(doc, property, rev, cRev, op);
long now = context.getClock().getTime();
long lastElapsed = now - lastLog;
if (lastElapsed >= LOGINTERVALMS) {
TimeDurationFormatter df = TimeDurationFormatter.forLogging();
long totalElapsed = now - startOfScan;
long totalRateMin = (totalCount * TimeUnit.MINUTES.toMillis(1)) / totalElapsed;
long lastRateMin = (lastCount * TimeUnit.MINUTES.toMillis(1)) / lastElapsed;
String restrictionMsg;
if (includedClusterIds.isEmpty()) {
restrictionMsg = "unrestricted, ie for all clusterIds";
} else {
restrictionMsg = "restricted to clusterIds " + includedClusterIds;
String message = String.format(
"Sweep2 executed by cluster node [%d] (%s): %d nodes scanned in %s (~%d/m) - last interval %d nodes in %s (~%d/m)",
executingClusterId, restrictionMsg, totalCount, df.format(totalElapsed, TimeUnit.MILLISECONDS), totalRateMin, lastCount,
df.format(lastElapsed, TimeUnit.MILLISECONDS), lastRateMin);;
lastLog = now;
lastCount = 0;
return op == null ? null : op.hasChanges() ? op : null;
* Returns {@code true} if the given revision is marked as a branch commit
* on the document. This method only checks local branch commit information
* available on the document ({@link NodeDocument#getLocalBranchCommits()}).
* If the given revision is related to a branch commit that was created
* prior to Oak 1.8, the method will return {@code false}.
* @param rev a revision.
* @param doc the document to check.
* @return {@code true} if the revision is marked as a branch commit;
* {@code false} otherwise.
private boolean isV18BranchCommit(Revision rev, NodeDocument doc) {
return doc.getLocalBranchCommits().contains(rev);
private void committedBranch(NodeDocument doc,
String property,
Revision rev,
Revision cRev,
UpdateOp op) {
boolean newerThanHead = headRevision.isRevisionNewer(cRev);
if (LOG.isDebugEnabled()) {
String msg = newerThanHead ? " (newer than head)" : "";
LOG.debug("Committed branch change on {}, {} @ {}/{}{}",
op.getId(), property, rev, cRev, msg);
if (!isV18BranchCommit(rev, doc)) {
NodeDocument.setBranchCommit(op, rev);
private static UpdateOp createUpdateOp(NodeDocument doc) {
return new UpdateOp(doc.getId(), false);
private String getCommitValue(@NotNull Revision changeRevision,
@NotNull NodeDocument doc) {
return commitValueResolver.resolve(changeRevision, doc);
private Revision getCommitRevision(final NodeDocument doc,
final Revision rev)
throws DocumentStoreException {
String cv = /*context.*/getCommitValue(rev, doc);
if (cv == null) {
return null;
return Utils.resolveCommitRevision(rev, cv);