// blob: 23b378c340ffd401e91df166a1b1482fd74d2ca2 [file] [log] [blame]
using Lucene.Net.Store;
using Lucene.Net.Support.Threading;
using Lucene.Net.Util;
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Diagnostics;
using System.IO;
using System.Threading;
using JCG = J2N.Collections.Generic;
using Directory = Lucene.Net.Store.Directory;
using Lucene.Net.Diagnostics;
namespace Lucene.Net.Replicator
{
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/// <summary>
/// A client which monitors and obtains new revisions from a <see cref="IReplicator"/>.
/// It can be used to either periodically check for updates by invoking
/// <see cref="StartUpdateThread"/>, or manually by calling <see cref="UpdateNow()"/>.
/// <para/>
/// Whenever a new revision is available, the <see cref="RequiredFiles"/> are
/// copied to the <see cref="Directory"/> specified by <see cref="PerSessionDirectoryFactory"/> and
/// a handler is notified.
/// </summary>
/// <remarks>
/// @lucene.experimental
/// </remarks>
public class ReplicationClient : IDisposable
{
//Note: LUCENENET specific, .NET does not work with Threads in the same way as Java does, so we mimic the same behavior using the ThreadPool instead.
/// <summary>
/// Periodic update worker. Instead of owning a dedicated thread it repeatedly registers a
/// one-shot wait on the <see cref="ThreadPool"/>: each wait ends either when the interval
/// times out or when <see cref="handle"/> is signaled (run-now or stop request), and the
/// callback then runs one update cycle and registers the next wait.
/// </summary>
private class ReplicationThread
{
    private readonly Action doUpdate;
    private readonly Action<Exception> handleException;
    private readonly ReentrantLock @lock;
    // Guards IsAlive transitions and the publication of stopHandle.
    private readonly object controlLock = new object();
    private readonly long interval;
    // Signaled to cut the current wait short (see ExecuteImmediately).
    private readonly AutoResetEvent handle = new AutoResetEvent(false);
    // Non-null only while Stop() is waiting for the worker to acknowledge the stop.
    private AutoResetEvent stopHandle;

    /// <summary>
    /// Gets the name given to this worker.
    /// </summary>
    public string Name { get; private set; }

    /// <summary>
    /// Creates a worker; it does nothing until <see cref="Start"/> is called.
    /// </summary>
    /// <param name="intervalMillis">The interval in milliseconds.</param>
    /// <param name="threadName">The thread name.</param>
    /// <param name="doUpdate">A delegate to call to perform the update.</param>
    /// <param name="handleException">A delegate to call to handle an exception.</param>
    /// <param name="lock">The lock held while <paramref name="doUpdate"/> runs.</param>
    public ReplicationThread(long intervalMillis, string threadName, Action doUpdate, Action<Exception> handleException, ReentrantLock @lock)
    {
        this.doUpdate = doUpdate;
        this.handleException = handleException;
        this.@lock = @lock;
        Name = threadName;
        this.interval = intervalMillis;
    }

    /// <summary>
    /// <c>true</c> between a call to <see cref="Start"/> and the following call to <see cref="Stop"/>.
    /// </summary>
    public bool IsAlive { get; private set; }

    /// <summary>
    /// Marks the worker alive and registers the first wait. Does nothing if already alive.
    /// </summary>
    public void Start()
    {
        lock (controlLock)
        {
            if (IsAlive)
                return;
            IsAlive = true;
        }
        RegisterWait(interval);
    }

    /// <summary>
    /// Marks the worker as stopped and blocks until the worker has acknowledged it.
    /// Does nothing if the worker is not alive.
    /// </summary>
    public void Stop()
    {
        AutoResetEvent stopped;
        lock (controlLock)
        {
            if (!IsAlive)
                return;
            // Publish the handle BEFORE clearing IsAlive. A cycle that observes
            // IsAlive == false must always find a handle to signal; otherwise it would
            // neither signal nor re-register and the WaitOne() below would never return.
            stopped = new AutoResetEvent(false);
            stopHandle = stopped;
            IsAlive = false;
        }
        //NOTE: Wake up any outstanding wait; that execution terminates almost instantaneously if a cycle is not already running.
        ExecuteImmediately();
        stopped.WaitOne();
        lock (controlLock)
        {
            stopHandle = null;
        }
    }

