blob: 448777dc973e9cbcddb0357370b716c4264cf08b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Threading;
using Apache.NMS.ActiveMQ.Threads;
using Apache.NMS.ActiveMQ.Commands;
using Apache.NMS.Util;
namespace Apache.NMS.ActiveMQ.Transport
{
/// <summary>
/// This class make sure that the connection is still alive,
/// by monitoring the reception of commands from the peer of
/// the transport.
/// </summary>
public class InactivityMonitor : TransportFilter
{
private Atomic<bool> monitorStarted = new Atomic<bool>(false);
private Atomic<bool> commandSent = new Atomic<bool>(false);
private Atomic<bool> commandReceived = new Atomic<bool>(false);
private Atomic<bool> failed = new Atomic<bool>(false);
private Atomic<bool> inRead = new Atomic<bool>(false);
private Atomic<bool> inWrite = new Atomic<bool>(false);
private CompositeTaskRunner asyncTasks;
private AsyncSignalReadErrorkTask asyncErrorTask;
private AsyncWriteTask asyncWriteTask;
private Mutex monitor = new Mutex();
private Timer readCheckTimer;
private Timer writeCheckTimer;
private DateTime lastReadCheckTime;
//private WriteChecker writeChecker;
//private ReadChecker readChecker;
private long readCheckTime;
public long ReadCheckTime
{
get { return this.readCheckTime; }
set { this.readCheckTime = value; }
}
private long writeCheckTime;
public long WriteCheckTime
{
get { return this.writeCheckTime; }
set { this.writeCheckTime = value; }
}
private long initialDelayTime;
public long InitialDelayTime
{
get { return this.initialDelayTime; }
set { this.initialDelayTime = value; }
}
private Atomic<bool> keepAliveResponseRequired = new Atomic<bool>(false);
public bool KeepAliveResponseRequired
{
get { return this.keepAliveResponseRequired.Value; }
set { keepAliveResponseRequired.Value = value; }
}
// Local and remote Wire Format Information
private WireFormatInfo localWireFormatInfo;
private WireFormatInfo remoteWireFormatInfo;
/// <summary>
/// Constructor or the Inactivity Monitor
/// </summary>
/// <param name="next"></param>
public InactivityMonitor(ITransport next)
: base(next)
{
Tracer.Debug("Creating Inactivity Monitor");
}
~InactivityMonitor()
{
Dispose(false);
}
protected override void Dispose(bool disposing)
{
if(disposing)
{
// get rid of unmanaged stuff
}
StopMonitorThreads();
base.Dispose(disposing);
}
#region WriteCheck Related
/// <summary>
/// Check the write to the broker
/// </summary>
public void WriteCheck(object state)
{
if(this.inWrite.Value || this.failed.Value)
{
return;
}
if(!commandSent.Value)
{
Tracer.Debug("No Message sent since last write check. Sending a KeepAliveInfo");
this.asyncWriteTask.IsPending = true;
this.asyncTasks.Wakeup();
}
else
{
Tracer.Debug("Message sent since last write check. Resetting flag");
}
commandSent.Value = false;
}
#endregion
#region ReadCheck Related
public void ReadCheck(object state)
{
DateTime now = DateTime.Now;
TimeSpan elapsed = now - this.lastReadCheckTime;
if(!AllowReadCheck(elapsed))
{
return;
}
this.lastReadCheckTime = now;
if(this.inRead.Value || this.failed.Value)
{
Tracer.Debug("A receive is in progress or already failed.");
return;
}
if(!commandReceived.Value)
{
Tracer.Debug("No message received since last read check! Sending an InactivityException!");
this.asyncErrorTask.IsPending = true;
this.asyncTasks.Wakeup();
}
else
{
commandReceived.Value = false;
}
}
/// <summary>
/// Checks if we should allow the read check(if less than 90% of the read
/// check time elapsed then we dont do the readcheck
/// </summary>
/// <param name="elapsed"></param>
/// <returns></returns>
public bool AllowReadCheck(TimeSpan elapsed)
{
return (elapsed.TotalMilliseconds > (readCheckTime * 9 / 10));
}
#endregion
public override void Stop()
{
StopMonitorThreads();
next.Stop();
}
protected override void OnCommand(ITransport sender, Command command)
{
commandReceived.Value = true;
inRead.Value = true;
try
{
if(command is KeepAliveInfo)
{
KeepAliveInfo info = command as KeepAliveInfo;
if(info.ResponseRequired)
{
try
{
info.ResponseRequired = false;
Oneway(info);
}
catch(IOException ex)
{
OnException(this, ex);
}
}
}
else if(command is WireFormatInfo)
{
lock(monitor)
{
remoteWireFormatInfo = command as WireFormatInfo;
try
{
StartMonitorThreads();
}
catch(IOException ex)
{
OnException(this, ex);
}
}
}
base.OnCommand(sender, command);
}
finally
{
inRead.Value = false;
}
}
public override void Oneway(Command command)
{
// Disable inactivity monitoring while processing a command.
//synchronize this method - its not synchronized
//further down the transport stack and gets called by more
//than one thread by this class
lock(inWrite)
{
inWrite.Value = true;
try
{
if(failed.Value)
{
throw new IOException("Channel was inactive for too long: " + next.RemoteAddress.ToString());
}
if(command is WireFormatInfo)
{
lock(monitor)
{
localWireFormatInfo = command as WireFormatInfo;
StartMonitorThreads();
}
}
next.Oneway(command);
}
finally
{
commandSent.Value = true;
inWrite.Value = false;
}
}
}
protected override void OnException(ITransport sender, Exception command)
{
if(failed.CompareAndSet(false, true))
{
Tracer.Debug("Exception received in the Inactivity Monitor: " + command.ToString());
StopMonitorThreads();
base.OnException(sender, command);
}
}
private void StartMonitorThreads()
{
lock(monitor)
{
if(monitorStarted.Value)
{
return;
}
if(localWireFormatInfo == null)
{
return;
}
if(remoteWireFormatInfo == null)
{
return;
}
readCheckTime =
Math.Min(
localWireFormatInfo.MaxInactivityDuration,
remoteWireFormatInfo.MaxInactivityDuration);
initialDelayTime =
Math.Min(
localWireFormatInfo.MaxInactivityDurationInitialDelay,
remoteWireFormatInfo.MaxInactivityDurationInitialDelay);
this.asyncTasks = new CompositeTaskRunner();
this.asyncErrorTask = new AsyncSignalReadErrorkTask(this, next.RemoteAddress);
this.asyncWriteTask = new AsyncWriteTask(this);
this.asyncTasks.AddTask(this.asyncErrorTask);
this.asyncTasks.AddTask(this.asyncWriteTask);
if(readCheckTime > 0)
{
monitorStarted.Value = true;
writeCheckTime = readCheckTime > 3 ? readCheckTime / 3 : readCheckTime;
writeCheckTimer = new Timer(
new TimerCallback(WriteCheck),
null,
initialDelayTime,
writeCheckTime
);
readCheckTimer = new Timer(
new TimerCallback(ReadCheck),
null,
initialDelayTime,
readCheckTime
);
}
}
}
private void StopMonitorThreads()
{
lock(monitor)
{
if(monitorStarted.CompareAndSet(true, false))
{
AutoResetEvent shutdownEvent = new AutoResetEvent(false);
// Attempt to wait for the Timers to shutdown, but don't wait
// forever, if they don't shutdown after two seconds, just quit.
this.readCheckTimer.Dispose(shutdownEvent);
shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(2000));
this.writeCheckTimer.Dispose(shutdownEvent);
shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(2000));
this.asyncTasks.Shutdown();
this.asyncTasks = null;
this.asyncWriteTask = null;
this.asyncErrorTask = null;
}
}
}
#region Async Tasks
// Task that fires when the TaskRunner is signaled by the ReadCheck Timer Task.
class AsyncSignalReadErrorkTask : CompositeTask
{
private InactivityMonitor parent;
private Uri remote;
private Atomic<bool> pending = new Atomic<bool>(false);
public AsyncSignalReadErrorkTask(InactivityMonitor parent, Uri remote)
{
this.parent = parent;
this.remote = remote;
}
public bool IsPending
{
get { return this.pending.Value; }
set { this.pending.Value = value; }
}
public bool Iterate()
{
if(this.pending.CompareAndSet(true, false) && this.parent.monitorStarted.Value)
{
IOException ex = new IOException("Channel was inactive for too long: " + remote);
this.parent.OnException(parent, ex);
}
return this.pending.Value;
}
}
// Task that fires when the TaskRunner is signaled by the WriteCheck Timer Task.
class AsyncWriteTask : CompositeTask
{
private InactivityMonitor parent;
private Atomic<bool> pending = new Atomic<bool>(false);
public AsyncWriteTask(InactivityMonitor parent)
{
this.parent = parent;
}
public bool IsPending
{
get { return this.pending.Value; }
set { this.pending.Value = value; }
}
public bool Iterate()
{
if(this.pending.CompareAndSet(true, false) && this.parent.monitorStarted.Value)
{
try
{
KeepAliveInfo info = new KeepAliveInfo();
info.ResponseRequired = this.parent.keepAliveResponseRequired.Value;
this.parent.Oneway(info);
}
catch(IOException e)
{
this.parent.OnException(parent, e);
}
}
return this.pending.Value;
}
}
#endregion
}
}