blob: 82a0477a6c2629cf621c05848a8bd5253c6cb218 [file] [log] [blame]
package org.apache.lucene.index;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState; //javadoc
/**
* A {@link DocumentsWriterPerThreadPool} implementation that tries to assign an
* indexing thread to the same {@link ThreadState} each time the thread tries to
* obtain a {@link ThreadState}. Once a new {@link ThreadState} is created it is
* associated with the creating thread. Subsequently, if the threads associated
* {@link ThreadState} is not in use it will be associated with the requesting
* thread. Otherwise, if the {@link ThreadState} is used by another thread
* {@link ThreadAffinityDocumentsWriterThreadPool} tries to find the currently
* minimal contended {@link ThreadState}.
*/
class ThreadAffinityDocumentsWriterThreadPool extends DocumentsWriterPerThreadPool {
private Map<Thread, ThreadState> threadBindings = new ConcurrentHashMap<Thread, ThreadState>();
/**
* Creates a new {@link ThreadAffinityDocumentsWriterThreadPool} with a given maximum of {@link ThreadState}s.
*/
public ThreadAffinityDocumentsWriterThreadPool(int maxNumPerThreads) {
super(maxNumPerThreads);
assert getMaxThreadStates() >= 1;
}
@Override
public ThreadState getAndLock(Thread requestingThread, DocumentsWriter documentsWriter) {
ThreadState threadState = threadBindings.get(requestingThread);
if (threadState != null && threadState.tryLock()) {
return threadState;
}
ThreadState minThreadState = null;
/* TODO -- another thread could lock the minThreadState we just got while
we should somehow prevent this. */
// Find the state that has minimum number of threads waiting
minThreadState = minContendedThreadState();
if (minThreadState == null || minThreadState.hasQueuedThreads()) {
final ThreadState newState = newThreadState(); // state is already locked if non-null
if (newState != null) {
assert newState.isHeldByCurrentThread();
threadBindings.put(requestingThread, newState);
return newState;
} else if (minThreadState == null) {
/*
* no new threadState available we just take the minContented one
* This must return a valid thread state since we accessed the
* synced context in newThreadState() above.
*/
minThreadState = minContendedThreadState();
}
}
assert minThreadState != null: "ThreadState is null";
minThreadState.lock();
return minThreadState;
}
@Override
public ThreadAffinityDocumentsWriterThreadPool clone() {
ThreadAffinityDocumentsWriterThreadPool clone = (ThreadAffinityDocumentsWriterThreadPool) super.clone();
clone.threadBindings = new ConcurrentHashMap<Thread, ThreadState>();
return clone;
}
}