blob: cf2a1653d088f14dddfcbaec76fa33ccbc5530ce [file] [log] [blame]
package org.apache.lucene.index;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.lucene.index.FieldInfos.FieldNumberBiMap;
import org.apache.lucene.index.SegmentCodecs.SegmentCodecsBuilder;
import org.apache.lucene.index.codecs.CodecProvider;
import org.apache.lucene.util.SetOnce;
/**
* {@link DocumentsWriterPerThreadPool} controls {@link ThreadState} instances
* and their thread assignments during indexing. Each {@link ThreadState} holds
* a reference to a {@link DocumentsWriterPerThread} that, once the
* {@link ThreadState} is obtained from the pool, is used exclusively by the
* obtaining thread for indexing a single document. Each indexing thread must
* obtain such a {@link ThreadState} to make progress. Depending on the
* {@link DocumentsWriterPerThreadPool} implementation, {@link ThreadState}
* assignments might differ from document to document.
* <p>
* Once a {@link DocumentsWriterPerThread} is selected for flush, the thread pool
* reuses the flushing {@link DocumentsWriterPerThread}'s {@link ThreadState}
* with a new {@link DocumentsWriterPerThread} instance.
* </p>
*/
public abstract class DocumentsWriterPerThreadPool {
  /** The maximum number of simultaneous threads that may be
   *  indexing documents at once in IndexWriter; if more
   *  than this many threads arrive they will wait for
   *  others to finish. */
  public final static int DEFAULT_MAX_THREAD_STATES = 8;

  /**
   * {@link ThreadState} references and guards a
   * {@link DocumentsWriterPerThread} instance that is used during indexing to
   * build an in-memory index segment. {@link ThreadState} also holds all
   * flush-related per-thread data controlled by
   * {@link DocumentsWriterFlushControl}.
   * <p>
   * A {@link ThreadState}, its methods and members should only be accessed by
   * one thread at a time. Users must acquire the lock via
   * {@link ThreadState#lock()} before accessing the state and release it in a
   * finally block via {@link ThreadState#unlock()}.
   */
  @SuppressWarnings("serial")
  public final static class ThreadState extends ReentrantLock {
    // package private for FlushPolicy
    DocumentsWriterPerThread perThread;
    // write access guarded by DocumentsWriterFlushControl
    volatile boolean flushPending = false;
    // write access guarded by DocumentsWriterFlushControl
    long bytesUsed = 0;
    // guarded by Reentrant lock
    private boolean isActive = true;

    ThreadState(DocumentsWriterPerThread perThread) {
      this.perThread = perThread;
    }

    /**
     * Resets the internal {@link DocumentsWriterPerThread} with the given one.
     * If the given DWPT is <code>null</code> this ThreadState is marked as
     * inactive and should not be used for indexing anymore. In either case the
     * byte count and the flush-pending flag are cleared.
     * <p>
     * Must be called while holding this state's lock.
     *
     * @see #isActive()
     */
    void resetWriter(DocumentsWriterPerThread perThread) {
      assert this.isHeldByCurrentThread();
      if (perThread == null) {
        isActive = false;
      }
      this.perThread = perThread;
      this.bytesUsed = 0;
      this.flushPending = false;
    }

    /**
     * Returns <code>true</code> if this ThreadState is still open. It returns
     * <code>false</code> once {@link #resetWriter} has been called with
     * <code>null</code>, which happens when the state is checked out for flush
     * after the DW has been closed, or when the pool deactivates states that
     * were never handed out.
     * <p>
     * Must be called while holding this state's lock.
     */
    boolean isActive() {
      assert this.isHeldByCurrentThread();
      return isActive;
    }

    /**
     * Returns the number of currently active bytes in this ThreadState's
     * {@link DocumentsWriterPerThread}. Must be called while holding this
     * state's lock.
     */
    public long getBytesUsedPerThread() {
      assert this.isHeldByCurrentThread();
      // public for FlushPolicy
      return bytesUsed;
    }

