blob: 5b31938e95d7f21b59b5048e9e1011ac052b4b56 [file] [log] [blame]
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package org.apache.couchdb.nouveau.core;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.couchdb.nouveau.api.DocumentDeleteRequest;
import org.apache.couchdb.nouveau.api.DocumentUpdateRequest;
import org.apache.couchdb.nouveau.api.IndexInfo;
import org.apache.couchdb.nouveau.api.SearchRequest;
import org.apache.couchdb.nouveau.api.SearchResults;
/**
* An index that reflects a single `.couch` file shard of some
* database.
*
* The class only permits sequential modification (updates and deletes)
* but allows concurrent searching.
*
* This class also expects a monotonically incrementing update sequence
* associated with each modification.
*/
public abstract class Index implements Closeable {
private long updateSeq;
private boolean deleteOnClose = false;
private long lastCommit = now();
private volatile boolean closed;
private final Semaphore permits = new Semaphore(Integer.MAX_VALUE);
protected Index(final long updateSeq) {
this.updateSeq = updateSeq;
}
public final boolean tryAcquire() {
if (permits.tryAcquire() == false) {
return false;
}
if (closed) {
permits.release();
return false;
}
return true;
}
public final boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException {
if (permits.tryAcquire(timeout, unit) == false) {
return false;
}
if (closed) {
permits.release();
return false;
}
return true;
}
public final void release() {
permits.release();
}
public final IndexInfo info() throws IOException {
final int numDocs = doNumDocs();
final long diskSize = doDiskSize();
return new IndexInfo(updateSeq, numDocs, diskSize);
}
protected abstract int doNumDocs() throws IOException;
protected abstract long doDiskSize() throws IOException;
public final synchronized void update(final String docId, final DocumentUpdateRequest request) throws IOException {
assertUpdateSeqIsLower(request.getSeq());
doUpdate(docId, request);
incrementUpdateSeq(request.getSeq());
}
protected abstract void doUpdate(final String docId, final DocumentUpdateRequest request) throws IOException;
public final synchronized void delete(final String docId, final DocumentDeleteRequest request) throws IOException {
assertUpdateSeqIsLower(request.getSeq());
doDelete(docId, request);
incrementUpdateSeq(request.getSeq());
}
protected abstract void doDelete(final String docId, final DocumentDeleteRequest request) throws IOException;
public final SearchResults search(final SearchRequest request) throws IOException {
return doSearch(request);
}
protected abstract SearchResults doSearch(final SearchRequest request) throws IOException;
public final boolean commit() throws IOException {
final long updateSeq;
synchronized (this) {
updateSeq = this.updateSeq;
}
final boolean result = doCommit(updateSeq);
if (result) {
final long now = now();
synchronized (this) {
this.lastCommit = now;
}
}
return result;
}
protected abstract boolean doCommit(final long updateSeq) throws IOException;
@Override
public final void close() throws IOException {
synchronized (this) {
closed = true;
}
// Ensures exclusive access to the index before closing.
permits.acquireUninterruptibly(Integer.MAX_VALUE);
try {
doClose();
} finally {
permits.release(Integer.MAX_VALUE);
}
}
protected abstract void doClose() throws IOException;
public boolean isDeleteOnClose() {
return deleteOnClose;
}
public void setDeleteOnClose(final boolean deleteOnClose) {
synchronized (this) {
this.deleteOnClose = deleteOnClose;
}
}
protected final void assertUpdateSeqIsLower(final long updateSeq) throws UpdatesOutOfOrderException {
assert Thread.holdsLock(this);
if (!(updateSeq > this.updateSeq)) {
throw new UpdatesOutOfOrderException(this.updateSeq, updateSeq);
}
}
protected final void incrementUpdateSeq(final long updateSeq) throws IOException {
assert Thread.holdsLock(this);
assertUpdateSeqIsLower(updateSeq);
this.updateSeq = updateSeq;
}
public boolean needsCommit(final long duration, final TimeUnit unit) {
final long commitNeededSince = now() - unit.toNanos(duration);
synchronized (this) {
return this.lastCommit < commitNeededSince;
}
}
private long now() {
return System.nanoTime();
}
}