blob: 5e60fb79e3f98254f76e588d144964a60f2e55fc [file] [log] [blame]
using System;
using System.Diagnostics;
using System.Threading;
namespace Lucene.Net.Index
{
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using Lucene.Net.Support;
/// <summary>
/// <seealso cref="DocumentsWriterPerThreadPool"/> controls <seealso cref="ThreadState"/> instances
/// and their thread assignments during indexing. Each <seealso cref="ThreadState"/> holds
/// a reference to a <seealso cref="DocumentsWriterPerThread"/> that is once a
/// <seealso cref="ThreadState"/> is obtained from the pool exclusively used for indexing a
/// single document by the obtaining thread. Each indexing thread must obtain
/// such a <seealso cref="ThreadState"/> to make progress. Depending on the
/// <seealso cref="DocumentsWriterPerThreadPool"/> implementation <seealso cref="ThreadState"/>
/// assignments might differ from document to document.
/// <p>
/// Once a <seealso cref="DocumentsWriterPerThread"/> is selected for flush the thread pool
/// is reusing the flushing <seealso cref="DocumentsWriterPerThread"/>s ThreadState with a
/// new <seealso cref="DocumentsWriterPerThread"/> instance.
/// </p>
/// </summary>
public abstract class DocumentsWriterPerThreadPool : ICloneable
{
/// <summary>
/// <seealso cref="ThreadState"/> references and guards a
/// <seealso cref="DocumentsWriterPerThread"/> instance that is used during indexing to
/// build a in-memory index segment. <seealso cref="ThreadState"/> also holds all flush
/// related per-thread data controlled by <seealso cref="DocumentsWriterFlushControl"/>.
/// <p>
/// A <seealso cref="ThreadState"/>, its methods and members should only accessed by one
/// thread a time. Users must acquire the lock via <seealso cref="ThreadState#lock()"/>
/// and release the lock in a finally block via <seealso cref="ThreadState#unlock()"/>
/// before accessing the state.
/// </summary>
public sealed class ThreadState : ReentrantLock
{
internal DocumentsWriterPerThread Dwpt;
// TODO this should really be part of DocumentsWriterFlushControl
// write access guarded by DocumentsWriterFlushControl
internal volatile bool FlushPending_Renamed = false;
// TODO this should really be part of DocumentsWriterFlushControl
// write access guarded by DocumentsWriterFlushControl
internal long BytesUsed = 0;
// guarded by Reentrant lock
internal bool IsActive = true;
public ThreadState(DocumentsWriterPerThread dpwt)
{
this.Dwpt = dpwt;
}
/// <summary>
/// Resets the internal <seealso cref="DocumentsWriterPerThread"/> with the given one.
/// if the given DWPT is <code>null</code> this ThreadState is marked as inactive and should not be used
/// for indexing anymore. </summary>
/// <seealso cref= #isActive() </seealso>
internal void Deactivate()
{
//Debug.Assert(this.HeldByCurrentThread);
IsActive = false;
Reset();
}
internal void Reset()
{
//Debug.Assert(this.HeldByCurrentThread);
this.Dwpt = null;
this.BytesUsed = 0;
this.FlushPending_Renamed = false;
}
/// <summary>
/// Returns <code>true</code> if this ThreadState is still open. this will
/// only return <code>false</code> iff the DW has been closed and this
/// ThreadState is already checked out for flush.
/// </summary>
internal bool Active
{
get
{
//Debug.Assert(this.HeldByCurrentThread);
return IsActive;
}
}
internal bool Initialized
{
get
{
//Debug.Assert(this.HeldByCurrentThread);
return Active && Dwpt != null;
}
}
/// <summary>
/// Returns the number of currently active bytes in this ThreadState's
/// <seealso cref="DocumentsWriterPerThread"/>
/// </summary>
public long BytesUsedPerThread
{
get
{
//Debug.Assert(this.HeldByCurrentThread);
// public for FlushPolicy
return BytesUsed;
}
}
/// <summary>
/// Returns this <seealso cref="ThreadState"/>s <seealso cref="DocumentsWriterPerThread"/>
/// </summary>
public DocumentsWriterPerThread DocumentsWriterPerThread
{
get
{
//Debug.Assert(this.HeldByCurrentThread);
// public for FlushPolicy
return Dwpt;
}
}
/// <summary>
/// Returns <code>true</code> iff this <seealso cref="ThreadState"/> is marked as flush
/// pending otherwise <code>false</code>
/// </summary>
public bool FlushPending
{
get
{
return FlushPending_Renamed;
}
}
}
private ThreadState[] ThreadStates;
private volatile int NumThreadStatesActive;
/// <summary>
/// Creates a new <seealso cref="DocumentsWriterPerThreadPool"/> with a given maximum of <seealso cref="ThreadState"/>s.
/// </summary>
public DocumentsWriterPerThreadPool(int maxNumThreadStates)
{
if (maxNumThreadStates < 1)
{
throw new System.ArgumentException("maxNumThreadStates must be >= 1 but was: " + maxNumThreadStates);
}
ThreadStates = new ThreadState[maxNumThreadStates];
NumThreadStatesActive = 0;
for (int i = 0; i < ThreadStates.Length; i++)
{
ThreadStates[i] = new ThreadState(null);
}
}
public virtual object Clone()
{
// We should only be cloned before being used:
if (NumThreadStatesActive != 0)
{
throw new InvalidOperationException("clone this object before it is used!");
}
DocumentsWriterPerThreadPool clone;
clone = (DocumentsWriterPerThreadPool)base.MemberwiseClone();
clone.ThreadStates = new ThreadState[ThreadStates.Length];
for (int i = 0; i < ThreadStates.Length; i++)
{
clone.ThreadStates[i] = new ThreadState(null);
}
return clone;
}
/// <summary>
/// Returns the max number of <seealso cref="ThreadState"/> instances available in this
/// <seealso cref="DocumentsWriterPerThreadPool"/>
/// </summary>
public virtual int MaxThreadStates
{
get
{
return ThreadStates.Length;
}
}
/// <summary>
/// Returns the active number of <seealso cref="ThreadState"/> instances.
/// </summary>
public virtual int ActiveThreadState
{
get
{
return NumThreadStatesActive;
}
}
/// <summary>
/// Returns a new <seealso cref="ThreadState"/> iff any new state is available otherwise
/// <code>null</code>.
/// <p>
/// NOTE: the returned <seealso cref="ThreadState"/> is already locked iff non-
/// <code>null</code>.
/// </summary>
/// <returns> a new <seealso cref="ThreadState"/> iff any new state is available otherwise
/// <code>null</code> </returns>
public virtual ThreadState NewThreadState()
{
lock (this)
{
if (NumThreadStatesActive < ThreadStates.Length)
{
ThreadState threadState = ThreadStates[NumThreadStatesActive];
threadState.@Lock(); // lock so nobody else will get this ThreadState
bool unlock = true;
try
{
if (threadState.Active)
{
// unreleased thread states are deactivated during DW#close()
NumThreadStatesActive++; // increment will publish the ThreadState
Debug.Assert(threadState.Dwpt == null);
unlock = false;
return threadState;
}
// unlock since the threadstate is not active anymore - we are closed!
Debug.Assert(AssertUnreleasedThreadStatesInactive());
return null;
}
finally
{
if (unlock)
{
// in any case make sure we unlock if we fail
threadState.Unlock();
}
}
}
return null;
}
}
private bool AssertUnreleasedThreadStatesInactive()
{
lock (this)
{
for (int i = NumThreadStatesActive; i < ThreadStates.Length; i++)
{
Debug.Assert(ThreadStates[i].TryLock(), "unreleased threadstate should not be locked");
try
{
Debug.Assert(!ThreadStates[i].Initialized, "expected unreleased thread state to be inactive");
}
finally
{
ThreadStates[i].Unlock();
}
}
return true;
}
}
/// <summary>
/// Deactivate all unreleased threadstates
/// </summary>
internal virtual void DeactivateUnreleasedStates()
{
lock (this)
{
for (int i = NumThreadStatesActive; i < ThreadStates.Length; i++)
{
ThreadState threadState = ThreadStates[i];
threadState.@Lock();
try
{
threadState.Deactivate();
}
finally
{
threadState.Unlock();
}
}
}
}
internal virtual DocumentsWriterPerThread Reset(ThreadState threadState, bool closed)
{
//Debug.Assert(threadState.HeldByCurrentThread);
DocumentsWriterPerThread dwpt = threadState.Dwpt;
if (!closed)
{
threadState.Reset();
}
else
{
threadState.Deactivate();
}
return dwpt;
}
internal virtual void Recycle(DocumentsWriterPerThread dwpt)
{
// don't recycle DWPT by default
}
// you cannot subclass this without being in o.a.l.index package anyway, so
// the class is already pkg-private... fix me: see LUCENE-4013
public abstract ThreadState GetAndLock(Thread requestingThread, DocumentsWriter documentsWriter);
/// <summary>
/// Returns the <i>i</i>th active <seealso cref="ThreadState"/> where <i>i</i> is the
/// given ord.
/// </summary>
/// <param name="ord">
/// the ordinal of the <seealso cref="ThreadState"/> </param>
/// <returns> the <i>i</i>th active <seealso cref="ThreadState"/> where <i>i</i> is the
/// given ord. </returns>
internal virtual ThreadState GetThreadState(int ord)
{
return ThreadStates[ord];
}
/// <summary>
/// Returns the ThreadState with the minimum estimated number of threads
/// waiting to acquire its lock or <code>null</code> if no <seealso cref="ThreadState"/>
/// is yet visible to the calling thread.
/// </summary>
internal virtual ThreadState MinContendedThreadState()
{
ThreadState minThreadState = null;
int limit = NumThreadStatesActive;
for (int i = 0; i < limit; i++)
{
ThreadState state = ThreadStates[i];
if (minThreadState == null || state.QueueLength < minThreadState.QueueLength)
{
minThreadState = state;
}
}
return minThreadState;
}
/// <summary>
/// Returns the number of currently deactivated <seealso cref="ThreadState"/> instances.
/// A deactivated <seealso cref="ThreadState"/> should not be used for indexing anymore.
/// </summary>
/// <returns> the number of currently deactivated <seealso cref="ThreadState"/> instances. </returns>
internal virtual int NumDeactivatedThreadStates()
{
int count = 0;
for (int i = 0; i < ThreadStates.Length; i++)
{
ThreadState threadState = ThreadStates[i];
threadState.@Lock();
try
{
if (!threadState.IsActive)
{
count++;
}
}
finally
{
threadState.Unlock();
}
}
return count;
}
/// <summary>
/// Deactivates an active <seealso cref="ThreadState"/>. Inactive <seealso cref="ThreadState"/> can
/// not be used for indexing anymore once they are deactivated. this method should only be used
/// if the parent <seealso cref="DocumentsWriter"/> is closed or aborted.
/// </summary>
/// <param name="threadState"> the state to deactivate </param>
internal virtual void DeactivateThreadState(ThreadState threadState)
{
Debug.Assert(threadState.Active);
threadState.Deactivate();
}
}
}