| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| using System; |
| using System.Collections; |
| using System.IO; |
| using System.Reflection; |
| using System.Threading; |
| using log4net; |
| using Apache.Qpid.Client.Failover; |
| using Apache.Qpid.Client.Protocol; |
| using Apache.Qpid.Client.Qms; |
| using Apache.Qpid.Client.State; |
| using Apache.Qpid.Client.Transport; |
| using Apache.Qpid.Client.Transport.Socket.Blocking; |
| using Apache.Qpid.Collections; |
| using Apache.Qpid.Framing; |
| using Apache.Qpid.Messaging; |
| |
| namespace Apache.Qpid.Client |
| { |
| public class AMQConnection : Closeable, IConnection |
| { |
| private static readonly ILog _log = LogManager.GetLogger(typeof(AMQConnection)); |
| |
| IConnectionInfo _connectionInfo; |
| private int _nextChannelId = 0; |
| |
| // _Connected should be refactored with a suitable wait object. |
| private bool _connected; |
| |
| Thread _heartBeatThread; |
| HeartBeatThread _heartBeatRunner; |
| |
| // The last error code that occured on the connection. Used to return the correct exception to the client |
| private AMQException _lastAMQException = null; |
| |
| /** |
| * This is the "root" mutex that must be held when doing anything that could be impacted by failover. |
| * This must be held by any child objects of this connection such as the session, producers and consumers. |
| */ |
| private readonly Object _failoverMutex = new Object(); |
| public object FailoverMutex |
| { |
| get { return _failoverMutex; } |
| } |
| |
| /** |
| * Policy dictating how to failover |
| */ |
| private FailoverPolicy _failoverPolicy; |
| |
| internal bool IsFailoverAllowed |
| { |
| get { if(!_connected) return false; else return _failoverPolicy.FailoverAllowed(); } |
| } |
| |
| /// <summary> |
| /// A channel is roughly analogous to a session. The server can negotiate the maximum number of channels |
| /// per session and we must prevent the client from opening too many. Zero means unlimited. |
| /// </summary> |
| private ushort _maximumChannelCount; |
| |
| /// <summary> |
| /// The maximum size of frame supported by the server |
| /// </summary> |
| private uint _maximumFrameSize; |
| |
| private AMQStateManager _stateManager; |
| |
| private AMQProtocolSession _protocolSession; |
| public AMQProtocolSession ProtocolSession { get { return _protocolSession; } } |
| |
| /// <summary> |
| /// Maps from session id (Integer) to AmqChannel instance |
| /// </summary> |
| private readonly IDictionary _sessions = new LinkedHashtable(); |
| |
| private ExceptionListenerDelegate _exceptionListener; |
| |
| private IConnectionListener _connectionListener; |
| |
| private ITransport _transport; |
| public ITransport Transport { get { return _transport; } } |
| |
| /// <summary> |
| /// Whether this connection is started, i.e. whether messages are flowing to consumers. It has no meaning for |
| /// message publication. |
| /// </summary> |
| private bool _started; |
| |
| private AMQProtocolListener _protocolListener; |
| public AMQProtocolListener ProtocolListener { get { return _protocolListener; } } |
| |
| public IProtocolWriter ProtocolWriter |
| { |
| get { return _transport.ProtocolWriter; } |
| } |
| |
| ProtocolWriter _protocolWriter; |
| |
| public ProtocolWriter ConvenientProtocolWriter |
| { |
| get { return _protocolWriter; } |
| } |
| |
| public AMQConnection(IConnectionInfo connectionInfo) |
| { |
| if (connectionInfo == null) |
| { |
| throw new ArgumentException("ConnectionInfo must be specified"); |
| } |
| _log.Debug("ConnectionInfo: " + connectionInfo); |
| _connectionInfo = connectionInfo; |
| _log.Debug("password = " + _connectionInfo.Password); |
| _failoverPolicy = new FailoverPolicy(connectionInfo); |
| |
| // We are not currently connected. |
| _connected = false; |
| |
| Exception lastException = null; |
| do |
| { |
| try |
| { |
| IBrokerInfo brokerInfo = _failoverPolicy.GetNextBrokerInfo(); |
| _log.Debug("Connecting to " + brokerInfo); |
| MakeBrokerConnection(brokerInfo); |
| break; |
| } |
| catch (Exception e) |
| { |
| lastException = e; |
| _log.Error("Unable to connect to broker " + _failoverPolicy.GetCurrentBrokerInfo(), e); |
| // XXX: Should perhaps break out of the do/while here if not a SocketException... |
| } |
| } while (!_connected && _failoverPolicy.FailoverAllowed()); |
| |
| _log.Debug("Are we connected:" + _connected); |
| |
| if (!_connected) |
| { |
| if ( lastException is AMQException ) |
| { |
| throw lastException; |
| } |
| else |
| { |
| throw new AMQConnectionException("Unable to connect", lastException); |
| } |
| } |
| |
| } |
| |
| /*private ITransport LoadTransportFromAssembly(string host, int port, String assemblyName, String transportType) |
| { |
| //Assembly assembly = Assembly.LoadFrom(assemblyName); |
| Assembly assembly = Assembly.Load(assemblyName); |
| |
| foreach (Type type in assembly.GetTypes()) |
| { |
| _log.Debug(String.Format("type = {0}", type)); |
| } |
| |
| Type transport = assembly.GetType(transportType); |
| |
| if (transport == null) |
| { |
| throw new ArgumentException( |
| String.Format("Type is not found in assembly. Type={0} Assembly={1}", transportType, assemblyName)); |
| |
| } |
| |
| _log.Debug("transport = " + transport); |
| _log.Debug("ctors = " + transport.GetConstructors()); |
| |
| ConstructorInfo info = transport.GetConstructors()[0]; |
| ITransport result = (ITransport)info.Invoke(new object[] { host, port, this }); |
| |
| _log.Debug("transport = " + result); |
| |
| return result; |
| }*/ |
| |
| public void Disconnect() |
| { |
| _transport.Close(); |
| } |
| |
| #region IConnection Members |
| |
| public string ClientID |
| { |
| get |
| { |
| CheckNotClosed(); |
| return _connectionInfo.ClientName; |
| } |
| set |
| { |
| CheckNotClosed(); |
| _connectionInfo.ClientName = value; |
| } |
| } |
| |
| public override void Close() |
| { |
| lock (FailoverMutex) |
| { |
| // atomically set to closed and check the _previous value was NOT CLOSED |
| if (Interlocked.Exchange(ref _closed, CLOSED) == NOT_CLOSED) |
| { |
| try |
| { |
| CloseAllSessions(null); |
| CloseConnection(); |
| } |
| catch (AMQException e) |
| { |
| throw new QpidException("Error closing connection: " + e); |
| } |
| } |
| } |
| } |
| |
| private void CloseConnection() |
| { |
| _stateManager.ChangeState(AMQState.CONNECTION_CLOSING); |
| |
| AMQFrame frame = ConnectionCloseBody.CreateAMQFrame( |
| 0, 200, "Qpid.NET client is closing the connection.", 0, 0); |
| |
| ProtocolWriter.Write(frame); |
| |
| _log.Debug("Blocking for connection close ok frame"); |
| |
| Disconnect(); |
| } |
| |
| class CreateChannelFailoverSupport : FailoverSupport |
| { |
| private static readonly ILog _log = LogManager.GetLogger(typeof(CreateChannelFailoverSupport)); |
| |
| private bool _transacted; |
| private AcknowledgeMode _acknowledgeMode; |
| int _prefetchHigh; |
| int _prefetchLow; |
| AMQConnection _connection; |
| |
| public CreateChannelFailoverSupport(AMQConnection connection, bool transacted, AcknowledgeMode acknowledgeMode, int prefetchHigh, int prefetchLow) |
| { |
| _connection = connection; |
| _transacted = transacted; |
| _acknowledgeMode = acknowledgeMode; |
| _prefetchHigh = prefetchHigh; |
| _prefetchLow = prefetchLow; |
| } |
| |
| protected override object operation() |
| { |
| ushort channelId = _connection.NextChannelId(); |
| |
| if (_log.IsDebugEnabled) |
| { |
| _log.Debug("Write channel open frame for channel id " + channelId); |
| } |
| |
| // We must create the channel and register it before actually sending the frame to the server to |
| // open it, so that there is no window where we could receive data on the channel and not be set |
| // up to handle it appropriately. |
| AmqChannel channel = new AmqChannel(_connection, |
| channelId, _transacted, _acknowledgeMode, _prefetchHigh, _prefetchLow); |
| _connection.ProtocolSession.AddSessionByChannel(channelId, channel); |
| _connection.RegisterSession(channelId, channel); |
| |
| bool success = false; |
| try |
| { |
| _connection.CreateChannelOverWire(channelId, _prefetchHigh, _prefetchLow, _transacted); |
| success = true; |
| } |
| catch (AMQException e) |
| { |
| throw new QpidException("Error creating channel: " + e, e); |
| } |
| finally |
| { |
| if (!success) { |
| _connection.ProtocolSession.RemoveSessionByChannel(channelId); |
| _connection.DeregisterSession(channelId); |
| } |
| } |
| |
| if (_connection._started) |
| { |
| channel.Start(); |
| } |
| return channel; |
| } |
| } |
| |
| internal ushort NextChannelId() |
| { |
| return (ushort) Interlocked.Increment(ref _nextChannelId); |
| } |
| |
| public IChannel CreateChannel(bool transacted, AcknowledgeMode acknowledgeMode) |
| { |
| return CreateChannel(transacted, acknowledgeMode, AmqChannel.DEFAULT_PREFETCH_HIGH_MARK); |
| } |
| |
| public IChannel CreateChannel(bool transacted, AcknowledgeMode acknowledgeMode, int prefetch) |
| { |
| return CreateChannel(transacted, acknowledgeMode, prefetch, prefetch); |
| } |
| |
| public IChannel CreateChannel(bool transacted, AcknowledgeMode acknowledgeMode, int prefetchHigh, int prefetchLow) |
| { |
| CheckNotClosed(); |
| if (ChannelLimitReached()) |
| { |
| throw new ChannelLimitReachedException(_maximumChannelCount); |
| } |
| else |
| { |
| CreateChannelFailoverSupport operation = |
| new CreateChannelFailoverSupport(this, transacted, acknowledgeMode, prefetchHigh, prefetchLow); |
| return (IChannel)operation.execute(this); |
| } |
| } |
| |
| public void CloseSession(AmqChannel channel) |
| { |
| // FIXME: Don't we need FailoverSupport here (as we have SyncWrite). |
| _protocolSession.CloseSession(channel); |
| |
| AMQFrame frame = ChannelCloseBody.CreateAMQFrame( |
| channel.ChannelId, 200, "JMS client closing channel", 0, 0); |
| |
| _log.Debug("Blocking for channel close frame for channel " + channel.ChannelId); |
| _protocolWriter.SyncWrite(frame, typeof(ChannelCloseOkBody)); |
| _log.Debug("Received channel close frame"); |
| // When control resumes at this point, a reply will have been received that |
| // indicates the broker has closed the channel successfully |
| } |
| |
| public ExceptionListenerDelegate ExceptionListener |
| { |
| get |
| { |
| CheckNotClosed(); |
| return _exceptionListener; |
| } |
| set |
| { |
| CheckNotClosed(); |
| _exceptionListener = value; |
| } |
| } |
| |
| /// <summary> |
| /// Start the connection, i.e. start flowing messages. Note that this method must be called only from a single thread |
| /// and is not thread safe (which is legal according to the JMS specification). |
| /// @throws JMSException |
| /// </summary> |
| public void Start() |
| { |
| CheckNotClosed(); |
| |
| if (!_started) |
| { |
| foreach (DictionaryEntry lde in _sessions) |
| { |
| AmqChannel s = (AmqChannel)lde.Value; |
| s.Start(); |
| } |
| _started = true; |
| } |
| } |
| |
| public void Stop() |
| { |
| CheckNotClosed(); |
| |
| if (_started) |
| { |
| foreach (DictionaryEntry lde in _sessions) |
| { |
| AmqChannel s = (AmqChannel) lde.Value; |
| s.Stop(); |
| } |
| _started = false; |
| } |
| } |
| |
| public IConnectionListener ConnectionListener |
| { |
| get { return _connectionListener; } |
| set { _connectionListener = value; } |
| } |
| |
| #endregion |
| |
| #region IDisposable Members |
| |
| public void Dispose() |
| { |
| Close(); |
| } |
| |
| #endregion |
| |
| private bool ChannelLimitReached() |
| { |
| return _maximumChannelCount != 0 && _sessions.Count == _maximumChannelCount; |
| } |
| |
| /// <summary> |
| /// Close all the sessions, either due to normal connection closure or due to an error occurring. |
| /// @param cause if not null, the error that is causing this shutdown |
| /// </summary> |
| private void CloseAllSessions(Exception cause) |
| { |
| _log.Debug("Closing all session in connection " + this); |
| ICollection sessions = new ArrayList(_sessions.Values); |
| foreach (AmqChannel channel in sessions) |
| { |
| _log.Debug("Closing channel " + channel); |
| if (cause != null) |
| { |
| channel.ClosedWithException(cause); |
| } |
| else |
| { |
| try |
| { |
| channel.Close(); |
| } |
| catch (QpidException e) |
| { |
| _log.Error("Error closing channel: " + e); |
| } |
| } |
| } |
| _log.Debug("Done closing all sessions in connection " + this); |
| } |
| |
| public int MaximumChannelCount |
| { |
| get |
| { |
| CheckNotClosed(); |
| return _maximumChannelCount; |
| } |
| } |
| |
| internal void SetMaximumChannelCount(ushort maximumChannelCount) |
| { |
| CheckNotClosed(); |
| _maximumChannelCount = maximumChannelCount; |
| } |
| |
| public uint MaximumFrameSize |
| { |
| get |
| { |
| return _maximumFrameSize; |
| } |
| |
| set |
| { |
| _maximumFrameSize = value; |
| } |
| } |
| |
| public IDictionary Sessions |
| { |
| get |
| { |
| return _sessions; |
| } |
| } |
| |
| public string Host |
| { |
| get |
| { |
| return _failoverPolicy.GetCurrentBrokerInfo().Host; |
| } |
| } |
| |
| public int Port |
| { |
| get |
| { |
| return _failoverPolicy.GetCurrentBrokerInfo().Port; |
| } |
| } |
| |
| public string Username |
| { |
| get |
| { |
| return _connectionInfo.Username; |
| } |
| } |
| |
| public string Password |
| { |
| get |
| { |
| return _connectionInfo.Password; |
| } |
| } |
| |
| public string VirtualHost |
| { |
| get |
| { |
| return _connectionInfo.VirtualHost; |
| } |
| } |
| |
| /// <summary> |
| /// Invoked by the AMQProtocolSession when a protocol session exception has occurred. |
| /// This method sends the exception to a JMS exception listener, if configured, and |
| /// propagates the exception to sessions, which in turn will propagate to consumers. |
| /// This allows synchronous consumers to have exceptions thrown to them. |
| /// </summary> |
| /// <param name="cause">the exception</param> |
| public void ExceptionReceived(Exception cause) |
| { |
| if (_exceptionListener != null) |
| { |
| // Listener expects one of these... |
| QpidException xe; |
| |
| if (cause is QpidException) |
| { |
| xe = (QpidException) cause; |
| } |
| else |
| { |
| xe = new QpidException("Exception thrown against " + ToString() + ": " + cause, cause); |
| } |
| // in the case of an IOException, MINA has closed the protocol session so we set _closed to true |
| // so that any generic client code that tries to close the connection will not mess up this error |
| // handling sequence |
| if (cause is IOException) |
| { |
| Interlocked.Exchange(ref _closed, CLOSED); |
| } |
| #if __MonoCS__ |
| _exceptionListener(xe); |
| #else |
| _exceptionListener.Invoke(xe); |
| #endif |
| } |
| else |
| { |
| _log.Error("Connection exception: " + cause); |
| } |
| |
| // An undelivered is not fatal to the connections usability. |
| if (!(cause is AMQUndeliveredException)) |
| { |
| Interlocked.Exchange(ref _closed, CLOSED); |
| CloseAllSessions(cause); |
| } |
| else |
| { |
| ; |
| } |
| } |
| |
| internal void RegisterSession(int channelId, AmqChannel channel) |
| { |
| _sessions[channelId] = channel; |
| } |
| |
| internal void DeregisterSession(int channelId) |
| { |
| _sessions.Remove(channelId); |
| } |
| |
| /** |
| * Fire the preFailover event to the registered connection listener (if any) |
| * |
| * @param redirect true if this is the result of a redirect request rather than a connection error |
| * @return true if no listener or listener does not veto change |
| */ |
| public bool FirePreFailover(bool redirect) |
| { |
| bool proceed = true; |
| if (_connectionListener != null) |
| { |
| proceed = _connectionListener.PreFailover(redirect); |
| } |
| return proceed; |
| } |
| |
| /** |
| * Fire the preResubscribe event to the registered connection listener (if any). If the listener |
| * vetoes resubscription then all the sessions are closed. |
| * |
| * @return true if no listener or listener does not veto resubscription. |
| * @throws JMSException |
| */ |
| public bool FirePreResubscribe() |
| { |
| if (_connectionListener != null) |
| { |
| bool resubscribe = _connectionListener.PreResubscribe(); |
| if (!resubscribe) |
| { |
| MarkAllSessionsClosed(); |
| } |
| return resubscribe; |
| } |
| else |
| { |
| return true; |
| } |
| } |
| |
| /** |
| * Marks all sessions and their children as closed without sending any protocol messages. Useful when |
| * you need to mark objects "visible" in userland as closed after failover or other significant event that |
| * impacts the connection. |
| * <p/> |
| * The caller must hold the failover mutex before calling this method. |
| */ |
| private void MarkAllSessionsClosed() |
| { |
| //LinkedList sessionCopy = new LinkedList(_sessions.values()); |
| ArrayList sessionCopy = new ArrayList(_sessions.Values); |
| foreach (AmqChannel session in sessionCopy) |
| { |
| session.MarkClosed(); |
| } |
| _sessions.Clear(); |
| } |
| |
| /** |
| * Fires a failover complete event to the registered connection listener (if any). |
| */ |
| public void FireFailoverComplete() |
| { |
| if (_connectionListener != null) |
| { |
| _connectionListener.FailoverComplete(); |
| } |
| } |
| |
| public bool AttemptReconnection(String host, int port, SslOptions sslConfig) |
| { |
| IBrokerInfo bd = new AmqBrokerInfo("amqp", host, port, sslConfig); |
| |
| _failoverPolicy.setBroker(bd); |
| |
| try |
| { |
| MakeBrokerConnection(bd); |
| return true; |
| } |
| catch (Exception e) |
| { |
| _log.Debug("Unable to connect to broker at " + bd, e); |
| AttemptReconnection(); |
| } |
| return false; |
| } |
| |
| private void MakeBrokerConnection(IBrokerInfo brokerDetail) |
| { |
| try |
| { |
| _stateManager = new AMQStateManager(); |
| _protocolListener = new AMQProtocolListener(this, _stateManager); |
| _protocolListener.AddFrameListener(_stateManager); |
| |
| /* |
| // Currently there is only one transport option - BlockingSocket. |
| String assemblyName = "Apache.Qpid.Client.Transport.Socket.Blocking.dll"; |
| String transportType = "Apache.Qpid.Client.Transport.Socket.Blocking.BlockingSocketTransport"; |
| |
| // Load the transport assembly dynamically. |
| _transport = LoadTransportFromAssembly(brokerDetail.getHost(), brokerDetail.getPort(), assemblyName, transportType); |
| */ |
| |
| _transport = new BlockingSocketTransport(); |
| |
| // Connect. |
| _transport.Connect(brokerDetail, this); |
| _protocolWriter = new ProtocolWriter(_transport.ProtocolWriter, _protocolListener); |
| _protocolSession = new AMQProtocolSession(_transport.ProtocolWriter, _transport, this); |
| _protocolListener.ProtocolSession = _protocolSession; |
| |
| // Now start the connection "handshake". |
| _transport.ProtocolWriter.Write(new ProtocolInitiation()); |
| |
| // Blocks until the connection has been opened. |
| _stateManager.AttainState(AMQState.CONNECTION_OPEN); |
| |
| _failoverPolicy.attainedConnection(); |
| |
| // XXX: Again this should be changed to a suitable notify. |
| _connected = true; |
| } |
| catch (AMQException e) |
| { |
| _lastAMQException = e; |
| throw; // rethrow |
| } |
| } |
| |
| public bool AttemptReconnection() |
| { |
| while (_failoverPolicy.FailoverAllowed()) |
| { |
| try |
| { |
| MakeBrokerConnection(_failoverPolicy.GetNextBrokerInfo()); |
| return true; |
| } |
| catch (Exception e) |
| { |
| if (!(e is AMQException)) |
| { |
| _log.Debug("Unable to connect to broker at " + _failoverPolicy.GetCurrentBrokerInfo(), e); |
| } |
| else |
| { |
| _log.Debug(e.Message + ":Unable to connect to broker at " + _failoverPolicy.GetCurrentBrokerInfo()); |
| } |
| } |
| } |
| |
| // Connection unsuccessful. |
| return false; |
| } |
| |
| /** |
| * For all channels, and for all consumers in those sessions, resubscribe. This is called during failover handling. |
| * The caller must hold the failover mutex before calling this method. |
| */ |
| public void ResubscribeChannels() |
| { |
| ArrayList channels = new ArrayList(_sessions.Values); |
| _log.Debug(String.Format("Resubscribing sessions = {0} sessions.size={1}", channels, channels.Count)); |
| foreach (AmqChannel channel in channels) |
| { |
| _protocolSession.AddSessionByChannel(channel.ChannelId, channel); |
| ReopenChannel( |
| channel.ChannelId, |
| channel.DefaultPrefetchHigh, |
| channel.DefaultPrefetchLow, |
| channel.Transacted |
| ); |
| channel.ReplayOnFailOver(); |
| } |
| } |
| |
| private void ReopenChannel(ushort channelId, int prefetchHigh, int prefetchLow, bool transacted) |
| { |
| _log.Debug(string.Format("Reopening channel id={0} prefetchHigh={1} prefetchLow={2} transacted={3}", |
| channelId, prefetchHigh, prefetchLow, transacted)); |
| try |
| { |
| CreateChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted); |
| } |
| catch (AMQException e) |
| { |
| _protocolSession.RemoveSessionByChannel(channelId); |
| DeregisterSession(channelId); |
| throw new AMQException("Error reopening channel " + channelId + " after failover: " + e); |
| } |
| } |
| |
| void CreateChannelOverWire(ushort channelId, int prefetchHigh, int prefetchLow, bool transacted) |
| { |
| _protocolWriter.SyncWrite(ChannelOpenBody.CreateAMQFrame(channelId, null), typeof (ChannelOpenOkBody)); |
| |
| // Don't use the BasicQos frame if connecting to OpenAMQ (at it is not support). We |
| // know this when we connection using AMQP 0.7 |
| if (ProtocolInitiation.CURRENT_PROTOCOL_VERSION_MAJOR != 7) |
| { |
| // Basic.Qos frame appears to not be supported by OpenAMQ 1.0d. |
| _protocolWriter.SyncWrite(BasicQosBody.CreateAMQFrame(channelId, 0, (ushort)prefetchHigh, false), typeof (BasicQosOkBody)); |
| } |
| |
| if (transacted) |
| { |
| if (_log.IsDebugEnabled) |
| { |
| _log.Debug("Issuing TxSelect for " + channelId); |
| } |
| _protocolWriter.SyncWrite(TxSelectBody.CreateAMQFrame(channelId), typeof(TxSelectOkBody)); |
| } |
| } |
| |
| public String toURL() |
| { |
| return _connectionInfo.AsUrl(); |
| } |
| |
| class HeartBeatThread |
| { |
| int _heartbeatMillis; |
| IProtocolWriter _protocolWriter; |
| bool _run = true; |
| |
| public HeartBeatThread(IProtocolWriter protocolWriter, int heartbeatMillis) |
| { |
| _protocolWriter = protocolWriter; |
| _heartbeatMillis = heartbeatMillis; |
| } |
| |
| public void Run() |
| { |
| while (_run) |
| { |
| Thread.Sleep(_heartbeatMillis); |
| if (!_run) break; |
| _log.Debug("Sending heartbeat"); |
| // TODO: Can we optimise this so that heartbeats are only written when we haven't sent anything recently to the broker? |
| _protocolWriter.Write(HeartbeatBody.FRAME); |
| } |
| _log.Debug("Heatbeat thread stopped"); |
| } |
| |
| public void Stop() |
| { |
| _run = false; |
| } |
| } |
| |
| public void StartHeartBeatThread(int heartbeatSeconds) |
| { |
| _log.Debug("Starting new heartbeat thread"); |
| _heartBeatRunner = new HeartBeatThread(ProtocolWriter, heartbeatSeconds * 1000); |
| _heartBeatThread = new Thread(new ThreadStart(_heartBeatRunner.Run)); |
| _heartBeatThread.Name = "HeartBeat"; |
| _heartBeatThread.Start(); |
| } |
| |
| public void StopHeartBeatThread() |
| { |
| if (_heartBeatRunner != null) |
| { |
| _log.Debug("Stopping old heartbeat thread"); |
| _heartBeatRunner.Stop(); |
| } |
| } |
| } |
| } |