blob: 3ba1bcb569435579c49e57a40e3745202071c0b4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.jackrabbit.oak.segment.file.tar;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkPositionIndexes;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.Maps.newHashMap;
import static com.google.common.collect.Sets.newHashSet;
import static java.lang.String.format;
import static org.apache.jackrabbit.oak.segment.file.tar.TarConstants.FILE_NAME_FORMAT;
import static org.apache.jackrabbit.oak.segment.file.tar.TarConstants.GRAPH_MAGIC;
import static org.apache.jackrabbit.oak.segment.file.tar.binaries.BinaryReferencesIndexWriter.newBinaryReferencesIndexWriter;
import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
import java.util.zip.CRC32;
import org.apache.jackrabbit.oak.commons.Buffer;
import org.apache.jackrabbit.oak.segment.file.tar.binaries.BinaryReferencesIndexWriter;
import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveManager;
import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveWriter;
import org.apache.jackrabbit.oak.stats.CounterStats;
import org.apache.jackrabbit.oak.stats.NoopStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Writes segments, binary references and the segment graph into a single tar
 * archive. Entries that have already been written can be read back through
 * this writer for as long as it has not been closed.
 */
class TarWriter implements Closeable {

    /** Logger instance */
    private static final Logger log = LoggerFactory.getLogger(TarWriter.class);

    /** Number of bytes used to serialize an int in the graph entry. */
    private static final int INT_BYTES = Integer.BYTES;

    /** Number of bytes used to serialize a UUID (two longs) in the graph entry. */
    private static final int UUID_BYTES = 2 * Long.BYTES;

    /** Size of the graph footer: checksum, entry count, entry size and magic number. */
    private static final int GRAPH_FOOTER_BYTES = 4 * INT_BYTES;

    /**
     * Index of the archive this writer writes to, or {@code -1} when the
     * writer was created for an explicitly named archive.
     */
    private final int writeIndex;

    /**
     * Set once {@link #close()} has been initiated. Using a closed writer is
     * illegal. Must only be read or written while holding the monitor of
     * this instance.
     */
    private boolean closed = false;

    /**
     * Collects the binary references contained in this TAR file.
     */
    private final BinaryReferencesIndexWriter binaryReferences = newBinaryReferencesIndexWriter();

    /**
     * Graph of references between segments, keyed by the referencing segment.
     */
    private final Map<UUID, Set<UUID>> graph = newHashMap();

    private final SegmentArchiveManager archiveManager;

    private final SegmentArchiveWriter archive;

    /**
     * Additional synchronization point shared by {@link #flush()} and
     * {@link #close()}. It lets {@link #flush()} run concurrently with
     * normal reads and writes, but never concurrently with {@link #close()}.
     */
    private final Object closeMonitor = new Object();

    /**
     * Counter exposing the number of segments; incremented on every
     * successful {@link #writeEntry}.
     */
    private final CounterStats segmentCount;

    /**
     * Creates a writer for an explicitly named archive. Used for maintenance
     * operations (GC or recovery) via the TarReader and by tests. A writer
     * created this way has no write index, so
     * {@link #createNextGeneration()} is not available on it.
     *
     * @param archiveManager the manager used to create the archive
     * @param archiveName    the name of the archive to create
     * @throws IOException if the archive could not be created
     */
    TarWriter(SegmentArchiveManager archiveManager, String archiveName) throws IOException {
        this(archiveManager, archiveName, -1, NoopStats.INSTANCE);
    }

    /**
     * Creates a writer for the archive whose name is derived from the given
     * write index.
     *
     * @param archiveManager    the manager used to create the archive
     * @param writeIndex        the index used to build the archive name
     * @param segmentCountStats the counter incremented for every written segment
     * @throws IOException if the archive could not be created
     */
    TarWriter(SegmentArchiveManager archiveManager, int writeIndex, CounterStats segmentCountStats)
            throws IOException {
        this(archiveManager, format(FILE_NAME_FORMAT, writeIndex, "a"), writeIndex, segmentCountStats);
    }

    /**
     * Shared initialization for the package-visible constructors.
     */
    private TarWriter(SegmentArchiveManager archiveManager, String archiveName, int writeIndex,
            CounterStats segmentCount) throws IOException {
        this.archiveManager = archiveManager;
        this.archive = archiveManager.create(archiveName);
        this.writeIndex = writeIndex;
        this.segmentCount = segmentCount;
    }

