blob: c2b9123572c95563a7e0f71e0d37abdba16ec654 [file] [log] [blame]
package org.apache.lucene.index;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.concurrent.locks.ReentrantLock;
import org.apache.lucene.index.FieldInfos.FieldNumbers;
import org.apache.lucene.util.SetOnce;
/**
* {@link DocumentsWriterPerThreadPool} controls {@link ThreadState} instances
* and their thread assignments during indexing. Each {@link ThreadState} holds
* a reference to a {@link DocumentsWriterPerThread} that is once a
* {@link ThreadState} is obtained from the pool exclusively used for indexing a
* single document by the obtaining thread. Each indexing thread must obtain
* such a {@link ThreadState} to make progress. Depending on the
* {@link DocumentsWriterPerThreadPool} implementation {@link ThreadState}
* assignments might differ from document to document.
* <p>
* Once a {@link DocumentsWriterPerThread} is selected for flush the thread pool
* is reusing the flushing {@link DocumentsWriterPerThread}s ThreadState with a
* new {@link DocumentsWriterPerThread} instance.
* </p>
*/
abstract class DocumentsWriterPerThreadPool implements Cloneable {
/**
* {@link ThreadState} references and guards a
* {@link DocumentsWriterPerThread} instance that is used during indexing to
* build a in-memory index segment. {@link ThreadState} also holds all flush
* related per-thread data controlled by {@link DocumentsWriterFlushControl}.
* <p>
* A {@link ThreadState}, its methods and members should only accessed by one
* thread a time. Users must acquire the lock via {@link ThreadState#lock()}
* and release the lock in a finally block via {@link ThreadState#unlock()}
* before accessing the state.
*/
@SuppressWarnings("serial")
final static class ThreadState extends ReentrantLock {
DocumentsWriterPerThread dwpt;
// TODO this should really be part of DocumentsWriterFlushControl
// write access guarded by DocumentsWriterFlushControl
volatile boolean flushPending = false;
// TODO this should really be part of DocumentsWriterFlushControl
// write access guarded by DocumentsWriterFlushControl
long bytesUsed = 0;
// guarded by Reentrant lock
private boolean isActive = true;
ThreadState(DocumentsWriterPerThread dpwt) {
this.dwpt = dpwt;
}
/**
* Resets the internal {@link DocumentsWriterPerThread} with the given one.
* if the given DWPT is <code>null</code> this ThreadState is marked as inactive and should not be used
* for indexing anymore.
* @see #isActive()
*/
private void resetWriter(DocumentsWriterPerThread dwpt) {
assert this.isHeldByCurrentThread();
if (dwpt == null) {
isActive = false;
}
this.dwpt = dwpt;
this.bytesUsed = 0;
this.flushPending = false;
}
/**
* Returns <code>true</code> if this ThreadState is still open. This will
* only return <code>false</code> iff the DW has been closed and this
* ThreadState is already checked out for flush.
*/
boolean isActive() {
assert this.isHeldByCurrentThread();
return isActive;
}
/**
* Returns the number of currently active bytes in this ThreadState's
* {@link DocumentsWriterPerThread}
*/
public long getBytesUsedPerThread() {
assert this.isHeldByCurrentThread();
// public for FlushPolicy
return bytesUsed;
}
/**
* Returns this {@link ThreadState}s {@link DocumentsWriterPerThread}
*/
public DocumentsWriterPerThread getDocumentsWriterPerThread() {
assert this.isHeldByCurrentThread();
// public for FlushPolicy
return dwpt;
}
/**
* Returns <code>true</code> iff this {@link ThreadState} is marked as flush
* pending otherwise <code>false</code>
*/
public boolean isFlushPending() {
return flushPending;
}
}
private ThreadState[] threadStates;
private volatile int numThreadStatesActive;
private SetOnce<FieldNumbers> globalFieldMap = new SetOnce<FieldNumbers>();
private SetOnce<DocumentsWriter> documentsWriter = new SetOnce<DocumentsWriter>();
/**
* Creates a new {@link DocumentsWriterPerThreadPool} with a given maximum of {@link ThreadState}s.
*/
DocumentsWriterPerThreadPool(int maxNumThreadStates) {
if (maxNumThreadStates < 1) {
throw new IllegalArgumentException("maxNumThreadStates must be >= 1 but was: " + maxNumThreadStates);
}
threadStates = new ThreadState[maxNumThreadStates];
numThreadStatesActive = 0;
}
void initialize(DocumentsWriter documentsWriter, FieldNumbers globalFieldMap, LiveIndexWriterConfig config) {
this.documentsWriter.set(documentsWriter); // thread pool is bound to DW
this.globalFieldMap.set(globalFieldMap);
for (int i = 0; i < threadStates.length; i++) {
final FieldInfos.Builder infos = new FieldInfos.Builder(globalFieldMap);
threadStates[i] = new ThreadState(new DocumentsWriterPerThread(documentsWriter.directory, documentsWriter, infos, documentsWriter.chain));
}
}
@Override
public DocumentsWriterPerThreadPool clone() {
// We should only be cloned before being used:
assert numThreadStatesActive == 0;
DocumentsWriterPerThreadPool clone;
try {
clone = (DocumentsWriterPerThreadPool) super.clone();
} catch (CloneNotSupportedException e) {
// should not happen
throw new RuntimeException(e);
}
clone.documentsWriter = new SetOnce<DocumentsWriter>();
clone.globalFieldMap = new SetOnce<FieldNumbers>();
clone.threadStates = new ThreadState[threadStates.length];
return clone;
}
/**
* Returns the max number of {@link ThreadState} instances available in this
* {@link DocumentsWriterPerThreadPool}
*/
int getMaxThreadStates() {
return threadStates.length;
}
/**
* Returns the active number of {@link ThreadState} instances.
*/
int getActiveThreadState() {
return numThreadStatesActive;
}
/**
* Returns a new {@link ThreadState} iff any new state is available otherwise
* <code>null</code>.
* <p>
* NOTE: the returned {@link ThreadState} is already locked iff non-
* <code>null</code>.
*
* @return a new {@link ThreadState} iff any new state is available otherwise
* <code>null</code>
*/
synchronized ThreadState newThreadState() {
if (numThreadStatesActive < threadStates.length) {
final ThreadState threadState = threadStates[numThreadStatesActive];
threadState.lock(); // lock so nobody else will get this ThreadState
boolean unlock = true;
try {
if (threadState.isActive()) {
// unreleased thread states are deactivated during DW#close()
numThreadStatesActive++; // increment will publish the ThreadState
assert threadState.dwpt != null;
threadState.dwpt.initialize();
unlock = false;
return threadState;
}
// unlock since the threadstate is not active anymore - we are closed!
assert assertUnreleasedThreadStatesInactive();
return null;
} finally {
if (unlock) {
// in any case make sure we unlock if we fail
threadState.unlock();
}
}
}
return null;
}
private synchronized boolean assertUnreleasedThreadStatesInactive() {
for (int i = numThreadStatesActive; i < threadStates.length; i++) {
assert threadStates[i].tryLock() : "unreleased threadstate should not be locked";
try {
assert !threadStates[i].isActive() : "expected unreleased thread state to be inactive";
} finally {
threadStates[i].unlock();
}
}
return true;
}
/**
* Deactivate all unreleased threadstates
*/
synchronized void deactivateUnreleasedStates() {
for (int i = numThreadStatesActive; i < threadStates.length; i++) {
final ThreadState threadState = threadStates[i];
threadState.lock();
try {
threadState.resetWriter(null);
} finally {
threadState.unlock();
}
}
}
DocumentsWriterPerThread replaceForFlush(ThreadState threadState, boolean closed) {
assert threadState.isHeldByCurrentThread();
assert globalFieldMap.get() != null;
final DocumentsWriterPerThread dwpt = threadState.dwpt;
if (!closed) {
final FieldInfos.Builder infos = new FieldInfos.Builder(globalFieldMap.get());
final DocumentsWriterPerThread newDwpt = new DocumentsWriterPerThread(dwpt, infos);
newDwpt.initialize();
threadState.resetWriter(newDwpt);
} else {
threadState.resetWriter(null);
}
return dwpt;
}
void recycle(DocumentsWriterPerThread dwpt) {
// don't recycle DWPT by default
}
// you cannot subclass this without being in o.a.l.index package anyway, so
// the class is already pkg-private... fix me: see LUCENE-4013
abstract ThreadState getAndLock(Thread requestingThread, DocumentsWriter documentsWriter);
/**
* Returns the <i>i</i>th active {@link ThreadState} where <i>i</i> is the
* given ord.
*
* @param ord
* the ordinal of the {@link ThreadState}
* @return the <i>i</i>th active {@link ThreadState} where <i>i</i> is the
* given ord.
*/
ThreadState getThreadState(int ord) {
//assert ord < numThreadStatesActive;
return threadStates[ord];
}
/**
* Returns the ThreadState with the minimum estimated number of threads
* waiting to acquire its lock or <code>null</code> if no {@link ThreadState}
* is yet visible to the calling thread.
*/
ThreadState minContendedThreadState() {
ThreadState minThreadState = null;
final int limit = numThreadStatesActive;
for (int i = 0; i < limit; i++) {
final ThreadState state = threadStates[i];
if (minThreadState == null || state.getQueueLength() < minThreadState.getQueueLength()) {
minThreadState = state;
}
}
return minThreadState;
}
/**
* Returns the number of currently deactivated {@link ThreadState} instances.
* A deactivated {@link ThreadState} should not be used for indexing anymore.
*
* @return the number of currently deactivated {@link ThreadState} instances.
*/
int numDeactivatedThreadStates() {
int count = 0;
for (int i = 0; i < threadStates.length; i++) {
final ThreadState threadState = threadStates[i];
threadState.lock();
try {
if (!threadState.isActive) {
count++;
}
} finally {
threadState.unlock();
}
}
return count;
}
/**
* Deactivates an active {@link ThreadState}. Inactive {@link ThreadState} can
* not be used for indexing anymore once they are deactivated. This method should only be used
* if the parent {@link DocumentsWriter} is closed or aborted.
*
* @param threadState the state to deactivate
*/
void deactivateThreadState(ThreadState threadState) {
assert threadState.isActive();
threadState.resetWriter(null);
}
/**
* Reinitialized an active {@link ThreadState}. A {@link ThreadState} should
* only be reinitialized if it is active without any pending documents.
*
* @param threadState the state to reinitialize
*/
void reinitThreadState(ThreadState threadState) {
assert threadState.isActive;
assert threadState.dwpt.getNumDocsInRAM() == 0;
threadState.dwpt.initialize();
}
}