blob: 6a06852eaa5051ee90939d5429bbd354891a2ab9 [file] [log] [blame]
using J2N.Threading.Atomic;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.IO;
using System.Linq;
namespace Lucene.Net.Replicator
{
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/// <summary>
/// A <see cref="IReplicator"/> implementation for use by the side that publishes
/// <see cref="IRevision"/>s, as well as for clients to
/// <see cref="CheckForUpdate">check for updates</see>. When a client needs to be updated, it is returned a
/// <see cref="SessionToken"/> through which it can
/// <see cref="ObtainFile">obtain</see> the files of that
/// revision. As long as a revision is being replicated, this replicator
/// guarantees that it will not be <see cref="IRevision.Release">released</see>.
/// <para/>
/// Replication sessions expire by default after
/// <see cref="DEFAULT_SESSION_EXPIRATION_THRESHOLD"/>, and the threshold can be
/// configured through <see cref="ExpirationThreshold"/>.
/// </summary>
/// <remarks>
/// @lucene.experimental
/// </remarks>
public class LocalReplicator : IReplicator
{
/// <summary>
/// Wraps an <see cref="IRevision"/> together with a reference count that
/// starts at 1. When the count drops to zero the wrapped revision is released.
/// </summary>
private class RefCountedRevision
{
    private readonly AtomicInt32 refCount = new AtomicInt32(1);

    public RefCountedRevision(IRevision revision)
    {
        Revision = revision;
    }

    /// <summary>The revision whose lifetime is governed by this counter.</summary>
    public IRevision Revision { get; private set; }

    /// <summary>Adds one reference to the wrapped revision.</summary>
    public virtual void IncRef()
    {
        refCount.IncrementAndGet();
    }

    /// <summary>
    /// Drops one reference and releases the wrapped revision once no
    /// references remain.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// The count was already zero or negative before the call, or became
    /// negative as a result of it.
    /// </exception>
    public virtual void DecRef()
    {
        if (refCount <= 0)
        {
            throw new InvalidOperationException("this revision is already released");
        }

        int remaining = refCount.DecrementAndGet();
        if (remaining < 0)
        {
            throw new InvalidOperationException(string.Format("too many decRef calls: refCount is {0} after decrement", remaining));
        }
        if (remaining > 0)
        {
            // Other holders still reference the revision; nothing to release yet.
            return;
        }

        try
        {
            Revision.Release();
        }
        catch
        {
            // Releasing failed: give the reference back so the count stays
            // consistent, then let the original exception propagate.
            refCount.IncrementAndGet();
            throw;
        }
    }
}
/// <summary>
/// Book-keeping for a single replication session: the token handed to the
/// client, the reference-counted revision it is replicating, and the time the
/// session was last touched.
/// </summary>
private class ReplicationSession
{
    public SessionToken Session { get; private set; }
    public RefCountedRevision Revision { get; private set; }

    // Monotonic clock reading, in milliseconds, taken at construction and on
    // every MarkAccessed() call.
    private long lastAccessTime;

    public ReplicationSession(SessionToken session, RefCountedRevision revision)
    {
        Session = session;
        Revision = revision;
        lastAccessTime = CurrentTimeMilliseconds();
    }

    /// <summary>
    /// Returns <c>true</c> when more than <paramref name="expirationThreshold"/>
    /// milliseconds have elapsed since the session was last accessed.
    /// </summary>
    /// <param name="expirationThreshold">Allowed inactivity, in milliseconds.</param>
    public virtual bool IsExpired(long expirationThreshold)
    {
        // Compare the elapsed time against the threshold rather than scaling
        // the threshold into Stopwatch ticks: threshold * Stopwatch.Frequency
        // overflows for large thresholds (e.g. long.MaxValue used as
        // "never expire"), which made every session look expired.
        return CurrentTimeMilliseconds() - lastAccessTime > expirationThreshold;
    }

    /// <summary>Records "now" as the last time this session was used.</summary>
    public virtual void MarkAccessed()
    {
        lastAccessTime = CurrentTimeMilliseconds();
    }

