/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.jackrabbit.oak.plugins.document;
import java.math.BigInteger;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.SortedMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.jackrabbit.oak.commons.json.JsopBuilder;
import org.apache.jackrabbit.oak.commons.json.JsopReader;
import org.apache.jackrabbit.oak.commons.json.JsopTokenizer;
import org.apache.jackrabbit.oak.commons.json.JsopWriter;
import org.apache.jackrabbit.oak.plugins.document.util.Utils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Maps;
import static com.google.common.base.Preconditions.checkNotNull;
/**
 * Checkpoints provide details about which revisions are to be kept. These
 * are stored in the settings collection.
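 *
 * <p>A rough usage sketch ({@code checkpoints} denotes an instance of this
 * class; the lifetime and info values are illustrative):
 * <pre>
 *     Revision cp = checkpoints.create(TimeUnit.HOURS.toMillis(1),
 *             Collections.singletonMap("creator", "example"));
 *     RevisionVector head = checkpoints.retrieve(cp.toString());
 *     checkpoints.release(cp.toString());
 * </pre>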
*/
class Checkpoints {
private static final Logger LOG = LoggerFactory.getLogger(Checkpoints.class);
private static final String ID = "checkpoint";
/**
     * Property name under which all checkpoint data is stored. Each entry maps
     * a Revision either to a plain expiryTime or to a JSON object with
     * expiryTime and info.
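     * An illustrative value in the JSON format:
     * <pre>{"expires":"1400000000000","rv":"r5-0-1,r6-0-2","creator":"test"}</pre>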
*/
private static final String PROP_CHECKPOINT = "data";
/**
     * Number of create calls after which old, expired checkpoint entries are
     * removed.
*/
static final int CLEANUP_INTERVAL = 100;
private final DocumentNodeStore nodeStore;
private final DocumentStore store;
private final Logger log = LoggerFactory.getLogger(getClass());
private final AtomicInteger createCounter = new AtomicInteger();
private final Object cleanupLock = new Object();
Checkpoints(DocumentNodeStore store) {
this.nodeStore = store;
this.store = store.getDocumentStore();
createIfNotExist();
}
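    /**
     * Creates a new checkpoint that expires {@code lifetimeInMillis}
     * milliseconds from now. The current head revision is recorded together
     * with the expiry time and the given info map.
     *
     * @param lifetimeInMillis the lifetime of the checkpoint in milliseconds.
     * @param info arbitrary metadata to store with the checkpoint.
     * @return the revision representing the new checkpoint.
     */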
public Revision create(long lifetimeInMillis, Map<String, String> info) {
// create a unique dummy commit we can use as checkpoint revision
Revision r = nodeStore.commitQueue.createRevision();
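        // capture the head revision once this dummy commit reaches the head
        // of the commit queue; it is stored with the checkpoint entry below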
final RevisionVector[] rv = new RevisionVector[1];
nodeStore.commitQueue.done(r, new CommitQueue.Callback() {
@Override
public void headOfQueue(@NotNull Revision revision) {
rv[0] = nodeStore.getHeadRevision();
}
});
createCounter.getAndIncrement();
performCleanupIfRequired();
UpdateOp op = new UpdateOp(ID, false);
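        // compute the expiry time, saturating at Long.MAX_VALUE to avoid
        // overflow for very large lifetimes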
long endTime = BigInteger.valueOf(nodeStore.getClock().getTime())
.add(BigInteger.valueOf(lifetimeInMillis))
.min(BigInteger.valueOf(Long.MAX_VALUE)).longValue();
op.setMapEntry(PROP_CHECKPOINT, r, new Info(endTime, rv[0], info).toString());
store.createOrUpdate(Collection.SETTINGS, op);
return r;
}
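    /**
     * Releases the checkpoint with the given reference by removing its entry
     * from the checkpoint data.
     *
     * @param checkpoint the string representation of the checkpoint revision.
     */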
public void release(String checkpoint) {
UpdateOp op = new UpdateOp(ID, false);
op.removeMapEntry(PROP_CHECKPOINT, Revision.fromString(checkpoint));
store.findAndUpdate(Collection.SETTINGS, op);
}
/**
     * Returns the revision of the oldest valid checkpoint registered.
     *
     * <p>As a side effect it also removes expired checkpoints.
     *
     * @return the oldest valid checkpoint revision, or {@code null} if no
     *         valid checkpoint is found.
*/
@SuppressWarnings("unchecked")
@Nullable
public Revision getOldestRevisionToKeep() {
        // read the checkpoints from the uncached settings document
        SortedMap<Revision, Info> checkpoints = getCheckpoints();
        if (checkpoints.isEmpty()) {
log.debug("No checkpoint registered so far");
return null;
}
final long currentTime = nodeStore.getClock().getTime();
UpdateOp op = new UpdateOp(ID, false);
Revision lastAliveRevision = null;
for (Map.Entry<Revision, Info> e : checkpoints.entrySet()) {
long expiryTime = e.getValue().getExpiryTime();
if (currentTime > expiryTime) {
op.removeMapEntry(PROP_CHECKPOINT, e.getKey());
} else {
Revision cpRev = e.getKey();
RevisionVector rv = e.getValue().getCheckpoint();
if (rv != null) {
cpRev = rv.getRevision(cpRev.getClusterId());
}
lastAliveRevision = Utils.min(lastAliveRevision, cpRev);
}
}
if (op.hasChanges()) {
store.findAndUpdate(Collection.SETTINGS, op);
log.debug("Purged {} expired checkpoints", op.getChanges().size());
}
return lastAliveRevision;
}
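    /**
     * Reads the checkpoint document from the settings collection, bypassing
     * the cache, and returns all entries keyed by checkpoint revision, most
     * recent revision first.
     */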
@SuppressWarnings("unchecked")
@NotNull
SortedMap<Revision, Info> getCheckpoints() {
Document cdoc = store.find(Collection.SETTINGS, ID, 0);
SortedMap<Revision, String> data = null;
if (cdoc != null) {
data = (SortedMap<Revision, String>) cdoc.get(PROP_CHECKPOINT);
}
SortedMap<Revision, Info> checkpoints = Maps.newTreeMap(StableRevisionComparator.REVERSE);
if (data != null) {
for (Map.Entry<Revision, String> entry : data.entrySet()) {
checkpoints.put(entry.getKey(), Info.fromString(entry.getValue()));
}
}
return checkpoints;
}
/**
* Retrieves the head revision for the given {@code checkpoint}.
*
* @param checkpoint the checkpoint reference.
* @return the head revision associated with the checkpoint or {@code null}
* if there is no such checkpoint.
* @throws IllegalArgumentException if the checkpoint is malformed.
*/
@Nullable
RevisionVector retrieve(@NotNull String checkpoint)
throws IllegalArgumentException {
Revision r;
try {
r = Revision.fromString(checkNotNull(checkpoint));
} catch (IllegalArgumentException e) {
LOG.warn("Malformed checkpoint reference: {}", checkpoint);
return null;
}
Info info = getCheckpoints().get(r);
if (info == null) {
return null;
}
RevisionVector rv = info.getCheckpoint();
if (rv == null) {
rv = expand(r);
}
return rv;
}
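    /**
     * Sets or removes an entry in the info map of an existing checkpoint. A
     * {@code null} value removes the entry with the given key.
     *
     * @throws IllegalArgumentException if there is no such checkpoint.
     */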
void setInfoProperty(@NotNull String checkpoint, @NotNull String key, @Nullable String value) {
Revision r = Revision.fromString(checkNotNull(checkpoint));
Info info = getCheckpoints().get(r);
if (info == null) {
throw new IllegalArgumentException("No such checkpoint: " + checkpoint);
}
Map<String, String> metadata = new LinkedHashMap<>(info.get());
if (value == null) {
metadata.remove(key);
} else {
metadata.put(key, value);
}
Info newInfo = new Info(info.getExpiryTime(), info.getCheckpoint(), metadata);
UpdateOp op = new UpdateOp(Checkpoints.ID, false);
op.setMapEntry(PROP_CHECKPOINT, r, newInfo.toString());
store.findAndUpdate(Collection.SETTINGS, op);
}
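    /**
     * @return the number of checkpoint entries currently stored, including
     *         entries that may already have expired.
     */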
int size() {
return getCheckpoints().size();
}
/**
     * Triggers cleanup of expired checkpoints once {@code createCounter}
     * exceeds {@link #CLEANUP_INTERVAL}.
*/
private void performCleanupIfRequired() {
        if (createCounter.get() > CLEANUP_INTERVAL) {
            synchronized (cleanupLock) {
                // getOldestRevisionToKeep() also purges expired checkpoints
                getOldestRevisionToKeep();
createCounter.set(0);
}
}
}
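    /**
     * Creates the initially empty checkpoint document in the settings
     * collection if it does not exist yet.
     */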
private void createIfNotExist() {
if (store.find(Collection.SETTINGS, ID) == null) {
UpdateOp updateOp = new UpdateOp(ID, true);
store.createOrUpdate(Collection.SETTINGS, updateOp);
}
}
private RevisionVector expand(Revision checkpoint) {
LOG.warn("Expanding {} single revision checkpoint into a " +
"RevisionVector. Please make sure all cluster nodes run " +
"with the same Oak version.", checkpoint);
// best effort conversion
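        // use the checkpoint revision for its own clusterId, synthesize
        // revisions with the checkpoint's timestamp for all other clusterIds
        // in the current head, and take the per-element minimum with the head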
Map<Integer, Revision> revs = Maps.newHashMap();
RevisionVector head = nodeStore.getHeadRevision();
for (Revision r : head) {
int cId = r.getClusterId();
if (cId == checkpoint.getClusterId()) {
revs.put(cId, checkpoint);
} else {
revs.put(cId, new Revision(checkpoint.getTimestamp(), 0, cId));
}
}
return head.pmin(new RevisionVector(revs.values()));
}
static final class Info {
private static final String EXPIRES = "expires";
private static final String REVISION_VECTOR = "rv";
private final long expiryTime;
private final RevisionVector checkpoint;
private final Map<String, String> info;
private Info(long expiryTime,
@Nullable RevisionVector checkpoint,
@NotNull Map<String, String> info) {
this.expiryTime = expiryTime;
this.checkpoint = checkpoint;
this.info = Collections.unmodifiableMap(info);
}
static Info fromString(String info) {
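            // two serialization formats exist: a plain expiry timestamp (old
            // format) and a JSON object starting with the "expires" key
            // (current format, produced by toString())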
long expiryTime;
RevisionVector rv = null;
Map<String, String> map;
if (info.startsWith("{")) {
map = Maps.newHashMap();
JsopReader reader = new JsopTokenizer(info);
reader.read('{');
String key = reader.readString();
if (!EXPIRES.equals(key)) {
throw new IllegalArgumentException("First entry in the " +
"checkpoint info must be the expires date: " + info);
}
reader.read(':');
expiryTime = Long.parseLong(reader.readString());
while (reader.matches(',')) {
key = reader.readString();
reader.read(':');
String value = reader.readString();
// second entry is potentially checkpoint revision vector
if (rv == null && map.isEmpty() && REVISION_VECTOR.equals(key)) {
// try to read checkpoint
try {
rv = RevisionVector.fromString(value);
} catch (IllegalArgumentException e) {
// not a revision vector, read as regular info entry
map.put(key, value);
}
} else {
map.put(key, value);
}
}
reader.read('}');
reader.read(JsopReader.END);
} else {
// old format
map = Collections.emptyMap();
expiryTime = Long.parseLong(info);
}
return new Info(expiryTime, rv, map);
}
Map<String, String> get() {
return info;
}
long getExpiryTime() {
return expiryTime;
}
/**
* The revision vector associated with this checkpoint or {@code null}
         * if this checkpoint was created with a version of Oak that did not
         * yet support revision vectors.
*
* @return the revision vector checkpoint or {@code null}.
*/
@Nullable
RevisionVector getCheckpoint() {
return checkpoint;
}
@Override
public String toString() {
JsopWriter writer = new JsopBuilder();
writer.object();
writer.key(EXPIRES).value(Long.toString(expiryTime));
if (checkpoint != null) {
writer.key(REVISION_VECTOR).value(checkpoint.toString());
}
for (Map.Entry<String, String> entry : info.entrySet()) {
writer.key(entry.getKey()).value(entry.getValue());
}
writer.endObject();
return writer.toString();
}
}
}