    /// <summary>
    /// Executes the next cycle of work immediately
    /// </summary>
    public void ExecuteImmediately()
    {
        handle.Set();
    }

    /// <summary>
    /// Registers a one-shot wait that invokes <see cref="Run"/> after <paramref name="timeout"/>
    /// milliseconds or as soon as <see cref="handle"/> is signaled; acknowledges a stop instead
    /// when the worker is no longer alive.
    /// </summary>
    private void RegisterWait(long timeout)
    {
        //NOTE: We don't care about the timed-out flag; the callback fires either on timeout or because we were asked to run immediately or to stop.
        if (IsAlive)
            ThreadPool.RegisterWaitForSingleObject(handle, (state, timedout) => Run(), null, timeout, true);
        else
            SignalStop();
    }

    /// <summary>
    /// Wakes up a pending <see cref="Stop"/> call, if there is one.
    /// </summary>
    private void SignalStop()
    {
        AutoResetEvent pending;
        // Read under the same lock Stop() uses to publish/clear the handle, so we never
        // act on a half-published or already-cleared reference.
        lock (controlLock)
        {
            pending = stopHandle;
        }
        pending?.Set();
    }

    /// <summary>
    /// One update cycle: runs <c>doUpdate</c> under the lock, routes any exception to
    /// <c>handleException</c>, then schedules the next cycle or acknowledges a stop.
    /// </summary>
    private void Run()
    {
        if (!IsAlive)
        {
            SignalStop();
            return;
        }
        // Started before taking the lock, so time spent waiting for it counts towards the interval.
        Stopwatch timer = Stopwatch.StartNew();
        @lock.Lock();
        try
        {
            doUpdate();
        }
        catch (Exception exception)
        {
            // If the handler itself throws, the finally block below still runs and the
            // exception then leaves this callback.
            handleException(exception);
        }
        finally
        {
            @lock.Unlock();
            timer.Stop();
            // Subtract the time this cycle took so cycles start roughly every 'interval' ms.
            long driftAdjusted = Math.Max(interval - timer.ElapsedMilliseconds, 0);
            if (IsAlive)
                RegisterWait(driftAdjusted);
            else
                SignalStop();
        }
    }
}
// LUCENENET specific - de-nested the IReplicationHandler and
// ISourceDirectoryFactory interfaces.

/// <summary>
/// The component name to use with <see cref="Util.InfoStream.IsEnabled"/>
/// </summary>
public const string INFO_STREAM_COMPONENT = "ReplicationThread";

private readonly IReplicator replicator;
private readonly IReplicationHandler handler;
private readonly ISourceDirectoryFactory factory;
// Scratch buffer used by CopyBytes when streaming a file into a directory.
private readonly byte[] copyBuffer = new byte[16384];
// Held around every DoUpdate call (by the update worker and by UpdateNow).
private readonly ReentrantLock updateLock = new ReentrantLock();
private ReplicationThread updateThread;
// volatile: set by Dispose on the caller's thread and polled by DoUpdate, which may be
// running on a ThreadPool thread; the write must become visible without extra locking.
private volatile bool disposed = false;
// volatile: replaced through the InfoStream property while an update may be logging.
private volatile InfoStream infoStream = InfoStream.Default;
/// <summary>
/// Creates a client that pulls revisions from <paramref name="replicator"/>, stores the copied
/// files in directories supplied by <paramref name="factory"/> and reports completed revisions
/// to <paramref name="handler"/>.
/// </summary>
/// <param name="replicator">The <see cref="IReplicator"/> used for checking for updates</param>
/// <param name="handler">The <see cref="IReplicationHandler"/> notified when new revisions are ready</param>
/// <param name="factory">The <see cref="ISourceDirectoryFactory"/> for returning a <see cref="Directory"/> for a given source and session</param>
public ReplicationClient(IReplicator replicator, IReplicationHandler handler, ISourceDirectoryFactory factory)
{
    // The arguments are stored as given; no validation is performed here.
    this.factory = factory;
    this.handler = handler;
    this.replicator = replicator;
}
/// <summary>
/// Copies everything that can still be read from <paramref name="input"/> into
/// <paramref name="output"/>, one <c>copyBuffer</c>-sized chunk at a time.
/// </summary>
/// <exception cref="IOException"></exception>
private void CopyBytes(IndexOutput output, Stream input)
{
    byte[] buffer = copyBuffer;
    for (; ; )
    {
        int read = input.Read(buffer, 0, buffer.Length);
        if (read <= 0)
        {
            // A non-positive count is treated as end of stream.
            break;
        }
        output.WriteBytes(buffer, 0, read);
    }
}
/// <summary>
/// Performs one replication round: asks the replicator for an update relative to the
/// handler's current version, copies every required file into the per-session directories,
/// releases the session, and finally hands the copied files to the handler.
/// Returns without doing anything else when the replicator reports no update.
/// </summary>
/// <exception cref="IOException"></exception>
private void DoUpdate()
{
    SessionToken session = null;
    Dictionary<string, Directory> sourceDirectory = new Dictionary<string, Directory>();
    Dictionary<string, IList<string>> copiedFiles = new Dictionary<string, IList<string>>();
    bool notify = false;
    try
    {
        string version = handler.CurrentVersion;
        session = replicator.CheckForUpdate(version);
        WriteToInfoStream(string.Format("doUpdate(): handlerVersion={0} session={1}", version, session));
        if (session == null)
            return;
        IDictionary<string, IList<RevisionFile>> requiredFiles = RequiredFiles(session.SourceFiles);
        // Only build the (potentially long) description when it will actually be logged.
        if (infoStream.IsEnabled(INFO_STREAM_COMPONENT))
            infoStream.Message(INFO_STREAM_COMPONENT, "doUpdate(): requiredFiles=" + DescribeRequiredFiles(requiredFiles));
        foreach (KeyValuePair<string, IList<RevisionFile>> pair in requiredFiles)
        {
            string source = pair.Key;
            Directory directory = factory.GetDirectory(session.Id, source);
            sourceDirectory.Add(source, directory);
            List<string> cpFiles = new List<string>();
            copiedFiles.Add(source, cpFiles);
            foreach (RevisionFile file in pair.Value)
            {
                if (disposed)
                {
                    // if we're closed, abort file copy
                    WriteToInfoStream("doUpdate(): detected client was closed); abort file copy");
                    return;
                }
                Stream input = null;
                IndexOutput output = null;
                try
                {
                    input = replicator.ObtainFile(session.Id, source, file.FileName);
                    output = directory.CreateOutput(file.FileName, IOContext.DEFAULT);
                    CopyBytes(output, input);
                    cpFiles.Add(file.FileName);
                    // TODO add some validation, on size / checksum
                }
                finally
                {
                    IOUtils.Dispose(input, output);
                }
            }
        }
        // only notify if all required files were successfully obtained. This must stay
        // AFTER the loop over all sources: setting it per source would suppress the
        // cleanup below when a later source fails or the copy is aborted.
        notify = true;
    }
    finally
    {
        if (session != null)
        {
            try
            {
                replicator.Release(session.Id);
            }
            finally
            {
                if (!notify)
                {
                    // cleanup after ourselves
                    IOUtils.Dispose(sourceDirectory.Values);
                    factory.CleanupSession(session.Id);
                }
            }
        }
    }
    // notify outside the try-finally above, so the session is released sooner.
    // the handler may take time to finish acting on the copied files, but the
    // session itself is no longer needed.
    try
    {
        if (notify && !disposed)
        { // no use to notify if we are closed already
            handler.RevisionReady(session.Version, session.SourceFiles, new ReadOnlyDictionary<string, IList<string>>(copiedFiles), sourceDirectory);
        }
    }
    finally
    {
        IOUtils.Dispose(sourceDirectory.Values);
        // session cannot be null here (the null case returned above); the check is
        // kept purely as a defensive guard.
        if (session != null)
        {
            factory.CleanupSession(session.Id);
        }
    }
}

/// <summary>
/// Renders the required-files map as <c>{source=[file, file], ...}</c> for log output.
/// </summary>
private static string DescribeRequiredFiles(IDictionary<string, IList<RevisionFile>> requiredFiles)
{
    List<string> perSource = new List<string>();
    foreach (KeyValuePair<string, IList<RevisionFile>> pair in requiredFiles)
    {
        List<string> names = new List<string>();
        foreach (RevisionFile file in pair.Value)
        {
            names.Add(file.FileName);
        }
        perSource.Add(pair.Key + "=[" + string.Join(", ", names) + "]");
    }
    return "{" + string.Join(", ", perSource) + "}";
}
/// <summary>Throws <see cref="ObjectDisposedException"/> if the client has already been disposed.</summary>
/// <exception cref="ObjectDisposedException">The client has been disposed.</exception>
protected void EnsureOpen()
{
    if (!disposed)
        return;
    // The single-argument constructor interprets its argument as the OBJECT NAME, which
    // would report the explanatory text as the name of the disposed object. Pass the
    // type name and the message separately.
    throw new ObjectDisposedException(GetType().FullName, "this update client has already been closed");
}
// LUCENENET specific Utility Method
/// <summary>
/// Sends <paramref name="message"/> to the current info stream under
/// <see cref="INFO_STREAM_COMPONENT"/>, but only when that component is enabled.
/// </summary>
private void WriteToInfoStream(string message)
{
    if (!infoStream.IsEnabled(INFO_STREAM_COMPONENT))
    {
        return;
    }
    infoStream.Message(INFO_STREAM_COMPONENT, message);
}
/// <summary>
/// Callback invoked by the replication worker when an update cycle throws. The default
/// implementation writes the exception (including its stack trace, via its string form) to
/// the <see cref="Util.InfoStream"/> configured through <see cref="InfoStream"/>.
/// Override to log the exception elsewhere.
/// </summary>
/// <remarks>
/// <b>NOTE:</b> an exception thrown from an override is not caught by the worker and leaves
/// its callback. The original Lucene documentation states that this terminates the
/// replication thread, which can then only be restarted by calling
/// <see cref="StopUpdateThread"/> followed by <see cref="StartUpdateThread"/>
/// (NOTE(review): confirm that this still holds for the ThreadPool-based worker).
/// </remarks>
protected virtual void HandleUpdateException(Exception exception)
{
    string message = string.Format("an error occurred during revision update: {0}", exception);
    WriteToInfoStream(message);
}
/// <summary>
/// Computes which files of a new revision must be fetched. The default implementation
/// returns, per source known to the handler, the files of the new revision whose names the
/// handler does not already hold; if the handler reports no current files at all, the new
/// revision's files are returned unchanged.
/// </summary>
/// <param name="newRevisionFiles">The files of the new revision, keyed by source.</param>
/// <returns>The files to copy, keyed by source, in the order they appear in the new revision.</returns>
protected virtual IDictionary<string, IList<RevisionFile>> RequiredFiles(IDictionary<string, IList<RevisionFile>> newRevisionFiles)
{
    IDictionary<string, IList<RevisionFile>> current = handler.CurrentRevisionFiles;
    if (current != null)
    {
        var result = new Dictionary<string, IList<RevisionFile>>();
        foreach (KeyValuePair<string, IList<RevisionFile>> entry in current)
        {
            // collect the names the handler already has, for fast membership checks below
            ISet<string> known = new JCG.HashSet<string>();
            foreach (RevisionFile existing in entry.Value)
            {
                known.Add(existing.FileName);
            }

            string source = entry.Key;
            if (Debugging.AssertsEnabled) Debugging.Assert(newRevisionFiles.ContainsKey(source), () => string.Format("source not found in newRevisionFiles: {0}", newRevisionFiles));

            // walk the new revision's list so its ordering is preserved
            var missing = new List<RevisionFile>();
            foreach (RevisionFile candidate in newRevisionFiles[source])
            {
                if (known.Contains(candidate.FileName))
                {
                    continue;
                }
                missing.Add(candidate);
            }
            result[source] = missing;
        }
        return result;
    }
    return newRevisionFiles;
}
/// <summary>
/// Stops the update thread and marks the client as disposed. Only acts when called with
/// <paramref name="disposing"/> set to <c>true</c> and the client is not disposed yet.
/// </summary>
/// <param name="disposing"><c>true</c> when called from <see cref="Dispose()"/>.</param>
protected virtual void Dispose(bool disposing)
{
    if (disposing && !disposed)
    {
        // Stop first; the disposed flag is only raised once the worker has stopped.
        StopUpdateThread();
        disposed = true;
    }
}
/// <summary>
/// Disposes this client by delegating to <see cref="Dispose(bool)"/> with <c>true</c>,
/// then suppresses finalization of this instance.
/// </summary>
public void Dispose()
{
    Dispose(disposing: true);
    GC.SuppressFinalize(this);
}
/// <summary>
/// Starts the periodic update thread, which runs an update every
/// <paramref name="intervalMillis"/> milliseconds. <paramref name="threadName"/> is optional
/// and only affects <see cref="ReplicationThread.Name"/>: when given it is prefixed with
/// <c>"ReplicationThread-"</c>, when <c>null</c> a default name is used.
/// </summary>
/// <exception cref="InvalidOperationException"> if the thread has already been started </exception>
/// <exception cref="ObjectDisposedException"> if this client has been disposed </exception>
public virtual void StartUpdateThread(long intervalMillis, string threadName)
{
    EnsureOpen();

    bool alreadyRunning = updateThread != null && updateThread.IsAlive;
    if (alreadyRunning)
    {
        throw new InvalidOperationException("cannot start an update thread when one is running, must first call 'stopUpdateThread()'");
    }

    string workerName;
    if (threadName != null)
    {
        workerName = "ReplicationThread-" + threadName;
    }
    else
    {
        workerName = INFO_STREAM_COMPONENT;
    }

    updateThread = new ReplicationThread(intervalMillis, workerName, DoUpdate, HandleUpdateException, updateLock);
    updateThread.Start();

    // IsUpdateThreadAlive relies on IsAlive being true from here on; assert to be on the safe side
    if (Debugging.AssertsEnabled) Debugging.Assert(updateThread.IsAlive, "updateThread started but not alive?");
}
/// <summary>
/// Stops the update thread and forgets it. Silently does nothing when no update thread
/// exists. Returns only after the update thread has stopped.
/// </summary>
public virtual void StopUpdateThread()
{
    // Stop() wakes the worker if it is idle; if it is in the middle of a
    // replication round we wait here until that round has finished.
    updateThread?.Stop();
    updateThread = null;
}
/// <summary>
/// Returns true if the update thread is alive, i.e. an update thread has been created by
/// <see cref="StartUpdateThread"/>, has not been discarded by
/// <see cref="StopUpdateThread"/>, and reports itself as alive.
/// </summary>
public virtual bool IsUpdateThreadAlive
{
    get
    {
        if (updateThread == null)
        {
            return false;
        }
        return updateThread.IsAlive;
    }
}
/// <summary>
/// Returns <c>"ReplicationClient"</c>, followed by the update thread's name in parentheses
/// when an update thread exists.
/// </summary>
public override string ToString()
{
    return updateThread != null
        ? string.Format("ReplicationClient ({0})", updateThread.Name)
        : "ReplicationClient";
}
/// <summary>
/// Triggers an update right away. When an update thread exists, it is asked to run its next
/// cycle immediately and this method returns without waiting for that cycle; otherwise the
/// update is performed on the calling thread while holding the update lock.
/// </summary>
/// <exception cref="IOException"></exception>
/// <exception cref="ObjectDisposedException"> if this client has been disposed </exception>
public virtual void UpdateNow()
{
    EnsureOpen();

    if (updateThread == null)
    {
        //NOTE: No worker exists, so the work is done right here.
        updateLock.Lock();
        try
        {
            DoUpdate();
        }
        finally
        {
            updateLock.Unlock();
        }
        return;
    }

    //NOTE: A worker exists; delegate to it by requesting that it runs
    //      its cycle immediately.
    updateThread.ExecuteImmediately();
}
/// <summary>
/// Gets or sets the <see cref="Util.InfoStream"/> that receives log messages. Assigning
/// <c>null</c> installs <see cref="Util.InfoStream.NO_OUTPUT"/> instead.
/// </summary>
public virtual InfoStream InfoStream
{
    get { return infoStream; }
    set
    {
        if (value == null)
        {
            infoStream = InfoStream.NO_OUTPUT;
        }
        else
        {
            infoStream = value;
        }
    }
}
}
/// <summary>Handler for revisions obtained by the client.</summary>
//Note: LUCENENET specific denesting of interface
public interface IReplicationHandler
{
/// <summary>
/// Returns the current revision version held by the handler. The client passes this value
/// to the replicator when checking for an update.
/// </summary>
string CurrentVersion { get; }
/// <summary>
/// Returns the current revision files held by the handler, keyed by source. When this is
/// <c>null</c>, the client's default file selection treats every file of a new revision as required.
/// </summary>
IDictionary<string, IList<RevisionFile>> CurrentRevisionFiles { get; }
/// <summary>
/// Called when a new revision was obtained and is available (i.e. all needed files were successfully copied).
/// </summary>
/// <param name="version">The version of the <see cref="IRevision"/> that was copied</param>
/// <param name="revisionFiles">The files contained by this <see cref="IRevision"/>, keyed by source</param>
/// <param name="copiedFiles">The names of the files that were actually copied, keyed by source</param>
/// <param name="sourceDirectory">A mapping from a source of files to the <see cref="Directory"/> they were copied into</param>
/// <exception cref="IOException"/>
void RevisionReady(string version,
IDictionary<string, IList<RevisionFile>> revisionFiles,
IDictionary<string, IList<string>> copiedFiles,
IDictionary<string, Directory> sourceDirectory);
}
/// <summary>
/// Resolves a session and source into a <see cref="Directory"/> to use for copying
/// the session files to.
/// </summary>
//Note: LUCENENET specific denesting of interface
public interface ISourceDirectoryFactory
{
/// <summary>
/// Returns the <see cref="Directory"/> to use for the given session and source.
/// Implementations may e.g. return different directories for different
/// sessions, or the same directory for all sessions. In the latter case, it is
/// advised to clean the directory before it is used for a new session.
/// </summary>
/// <param name="sessionId">The id of the replication session the files belong to.</param>
/// <param name="source">The name of the source whose files will be copied into the returned directory.</param>
/// <returns>The directory that receives the copied files.</returns>
/// <exception cref="IOException"></exception>
/// <seealso cref="CleanupSession(string)"/>
Directory GetDirectory(string sessionId, string source); //throws IOException;
/// <summary>
/// Called to denote that the replication actions for this session were finished and the directory is no longer needed.
/// </summary>
/// <param name="sessionId">The id of the replication session that has finished.</param>
/// <exception cref="IOException"></exception>
void CleanupSession(string sessionId);
}
}