blob: 346a81ea0f86eaff49a06d7086e52d7e39f11955 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jackrabbit.oak.plugins.document;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import com.google.common.base.Stopwatch;
import org.apache.jackrabbit.oak.cache.CacheStats;
import org.apache.jackrabbit.oak.commons.sort.StringSort;
import org.apache.jackrabbit.oak.plugins.document.util.StringValue;
import org.apache.jackrabbit.oak.plugins.document.util.Utils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.commons.io.FileUtils.byteCountToDisplaySize;
import static org.apache.jackrabbit.oak.plugins.document.JournalEntry.asId;
import static org.apache.jackrabbit.oak.plugins.document.JournalEntry.fillExternalChanges;
/**
* A DiffCache loader reading from journal entries.
*/
class JournalDiffLoader implements DiffCache.Loader {
private static final Logger LOG = LoggerFactory.getLogger(JournalDiffLoader.class);
private final AbstractDocumentNodeState base;
private final AbstractDocumentNodeState node;
private final DocumentNodeStore ns;
private Stats stats;
JournalDiffLoader(@NotNull AbstractDocumentNodeState base,
@NotNull AbstractDocumentNodeState node,
@NotNull DocumentNodeStore ns) {
this.base = checkNotNull(base);
this.node = checkNotNull(node);
this.ns = checkNotNull(ns);
checkArgument(base.getPath().equals(node.getPath()),
"nodes must have matching paths: {} != {}",
base.getPath(), node.getPath());
}
@Override
public String call() {
RevisionVector afterRev = node.getRootRevision();
RevisionVector beforeRev = base.getRootRevision();
stats = new Stats(node.getPath(), beforeRev, afterRev);
StringSort changes = JournalEntry.newSorter();
try {
Path path = node.getPath();
readTrunkChanges(path, beforeRev, afterRev, changes);
readBranchChanges(path, beforeRev, changes);
readBranchChanges(path, afterRev, changes);
changes.sort();
DiffCache df = ns.getDiffCache();
WrappedDiffCache wrappedCache = new WrappedDiffCache(node.getPath(), df, stats);
JournalEntry.applyTo(changes, wrappedCache, node.getPath(), beforeRev, afterRev);
return wrappedCache.changes;
} catch (IOException e) {
throw DocumentStoreException.convert(e);
} finally {
Utils.closeIfCloseable(changes);
logStats();
}
}
private void readBranchChanges(Path path,
RevisionVector rv,
StringSort changes) throws IOException {
if (!rv.isBranch() || ns.isDisableBranches()) {
return;
}
Branch b = ns.getBranches().getBranch(rv);
if (b == null) {
if (!ns.getBranches().isBranchBase(rv)) {
missingBranch(rv);
}
return;
}
DocumentStore store = ns.getDocumentStore();
for (Revision br : b.getCommits()) {
Branch.BranchCommit bc = b.getCommit(br);
if (!bc.isRebase()) {
JournalEntry entry = store.find(Collection.JOURNAL, asId(br));
if (entry != null) {
entry.addTo(changes, path);
stats.numJournalEntries++;
} else {
LOG.warn("Missing journal entry for {}", asId(br));
}
}
}
}
private void readTrunkChanges(Path path,
RevisionVector beforeRev,
RevisionVector afterRev,
StringSort changes) throws IOException {
JournalEntry localPending = ns.getCurrentJournalEntry();
DocumentStore store = ns.getDocumentStore();
NodeDocument root = Utils.getRootDocument(store);
int clusterId = ns.getClusterId();
Map<Integer, Revision> lastRevs = root.getLastRev();
Revision localLastRev;
if (clusterId == 0) {
// read-only node store
localLastRev = afterRev.getRevision(clusterId);
} else {
localLastRev = lastRevs.get(clusterId);
}
if (localLastRev == null) {
throw new IllegalStateException("Root document does not have a " +
"lastRev entry for local clusterId " + clusterId);
}
if (ns.isDisableBranches()) {
beforeRev = beforeRev.asTrunkRevision();
afterRev = afterRev.asTrunkRevision();
} else {
beforeRev = getBaseRevision(beforeRev);
afterRev = getBaseRevision(afterRev);
}
if (beforeRev.equals(afterRev)) {
return;
}
RevisionVector max = beforeRev.pmax(afterRev);
RevisionVector min = beforeRev.pmin(afterRev);
// do we need to include changes from pending local changes?
if (!max.isRevisionNewer(localLastRev)
&& !localLastRev.equals(max.getRevision(clusterId))) {
// journal does not contain all local changes
localPending.addTo(changes, path);
stats.numJournalEntries++;
}
for (Revision to : max) {
Revision from = min.getRevision(to.getClusterId());
if (from == null) {
// there is no min revision with this clusterId
// use revision with a timestamp of zero
from = new Revision(0, 0, to.getClusterId());
}
StringSort invalidateOnly = JournalEntry.newSorter();
try {
stats.numJournalEntries += fillExternalChanges(changes, invalidateOnly,
path, from, to, ns.getDocumentStore(), entry -> {}, null, null);
} finally {
invalidateOnly.close();
}
}
}
@NotNull
private RevisionVector getBaseRevision(RevisionVector rv) {
if (!rv.isBranch()) {
return rv;
}
Branch b = ns.getBranches().getBranch(rv);
if (b != null) {
rv = b.getBase(rv.getBranchRevision());
} else if (ns.getBranches().isBranchBase(rv)) {
rv = rv.asTrunkRevision();
} else {
missingBranch(rv);
}
return rv;
}
private static void missingBranch(RevisionVector rv) {
throw new IllegalStateException("Missing branch for revision " + rv);
}
private void logStats() {
stats.sw.stop();
long timeInSec = stats.sw.elapsed(TimeUnit.SECONDS);
if (timeInSec > 60) {
LOG.warn(stats.toString());
} else if (timeInSec > 10) {
LOG.info(stats.toString());
} else {
LOG.debug(stats.toString());
}
}
private static class Stats {
private final Stopwatch sw = Stopwatch.createStarted();
private final Path path;
private final RevisionVector from, to;
private long numJournalEntries;
private long numDiffEntries;
private long keyMemory;
private long valueMemory;
Stats(Path path, RevisionVector from, RevisionVector to) {
this.path = path;
this.from = from;
this.to = to;
}
@Override
public String toString() {
String msg = "%d diffs for %s (%s/%s) loaded from %d journal entries in %s. " +
"Keys: %s, values: %s, total: %s";
return String.format(msg, numDiffEntries, path, from, to,
numJournalEntries, sw,
byteCountToDisplaySize(keyMemory),
byteCountToDisplaySize(valueMemory),
byteCountToDisplaySize(keyMemory + valueMemory));
}
}
private static class WrappedDiffCache extends DiffCache {
private final Path path;
private String changes = "";
private final DiffCache cache;
private Stats stats;
WrappedDiffCache(Path path,
DiffCache cache,
Stats stats) {
this.path = path;
this.cache = cache;
this.stats = stats;
}
@Nullable
String getChanges() {
return changes;
}
@Override
String getChanges(@NotNull RevisionVector from,
@NotNull RevisionVector to,
@NotNull Path path,
@Nullable Loader loader) {
return cache.getChanges(from, to, path, loader);
}
@NotNull
@Override
Entry newEntry(@NotNull final RevisionVector from,
@NotNull final RevisionVector to,
boolean local) {
final Entry entry = cache.newEntry(from, to, local);
return new Entry() {
@Override
public void append(@NotNull Path path,
@NotNull String changes) {
trackStats(path, from, to, changes);
entry.append(path, changes);
if (path.equals(WrappedDiffCache.this.path)) {
WrappedDiffCache.this.changes = changes;
}
}
@Override
public boolean done() {
return entry.done();
}
};
}
private void trackStats(Path path,
RevisionVector from,
RevisionVector to,
String changes) {
stats.numDiffEntries++;
stats.keyMemory += path.getMemory();
stats.keyMemory += from.getMemory();
stats.keyMemory += to.getMemory();
stats.valueMemory += new StringValue(changes).getMemory();
}
@NotNull
@Override
Iterable<CacheStats> getStats() {
return cache.getStats();
}
@Override
public void invalidateAll() {
cache.invalidateAll();
}
}
}