| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.jackrabbit.oak.plugins.document; |
| |
| import static com.google.common.base.Preconditions.checkArgument; |
| import static com.google.common.base.Preconditions.checkNotNull; |
| |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.SortedMap; |
| import java.util.SortedSet; |
| import java.util.TreeMap; |
| import java.util.TreeSet; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.Semaphore; |
| import java.util.concurrent.TimeUnit; |
| |
| import com.google.common.collect.Maps; |
| |
| import org.apache.jackrabbit.oak.stats.StatisticsProvider; |
| import org.jetbrains.annotations.NotNull; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * <code>CommitQueue</code> ensures a sequence of commits consistent with the |
| * commit revision even if commits did not complete in this sequence. |
| */ |
| final class CommitQueue { |
| |
| static final Logger LOG = LoggerFactory.getLogger(CommitQueue.class); |
| |
| /** |
| * The default suspend timeout in milliseconds: 60'000. |
| */ |
| static final long DEFAULT_SUSPEND_TIMEOUT = TimeUnit.MINUTES.toMillis(1); |
| |
| private final SortedMap<Revision, Entry> commits = new TreeMap<Revision, Entry>(StableRevisionComparator.INSTANCE); |
| |
| /** |
| * Map of currently suspended commits until a given Revision is visible. |
| */ |
| private final Map<Semaphore, SuspendedCommit> suspendedCommits = Maps.newIdentityHashMap(); |
| |
| private final RevisionContext context; |
| |
| /** |
| * The default stats collector is a noop. |
| */ |
| private DocumentNodeStoreStatsCollector statsCollector |
| = new DocumentNodeStoreStats(StatisticsProvider.NOOP); |
| |
| private long suspendTimeout = Long.getLong("oak.documentMK.suspendTimeoutMillis", DEFAULT_SUSPEND_TIMEOUT); |
| |
| CommitQueue(@NotNull RevisionContext context) { |
| this.context = checkNotNull(context); |
| } |
| |
| void setStatisticsCollector(@NotNull DocumentNodeStoreStatsCollector collector) { |
| statsCollector = checkNotNull(collector); |
| } |
| |
| @NotNull |
| Revision createRevision() { |
| return createRevisions(1).first(); |
| } |
| |
| @NotNull |
| SortedSet<Revision> createRevisions(int num) { |
| checkArgument(num > 0); |
| SortedSet<Revision> revs = new TreeSet<Revision>(StableRevisionComparator.INSTANCE); |
| Revision rev = null; |
| synchronized (this) { |
| for (int i = 0; i < num; i++) { |
| rev = context.newRevision(); |
| revs.add(rev); |
| } |
| commits.put(rev, new Entry(rev)); |
| } |
| LOG.debug("created commit {}", rev); |
| return revs; |
| } |
| |
| void done(@NotNull Revision revision, @NotNull Callback c) { |
| checkNotNull(revision); |
| waitUntilHeadOfQueue(revision, c); |
| } |
| |
| void canceled(@NotNull Revision rev) { |
| removeCommit(rev); |
| notifySuspendedCommits(rev); |
| } |
| |
| boolean contains(@NotNull Revision revision) { |
| synchronized (this) { |
| return commits.containsKey(checkNotNull(revision)); |
| } |
| } |
| |
| /** |
| * Suspends until for each of given revisions one of the following happens: |
| * <ul> |
| * <li>the given revision is visible from the current headRevision</li> |
| * <li>the given revision is canceled from the commit queue</li> |
| * <li>the suspend timeout is reached. See {@link #setSuspendTimeoutMillis(long)}</li> |
| * <li>the thread is interrupted</li> |
| * </ul> |
| * |
| * @param revisions the revisions to become visible. |
| */ |
| void suspendUntilAll(@NotNull Set<Revision> revisions) { |
| try { |
| suspendUntilAll(revisions, suspendTimeout); |
| } catch (InterruptedException e) { |
| LOG.debug("The suspended thread has been interrupted", e); |
| } |
| } |
| |
| /** |
| * Suspends until for each of given revisions one of the following happens: |
| * <ul> |
| * <li>the given revision is visible from the current headRevision</li> |
| * <li>the given revision is canceled from the commit queue</li> |
| * <li>the suspend timeout is reached</li> |
| * <li>the thread is interrupted</li> |
| * </ul> |
| * |
| * @param revisions the revisions to become visible. |
| * @param suspendTimeoutMillis how long to suspend at max |
| * @throws InterruptedException thrown when this thread has its interrupted |
| * status set or was interrupted while waiting. The current thread's |
| * interrupted status is cleared when this exception is thrown. |
| */ |
| void suspendUntilAll(@NotNull Set<Revision> revisions, long suspendTimeoutMillis) |
| throws InterruptedException { |
| Semaphore s; |
| int addedRevisions; |
| synchronized (suspendedCommits) { |
| RevisionVector headRevision = context.getHeadRevision(); |
| Set<Revision> afterHead = new HashSet<Revision>(revisions.size()); |
| for (Revision r : revisions) { |
| if (headRevision.isRevisionNewer(r)) { |
| afterHead.add(r); |
| } |
| } |
| |
| s = new Semaphore(0); |
| suspendedCommits.put(s, new SuspendedCommit(s, afterHead)); |
| addedRevisions = afterHead.size(); |
| } |
| try { |
| s.tryAcquire(addedRevisions, suspendTimeoutMillis, TimeUnit.MILLISECONDS); |
| } finally { |
| synchronized (suspendedCommits) { |
| suspendedCommits.remove(s); |
| } |
| } |
| } |
| |
| /** |
| * Called when the head revision accessible via the {@link RevisionContext} |
| * passed to constructor changed. |
| */ |
| void headRevisionChanged() { |
| notifySuspendedCommits(); |
| } |
| |
| /** |
| * @return the number of suspended threads on this commit queue. |
| */ |
| int numSuspendedThreads() { |
| synchronized (suspendedCommits) { |
| return suspendedCommits.size(); |
| } |
| } |
| |
| /** |
| * Sets the suspend timeout in milliseconds. |
| * See also {@link #suspendUntilAll(Set)}. |
| * |
| * @param timeout the timeout to set. |
| */ |
| void setSuspendTimeoutMillis(long timeout) { |
| this.suspendTimeout = timeout; |
| } |
| |
| interface Callback { |
| |
| void headOfQueue(@NotNull Revision revision); |
| } |
| |
| //------------------------< internal >-------------------------------------- |
| |
| private void notifySuspendedCommits() { |
| synchronized (suspendedCommits) { |
| if (suspendedCommits.isEmpty()) { |
| return; |
| } |
| RevisionVector headRevision = context.getHeadRevision(); |
| Iterator<SuspendedCommit> it = suspendedCommits.values().iterator(); |
| while (it.hasNext()) { |
| SuspendedCommit suspended = it.next(); |
| if (suspended.removeRevisionsVisibleFrom(headRevision) && suspended.revisions.isEmpty()) { |
| it.remove(); |
| } |
| } |
| } |
| } |
| |
| private void notifySuspendedCommits(@NotNull Revision revision) { |
| checkNotNull(revision); |
| synchronized (suspendedCommits) { |
| if (suspendedCommits.isEmpty()) { |
| return; |
| } |
| Iterator<SuspendedCommit> it = suspendedCommits.values().iterator(); |
| while (it.hasNext()) { |
| SuspendedCommit suspended = it.next(); |
| if (suspended.removeRevision(revision) && suspended.revisions.isEmpty()) { |
| it.remove(); |
| } |
| } |
| } |
| } |
| |
| private void removeCommit(@NotNull Revision rev) { |
| // simply remove and notify next head if any |
| synchronized (this) { |
| boolean wasHead = commits.firstKey().equals(rev); |
| commits.remove(rev); |
| LOG.debug("removed commit {}, wasHead={}", rev, wasHead); |
| if (wasHead) { |
| notifyHead(); |
| } |
| } |
| } |
| |
| private void waitUntilHeadOfQueue(@NotNull Revision rev, |
| @NotNull Callback c) { |
| assert !commits.isEmpty(); |
| |
| boolean isHead; |
| Entry commitEntry; |
| synchronized (this) { |
| isHead = commits.firstKey().equals(rev); |
| commitEntry = commits.get(rev); |
| } |
| if (isHead) { |
| statsCollector.doneWaitUntilHead(0); |
| } else { |
| LOG.debug("not head: {}, waiting...", rev); |
| statsCollector.doneWaitUntilHead(commitEntry.await()); |
| } |
| try { |
| c.headOfQueue(rev); |
| } finally { |
| synchronized (this) { |
| commits.remove(rev); |
| try { |
| LOG.debug("removed {}, head is now {}", rev, commits.isEmpty() ? null : commits.firstKey()); |
| } finally { |
| // notify next if there is any |
| notifyHead(); |
| } |
| } |
| } |
| } |
| |
| private void notifyHead() { |
| if (!commits.isEmpty()) { |
| LOG.debug("release {}", commits.firstKey()); |
| commits.get(commits.firstKey()).release(); |
| } |
| } |
| |
| /** |
| * An entry in the commit queue. |
| */ |
| private static final class Entry { |
| |
| /** |
| * The revision of the commit (used for debugging). |
| */ |
| private final Revision revision; |
| |
| /** |
| * The latch. Initially set to 1, so that release() needs to be called |
| * once for await() to continue. |
| */ |
| private final CountDownLatch latch = new CountDownLatch(1); |
| |
| Entry(Revision revision) { |
| this.revision = revision; |
| } |
| |
| /** |
| * Release all threads that are waiting. |
| */ |
| void release() { |
| latch.countDown(); |
| } |
| |
| /** |
| * Wait for the latch to be released. |
| * |
| * @return the number of microseconds this method waited. |
| */ |
| long await() { |
| long start = System.nanoTime(); |
| for (;;) { |
| try { |
| LOG.debug("awaiting {}", revision); |
| latch.await(); |
| break; |
| } catch (InterruptedException e) { |
| // retry |
| } |
| } |
| return TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - start); |
| } |
| } |
| |
| private class SuspendedCommit { |
| |
| private final Semaphore semaphore; |
| |
| private final Set<Revision> revisions; |
| |
| private SuspendedCommit(Semaphore semaphore, Set<Revision> revisions) { |
| this.semaphore = semaphore; |
| this.revisions = revisions; |
| } |
| |
| private boolean removeRevisionsVisibleFrom(RevisionVector revision) { |
| Iterator<Revision> it = revisions.iterator(); |
| boolean removed = false; |
| while (it.hasNext()) { |
| if (!revision.isRevisionNewer(it.next())) { |
| it.remove(); |
| semaphore.release(); |
| removed = true; |
| } |
| } |
| return removed; |
| } |
| |
| private boolean removeRevision(Revision r) { |
| if (revisions.remove(r)) { |
| semaphore.release(); |
| return true; |
| } else { |
| return false; |
| } |
| } |
| } |
| } |