/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.jackrabbit.oak.segment;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.Maps.newHashMap;
import static org.apache.jackrabbit.oak.api.Type.STRING;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.Map;
import java.util.function.Consumer;
import org.apache.jackrabbit.oak.api.Blob;
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.api.PropertyState;
import org.apache.jackrabbit.oak.plugins.blob.BlobStoreBlob;
import org.apache.jackrabbit.oak.segment.scheduler.Commit;
import org.apache.jackrabbit.oak.segment.scheduler.LockBasedScheduler;
import org.apache.jackrabbit.oak.segment.scheduler.Scheduler;
import org.apache.jackrabbit.oak.spi.blob.BlobStore;
import org.apache.jackrabbit.oak.spi.commit.CommitHook;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.commit.CompositeHook;
import org.apache.jackrabbit.oak.spi.commit.Observable;
import org.apache.jackrabbit.oak.spi.commit.Observer;
import org.apache.jackrabbit.oak.spi.state.ConflictAnnotatingRebaseDiff;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.apache.jackrabbit.oak.spi.state.NodeStore;
import org.apache.jackrabbit.oak.stats.StatisticsProvider;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The top-level class for the segment store.
* <p>
* The root node of the JCR content tree is actually stored in the node "/root",
* and checkpoints are stored under "/checkpoints".
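* <p>
* A minimal usage sketch (assuming {@code revisions}, {@code reader} and
* {@code writer} come from an already initialised segment store; a
* {@code null} blob store keeps binaries inline, and
* {@code EmptyHook.INSTANCE} merely stands in for whatever commit hook the
* caller actually needs):
* <pre>{@code
* SegmentNodeStore store = SegmentNodeStore.builder(revisions, reader, writer, null)
*         .withStatisticsProvider(StatisticsProvider.NOOP)
*         .build();
* NodeBuilder builder = store.getRoot().builder();
* builder.setProperty("foo", "bar");
* store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
* }</pre>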
*/
public class SegmentNodeStore implements NodeStore, Observable {
public static class SegmentNodeStoreBuilder {
private static final Logger LOG = LoggerFactory.getLogger(SegmentNodeStoreBuilder.class);
@NotNull
private final Revisions revisions;
@NotNull
private final SegmentReader reader;
@NotNull
private final SegmentWriter writer;
@Nullable
private final BlobStore blobStore;
private boolean isCreated;
private boolean dispatchChanges = true;
@NotNull
private StatisticsProvider statsProvider = StatisticsProvider.NOOP;
@Nullable
private LoggingHook loggingHook;
private SegmentNodeStoreBuilder(
@NotNull Revisions revisions,
@NotNull SegmentReader reader,
@NotNull SegmentWriter writer,
@Nullable BlobStore blobStore) {
this.revisions = revisions;
this.reader = reader;
this.writer = writer;
this.blobStore = blobStore;
}
@NotNull
public SegmentNodeStoreBuilder dispatchChanges(boolean dispatchChanges) {
this.dispatchChanges = dispatchChanges;
return this;
}
/**
* {@link StatisticsProvider} for collecting statistics related to SegmentStore
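* <p>
* For example (a sketch only; any {@link StatisticsProvider} implementation
* can be supplied in place of the no-op default):
* <pre>{@code
* builder.withStatisticsProvider(StatisticsProvider.NOOP);
* }</pre>
*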
* @param statisticsProvider provider backing the {@link SegmentNodeStoreStats} exposed by this store
* @return this instance
*/
@NotNull
public SegmentNodeStoreBuilder withStatisticsProvider(@NotNull StatisticsProvider statisticsProvider) {
this.statsProvider = checkNotNull(statisticsProvider);
return this;
}
/**
* {@link LoggingHook} for recording write operations by passing them,
* serialized as strings, to the given {@code writer} (for example a log
* file appender)
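* <p>
* For example (a sketch only; the logger name is just an illustration, any
* {@code Consumer<String>} sink will do):
* <pre>{@code
* builder.withLoggingHook(line -> LoggerFactory.getLogger("oak.segment.writes").info(line));
* }</pre>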
*
* @param writer consumer that receives each recorded write operation as a string
* @return this instance
*/
@NotNull
public SegmentNodeStoreBuilder withLoggingHook(Consumer<String> writer) {
this.loggingHook = LoggingHook.newLoggingHook(writer);
return this;
}
@NotNull
public SegmentNodeStore build() {
checkState(!isCreated);
isCreated = true;
LOG.info("Creating segment node store {}", this);
return new SegmentNodeStore(this);
}
@NotNull
private static String getString(@Nullable BlobStore blobStore) {
return "blobStore=" + (blobStore == null ? "inline" : blobStore);
}
@Override
public String toString() {
return "SegmentNodeStoreBuilder{" +
getString(blobStore) +
'}';
}
}
@NotNull
public static SegmentNodeStoreBuilder builder(
@NotNull Revisions revisions,
@NotNull SegmentReader reader,
@NotNull SegmentWriter writer,
@Nullable BlobStore blobStore) {
return new SegmentNodeStoreBuilder(checkNotNull(revisions),
checkNotNull(reader), checkNotNull(writer), blobStore);
}
static final String ROOT = "root";
public static final String CHECKPOINTS = "checkpoints";
@NotNull
private final SegmentWriter writer;
@NotNull
private final Scheduler scheduler;
@Nullable
private final BlobStore blobStore;
private final SegmentNodeStoreStats stats;
@Nullable
private final LoggingHook loggingHook;
private SegmentNodeStore(SegmentNodeStoreBuilder builder) {
this.writer = builder.writer;
this.blobStore = builder.blobStore;
this.stats = new SegmentNodeStoreStats(builder.statsProvider);
this.scheduler = LockBasedScheduler.builder(builder.revisions, builder.reader, stats)
.dispatchChanges(builder.dispatchChanges)
.build();
this.loggingHook = builder.loggingHook;
}
@Override
public Closeable addObserver(Observer observer) {
if (scheduler instanceof Observable) {
return ((Observable) scheduler).addObserver(observer);
}
return () -> {};
}
@Override @NotNull
public NodeState getRoot() {
return scheduler.getHeadNodeState().getChildNode(ROOT);
}
@NotNull
@Override
public NodeState merge(
@NotNull NodeBuilder builder, @NotNull CommitHook commitHook,
@NotNull CommitInfo info) throws CommitFailedException {
checkArgument(builder instanceof SegmentNodeBuilder);
checkArgument(((SegmentNodeBuilder) builder).isRootBuilder());
if (loggingHook != null) {
commitHook = new CompositeHook(commitHook, loggingHook);
}
return scheduler.schedule(new Commit(builder, commitHook, info));
}
@Override @NotNull
public NodeState rebase(@NotNull NodeBuilder builder) {
checkArgument(builder instanceof SegmentNodeBuilder);
SegmentNodeBuilder snb = (SegmentNodeBuilder) builder;
NodeState root = getRoot();
NodeState before = snb.getBaseState();
if (!SegmentNodeState.fastEquals(before, root)) {
SegmentNodeState after = snb.getNodeState();
snb.reset(root);
after.compareAgainstBaseState(
before, new ConflictAnnotatingRebaseDiff(snb));
}
return snb.getNodeState();
}
@Override @NotNull
public NodeState reset(@NotNull NodeBuilder builder) {
checkArgument(builder instanceof SegmentNodeBuilder);
SegmentNodeBuilder snb = (SegmentNodeBuilder) builder;
NodeState root = getRoot();
snb.reset(root);
return root;
}
@NotNull
@Override
public Blob createBlob(InputStream stream) throws IOException {
return new SegmentBlob(blobStore, writer.writeStream(stream));
}
@Override
public Blob getBlob(@NotNull String reference) {
//The use of 'reference' here is a bit overloaded. In terms of the NodeStore API,
//a blob reference is the secure reference obtained from Blob#getReference().
//In SegmentStore terminology, however, a blob is referred to via an 'external
//reference', which maps to the blobId obtained from BlobStore#getBlobId().
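//Illustrative sketch only ('blob' and 'segmentNodeStore' are hypothetical names):
//  String reference = blob.getReference();
//  Blob external = segmentNodeStore.getBlob(reference);
//This resolves to a BlobStoreBlob, but only when a BlobStore is configured.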
if (blobStore != null) {
String blobId = blobStore.getBlobId(reference);
if (blobId != null) {
return new BlobStoreBlob(blobStore, blobId);
}
return null;
}
throw new IllegalStateException("Attempt to read external blob with blobId [" + reference + "] " +
"without specifying BlobStore");
}
@NotNull
@Override
public String checkpoint(long lifetime, @NotNull Map<String, String> properties) {
return scheduler.checkpoint(lifetime, properties);
}
@Override @NotNull
public synchronized String checkpoint(long lifetime) {
return checkpoint(lifetime, Collections.<String, String>emptyMap());
}
@NotNull
@Override
public Map<String, String> checkpointInfo(@NotNull String checkpoint) {
checkNotNull(checkpoint);
Map<String, String> properties = newHashMap();
NodeState cp = scheduler.getHeadNodeState()
.getChildNode("checkpoints")
.getChildNode(checkpoint)
.getChildNode("properties");
for (PropertyState prop : cp.getProperties()) {
properties.put(prop.getName(), prop.getValue(STRING));
}
return properties;
}
@NotNull
@Override
public Iterable<String> checkpoints() {
return getCheckpoints().getChildNodeNames();
}
@Override @Nullable
public NodeState retrieve(@NotNull String checkpoint) {
checkNotNull(checkpoint);
NodeState cp = scheduler.getHeadNodeState()
.getChildNode("checkpoints")
.getChildNode(checkpoint)
.getChildNode(ROOT);
if (cp.exists()) {
return cp;
}
return null;
}
@Override
public boolean release(@NotNull String checkpoint) {
return scheduler.removeCheckpoint(checkpoint);
}
NodeState getCheckpoints() {
return scheduler.getHeadNodeState().getChildNode(CHECKPOINTS);
}
public SegmentNodeStoreStats getStats() {
return stats;
}
}