blob: c51538b70e3d1cc685bbe9d7c9b3848dd44fdaa7 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
using System;
using System.Collections;
using System.Threading;
using log4net;
using Apache.Qpid.Client.Failover;
using Apache.Qpid.Client.Protocol.Listener;
using Apache.Qpid.Client.State;
using Apache.Qpid.Framing;
namespace Apache.Qpid.Client.Protocol
{
/// <summary>
/// AMQProtocolListener
///
/// <p/>Fail-over state transition rules...
///
/// <p/>The failover handler is created when the session is created since it needs a reference to the IoSession in order
/// to be able to send errors during failover back to the client application. The session won't be available in the case
/// when failing over due to a Connection.Redirect message from the broker.
///
/// <p><table id="crc"><caption>CRC Card</caption>
/// <tr><th> Responsibilities <th> Collaborations
/// <tr><td> Track fail over state of a connection.
/// <tr><td> Manage method listeners. <td> IAMQMethodListener
/// <tr><td> Receive notification of all IO errors on a connection. <td> IoHandler
/// <tr><td> Inform method listeners of all method events on a connection. <td> IAMQMethodListener
/// <tr><td> Inform method listeners of all error events on a connection. <td> IAMQMethodListener
/// </table>
///
/// <b>Todo:</b> The broker will close the connection with no warning if authentication fails. This may result in the fail-over process being
/// triggered, when it should not be.
///
/// </summary>
public class AMQProtocolListener : IProtocolListener
{
/// <summary>Used for debugging.</summary>
private static readonly ILog _log = LogManager.GetLogger(typeof(AMQProtocolListener));
/// <summary>
/// Holds the failover handler for the connection. When a failure is detected, and the current failover state allows it,
/// the failover process is handed off to this handler.
/// </summary>
private FailoverHandler _failoverHandler;
/// <summary>Tracks the current fail-over state.</summary>
internal FailoverState _failoverState = FailoverState.NOT_STARTED;
internal FailoverState FailoverState
{
get { return _failoverState; }
set { _failoverState = value; }
}
internal ManualResetEvent FailoverLatch;
AMQConnection _connection;
AMQStateManager _stateManager;
public AMQStateManager StateManager
{
get { return _stateManager; }
set { _stateManager = value; }
}
private readonly ArrayList _frameListeners = ArrayList.Synchronized(new ArrayList());
AMQProtocolSession _protocolSession = null;
private readonly Object _lock = new Object();
public AMQProtocolSession ProtocolSession { set { _protocolSession = value; } }
public AMQProtocolListener(AMQConnection connection, AMQStateManager stateManager)
{
_connection = connection;
_stateManager = stateManager;
_failoverHandler = new FailoverHandler(connection);
}
public void OnMessage(IDataBlock message)
{
// Handle incorrect protocol version.
if (message is ProtocolInitiation)
{
string error = String.Format("Protocol mismatch - {0}", message.ToString());
AMQException e = new AMQProtocolHeaderException(error);
_log.Error("Closing connection because of protocol mismatch", e);
//_protocolSession.CloseProtocolSession();
_stateManager.Error(e);
return;
}
AMQFrame frame = (AMQFrame)message;
if (frame.BodyFrame is AMQMethodBody)
{
if (_log.IsDebugEnabled)
{
_log.Debug("Method frame received: " + frame);
}
AMQMethodEvent evt = new AMQMethodEvent(frame.Channel, (AMQMethodBody)frame.BodyFrame, _protocolSession);
try
{
bool wasAnyoneInterested = false;
lock (_frameListeners.SyncRoot)
{
foreach (IAMQMethodListener listener in _frameListeners)
{
wasAnyoneInterested = listener.MethodReceived(evt) || wasAnyoneInterested;
}
}
if (!wasAnyoneInterested)
{
throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener.");
}
}
catch (Exception e)
{
foreach (IAMQMethodListener listener in _frameListeners)
{
listener.Error(e);
}
}
}
else if (frame.BodyFrame is ContentHeaderBody)
{
_protocolSession.MessageContentHeaderReceived(frame.Channel,
(ContentHeaderBody)frame.BodyFrame);
}
else if (frame.BodyFrame is ContentBody)
{
_protocolSession.MessageContentBodyReceived(frame.Channel,
(ContentBody)frame.BodyFrame);
}
else if (frame.BodyFrame is HeartbeatBody)
{
_log.Debug("HeartBeat received");
}
}
/// <summary>
/// Receives notification of any IO exceptions on the connection.
///
/// <p/>Upon receipt of a connection closed exception or any IOException, the fail-over process is attempted. If the fail-over fails, then
/// all method listeners and the application connection object are notified of the connection failure exception.
///
/// <p/>All other exception types are propagated to all method listeners.
/// </summary>
public void OnException(Exception cause)
{
_log.Warn("public void OnException(Exception cause = " + cause + "): called");
// Ensure that the method listener set cannot be changed whilst this exception is propagated to all listeners. This also
// ensures that this exception is fully propagated to all listeners, before another one can be processed.
lock (_lock)
{
if (cause is AMQConnectionClosedException || cause is System.IO.IOException)
{
// Try a fail-over because the connection has failed.
FailoverState failoverState = AttemptFailover();
// Check if the fail-over has failed, in which case notify all method listeners of the exception.
// The application connection object is also notified of the failure of the connection with the exception.
if (failoverState == FailoverState.FAILED)
{
_log.Debug("Fail-over has failed. Notifying all method listeners of the exception.");
AMQException amqe = new AMQException("Protocol handler error: " + cause, cause);
PropagateExceptionToWaiters(amqe);
_connection.ExceptionReceived(cause);
}
}
// Notify all method listeners of the exception.
else
{
PropagateExceptionToWaiters(cause);
_connection.ExceptionReceived(cause);
}
}
}
/// <summary>
/// Tries to fail-over the connection, if the connection policy will permit it, and the fail-over process has not yet been
/// started. If the connection does not allow fail-over then an exception will be raised. If a fail-over is already in progress
/// this method allows it to continue to run and will do nothing.
///
/// <p/>This method should only be called when the connection has been remotely closed.
/// </summary>
///
/// <returns>The fail-over state at the end of this attempt.</returns>
private FailoverState AttemptFailover()
{
_log.Debug("private void AttemptFailover(): called");
_log.Debug("_failoverState = " + _failoverState);
// Ensure that the connection stops sending heart beats, if it still is.
_connection.StopHeartBeatThread();
// Check that the connection policy allows fail-over to be attempted.
if (!_connection.IsFailoverAllowed)
{
_log.Debug("Connection does not allowed to failover");
_connection.ExceptionReceived(
new AMQDisconnectedException("Broker closed connection and reconnection is not permitted."));
}
// Check if connection was closed deliberately by the application, in which case no fail-over is attempted.
if (_connection.Closed)
{
return _failoverState;
}
// If the connection allows fail-over and fail-over has not yet been started, then it is started and the fail-over state is
// advanced to 'in progress'
if (_failoverState == FailoverState.NOT_STARTED && _connection.IsFailoverAllowed)
{
_log.Info("Starting the fail-over process.");
_failoverState = FailoverState.IN_PROGRESS;
StartFailoverThread();
}
return _failoverState;
}
/// <summary>
/// There are two cases where we have other threads potentially blocking for events to be handled by this
/// class. These are for the state manager (waiting for a state change) or a frame listener (waiting for a
/// particular type of frame to arrive). When an error occurs we need to notify these waiters so that they can
/// react appropriately.
///
/// <param name="e">the exception to propagate</param>
/// </summary>
public void PropagateExceptionToWaiters(Exception e)
{
// FIXME: not sure if required as StateManager is in _frameListeners. Probably something to do with fail-over.
_stateManager.Error(e);
lock ( _lock )
{
foreach ( IAMQMethodListener listener in _frameListeners )
{
listener.Error(e);
}
}
}
public void AddFrameListener(IAMQMethodListener listener)
{
lock ( _lock )
{
_frameListeners.Add(listener);
}
}
public void RemoveFrameListener(IAMQMethodListener listener)
{
if (_log.IsDebugEnabled)
{
_log.Debug("Removing frame listener: " + listener.ToString());
}
lock ( _lock )
{
_frameListeners.Remove(listener);
}
}
public void BlockUntilNotFailingOver()
{
if (FailoverLatch != null)
{
FailoverLatch.WaitOne();
}
}
/// <summary>
/// "Failover" for redirection.
/// </summary>
/// <param name="host"></param>
/// <param name="port"></param>
public void Failover(string host, int port)
{
_failoverHandler.setHost(host);
_failoverHandler.setPort(port);
// see javadoc for FailoverHandler to see rationale for separate thread
StartFailoverThread();
}
private void StartFailoverThread()
{
Thread failoverThread = new Thread(new ThreadStart(_failoverHandler.Run));
failoverThread.Name = "Failover";
// Do not inherit daemon-ness from current thread as this can be a daemon
// thread such as a AnonymousIoService thread.
failoverThread.IsBackground = false;
failoverThread.Start();
}
}
}