    /**
     * Returns this {@link ThreadState}'s {@link DocumentsWriterPerThread}, or
     * <code>null</code> if the state has been deactivated. Must be called while
     * holding this state's lock.
     */
    public DocumentsWriterPerThread getDocumentsWriterPerThread() {
      assert this.isHeldByCurrentThread();
      // public for FlushPolicy
      return perThread;
    }

    /**
     * Returns <code>true</code> iff this {@link ThreadState} is marked as flush
     * pending, otherwise <code>false</code>. Does not require the lock because
     * the flag is volatile.
     */
    public boolean isFlushPending() {
      return flushPending;
    }
  }

  private final ThreadState[] perThreads;
  // number of leading slots in perThreads handed out by newThreadState();
  // volatile so readers observe newly published states
  private volatile int numThreadStatesActive;
  private CodecProvider codecProvider;
  private FieldNumberBiMap globalFieldMap;
  private final SetOnce<DocumentsWriter> documentsWriter = new SetOnce<DocumentsWriter>();

  /**
   * Creates a new {@link DocumentsWriterPerThreadPool} with max.
   * {@link #DEFAULT_MAX_THREAD_STATES} thread states.
   */
  public DocumentsWriterPerThreadPool() {
    this(DEFAULT_MAX_THREAD_STATES);
  }

  /**
   * Creates a new {@link DocumentsWriterPerThreadPool} with at most
   * <code>maxNumPerThreads</code> thread states. Values less than
   * <code>1</code> fall back to {@link #DEFAULT_MAX_THREAD_STATES}.
   */
  public DocumentsWriterPerThreadPool(int maxNumPerThreads) {
    maxNumPerThreads = (maxNumPerThreads < 1) ? DEFAULT_MAX_THREAD_STATES : maxNumPerThreads;
    perThreads = new ThreadState[maxNumPerThreads];
    numThreadStatesActive = 0;
  }

  /**
   * Binds this pool to the given {@link DocumentsWriter} (stored in a
   * {@link SetOnce}) and pre-creates one {@link ThreadState} per slot. Each
   * slot gets its own {@link DocumentsWriterPerThread} with fresh
   * {@link FieldInfos} obtained from <code>globalFieldMap</code>.
   */
  public void initialize(DocumentsWriter documentsWriter, FieldNumberBiMap globalFieldMap, IndexWriterConfig config) {
    this.documentsWriter.set(documentsWriter); // thread pool is bound to DW
    final CodecProvider codecs = config.getCodecProvider();
    this.codecProvider = codecs;
    this.globalFieldMap = globalFieldMap;
    for (int i = 0; i < perThreads.length; i++) {
      final FieldInfos infos = globalFieldMap.newFieldInfos(SegmentCodecsBuilder.create(codecs));
      perThreads[i] = new ThreadState(new DocumentsWriterPerThread(documentsWriter.directory, documentsWriter, infos, documentsWriter.chain));
    }
  }

  /**
   * Returns the max number of {@link ThreadState} instances available in this
   * {@link DocumentsWriterPerThreadPool}
   */
  public int getMaxThreadStates() {
    return perThreads.length;
  }

  /**
   * Returns the active number of {@link ThreadState} instances.
   */
  public int getActiveThreadState() {
    return numThreadStatesActive;
  }

  /**
   * Returns a new {@link ThreadState} iff any new state is available otherwise
   * <code>null</code>.
   * <p>
   * NOTE: the returned {@link ThreadState} is already locked iff non-
   * <code>null</code>.
   *
   * @return a new {@link ThreadState} iff any new state is available otherwise
   *         <code>null</code>
   */
  public synchronized ThreadState newThreadState() {
    if (numThreadStatesActive < perThreads.length) {
      final ThreadState threadState = perThreads[numThreadStatesActive];
      threadState.lock(); // lock so nobody else will get this ThreadState
      numThreadStatesActive++; // increment will publish the ThreadState
      threadState.perThread.initialize();
      return threadState;
    }
    return null;
  }

