blob: 09da6f1ff5537c681ff1d8203aaf03c7f68ac0a2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.jackrabbit.oak.segment.file;
import static com.google.common.base.Preconditions.checkState;
import static java.lang.Long.MAX_VALUE;
import static java.util.concurrent.TimeUnit.DAYS;
import static org.apache.jackrabbit.oak.segment.file.FileStoreUtil.findPersistedRecordId;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.google.common.base.Function;
import com.google.common.base.Supplier;
import org.apache.jackrabbit.oak.segment.RecordId;
import org.apache.jackrabbit.oak.segment.Revisions;
import org.apache.jackrabbit.oak.segment.SegmentIdProvider;
import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFile;
import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFileWriter;
import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence;
import org.apache.jackrabbit.oak.segment.SegmentStore;
import org.apache.jackrabbit.oak.segment.file.tar.TarPersistence;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This implementation of {@code Revisions} is backed by a
* {@link TarPersistence#JOURNAL_FILE_NAME journal} file where the current head is persisted
* by calling {@link #tryFlush(Flusher)}.
* <p>
* The {@link #setHead(Function, Option...)} method supports a timeout
* {@link Option}, which can be retrieved through factory methods of this class.
* <p>
* Instance of this class must be {@link #bind(SegmentStore, SegmentIdProvider, Supplier)} bound} to
* a {@code SegmentStore} otherwise its method throw {@code IllegalStateException}s.
*/
public class TarRevisions implements Revisions, Closeable {
private static final Logger LOG = LoggerFactory.getLogger(TarRevisions.class);
/**
* The lock protecting {@link #journalFile}.
*/
private final Lock journalFileLock = new ReentrantLock();
@NotNull
private final AtomicReference<RecordId> head;
private final SegmentNodeStorePersistence persistence;
private final JournalFile journalFile;
/**
* The journal file writer. It is protected by {@link #journalFileLock}. It becomes
* {@code null} after it's closed.
*/
private volatile JournalFileWriter journalFileWriter;
/**
* The persisted head of the root journal, used to determine whether the
* latest {@link #head} value should be written to the disk.
*/
@NotNull
private final AtomicReference<RecordId> persistedHead;
@NotNull
private final ReadWriteLock rwLock = new ReentrantReadWriteLock(true);
private static class TimeOutOption implements Option {
private final long time;
@NotNull
private final TimeUnit unit;
TimeOutOption(long time, @NotNull TimeUnit unit) {
this.time = time;
this.unit = unit;
}
@NotNull
public static TimeOutOption from(@Nullable Option option) {
if (option instanceof TimeOutOption) {
return (TimeOutOption) option;
} else {
throw new IllegalArgumentException("Invalid option " + option);
}
}
}
/**
* Option to cause set head calls to be expedited. That is, cause them to skip the queue
* of any other callers waiting to complete that don't have this option specified.
*/
public static final Option EXPEDITE_OPTION = new Option() {
@Override
public String toString() {
return "Expedite Option";
}
};
/**
* Timeout option approximating no time out ({@code Long.MAX_VALUE} days).
*/
public static final Option INFINITY = new TimeOutOption(MAX_VALUE, DAYS);
/**
* Factory method for creating a timeout option.
*/
public static Option timeout(long time, TimeUnit unit) {
return new TimeOutOption(time, unit);
}
/**
* Create a new instance placing the journal log file into the passed
* {@code directory}.
* @param persistence object representing the segment persistence
* @throws IOException
*/
public TarRevisions(SegmentNodeStorePersistence persistence) throws IOException {
this.journalFile = persistence.getJournalFile();
this.journalFileWriter = journalFile.openJournalWriter();
this.head = new AtomicReference<>(null);
this.persistedHead = new AtomicReference<>(null);
this.persistence = persistence;
}
/**
* Bind this instance to a store.
* @param store store to bind to
* @param idProvider {@code SegmentIdProvider} of the {@code store}
* @param writeInitialNode provider for the initial node in case the journal is empty.
* @throws IOException
*/
synchronized void bind(@NotNull SegmentStore store,
@NotNull SegmentIdProvider idProvider,
@NotNull Supplier<RecordId> writeInitialNode)
throws IOException {
if (head.get() != null) {
return;
}
RecordId persistedId = findPersistedRecordId(store, idProvider, journalFile);
if (persistedId == null) {
head.set(writeInitialNode.get());
} else {
persistedHead.set(persistedId);
head.set(persistedId);
}
}
private void checkBound() {
checkState(head.get() != null, "Revisions not bound to a store");
}
/**
* Flush the id of the current head to the journal after a call to {@code
* persisted}. Differently from {@link #tryFlush(Flusher)}, this method
* does not return early if a concurrent call is in progress. Instead, it
* blocks the caller until the requested flush operation is performed.
*
* @param flusher call back for upstream dependencies to ensure the current
* head state is actually persisted before its id is written
* to the head state.
*/
void flush(Flusher flusher) throws IOException {
if (head.get() == null) {
LOG.debug("No head available, skipping flush");
return;
}
journalFileLock.lock();
try {
doFlush(flusher);
} finally {
journalFileLock.unlock();
}
}
/**
* Flush the id of the current head to the journal after a call to {@code
* persisted}. This method does nothing and returns immediately if called
* concurrently and a call is already in progress.
*
* @param flusher call back for upstream dependencies to ensure the current
* head state is actually persisted before its id is written
* to the head state.
*/
void tryFlush(Flusher flusher) throws IOException {
if (head.get() == null) {
LOG.debug("No head available, skipping flush");
return;
}
if (journalFileLock.tryLock()) {
try {
doFlush(flusher);
} finally {
journalFileLock.unlock();
}
} else {
LOG.debug("Unable to lock the journal, skipping flush");
}
}
private void doFlush(Flusher flusher) throws IOException {
if (journalFileWriter == null) {
LOG.debug("No journal file available, skipping flush");
return;
}
RecordId before = persistedHead.get();
RecordId after = getHead();
if (after.equals(before)) {
LOG.debug("Head state did not change, skipping flush");
return;
}
flusher.flush();
LOG.debug("TarMK journal update {} -> {}", before, after);
journalFileWriter.writeLine(after.toString10() + " root " + System.currentTimeMillis());
persistedHead.set(after);
}
@NotNull
@Override
public RecordId getHead() {
checkBound();
return head.get();
}
@NotNull
@Override
public RecordId getPersistedHead() {
checkBound();
return persistedHead.get();
}
/**
* This implementation blocks if a concurrent call to
* {@link #setHead(Function, Option...)} is already in
* progress.
*
* @param options zero or one expedite option for expediting this call
* @throws IllegalArgumentException on any non recognised {@code option}.
* @see #EXPEDITE_OPTION
*/
@Override
public boolean setHead(
@NotNull RecordId expected,
@NotNull RecordId head,
@NotNull Option... options) {
checkBound();
// If the expedite option was specified we acquire the write lock instead of the read lock.
// This will cause this thread to get the lock before all threads currently waiting to
// enter the read lock. See also the class comment of ReadWriteLock.
Lock lock = isExpedited(options)
? rwLock.writeLock()
: rwLock.readLock();
lock.lock();
try {
RecordId id = this.head.get();
return id.equals(expected) && this.head.compareAndSet(id, head);
} finally {
lock.unlock();
}
}
/**
* This implementation blocks if a concurrent call is already in progress.
* @param newHead function mapping an record id to the record id to which
* the current head id should be set. If it returns
* {@code null} the head remains unchanged and {@code setHead}
* returns {@code false}.
* @param options zero or one timeout options specifying how long to block
* @throws InterruptedException
* @throws IllegalArgumentException on any non recognised {@code option}.
* @see #timeout(long, TimeUnit)
* @see #INFINITY
*/
@Override
public RecordId setHead(
@NotNull Function<RecordId, RecordId> newHead,
@NotNull Option... options)
throws InterruptedException {
checkBound();
TimeOutOption timeout = getTimeout(options);
if (rwLock.writeLock().tryLock(timeout.time, timeout.unit)) {
try {
RecordId after = newHead.apply(getHead());
if (after != null) {
head.set(after);
return after;
} else {
return null;
}
} finally {
rwLock.writeLock().unlock();
}
} else {
return null;
}
}
private static boolean isExpedited(Option[] options) {
if (options.length == 0) {
return false;
} else if (options.length == 1) {
return options[0] == EXPEDITE_OPTION;
} else {
throw new IllegalArgumentException("Expected zero or one options, got " + options.length);
}
}
@NotNull
private static TimeOutOption getTimeout(@NotNull Option[] options) {
if (options.length == 0) {
return TimeOutOption.from(INFINITY);
} else if (options.length == 1) {
return TimeOutOption.from(options[0]);
} else {
throw new IllegalArgumentException("Expected zero or one options, got " + options.length);
}
}
/**
* Close the underlying journal file.
* @throws IOException
*/
@Override
public void close() throws IOException {
journalFileLock.lock();
try {
if (journalFileWriter == null) {
return;
}
journalFileWriter.close();
journalFileWriter = null;
} finally {
journalFileLock.unlock();
}
}
}