blob: 44fe4182a4e1b0f36db28298baecd75936b22194 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.store;
import java.io.EOFException;
import java.io.FileDescriptor;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import org.apache.lucene.store.IOContext.Context;
import org.apache.lucene.util.SuppressForbidden;
// TODO
// - newer Linux kernel versions (after 2.6.29) have
// improved MADV_SEQUENTIAL (and hopefully also
// FADV_SEQUENTIAL) interaction with the buffer
// cache; we should explore using that instead of direct
// IO when context is merge
/**
* A {@link Directory} implementation for all Unixes that uses
* DIRECT I/O to bypass OS level IO caching during
* merging. For all other cases (searching, writing) we delegate
* to the provided Directory instance.
*
* <p>See <a
* href="{@docRoot}/overview-summary.html#NativeUnixDirectory">Overview</a>
* for more details.
*
* <p>To use this you must compile
* NativePosixUtil.cpp (exposes Linux-specific APIs through
* JNI) for your platform, by running <code>ant
* build-native-unix</code>, and then putting the resulting
* <code>libNativePosixUtil.so</code> (from
* <code>lucene/build/native</code>) onto your dynamic
* linker search path.
*
* <p><b>WARNING</b>: this code is very new and could quite easily
* contain horrible bugs. For example, here's one
* known issue: if you use seek in <code>IndexOutput</code>, and then
* write more than one buffer's worth of bytes, then the
* file will be wrong. Lucene does not do this today (it only writes
* a small number of bytes after a seek), but that may change.
*
* <p>This directory passes Solr and Lucene tests on Linux
* and OS X; other Unixes should work but have not been
* tested! Use at your own risk.
*
* @lucene.experimental
*/
public class NativeUnixDirectory extends FSDirectory {
// TODO: this is OS dependent, but likely 512 is the LCD
private final static long ALIGN = 512;
private final static long ALIGN_NOT_MASK = ~(ALIGN-1);
/** Default buffer size before writing to disk (256 KB);
* larger means less IO load but more RAM and direct
* buffer storage space consumed during merging. */
public final static int DEFAULT_MERGE_BUFFER_SIZE = 262144;
/** Default min expected merge size before direct IO is
* used (10 MB): */
public final static long DEFAULT_MIN_BYTES_DIRECT = 10*1024*1024;
private final int mergeBufferSize;
private final long minBytesDirect;
private final Directory delegate;
/** Create a new NIOFSDirectory for the named location.
*
* @param path the path of the directory
* @param lockFactory to use
* @param mergeBufferSize Size of buffer to use for
* merging. See {@link #DEFAULT_MERGE_BUFFER_SIZE}.
* @param minBytesDirect Merges, or files to be opened for
* reading, smaller than this will
* not use direct IO. See {@link
* #DEFAULT_MIN_BYTES_DIRECT}
* @param delegate fallback Directory for non-merges
* @throws IOException If there is a low-level I/O error
*/
public NativeUnixDirectory(Path path, int mergeBufferSize, long minBytesDirect, LockFactory lockFactory, Directory delegate) throws IOException {
super(path, lockFactory);
if ((mergeBufferSize & ALIGN) != 0) {
throw new IllegalArgumentException("mergeBufferSize must be 0 mod " + ALIGN + " (got: " + mergeBufferSize + ")");
}
this.mergeBufferSize = mergeBufferSize;
this.minBytesDirect = minBytesDirect;
this.delegate = delegate;
}
/** Create a new NIOFSDirectory for the named location.
*
* @param path the path of the directory
* @param lockFactory the lock factory to use
* @param delegate fallback Directory for non-merges
* @throws IOException If there is a low-level I/O error
*/
public NativeUnixDirectory(Path path, LockFactory lockFactory, Directory delegate) throws IOException {
this(path, DEFAULT_MERGE_BUFFER_SIZE, DEFAULT_MIN_BYTES_DIRECT, lockFactory, delegate);
}
/** Create a new NIOFSDirectory for the named location with {@link FSLockFactory#getDefault()}.
*
* @param path the path of the directory
* @param delegate fallback Directory for non-merges
* @throws IOException If there is a low-level I/O error
*/
public NativeUnixDirectory(Path path, Directory delegate) throws IOException {
this(path, DEFAULT_MERGE_BUFFER_SIZE, DEFAULT_MIN_BYTES_DIRECT, FSLockFactory.getDefault(), delegate);
}
@Override
public IndexInput openInput(String name, IOContext context) throws IOException {
ensureOpen();
if (context.context != Context.MERGE || context.mergeInfo.estimatedMergeBytes < minBytesDirect || fileLength(name) < minBytesDirect) {
return delegate.openInput(name, context);
} else {
return new NativeUnixIndexInput(getDirectory().resolve(name), mergeBufferSize);
}
}
@Override
public IndexOutput createOutput(String name, IOContext context) throws IOException {
ensureOpen();
if (context.context != Context.MERGE || context.mergeInfo.estimatedMergeBytes < minBytesDirect) {
return delegate.createOutput(name, context);
} else {
return new NativeUnixIndexOutput(getDirectory().resolve(name), name, mergeBufferSize);
}
}
@SuppressForbidden(reason = "java.io.File: native API requires old-style FileDescriptor")
private final static class NativeUnixIndexOutput extends IndexOutput {
private final ByteBuffer buffer;
private final FileOutputStream fos;
private final FileChannel channel;
private final int bufferSize;
//private final File path;
private int bufferPos;
private long filePos;
private long fileLength;
private boolean isOpen;
public NativeUnixIndexOutput(Path path, String name, int bufferSize) throws IOException {
super("NativeUnixIndexOutput(path=\"" + path.toString() + "\")", name);
//this.path = path;
final FileDescriptor fd = NativePosixUtil.open_direct(path.toString(), false);
fos = new FileOutputStream(fd);
//fos = new FileOutputStream(path);
channel = fos.getChannel();
buffer = ByteBuffer.allocateDirect(bufferSize);
this.bufferSize = bufferSize;
isOpen = true;
}
@Override
public void writeByte(byte b) throws IOException {
assert bufferPos == buffer.position(): "bufferPos=" + bufferPos + " vs buffer.position()=" + buffer.position();
buffer.put(b);
if (++bufferPos == bufferSize) {
dump();
}
}
@Override
public void writeBytes(byte[] src, int offset, int len) throws IOException {
int toWrite = len;
while(true) {
final int left = bufferSize - bufferPos;
if (left <= toWrite) {
buffer.put(src, offset, left);
toWrite -= left;
offset += left;
bufferPos = bufferSize;
dump();
} else {
buffer.put(src, offset, toWrite);
bufferPos += toWrite;
break;
}
}
}
//@Override
//public void setLength() throws IOException {
// TODO -- how to impl this? neither FOS nor
// FileChannel provides an API?
//}
private void dump() throws IOException {
buffer.flip();
final long limit = filePos + buffer.limit();
if (limit > fileLength) {
// this dump extends the file
fileLength = limit;
} else {
// we had seek'd back & wrote some changes
}
// must always round to next block
buffer.limit((int) ((buffer.limit() + ALIGN - 1) & ALIGN_NOT_MASK));
assert (buffer.limit() & ALIGN_NOT_MASK) == buffer.limit() : "limit=" + buffer.limit() + " vs " + (buffer.limit() & ALIGN_NOT_MASK);
assert (filePos & ALIGN_NOT_MASK) == filePos;
//System.out.println(Thread.currentThread().getName() + ": dump to " + filePos + " limit=" + buffer.limit() + " fos=" + fos);
channel.write(buffer, filePos);
filePos += bufferPos;
bufferPos = 0;
buffer.clear();
//System.out.println("dump: done");
// TODO: the case where we'd seek'd back, wrote an
// entire buffer, we must here read the next buffer;
// likely Lucene won't trip on this since we only
// write smallish amounts on seeking back
}
@Override
public long getFilePointer() {
return filePos + bufferPos;
}
@Override
public long getChecksum() throws IOException {
throw new UnsupportedOperationException("this directory currently does not work at all!");
}
@Override
public void close() throws IOException {
if (isOpen) {
isOpen = false;
try {
dump();
} finally {
try {
//System.out.println("direct close set len=" + fileLength + " vs " + channel.size() + " path=" + path);
channel.truncate(fileLength);
//System.out.println(" now: " + channel.size());
} finally {
try {
channel.close();
} finally {
fos.close();
//System.out.println(" final len=" + path.length());
}
}
}
}
}
}
@SuppressForbidden(reason = "java.io.File: native API requires old-style FileDescriptor")
private final static class NativeUnixIndexInput extends IndexInput {
private final ByteBuffer buffer;
private final FileInputStream fis;
private final FileChannel channel;
private final int bufferSize;
private boolean isOpen;
private boolean isClone;
private long filePos;
private int bufferPos;
public NativeUnixIndexInput(Path path, int bufferSize) throws IOException {
super("NativeUnixIndexInput(path=\"" + path + "\")");
final FileDescriptor fd = NativePosixUtil.open_direct(path.toString(), true);
fis = new FileInputStream(fd);
channel = fis.getChannel();
this.bufferSize = bufferSize;
buffer = ByteBuffer.allocateDirect(bufferSize);
isOpen = true;
isClone = false;
filePos = -bufferSize;
bufferPos = bufferSize;
//System.out.println("D open " + path + " this=" + this);
}
// for clone
public NativeUnixIndexInput(NativeUnixIndexInput other) throws IOException {
super(other.toString());
this.fis = null;
channel = other.channel;
this.bufferSize = other.bufferSize;
buffer = ByteBuffer.allocateDirect(bufferSize);
filePos = -bufferSize;
bufferPos = bufferSize;
isOpen = true;
isClone = true;
//System.out.println("D clone this=" + this);
seek(other.getFilePointer());
}
@Override
public void close() throws IOException {
if (isOpen && !isClone) {
try {
channel.close();
} finally {
if (!isClone) {
fis.close();
}
}
}
}
@Override
public long getFilePointer() {
return filePos + bufferPos;
}
@Override
public void seek(long pos) throws IOException {
if (pos != getFilePointer()) {
final long alignedPos = pos & ALIGN_NOT_MASK;
filePos = alignedPos-bufferSize;
final int delta = (int) (pos - alignedPos);
if (delta != 0) {
refill();
buffer.position(delta);
bufferPos = delta;
} else {
// force refill on next read
bufferPos = bufferSize;
}
}
}
@Override
public long length() {
try {
return channel.size();
} catch (IOException ioe) {
throw new RuntimeException("IOException during length(): " + this, ioe);
}
}
@Override
public byte readByte() throws IOException {
// NOTE: we don't guard against EOF here... ie the
// "final" buffer will typically be filled to less
// than bufferSize
if (bufferPos == bufferSize) {
refill();
}
assert bufferPos == buffer.position() : "bufferPos=" + bufferPos + " vs buffer.position()=" + buffer.position();
bufferPos++;
return buffer.get();
}
private void refill() throws IOException {
buffer.clear();
filePos += bufferSize;
bufferPos = 0;
assert (filePos & ALIGN_NOT_MASK) == filePos : "filePos=" + filePos + " anded=" + (filePos & ALIGN_NOT_MASK);
//System.out.println("X refill filePos=" + filePos);
int n;
try {
n = channel.read(buffer, filePos);
} catch (IOException ioe) {
throw new IOException(ioe.getMessage() + ": " + this, ioe);
}
if (n < 0) {
throw new EOFException("read past EOF: " + this);
}
buffer.rewind();
}
@Override
public void readBytes(byte[] dst, int offset, int len) throws IOException {
int toRead = len;
//System.out.println("\nX readBytes len=" + len + " fp=" + getFilePointer() + " size=" + length() + " this=" + this);
while(true) {
final int left = bufferSize - bufferPos;
if (left < toRead) {
//System.out.println(" copy " + left);
buffer.get(dst, offset, left);
toRead -= left;
offset += left;
refill();
} else {
//System.out.println(" copy " + toRead);
buffer.get(dst, offset, toRead);
bufferPos += toRead;
//System.out.println(" readBytes done");
break;
}
}
}
@Override
public NativeUnixIndexInput clone() {
try {
return new NativeUnixIndexInput(this);
} catch (IOException ioe) {
throw new RuntimeException("IOException during clone: " + this, ioe);
}
}
@Override
public IndexInput slice(String sliceDescription, long offset, long length) throws IOException {
// TODO: is this the right thing to do?
return BufferedIndexInput.wrap(sliceDescription, this, offset, length);
}
}
}