| Index: lucene/core/src/java/org/apache/lucene/store/AsyncFSDirectory.java
|
| ===================================================================
|
| --- lucene/core/src/java/org/apache/lucene/store/AsyncFSDirectory.java (revision 0)
|
| +++ lucene/core/src/java/org/apache/lucene/store/AsyncFSDirectory.java (working copy)
|
| @@ -0,0 +1,278 @@
|
| +package org.apache.lucene.store; |
| + |
| +/* |
| + * Licensed to the Apache Software Foundation (ASF) under one or more |
| + * contributor license agreements. See the NOTICE file distributed with |
| + * this work for additional information regarding copyright ownership. |
| + * The ASF licenses this file to You under the Apache License, Version 2.0 |
| + * (the "License"); you may not use this file except in compliance with |
| + * the License. You may obtain a copy of the License at |
| + * |
| + * http://www.apache.org/licenses/LICENSE-2.0 |
| + * |
| + * Unless required by applicable law or agreed to in writing, software |
| + * distributed under the License is distributed on an "AS IS" BASIS, |
| + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| + * See the License for the specific language governing permissions and |
| + * limitations under the License. |
| + */ |
| + |
| +import java.io.EOFException; |
| +import java.io.File; |
| +import java.io.IOException; |
| +import java.nio.ByteBuffer; |
| +import java.nio.channels.AsynchronousFileChannel; |
| +import java.nio.file.Path; |
| +import java.nio.file.StandardOpenOption; |
| +import java.nio.file.attribute.FileAttribute; |
| +import java.util.EnumSet; |
| +import java.util.concurrent.ExecutionException; |
| +import java.util.concurrent.ExecutorService; |
| +import java.util.concurrent.Future; |
| + |
| +/** An implementation of {@link FSDirectory} |
| + * using {@link AsynchronousFileChannel}. This |
| + * class is useful on Windows, where it will take advantage of |
| + * overlapped IO, but on other platforms this is likely to |
| + * be comparable in performance to, if not worse than, {@link SimpleFSDirectory}. |
| + * <p> |
| + * This class does not synchronize on the file position |
| + * for reads, and does not cause problems when its thread |
| + * is interrupted. */ |
| + |
| +public class AsyncFSDirectory extends FSDirectory { |
| + |
| + private static final FileAttribute<?>[] NO_ATTRIBUTES = new FileAttribute<?>[0]; |
| + |
| + private final ExecutorService executor; |
| + |
| + /** Create a new AsyncFSDirectory for the named location. |
| + * |
| + * @param path the path of the directory |
| + * @param lockFactory the lock factory to use, or null for the default |
| + * ({@link NativeFSLockFactory}) |
| + * @param executor the executor handed to {@link AsynchronousFileChannel#open} when this directory opens file channels; may be null (the other constructors pass null), which selects the JDK's default thread pool |
| + * @throws IOException if there is a low-level I/O error |
| + */ |
| + public AsyncFSDirectory(File path, LockFactory lockFactory, ExecutorService executor) throws IOException { |
| + super(path, lockFactory); |
| + this.executor = executor; // stored as-is; nothing in this class shuts it down |
| + } |
| + |
| + /** Create a new AsyncFSDirectory for the named location, without an explicit executor. |
| + * |
| + * @param path the path of the directory |
| + * @param lockFactory the lock factory to use, or null for the default |
| + * ({@link NativeFSLockFactory}) |
| + * @throws IOException if there is a low-level I/O error |
| + */ |
| + public AsyncFSDirectory(File path, LockFactory lockFactory) throws IOException { |
| + this(path, lockFactory, null); // null executor |
| + } |
| + |
| + /** Create a new AsyncFSDirectory for the named location, using {@link NativeFSLockFactory} and no explicit executor. |
| + * |
| + * @param path the path of the directory |
| + * @throws IOException if there is a low-level I/O error |
| + */ |
| + public AsyncFSDirectory(File path) throws IOException { |
| + this(path, null); // a null lock factory selects the default one |
| + } |
| + |
| +  /** Creates (or truncates) the named file and returns an output that writes through an {@link AsynchronousFileChannel}. |
| +   * The channel is opened with this directory's executor, like the channels used for reading; a null executor keeps the JDK default. */ |
| +  @Override |
| +  public IndexOutput createOutput(String name, IOContext context) throws IOException { |
| +    ensureOpen(); |
| +    ensureCanWrite(name); |
| +    final Path path = new File(directory, name).toPath(); |
| +    final AsynchronousFileChannel channel = AsynchronousFileChannel.open(path, EnumSet.of(StandardOpenOption.CREATE, |
| +        StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.READ, StandardOpenOption.WRITE), executor, NO_ATTRIBUTES); |
| +    return new AsyncFSIndexOutput(this, name, channel); |
| +  } |
| + |
| +  /** Opens the named file read-only on an asynchronous channel and wraps it in an {@link AsyncFSIndexInput}. */ |
| +  @Override |
| +  public IndexInput openInput(String name, IOContext context) throws IOException { |
| +    ensureOpen(); |
| +    final Path target = new File(directory, name).toPath(); |
| +    final String description = "AsyncFSIndexInput(path=\"" + target + "\")"; |
| +    return new AsyncFSIndexInput(description, AsynchronousFileChannel.open(target, EnumSet.of(StandardOpenOption.READ), executor, NO_ATTRIBUTES), context, getReadChunkSize()); |
| +  } |
| + |
| + /** Opens one read-only channel for the named file and returns a slicer whose slices all read through that channel. */ |
| + @Override |
| + public IndexInputSlicer createSlicer(final String name, |
| + final IOContext context) throws IOException { |
| + ensureOpen(); |
| + final Path path = new File(directory, name).toPath(); |
| + final AsynchronousFileChannel channel = AsynchronousFileChannel.open( |
| + path, |
| + EnumSet.of(StandardOpenOption.READ), |
| + executor, |
| + NO_ATTRIBUTES); |
| + return new IndexInputSlicer() { |
| + @Override |
| + public void close() throws IOException { |
| + channel.close(); // the single channel shared by every slice created below |
| + } |
| + // Slices go through the (channel, offset, length) constructor, which flags them as clones. |
| + @Override |
| + public IndexInput openSlice(String sliceDescription, long offset, long length) { |
| + return new AsyncFSIndexInput("AsyncFSIndexInput(" + sliceDescription + " in path=\"" + path + "\" slice=" + offset + ":" + (offset+length) + ")", |
| + channel, offset, length, BufferedIndexInput.bufferSize(context), getReadChunkSize()); |
| + } |
| + }; |
| + } |
| + |
| +  /** Writes output with {@link AsynchronousFileChannel#write(ByteBuffer, long)}, |
| +   * blocking on each returned {@link Future} so that flushes are applied in order. */ |
| +  protected static class AsyncFSIndexOutput extends FSIndexOutput<AsynchronousFileChannel> { |
| +    private long fpos = 0; // position of the next write; passed explicitly to every write call |
| + |
| +    public AsyncFSIndexOutput(FSDirectory parent, String name, AsynchronousFileChannel file) throws IOException { |
| +      super(parent, name, file); |
| +    } |
| + |
| +    /** Writes {@code size} bytes of {@code b}, starting at {@code offset}, at the current write position. An interrupt while |
| +     * waiting resumes the wait on the SAME pending write (no second write is issued) and is re-asserted before returning. */ |
| +    @Override |
| +    public void flushBuffer(byte[] b, int offset, int size) throws IOException { |
| +      ensureOpen(); |
| +      if (size == 0) { return; } // avoid creating a ByteBuffer if we don't need to |
| +      final ByteBuffer bb = ByteBuffer.wrap(b, offset, size); |
| +      boolean interrupted = false; |
| +      try { |
| +        while (bb.hasRemaining()) { |
| +          final Future<Integer> pending = file.write(bb, fpos); |
| +          while (true) { // wait for THIS write; never start another one while it is in flight |
| +            try { |
| +              fpos += pending.get(); |
| +              break; |
| +            } catch (ExecutionException ee) { |
| +              throw new IOException(ee.getCause()); |
| +            } catch (InterruptedException ie) { |
| +              interrupted = true; // keep waiting: the pending write still owns bb |
| +            } |
| +          } |
| +        } |
| +      } finally { |
| +        if (interrupted) { |
| +          Thread.currentThread().interrupt(); |
| +        } |
| +      } |
| +    } |
| + |
| +    @Override |
| +    public long length() throws IOException { |
| +      return file.size(); // current size of the underlying file |
| +    } |
| + |
| +    @Override |
| +    public void setLength(long length) throws IOException { |
| +      file.truncate(length); // delegates to the channel |
| +    } |
| +  } |
| + |
| + /** Reads bytes with {@link AsynchronousFileChannel#read(ByteBuffer, long)}, blocking on the returned {@link Future}. */ |
| + protected class AsyncFSIndexInput extends FSIndexInput<AsynchronousFileChannel> { |
| + // NOTE(review): inner (non-static) class, unlike the static AsyncFSIndexOutput; nothing in this body appears to use the enclosing directory - confirm this is intended. |
| + private ByteBuffer byteBuf; // wraps the buffer for NIO; re-created in newBuffer |
| + /** Creates an input over the whole file: {@code fc.size()} is passed to the superclass as the file length. */ |
| + public AsyncFSIndexInput(String resourceDesc, AsynchronousFileChannel fc, IOContext context, int chunkSize) throws IOException { |
| + super(resourceDesc, fc, fc.size(), context, chunkSize); |
| + } |
| + /** Creates a slice of {@code length} bytes starting at {@code off} on an already open channel; the slice is flagged as a clone. */ |
| + public AsyncFSIndexInput(String resourceDesc, AsynchronousFileChannel fc, long off, long length, int bufferSize, int chunkSize) { |
| + super(resourceDesc, fc, off, length, bufferSize, chunkSize); |
| + isClone = true; |
| + } |
| + /** Re-wraps the array handed to the superclass so that {@link #byteBuf} always refers to it. */ |
| + @Override |
| + protected void newBuffer(byte[] newBuffer) { |
| + super.newBuffer(newBuffer); |
| + byteBuf = ByteBuffer.wrap(newBuffer); |
| + } |
| + /** Fills {@code len} bytes of {@code b}, starting at {@code offset}, from file position {@code getFilePointer() + off}, issuing asynchronous reads of at most {@code chunkSize} bytes each until all bytes have arrived. */ |
| + @Override |
| + protected void readInternal(byte[] b, int offset, int len) throws IOException { |
| + final ByteBuffer bb; |
| + // Determine the ByteBuffer we should use |
| + if (b == buffer && 0 == offset) { |
| + // Use our own pre-wrapped byteBuf: |
| + assert byteBuf != null; |
| + byteBuf.clear(); |
| + byteBuf.limit(len); |
| + bb = byteBuf; |
| + } else { |
| + bb = ByteBuffer.wrap(b, offset, len); |
| + } |
| + |
| + int readOffset = bb.position(); |
| + int readLength = bb.limit() - readOffset; |
| + assert readLength == len; |
| + |
| + long pos = getFilePointer() + off; |
| + // Fail before issuing any I/O if the request extends past the end of this input. |
| + if (pos + len > end) { |
| + throw new EOFException("read past EOF: " + this); |
| + } |
| + |
| + try { |
| + boolean interrupted = false; |
| + try { |
| + while (readLength > 0) { |
| + final int limit; |
| + if (readLength > chunkSize) { |
| + // LUCENE-1566 - work around JVM Bug by breaking |
| + // very large reads into chunks |
| + limit = readOffset + chunkSize; |
| + } else { |
| + limit = readOffset + readLength; |
| + } |
| + bb.limit(limit); |
| + Future<Integer> future = file.read(bb, pos); |
| + |
| + // Wait in a loop: future.get() may throw InterruptedException while the read is |
| + // still pending, so the interrupt is remembered and we keep waiting on the same future. |
| + while (true) { |
| + try { |
| + int i = future.get(); |
| + if (i < 0) { |
| + throw new EOFException("Attempt to read past end of file"); |
| + } |
| + pos += i; |
| + readOffset += i; |
| + readLength -= i; |
| + break; |
| + } catch (ExecutionException ee) { |
| + throw new IOException(ee.getCause()); |
| + } catch (InterruptedException ie) { |
| + interrupted = true; |
| + } |
| + } |
| + } |
| + } finally { |
| + if (interrupted) { |
| + Thread.currentThread().interrupt(); |
| + } |
| + } |
| + } catch (OutOfMemoryError e) { |
| + // propagate OOM up and add a hint for 32bit VM Users hitting the bug |
| + // with a large chunk size in the fast path. |
| + final OutOfMemoryError outOfMemoryError = new OutOfMemoryError( |
| + "OutOfMemoryError likely caused by the Sun VM Bug described in " |
| + + "https://issues.apache.org/jira/browse/LUCENE-1566; try calling FSDirectory.setReadChunkSize " |
| + + "with a value smaller than the current chunk size (" + chunkSize + ")"); |
| + outOfMemoryError.initCause(e); |
| + throw outOfMemoryError; |
| + } catch (IOException ioe) { |
| + throw new IOException(ioe.getMessage() + ": " + this, ioe); |
| + } |
| + } |
| + |
| + /** Nothing to do: every read passes its absolute file position to the channel. */ |
| + @Override |
| + protected void seekInternal(long pos) throws IOException {} |
| + } |
| +} |
|
|
| Property changes on: lucene/core/src/java/org/apache/lucene/store/AsyncFSDirectory.java
|
| ___________________________________________________________________
|
| Added: svn:keywords
|
| ## -0,0 +1 ##
|
| +Date Author Id Revision HeadURL
|
| \ No newline at end of property
|
| Added: svn:eol-style
|
| ## -0,0 +1 ##
|
| +native
|
| \ No newline at end of property
|
| Index: lucene/core/src/java/org/apache/lucene/store/FSDirectory.java
|
| ===================================================================
|
| --- lucene/core/src/java/org/apache/lucene/store/FSDirectory.java (revision 1458766)
|
| +++ lucene/core/src/java/org/apache/lucene/store/FSDirectory.java (working copy)
|
| @@ -17,12 +17,17 @@
|
| * limitations under the License. |
| */ |
| |
| +import java.io.Closeable; |
| import java.io.File; |
| import java.io.FileNotFoundException; |
| import java.io.FilenameFilter; |
| import java.io.IOException; |
| import java.io.RandomAccessFile; |
| |
| +import java.nio.ByteBuffer; |
| +import java.nio.channels.FileChannel; |
| +import java.nio.file.StandardOpenOption; |
| + |
| import java.util.Collection; |
| import static java.util.Collections.synchronizedSet; |
| import java.util.HashSet; |
| @@ -285,7 +290,7 @@
|
| ensureOpen(); |
| |
| ensureCanWrite(name); |
| - return new FSIndexOutput(this, name); |
| + return new RandomAccessFileFSIndexOutput(this, name); |
| } |
| |
| protected void ensureCanWrite(String name) throws IOException { |
| @@ -298,7 +303,7 @@
|
| throw new IOException("Cannot overwrite: " + file); |
| } |
| |
| - protected void onIndexOutputClosed(FSIndexOutput io) { |
| + protected void onIndexOutputClosed(FSIndexOutput<?> io) { |
| staleFiles.add(io.name); |
| } |
| |
| @@ -392,10 +397,11 @@
|
| return chunkSize; |
| } |
| |
| - /** Base class for reading input from a RandomAccessFile */ |
| - protected abstract static class FSIndexInput extends BufferedIndexInput { |
| - /** the underlying RandomAccessFile */ |
| - protected final RandomAccessFile file; |
| + /** Base class for reading input from an object that accesses a file. This class is parameterized |
| + * with the concrete file accessor class (for example RandomAccessFile or FileChannel) */ |
| + protected abstract static class FSIndexInput<T extends Closeable> extends BufferedIndexInput { |
| + /** the underlying file accessor object */ |
| + protected final T file; |
| boolean isClone = false; |
| /** maximum read length on a 32bit JVM to prevent incorrect OOM, see LUCENE-1566 */ |
| protected final int chunkSize; |
| @@ -405,16 +411,16 @@
|
| protected final long end; |
| |
| /** Create a new FSIndexInput, reading the entire file from <code>path</code> */ |
| - protected FSIndexInput(String resourceDesc, File path, IOContext context, int chunkSize) throws IOException { |
| + protected FSIndexInput(String resourceDesc, T file, long fileLength, IOContext context, int chunkSize) throws IOException { |
| super(resourceDesc, context); |
| - this.file = new RandomAccessFile(path, "r"); |
| + this.file = file; |
| this.chunkSize = chunkSize; |
| this.off = 0L; |
| - this.end = file.length(); |
| + this.end = fileLength; |
| } |
| |
| /** Create a new FSIndexInput, representing a slice of an existing open <code>file</code> */ |
| - protected FSIndexInput(String resourceDesc, RandomAccessFile file, long off, long length, int bufferSize, int chunkSize) { |
| + protected FSIndexInput(String resourceDesc, T file, long off, long length, int bufferSize, int chunkSize) { |
| super(resourceDesc, bufferSize); |
| this.file = file; |
| this.chunkSize = chunkSize; |
| @@ -432,8 +438,9 @@
|
| } |
| |
| @Override |
| - public FSIndexInput clone() { |
| - FSIndexInput clone = (FSIndexInput)super.clone(); |
| + @SuppressWarnings("unchecked") |
| + public FSIndexInput<T> clone() { |
| + FSIndexInput<T> clone = (FSIndexInput<T>)super.clone(); |
| clone.isClone = true; |
| return clone; |
| } |
| @@ -442,37 +449,24 @@
|
| public final long length() { |
| return end - off; |
| } |
| - |
| - /** Method used for testing. Returns true if the underlying |
| - * file descriptor is valid. |
| - */ |
| - boolean isFDValid() throws IOException { |
| - return file.getFD().valid(); |
| - } |
| } |
| |
| /** |
| - * Writes output with {@link RandomAccessFile#write(byte[], int, int)} |
| + * Writes output using a file accessor object supplied by a subclass |
| */ |
| - protected static class FSIndexOutput extends BufferedIndexOutput { |
| + protected abstract static class FSIndexOutput<T extends Closeable> extends BufferedIndexOutput { |
| + final String name; |
| private final FSDirectory parent; |
| - private final String name; |
| - private final RandomAccessFile file; |
| private volatile boolean isOpen; // remember if the file is open, so that we don't try to close it more than once |
| |
| - public FSIndexOutput(FSDirectory parent, String name) throws IOException { |
| + protected final T file; |
| + |
| + public FSIndexOutput(FSDirectory parent, String name, T file) throws IOException { |
| this.parent = parent; |
| this.name = name; |
| - file = new RandomAccessFile(new File(parent.directory, name), "rw"); |
| + this.file = file; |
| isOpen = true; |
| } |
| - |
| - /** output methods: */ |
| - @Override |
| - public void flushBuffer(byte[] b, int offset, int size) throws IOException { |
| - assert isOpen; |
| - file.write(b, offset, size); |
| - } |
| |
| @Override |
| public void close() throws IOException { |
| @@ -497,8 +491,30 @@
|
| } |
| } |
| } |
| + |
| + protected void ensureOpen() { // sanity check called at the start of the flushBuffer implementations |
| + assert isOpen; // only an assertion: nothing is checked when assertions are disabled |
| + } |
| + } |
| + |
| + protected static class RandomAccessFileFSIndexOutput extends FSIndexOutput<RandomAccessFile> { |
| |
| + public RandomAccessFileFSIndexOutput(FSDirectory parent, String name) throws IOException { |
| + super(parent, name, new RandomAccessFile(new File(parent.directory, name), "rw")); |
| + } |
| + |
| + public RandomAccessFileFSIndexOutput(FSDirectory parent, String name, RandomAccessFile file) throws IOException { |
| + super(parent, name, file); |
| + } |
| + |
| @Override |
| + protected void flushBuffer(byte[] b, int offset, int len) |
| + throws IOException { |
| + ensureOpen(); |
| + file.write(b, offset, len); |
| + } |
| + |
| + @Override |
| public long length() throws IOException { |
| return file.length(); |
| } |
| @@ -508,7 +524,40 @@
|
| file.setLength(length); |
| } |
| } |
| + |
| +  protected static class FileChannelFSIndexOuptut extends FSIndexOutput<FileChannel> { // NOTE(review): "Ouptut" is a typo; kept so the name stays stable |
| |
| +    /** Opens {@code name} inside {@code parent} for reading and writing, creating the file if it is missing (as RandomAccessFile "rw" does). */ |
| +    public FileChannelFSIndexOuptut(FSDirectory parent, String name) throws IOException { |
| +      super(parent, name, FileChannel.open(new File(parent.directory, name).toPath(), |
| +          StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE)); |
| +    } |
| + |
| +    public FileChannelFSIndexOuptut(FSDirectory parent, String name, FileChannel file) throws IOException { |
| +      super(parent, name, file); |
| +    } |
| + |
| +    /** Writes {@code len} bytes of {@code b}, starting at {@code offset}, at the channel's current position. */ |
| +    @Override |
| +    protected void flushBuffer(byte[] b, int offset, int len) throws IOException { |
| +      ensureOpen(); |
| +      final ByteBuffer bb = ByteBuffer.wrap(b, offset, len); |
| +      while (bb.hasRemaining()) { |
| +        file.write(bb); // a single write may be partial, so loop until the buffer is drained |
| +      } |
| +    } |
| + |
| +    @Override |
| +    public long length() throws IOException { |
| +      return file.size(); |
| +    } |
| + |
| +    @Override |
| +    public void setLength(long length) throws IOException { |
| +      file.truncate(length); |
| +    } |
| +  } |
| + |
| protected void fsync(String name) throws IOException { |
| File fullFile = new File(directory, name); |
| boolean success = false; |
| Index: lucene/core/src/java/org/apache/lucene/store/MMapDirectory.java
|
| ===================================================================
|
| --- lucene/core/src/java/org/apache/lucene/store/MMapDirectory.java (revision 1458766)
|
| +++ lucene/core/src/java/org/apache/lucene/store/MMapDirectory.java (working copy)
|
| @@ -19,11 +19,11 @@
|
| |
| import java.io.IOException; |
| import java.io.File; |
| -import java.io.RandomAccessFile; |
| import java.nio.ByteBuffer; |
| import java.nio.channels.ClosedChannelException; // javadoc @link |
| import java.nio.channels.FileChannel; |
| import java.nio.channels.FileChannel.MapMode; |
| +import java.nio.file.StandardOpenOption; |
| |
| import java.security.AccessController; |
| import java.security.PrivilegedExceptionAction; |
| @@ -133,7 +133,7 @@
|
| this.chunkSizePower = 31 - Integer.numberOfLeadingZeros(maxChunkSize); |
| assert this.chunkSizePower >= 0 && this.chunkSizePower <= 30; |
| } |
| - |
| + |
| /** |
| * <code>true</code>, if this platform supports unmapping mmapped files. |
| */ |
| @@ -189,12 +189,9 @@
|
| @Override |
| public IndexInput openInput(String name, IOContext context) throws IOException { |
| ensureOpen(); |
| - File f = new File(getDirectory(), name); |
| - RandomAccessFile raf = new RandomAccessFile(f, "r"); |
| - try { |
| - return new MMapIndexInput("MMapIndexInput(path=\"" + f + "\")", raf); |
| - } finally { |
| - raf.close(); |
| + File file = new File(getDirectory(), name); |
| + try (FileChannel c = FileChannel.open(file.toPath(), StandardOpenOption.READ)) { |
| + return new MMapIndexInput("MMapIndexInput(path=\"" + file.toString() + "\")", c); |
| } |
| } |
| |
| @@ -218,8 +215,8 @@
|
| private final class MMapIndexInput extends ByteBufferIndexInput { |
| private final boolean useUnmapHack; |
| |
| - MMapIndexInput(String resourceDescription, RandomAccessFile raf) throws IOException { |
| - super(resourceDescription, map(raf, 0, raf.length()), raf.length(), chunkSizePower, getUseUnmap()); |
| + MMapIndexInput(String resourceDescription, FileChannel fc) throws IOException { |
| + super(resourceDescription, map(fc, 0, fc.size()), fc.size(), chunkSizePower, getUseUnmap()); |
| this.useUnmapHack = getUseUnmap(); |
| } |
| |
| @@ -256,9 +253,9 @@
|
| } |
| |
| /** Maps a file into a set of buffers */ |
| - ByteBuffer[] map(RandomAccessFile raf, long offset, long length) throws IOException { |
| + ByteBuffer[] map(FileChannel fc, long offset, long length) throws IOException { |
| if ((length >>> chunkSizePower) >= Integer.MAX_VALUE) |
| - throw new IllegalArgumentException("RandomAccessFile too big for chunk size: " + raf.toString()); |
| + throw new IllegalArgumentException("RandomAccessFile too big for chunk size: " + fc.toString()); |
| |
| final long chunkSize = 1L << chunkSizePower; |
| |
| @@ -268,13 +265,12 @@
|
| ByteBuffer buffers[] = new ByteBuffer[nrBuffers]; |
| |
| long bufferStart = 0L; |
| - FileChannel rafc = raf.getChannel(); |
| for (int bufNr = 0; bufNr < nrBuffers; bufNr++) { |
| int bufSize = (int) ( (length > (bufferStart + chunkSize)) |
| ? chunkSize |
| : (length - bufferStart) |
| ); |
| - buffers[bufNr] = rafc.map(MapMode.READ_ONLY, offset + bufferStart, bufSize); |
| + buffers[bufNr] = fc.map(MapMode.READ_ONLY, offset + bufferStart, bufSize); |
| bufferStart += bufSize; |
| } |
| |
| Index: lucene/core/src/java/org/apache/lucene/store/NIOFSDirectory.java
|
| ===================================================================
|
| --- lucene/core/src/java/org/apache/lucene/store/NIOFSDirectory.java (revision 1458766)
|
| +++ lucene/core/src/java/org/apache/lucene/store/NIOFSDirectory.java (working copy)
|
| @@ -20,10 +20,10 @@
|
| import java.io.File; |
| import java.io.EOFException; |
| import java.io.IOException; |
| -import java.io.RandomAccessFile; |
| import java.nio.ByteBuffer; |
| import java.nio.channels.ClosedChannelException; // javadoc @link |
| import java.nio.channels.FileChannel; |
| +import java.nio.file.StandardOpenOption; |
| import java.util.concurrent.Future; // javadoc |
| |
| /** |
| @@ -77,7 +77,9 @@
|
| @Override |
| public IndexInput openInput(String name, IOContext context) throws IOException { |
| ensureOpen(); |
| - return new NIOFSIndexInput(new File(getDirectory(), name), context, getReadChunkSize()); |
| + File path = new File(getDirectory(), name); |
| + FileChannel fc = FileChannel.open(path.toPath(), StandardOpenOption.READ); |
| + return new NIOFSIndexInput("NIOFSIndexInput(path=\"" + path + "\")", fc, context, getReadChunkSize()); |
| } |
| |
| @Override |
| @@ -85,7 +87,7 @@
|
| final IOContext context) throws IOException { |
| ensureOpen(); |
| final File path = new File(getDirectory(), name); |
| - final RandomAccessFile descriptor = new RandomAccessFile(path, "r"); |
| + final FileChannel descriptor = FileChannel.open(path.toPath(), StandardOpenOption.READ); |
| return new Directory.IndexInputSlicer() { |
| |
| @Override |
| @@ -95,7 +97,7 @@
|
| |
| @Override |
| public IndexInput openSlice(String sliceDescription, long offset, long length) { |
| - return new NIOFSIndexInput(sliceDescription, path, descriptor, descriptor.getChannel(), offset, |
| + return new NIOFSIndexInput("NIOFSIndexInput(" + sliceDescription + " in path=\"" + path + "\" slice=" + offset + ":" + (offset+length) + ")", descriptor, offset, |
| length, BufferedIndexInput.bufferSize(context), getReadChunkSize()); |
| } |
| }; |
| @@ -104,20 +106,16 @@
|
| /** |
| * Reads bytes with {@link FileChannel#read(ByteBuffer, long)} |
| */ |
| - protected static class NIOFSIndexInput extends FSIndexInput { |
| + protected static class NIOFSIndexInput extends FSIndexInput<FileChannel> { |
| |
| private ByteBuffer byteBuf; // wraps the buffer for NIO |
| |
| - final FileChannel channel; |
| - |
| - public NIOFSIndexInput(File path, IOContext context, int chunkSize) throws IOException { |
| - super("NIOFSIndexInput(path=\"" + path + "\")", path, context, chunkSize); |
| - channel = file.getChannel(); |
| + public NIOFSIndexInput(String resourceDesc, FileChannel fc, IOContext context, int chunkSize) throws IOException { |
| + super(resourceDesc, fc, fc.size(), context, chunkSize); |
| } |
| |
| - public NIOFSIndexInput(String sliceDescription, File path, RandomAccessFile file, FileChannel fc, long off, long length, int bufferSize, int chunkSize) { |
| - super("NIOFSIndexInput(" + sliceDescription + " in path=\"" + path + "\" slice=" + off + ":" + (off+length) + ")", file, off, length, bufferSize, chunkSize); |
| - channel = fc; |
| + public NIOFSIndexInput(String resourceDesc, FileChannel fc, long off, long length, int bufferSize, int chunkSize) { |
| + super(resourceDesc, fc, off, length, bufferSize, chunkSize); |
| isClone = true; |
| } |
| |
| @@ -164,7 +162,7 @@
|
| limit = readOffset + readLength; |
| } |
| bb.limit(limit); |
| - int i = channel.read(bb, pos); |
| + int i = file.read(bb, pos); |
| pos += i; |
| readOffset += i; |
| readLength -= i; |
| @@ -186,5 +184,4 @@
|
| @Override |
| protected void seekInternal(long pos) throws IOException {} |
| } |
| - |
| } |
| Index: lucene/core/src/java/org/apache/lucene/store/SimpleFSDirectory.java
|
| ===================================================================
|
| --- lucene/core/src/java/org/apache/lucene/store/SimpleFSDirectory.java (revision 1458766)
|
| +++ lucene/core/src/java/org/apache/lucene/store/SimpleFSDirectory.java (working copy)
|
| @@ -55,7 +55,8 @@
|
| public IndexInput openInput(String name, IOContext context) throws IOException { |
| ensureOpen(); |
| final File path = new File(directory, name); |
| - return new SimpleFSIndexInput("SimpleFSIndexInput(path=\"" + path.getPath() + "\")", path, context, getReadChunkSize()); |
| + RandomAccessFile raf = new RandomAccessFile(path, "r"); |
| + return new SimpleFSIndexInput("SimpleFSIndexInput(path=\"" + path.getPath() + "\")", raf, context, getReadChunkSize()); |
| } |
| |
| @Override |
| @@ -83,10 +84,10 @@
|
| * Reads bytes with {@link RandomAccessFile#seek(long)} followed by |
| * {@link RandomAccessFile#read(byte[], int, int)}. |
| */ |
| - protected static class SimpleFSIndexInput extends FSIndexInput { |
| + protected static class SimpleFSIndexInput extends FSIndexInput<RandomAccessFile> { |
| |
| - public SimpleFSIndexInput(String resourceDesc, File path, IOContext context, int chunkSize) throws IOException { |
| - super(resourceDesc, path, context, chunkSize); |
| + public SimpleFSIndexInput(String resourceDesc, RandomAccessFile file, IOContext context, int chunkSize) throws IOException { |
| + super(resourceDesc, file, file.length(), context, chunkSize); |
| } |
| |
| public SimpleFSIndexInput(String resourceDesc, RandomAccessFile file, long off, long length, int bufferSize, int chunkSize) { |
| @@ -136,5 +137,12 @@
|
| @Override |
| protected void seekInternal(long position) { |
| } |
| + |
| + /** Method used for testing: reports whether the descriptor of the underlying {@link RandomAccessFile} is valid. |
| + * @return the result of {@code file.getFD().valid()} |
| + */ |
| + boolean isFDValid() throws IOException { |
| + return file.getFD().valid(); |
| + } |
| } |
| } |
| Index: lucene/core/src/test/org/apache/lucene/store/TestBufferedIndexInput.java
|
| ===================================================================
|
| --- lucene/core/src/test/org/apache/lucene/store/TestBufferedIndexInput.java (revision 1458766)
|
| +++ lucene/core/src/test/org/apache/lucene/store/TestBufferedIndexInput.java (working copy)
|
| @@ -21,6 +21,9 @@
|
| import java.io.FileOutputStream; |
| import java.io.IOException; |
| import java.io.OutputStream; |
| +import java.io.RandomAccessFile; |
| +import java.nio.channels.FileChannel; |
| +import java.nio.file.StandardOpenOption; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.List; |
| @@ -93,12 +96,13 @@
|
| writeBytes(tmpInputFile, TEST_FILE_LENGTH); |
| |
| // run test with chunk size of 10 bytes |
| - runReadBytesAndClose(new SimpleFSIndexInput("SimpleFSIndexInput(path=\"" + tmpInputFile + "\")", tmpInputFile, |
| - newIOContext(random()), 10), inputBufferSize, random()); |
| + runReadBytesAndClose(new SimpleFSIndexInput("SimpleFSIndexInput(path=\"" + tmpInputFile + "\")", |
| + new RandomAccessFile(tmpInputFile, "r"), newIOContext(random()), 10), inputBufferSize, random()); |
| |
| // run test with chunk size of 10 bytes |
| - runReadBytesAndClose(new NIOFSIndexInput(tmpInputFile, |
| - newIOContext(random()), 10), inputBufferSize, random()); |
| + runReadBytesAndClose(new NIOFSIndexInput("NIOFSIndexInput(path=\"" + tmpInputFile + "\")", |
| + FileChannel.open(tmpInputFile.toPath(), StandardOpenOption.READ), newIOContext(random()), 10), |
| + inputBufferSize, random()); |
| } |
| |
| private void runReadBytesAndClose(IndexInput input, int bufferSize, Random r) |
| Index: lucene/test-framework/src/java/org/apache/lucene/util/QuickPatchThreadsFilter.java
|
| ===================================================================
|
| --- lucene/test-framework/src/java/org/apache/lucene/util/QuickPatchThreadsFilter.java (revision 1458766)
|
| +++ lucene/test-framework/src/java/org/apache/lucene/util/QuickPatchThreadsFilter.java (working copy)
|
| @@ -31,12 +31,38 @@
|
| |
| @Override |
| public boolean reject(Thread t) { |
| + // Try to get the stack lazily and reuse it if it's really necessary. |
| + StackTraceElement [] stack = null; |
| + |
| if (isJ9) { |
| - StackTraceElement [] stack = t.getStackTrace(); |
| + stack = t.getStackTrace(); |
| if (stack.length > 0 && stack[stack.length - 1].getClassName().equals("java.util.Timer$TimerImpl")) { |
| return true; // LUCENE-4736 |
| } |
| } |
| + |
| + // LUCENE-4848; ignore threads inside async IO. There is no easy way to detect whether |
| + // they're part of the system group but we know all these are daemons. |
| + if (t.isDaemon()) { |
| + if (stack == null) stack = t.getStackTrace(); |
| + if (stack.length - 2 >= 0) { |
| + // Some of the Iocp's stuff is called directly, bypassing the threadpool. |
| + if (stack[stack.length - 1].getClassName().equals("java.lang.Thread") && |
| + stack[stack.length - 2].getClassName().startsWith("sun.nio.")) { |
| + return true; |
| + } |
| + |
| + // The default threadpool's threads will end up somewhere in EventHandlerTask's |
| + // run method. |
| + for (int i = 0; i < stack.length - 1; i++) { |
| + if (stack[i].getClassName().equals("sun.nio.ch.Iocp$EventHandlerTask") && |
| + stack[i].getMethodName().equals("run")) { |
| + return true; |
| + } |
| + } |
| + } |
| + } |
| + |
| return false; |
| } |
| } |