blob: 2bb759217f27330167b4b66fbd3cdac624e3be8b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Threading;
using Apache.NMS.Stomp.Commands;
using Apache.NMS.Stomp.Threads;
using Apache.NMS.Stomp.Util;
using Apache.NMS.Stomp.Protocol;
using Apache.NMS.Util;
namespace Apache.NMS.Stomp.Transport
{
/// <summary>
/// This class make sure that the connection is still alive,
/// by monitoring the reception of commands from the peer of
/// the transport.
/// </summary>
public class InactivityMonitor : TransportFilter
{
private readonly Atomic<bool> monitorStarted = new Atomic<bool>(false);
private readonly Atomic<bool> commandSent = new Atomic<bool>(false);
private readonly Atomic<bool> commandReceived = new Atomic<bool>(false);
private readonly Atomic<bool> failed = new Atomic<bool>(false);
private readonly Atomic<bool> inRead = new Atomic<bool>(false);
private readonly Atomic<bool> inWrite = new Atomic<bool>(false);
private CompositeTaskRunner asyncTasks;
private AsyncSignalReadErrorkTask asyncErrorTask;
private AsyncWriteTask asyncWriteTask;
private readonly Mutex monitor = new Mutex();
private static int id = 0;
private readonly int instanceId = 0;
private bool disposing = false;
private Timer connectionCheckTimer;
private DateTime lastReadCheckTime;
private long readCheckTime = 30000;
public long ReadCheckTime
{
get { return this.readCheckTime; }
set { this.readCheckTime = value; }
}
private long writeCheckTime = 10000;
public long WriteCheckTime
{
get { return this.writeCheckTime; }
set { this.writeCheckTime = value; }
}
private long initialDelayTime = 0;
public long InitialDelayTime
{
get { return this.initialDelayTime; }
set { this.initialDelayTime = value; }
}
// Local and remote Wire Format Information
private StompWireFormat localWireFormatInfo;
private WireFormatInfo remoteWireFormatInfo;
/// <summary>
/// Constructor or the Inactivity Monitor
/// </summary>
/// <param name="next"></param>
/// <param name="wireFormat"></param>
public InactivityMonitor(ITransport next, StompWireFormat wireFormat)
: base(next)
{
this.instanceId = ++id;
this.localWireFormatInfo = wireFormat;
Tracer.DebugFormat("Creating Inactivity Monitor: {0}", instanceId);
}
~InactivityMonitor()
{
Dispose(false);
}
protected override void Dispose(bool disposing)
{
if(disposing)
{
// get rid of unmanaged stuff
}
lock(monitor)
{
this.localWireFormatInfo = null;
this.remoteWireFormatInfo = null;
this.disposing = true;
StopMonitorThreads();
}
base.Dispose(disposing);
}
public void CheckConnection(object state)
{
Tracer.DebugFormat("CheckConnection: Timer Elapsed at {0}", DateTime.Now.ToLocalTime());
// First see if we have written or can write.
WriteCheck();
// Now check is we've read anything, if not then we send
// a new KeepAlive with response required.
ReadCheck();
}
#region WriteCheck Related
/// <summary>
/// Check the write to the broker
/// </summary>
public void WriteCheck()
{
if(this.inWrite.Value || this.failed.Value)
{
Tracer.DebugFormat("InactivityMonitor[{0}]: is in write or already failed.", instanceId);
return;
}
if(!commandSent.Value)
{
Tracer.DebugFormat("InactivityMonitor[{0}]: No Message sent since last write check. Sending a KeepAliveInfo.", instanceId);
this.asyncWriteTask.IsPending = true;
this.asyncTasks.Wakeup();
}
else
{
Tracer.DebugFormat("InactivityMonitor[{0}]: Message sent since last write check. Resetting flag.", instanceId);
}
commandSent.Value = false;
}
#endregion
#region ReadCheck Related
public void ReadCheck()
{
DateTime now = DateTime.Now;
TimeSpan elapsed = now - this.lastReadCheckTime;
if(!AllowReadCheck(elapsed))
{
Tracer.Debug("InactivityMonitor["+ instanceId +"]: A read check is not currently allowed.");
return;
}
this.lastReadCheckTime = now;
if(this.inRead.Value || this.failed.Value || this.asyncErrorTask == null)
{
Tracer.DebugFormat("InactivityMonitor[{0}]: A receive is in progress or already failed.", instanceId);
return;
}
if(!commandReceived.Value)
{
Tracer.DebugFormat("InactivityMonitor[{0}]: No message received since last read check! Sending an InactivityException!", instanceId);
this.asyncErrorTask.IsPending = true;
this.asyncTasks.Wakeup();
}
else
{
commandReceived.Value = false;
}
}
/// <summary>
/// Checks if we should allow the read check(if less than 90% of the read
/// check time elapsed then we dont do the readcheck
/// </summary>
/// <param name="elapsed"></param>
/// <returns></returns>
public bool AllowReadCheck(TimeSpan elapsed)
{
return (elapsed.TotalMilliseconds > readCheckTime);
}
#endregion
public override void Stop()
{
StopMonitorThreads();
next.Stop();
}
protected override void OnCommand(ITransport sender, Command command)
{
commandReceived.Value = true;
inRead.Value = true;
try
{
if(command.IsWireFormatInfo)
{
lock(monitor)
{
remoteWireFormatInfo = command as WireFormatInfo;
try
{
StartMonitorThreads();
}
catch(IOException ex)
{
OnException(this, ex);
}
}
}
else if(command.IsKeepAliveInfo)
{
if(Tracer.IsDebugEnabled)
{
Tracer.DebugFormat("InactivityMonitor[{0}]: New Keep Alive Received at -> " +
DateTime.Now.ToLongTimeString().TrimEnd(" APM".ToCharArray()) +
"." + DateTime.Now.Millisecond, instanceId);
}
}
base.OnCommand(sender, command);
}
finally
{
inRead.Value = false;
}
}
public override void Oneway(Command command)
{
// Disable inactivity monitoring while processing a command.
//synchronize this method - its not synchronized
//further down the transport stack and gets called by more
//than one thread by this class
lock(inWrite)
{
inWrite.Value = true;
try
{
if(failed.Value)
{
throw new IOException("Channel was inactive for too long: " + next.RemoteAddress.ToString());
}
if(command.IsConnectionInfo)
{
lock(monitor)
{
StartMonitorThreads();
}
}
next.Oneway(command);
}
finally
{
commandSent.Value = true;
inWrite.Value = false;
}
}
}
protected override void OnException(ITransport sender, Exception command)
{
if(failed.CompareAndSet(false, true) && !this.disposing)
{
Tracer.DebugFormat("Exception received in the Inactivity Monitor: {0}", command.Message);
StopMonitorThreads();
base.OnException(sender, command);
}
}
private void StartMonitorThreads()
{
lock(monitor)
{
if(this.IsDisposed || this.disposing)
{
return;
}
if(monitorStarted.Value)
{
return;
}
if(localWireFormatInfo == null)
{
return;
}
if(remoteWireFormatInfo == null)
{
return;
}
if(localWireFormatInfo.MaxInactivityDuration != 0 &&
remoteWireFormatInfo.WriteCheckInterval != 0)
{
readCheckTime =
Math.Max(
localWireFormatInfo.ReadCheckInterval,
remoteWireFormatInfo.WriteCheckInterval);
this.asyncErrorTask = new AsyncSignalReadErrorkTask(this, next.RemoteAddress);
}
if(localWireFormatInfo.MaxInactivityDuration != 0)
{
if(remoteWireFormatInfo.Version > 1.0)
{
writeCheckTime =
Math.Max(localWireFormatInfo.WriteCheckInterval,
remoteWireFormatInfo.ReadCheckInterval);
}
else
{
writeCheckTime = localWireFormatInfo.WriteCheckInterval;
}
this.asyncWriteTask = new AsyncWriteTask(this);
}
initialDelayTime = localWireFormatInfo.MaxInactivityDurationInitialDelay > 0 ?
localWireFormatInfo.MaxInactivityDurationInitialDelay : writeCheckTime;
Tracer.DebugFormat("InactivityMonitor[{0}]: Read Check time interval: {1}",
instanceId, readCheckTime );
Tracer.DebugFormat("InactivityMonitor[{0}]: Initial Delay time interval: {1}",
instanceId, initialDelayTime );
this.asyncTasks = new CompositeTaskRunner();
if(this.asyncErrorTask != null)
{
Tracer.DebugFormat("InactivityMonitor[{0}]: Adding the Async Read Check Task to the Runner.", instanceId);
this.asyncTasks.AddTask(this.asyncErrorTask);
}
if(this.asyncWriteTask != null)
{
Tracer.DebugFormat("InactivityMonitor[{0}]: Write Check time interval: {1}",
instanceId, writeCheckTime );
this.asyncTasks.AddTask(this.asyncWriteTask);
}
if(this.asyncErrorTask != null || this.asyncWriteTask != null)
{
Tracer.DebugFormat("InactivityMonitor[{0}]: Starting the Monitor Timer.", instanceId);
monitorStarted.Value = true;
this.connectionCheckTimer = new Timer(
new TimerCallback(CheckConnection),
null,
initialDelayTime,
writeCheckTime
);
}
}
}
private void StopMonitorThreads()
{
lock(monitor)
{
if(monitorStarted.CompareAndSet(true, false))
{
// Attempt to wait for the Timer to shutdown, but don't wait
// forever, if they don't shutdown after two seconds, just quit.
ThreadUtil.DisposeTimer(connectionCheckTimer, 2000);
this.asyncTasks.Shutdown();
this.asyncTasks = null;
this.asyncWriteTask = null;
this.asyncErrorTask = null;
}
}
}
#region Async Tasks
// Task that fires when the TaskRunner is signaled by the ReadCheck Timer Task.
class AsyncSignalReadErrorkTask : CompositeTask
{
private readonly InactivityMonitor parent;
private readonly Uri remote;
private readonly Atomic<bool> pending = new Atomic<bool>(false);
public AsyncSignalReadErrorkTask(InactivityMonitor parent, Uri remote)
{
this.parent = parent;
this.remote = remote;
}
public bool IsPending
{
get { return this.pending.Value; }
set { this.pending.Value = value; }
}
public bool Iterate()
{
if(this.pending.CompareAndSet(true, false) && this.parent.monitorStarted.Value)
{
IOException ex = new IOException("Channel was inactive for too long: " + remote);
this.parent.OnException(parent, ex);
}
return this.pending.Value;
}
}
// Task that fires when the TaskRunner is signaled by the WriteCheck Timer Task.
class AsyncWriteTask : CompositeTask
{
private readonly InactivityMonitor parent;
private readonly Atomic<bool> pending = new Atomic<bool>(false);
public AsyncWriteTask(InactivityMonitor parent)
{
this.parent = parent;
}
public bool IsPending
{
get { return this.pending.Value; }
set { this.pending.Value = value; }
}
public bool Iterate()
{
Tracer.DebugFormat("InactivityMonitor[{0}] perparing for another Write Check", parent.instanceId);
if(this.pending.CompareAndSet(true, false) && this.parent.monitorStarted.Value)
{
try
{
Tracer.DebugFormat("InactivityMonitor[{0}] Write Check required sending KeepAlive.",
parent.instanceId);
KeepAliveInfo info = new KeepAliveInfo();
this.parent.next.Oneway(info);
}
catch(IOException e)
{
this.parent.OnException(parent, e);
}
}
return this.pending.Value;
}
}
#endregion
}
}