blob: 067fa354a8b90d70fcde35134b746d7a4ceb69cd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jackrabbit.oak.plugins.document;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.jackrabbit.oak.api.CommitFailedException.MERGE;
import static org.apache.jackrabbit.oak.api.CommitFailedException.OAK;
import static org.apache.jackrabbit.oak.api.CommitFailedException.STATE;
import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.COLLISIONS;
import static org.apache.jackrabbit.oak.plugins.document.util.CountingDiff.countChanges;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.plugins.document.util.Utils;
import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeBuilder;
import org.apache.jackrabbit.oak.spi.commit.CommitHook;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.state.ApplyDiff;
import org.apache.jackrabbit.oak.spi.state.ConflictAnnotatingRebaseDiff;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.apache.jackrabbit.oak.spi.state.NodeStoreBranch;
import org.apache.jackrabbit.oak.commons.PerfLogger;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Implementation of a DocumentMK based node store branch.
*/
class DocumentNodeStoreBranch implements NodeStoreBranch {
private static final Logger LOG = LoggerFactory.getLogger(DocumentNodeStoreBranch.class);
private static final PerfLogger perfLogger = new PerfLogger(
LoggerFactory.getLogger(DocumentNodeStoreBranch.class.getName()
+ ".perf"));
private static final int MAX_LOCK_TRY_TIME_MULTIPLIER = Integer.getInteger("oak.maxLockTryTimeMultiplier", 30);
private static final Random RANDOM = new Random();
private static final long MIN_BACKOFF = 50;
/** The underlying store to which this branch belongs */
protected final DocumentNodeStore store;
protected final long maximumBackoff;
/** The maximum time in milliseconds to wait for the merge lock. */
protected final long maxLockTryTimeMS;
/** Lock for coordinating concurrent merge operations */
private final ReadWriteLock mergeLock;
/** The maximum number of updates to keep in memory */
private final int updateLimit;
/**
* State of the this branch. Either {@link Unmodified}, {@link InMemory}, {@link Persisted},
* {@link ResetFailed} or {@link Merged}.
* @see BranchState
*/
private BranchState branchState;
DocumentNodeStoreBranch(DocumentNodeStore store,
DocumentNodeState base,
ReadWriteLock mergeLock) {
this.store = checkNotNull(store);
this.branchState = new Unmodified(checkNotNull(base));
this.maximumBackoff = Math.max((long) store.getMaxBackOffMillis(), MIN_BACKOFF);
this.maxLockTryTimeMS = (long) (store.getMaxBackOffMillis() * MAX_LOCK_TRY_TIME_MULTIPLIER);
this.mergeLock = mergeLock;
this.updateLimit = store.getUpdateLimit();
}
@NotNull
@Override
public NodeState getBase() {
return branchState.getBase();
}
@NotNull
@Override
public NodeState getHead() {
return branchState.getHead();
}
@Override
public void setRoot(NodeState newRoot) {
branchState.setRoot(checkNotNull(newRoot));
}
@NotNull
@Override
public NodeState merge(@NotNull CommitHook hook, @NotNull CommitInfo info)
throws CommitFailedException {
try {
return merge0(hook, info, false);
} catch (CommitFailedException e) {
if (!e.isOfType(MERGE)) {
throw e;
}
}
// retry with exclusive lock, blocking other
// concurrent writes
return merge0(hook, info, true);
}
@Override
public void rebase() {
branchState.rebase();
}
@Override
public String toString() {
return branchState.toString();
}
//------------------------------< internal >--------------------------------
/**
* For test purposes only!
*/
@NotNull
ReadWriteLock getMergeLock() {
return mergeLock;
}
/**
* For test purposes only!
* <p>
* Forces the branch to persist the changes to the underlying store.
*/
void persist() {
branchState.persist();
}
@NotNull
private NodeState merge0(@NotNull CommitHook hook,
@NotNull CommitInfo info,
boolean exclusive)
throws CommitFailedException {
CommitFailedException ex = null;
Set<Revision> conflictRevisions = new HashSet<Revision>();
long time = System.currentTimeMillis();
int numRetries = 0;
long suspendMillis = 0;
for (long backoff = MIN_BACKOFF; backoff <= maximumBackoff; backoff *= 2) {
if (ex != null) {
try {
numRetries++;
final long start = perfLogger.start();
// suspend until conflict revision is visible
// or as a fallback sleep for a while
if (!conflictRevisions.isEmpty()) {
// suspend until conflicting revision is visible
LOG.debug("Suspending until {} is visible. Current head {}.",
conflictRevisions, store.getHeadRevision());
suspendMillis += store.suspendUntilAll(conflictRevisions);
conflictRevisions.clear();
LOG.debug("Resumed. Current head {}.", store.getHeadRevision());
} else {
long sleepMillis = backoff + RANDOM.nextInt((int) Math.min(backoff, Integer.MAX_VALUE));
suspendMillis += sleepMillis;
Thread.sleep(sleepMillis);
}
perfLogger.end(start, 1, "Merge - Retry attempt [{}]", numRetries);
} catch (InterruptedException e) {
throw new CommitFailedException(
MERGE, 3, "Merge interrupted", e);
}
}
try {
NodeState result = branchState.merge(checkNotNull(hook),
checkNotNull(info), exclusive);
store.getStatsCollector().doneMerge(branchState.getMergedChanges(),
numRetries, System.currentTimeMillis() - time, suspendMillis, exclusive);
return result;
} catch (FailedWithConflictException e) {
ex = e;
conflictRevisions.addAll(e.getConflictRevisions());
} catch (CommitFailedException e) {
ex = e;
}
LOG.trace("Merge Error", ex);
// only retry on merge failures. these may be caused by
// changes introduce by a commit hook and may be resolved
// by a rebase and running the hook again
if (!ex.isOfType(MERGE)) {
throw ex;
}
}
// if we get here retrying failed
time = System.currentTimeMillis() - time;
store.getStatsCollector().failedMerge(numRetries, time, suspendMillis, exclusive);
String msg = ex.getMessage() + " (retries " + numRetries + ", " + time + " ms)";
throw new CommitFailedException(ex.getSource(), ex.getType(),
ex.getCode(), msg, ex.getCause());
}
/**
* Acquires the merge lock either exclusive or shared.
*
* @param exclusive whether to acquire the merge lock exclusive.
* @return the acquired merge lock or {@code null} if the operation timed
* out.
* @throws CommitFailedException if the current thread is interrupted while
* acquiring the lock
*/
@Nullable
private Lock acquireMergeLock(boolean exclusive)
throws CommitFailedException {
final long start = System.nanoTime();
Lock lock;
if (exclusive) {
lock = mergeLock.writeLock();
} else {
lock = mergeLock.readLock();
}
boolean acquired;
try {
acquired = lock.tryLock(maxLockTryTimeMS, MILLISECONDS);
} catch (InterruptedException e) {
throw new CommitFailedException(OAK, 1,
"Unable to acquire merge lock", e);
}
String mode = exclusive ? "exclusive" : "shared";
if (acquired) {
perfLogger.end(start, 1, "Merge - Acquired lock ({})", mode);
} else {
LOG.info("Time out while acquiring merge lock ({})", mode);
lock = null;
}
long micros = TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - start);
store.getStatsCollector().doneMergeLockAcquired(micros);
return lock;
}
/**
* Persists the changes between {@code toPersist} and {@code base}
* to the underlying store.
*
* @param toPersist the state with the changes on top of {@code base}.
* @param base the base state.
* @param info the commit info.
* @param stats the merge stats.
* @return the state with the persisted changes.
* @throws ConflictException if changes cannot be persisted because a
* conflict occurred. The exception may contain the revisions of
* the conflicting operations.
* @throws DocumentStoreException if the persist operation failed because
* of an exception in the underlying {@link DocumentStore}.
*/
private DocumentNodeState persist(final @NotNull NodeState toPersist,
final @NotNull DocumentNodeState base,
final @NotNull CommitInfo info,
final @NotNull MergeStats stats)
throws ConflictException, DocumentStoreException {
return persist(new Changes() {
@Override
public void with(@NotNull CommitBuilder commitBuilder) {
CommitDiff diff = new CommitDiff(
store.getBundlingConfigHandler().newBundlingHandler(),
commitBuilder, store.getBlobSerializer());
toPersist.compareAgainstBaseState(base, diff);
stats.numDocuments += diff.getNumChanges();
}
}, base, info);
}
/**
* Persist some changes on top of the given base state.
*
* @param op the changes to persist.
* @param base the base state.
* @param info the commit info.
* @return the result state.
* @throws ConflictException if changes cannot be persisted because a
* conflict occurred. The exception may contain the revisions of
* the conflicting operations.
* @throws DocumentStoreException if the persist operation failed because
* of an exception in the underlying {@link DocumentStore}.
*/
private DocumentNodeState persist(@NotNull Changes op,
@NotNull DocumentNodeState base,
@NotNull CommitInfo info)
throws ConflictException, DocumentStoreException {
boolean success = false;
Commit c = store.newCommit(op, base.getRootRevision(), this);
RevisionVector rev;
try {
if (c.isEmpty()) {
// no changes to persist. return base state and let
// finally clause cancel the commit
return base;
}
c.apply();
rev = store.done(c, base.getRootRevision().isBranch(), info);
success = true;
} finally {
if (!success) {
store.canceled(c);
}
}
return store.getRoot(rev);
}
private static CommitFailedException mergeFailed(Throwable t) {
String msg = t.getMessage();
if (msg == null) {
msg = "Failed to merge changes to the underlying store";
}
String type = OAK;
if (t instanceof DocumentStoreException) {
DocumentStoreException dse = (DocumentStoreException) t;
if (dse.getType() == DocumentStoreException.Type.TRANSIENT) {
// set type to MERGE, which indicates a retry may work
type = MERGE;
}
}
return new CommitFailedException(type, 1, msg, t);
}
/**
* Sub classes of this class represent a state a branch can be in. See the individual
* sub classes for permissible state transitions.
*/
private abstract class BranchState {
/** Root state of the base revision of this branch */
protected DocumentNodeState base;
protected BranchState(DocumentNodeState base) {
this.base = base;
}
/**
* Persist this branch to an underlying branch in the {@code NodeStore}.
*/
Persisted persist() {
Persisted p = new Persisted(base);
p.persistTransientHead(getHead());
branchState = p;
return p;
}
DocumentNodeState getBase(){
return base;
}
int getMergedChanges() {
return 0;
}
@NotNull
abstract NodeState getHead();
abstract void setRoot(NodeState root);
abstract void rebase();
/**
* Runs the commit hook on the changes tracked with this branch state
* merges the result.
* <p>
* In addition to the {@link CommitFailedException}, an implementation
* may also throw an unchecked exception when an error occurs while
* persisting the changes. This exception is implementation specific
* and it is the responsibility of the caller to convert it into a
* {@link CommitFailedException}.
*
* @param hook the commit hook to run.
* @param info the associated commit info.
* @param exclusive whether the merge lock must be acquired exclusively
* or shared while performing the merge.
* @return the result of the merge.
* @throws CommitFailedException if a commit hook rejected the changes
* or the actual merge operation failed. An implementation must
* use the appropriate type in {@code CommitFailedException} to
* indicate the cause of the exception.
*/
@NotNull
abstract NodeState merge(@NotNull CommitHook hook,
@NotNull CommitInfo info,
boolean exclusive)
throws CommitFailedException;
}
/**
* Instances of this class represent a branch whose base and head are the same.
* <p>
* Transitions to:
* <ul>
* <li>{@link InMemory} on {@link #setRoot(NodeState)} if the new root differs
* from the current base</li>.
* <li>{@link Merged} on {@link BranchState#merge(CommitHook, CommitInfo, boolean)}</li>
* </ul>
*/
private class Unmodified extends BranchState {
Unmodified(DocumentNodeState base) {
super(base);
}
@Override
public String toString() {
return "Unmodified[" + base + ']';
}
@Override
@NotNull
NodeState getHead() {
return base;
}
@Override
void setRoot(NodeState root) {
if (!base.equals(root)) {
branchState = new InMemory(base, root);
}
}
@Override
void rebase() {
base = store.getRoot();
}
@Override
@NotNull
NodeState merge(@NotNull CommitHook hook,
@NotNull CommitInfo info,
boolean exclusive) {
branchState = new Merged(base, new MergeStats());
return base;
}
}
/**
* Instances of this class represent a branch whose base and head differ.
* All changes are kept in memory.
* <p>
* Transitions to:
* <ul>
* <li>{@link Unmodified} on {@link #setRoot(NodeState)} if the new root is the same
* as the base of this branch</li>
* <li>{@link Persisted} on {@link #setRoot(NodeState)} if the number of
* changes counted from the base to the new root reaches
* {@link DocumentNodeStoreBuilder#getUpdateLimit()}.</li>
* <li>{@link Merged} on {@link BranchState#merge(CommitHook, CommitInfo, boolean)}</li>
* </ul>
*/
private class InMemory extends BranchState {
/** Root state of the transient head. */
private NodeState head;
@Override
public String toString() {
return "InMemory[" + base + ", " + head + ']';
}
InMemory(DocumentNodeState base, NodeState head) {
super(base);
this.head = newModifiedDocumentNodeState(head);
}
@Override
@NotNull
NodeState getHead() {
return head;
}
@Override
void setRoot(NodeState root) {
if (base.equals(root)) {
branchState = new Unmodified(base);
} else {
int numChanges = countChanges(base, root);
if (numChanges > updateLimit) {
head = newModifiedDocumentNodeState(root);
persist();
} else {
NodeBuilder builder = new MemoryNodeBuilder(base);
root.compareAgainstBaseState(base, new ApplyDiff(builder));
head = newModifiedDocumentNodeState(builder.getNodeState());
}
}
}
@Override
void rebase() {
DocumentNodeState root = store.getRoot();
// is a rebase necessary?
if (root.getRootRevision().equals(base.getRootRevision())) {
// fresh root is still at the same revision as
// the base of the branch
return;
}
NodeBuilder builder = new MemoryNodeBuilder(root);
head.compareAgainstBaseState(base, new ConflictAnnotatingRebaseDiff(builder));
base = root;
head = newModifiedDocumentNodeState(builder.getNodeState());
}
@Override
@NotNull
NodeState merge(@NotNull CommitHook hook,
@NotNull CommitInfo info,
boolean exclusive)
throws CommitFailedException {
checkNotNull(hook);
checkNotNull(info);
Lock lock = acquireMergeLock(exclusive);
try {
rebase();
boolean success = false;
NodeState previousHead = head;
try {
DocumentNodeStoreStatsCollector stats = store.getStatsCollector();
NodeState toCommit = TimingHook.wrap(hook, (time, unit) -> stats.doneCommitHookProcessed(unit.toMicros(time)))
.processCommit(base, head, info);
try {
MergeStats ms = new MergeStats();
NodeState newHead;
if (this != branchState) {
// branch state is not in-memory anymore
Persisted p = branchState.persist();
RevisionVector branchRev = p.getHead().getRootRevision();
newHead = store.getRoot(store.merge(branchRev, info));
stats.doneMergeBranch(p.numCommits, p.getMergedChanges());
} else {
newHead = DocumentNodeStoreBranch.this.persist(toCommit, base, info, ms);
}
branchState = new Merged(base, ms);
success = true;
return newHead;
} catch (ConflictException e) {
throw e.asCommitFailedException();
} catch (Throwable t) {
throw mergeFailed(t);
}
} finally {
if (!success) {
this.head = previousHead;
if (this != branchState) {
// the branch state transitioned to persisted while
// processing the commit hook and then failed.
// remember the persisted branch state
BranchState currentState = branchState;
// reset branch state back to in-memory
branchState = this;
// reset the entire persisted branch state
reset(currentState.persist());
}
}
}
} finally {
if (lock != null) {
lock.unlock();
}
}
}
/**
* Reset the entire persisted branch.
*
* @param p the persisted branch.
*/
private void reset(Persisted p) {
RevisionVector branchHeadRev = p.getHead().getRootRevision();
// get the branch that belongs to this persisted branch state
Branch b = store.getBranches().getBranch(branchHeadRev);
if (b != null) {
try {
store.reset(branchHeadRev,
b.getBase().asBranchRevision(store.getClusterId()));
} catch (Exception e) {
LOG.warn("Resetting persisted branch failed", e);
}
}
}
private ModifiedDocumentNodeState newModifiedDocumentNodeState(NodeState modified) {
return new ModifiedDocumentNodeState(store, DocumentNodeStoreBranch.this, base, modified);
}
}
/**
* Instances of this class represent a branch whose head is persisted to an
* underlying branch in the {@code NodeStore}.
* <p>
* Transitions to:
* <ul>
* <li>{@link ResetFailed} on failed reset in {@link BranchState#merge(CommitHook, CommitInfo, boolean)}</li>
* <li>{@link Merged} on successful {@link BranchState#merge(CommitHook, CommitInfo, boolean)}</li>
* </ul>
*/
private class Persisted extends BranchState {
/** Root state of the transient head, top of persisted branch. */
private DocumentNodeState head;
/**
* Number of commits on this persisted branch.
*/
private int numCommits;
private final MergeStats ms = new MergeStats();
@Override
public String toString() {
return "Persisted[" + base + ", " + head + ']';
}
Persisted(DocumentNodeState base) {
super(base);
this.head = createBranch(base).asBranchRootState(DocumentNodeStoreBranch.this);
}
@Override
Persisted persist() {
// nothing to do, this branch state is already persisted
return this;
}
/**
* Create a new branch state from the given state.
*
* @param state the state from where to create a branch from.
* @return the branch state.
*/
final DocumentNodeState createBranch(DocumentNodeState state) {
return store.getRoot(state.getRootRevision().asBranchRevision(store.getClusterId()));
}
@Override
@NotNull
DocumentNodeState getHead() {
return head;
}
@Override
void setRoot(NodeState root) {
if (!head.equals(root)) {
persistTransientHead(root);
}
}
@Override
void rebase() {
DocumentNodeState root = store.getRoot();
// perform rebase in store
head = store.getRoot(store.rebase(head.getRootRevision(), root.getRootRevision()))
.asBranchRootState(DocumentNodeStoreBranch.this);
base = root;
}
@Override
@NotNull
NodeState merge(@NotNull final CommitHook hook,
@NotNull final CommitInfo info,
boolean exclusive)
throws CommitFailedException {
boolean success = false;
DocumentNodeState previousHead = head;
Lock lock = acquireMergeLock(exclusive);
try {
rebase();
previousHead = head;
checkForConflicts();
DocumentNodeStoreStatsCollector stats = store.getStatsCollector();
NodeState toCommit = TimingHook.wrap(checkNotNull(hook), (time, unit) -> stats.doneCommitHookProcessed(unit.toMicros(time)))
.processCommit(base, head, info);
persistTransientHead(toCommit);
DocumentNodeState newRoot = store.getRoot(store.merge(head.getRootRevision(), info));
success = true;
branchState = new Merged(base, ms);
stats.doneMergeBranch(numCommits, branchState.getMergedChanges());
return newRoot;
} catch (CommitFailedException e) {
throw e;
} catch (ConflictException e) {
throw e.asCommitFailedException();
} catch (Throwable t) {
throw mergeFailed(t);
} finally {
if (lock != null) {
lock.unlock();
}
if (!success) {
resetBranch(head, previousHead);
}
}
}
private void persistTransientHead(NodeState newHead)
throws DocumentStoreException {
try {
head = DocumentNodeStoreBranch.this.persist(newHead, head, CommitInfo.EMPTY, ms)
.asBranchRootState(DocumentNodeStoreBranch.this);
} catch (ConflictException e) {
throw DocumentStoreException.convert(e);
}
numCommits++;
store.getStatsCollector().doneBranchCommit();
}
private void resetBranch(DocumentNodeState branchHead, DocumentNodeState ancestor) {
try {
head = store.getRoot(store.reset(branchHead.getRootRevision(),
ancestor.getRootRevision()))
.asBranchRootState(DocumentNodeStoreBranch.this);
} catch (Exception e) {
CommitFailedException ex = new CommitFailedException(
OAK, 100, "Branch reset failed", e);
branchState = new ResetFailed(base, ex);
}
}
/**
* Checks if any of the commits on this branch have a collision marker
* set.
*
* @throws CommitFailedException if a collision marker is set for one
* of the commits on this branch.
*/
private void checkForConflicts() throws CommitFailedException {
Branch b = store.getBranches().getBranch(head.getRootRevision());
if (b == null) {
return;
}
NodeDocument doc = Utils.getRootDocument(store.getDocumentStore());
Set<Revision> collisions = Sets.newHashSet(doc.getLocalMap(COLLISIONS).keySet());
Set<Revision> commits = Sets.newHashSet(Iterables.transform(b.getCommits(),
new Function<Revision, Revision>() {
@Override
public Revision apply(Revision input) {
return input.asTrunkRevision();
}
}));
Set<Revision> conflicts = Sets.intersection(collisions, commits);
if (!conflicts.isEmpty()) {
throw new CommitFailedException(STATE, 2,
"Conflicting concurrent change on branch commits " + conflicts);
}
}
}
/**
* Instances of this class represent a branch that has already been merged.
* All methods throw an {@code IllegalStateException}.
* <p>
* Transitions to: none.
*/
private class Merged extends BranchState {
private final MergeStats stats;
protected Merged(@NotNull DocumentNodeState base,
@NotNull MergeStats stats) {
super(base);
this.stats = stats;
}
@Override
public String toString() {
return "Merged[" + base + ']';
}
@Override
@NotNull
NodeState getHead() {
throw new IllegalStateException("Branch has already been merged");
}
@Override
void setRoot(NodeState root) {
throw new IllegalStateException("Branch has already been merged");
}
@Override
void rebase() {
throw new IllegalStateException("Branch has already been merged");
}
@Override
@NotNull
NodeState merge(@NotNull CommitHook hook,
@NotNull CommitInfo info,
boolean exclusive) {
throw new IllegalStateException("Branch has already been merged");
}
@Override
int getMergedChanges() {
return stats.numDocuments;
}
}
/**
* Instances of this class represent a branch with persisted changes and
* a failed attempt to reset changes.
* <p>
* Transitions to: none.
*/
private class ResetFailed extends BranchState {
/**
* The exception of the failed reset.
*/
private final CommitFailedException ex;
protected ResetFailed(DocumentNodeState base, CommitFailedException e) {
super(base);
this.ex = e;
}
@NotNull
@Override
NodeState getHead() {
throw new IllegalStateException("Branch with failed reset", ex);
}
@Override
void setRoot(NodeState root) {
throw new IllegalStateException("Branch with failed reset", ex);
}
@Override
void rebase() {
throw new IllegalStateException("Branch with failed reset", ex);
}
/**
* Always throws the {@code CommitFailedException} passed to the
* constructor of this branch state.
*
* @throws CommitFailedException the exception of the failed reset.
*/
@NotNull
@Override
NodeState merge(@NotNull CommitHook hook,
@NotNull CommitInfo info,
boolean exclusive)
throws CommitFailedException {
throw ex;
}
}
}