    /**
     * Tells whether the given segment has been written to this file.
     *
     * @param msb the most significant bits of the segment id
     * @param lsb the least significant bits of the segment id
     * @return whatever the underlying archive reports for this segment id
     * @throws IllegalStateException if this writer is closed
     */
    synchronized boolean containsEntry(long msb, long lsb) {
        ensureOpen();
        return archive.containsSegment(msb, lsb);
    }

    /**
     * @return the number of entries currently in this writer
     */
    int getEntryCount() {
        return archive.getEntryCount();
    }

    /**
     * Reads a segment back from this file. Only the closed check is done
     * under the monitor of this instance; the read itself is not.
     *
     * @param msb the most significant bits of the segment id
     * @param lsb the least significant bits of the segment id
     * @return the byte buffer, or null if not in this file
     * @throws IllegalStateException if this writer is closed
     */
    Buffer readEntry(long msb, long lsb) throws IOException {
        synchronized (this) {
            ensureOpen();
        }
        return archive.readSegment(msb, lsb);
    }

    /**
     * Appends a segment to this file and bumps the segment counter.
     *
     * @param msb        the most significant bits of the segment id
     * @param lsb        the least significant bits of the segment id
     * @param data       the array holding the segment data
     * @param offset     the position of the first segment byte in {@code data}
     * @param size       the number of segment bytes
     * @param generation the GC generation of the segment
     * @return the length of the archive after the segment has been written
     * @throws IllegalStateException if this writer is closed, or if the
     *                               archive grew beyond {@link Integer#MAX_VALUE}
     */
    long writeEntry(long msb, long lsb, byte[] data, int offset, int size, GCGeneration generation) throws IOException {
        // Argument validation does not need the lock.
        checkNotNull(data);
        checkPositionIndexes(offset, offset + size, data.length);

        synchronized (this) {
            ensureOpen();
            archive.writeSegment(
                    msb,
                    lsb,
                    data,
                    offset,
                    size,
                    generation.getGeneration(),
                    generation.getFullGeneration(),
                    generation.isCompacted());
            segmentCount.inc();

            long lengthAfterWrite = archive.getLength();
            checkState(lengthAfterWrite <= Integer.MAX_VALUE);
            return lengthAfterWrite;
        }
    }

    /**
     * Records that the given segment contains a reference to a binary.
     *
     * @param generation the GC generation of the segment
     * @param segmentId  the id of the referencing segment
     * @param reference  the binary reference
     */
    void addBinaryReference(GCGeneration generation, UUID segmentId, String reference) {
        int gcGeneration = generation.getGeneration();
        int fullGeneration = generation.getFullGeneration();
        boolean compacted = generation.isCompacted();
        binaryReferences.addEntry(gcGeneration, fullGeneration, compacted, segmentId, reference);
    }

    /**
     * Records a reference from one segment to another in the segment graph.
     *
     * @param from the referencing segment
     * @param to   the referenced segment
     */
    void addGraphEdge(UUID from, UUID to) {
        Set<UUID> successors = graph.computeIfAbsent(from, key -> newHashSet());
        successors.add(to);
    }

    /**
     * Flushes the entries written so far. When the archive reports itself as
     * remote, the binary references are written as well.
     * This method deliberately does <em>not</em> hold the monitor of this
     * instance while flushing, so that concurrent reads and writes can
     * proceed. It <em>is</em> serialized with {@link #close()} through the
     * close monitor, which prevents flushing an already closed file.
     *
     * @throws IOException if the tar file could not be flushed
     */
    void flush() throws IOException {
        synchronized (closeMonitor) {
            if (!isFlushable()) {
                return;
            }
            archive.flush();
            if (archive.isRemote()) {
                writeBinaryReferences();
            }
        }
    }

