/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.jackrabbit.oak.segment;
import static com.google.common.base.Charsets.UTF_8;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkElementIndex;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkPositionIndex;
import static com.google.common.base.Preconditions.checkPositionIndexes;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.Iterables.addAll;
import static com.google.common.collect.Lists.newArrayList;
import static com.google.common.collect.Lists.newArrayListWithCapacity;
import static com.google.common.collect.Lists.newArrayListWithExpectedSize;
import static com.google.common.collect.Lists.partition;
import static com.google.common.collect.Maps.newHashMap;
import static com.google.common.io.ByteStreams.read;
import static java.util.Arrays.asList;
import static java.util.Collections.emptyMap;
import static java.util.Collections.nCopies;
import static org.apache.jackrabbit.oak.api.Type.BINARIES;
import static org.apache.jackrabbit.oak.api.Type.BINARY;
import static org.apache.jackrabbit.oak.api.Type.NAME;
import static org.apache.jackrabbit.oak.api.Type.NAMES;
import static org.apache.jackrabbit.oak.api.Type.STRING;
import static org.apache.jackrabbit.oak.segment.MapRecord.BUCKETS_PER_LEVEL;
import static org.apache.jackrabbit.oak.segment.RecordWriters.newNodeStateWriter;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.jcr.PropertyType;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.io.Closeables;
import org.apache.commons.math3.stat.descriptive.SynchronizedDescriptiveStatistics;
import org.apache.jackrabbit.oak.api.Blob;
import org.apache.jackrabbit.oak.api.PropertyState;
import org.apache.jackrabbit.oak.api.Type;
import org.apache.jackrabbit.oak.plugins.memory.ModifiedNodeState;
import org.apache.jackrabbit.oak.segment.WriteOperationHandler.WriteOperation;
import org.apache.jackrabbit.oak.spi.blob.BlobStore;
import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry;
import org.apache.jackrabbit.oak.spi.state.DefaultNodeStateDiff;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A {@code SegmentWriter} converts nodes, properties, values, etc. to records and
* persists them with the help of a {@link WriteOperationHandler}.
* All public methods of this class are thread safe if and only if the
* {@link WriteOperationHandler} passed to the constructor is thread safe.
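* <p>
* A purely illustrative usage sketch; the collaborators shown ({@code store},
* {@code reader}, {@code blobStore}, {@code cacheManager},
* {@code writeOperationHandler}, {@code binaryReferenceConsumer} and
* {@code nodeState}) are assumed to be available from the calling context:
* <pre>{@code
* SegmentWriter segmentWriter = new SegmentWriter(store, reader, blobStore,
*         cacheManager, writeOperationHandler, binaryReferenceConsumer);
* SegmentNodeState persisted = segmentWriter.writeNode(nodeState);
* segmentWriter.flush();  // make the written records reachable in the store
* }</pre>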
*/
public class SegmentWriter {
private static final Logger LOG = LoggerFactory.getLogger(SegmentWriter.class);
static final int BLOCK_SIZE = 1 << 12; // 4kB
@Nonnull
private final WriterCacheManager cacheManager;
@Nonnull
private final SegmentStore store;
@Nonnull
private final SegmentReader reader;
@CheckForNull
private final BlobStore blobStore;
@Nonnull
private final WriteOperationHandler writeOperationHandler;
@Nonnull
private final BinaryReferenceConsumer binaryReferenceConsumer;
/**
* Create a new instance of a {@code SegmentWriter}. Note the thread safety properties
* pointed out in the class comment.
*
* @param store store to write to
* @param reader segment reader for the {@code store}
* @param blobStore the blob store or {@code null} for inlined blobs
* @param cacheManager cache manager instance for the de-duplication caches used by this writer
* @param writeOperationHandler handler for write operations.
* @param binaryReferenceConsumer consumer that is notified of the blob id references written by this writer
*/
public SegmentWriter(@Nonnull SegmentStore store,
@Nonnull SegmentReader reader,
@Nullable BlobStore blobStore,
@Nonnull WriterCacheManager cacheManager,
@Nonnull WriteOperationHandler writeOperationHandler,
@Nonnull BinaryReferenceConsumer binaryReferenceConsumer
) {
this.store = checkNotNull(store);
this.reader = checkNotNull(reader);
this.blobStore = blobStore;
this.cacheManager = checkNotNull(cacheManager);
this.writeOperationHandler = checkNotNull(writeOperationHandler);
this.binaryReferenceConsumer = checkNotNull(binaryReferenceConsumer);
}
public void flush() throws IOException {
writeOperationHandler.flush();
}
/**
* Write a map record.
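* <p>
* Illustrative sketch only ({@code baseMap} and {@code existingChildId} are
* assumed to be a previously written map record and record id, respectively);
* a {@code null} value in {@code changes} removes the corresponding entry
* from the base map:
* <pre>{@code
* Map<String, RecordId> changes = new HashMap<>();
* changes.put("added", existingChildId);  // add or update an entry
* changes.put("removed", null);           // remove an entry of the base map
* MapRecord map = writeMap(baseMap, changes);
* }</pre>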
* @param base base map relative to which the {@code changes} are applied, or
* {@code null} for the empty map.
* @param changes the changed mapping to apply to the {@code base} map.
* @return the map record written
* @throws IOException
*/
@Nonnull
public MapRecord writeMap(@Nullable final MapRecord base,
@Nonnull final Map<String, RecordId> changes)
throws IOException {
RecordId mapId = writeOperationHandler.execute(new SegmentWriteOperation() {
@Override
public RecordId execute(SegmentBufferWriter writer) throws IOException {
return with(writer).writeMap(base, changes);
}
});
return new MapRecord(reader, mapId);
}
/**
* Write a list record.
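* <p>
* Illustrative sketch only ({@code firstId} and {@code secondId} are assumed
* to be record ids that have already been written):
* <pre>{@code
* RecordId listId = writeList(asList(firstId, secondId));
* }</pre>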
* @param list the list to write.
* @return the record id of the list written
* @throws IOException
*/
@Nonnull
public RecordId writeList(@Nonnull final List<RecordId> list) throws IOException {
return writeOperationHandler.execute(new SegmentWriteOperation() {
@Override
public RecordId execute(SegmentBufferWriter writer) throws IOException {
return with(writer).writeList(list);
}
});
}
/**
* Write a string record.
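* <p>
* Illustrative sketch only:
* <pre>{@code
* RecordId stringId = writeString("jcr:primaryType");
* }</pre>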
* @param string the string to write.
* @return the record id of the string written.
* @throws IOException
*/
@Nonnull
public RecordId writeString(@Nonnull final String string) throws IOException {
return writeOperationHandler.execute(new SegmentWriteOperation() {
@Override
public RecordId execute(SegmentBufferWriter writer) throws IOException {
return with(writer).writeString(string);
}
});
}
/**
* Write a blob (as a list of block records).
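* <p>
* Illustrative sketch only ({@code blob} is assumed to be an existing
* {@link Blob} instance, e.g. one obtained from another node state):
* <pre>{@code
* SegmentBlob segmentBlob = writeBlob(blob);
* RecordId blobRecordId = segmentBlob.getRecordId();
* }</pre>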
* @param blob blob to write
* @return The segment blob written
* @throws IOException
*/
@Nonnull
public SegmentBlob writeBlob(@Nonnull final Blob blob) throws IOException {
RecordId blobId = writeOperationHandler.execute(new SegmentWriteOperation() {
@Override
public RecordId execute(SegmentBufferWriter writer) throws IOException {
return with(writer).writeBlob(blob);
}
});
return new SegmentBlob(blobStore, blobId);
}
/**
* Writes a block record containing the given block of bytes.
*
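* Illustrative sketch only; writes the whole buffer as a single block:
* <pre>{@code
* byte[] bytes = "example".getBytes(UTF_8);
* RecordId blockId = writeBlock(bytes, 0, bytes.length);
* }</pre>
*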
* @param bytes source buffer
* @param offset offset within the source buffer
* @param length number of bytes to write
* @return block record identifier
*/
@Nonnull
public RecordId writeBlock(@Nonnull final byte[] bytes, final int offset, final int length)
throws IOException {
return writeOperationHandler.execute(new SegmentWriteOperation() {
@Override
public RecordId execute(SegmentBufferWriter writer) throws IOException {
return with(writer).writeBlock(bytes, offset, length);
}
});
}
/**
* Writes a stream value record. The given stream is consumed <em>and closed</em> by
* this method.
*
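* Illustrative sketch only ({@code file} is assumed to exist in the calling
* context); the stream is closed by this method, so the caller does not need
* to close it:
* <pre>{@code
* SegmentBlob streamBlob = writeStream(new FileInputStream(file));
* }</pre>
*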
* @param stream stream to be written
* @return blob for the passed {@code stream}
* @throws IOException if the input stream could not be read or the output could not be written
*/
@Nonnull
public SegmentBlob writeStream(@Nonnull final InputStream stream) throws IOException {
RecordId blobId = writeOperationHandler.execute(new SegmentWriteOperation() {
@Override
public RecordId execute(SegmentBufferWriter writer) throws IOException {
return with(writer).writeStream(stream);
}
});
return new SegmentBlob(blobStore, blobId);
}
/**
* Write a property.
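* <p>
* Illustrative sketch only; {@code PropertyStates} refers to
* {@code org.apache.jackrabbit.oak.plugins.memory.PropertyStates}:
* <pre>{@code
* PropertyState property = PropertyStates.createProperty("jcr:uuid", "some-id");
* SegmentPropertyState written = writeProperty(property);
* }</pre>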
* @param state the property to write
* @return the property state written
* @throws IOException
*/
@Nonnull
public SegmentPropertyState writeProperty(@Nonnull final PropertyState state)
throws IOException {
RecordId id = writeOperationHandler.execute(new SegmentWriteOperation() {
@Override
public RecordId execute(SegmentBufferWriter writer) throws IOException {
return with(writer).writeProperty(state);
}
});
return new SegmentPropertyState(reader, id, state.getName(), state.getType());
}
/**
* Write a node state.
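* <p>
* Illustrative sketch only; {@code EMPTY_NODE} refers to
* {@code org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState#EMPTY_NODE}:
* <pre>{@code
* NodeBuilder builder = EMPTY_NODE.builder();
* builder.setProperty("jcr:primaryType", "nt:unstructured", Type.NAME);
* SegmentNodeState written = writeNode(builder.getNodeState());
* }</pre>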
* @param state node state to write
* @return segment node state equal to {@code state}
* @throws IOException
*/
@Nonnull
public SegmentNodeState writeNode(@Nonnull final NodeState state) throws IOException {
RecordId nodeId = writeOperationHandler.execute(new SegmentWriteOperation() {
@Override
public RecordId execute(SegmentBufferWriter writer) throws IOException {
return new CompactionStats(writeNodeStats, compactNodeStats, false)
.writeNode(this, writer, state);
}
});
return new SegmentNodeState(reader, this, nodeId);
}
/**
* Write a node state using a dedicated write operation handler, unless cancelled.
* The write operation handler is automatically {@link WriteOperationHandler#flush() flushed}
* once the node has been written successfully.
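* <p>
* Illustrative sketch only ({@code root} and {@code compactionHandler} are
* assumed to be an existing node state and a dedicated
* {@link WriteOperationHandler}); a real caller would pass a cancellation
* supplier tied to e.g. garbage collection instead of the constant
* {@code false} used here:
* <pre>{@code
* SegmentNodeState compacted = writeNode(
*         root, compactionHandler, Suppliers.ofInstance(false));
* if (compacted == null) {
*     // the write operation was cancelled before completion
* }
* }</pre>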
* @param state node state to write
* @param writeOperationHandler the write operation handler through which all write calls
* induced by this call are routed.
* @param cancel supplier to signal cancellation of this write operation
* @return segment node state equal to {@code state} or {@code null} if cancelled.
* @throws IOException
*/
@CheckForNull
public SegmentNodeState writeNode(@Nonnull final NodeState state,
@Nonnull WriteOperationHandler writeOperationHandler,
@Nonnull Supplier<Boolean> cancel)
throws IOException {
try {
RecordId nodeId = writeOperationHandler.execute(new SegmentWriteOperation(cancel) {
@Override
public RecordId execute(SegmentBufferWriter writer) throws IOException {
return new CompactionStats(writeNodeStats, compactNodeStats, true)
.writeNode(this, writer, state);
}
});
writeOperationHandler.flush();
return new SegmentNodeState(reader, this, nodeId);
} catch (SegmentWriteOperation.CancelledWriteException ignore) {
return null;
}
}
private final SynchronizedDescriptiveStatistics writeNodeStats = new SynchronizedDescriptiveStatistics();
private final SynchronizedDescriptiveStatistics compactNodeStats = new SynchronizedDescriptiveStatistics();
// FIXME OAK-4445: Collect write statistics: clean this up:
// - It should be possible to switch the statistics on/off. There should be no
// performance penalty when off.
// - Expose via logging and/or MBean?
// - What metrics should we collect? Use the Metrics API!?
// - Decouple this from the SegmentWriter
private static class CompactionStats {
@Nonnull
private final SynchronizedDescriptiveStatistics writeNodeStats;
@Nonnull
private final SynchronizedDescriptiveStatistics compactNodeStats;
/*
* {@code true} iff this is an explicit compaction (vs. an implicit
* and deferred one triggered by a commit referring to an "old" base
* state).
*/
private final boolean isCompaction;
/*
* Total number of nodes in the subtree rooted at the node passed
* to {@link #writeNode(SegmentWriteOperation, SegmentBufferWriter, NodeState)}
*/
public int nodeCount;
/*
* Number of cache hits for a deferred compacted node
*/
public int cacheHits;
/*
* Number of cache misses for a deferred compacted node
*/
public int cacheMiss;
/*
* Number of nodes that were de-duplicated as the store already contained
* them.
*/
public int deDupNodes;
/*
* Number of nodes that actually had to be written because there was no
* de-duplication and, in the case of a deferred compaction, a cache miss.
*/
public int writesOps;
public CompactionStats(
@Nonnull SynchronizedDescriptiveStatistics writeNodeStats,
@Nonnull SynchronizedDescriptiveStatistics compactNodeStats,
boolean isCompaction) {
this.writeNodeStats = writeNodeStats;
this.compactNodeStats = compactNodeStats;
this.isCompaction = isCompaction;
}
/*
* The operation caused a deferred compaction iff it accessed the cache.
*/
public boolean isDeferredCompactionOp() {
return cacheHits + cacheMiss > 0;
}
@Nonnull
public RecordId writeNode(
@Nonnull SegmentWriteOperation op,
@Nonnull SegmentBufferWriter writer,
@Nonnull NodeState state)
throws IOException {
long t = System.nanoTime();
try {
return op.with(writer).with(this).writeNode(state, 0);
} finally {
if (isCompaction) {
LOG.info("Write node stats: {}", writeNodeStats);
LOG.info("Compact node stats: {}", compactNodeStats);
writeNodeStats.clear();
compactNodeStats.clear();
} else {
if (isDeferredCompactionOp()) {
compactNodeStats.addValue(System.nanoTime() - t);
LOG.info(toString());
} else {
writeNodeStats.addValue(System.nanoTime() - t);
}
}
}
}
@Override
public String toString() {
return "NodeStats{" +
"op=" + (isDeferredCompactionOp() ? "compact" : "write") +
", nodeCount=" + nodeCount +
", writeOps=" + writesOps +
", deDupNodes=" + deDupNodes +
", cacheHits=" + cacheHits +
", cacheMiss=" + cacheMiss +
", hitRate=" + (100*(double) cacheHits / ((double) cacheHits + (double) cacheMiss)) +
'}';
}
}
/**
* This {@code WriteOperation} implementation is used internally to provide
* context to a recursive chain of calls without having to pass the context
* as a separate argument (a poor man's monad). As such it is entirely
* <em>not thread safe</em>.
*/
private abstract class SegmentWriteOperation implements WriteOperation {
/**
* This exception is used internally to signal cancellation of a (recursive)
* write node operation.
*/
private class CancelledWriteException extends IOException {
public CancelledWriteException() {
super("Cancelled write operation");
}
}
@Nonnull
private final Supplier<Boolean> cancel;
@CheckForNull
private CompactionStats compactionStats;
private SegmentBufferWriter writer;
private RecordCache<String> stringCache;
private RecordCache<Template> templateCache;
private NodeCache nodeCache;
protected SegmentWriteOperation(@Nonnull Supplier<Boolean> cancel) {
this.cancel = cancel;
}
protected SegmentWriteOperation() {
this(Suppliers.ofInstance(false));
}
@Override
public abstract RecordId execute(SegmentBufferWriter writer) throws IOException;
@Nonnull
SegmentWriteOperation with(@Nonnull SegmentBufferWriter writer) {
checkState(this.writer == null);
this.writer = writer;
int generation = writer.getGeneration();
this.stringCache = cacheManager.getStringCache(generation);
this.templateCache = cacheManager.getTemplateCache(generation);
this.nodeCache = cacheManager.getNodeCache(generation);
return this;
}
@Nonnull
SegmentWriteOperation with(@Nonnull CompactionStats compactionStats) {
this.compactionStats = compactionStats;
return this;
}
private RecordId writeMap(@Nullable MapRecord base,
@Nonnull Map<String, RecordId> changes)
throws IOException {
if (base != null && base.isDiff()) {
Segment segment = base.getSegment();
RecordId key = segment.readRecordId(base.getOffset(8));
String name = reader.readString(key);
if (!changes.containsKey(name)) {
changes.put(name, segment.readRecordId(base.getOffset(8, 1)));
}
base = new MapRecord(reader, segment.readRecordId(base.getOffset(8, 2)));
}
if (base != null && changes.size() == 1) {
Map.Entry<String, RecordId> change =
changes.entrySet().iterator().next();
RecordId value = change.getValue();
if (value != null) {
MapEntry entry = base.getEntry(change.getKey());
if (entry != null) {
if (value.equals(entry.getValue())) {
return base.getRecordId();
} else {
return RecordWriters.newMapBranchWriter(entry.getHash(), asList(entry.getKey(),
value, base.getRecordId())).write(writer);
}
}
}
}
List<MapEntry> entries = newArrayList();
for (Map.Entry<String, RecordId> entry : changes.entrySet()) {
String key = entry.getKey();
RecordId keyId = null;
if (base != null) {
MapEntry e = base.getEntry(key);
if (e != null) {
keyId = e.getKey();
}
}
if (keyId == null && entry.getValue() != null) {
keyId = writeString(key);
}
if (keyId != null) {
entries.add(new MapEntry(reader, key, keyId, entry.getValue()));
}
}
return writeMapBucket(base, entries, 0);
}
private RecordId writeMapLeaf(int level, Collection<MapEntry> entries) throws IOException {
checkNotNull(entries);
int size = entries.size();
checkElementIndex(size, MapRecord.MAX_SIZE);
checkPositionIndex(level, MapRecord.MAX_NUMBER_OF_LEVELS);
checkArgument(size != 0 || level == MapRecord.MAX_NUMBER_OF_LEVELS);
return RecordWriters.newMapLeafWriter(level, entries).write(writer);
}
private RecordId writeMapBranch(int level, int size, MapRecord... buckets) throws IOException {
int bitmap = 0;
List<RecordId> bucketIds = newArrayListWithCapacity(buckets.length);
for (int i = 0; i < buckets.length; i++) {
if (buckets[i] != null) {
bitmap |= 1L << i;
bucketIds.add(buckets[i].getRecordId());
}
}
return RecordWriters.newMapBranchWriter(level, size, bitmap, bucketIds).write(writer);
}
private RecordId writeMapBucket(MapRecord base, Collection<MapEntry> entries, int level)
throws IOException {
// when there are no changed entries, return the base map (if any) as-is
if (entries == null || entries.isEmpty()) {
if (base != null) {
return base.getRecordId();
} else if (level == 0) {
return RecordWriters.newMapLeafWriter().write(writer);
} else {
return null;
}
}
// when no base map was given, write a fresh new map
if (base == null) {
// use leaf records for small maps or the last map level
if (entries.size() <= BUCKETS_PER_LEVEL
|| level == MapRecord.MAX_NUMBER_OF_LEVELS) {
return writeMapLeaf(level, entries);
}
// write a large map by dividing the entries into buckets
MapRecord[] buckets = new MapRecord[BUCKETS_PER_LEVEL];
List<List<MapEntry>> changes = splitToBuckets(entries, level);
for (int i = 0; i < BUCKETS_PER_LEVEL; i++) {
buckets[i] = mapRecordOrNull(writeMapBucket(null, changes.get(i), level + 1));
}
// combine the buckets into one big map
return writeMapBranch(level, entries.size(), buckets);
}
// if the base map is small, update in memory and write as a new map
if (base.isLeaf()) {
Map<String, MapEntry> map = newHashMap();
for (MapEntry entry : base.getEntries()) {
map.put(entry.getName(), entry);
}
for (MapEntry entry : entries) {
if (entry.getValue() != null) {
map.put(entry.getName(), entry);
} else {
map.remove(entry.getName());
}
}
return writeMapBucket(null, map.values(), level);
}
// finally, if the base map is large, handle updates per bucket
int newSize = 0;
int newCount = 0;
MapRecord[] buckets = base.getBuckets();
List<List<MapEntry>> changes = splitToBuckets(entries, level);
for (int i = 0; i < BUCKETS_PER_LEVEL; i++) {
buckets[i] = mapRecordOrNull(writeMapBucket(buckets[i], changes.get(i), level + 1));
if (buckets[i] != null) {
newSize += buckets[i].size();
newCount++;
}
}
// OAK-654: what if the updated map is smaller?
if (newSize > BUCKETS_PER_LEVEL) {
return writeMapBranch(level, newSize, buckets);
} else if (newCount <= 1) {
// up to one bucket contains entries, so return that as the new map
for (MapRecord bucket : buckets) {
if (bucket != null) {
return bucket.getRecordId();
}
}
// no buckets remaining, return empty map
return writeMapBucket(null, null, level);
} else {
// combine all remaining entries into a leaf record
List<MapEntry> list = newArrayList();
for (MapRecord bucket : buckets) {
if (bucket != null) {
addAll(list, bucket.getEntries());
}
}
return writeMapLeaf(level, list);
}
}
private MapRecord mapRecordOrNull(RecordId id) {
return id == null ? null : new MapRecord(reader, id);
}
/**
* Writes a list record containing the given list of record identifiers.
*
* @param list list of record identifiers
* @return list record identifier
*/
private RecordId writeList(@Nonnull List<RecordId> list) throws IOException {
checkNotNull(list);
checkArgument(!list.isEmpty());
List<RecordId> thisLevel = list;
while (thisLevel.size() > 1) {
List<RecordId> nextLevel = newArrayList();
for (List<RecordId> bucket :
partition(thisLevel, ListRecord.LEVEL_SIZE)) {
if (bucket.size() > 1) {
nextLevel.add(writeListBucket(bucket));
} else {
nextLevel.add(bucket.get(0));
}
}
thisLevel = nextLevel;
}
return thisLevel.iterator().next();
}
private RecordId writeListBucket(List<RecordId> bucket) throws IOException {
checkArgument(bucket.size() > 1);
return RecordWriters.newListBucketWriter(bucket).write(writer);
}
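/**
* Split the given entries into the buckets of the given level. The bucket
* index of an entry is determined by the bits of its hash that correspond
* to the given level; buckets without entries remain {@code null}.
*/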
private List<List<MapEntry>> splitToBuckets(Collection<MapEntry> entries, int level) {
int mask = (1 << MapRecord.BITS_PER_LEVEL) - 1;
int shift = 32 - (level + 1) * MapRecord.BITS_PER_LEVEL;
List<List<MapEntry>> buckets =
newArrayList(nCopies(MapRecord.BUCKETS_PER_LEVEL, (List<MapEntry>) null));
for (MapEntry entry : entries) {
int index = (entry.getHash() >> shift) & mask;
List<MapEntry> bucket = buckets.get(index);
if (bucket == null) {
bucket = newArrayList();
buckets.set(index, bucket);
}
bucket.add(entry);
}
return buckets;
}
private RecordId writeValueRecord(long length, RecordId blocks) throws IOException {
long len = (length - Segment.MEDIUM_LIMIT) | (0x3L << 62);
return RecordWriters.newValueWriter(blocks, len).write(writer);
}
private RecordId writeValueRecord(int length, byte... data) throws IOException {
checkArgument(length < Segment.MEDIUM_LIMIT);
return RecordWriters.newValueWriter(length, data).write(writer);
}
/**
* Writes a string value record.
*
* @param string string to be written
* @return value record identifier
*/
private RecordId writeString(@Nonnull String string) throws IOException {
RecordId id = stringCache.get(string);
if (id != null) {
return id; // shortcut if the same string was recently stored
}
byte[] data = string.getBytes(UTF_8);
if (data.length < Segment.MEDIUM_LIMIT) {
// only cache short strings to avoid excessive memory use
id = writeValueRecord(data.length, data);
stringCache.put(string, id);
return id;
}
int pos = 0;
List<RecordId> blockIds = newArrayListWithExpectedSize(
data.length / BLOCK_SIZE + 1);
// write as many full bulk segments as possible
while (pos + Segment.MAX_SEGMENT_SIZE <= data.length) {
SegmentId bulkId = store.newBulkSegmentId();
store.writeSegment(bulkId, data, pos, Segment.MAX_SEGMENT_SIZE);
for (int i = 0; i < Segment.MAX_SEGMENT_SIZE; i += BLOCK_SIZE) {
blockIds.add(new RecordId(bulkId, i));
}
pos += Segment.MAX_SEGMENT_SIZE;
}
// inline the remaining data as block records
while (pos < data.length) {
int len = Math.min(BLOCK_SIZE, data.length - pos);
blockIds.add(writeBlock(data, pos, len));
pos += len;
}
return writeValueRecord(data.length, writeList(blockIds));
}
private boolean sameStore(SegmentId id) {
return id.sameStore(store);
}
/**
* @param blob
* @return {@code true} iff {@code blob} is a {@code SegmentBlob}
* and originates from the same segment store.
*/
private boolean sameStore(Blob blob) {
return (blob instanceof SegmentBlob)
&& sameStore(((Record) blob).getRecordId().getSegmentId());
}
private RecordId writeBlob(@Nonnull Blob blob) throws IOException {
if (sameStore(blob)) {
SegmentBlob segmentBlob = (SegmentBlob) blob;
if (!isOldGeneration(segmentBlob.getRecordId())) {
return segmentBlob.getRecordId();
}
if (segmentBlob.isExternal()) {
return writeBlobId(segmentBlob.getBlobId());
}
}
String reference = blob.getReference();
if (reference != null && blobStore != null) {
String blobId = blobStore.getBlobId(reference);
if (blobId != null) {
return writeBlobId(blobId);
} else {
LOG.debug("No blob found for reference {}, inlining...", reference);
}
}
return writeStream(blob.getNewStream());
}
/**
* Write a reference to an external blob. This method handles blob IDs of
* every length, but behaves differently for small and large blob IDs.
*
* @param blobId Blob ID.
* @return Record ID pointing to the written blob ID.
* @see Segment#BLOB_ID_SMALL_LIMIT
*/
private RecordId writeBlobId(String blobId) throws IOException {
byte[] data = blobId.getBytes(UTF_8);
RecordId recordId;
if (data.length < Segment.BLOB_ID_SMALL_LIMIT) {
recordId = RecordWriters.newBlobIdWriter(data).write(writer);
} else {
recordId = RecordWriters.newBlobIdWriter(writeString(blobId)).write(writer);
}
binaryReferenceConsumer.consume(writer.getGeneration(), blobId);
return recordId;
}
private RecordId writeBlock(@Nonnull byte[] bytes, int offset, int length)
throws IOException {
checkNotNull(bytes);
checkPositionIndexes(offset, offset + length, bytes.length);
return RecordWriters.newBlockWriter(bytes, offset, length).write(writer);
}
private RecordId writeStream(@Nonnull InputStream stream) throws IOException {
boolean threw = true;
try {
RecordId id = SegmentStream.getRecordIdIfAvailable(stream, store);
if (id == null) {
// This is either not a segment stream or one from another store:
// fully serialise the stream.
id = internalWriteStream(stream);
} else if (isOldGeneration(id)) {
// This is a segment stream from this store but from an old generation:
// try to link to the blocks if there are any.
SegmentStream segmentStream = (SegmentStream) stream;
List<RecordId> blockIds = segmentStream.getBlockIds();
if (blockIds == null) {
return internalWriteStream(stream);
} else {
return writeValueRecord(segmentStream.getLength(), writeList(blockIds));
}
}
threw = false;
return id;
} finally {
Closeables.close(stream, threw);
}
}
private RecordId internalWriteStream(@Nonnull InputStream stream) throws IOException {
// Special case for short binaries (up to about 16kB):
// store them directly as small- or medium-sized value records
byte[] data = new byte[Segment.MEDIUM_LIMIT];
int n = read(stream, data, 0, data.length);
if (n < Segment.MEDIUM_LIMIT) {
return writeValueRecord(n, data);
}
if (blobStore != null) {
String blobId = blobStore.writeBlob(new SequenceInputStream(
new ByteArrayInputStream(data, 0, n), stream));
return writeBlobId(blobId);
}
data = Arrays.copyOf(data, Segment.MAX_SEGMENT_SIZE);
n += read(stream, data, n, Segment.MAX_SEGMENT_SIZE - n);
long length = n;
List<RecordId> blockIds =
newArrayListWithExpectedSize(2 * n / BLOCK_SIZE);
// Write the data to bulk segments and collect the list of block ids
while (n != 0) {
SegmentId bulkId = store.newBulkSegmentId();
int len = Segment.align(n, 1 << Segment.RECORD_ALIGN_BITS);
LOG.debug("Writing bulk segment {} ({} bytes)", bulkId, n);
store.writeSegment(bulkId, data, 0, len);
for (int i = 0; i < n; i += BLOCK_SIZE) {
blockIds.add(new RecordId(bulkId, data.length - len + i));
}
n = read(stream, data, 0, data.length);
length += n;
}
return writeValueRecord(length, writeList(blockIds));
}
private RecordId writeProperty(@Nonnull PropertyState state) throws IOException {
Map<String, RecordId> previousValues = emptyMap();
return writeProperty(state, previousValues);
}
private RecordId writeProperty(@Nonnull PropertyState state,
@Nonnull Map<String, RecordId> previousValues)
throws IOException {
Type<?> type = state.getType();
int count = state.count();
List<RecordId> valueIds = newArrayList();
for (int i = 0; i < count; i++) {
if (type.tag() == PropertyType.BINARY) {
try {
valueIds.add(writeBlob(state.getValue(BINARY, i)));
} catch (IOException e) {
throw new IllegalStateException("Unexpected IOException", e);
}
} else {
String value = state.getValue(STRING, i);
RecordId valueId = previousValues.get(value);
if (valueId == null) {
valueId = writeString(value);
}
valueIds.add(valueId);
}
}
if (!type.isArray()) {
return valueIds.iterator().next();
} else if (count == 0) {
return RecordWriters.newListWriter().write(writer);
} else {
return RecordWriters.newListWriter(count, writeList(valueIds)).write(writer);
}
}
private RecordId writeTemplate(Template template) throws IOException {
checkNotNull(template);
RecordId id = templateCache.get(template);
if (id != null) {
return id; // shortcut if the same template was recently stored
}
Collection<RecordId> ids = newArrayList();
int head = 0;
RecordId primaryId = null;
PropertyState primaryType = template.getPrimaryType();
if (primaryType != null) {
head |= 1 << 31;
primaryId = writeString(primaryType.getValue(NAME));
ids.add(primaryId);
}
List<RecordId> mixinIds = null;
PropertyState mixinTypes = template.getMixinTypes();
if (mixinTypes != null) {
head |= 1 << 30;
mixinIds = newArrayList();
for (String mixin : mixinTypes.getValue(NAMES)) {
mixinIds.add(writeString(mixin));
}
ids.addAll(mixinIds);
checkState(mixinIds.size() < (1 << 10));
head |= mixinIds.size() << 18;
}
RecordId childNameId = null;
String childName = template.getChildName();
if (childName == Template.ZERO_CHILD_NODES) {
head |= 1 << 29;
} else if (childName == Template.MANY_CHILD_NODES) {
head |= 1 << 28;
} else {
childNameId = writeString(childName);
ids.add(childNameId);
}
PropertyTemplate[] properties = template.getPropertyTemplates();
RecordId[] propertyNames = new RecordId[properties.length];
byte[] propertyTypes = new byte[properties.length];
for (int i = 0; i < properties.length; i++) {
// Note: if the property names are stored in more than 255 separate
// segments, this will not work.
propertyNames[i] = writeString(properties[i].getName());
Type<?> type = properties[i].getType();
if (type.isArray()) {
propertyTypes[i] = (byte) -type.tag();
} else {
propertyTypes[i] = (byte) type.tag();
}
}
RecordId propNamesId = null;
if (propertyNames.length > 0) {
propNamesId = writeList(asList(propertyNames));
ids.add(propNamesId);
}
checkState(propertyNames.length < (1 << 18));
head |= propertyNames.length;
RecordId tid = RecordWriters.newTemplateWriter(ids, propertyNames,
propertyTypes, head, primaryId, mixinIds, childNameId,
propNamesId).write(writer);
templateCache.put(template, tid);
return tid;
}
private RecordId writeNode(@Nonnull NodeState state, int depth) throws IOException {
if (cancel.get()) {
// Poor man's Either Monad
throw new CancelledWriteException();
}
assert compactionStats != null;
compactionStats.nodeCount++;
RecordId compactedId = deduplicateNode(state);
if (compactedId != null) {
return compactedId;
}
compactionStats.writesOps++;
RecordId recordId = writeNodeUncached(state, depth);
if (state instanceof SegmentNodeState) {
// This node state has been rewritten because it is from an older
// generation (e.g. due to compaction). Put it into the cache for
// deduplication of hard links to it (e.g. checkpoints).
SegmentNodeState sns = (SegmentNodeState) state;
nodeCache.put(sns.getStableId(), recordId, depth);
}
return recordId;
}
private RecordId writeNodeUncached(@Nonnull NodeState state, int depth) throws IOException {
ModifiedNodeState after = null;
if (state instanceof ModifiedNodeState) {
after = (ModifiedNodeState) state;
}
RecordId beforeId = null;
if (after != null) {
beforeId = deduplicateNode(after.getBaseState());
}
SegmentNodeState before = null;
Template beforeTemplate = null;
if (beforeId != null) {
before = reader.readNode(beforeId);
beforeTemplate = before.getTemplate();
}
List<RecordId> ids = newArrayList();
Template template = new Template(reader, state);
if (template.equals(beforeTemplate)) {
ids.add(before.getTemplateId());
} else {
ids.add(writeTemplate(template));
}
String childName = template.getChildName();
if (childName == Template.MANY_CHILD_NODES) {
MapRecord base;
Map<String, RecordId> childNodes;
if (before != null
&& before.getChildNodeCount(2) > 1
&& after.getChildNodeCount(2) > 1) {
base = before.getChildNodeMap();
childNodes = new ChildNodeCollectorDiff(depth).diff(before, after);
} else {
base = null;
childNodes = newHashMap();
for (ChildNodeEntry entry : state.getChildNodeEntries()) {
childNodes.put(
entry.getName(),
writeNode(entry.getNodeState(), depth + 1));
}
}
ids.add(writeMap(base, childNodes));
} else if (childName != Template.ZERO_CHILD_NODES) {
ids.add(writeNode(state.getChildNode(template.getChildName()), depth + 1));
}
List<RecordId> pIds = newArrayList();
for (PropertyTemplate pt : template.getPropertyTemplates()) {
String name = pt.getName();
PropertyState property = state.getProperty(name);
assert property != null;
if (before != null) {
// If this property is already present in before (the base state)
// and it hasn't been modified, use that one. This allows an already
// compacted property to be reused, given that before has already
// been compacted.
PropertyState beforeProperty = before.getProperty(name);
if (property.equals(beforeProperty)) {
property = beforeProperty;
}
}
if (sameStore(property)) {
RecordId pid = ((Record) property).getRecordId();
if (isOldGeneration(pid)) {
pIds.add(writeProperty(property));
} else {
pIds.add(pid);
}
} else if (before == null || !sameStore(before)) {
pIds.add(writeProperty(property));
} else {
// reuse previously stored property, if possible
PropertyTemplate bt = beforeTemplate.getPropertyTemplate(name);
if (bt == null) {
pIds.add(writeProperty(property)); // new property
} else {
SegmentPropertyState bp = beforeTemplate.getProperty(before.getRecordId(), bt.getIndex());
if (property.equals(bp)) {
pIds.add(bp.getRecordId()); // no changes
} else if (bp.isArray() && bp.getType() != BINARIES) {
// reuse entries from the previous list
pIds.add(writeProperty(property, bp.getValueRecords()));
} else {
pIds.add(writeProperty(property));
}
}
}
}
if (!pIds.isEmpty()) {
ids.add(writeList(pIds));
}
RecordId stableId = null;
if (state instanceof SegmentNodeState) {
byte[] id = ((SegmentNodeState) state).getStableIdBytes();
stableId = writeBlock(id, 0, id.length);
}
return newNodeStateWriter(stableId, ids).write(writer);
}
/**
* Try to deduplicate the passed {@code node}. This succeeds if
* the passed node state has already been persisted to this store and
* either it has the same generation or it has already been compacted
* and is still in the de-duplication cache for nodes.
*
* @param node The node state to de-duplicate.
* @return the id of the de-duplicated node or {@code null} if none.
*/
private RecordId deduplicateNode(NodeState node) {
assert compactionStats != null;
if (!(node instanceof SegmentNodeState)) {
// De-duplication only for persisted node states
return null;
}
SegmentNodeState sns = (SegmentNodeState) node;
if (!sameStore(sns)) {
// De-duplication only within same store
return null;
}
if (!isOldGeneration(sns.getRecordId())) {
// This segment node state is already in this store, no need to
// write it again
compactionStats.deDupNodes++;
return sns.getRecordId();
}
// This is a segment node state from an old generation. Check
// whether an equivalent one of the current generation is in the
// cache
RecordId compacted = nodeCache.get(sns.getStableId());
if (compacted == null) {
compactionStats.cacheMiss++;
return null;
}
compactionStats.cacheHits++;
return compacted;
}
/**
* @param node
* @return {@code true} iff {@code node} originates from the same segment store.
*/
private boolean sameStore(SegmentNodeState node) {
return sameStore(node.getRecordId().getSegmentId());
}
/**
* @param property
* @return {@code true} iff {@code property} is a {@code SegmentPropertyState}
* and originates from the same segment store.
*/
private boolean sameStore(PropertyState property) {
return (property instanceof SegmentPropertyState)
&& sameStore(((Record) property).getRecordId().getSegmentId());
}
private boolean isOldGeneration(RecordId id) {
try {
int thatGen = id.getSegment().getGcGeneration();
int thisGen = writer.getGeneration();
return thatGen < thisGen;
} catch (SegmentNotFoundException snfe) {
// This SNFE means a deferred compacted node state is too far
// in the past. It has been gc'ed already and cannot be
// compacted.
// Consider increasing SegmentGCOptions.getRetainedGenerations()
throw new SegmentNotFoundException(
"Cannot copy record from a generation that has been gc'ed already", snfe);
}
}
private class ChildNodeCollectorDiff extends DefaultNodeStateDiff {
private final int depth;
private final Map<String, RecordId> childNodes = newHashMap();
private IOException exception;
private ChildNodeCollectorDiff(int depth) {
this.depth = depth;
}
public Map<String, RecordId> diff(SegmentNodeState before, ModifiedNodeState after) throws IOException {
after.compareAgainstBaseState(before, this);
if (exception != null) {
throw new IOException(exception);
} else {
return childNodes;
}
}
@Override
public boolean childNodeAdded(String name, NodeState after) {
try {
childNodes.put(name, writeNode(after, depth + 1));
} catch (IOException e) {
exception = e;
return false;
}
return true;
}
@Override
public boolean childNodeChanged(
String name, NodeState before, NodeState after) {
try {
childNodes.put(name, writeNode(after, depth + 1));
} catch (IOException e) {
exception = e;
return false;
}
return true;
}
@Override
public boolean childNodeDeleted(String name, NodeState before) {
childNodes.put(name, null);
return true;
}
}
}
}