| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.jackrabbit.oak.plugins.document; |
| |
| import java.io.Closeable; |
| import java.io.IOException; |
| import java.util.Collections; |
| import java.util.EnumSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicReference; |
| |
| import com.google.common.base.Function; |
| import com.google.common.base.Joiner; |
| import com.google.common.base.Predicate; |
| import com.google.common.base.Stopwatch; |
| import com.google.common.base.Supplier; |
| import com.google.common.collect.Iterators; |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Maps; |
| import com.google.common.collect.Sets; |
| |
| import org.apache.jackrabbit.oak.commons.sort.StringSort; |
| import org.apache.jackrabbit.oak.plugins.document.util.TimeInterval; |
| import org.apache.jackrabbit.oak.plugins.document.util.Utils; |
| import org.apache.jackrabbit.oak.spi.gc.DelegatingGCMonitor; |
| import org.apache.jackrabbit.oak.spi.gc.GCMonitor; |
| import org.apache.jackrabbit.oak.stats.Clock; |
| import org.apache.jackrabbit.oak.commons.TimeDurationFormatter; |
| import org.apache.jackrabbit.oak.stats.StatisticsProvider; |
| import org.jetbrains.annotations.NotNull; |
| import org.jetbrains.annotations.Nullable; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import static com.google.common.base.Preconditions.checkNotNull; |
| import static com.google.common.base.StandardSystemProperty.LINE_SEPARATOR; |
| import static com.google.common.collect.Iterables.all; |
| import static com.google.common.collect.Iterators.partition; |
| import static com.google.common.util.concurrent.Atomics.newReference; |
| import static java.util.concurrent.TimeUnit.MICROSECONDS; |
| import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES; |
| import static org.apache.jackrabbit.oak.plugins.document.Collection.SETTINGS; |
| import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.MODIFIED_IN_SECS; |
| import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.SplitDocType.COMMIT_ROOT_ONLY; |
| import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.SplitDocType.DEFAULT_LEAF; |
| import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.SplitDocType.DEFAULT_NO_BRANCH; |
| import static org.slf4j.helpers.MessageFormatter.arrayFormat; |
| |
| public class VersionGarbageCollector { |
| |
| //Kept less than MongoDocumentStore.IN_CLAUSE_BATCH_SIZE to avoid re-partitioning |
| private static final int DELETE_BATCH_SIZE = 450; |
| private static final int UPDATE_BATCH_SIZE = 450; |
| private static final int PROGRESS_BATCH_SIZE = 10000; |
| private static final String STATUS_IDLE = "IDLE"; |
| private static final String STATUS_INITIALIZING = "INITIALIZING"; |
| private static final Logger log = LoggerFactory.getLogger(VersionGarbageCollector.class); |
| |
| /** |
| * Split document types which can be safely garbage collected |
| */ |
| private static final Set<NodeDocument.SplitDocType> GC_TYPES = EnumSet.of( |
| DEFAULT_LEAF, COMMIT_ROOT_ONLY, DEFAULT_NO_BRANCH); |
| |
| /** |
| * Document id stored in settings collection that keeps info about version gc |
| */ |
| static final String SETTINGS_COLLECTION_ID = "versionGC"; |
| |
| /** |
| * Property name to timestamp when last gc run happened |
| */ |
| static final String SETTINGS_COLLECTION_OLDEST_TIMESTAMP_PROP = "lastOldestTimeStamp"; |
| |
| /** |
| * Property name to recommended time interval for next collection run |
| */ |
| static final String SETTINGS_COLLECTION_REC_INTERVAL_PROP = "recommendedIntervalMs"; |
| |
| private final DocumentNodeStore nodeStore; |
| private final DocumentStore ds; |
| private final VersionGCSupport versionStore; |
| private final AtomicReference<GCJob> collector = newReference(); |
| private VersionGCOptions options; |
| private GCMonitor gcMonitor = GCMonitor.EMPTY; |
| private RevisionGCStats gcStats = new RevisionGCStats(StatisticsProvider.NOOP); |
| |
/**
 * Creates a garbage collector with default {@link VersionGCOptions}.
 *
 * @param nodeStore the node store this collector works on.
 * @param gcSupport the GC support; it also provides the document store
 *                  used by this collector.
 */
VersionGarbageCollector(DocumentNodeStore nodeStore,
                        VersionGCSupport gcSupport) {
    this.options = new VersionGCOptions();
    this.nodeStore = nodeStore;
    this.versionStore = gcSupport;
    this.ds = this.versionStore.getDocumentStore();
}
| |
| void setStatisticsProvider(StatisticsProvider provider) { |
| this.gcStats = new RevisionGCStats(provider); |
| } |
| |
| @NotNull |
| RevisionGCStats getRevisionGCStats() { |
| return gcStats; |
| } |
| |
| public VersionGCStats gc(long maxRevisionAge, TimeUnit unit) throws IOException { |
| long maxRevisionAgeInMillis = unit.toMillis(maxRevisionAge); |
| TimeInterval maxRunTime = new TimeInterval(nodeStore.getClock().getTime(), Long.MAX_VALUE); |
| if (options.maxDurationMs > 0) { |
| maxRunTime = maxRunTime.startAndDuration(options.maxDurationMs); |
| } |
| GCJob job = new GCJob(maxRevisionAgeInMillis, options, gcMonitor); |
| if (collector.compareAndSet(null, job)) { |
| VersionGCStats overall = new VersionGCStats(); |
| overall.active.start(); |
| gcStats.started(); |
| boolean success = false; |
| try { |
| long averageDurationMs = 0; |
| while (maxRunTime.contains(nodeStore.getClock().getTime() + averageDurationMs)) { |
| gcMonitor.info("Start {}. run (avg duration {} sec)", |
| overall.iterationCount + 1, averageDurationMs / 1000.0); |
| VersionGCStats stats = job.run(); |
| |
| overall.addRun(stats); |
| if (options.maxIterations > 0 && overall.iterationCount >= options.maxIterations) { |
| break; |
| } |
| if (!overall.needRepeat) { |
| break; |
| } |
| averageDurationMs = ((averageDurationMs * (overall.iterationCount - 1)) |
| + stats.active.elapsed(TimeUnit.MILLISECONDS)) / overall.iterationCount; |
| } |
| success = true; |
| return overall; |
| } finally { |
| overall.active.stop(); |
| collector.set(null); |
| overall.success = success; |
| gcStats.finished(overall); |
| if (overall.iterationCount > 1) { |
| gcMonitor.info("Revision garbage collection finished after {} iterations - aggregate statistics: {}", |
| overall.iterationCount, overall); |
| } |
| } |
| } else { |
| throw new IOException("Revision garbage collection is already running"); |
| } |
| } |
| |
| public void cancel() { |
| GCJob job = collector.get(); |
| if (job != null) { |
| job.cancel(); |
| } |
| } |
| |
| public String getStatus() { |
| GCJob job = collector.get(); |
| if (job == null) { |
| return STATUS_IDLE; |
| } else { |
| return job.getStatus(); |
| } |
| } |
| |
| public void setGCMonitor(@NotNull GCMonitor gcMonitor) { |
| this.gcMonitor = checkNotNull(gcMonitor); |
| } |
| |
| public VersionGCOptions getOptions() { |
| return this.options; |
| } |
| |
| public void setOptions(VersionGCOptions options) { |
| this.options = options; |
| } |
| |
| public void reset() { |
| ds.remove(SETTINGS, SETTINGS_COLLECTION_ID); |
| } |
| |
| public VersionGCInfo getInfo(long maxRevisionAge, TimeUnit unit) |
| throws IOException { |
| long maxRevisionAgeInMillis = unit.toMillis(maxRevisionAge); |
| long now = nodeStore.getClock().getTime(); |
| VersionGCRecommendations rec = new VersionGCRecommendations(maxRevisionAgeInMillis, nodeStore.getCheckpoints(), |
| nodeStore.getClock(), versionStore, options, gcMonitor); |
| int estimatedIterations = -1; |
| if (rec.suggestedIntervalMs > 0) { |
| estimatedIterations = (int)Math.ceil( |
| (now - rec.scope.toMs) / rec.suggestedIntervalMs); |
| } |
| return new VersionGCInfo(rec.lastOldestTimestamp, rec.scope.fromMs, |
| rec.deleteCandidateCount, rec.maxCollect, |
| rec.suggestedIntervalMs, rec.scope.toMs, estimatedIterations); |
| } |
| |
| public static class VersionGCInfo { |
| public final long lastSuccess; |
| public final long oldestRevisionEstimate; |
| public final long revisionsCandidateCount; |
| public final long collectLimit; |
| public final long recommendedCleanupInterval; |
| public final long recommendedCleanupTimestamp; |
| public final int estimatedIterations; |
| |
| VersionGCInfo(long lastSuccess, |
| long oldestRevisionEstimate, |
| long revisionsCandidateCount, |
| long collectLimit, |
| long recommendedCleanupInterval, |
| long recommendedCleanupTimestamp, |
| int estimatedIterations) { |
| this.lastSuccess = lastSuccess; |
| this.oldestRevisionEstimate = oldestRevisionEstimate; |
| this.revisionsCandidateCount = revisionsCandidateCount; |
| this.collectLimit = collectLimit; |
| this.recommendedCleanupInterval = recommendedCleanupInterval; |
| this.recommendedCleanupTimestamp = recommendedCleanupTimestamp; |
| this.estimatedIterations = estimatedIterations; |
| } |
| } |
| |
| public static class VersionGCStats { |
| boolean ignoredGCDueToCheckPoint; |
| boolean canceled; |
| boolean success = true; |
| boolean limitExceeded; |
| boolean needRepeat; |
| int iterationCount; |
| int deletedDocGCCount; |
| int deletedLeafDocGCCount; |
| int splitDocGCCount; |
| int intermediateSplitDocGCCount; |
| int updateResurrectedGCCount; |
| final TimeDurationFormatter df = TimeDurationFormatter.forLogging(); |
| final Stopwatch active = Stopwatch.createUnstarted(); |
| final Stopwatch collectDeletedDocs = Stopwatch.createUnstarted(); |
| final Stopwatch checkDeletedDocs = Stopwatch.createUnstarted(); |
| final Stopwatch deleteDeletedDocs = Stopwatch.createUnstarted(); |
| final Stopwatch collectAndDeleteSplitDocs = Stopwatch.createUnstarted(); |
| final Stopwatch deleteSplitDocs = Stopwatch.createUnstarted(); |
| final Stopwatch sortDocIds = Stopwatch.createUnstarted(); |
| final Stopwatch updateResurrectedDocuments = Stopwatch.createUnstarted(); |
| long activeElapsed, collectDeletedDocsElapsed, checkDeletedDocsElapsed, deleteDeletedDocsElapsed, collectAndDeleteSplitDocsElapsed, |
| deleteSplitDocsElapsed, sortDocIdsElapsed, updateResurrectedDocumentsElapsed; |
| |
| @Override |
| public String toString() { |
| String timings; |
| String fmt = "timeToCollectDeletedDocs=%s, timeToCheckDeletedDocs=%s, timeToSortDocIds=%s, timeTakenToUpdateResurrectedDocs=%s, timeTakenToDeleteDeletedDocs=%s, timeTakenToCollectAndDeleteSplitDocs=%s%s"; |
| |
| // aggregated timings? |
| if (iterationCount > 0) { |
| String timeDeletingSplitDocs = ""; |
| if (deleteSplitDocsElapsed > 0) { |
| timeDeletingSplitDocs = String.format(" (of which %s deleting)", |
| df.format(deleteSplitDocsElapsed, MICROSECONDS)); |
| } |
| timings = String.format(fmt, df.format(collectDeletedDocsElapsed, MICROSECONDS), |
| df.format(checkDeletedDocsElapsed, MICROSECONDS), df.format(sortDocIdsElapsed, MICROSECONDS), |
| df.format(updateResurrectedDocumentsElapsed, MICROSECONDS), |
| df.format(deleteDeletedDocsElapsed, MICROSECONDS), |
| df.format(collectAndDeleteSplitDocsElapsed, MICROSECONDS), |
| timeDeletingSplitDocs); |
| } else { |
| String timeDeletingSplitDocs = ""; |
| if (deleteSplitDocs.elapsed(MICROSECONDS) > 0) { |
| timeDeletingSplitDocs = String.format(" (of which %s deleting)", |
| df.format(deleteSplitDocs.elapsed(MICROSECONDS), MICROSECONDS)); |
| } |
| timings = String.format(fmt, df.format(collectDeletedDocs.elapsed(MICROSECONDS), MICROSECONDS), |
| df.format(checkDeletedDocs.elapsed(MICROSECONDS), MICROSECONDS), |
| df.format(sortDocIds.elapsed(MICROSECONDS), MICROSECONDS), |
| df.format(updateResurrectedDocuments.elapsed(MICROSECONDS), MICROSECONDS), |
| df.format(deleteDeletedDocs.elapsed(MICROSECONDS), MICROSECONDS), |
| df.format(collectAndDeleteSplitDocs.elapsed(MICROSECONDS), MICROSECONDS), |
| timeDeletingSplitDocs); |
| } |
| |
| return "VersionGCStats{" + |
| "ignoredGCDueToCheckPoint=" + ignoredGCDueToCheckPoint + |
| ", canceled=" + canceled + |
| ", deletedDocGCCount=" + deletedDocGCCount + " (of which leaf: " + deletedLeafDocGCCount + ")" + |
| ", updateResurrectedGCCount=" + updateResurrectedGCCount + |
| ", splitDocGCCount=" + splitDocGCCount + |
| ", intermediateSplitDocGCCount=" + intermediateSplitDocGCCount + |
| ", iterationCount=" + iterationCount + |
| ", timeActive=" + df.format(activeElapsed, MICROSECONDS) + |
| ", " + timings + "}"; |
| } |
| |
| void addRun(VersionGCStats run) { |
| ++iterationCount; |
| this.ignoredGCDueToCheckPoint = run.ignoredGCDueToCheckPoint; |
| this.canceled = run.canceled; |
| this.success = run.success; |
| this.limitExceeded = run.limitExceeded; |
| this.needRepeat = run.needRepeat; |
| this.deletedDocGCCount += run.deletedDocGCCount; |
| this.deletedLeafDocGCCount += run.deletedLeafDocGCCount; |
| this.splitDocGCCount += run.splitDocGCCount; |
| this.intermediateSplitDocGCCount += run.intermediateSplitDocGCCount; |
| this.updateResurrectedGCCount += run.updateResurrectedGCCount; |
| if (run.iterationCount > 0) { |
| // run is cumulative with times in elapsed fields |
| this.activeElapsed += run.activeElapsed; |
| this.collectDeletedDocsElapsed += run.collectDeletedDocsElapsed; |
| this.checkDeletedDocsElapsed += run.checkDeletedDocsElapsed; |
| this.deleteDeletedDocsElapsed += run.deleteDeletedDocsElapsed; |
| this.collectAndDeleteSplitDocsElapsed += run.collectAndDeleteSplitDocsElapsed; |
| this.deleteSplitDocsElapsed += run.deleteSplitDocsElapsed; |
| this.sortDocIdsElapsed += run.sortDocIdsElapsed; |
| this.updateResurrectedDocumentsElapsed += run.updateResurrectedDocumentsElapsed; |
| } else { |
| // single run -> read from stop watches |
| this.activeElapsed += run.active.elapsed(MICROSECONDS); |
| this.collectDeletedDocsElapsed += run.collectDeletedDocs.elapsed(MICROSECONDS); |
| this.checkDeletedDocsElapsed += run.checkDeletedDocs.elapsed(MICROSECONDS); |
| this.deleteDeletedDocsElapsed += run.deleteDeletedDocs.elapsed(MICROSECONDS); |
| this.collectAndDeleteSplitDocsElapsed += run.collectAndDeleteSplitDocs.elapsed(MICROSECONDS); |
| this.deleteSplitDocsElapsed += run.deleteSplitDocs.elapsed(MICROSECONDS); |
| this.sortDocIdsElapsed += run.sortDocIds.elapsed(MICROSECONDS); |
| this.updateResurrectedDocumentsElapsed += run.updateResurrectedDocuments.elapsed(MICROSECONDS); |
| } |
| } |
| } |
| |
| private enum GCPhase { |
| NONE, |
| COLLECTING, |
| CHECKING, |
| DELETING, |
| SORTING, |
| SPLITS_CLEANUP, |
| UPDATING |
| } |
| |
| /** |
| * Keeps track of timers when switching GC phases. |
| * <p> |
| * Could be merged with VersionGCStats, however this way the public class is kept unchanged. |
| */ |
| private static class GCPhases { |
| |
| final VersionGCStats stats; |
| final Stopwatch elapsed; |
| private final GCMonitor monitor; |
| private final List<GCPhase> phases = Lists.newArrayList(); |
| private final Map<GCPhase, Stopwatch> watches = Maps.newHashMap(); |
| private final AtomicBoolean canceled; |
| |
| GCPhases(AtomicBoolean canceled, VersionGCStats stats, GCMonitor monitor) { |
| this.stats = stats; |
| this.monitor = monitor; |
| this.elapsed = Stopwatch.createStarted(); |
| this.watches.put(GCPhase.NONE, Stopwatch.createStarted()); |
| this.watches.put(GCPhase.COLLECTING, stats.collectDeletedDocs); |
| this.watches.put(GCPhase.CHECKING, stats.checkDeletedDocs); |
| this.watches.put(GCPhase.DELETING, stats.deleteDeletedDocs); |
| this.watches.put(GCPhase.SORTING, stats.sortDocIds); |
| this.watches.put(GCPhase.SPLITS_CLEANUP, stats.collectAndDeleteSplitDocs); |
| this.watches.put(GCPhase.UPDATING, stats.updateResurrectedDocuments); |
| this.canceled = canceled; |
| } |
| |
| /** |
| * Attempts to start a GC phase and tracks the time spent in this phase |
| * until {@link #stop(GCPhase)} is called. |
| * |
| * @param started the GC phase. |
| * @return {@code true} if the phase was started or {@code false} if the |
| * revision GC was canceled and the phase should not start. |
| */ |
| public boolean start(GCPhase started) { |
| if (canceled.get()) { |
| return false; |
| } |
| suspend(currentWatch()); |
| this.phases.add(started); |
| updateStatus(); |
| resume(currentWatch()); |
| return true; |
| } |
| |
| public void stop(GCPhase phase) { |
| if (!phases.isEmpty() && phase == phases.get(phases.size() - 1)) { |
| suspend(currentWatch()); |
| phases.remove(phases.size() - 1); |
| updateStatus(); |
| resume(currentWatch()); |
| } |
| } |
| |
| public void close() { |
| while (!phases.isEmpty()) { |
| suspend(currentWatch()); |
| phases.remove(phases.size() - 1); |
| updateStatus(); |
| } |
| this.elapsed.stop(); |
| } |
| |
| private GCPhase current() { |
| return phases.isEmpty() ? GCPhase.NONE : phases.get(phases.size() - 1); |
| } |
| |
| private Stopwatch currentWatch() { |
| return watches.get(current()); |
| } |
| |
| private void resume(Stopwatch w) { |
| if (!w.isRunning()) { |
| w.start(); |
| } |
| } |
| |
| private void suspend(Stopwatch w) { |
| if (w.isRunning()) { |
| w.stop(); |
| } |
| } |
| |
| private void updateStatus() { |
| GCPhase p = current(); |
| if (p != GCPhase.NONE) { |
| monitor.updateStatus(p.name()); |
| } |
| } |
| } |
| |
| private class GCJob { |
| |
| private final long maxRevisionAgeMillis; |
| private final VersionGCOptions options; |
| private final AtomicBoolean cancel = new AtomicBoolean(); |
| private final GCMonitor monitor; |
| private final Supplier<String> status; |
| |
| GCJob(long maxRevisionAgeMillis, |
| VersionGCOptions options, |
| GCMonitor gcMonitor) { |
| this.maxRevisionAgeMillis = maxRevisionAgeMillis; |
| this.options = options; |
| GCMessageTracker vgcm = new GCMessageTracker(); |
| this.status = vgcm; |
| this.monitor = new DelegatingGCMonitor(Lists.newArrayList(vgcm, gcMonitor)); |
| this.monitor.updateStatus(STATUS_INITIALIZING); |
| } |
| |
| VersionGCStats run() throws IOException { |
| try { |
| return gc(maxRevisionAgeMillis); |
| } finally { |
| monitor.updateStatus(STATUS_IDLE); |
| } |
| } |
| |
| void cancel() { |
| monitor.info("Canceling revision garbage collection."); |
| cancel.set(true); |
| } |
| |
| String getStatus() { |
| return status.get(); |
| } |
| |
| private VersionGCStats gc(long maxRevisionAgeInMillis) throws IOException { |
| VersionGCStats stats = new VersionGCStats(); |
| stats.active.start(); |
| VersionGCRecommendations rec = new VersionGCRecommendations(maxRevisionAgeInMillis, nodeStore.getCheckpoints(), |
| nodeStore.getClock(), versionStore, options, gcMonitor); |
| GCPhases phases = new GCPhases(cancel, stats, gcMonitor); |
| try { |
| if (rec.ignoreDueToCheckPoint) { |
| phases.stats.ignoredGCDueToCheckPoint = true; |
| monitor.skipped("Checkpoint prevented revision garbage collection"); |
| cancel.set(true); |
| } else { |
| final RevisionVector headRevision = nodeStore.getHeadRevision(); |
| final RevisionVector sweepRevisions = nodeStore.getSweepRevisions(); |
| monitor.info("Looking at revisions in {}", rec.scope); |
| |
| collectDeletedDocuments(phases, headRevision, rec); |
| collectSplitDocuments(phases, sweepRevisions, rec); |
| } |
| } catch (LimitExceededException ex) { |
| stats.limitExceeded = true; |
| } finally { |
| phases.close(); |
| stats.canceled = cancel.get(); |
| } |
| |
| rec.evaluate(stats); |
| monitor.info("Revision garbage collection finished in {}. {}", |
| TimeDurationFormatter.forLogging().format(phases.elapsed.elapsed(MICROSECONDS), MICROSECONDS), stats); |
| stats.active.stop(); |
| return stats; |
| } |
| |
| private void collectSplitDocuments(GCPhases phases, |
| RevisionVector sweepRevisions, |
| VersionGCRecommendations rec) { |
| if (phases.start(GCPhase.SPLITS_CLEANUP)) { |
| int splitDocGCCount = phases.stats.splitDocGCCount; |
| int intermediateSplitDocGCCount = phases.stats.intermediateSplitDocGCCount; |
| versionStore.deleteSplitDocuments(GC_TYPES, sweepRevisions, rec.scope.toMs, phases.stats); |
| gcStats.splitDocumentsDeleted(phases.stats.splitDocGCCount - splitDocGCCount); |
| gcStats.intermediateSplitDocumentsDeleted(phases.stats.intermediateSplitDocGCCount - intermediateSplitDocGCCount); |
| phases.stop(GCPhase.SPLITS_CLEANUP); |
| } |
| } |
| |
| private void collectDeletedDocuments(GCPhases phases, |
| RevisionVector headRevision, |
| VersionGCRecommendations rec) |
| throws IOException, LimitExceededException { |
| int docsTraversed = 0; |
| DeletedDocsGC gc = new DeletedDocsGC(headRevision, cancel, options, monitor); |
| try { |
| if (phases.start(GCPhase.COLLECTING)) { |
| Iterable<NodeDocument> itr = versionStore.getPossiblyDeletedDocs(rec.scope.fromMs, rec.scope.toMs); |
| try { |
| for (NodeDocument doc : itr) { |
| // continue with GC? |
| if (cancel.get()) { |
| break; |
| } |
| // Check if node is actually deleted at current revision |
| // As node is not modified since oldestRevTimeStamp then |
| // this node has not be revived again in past maxRevisionAge |
| // So deleting it is safe |
| docsTraversed++; |
| if (docsTraversed % PROGRESS_BATCH_SIZE == 0) { |
| monitor.info("Iterated through {} documents so far. {} found to be deleted", |
| docsTraversed, gc.getNumDocuments()); |
| } |
| if (phases.start(GCPhase.CHECKING)) { |
| gc.possiblyDeleted(doc); |
| phases.stop(GCPhase.CHECKING); |
| } |
| if (rec.maxCollect > 0 && gc.docIdsToDelete.getSize() > rec.maxCollect) { |
| throw new LimitExceededException(); |
| } |
| if (gc.hasLeafBatch()) { |
| if (phases.start(GCPhase.DELETING)) { |
| gc.removeLeafDocuments(phases.stats); |
| phases.stop(GCPhase.DELETING); |
| } |
| } |
| if (gc.hasRescurrectUpdateBatch()) { |
| if (phases.start(GCPhase.UPDATING)) { |
| gc.updateResurrectedDocuments(phases.stats); |
| phases.stop(GCPhase.UPDATING); |
| } |
| } |
| } |
| } finally { |
| Utils.closeIfCloseable(itr); |
| } |
| phases.stop(GCPhase.COLLECTING); |
| } |
| |
| if (gc.getNumDocuments() != 0) { |
| if (phases.start(GCPhase.DELETING)) { |
| gc.removeLeafDocuments(phases.stats); |
| phases.stop(GCPhase.DELETING); |
| } |
| |
| if (phases.start(GCPhase.SORTING)) { |
| gc.ensureSorted(); |
| phases.stop(GCPhase.SORTING); |
| } |
| |
| if (phases.start(GCPhase.DELETING)) { |
| gc.removeDocuments(phases.stats); |
| phases.stop(GCPhase.DELETING); |
| } |
| } |
| |
| if (phases.start(GCPhase.UPDATING)) { |
| gc.updateResurrectedDocuments(phases.stats); |
| phases.stop(GCPhase.UPDATING); |
| } |
| } finally { |
| gc.close(); |
| } |
| } |
| } |
| |
/**
 * A helper class that removes the documents of deleted nodes.
 */
| private class DeletedDocsGC implements Closeable { |
| |
// revision at which candidates are checked for existence
private final RevisionVector headRevision;
// set when the GC is canceled
private final AtomicBoolean cancel;
private final VersionGCOptions options;
private final GCMonitor monitor;
private final Stopwatch timer;
// ids of deleted documents without children and previous documents
private final List<String> leafDocIdsToDelete = Lists.newArrayList();
// ids of candidates that still exist at the head revision
private final List<String> resurrectedIds = Lists.newArrayList();
// ids of all other deleted documents and of their previous documents
private final StringSort docIdsToDelete;
private final StringSort prevDocIdsToDelete;
// ids of previous documents that must not be removed
private final Set<String> exclude = Sets.newHashSet();
private boolean sorted = false;

/**
 * Creates a new helper.
 *
 * @param headRevision the revision at which candidates are checked.
 * @param cancel the flag signalling that the GC was canceled.
 * @param options the GC options.
 * @param monitor the monitor receiving messages of this helper.
 */
public DeletedDocsGC(@NotNull RevisionVector headRevision,
                     @NotNull AtomicBoolean cancel,
                     @NotNull VersionGCOptions options,
                     @NotNull GCMonitor monitor) {
    this.headRevision = checkNotNull(headRevision);
    this.cancel = checkNotNull(cancel);
    this.options = options;
    this.monitor = monitor;
    this.timer = Stopwatch.createUnstarted();
    this.docIdsToDelete = newStringSort(options);
    this.prevDocIdsToDelete = newStringSort(options);
}
| |
| /** |
| * @return the number of documents gathers so far that have been |
| * identified as garbage via {@link #possiblyDeleted(NodeDocument)}. |
| * This number does not include the previous documents. |
| */ |
| long getNumDocuments() { |
| return docIdsToDelete.getSize() + leafDocIdsToDelete.size(); |
| } |
| |
| /** |
| * Informs the GC that the given document is possibly deleted. The |
| * implementation will check if the node still exists at the head |
| * revision passed to the constructor to this GC. The implementation |
| * will keep track of documents representing deleted nodes and remove |
| * them together with associated previous document |
| * |
| * @param doc the candidate document. |
| * @return true iff document is scheduled for deletion |
| */ |
| boolean possiblyDeleted(NodeDocument doc) |
| throws IOException { |
| gcStats.documentRead(); |
| // construct an id that also contains |
| // the _modified time of the document |
| String id = doc.getId() + "/" + doc.getModified(); |
| // check if id is valid |
| try { |
| Utils.getDepthFromId(id); |
| } catch (IllegalArgumentException e) { |
| monitor.warn("Invalid GC id {} for document {}", id, doc); |
| return false; |
| } |
| if (doc.getNodeAtRevision(nodeStore, headRevision, null) == null) { |
| // Collect id of all previous docs also |
| Iterator<String> previousDocs = previousDocIdsFor(doc); |
| if (!doc.hasChildren() && !previousDocs.hasNext()) { |
| addLeafDocument(id); |
| } else { |
| addDocument(id); |
| addPreviousDocuments(previousDocs); |
| } |
| return true; |
| } else { |
| addNonDeletedDocument(id); |
| } |
| return false; |
| } |
| |
| /** |
| * Removes the documents that have been identified as garbage. This |
| * also includes previous documents. This method will only remove |
| * documents that have not been modified since they were passed to |
| * {@link #possiblyDeleted(NodeDocument)}. |
| * |
| * @param stats to track the number of removed documents. |
| */ |
| void removeDocuments(VersionGCStats stats) throws IOException { |
| removeLeafDocuments(stats); |
| stats.deletedDocGCCount += removeDeletedDocuments( |
| getDocIdsToDelete(), getDocIdsToDeleteSize(), false, "(other)"); |
| // FIXME: this is incorrect because that method also removes intermediate docs |
| stats.splitDocGCCount += removeDeletedPreviousDocuments(); |
| } |
| |
| boolean hasLeafBatch() { |
| return leafDocIdsToDelete.size() >= DELETE_BATCH_SIZE; |
| } |
| |
| boolean hasRescurrectUpdateBatch() { |
| return resurrectedIds.size() >= UPDATE_BATCH_SIZE; |
| } |
| |
| void removeLeafDocuments(VersionGCStats stats) throws IOException { |
| int removeCount = removeDeletedDocuments( |
| getLeafDocIdsToDelete(), getLeafDocIdsToDeleteSize(), true, "(leaf)"); |
| leafDocIdsToDelete.clear(); |
| stats.deletedLeafDocGCCount += removeCount; |
| stats.deletedDocGCCount += removeCount; |
| } |
| |
| void updateResurrectedDocuments(VersionGCStats stats) throws IOException { |
| if (resurrectedIds.isEmpty()) { |
| return; |
| } |
| int updateCount = resetDeletedOnce(resurrectedIds); |
| resurrectedIds.clear(); |
| stats.updateResurrectedGCCount += updateCount; |
| } |
| |
| public void close() { |
| try { |
| docIdsToDelete.close(); |
| } catch (IOException e) { |
| monitor.warn("Failed to close docIdsToDelete: {}", e); |
| } |
| try { |
| prevDocIdsToDelete.close(); |
| } catch (IOException e) { |
| monitor.warn("Failed to close prevDocIdsToDelete: {}", e); |
| } |
| } |
| |
| //------------------------------< internal >---------------------------- |
| |
| private void delayOnModifications(long durationMs) { |
| long delayMs = Math.round(durationMs * options.delayFactor); |
| if (!cancel.get() && delayMs > 0) { |
| try { |
| Clock clock = nodeStore.getClock(); |
| clock.waitUntil(clock.getTime() + delayMs); |
| } |
| catch (InterruptedException ex) { |
| /* ignore */ |
| } |
| } |
| } |
| |
| private Iterator<String> previousDocIdsFor(NodeDocument doc) { |
| Map<Revision, Range> prevRanges = doc.getPreviousRanges(true); |
| if (prevRanges.isEmpty()) { |
| return Collections.emptyIterator(); |
| } else if (all(prevRanges.values(), FIRST_LEVEL)) { |
| // all previous document ids can be constructed from the |
| // previous ranges map. this works for first level previous |
| // documents only. |
| final Path path = doc.getPath(); |
| return Iterators.transform(prevRanges.entrySet().iterator(), |
| new Function<Map.Entry<Revision, Range>, String>() { |
| @Override |
| public String apply(Map.Entry<Revision, Range> input) { |
| int h = input.getValue().getHeight(); |
| return Utils.getPreviousIdFor(path, input.getKey(), h); |
| } |
| }); |
| } else { |
| // need to fetch the previous documents to get their ids |
| return Iterators.transform(doc.getAllPreviousDocs(), |
| new Function<NodeDocument, String>() { |
| @Override |
| public String apply(NodeDocument input) { |
| return input.getId(); |
| } |
| }); |
| } |
| } |
| |
| private void addDocument(String id) throws IOException { |
| docIdsToDelete.add(id); |
| } |
| |
| private void addLeafDocument(String id) throws IOException { |
| leafDocIdsToDelete.add(id); |
| } |
| |
| private void addNonDeletedDocument(String id) throws IOException { |
| resurrectedIds.add(id); |
| } |
| |
| private long getNumPreviousDocuments() { |
| return prevDocIdsToDelete.getSize() - exclude.size(); |
| } |
| |
| private void addPreviousDocuments(Iterator<String> ids) throws IOException { |
| while (ids.hasNext()) { |
| prevDocIdsToDelete.add(ids.next()); |
| } |
| } |
| |
| private Iterator<String> getDocIdsToDelete() throws IOException { |
| ensureSorted(); |
| return docIdsToDelete.getIds(); |
| } |
| |
| private long getDocIdsToDeleteSize() { |
| return docIdsToDelete.getSize(); |
| } |
| |
| private Iterator<String> getLeafDocIdsToDelete() throws IOException { |
| return leafDocIdsToDelete.iterator(); |
| } |
| |
| private long getLeafDocIdsToDeleteSize() { |
| return leafDocIdsToDelete.size(); |
| } |
| |
| private void concurrentModification(NodeDocument doc) { |
| Iterator<NodeDocument> it = doc.getAllPreviousDocs(); |
| while (it.hasNext()) { |
| exclude.add(it.next().getId()); |
| } |
| } |
| |
| private Iterator<String> getPrevDocIdsToDelete() throws IOException { |
| ensureSorted(); |
| return Iterators.filter(prevDocIdsToDelete.getIds(), |
| new Predicate<String>() { |
| @Override |
| public boolean apply(String input) { |
| return !exclude.contains(input); |
| } |
| }); |
| } |
| |
| private int removeDeletedDocuments(Iterator<String> docIdsToDelete, |
| long numDocuments, |
| boolean leaves, |
| String label) throws IOException { |
| if (numDocuments == 0) { |
| return 0; |
| } |
| monitor.info("Proceeding to delete [{}] documents [{}]", numDocuments, label); |
| |
| Iterator<List<String>> idListItr = partition(docIdsToDelete, DELETE_BATCH_SIZE); |
| int deletedCount = 0; |
| int lastLoggedCount = 0; |
| int recreatedCount = 0; |
| while (idListItr.hasNext() && !cancel.get()) { |
| Map<String, Long> deletionBatch = Maps.newLinkedHashMap(); |
| for (String s : idListItr.next()) { |
| Map.Entry<String, Long> parsed; |
| try { |
| parsed = parseEntry(s); |
| } catch (IllegalArgumentException e) { |
| monitor.warn("Invalid _modified suffix for {}", s); |
| continue; |
| } |
| deletionBatch.put(parsed.getKey(), parsed.getValue()); |
| } |
| |
| if (log.isTraceEnabled()) { |
| StringBuilder sb = new StringBuilder("Performing batch deletion of documents with following ids. \n"); |
| Joiner.on(LINE_SEPARATOR.value()).appendTo(sb, deletionBatch.keySet()); |
| log.trace(sb.toString()); |
| } |
| |
| timer.reset().start(); |
| try { |
| int nRemoved = ds.remove(NODES, deletionBatch); |
| |
| if (nRemoved < deletionBatch.size()) { |
| // some nodes were re-created while GC was running |
| // find the document that still exist |
| for (String id : deletionBatch.keySet()) { |
| NodeDocument d = ds.find(NODES, id); |
| if (d != null) { |
| concurrentModification(d); |
| } |
| } |
| recreatedCount += (deletionBatch.size() - nRemoved); |
| } |
| |
| deletedCount += nRemoved; |
| log.debug("Deleted [{}] documents so far", deletedCount); |
| if (leaves) { |
| gcStats.leafDocumentsDeleted(deletedCount); |
| } else { |
| gcStats.documentsDeleted(deletedCount); |
| } |
| |
| if (deletedCount + recreatedCount - lastLoggedCount >= PROGRESS_BATCH_SIZE) { |
| lastLoggedCount = deletedCount + recreatedCount; |
| double progress = lastLoggedCount * 1.0 / getNumDocuments() * 100; |
| String msg = String.format("Deleted %d (%1.2f%%) documents so far", deletedCount, progress); |
| monitor.info(msg); |
| } |
| } finally { |
| delayOnModifications(timer.stop().elapsed(TimeUnit.MILLISECONDS)); |
| } |
| } |
| return deletedCount; |
| } |
| |
| private int resetDeletedOnce(List<String> resurrectedDocuments) throws IOException { |
| monitor.info("Proceeding to reset [{}] _deletedOnce flags", resurrectedDocuments.size()); |
| |
| int updateCount = 0; |
| timer.reset().start(); |
| try { |
| for (String s : resurrectedDocuments) { |
| if (!cancel.get()) { |
| try { |
| Map.Entry<String, Long> parsed = parseEntry(s); |
| UpdateOp up = new UpdateOp(parsed.getKey(), false); |
| up.equals(MODIFIED_IN_SECS, parsed.getValue()); |
| up.remove(NodeDocument.DELETED_ONCE); |
| NodeDocument r = ds.findAndUpdate(Collection.NODES, up); |
| if (r != null) { |
| updateCount += 1; |
| gcStats.deletedOnceFlagReset(); |
| } |
| } catch (IllegalArgumentException ex) { |
| monitor.warn("Invalid _modified suffix for {}", s); |
| } catch (DocumentStoreException ex) { |
| monitor.warn("updating {}: {}", s, ex.getMessage()); |
| } |
| } |
| } |
| } |
| finally { |
| delayOnModifications(timer.stop().elapsed(TimeUnit.MILLISECONDS)); |
| } |
| return updateCount; |
| } |
| |
| private int removeDeletedPreviousDocuments() throws IOException { |
| long num = getNumPreviousDocuments(); |
| if (num == 0) { |
| return 0; |
| } |
| monitor.info("Proceeding to delete [{}] previous documents", num); |
| |
| int deletedCount = 0; |
| int lastLoggedCount = 0; |
| Iterator<List<String>> idListItr = |
| partition(getPrevDocIdsToDelete(), DELETE_BATCH_SIZE); |
| while (idListItr.hasNext() && !cancel.get()) { |
| List<String> deletionBatch = idListItr.next(); |
| deletedCount += deletionBatch.size(); |
| |
| if (log.isDebugEnabled()) { |
| StringBuilder sb = new StringBuilder("Performing batch deletion of previous documents with following ids. \n"); |
| Joiner.on(LINE_SEPARATOR.value()).appendTo(sb, deletionBatch); |
| log.debug(sb.toString()); |
| } |
| |
| ds.remove(NODES, deletionBatch); |
| |
| log.debug("Deleted [{}] previous documents so far", deletedCount); |
| gcStats.splitDocumentsDeleted(deletedCount); |
| |
| if (deletedCount - lastLoggedCount >= PROGRESS_BATCH_SIZE) { |
| lastLoggedCount = deletedCount; |
| double progress = deletedCount * 1.0 / (prevDocIdsToDelete.getSize() - exclude.size()) * 100; |
| String msg = String.format("Deleted %d (%1.2f%%) previous documents so far", deletedCount, progress); |
| monitor.info(msg); |
| } |
| } |
| return deletedCount; |
| } |
| |
| private void ensureSorted() throws IOException { |
| if (!sorted) { |
| docIdsToDelete.sort(); |
| prevDocIdsToDelete.sort(); |
| sorted = true; |
| } |
| } |
| |
| /** |
| * Parses an id/modified entry and returns the two components as a |
| * Map.Entry. |
| * |
| * @param entry the id/modified String. |
| * @return the parsed components. |
| * @throws IllegalArgumentException if the entry is malformed. |
| */ |
| private Map.Entry<String, Long> parseEntry(String entry) throws IllegalArgumentException { |
| int idx = entry.lastIndexOf('/'); |
| if (idx == -1) { |
| throw new IllegalArgumentException(entry); |
| } |
| String id = entry.substring(0, idx); |
| long modified; |
| try { |
| modified = Long.parseLong(entry.substring(idx + 1)); |
| } catch (NumberFormatException e) { |
| throw new IllegalArgumentException(entry); |
| } |
| return Maps.immutableEntry(id, modified); |
| } |
| } |
| |
| @NotNull |
| private StringSort newStringSort(VersionGCOptions options) { |
| return new StringSort(options.overflowToDiskThreshold, NodeDocumentIdComparator.INSTANCE); |
| } |
| |
| private static final Predicate<Range> FIRST_LEVEL = new Predicate<Range>() { |
| @Override |
| public boolean apply(@Nullable Range input) { |
| return input != null && input.height == 0; |
| } |
| }; |
| |
| /** |
| * GCMessageTracker is a partial implementation of GCMonitor. We use it to |
| * keep track of the last message issued by the GC job. |
| */ |
| private static class GCMessageTracker |
| extends GCMonitor.Empty |
| implements Supplier<String> { |
| |
| private volatile String lastMessage = STATUS_INITIALIZING; |
| |
| @Override |
| public void info(String message, Object... arguments) { |
| lastMessage = arrayFormat(message, arguments).getMessage(); |
| } |
| |
| @Override |
| public void warn(String message, Object... arguments) { |
| lastMessage = arrayFormat(message, arguments).getMessage(); |
| } |
| |
| @Override |
| public void error(String message, Exception e) { |
| lastMessage = message + " (" + e.getMessage() + ")"; |
| } |
| |
| @Override |
| public String get() { |
| return lastMessage; |
| } |
| } |
| |
| private static final class LimitExceededException extends Exception { |
| private static final long serialVersionUID = 6578586397629516408L; |
| } |
| } |