    /**
     * Closes this tar file. Calling this method on an already closed writer
     * has no effect.
     *
     * @throws IOException if the tar file could not be closed
     */
    @Override
    public void close() throws IOException {
        // Only the state transition needs the monitor of this instance: the
        // checkState calls in the other methods reject any use of the writer
        // once the flag is set.
        if (!markClosed()) {
            return;
        }

        // Nothing to finalize if nothing was ever written to this file.
        if (!archive.isCreated()) {
            return;
        }

        // Finish the file by writing the binary references and the graph and
        // then closing the archive. Holding the closeMonitor guarantees that
        // no concurrent flush() is still in progress at this point.
        synchronized (closeMonitor) {
            writeBinaryReferences();
            writeGraph();
            archive.close();
        }
    }

    /**
     * Rolls over to the next archive. If something was written to the
     * current archive, this writer is closed and a new writer for
     * {@link #writeIndex} + 1 is returned. Otherwise the current instance
     * is returned unchanged.
     *
     * @return the writer to use from now on
     * @throws IllegalStateException if this writer has no write index
     * @throws IOException           if closing this writer or creating the
     *                               next one fails
     */
    TarWriter createNextGeneration() throws IOException {
        checkState(writeIndex >= 0);

        boolean nothingWritten;
        synchronized (this) {
            nothingWritten = !archive.isCreated();
        }
        if (nothingWritten) {
            // An untouched archive can simply be kept in use.
            return this;
        }

        close();
        return new TarWriter(archiveManager, writeIndex + 1, segmentCount);
    }

    /**
     * Serializes the collected binary references and hands them to the archive.
     */
    private void writeBinaryReferences() throws IOException {
        archive.writeBinaryReferences(binaryReferences.write());
    }

    /**
     * Serializes the segment graph and hands it to the archive.
     *
     * The buffer starts with one record per graph key: the 16 byte key, a
     * 4 byte count of referenced segments, and 16 bytes for every referenced
     * segment. The records are followed by a 16 byte footer consisting of,
     * in this order, the CRC32 checksum of the records, the number of graph
     * keys, the total size of the buffer, and the magic number identifying
     * the entry as a graph.
     */
    private void writeGraph() throws IOException {
        // Total size: the fixed footer plus one record per graph key.
        int graphSize = GRAPH_FOOTER_BYTES;
        for (Set<UUID> successors : graph.values()) {
            graphSize += UUID_BYTES + INT_BYTES + UUID_BYTES * successors.size();
        }

        Buffer buffer = Buffer.allocate(graphSize);
        for (Entry<UUID, Set<UUID>> edges : graph.entrySet()) {
            putUuid(buffer, edges.getKey());
            Set<UUID> successors = edges.getValue();
            buffer.putInt(successors.size());
            for (UUID successor : successors) {
                putUuid(buffer, successor);
            }
        }

        // The checksum covers everything written so far, i.e. the records only.
        CRC32 crc = new CRC32();
        crc.update(buffer.array(), 0, buffer.position());

        buffer.putInt((int) crc.getValue());
        buffer.putInt(graph.size());
        buffer.putInt(graphSize);
        buffer.putInt(GRAPH_MAGIC);

        archive.writeGraph(buffer.array());
    }

    /**
     * Appends the most and then the least significant bits of the given id.
     */
    private static void putUuid(Buffer target, UUID id) {
        target.putLong(id.getMostSignificantBits());
        target.putLong(id.getLeastSignificantBits());
    }

    /**
     * Fails if this writer is closed. The caller must hold the monitor of
     * this instance.
     */
    private void ensureOpen() {
        checkState(!closed);
    }

    /**
     * @return {@code true} if the archive has been created and this writer
     *         has not been closed yet
     */
    private synchronized boolean isFlushable() {
        return archive.isCreated() && !closed;
    }

    /**
     * Marks this writer as closed.
     *
     * @return {@code true} if this call performed the transition,
     *         {@code false} if the writer was already closed
     */
    private synchronized boolean markClosed() {
        if (closed) {
            return false;
        }
        closed = true;
        return true;
    }

    /**
     * @return the current length of the underlying archive
     */
    synchronized long fileLength() {
        return archive.getLength();
    }

    /**
     * @return the name of the underlying archive
     */
    synchronized String getFileName() {
        return archive.getName();
    }

    /**
     * @return {@code true} once {@link #close()} has been initiated
     */
    synchronized boolean isClosed() {
        return closed;
    }

    //------------------------------------------------------------< Object >--

    /**
     * @return the name of the underlying archive
     */
    @Override
    public String toString() {
        return getFileName();
    }
}