    /// <summary>
    /// Converts the current <see cref="Stopwatch"/> timestamp to whole
    /// milliseconds. Whole seconds and the sub-second remainder are converted
    /// separately so the intermediate products stay within <see cref="long"/>.
    /// </summary>
    private static long CurrentTimeMilliseconds()
    {
        long ticks = Stopwatch.GetTimestamp();
        long frequency = Stopwatch.Frequency;
        return (ticks / frequency) * 1000 + (ticks % frequency) * 1000 / frequency;
    }
}
/// <summary>Threshold for expiring inactive sessions. Defaults to 30 minutes.</summary>
public const long DEFAULT_SESSION_EXPIRATION_THRESHOLD = 1000 * 60 * 30;
// Current inactivity threshold; exposed through the ExpirationThreshold property.
private long expirationThreshold = DEFAULT_SESSION_EXPIRATION_THRESHOLD;
// Lock object taken by the public operations of this class.
private readonly object padlock = new object();
// Most recently published revision; null until the first Publish call.
private volatile RefCountedRevision currentRevision;
// Set to true by Dispose(bool); checked by EnsureOpen.
private volatile bool disposed = false;
// Counter used by CheckForUpdate to generate session ids.
private readonly AtomicInt32 sessionToken = new AtomicInt32(0);
// Active replication sessions, keyed by session id.
private readonly IDictionary<string, ReplicationSession> sessions = new Dictionary<string, ReplicationSession>();
/// <summary>
/// Releases every session that has been inactive for longer than
/// <see cref="ExpirationThreshold"/>. The callers in this class invoke it
/// while holding <c>padlock</c>.
/// </summary>
/// <exception cref="InvalidOperationException"></exception>
private void CheckExpiredSessions()
{
    // Collect first, release afterwards: releasing removes entries from
    // 'sessions', which must not happen while it is being enumerated.
    var expired = new List<ReplicationSession>();
    foreach (ReplicationSession candidate in sessions.Values)
    {
        if (candidate.IsExpired(ExpirationThreshold))
        {
            expired.Add(candidate);
        }
    }

    foreach (ReplicationSession stale in expired)
    {
        ReleaseSession(stale.Session.Id);
    }
}
/// <summary>
/// Removes the session with the given id, if it is still registered, and
/// drops the reference it held on its revision.
/// </summary>
/// <exception cref="InvalidOperationException"></exception>
private void ReleaseSession(string sessionId)
{
    // The session may already be gone, e.g. when another caller released it
    // first; in that case there is nothing left to do.
    if (!sessions.TryGetValue(sessionId, out ReplicationSession released))
    {
        return;
    }

    sessions.Remove(sessionId);
    released.Revision.DecRef();
}
/// <summary>
/// Ensure that replicator is still open, or throw <see cref="ObjectDisposedException"/> otherwise.
/// </summary>
/// <exception cref="ObjectDisposedException">This replicator has already been disposed.</exception>
protected void EnsureOpen()
{
    lock (padlock)
    {
        if (disposed)
        {
            // The one-argument ObjectDisposedException constructor interprets
            // its string as the object name, so use the two-argument overload
            // to supply both the object name and the actual message.
            throw new ObjectDisposedException(GetType().FullName, "This replicator has already been disposed");
        }
    }
}
/// <summary>
/// Checks whether a revision newer than <paramref name="currentVersion"/> has
/// been published and, if so, opens a new replication session for it.
/// </summary>
/// <param name="currentVersion">
/// The version the caller currently has, or <c>null</c> if it has none.
/// </param>
/// <returns>
/// A <see cref="SessionToken"/> for the latest published revision, or
/// <c>null</c> when nothing has been published yet or the caller is already
/// up to date.
/// </returns>
/// <exception cref="ObjectDisposedException">This replicator has already been disposed.</exception>
public virtual SessionToken CheckForUpdate(string currentVersion)
{
    lock (padlock)
    {
        EnsureOpen();
        if (currentRevision == null)
            return null; // no published revisions yet
        if (currentVersion != null && currentRevision.Revision.CompareTo(currentVersion) <= 0)
            return null; // currentVersion is newer or equal to latest published revision

        // currentVersion is either null or older than latest published revision.
        // The new session holds its own reference on the revision until it is released.
        currentRevision.IncRef();

        // The session id is a machine-readable token, so format it independently
        // of the current thread's culture.
        string sessionID = sessionToken.IncrementAndGet().ToString(CultureInfo.InvariantCulture);
        SessionToken token = new SessionToken(sessionID, currentRevision.Revision);
        sessions[sessionID] = new ReplicationSession(token, currentRevision);
        return token;
    }
}
/// <summary>
/// Releases the revision reference held by every open session, clears the
/// session table and marks this replicator as disposed.
/// </summary>
/// <param name="disposing">
/// <c>true</c> when called from <see cref="Dispose()"/>; when <c>false</c>
/// the method does nothing.
/// </param>
protected virtual void Dispose(bool disposing)
{
    if (!disposing)
        return;

    lock (padlock)
    {
        // Check and set the flag under the same lock the other operations
        // use for EnsureOpen(), so no session can be registered between the
        // cleanup below and the replicator being marked as disposed.
        if (disposed)
            return;

        foreach (ReplicationSession session in sessions.Values)
            session.Revision.DecRef();
        sessions.Clear();
        disposed = true;
    }
}

/// <summary>
/// Disposes this replicator, releasing the revisions held by all open sessions.
/// </summary>
public void Dispose()
{
    Dispose(true);
    GC.SuppressFinalize(this);
}
/// <summary>
/// Gets or sets how long, in milliseconds, a replication session may stay
/// inactive before it is expired.
/// <para/>
/// Once a session has expired, further attempts to operate within it throw a
/// <see cref="SessionExpiredException"/>. Assigning a new value immediately
/// re-evaluates all open sessions against it.
/// </summary>
/// <exception cref="ObjectDisposedException">
/// The setter was used after this replicator was disposed.
/// </exception>
public virtual long ExpirationThreshold
{
    get
    {
        return expirationThreshold;
    }
    set
    {
        lock (padlock)
        {
            EnsureOpen();
            expirationThreshold = value;
            // Sessions that are too old under the new threshold are released right away.
            CheckExpiredSessions();
        }
    }
}
/// <summary>
/// Opens a stream over one file of the revision that the given session is
/// replicating, and marks the session as accessed.
/// </summary>
/// <param name="sessionId">Id of the session, as returned by <see cref="CheckForUpdate"/>.</param>
/// <param name="source">Source passed through to the revision's <c>Open</c> call.</param>
/// <param name="fileName">File name passed through to the revision's <c>Open</c> call.</param>
/// <returns>The stream returned by the revision for the requested file.</returns>
/// <exception cref="SessionExpiredException">
/// The session is unknown, or it had been inactive for longer than
/// <see cref="ExpirationThreshold"/> (in which case it is released here).
/// </exception>
/// <exception cref="ObjectDisposedException">This replicator has already been disposed.</exception>
public virtual Stream ObtainFile(string sessionId, string source, string fileName)
{
    lock (padlock)
    {
        EnsureOpen();

        if (sessions.TryGetValue(sessionId, out ReplicationSession session) && session != null && session.IsExpired(ExpirationThreshold))
        {
            ReleaseSession(sessionId);
            session = null;
        }

        // session either previously expired, or we just expired it
        if (session == null)
        {
            throw new SessionExpiredException(string.Format("session ({0}) expired while obtaining file: source={1} file={2}", sessionId, source, fileName));
        }

        // Use the instance already looked up instead of indexing the
        // dictionary a second time.
        session.MarkAccessed();
        return session.Revision.Revision.Open(source, fileName);
    }
}
/// <summary>
/// Publishes <paramref name="revision"/> as the latest revision. The reference
/// this replicator held on the previously published revision is dropped, and
/// expired sessions are released afterwards.
/// <para/>
/// Publishing a revision equal to the current one is ignored; in that case,
/// and when the revision is rejected as older, the passed-in revision is released.
/// </summary>
/// <param name="revision">The revision to publish.</param>
/// <exception cref="ArgumentException">
/// <paramref name="revision"/> is older than the currently published revision.
/// </exception>
/// <exception cref="ObjectDisposedException">This replicator has already been disposed.</exception>
public virtual void Publish(IRevision revision)
{
    lock (padlock)
    {
        EnsureOpen();
        if (currentRevision != null)
        {
            int compare = revision.CompareTo(currentRevision.Revision);
            if (compare == 0)
            {
                // same revision published again, ignore but release it
                revision.Release();
                return;
            }
            if (compare < 0)
            {
                revision.Release();
                // Report the wrapped IRevision rather than the private
                // reference-counting wrapper, which has no meaningful string form.
                throw new ArgumentException(string.Format("Cannot publish an older revision: rev={0} current={1}", revision, currentRevision.Revision), nameof(revision));
            }
        }

        // Swap in the new revision before dropping the reference on the old
        // one; sessions still replicating the old revision keep their own references.
        RefCountedRevision oldRevision = currentRevision;
        currentRevision = new RefCountedRevision(revision);
        if (oldRevision != null)
            oldRevision.DecRef();

        CheckExpiredSessions();
    }
}
/// <summary>
/// Ends the replication session with the given id, dropping the reference it
/// held on its revision. Releasing an id that is not registered is a no-op.
/// </summary>
/// <param name="sessionId">Id of the session to release.</param>
/// <exception cref="InvalidOperationException"></exception>
/// <exception cref="ObjectDisposedException">This replicator has already been disposed.</exception>
public virtual void Release(string sessionId)
{
    lock (padlock)
    {
        // Fail fast on a disposed replicator before touching the session table.
        EnsureOpen();
        ReleaseSession(sessionId);
    }
}
}
}