  /**
   * Deactivates all thread states that have not yet been handed out by
   * {@link #newThreadState()}, by resetting their writer to <code>null</code>
   * while holding each state's lock.
   */
  protected synchronized void deactivateUnreleasedStates() {
    for (int i = numThreadStatesActive; i < perThreads.length; i++) {
      final ThreadState threadState = perThreads[i];
      threadState.lock();
      try {
        threadState.resetWriter(null);
      } finally {
        threadState.unlock();
      }
    }
  }

  /**
   * Detaches the {@link DocumentsWriterPerThread} of the given (locked)
   * {@link ThreadState} so it can be flushed. If <code>closed</code> is
   * <code>false</code>, a new, initialized DWPT built from the old one and
   * fresh {@link FieldInfos} is installed. Otherwise the state is deactivated.
   *
   * @return the DWPT previously held by <code>threadState</code>
   */
  protected DocumentsWriterPerThread replaceForFlush(ThreadState threadState, boolean closed) {
    assert threadState.isHeldByCurrentThread();
    final DocumentsWriterPerThread dwpt = threadState.perThread;
    if (!closed) {
      final FieldInfos infos = globalFieldMap.newFieldInfos(SegmentCodecsBuilder.create(codecProvider));
      final DocumentsWriterPerThread newDwpt = new DocumentsWriterPerThread(dwpt, infos);
      newDwpt.initialize();
      threadState.resetWriter(newDwpt);
    } else {
      threadState.resetWriter(null);
    }
    return dwpt;
  }

  /**
   * Hook for subclasses to recycle a {@link DocumentsWriterPerThread}. The
   * default implementation does nothing.
   */
  public void recycle(DocumentsWriterPerThread dwpt) {
    // don't recycle DWPT by default
  }

  /**
   * Selects a {@link ThreadState} for <code>requestingThread</code>. The
   * assignment policy is up to the implementation. Per the method name, the
   * returned state is presumably locked for the caller; confirm against the
   * subclasses.
   */
  public abstract ThreadState getAndLock(Thread requestingThread, DocumentsWriter documentsWriter);

  /**
   * Returns an iterator providing access to all {@link ThreadState}
   * instances, including those not yet handed out by
   * {@link #newThreadState()}.
   */
  // TODO: new Iterator per indexed doc is overkill...?
  public Iterator<ThreadState> getAllPerThreadsIterator() {
    return getPerThreadsIterator(this.perThreads.length);
  }

  /**
   * Returns an iterator providing access to all active {@link ThreadState}
   * instances.
   * <p>
   * Note: The returned iterator will only iterate over the
   * {@link ThreadState}s that are active at the point in time when this method
   * has been called.
   */
  // TODO: new Iterator per indexed doc is overkill...?
  public Iterator<ThreadState> getActivePerThreadsIterator() {
    return getPerThreadsIterator(numThreadStatesActive);
  }

  /**
   * Returns a read-only iterator over <code>perThreads[0..upto)</code>.
   * <code>next()</code> throws {@link NoSuchElementException} once exhausted,
   * as the {@link Iterator} contract requires, instead of leaking slots beyond
   * <code>upto</code>.
   */
  private Iterator<ThreadState> getPerThreadsIterator(final int upto) {
    return new Iterator<ThreadState>() {
      int i = 0;

      public boolean hasNext() {
        return i < upto;
      }

      public ThreadState next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        return perThreads[i++];
      }

      public void remove() {
        throw new UnsupportedOperationException("remove() not supported.");
      }
    };
  }

  /**
   * Returns the ThreadState with the minimum estimated number of threads
   * waiting to acquire its lock or <code>null</code> if no {@link ThreadState}
   * is yet visible to the calling thread.
   */
  protected ThreadState minContendedThreadState() {
    ThreadState minThreadState = null;
    // Read the volatile count once and scan that prefix directly; this visits
    // exactly what getActivePerThreadsIterator() would, without allocating.
    final int limit = numThreadStatesActive;
    for (int i = 0; i < limit; i++) {
      final ThreadState state = perThreads[i];
      if (minThreadState == null || state.getQueueLength() < minThreadState.getQueueLength()) {
        minThreadState = state;
      }
    }
    return minThreadState;
  }
}