/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.jackrabbit.oak.plugins.document;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore;
import org.apache.jackrabbit.oak.plugins.document.util.Utils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.collect.Iterables.any;
import static com.google.common.collect.Iterables.transform;
import static com.google.common.collect.Sets.filter;
import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.COMMIT_ROOT;
import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.DOC_SIZE_THRESHOLD;
import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.PREV_SPLIT_FACTOR;
import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.REVISIONS;
import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.SplitDocType;
import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.isCommitRootEntry;
import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.isRevisionsEntry;
import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.removePrevious;
import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.setHasBinary;
import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.setPrevious;
import static org.apache.jackrabbit.oak.plugins.document.util.Utils.PROPERTY_OR_DELETED;
import static org.apache.jackrabbit.oak.plugins.document.util.Utils.getPreviousIdFor;
import static org.apache.jackrabbit.oak.plugins.document.util.Utils.isCommitted;
/**
* Utility class to create document split operations.
*/
class SplitOperations {
private static final Logger LOG = LoggerFactory.getLogger(SplitOperations.class);
private static final int GARBAGE_LIMIT = Integer.getInteger("oak.documentMK.garbage.limit", 1000);
private static final Predicate<Long> BINARY_FOR_SPLIT_THRESHOLD = new Predicate<Long>() {
@Override
public boolean apply(Long input) {
// only force a split for binaries bigger than 4k
return input > 4096;
}
};
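// dummy store, only used to create in-memory copies of split documents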
private static final DocumentStore STORE = new MemoryDocumentStore();
private final NodeDocument doc;
private final Path path;
private final String id;
private final Revision headRevision;
private final RevisionContext context;
private final Function<String, Long> binarySize;
private final int numRevsThreshold;
private Revision high;
private Revision low;
private int numValues;
private boolean hasBinaryToSplit;
private Supplier<Boolean> nodeExistsAtHeadRevision;
private Map<String, NavigableMap<Revision, String>> committedChanges;
private Set<Revision> changes;
private Map<String, Set<Revision>> garbage;
private int garbageCount = 0;
private Set<Revision> mostRecentRevs;
private Set<Revision> splitRevs;
private List<UpdateOp> splitOps;
private UpdateOp main;
private SplitOperations(@NotNull final NodeDocument doc,
@NotNull final RevisionContext context,
@NotNull final RevisionVector headRev,
@NotNull final Function<String, Long> binarySize,
int numRevsThreshold) {
this.doc = checkNotNull(doc);
this.context = checkNotNull(context);
this.binarySize = checkNotNull(binarySize);
this.path = doc.getPath();
this.id = doc.getId();
this.headRevision = checkNotNull(headRev).getRevision(context.getClusterId());
this.numRevsThreshold = numRevsThreshold;
this.nodeExistsAtHeadRevision = Suppliers.memoize(new Supplier<Boolean>() {
@Override
public Boolean get() {
return doc.getLiveRevision(context, headRev,
Maps.<Revision, String>newHashMap(),
new LastRevs(headRev)) != null;
}
});
}
/**
* Creates a list of update operations in case the given document requires
* a split. A caller must explicitly pass a head revision even though it
* is available through the {@link RevisionContext}. The given head revision
* must reflect a head state before {@code doc} was retrieved from the
* document store. This is important in order to maintain consistency.
* See OAK-3081 for details.
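* <p>
* A minimal usage sketch (the {@code store}, {@code context},
* {@code headRevision} and {@code binarySize} values are assumptions
* about the call site, not part of this class):
* <pre>{@code
* List<UpdateOp> ops = SplitOperations.forDocument(
*         doc, context, headRevision, binarySize, 100);
* if (!ops.isEmpty()) {
*     // the update for the main document, if any, comes last
*     store.createOrUpdate(Collection.NODES, ops);
* }
* }</pre>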
*
* @param doc a main document.
* @param context the revision context.
* @param headRevision the head revision before the document was retrieved
* from the document store.
* @param binarySize a function that returns the binary size of the given
* JSON property value String.
* @param numRevsThreshold split off changes only when there are at least
*                         this many revisions, unless a split is forced
*                         by document size or binaries.
* @return list of update operations. An empty list indicates the document
* does not require a split.
* @throws IllegalArgumentException if the given document is a split
* document.
*/
@NotNull
static List<UpdateOp> forDocument(@NotNull NodeDocument doc,
@NotNull RevisionContext context,
@NotNull RevisionVector headRevision,
@NotNull Function<String, Long> binarySize,
int numRevsThreshold) {
if (doc.isSplitDocument()) {
throw new IllegalArgumentException(
"Not a main document: " + doc.getId());
}
return new SplitOperations(doc, context, headRevision,
binarySize, numRevsThreshold).create();
}
private List<UpdateOp> create() {
if (!considerSplit()) {
return Collections.emptyList();
}
splitOps = Lists.newArrayList();
mostRecentRevs = Sets.newHashSet();
splitRevs = Sets.newHashSet();
garbage = Maps.newHashMap();
changes = Sets.newHashSet();
committedChanges = Maps.newHashMap();
collectLocalChanges(committedChanges, changes);
// revisions of the most recent committed changes on this document.
// _revisions and _commitRoot entries with these revisions are
// retained in the main document
populateSplitRevs();
// collect _revisions and _commitRoot entries for split document
collectRevisionsAndCommitRoot();
// create split ops out of the split values
main = createSplitOps();
// create intermediate docs if needed
createIntermediateDocs();
// remove stale references to previous docs
disconnectStalePrevDocs();
// remove garbage
removeGarbage();
// main document must be updated last
if (main != null) {
splitOps.add(main);
}
return splitOps;
}
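/**
* Cheap pre-check whether this document may need a split at all, based
* on the number of local entries, the document size, the number of
* previous documents, stale references and binary properties.
*/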
private boolean considerSplit() {
SortedMap<Revision, Range> previous = doc.getPreviousRanges();
// only consider a split if there are enough commits,
// unless the document is really big
return doc.getLocalRevisions().size() + doc.getLocalCommitRoot().size() > numRevsThreshold
|| doc.getMemory() >= DOC_SIZE_THRESHOLD
|| previous.size() >= PREV_SPLIT_FACTOR
|| !doc.getStalePrev().isEmpty()
|| doc.hasBinary();
}
/**
* Populate the {@link #splitRevs} with the revisions of the committed
* changes that will be moved to a previous document. For each property,
* all but the most recent change will be moved.
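* For example (illustrative revisions): committed local changes
* {@code r1 -> "a", r2 -> "b", r3 -> "c"} on a property keep
* {@code r3} in the main document, while {@code r1} and {@code r2}
* become part of {@link #splitRevs}.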
*/
private void populateSplitRevs() {
for (NavigableMap<Revision, String> splitMap : committedChanges.values()) {
// keep the most recent changes in the main document
if (!splitMap.isEmpty()) {
Revision r = splitMap.lastKey();
splitMap.remove(r);
splitRevs.addAll(splitMap.keySet());
hasBinaryToSplit |= hasBinaryPropertyForSplit(splitMap.values())
&& nodeExistsAtHeadRevision.get();
mostRecentRevs.add(r);
}
if (splitMap.isEmpty()) {
continue;
}
// remember highest / lowest revision
trackHigh(splitMap.lastKey());
trackLow(splitMap.firstKey());
numValues += splitMap.size();
}
}
private boolean hasBinaryPropertyForSplit(Iterable<String> values) {
return doc.hasBinary() && any(transform(values, binarySize), BINARY_FOR_SPLIT_THRESHOLD);
}
/**
* Collect _revisions and _commitRoot entries that can be moved to a
* previous document.
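* Besides the entries in {@link #splitRevs}, this also moves committed
* local {@code _revisions} entries that have no associated local change
* and only act as commit root for changes in other documents.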
*/
private void collectRevisionsAndCommitRoot() {
NavigableMap<Revision, String> revisions =
new TreeMap<Revision, String>(StableRevisionComparator.INSTANCE);
for (Map.Entry<Revision, String> entry : doc.getLocalRevisions().entrySet()) {
if (splitRevs.contains(entry.getKey())) {
revisions.put(entry.getKey(), entry.getValue());
numValues++;
} else {
// move _revisions entries that act as commit root without
// local changes
if (context.getClusterId() != entry.getKey().getClusterId()) {
// only consider local changes
continue;
}
if (isCommitted(context.getCommitValue(entry.getKey(), doc))
&& !mostRecentRevs.contains(entry.getKey())) {
// this is a commit root for changes in other documents
revisions.put(entry.getKey(), entry.getValue());
numValues++;
trackHigh(entry.getKey());
trackLow(entry.getKey());
}
}
}
committedChanges.put(REVISIONS, revisions);
NavigableMap<Revision, String> commitRoot =
new TreeMap<Revision, String>(StableRevisionComparator.INSTANCE);
boolean mostRecent = true;
for (Map.Entry<Revision, String> entry : doc.getLocalCommitRoot().entrySet()) {
Revision r = entry.getKey();
if (splitRevs.contains(r)) {
commitRoot.put(r, entry.getValue());
numValues++;
} else if (r.getClusterId() == context.getClusterId()
&& !changes.contains(r)) {
// OAK-2528: _commitRoot entry without associated change
// consider all but most recent as garbage (OAK-3333, OAK-4050)
if (mostRecent && isCommitted(context.getCommitValue(r, doc))) {
mostRecent = false;
} else if (isGarbage(r)) {
addGarbage(r, COMMIT_ROOT);
}
}
}
committedChanges.put(COMMIT_ROOT, commitRoot);
}
/**
* Creates {@link UpdateOp}s for intermediate documents if necessary.
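* E.g. once {@link NodeDocument#PREV_SPLIT_FACTOR} previous documents
* accumulate at the same height, they are re-linked through a new
* intermediate document one level higher and disconnected from the
* main document.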
*/
private void createIntermediateDocs() {
// collect ranges and create a histogram of the height
Map<Integer, List<Range>> prevHisto = getPreviousDocsHistogram();
// check if we need to create intermediate previous documents
for (Map.Entry<Integer, List<Range>> entry : prevHisto.entrySet()) {
if (entry.getValue().size() >= PREV_SPLIT_FACTOR) {
if (main == null) {
main = new UpdateOp(id, false);
}
// calculate range
Revision h = null;
Revision l = null;
for (Range r : entry.getValue()) {
if (h == null || r.high.compareRevisionTime(h) > 0) {
h = r.high;
}
if (l == null || l.compareRevisionTime(r.low) > 0) {
l = r.low;
}
removePrevious(main, r);
}
if (h == null || l == null) {
throw new IllegalStateException();
}
Path prevPath = Utils.getPreviousPathFor(path, h, entry.getKey() + 1);
String prevId = Utils.getIdFromPath(prevPath);
UpdateOp intermediate = new UpdateOp(prevId, true);
if (Utils.isIdFromLongPath(prevId)) {
intermediate.set(NodeDocument.PATH, prevPath.toString());
}
setPrevious(main, new Range(h, l, entry.getKey() + 1));
for (Range r : entry.getValue()) {
setPrevious(intermediate, r);
}
setIntermediateDocProps(intermediate, h);
splitOps.add(intermediate);
}
}
}
/**
* Creates a split {@link UpdateOp} if there is enough data to split off. The
* {@link UpdateOp} for the new previous document is placed into the list of
* {@link #splitOps}. The {@link UpdateOp} for the main document is not
* added to the list but rather returned.
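* A split only happens when enough values were collected
* ({@code numValues >= numRevsThreshold}), the document exceeds
* {@link NodeDocument#DOC_SIZE_THRESHOLD}, or a binary property
* qualifies for a split.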
*
* @return the UpdateOp for the main document or {@code null} if there is
* not enough data to split.
*/
@Nullable
private UpdateOp createSplitOps() {
UpdateOp main = null;
// check if we have enough data to split off
if (high != null && low != null
&& (numValues >= numRevsThreshold
|| doc.getMemory() > DOC_SIZE_THRESHOLD
|| hasBinaryToSplit)) {
// enough changes to split off
// move to another document
main = new UpdateOp(id, false);
setPrevious(main, new Range(high, low, 0));
Path oldPath = Utils.getPreviousPathFor(path, high, 0);
String oldId = Utils.getIdFromPath(oldPath);
UpdateOp old = new UpdateOp(oldId, true);
if (Utils.isIdFromLongPath(oldId)) {
old.set(NodeDocument.PATH, oldPath.toString());
}
for (String property : committedChanges.keySet()) {
NavigableMap<Revision, String> splitMap = committedChanges.get(property);
for (Map.Entry<Revision, String> entry : splitMap.entrySet()) {
Revision r = entry.getKey();
if (isRevisionsEntry(property) || isCommitRootEntry(property)) {
// only remove from the main document if it is not
// referenced anymore from the most recent changes
if (!mostRecentRevs.contains(r)) {
main.removeMapEntry(property, r);
NodeDocument.removeBranchCommit(main, r);
}
} else {
main.removeMapEntry(property, r);
}
old.setMapEntry(property, r, entry.getValue());
if (doc.getLocalBranchCommits().contains(r)) {
NodeDocument.setBranchCommit(old, r);
}
}
}
// materialize the old document to determine the split document type
NodeDocument oldDoc = new NodeDocument(STORE);
UpdateUtils.applyChanges(oldDoc, old);
setSplitDocProps(doc, oldDoc, old, high);
splitOps.add(old);
if (numValues < numRevsThreshold) {
String reason;
if (hasBinaryToSplit) {
reason = "binary";
} else {
reason = "size";
}
LOG.debug("Force splitting {} ({})", id, reason);
}
}
return main;
}
/**
* Returns a histogram of the height of the previous documents referenced
* by this document. This only includes direct references; previous
* documents referenced indirectly through intermediate previous documents
* are not counted.
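* E.g. three previous documents at height 0 and one intermediate
* document at height 1 yield the histogram
* {@code {0 -> [three ranges], 1 -> [one range]}} (illustrative).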
*
* @return histogram of the height of the previous documents.
*/
private Map<Integer, List<Range>> getPreviousDocsHistogram() {
Map<Integer, List<Range>> prevHisto = Maps.newHashMap();
for (Map.Entry<Revision, Range> entry : doc.getPreviousRanges().entrySet()) {
Revision rev = entry.getKey();
if (rev.getClusterId() != context.getClusterId()) {
continue;
}
Range r = entry.getValue();
List<Range> list = prevHisto.get(r.getHeight());
if (list == null) {
list = new ArrayList<Range>();
prevHisto.put(r.getHeight(), list);
}
list.add(r);
}
return prevHisto;
}
/**
* Collects the local property changes performed by the current
* cluster node, both committed and uncommitted.
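* Changes made by other cluster nodes are ignored. Uncommitted local
* changes older than the head revision and not part of an active
* branch are recorded as garbage.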
*
* @param committedLocally local changes committed by the current cluster node.
* @param changes all revisions of local changes (committed and uncommitted).
*/
private void collectLocalChanges(
Map<String, NavigableMap<Revision, String>> committedLocally,
Set<Revision> changes) {
for (String property : filter(doc.keySet(), PROPERTY_OR_DELETED)) {
NavigableMap<Revision, String> splitMap
= new TreeMap<Revision, String>(StableRevisionComparator.INSTANCE);
committedLocally.put(property, splitMap);
Map<Revision, String> valueMap = doc.getLocalMap(property);
// collect committed changes of this cluster node
for (Map.Entry<Revision, String> entry : valueMap.entrySet()) {
Revision rev = entry.getKey();
if (rev.getClusterId() != context.getClusterId()) {
continue;
}
changes.add(rev);
if (isCommitted(context.getCommitValue(rev, doc))) {
splitMap.put(rev, entry.getValue());
} else if (isGarbage(rev)) {
addGarbage(rev, property);
}
}
}
}
private boolean isGarbage(Revision rev) {
// use headRevision as passed in the constructor instead
// of the head revision from the RevisionContext. see OAK-3081
if (headRevision.compareRevisionTime(rev) <= 0) {
// this may be an in-progress commit
return false;
}
// garbage if not part of an active branch
return context.getBranches().getBranchCommit(rev) == null;
}
private void addGarbage(Revision rev, String property) {
if (garbageCount > GARBAGE_LIMIT) {
return;
}
Set<Revision> revisions = garbage.get(property);
if (revisions == null) {
revisions = Sets.newHashSet();
garbage.put(property, revisions);
}
if (revisions.add(rev)) {
garbageCount++;
}
}
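/**
* Removes stale references to previous documents. A stale reference is
* either removed from the main document directly or, when it was moved
* to an intermediate document while the last GC was running, from that
* intermediate document.
*/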
private void disconnectStalePrevDocs() {
NavigableMap<Revision, Range> ranges = doc.getPreviousRanges(true);
for (Map.Entry<Revision, String> entry : doc.getStalePrev().entrySet()) {
Revision r = entry.getKey();
if (r.getClusterId() != context.getClusterId()) {
// only process revisions of this cluster node
continue;
}
if (main == null) {
main = new UpdateOp(id, false);
}
NodeDocument.removeStalePrevious(main, r);
if (ranges.containsKey(r)
&& entry.getValue().equals(String.valueOf(ranges.get(r).height))) {
NodeDocument.removePrevious(main, r);
} else {
// reference was moved to an intermediate doc
// while the last GC was running
// -> need to locate intermediate doc and disconnect from there
int height = Integer.parseInt(entry.getValue());
NodeDocument intermediate = doc.findPrevReferencingDoc(r, height);
if (intermediate == null) {
LOG.warn("Split document {} not referenced anymore. Main document is {}",
getPreviousIdFor(doc.getPath(), r, height), id);
} else {
UpdateOp op = new UpdateOp(intermediate.getId(), false);
NodeDocument.removePrevious(op, r);
splitOps.add(op);
}
}
}
}
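/**
* Removes the collected garbage revisions from the main document,
* creating the {@link UpdateOp} for the main document if necessary.
*/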
private void removeGarbage() {
if (garbage.isEmpty()) {
return;
} else if (main == null) {
main = new UpdateOp(id, false);
}
for (Map.Entry<String, Set<Revision>> entry : garbage.entrySet()) {
for (Revision r : entry.getValue()) {
main.removeMapEntry(entry.getKey(), r);
if (PROPERTY_OR_DELETED.apply(entry.getKey())) {
NodeDocument.removeCommitRoot(main, r);
NodeDocument.removeRevision(main, r);
NodeDocument.removeBranchCommit(main, r);
}
}
}
}
private void trackHigh(Revision r) {
if (high == null || r.compareRevisionTime(high) > 0) {
high = r;
}
}
private void trackLow(Revision r) {
if (low == null || low.compareRevisionTime(r) > 0) {
low = r;
}
}
/**
* Sets various split-document-related flags and properties.
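* The split document type is chosen as follows: {@code DEFAULT_LEAF} if
* the main document has no children, {@code COMMIT_ROOT_ONLY} if the
* split document has no local {@code _revisions} entries,
* {@code DEFAULT_NO_BRANCH} if it has no branch commits, and
* {@code DEFAULT} otherwise.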
*
* @param mainDoc main document from which split document is being created
* @param oldDoc old document created via split
* @param old updateOp of the old document created via split
* @param maxRev max revision stored in the split document oldDoc
*/
private static void setSplitDocProps(NodeDocument mainDoc, NodeDocument oldDoc,
UpdateOp old, Revision maxRev) {
setSplitDocMaxRev(old, maxRev);
SplitDocType type = SplitDocType.DEFAULT;
if (!mainDoc.hasChildren()) {
type = SplitDocType.DEFAULT_LEAF;
} else if (oldDoc.getLocalRevisions().isEmpty()) {
type = SplitDocType.COMMIT_ROOT_ONLY;
} else if (oldDoc.getLocalBranchCommits().isEmpty()) {
type = SplitDocType.DEFAULT_NO_BRANCH;
}
// Copy over the hasBinary flag
if (mainDoc.hasBinary()) {
setHasBinary(old);
}
setSplitDocType(old, type);
}
/**
* Sets various properties for an intermediate split document.
*
* @param intermediate updateOp of the intermediate doc getting created
* @param maxRev max revision stored in the intermediate document
*/
private static void setIntermediateDocProps(UpdateOp intermediate, Revision maxRev) {
setSplitDocMaxRev(intermediate, maxRev);
setSplitDocType(intermediate, SplitDocType.INTERMEDIATE);
}
//----------------------------< internal modifiers >------------------------
private static void setSplitDocType(@NotNull UpdateOp op,
@NotNull SplitDocType type) {
checkNotNull(op).set(NodeDocument.SD_TYPE, type.type);
}
private static void setSplitDocMaxRev(@NotNull UpdateOp op,
@NotNull Revision maxRev) {
checkNotNull(op).set(NodeDocument.SD_MAX_REV_TIME_IN_SECS, NodeDocument.getModifiedInSecs(maxRev.getTimestamp()));
}
}