blob: 3b3266408094a56307db22fa2fa38a6bfb40f588 [file] [log] [blame]
Index: lucene/core/src/java/org/apache/lucene/store/AsyncFSDirectory.java
===================================================================
--- lucene/core/src/java/org/apache/lucene/store/AsyncFSDirectory.java (revision 0)
+++ lucene/core/src/java/org/apache/lucene/store/AsyncFSDirectory.java (working copy)
@@ -0,0 +1,278 @@
+package org.apache.lucene.store;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousFileChannel;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.nio.file.attribute.FileAttribute;
+import java.util.EnumSet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+/** An implementation of {@link FSDirectory}
+ * using java.nio.channels.AsynchronousFileChannel. This
+ * class is useful on Windows, where it will take advantage of
+ * overlapped IO, but on other platforms this is likely to
+ * be comparable in performance if not worse than {@link SimpleFSDirectory}.
+ *
+ * This class does not synchronize on the file position
+ * for reads, and does not cause problems when its thread
+ * is interrupted. */
+
+public class AsyncFSDirectory extends FSDirectory {
+
+ private static final FileAttribute<?>[] NO_ATTRIBUTES = new FileAttribute<?>[0];
+
+ private final ExecutorService executor;
+
+ /** Create a new AsyncFSDirectory for the named location.
+ *
+ * @param path the path of the directory
+ * @param lockFactory the lock factory to use, or null for the default
+ * ({@link NativeFSLockFactory});
+ * @param executor the executor service for IO notifications
+ * @throws IOException if there is a low-level I/O error
+ */
+ public AsyncFSDirectory(File path, LockFactory lockFactory, ExecutorService executor) throws IOException {
+ super(path, lockFactory);
+ this.executor = executor;
+ }
+
+ /** Create a new AsyncFSDirectory for the named location.
+ *
+ * @param path the path of the directory
+ * @param lockFactory the lock factory to use, or null for the default
+ * ({@link NativeFSLockFactory});
+ * @throws IOException if there is a low-level I/O error
+ */
+ public AsyncFSDirectory(File path, LockFactory lockFactory) throws IOException {
+ this(path, lockFactory, null);
+ }
+
+ /** Create a new AsyncFSDirectory for the named location and {@link NativeFSLockFactory}.
+ *
+ * @param path the path of the directory
+ * @throws IOException if there is a low-level I/O error
+ */
+ public AsyncFSDirectory(File path) throws IOException {
+ this(path, null);
+ }
+
+ @Override
+ public IndexOutput createOutput(String name, IOContext context)
+ throws IOException {
+ ensureOpen();
+
+ ensureCanWrite(name);
+
+ File file = new File(directory, name);
+ return new AsyncFSIndexOutput(this, name, AsynchronousFileChannel.open(file.toPath(),
+ StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.READ, StandardOpenOption.WRITE));
+ }
+
+ /** Creates an IndexInput for the file with the given name. */
+ @Override
+ public IndexInput openInput(String name, IOContext context) throws IOException {
+ ensureOpen();
+ Path path = new File(directory, name).toPath();
+ AsynchronousFileChannel fc = AsynchronousFileChannel.open(path, EnumSet.of(StandardOpenOption.READ), executor, NO_ATTRIBUTES);
+ return new AsyncFSIndexInput("AsyncFSIndexInput(path=\"" + path + "\")", fc, context, getReadChunkSize());
+ }
+
+ @Override
+ public IndexInputSlicer createSlicer(final String name,
+ final IOContext context) throws IOException {
+ ensureOpen();
+ final Path path = new File(directory, name).toPath();
+ final AsynchronousFileChannel channel = AsynchronousFileChannel.open(
+ path,
+ EnumSet.of(StandardOpenOption.READ),
+ executor,
+ NO_ATTRIBUTES);
+ return new IndexInputSlicer() {
+
+ @Override
+ public void close() throws IOException {
+ channel.close();
+ }
+
+ @Override
+ public IndexInput openSlice(String sliceDescription, long offset, long length) {
+ return new AsyncFSIndexInput("AsyncFSIndexInput(" + sliceDescription + " in path=\"" + path + "\" slice=" + offset + ":" + (offset+length) + ")",
+ channel, offset, length, BufferedIndexInput.bufferSize(context), getReadChunkSize());
+ }
+ };
+ }
+
+ /**
+ * Writes output with {@link AsynchronousFileChannel#write(ByteBuffer, long)}
+ */
+ protected static class AsyncFSIndexOutput extends FSIndexOutput<AsynchronousFileChannel> {
+ private long fpos = 0;
+
+ public AsyncFSIndexOutput(FSDirectory parent, String name,
+ AsynchronousFileChannel file) throws IOException {
+ super(parent, name, file);
+ }
+
+ /** output methods: */
+ @Override
+ public void flushBuffer(byte[] b, int offset, int size) throws IOException {
+ ensureOpen();
+ if (size == 0) {
+ return; //Avoid creating a ByteBuffer if we don't need to.
+ }
+
+ ByteBuffer bb = ByteBuffer.wrap(b, offset, size);
+ boolean interrupted = false;
+ try {
+ while (bb.remaining() > 0) {
+ try {
+ int written = file.write(bb, fpos).get();
+ fpos += written;
+ } catch (ExecutionException ee) {
+ throw new IOException(ee.getCause());
+ } catch (InterruptedException ie) {
+ interrupted = true;
+ }
+ }
+ } finally {
+ if (interrupted)
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ @Override
+ public long length() throws IOException {
+ return file.size();
+ }
+
+ @Override
+ public void setLength(long length) throws IOException {
+ file.truncate(length);
+ }
+ }
+
+ protected class AsyncFSIndexInput extends FSIndexInput<AsynchronousFileChannel> {
+
+ private ByteBuffer byteBuf; // wraps the buffer for NIO
+
+ public AsyncFSIndexInput(String resourceDesc, AsynchronousFileChannel fc, IOContext context, int chunkSize) throws IOException {
+ super(resourceDesc, fc, fc.size(), context, chunkSize);
+ }
+
+ public AsyncFSIndexInput(String resourceDesc, AsynchronousFileChannel fc, long off, long length, int bufferSize, int chunkSize) {
+ super(resourceDesc, fc, off, length, bufferSize, chunkSize);
+ isClone = true;
+ }
+
+ @Override
+ protected void newBuffer(byte[] newBuffer) {
+ super.newBuffer(newBuffer);
+ byteBuf = ByteBuffer.wrap(newBuffer);
+ }
+
+ @Override
+ protected void readInternal(byte[] b, int offset, int len) throws IOException {
+
+ final ByteBuffer bb;
+
+ // Determine the ByteBuffer we should use
+ if (b == buffer && 0 == offset) {
+ // Use our own pre-wrapped byteBuf:
+ assert byteBuf != null;
+ byteBuf.clear();
+ byteBuf.limit(len);
+ bb = byteBuf;
+ } else {
+ bb = ByteBuffer.wrap(b, offset, len);
+ }
+
+ int readOffset = bb.position();
+ int readLength = bb.limit() - readOffset;
+ assert readLength == len;
+
+ long pos = getFilePointer() + off;
+
+ if (pos + len > end) {
+ throw new EOFException("read past EOF: " + this);
+ }
+
+ try {
+ boolean interrupted = false;
+ try {
+ while (readLength > 0) {
+ final int limit;
+ if (readLength > chunkSize) {
+ // LUCENE-1566 - work around JVM Bug by breaking
+ // very large reads into chunks
+ limit = readOffset + chunkSize;
+ } else {
+ limit = readOffset + readLength;
+ }
+ bb.limit(limit);
+ Future<Integer> future = file.read(bb, pos);
+
+ //We have to read in a loop here since future.get() could
+ //throw InterruptedException.
+ while (true) {
+ try {
+ int i = future.get();
+ if (i < 0) {
+ throw new EOFException("Attempt to read past end of file");
+ }
+ pos += i;
+ readOffset += i;
+ readLength -= i;
+ break;
+ } catch (ExecutionException ee) {
+ throw new IOException(ee.getCause());
+ } catch (InterruptedException ie) {
+ interrupted = true;
+ }
+ }
+ }
+ } finally {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ } catch (OutOfMemoryError e) {
+ // propagate OOM up and add a hint for 32bit VM Users hitting the bug
+ // with a large chunk size in the fast path.
+ final OutOfMemoryError outOfMemoryError = new OutOfMemoryError(
+ "OutOfMemoryError likely caused by the Sun VM Bug described in "
+ + "https://issues.apache.org/jira/browse/LUCENE-1566; try calling FSDirectory.setReadChunkSize "
+ + "with a value smaller than the current chunk size (" + chunkSize + ")");
+ outOfMemoryError.initCause(e);
+ throw outOfMemoryError;
+ } catch (IOException ioe) {
+ throw new IOException(ioe.getMessage() + ": " + this, ioe);
+ }
+ }
+
+ @Override
+ protected void seekInternal(long pos) throws IOException {}
+ }
+}
Property changes on: lucene/core/src/java/org/apache/lucene/store/AsyncFSDirectory.java
___________________________________________________________________
Added: svn:keywords
## -0,0 +1 ##
+Date Author Id Revision HeadURL
\ No newline at end of property
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Index: lucene/core/src/java/org/apache/lucene/store/FSDirectory.java
===================================================================
--- lucene/core/src/java/org/apache/lucene/store/FSDirectory.java (revision 1458766)
+++ lucene/core/src/java/org/apache/lucene/store/FSDirectory.java (working copy)
@@ -17,12 +17,17 @@
* limitations under the License.
*/
+import java.io.Closeable;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+
import java.util.Collection;
import static java.util.Collections.synchronizedSet;
import java.util.HashSet;
@@ -285,7 +290,7 @@
ensureOpen();
ensureCanWrite(name);
- return new FSIndexOutput(this, name);
+ return new RandomAccessFileFSIndexOutput(this, name);
}
protected void ensureCanWrite(String name) throws IOException {
@@ -298,7 +303,7 @@
throw new IOException("Cannot overwrite: " + file);
}
- protected void onIndexOutputClosed(FSIndexOutput io) {
+ protected void onIndexOutputClosed(FSIndexOutput<?> io) {
staleFiles.add(io.name);
}
@@ -392,10 +397,11 @@
return chunkSize;
}
- /** Base class for reading input from a RandomAccessFile */
- protected abstract static class FSIndexInput extends BufferedIndexInput {
- /** the underlying RandomAccessFile */
- protected final RandomAccessFile file;
+ /** Base class for reading input from an object that accesses a file. This class is parameterized
+ * with the concrete file accessor class (for example RandomAccessFile or FileChannel) */
+ protected abstract static class FSIndexInput<T extends Closeable> extends BufferedIndexInput {
+ /** the underlying file accessor object */
+ protected final T file;
boolean isClone = false;
/** maximum read length on a 32bit JVM to prevent incorrect OOM, see LUCENE-1566 */
protected final int chunkSize;
@@ -405,16 +411,16 @@
protected final long end;
/** Create a new FSIndexInput, reading the entire file from <code>path</code> */
- protected FSIndexInput(String resourceDesc, File path, IOContext context, int chunkSize) throws IOException {
+ protected FSIndexInput(String resourceDesc, T file, long fileLength, IOContext context, int chunkSize) throws IOException {
super(resourceDesc, context);
- this.file = new RandomAccessFile(path, "r");
+ this.file = file;
this.chunkSize = chunkSize;
this.off = 0L;
- this.end = file.length();
+ this.end = fileLength;
}
/** Create a new FSIndexInput, representing a slice of an existing open <code>file</code> */
- protected FSIndexInput(String resourceDesc, RandomAccessFile file, long off, long length, int bufferSize, int chunkSize) {
+ protected FSIndexInput(String resourceDesc, T file, long off, long length, int bufferSize, int chunkSize) {
super(resourceDesc, bufferSize);
this.file = file;
this.chunkSize = chunkSize;
@@ -432,8 +438,9 @@
}
@Override
- public FSIndexInput clone() {
- FSIndexInput clone = (FSIndexInput)super.clone();
+ @SuppressWarnings("unchecked")
+ public FSIndexInput<T> clone() {
+ FSIndexInput<T> clone = (FSIndexInput<T>)super.clone();
clone.isClone = true;
return clone;
}
@@ -442,37 +449,24 @@
public final long length() {
return end - off;
}
-
- /** Method used for testing. Returns true if the underlying
- * file descriptor is valid.
- */
- boolean isFDValid() throws IOException {
- return file.getFD().valid();
- }
}
/**
- * Writes output with {@link RandomAccessFile#write(byte[], int, int)}
+ * Writes output using a file accessor object supplied by a subclass
*/
- protected static class FSIndexOutput extends BufferedIndexOutput {
+ protected abstract static class FSIndexOutput<T extends Closeable> extends BufferedIndexOutput {
+ final String name;
private final FSDirectory parent;
- private final String name;
- private final RandomAccessFile file;
private volatile boolean isOpen; // remember if the file is open, so that we don't try to close it more than once
- public FSIndexOutput(FSDirectory parent, String name) throws IOException {
+ protected final T file;
+
+ public FSIndexOutput(FSDirectory parent, String name, T file) throws IOException {
this.parent = parent;
this.name = name;
- file = new RandomAccessFile(new File(parent.directory, name), "rw");
+ this.file = file;
isOpen = true;
}
-
- /** output methods: */
- @Override
- public void flushBuffer(byte[] b, int offset, int size) throws IOException {
- assert isOpen;
- file.write(b, offset, size);
- }
@Override
public void close() throws IOException {
@@ -497,8 +491,30 @@
}
}
}
+
+ protected void ensureOpen() {
+ assert isOpen;
+ }
+ }
+
+ protected static class RandomAccessFileFSIndexOutput extends FSIndexOutput<RandomAccessFile> {
+ public RandomAccessFileFSIndexOutput(FSDirectory parent, String name) throws IOException {
+ super(parent, name, new RandomAccessFile(new File(parent.directory, name), "rw"));
+ }
+
+ public RandomAccessFileFSIndexOutput(FSDirectory parent, String name, RandomAccessFile file) throws IOException {
+ super(parent, name, file);
+ }
+
@Override
+ protected void flushBuffer(byte[] b, int offset, int len)
+ throws IOException {
+ ensureOpen();
+ file.write(b, offset, len);
+ }
+
+ @Override
public long length() throws IOException {
return file.length();
}
@@ -508,7 +524,40 @@
file.setLength(length);
}
}
+
+ protected static class FileChannelFSIndexOuptut extends FSIndexOutput<FileChannel> {
+ public FileChannelFSIndexOuptut(FSDirectory parent, String name) throws IOException {
+ super(parent, name, FileChannel.open(
+ new File(parent.directory, name).toPath(),
+ StandardOpenOption.READ, StandardOpenOption.WRITE));
+ }
+
+ public FileChannelFSIndexOuptut(FSDirectory parent, String name, FileChannel file) throws IOException {
+ super(parent, name, file);
+ }
+
+ @Override
+ protected void flushBuffer(byte[] b, int offset, int len)
+ throws IOException {
+ ensureOpen();
+ ByteBuffer bb = ByteBuffer.wrap(b, offset, len);
+ while (bb.remaining() > 0) {
+ file.write(bb);
+ }
+ }
+
+ @Override
+ public long length() throws IOException {
+ return file.size();
+ }
+
+ @Override
+ public void setLength(long length) throws IOException {
+ file.truncate(length);
+ }
+ }
+
protected void fsync(String name) throws IOException {
File fullFile = new File(directory, name);
boolean success = false;
Index: lucene/core/src/java/org/apache/lucene/store/MMapDirectory.java
===================================================================
--- lucene/core/src/java/org/apache/lucene/store/MMapDirectory.java (revision 1458766)
+++ lucene/core/src/java/org/apache/lucene/store/MMapDirectory.java (working copy)
@@ -19,11 +19,11 @@
import java.io.IOException;
import java.io.File;
-import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException; // javadoc @link
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;
+import java.nio.file.StandardOpenOption;
import java.security.AccessController;
import java.security.PrivilegedExceptionAction;
@@ -133,7 +133,7 @@
this.chunkSizePower = 31 - Integer.numberOfLeadingZeros(maxChunkSize);
assert this.chunkSizePower >= 0 && this.chunkSizePower <= 30;
}
-
+
/**
* <code>true</code>, if this platform supports unmapping mmapped files.
*/
@@ -189,12 +189,9 @@
@Override
public IndexInput openInput(String name, IOContext context) throws IOException {
ensureOpen();
- File f = new File(getDirectory(), name);
- RandomAccessFile raf = new RandomAccessFile(f, "r");
- try {
- return new MMapIndexInput("MMapIndexInput(path=\"" + f + "\")", raf);
- } finally {
- raf.close();
+ File file = new File(getDirectory(), name);
+ try (FileChannel c = FileChannel.open(file.toPath(), StandardOpenOption.READ)) {
+ return new MMapIndexInput("MMapIndexInput(path=\"" + file.toString() + "\")", c);
}
}
@@ -218,8 +215,8 @@
private final class MMapIndexInput extends ByteBufferIndexInput {
private final boolean useUnmapHack;
- MMapIndexInput(String resourceDescription, RandomAccessFile raf) throws IOException {
- super(resourceDescription, map(raf, 0, raf.length()), raf.length(), chunkSizePower, getUseUnmap());
+ MMapIndexInput(String resourceDescription, FileChannel fc) throws IOException {
+ super(resourceDescription, map(fc, 0, fc.size()), fc.size(), chunkSizePower, getUseUnmap());
this.useUnmapHack = getUseUnmap();
}
@@ -256,9 +253,9 @@
}
/** Maps a file into a set of buffers */
- ByteBuffer[] map(RandomAccessFile raf, long offset, long length) throws IOException {
+ ByteBuffer[] map(FileChannel fc, long offset, long length) throws IOException {
if ((length >>> chunkSizePower) >= Integer.MAX_VALUE)
- throw new IllegalArgumentException("RandomAccessFile too big for chunk size: " + raf.toString());
+ throw new IllegalArgumentException("RandomAccessFile too big for chunk size: " + fc.toString());
final long chunkSize = 1L << chunkSizePower;
@@ -268,13 +265,12 @@
ByteBuffer buffers[] = new ByteBuffer[nrBuffers];
long bufferStart = 0L;
- FileChannel rafc = raf.getChannel();
for (int bufNr = 0; bufNr < nrBuffers; bufNr++) {
int bufSize = (int) ( (length > (bufferStart + chunkSize))
? chunkSize
: (length - bufferStart)
);
- buffers[bufNr] = rafc.map(MapMode.READ_ONLY, offset + bufferStart, bufSize);
+ buffers[bufNr] = fc.map(MapMode.READ_ONLY, offset + bufferStart, bufSize);
bufferStart += bufSize;
}
Index: lucene/core/src/java/org/apache/lucene/store/NIOFSDirectory.java
===================================================================
--- lucene/core/src/java/org/apache/lucene/store/NIOFSDirectory.java (revision 1458766)
+++ lucene/core/src/java/org/apache/lucene/store/NIOFSDirectory.java (working copy)
@@ -20,10 +20,10 @@
import java.io.File;
import java.io.EOFException;
import java.io.IOException;
-import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException; // javadoc @link
import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
import java.util.concurrent.Future; // javadoc
/**
@@ -77,7 +77,9 @@
@Override
public IndexInput openInput(String name, IOContext context) throws IOException {
ensureOpen();
- return new NIOFSIndexInput(new File(getDirectory(), name), context, getReadChunkSize());
+ File path = new File(getDirectory(), name);
+ FileChannel fc = FileChannel.open(path.toPath(), StandardOpenOption.READ);
+ return new NIOFSIndexInput("NIOFSIndexInput(path=\"" + path + "\")", fc, context, getReadChunkSize());
}
@Override
@@ -85,7 +87,7 @@
final IOContext context) throws IOException {
ensureOpen();
final File path = new File(getDirectory(), name);
- final RandomAccessFile descriptor = new RandomAccessFile(path, "r");
+ final FileChannel descriptor = FileChannel.open(path.toPath(), StandardOpenOption.READ);
return new Directory.IndexInputSlicer() {
@Override
@@ -95,7 +97,7 @@
@Override
public IndexInput openSlice(String sliceDescription, long offset, long length) {
- return new NIOFSIndexInput(sliceDescription, path, descriptor, descriptor.getChannel(), offset,
+ return new NIOFSIndexInput("NIOFSIndexInput(" + sliceDescription + " in path=\"" + path + "\" slice=" + offset + ":" + (offset+length) + ")", descriptor, offset,
length, BufferedIndexInput.bufferSize(context), getReadChunkSize());
}
};
@@ -104,20 +106,16 @@
/**
* Reads bytes with {@link FileChannel#read(ByteBuffer, long)}
*/
- protected static class NIOFSIndexInput extends FSIndexInput {
+ protected static class NIOFSIndexInput extends FSIndexInput<FileChannel> {
private ByteBuffer byteBuf; // wraps the buffer for NIO
- final FileChannel channel;
-
- public NIOFSIndexInput(File path, IOContext context, int chunkSize) throws IOException {
- super("NIOFSIndexInput(path=\"" + path + "\")", path, context, chunkSize);
- channel = file.getChannel();
+ public NIOFSIndexInput(String resourceDesc, FileChannel fc, IOContext context, int chunkSize) throws IOException {
+ super(resourceDesc, fc, fc.size(), context, chunkSize);
}
- public NIOFSIndexInput(String sliceDescription, File path, RandomAccessFile file, FileChannel fc, long off, long length, int bufferSize, int chunkSize) {
- super("NIOFSIndexInput(" + sliceDescription + " in path=\"" + path + "\" slice=" + off + ":" + (off+length) + ")", file, off, length, bufferSize, chunkSize);
- channel = fc;
+ public NIOFSIndexInput(String resourceDesc, FileChannel fc, long off, long length, int bufferSize, int chunkSize) {
+ super(resourceDesc, fc, off, length, bufferSize, chunkSize);
isClone = true;
}
@@ -164,7 +162,7 @@
limit = readOffset + readLength;
}
bb.limit(limit);
- int i = channel.read(bb, pos);
+ int i = file.read(bb, pos);
pos += i;
readOffset += i;
readLength -= i;
@@ -186,5 +184,4 @@
@Override
protected void seekInternal(long pos) throws IOException {}
}
-
}
Index: lucene/core/src/java/org/apache/lucene/store/SimpleFSDirectory.java
===================================================================
--- lucene/core/src/java/org/apache/lucene/store/SimpleFSDirectory.java (revision 1458766)
+++ lucene/core/src/java/org/apache/lucene/store/SimpleFSDirectory.java (working copy)
@@ -55,7 +55,8 @@
public IndexInput openInput(String name, IOContext context) throws IOException {
ensureOpen();
final File path = new File(directory, name);
- return new SimpleFSIndexInput("SimpleFSIndexInput(path=\"" + path.getPath() + "\")", path, context, getReadChunkSize());
+ RandomAccessFile raf = new RandomAccessFile(path, "r");
+ return new SimpleFSIndexInput("SimpleFSIndexInput(path=\"" + path.getPath() + "\")", raf, context, getReadChunkSize());
}
@Override
@@ -83,10 +84,10 @@
* Reads bytes with {@link RandomAccessFile#seek(long)} followed by
* {@link RandomAccessFile#read(byte[], int, int)}.
*/
- protected static class SimpleFSIndexInput extends FSIndexInput {
+ protected static class SimpleFSIndexInput extends FSIndexInput<RandomAccessFile> {
- public SimpleFSIndexInput(String resourceDesc, File path, IOContext context, int chunkSize) throws IOException {
- super(resourceDesc, path, context, chunkSize);
+ public SimpleFSIndexInput(String resourceDesc, RandomAccessFile file, IOContext context, int chunkSize) throws IOException {
+ super(resourceDesc, file, file.length(), context, chunkSize);
}
public SimpleFSIndexInput(String resourceDesc, RandomAccessFile file, long off, long length, int bufferSize, int chunkSize) {
@@ -136,5 +137,12 @@
@Override
protected void seekInternal(long position) {
}
+
+ /** Method used for testing. Returns true if the underlying
+ * file descriptor is valid.
+ */
+ boolean isFDValid() throws IOException {
+ return file.getFD().valid();
+ }
}
}
Index: lucene/core/src/test/org/apache/lucene/store/TestBufferedIndexInput.java
===================================================================
--- lucene/core/src/test/org/apache/lucene/store/TestBufferedIndexInput.java (revision 1458766)
+++ lucene/core/src/test/org/apache/lucene/store/TestBufferedIndexInput.java (working copy)
@@ -21,6 +21,9 @@
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@ -93,12 +96,13 @@
writeBytes(tmpInputFile, TEST_FILE_LENGTH);
// run test with chunk size of 10 bytes
- runReadBytesAndClose(new SimpleFSIndexInput("SimpleFSIndexInput(path=\"" + tmpInputFile + "\")", tmpInputFile,
- newIOContext(random()), 10), inputBufferSize, random());
+ runReadBytesAndClose(new SimpleFSIndexInput("SimpleFSIndexInput(path=\"" + tmpInputFile + "\")",
+ new RandomAccessFile(tmpInputFile, "r"), newIOContext(random()), 10), inputBufferSize, random());
// run test with chunk size of 10 bytes
- runReadBytesAndClose(new NIOFSIndexInput(tmpInputFile,
- newIOContext(random()), 10), inputBufferSize, random());
+ runReadBytesAndClose(new NIOFSIndexInput("NIOFSIndexInput(path=\"" + tmpInputFile + "\")",
+ FileChannel.open(tmpInputFile.toPath(), StandardOpenOption.READ), newIOContext(random()), 10),
+ inputBufferSize, random());
}
private void runReadBytesAndClose(IndexInput input, int bufferSize, Random r)
Index: lucene/test-framework/src/java/org/apache/lucene/util/QuickPatchThreadsFilter.java
===================================================================
--- lucene/test-framework/src/java/org/apache/lucene/util/QuickPatchThreadsFilter.java (revision 1458766)
+++ lucene/test-framework/src/java/org/apache/lucene/util/QuickPatchThreadsFilter.java (working copy)
@@ -31,12 +31,38 @@
@Override
public boolean reject(Thread t) {
+ // Try to get the stack lazily and reuse it if it's really necessary.
+ StackTraceElement [] stack = null;
+
if (isJ9) {
- StackTraceElement [] stack = t.getStackTrace();
+ stack = t.getStackTrace();
if (stack.length > 0 && stack[stack.length - 1].getClassName().equals("java.util.Timer$TimerImpl")) {
return true; // LUCENE-4736
}
}
+
+ // LUCENE-4848; ignore threads inside async IO. There is no easy way to detect whether
+ // they're part of the system group but we know all these are daemons.
+ if (t.isDaemon()) {
+ if (stack == null) stack = t.getStackTrace();
+ if (stack.length - 2 >= 0) {
+ // Some of the Iocp's stuff is called directly, bypassing the threadpool.
+ if (stack[stack.length - 1].getClassName().equals("java.lang.Thread") &&
+ stack[stack.length - 2].getClassName().startsWith("sun.nio.")) {
+ return true;
+ }
+
+ // The default threadpool's threads will end up somewhere in EventHandlerTask's
+ // run method.
+ for (int i = 0; i < stack.length - 1; i++) {
+ if (stack[i].getClassName().equals("sun.nio.ch.Iocp$EventHandlerTask") &&
+ stack[i].getMethodName().equals("run")) {
+ return true;
+ }
+ }
+ }
+ }
+
return false;
}
}