/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
using System; | |
using System.Collections.Generic; | |
using Apache.NMS.ActiveMQ.Commands; | |
using Apache.NMS.ActiveMQ.Transport; | |
namespace Apache.NMS.ActiveMQ.State
{
    /// <summary>
    /// Tracks the state of a connection so a newly established transport can be
    /// re-initialized to the state that was tracked.
    /// </summary>
    /// <remarks>
    /// Commands are dispatched to the <c>process*</c> overrides through
    /// <see cref="track"/>. Each override records the command in the matching
    /// connection / session / transaction state and returns a marker response
    /// when the command was state tracked, or <c>null</c> when it was not.
    /// </remarks>
    public class ConnectionStateTracker : CommandVisitorAdapter
    {
        // Shared "this command was tracked" marker; carries no follow-up action.
        private static readonly Tracked TRACKED_RESPONSE_MARKER = new Tracked(null);

        // Tracked state per connection, keyed by connection id.
        protected Dictionary<ConnectionId, ConnectionState> connectionStates = new Dictionary<ConnectionId, ConnectionState>();

        private bool _trackTransactions;
        private bool _restoreSessions = true;
        private bool _restoreConsumers = true;
        private bool _restoreProducers = true;
        private bool _restoreTransaction = true;
        private bool _trackMessages = true;
        private int _maxCacheSize = 256;

        // Adjusted by trackBack (+1) and by cache eviction (-1); not read in this class.
        private int currentCacheSize;

        // Copies of non-transacted messages, replayed by DoRestore.
        private readonly Dictionary<MessageId, Message> messageCache = new Dictionary<MessageId, Message>();

        // Insertion order of the ids held in messageCache; drives eviction and replay order.
        private readonly Queue<MessageId> messageCacheFIFO = new Queue<MessageId>();

        // Guards messageCache and messageCacheFIFO, which must be updated together.
        private readonly object messageCacheLock = new object();

        /// <summary>
        /// Evicts the oldest cached messages until the number of cached entries
        /// no longer exceeds <see cref="MaxCacheSize"/>.
        /// </summary>
        protected void RemoveEldestInCache()
        {
            lock(messageCacheLock)
            {
                while(messageCacheFIFO.Count > MaxCacheSize)
                {
                    messageCache.Remove(messageCacheFIFO.Dequeue());
                    currentCacheSize = currentCacheSize - 1;
                }
            }
        }

        /// <summary>
        /// Action that drops the state of a finished transaction from its
        /// connection state when it is run.
        /// </summary>
        private class RemoveTransactionAction : ThreadSimulator
        {
            private readonly TransactionInfo info;
            private readonly ConnectionStateTracker cst;

            /// <param name="info">The transaction command whose transaction is to be removed.</param>
            /// <param name="aCst">The tracker that owns the connection states.</param>
            public RemoveTransactionAction(TransactionInfo info, ConnectionStateTracker aCst)
            {
                this.info = info;
                this.cst = aCst;
            }

            /// <summary>
            /// Removes the transaction state. Does nothing when the owning
            /// connection is no longer tracked.
            /// </summary>
            public override void Run()
            {
                ConnectionState cs = cst.LookupConnectionState(info.ConnectionId);
                if(cs != null)
                {
                    cs.removeTransactionState(info.TransactionId);
                }
            }
        }

        /// <summary>
        /// Dispatches the command to the matching <c>process*</c> method of this
        /// tracker and returns the tracking result.
        /// </summary>
        /// <param name="command">The command to track.</param>
        /// <returns>null if the command is not state tracked.</returns>
        /// <exception cref="IOException">
        /// Thrown when processing fails; any non-IOException failure is wrapped.
        /// </exception>
        public Tracked track(Command command)
        {
            try
            {
                return (Tracked) command.visit(this);
            }
            catch(IOException)
            {
                // Rethrow as-is so the original stack trace is preserved.
                throw;
            }
            catch(Exception e)
            {
                // NOTE(review): only the message is carried over. If this IOException
                // type offers a (message, inner) constructor, pass e as the inner exception.
                throw new IOException(e.Message);
            }
        }

        /// <summary>
        /// Accounts for a non-transacted message command by incrementing the
        /// cache size counter. Other commands are ignored.
        /// </summary>
        /// <param name="command">The command to account for; may be null.</param>
        public void trackBack(Command command)
        {
            if(TrackMessages && command != null && command.IsMessage)
            {
                Message message = (Message) command;
                if(message.TransactionId == null)
                {
                    currentCacheSize = currentCacheSize + 1;
                }
            }
        }

        /// <summary>
        /// Replays the tracked state onto the given transport: for every tracked
        /// connection its info and temp destinations, then (when enabled) its
        /// sessions and transactions, and finally the cached messages in the
        /// order in which they were cached.
        /// </summary>
        /// <param name="transport">The transport to send the tracked commands on.</param>
        public void DoRestore(ITransport transport)
        {
            // Restore the connections.
            foreach(ConnectionState connectionState in connectionStates.Values)
            {
                transport.Oneway(connectionState.Info);
                DoRestoreTempDestinations(transport, connectionState);

                if(RestoreSessions)
                {
                    DoRestoreSessions(transport, connectionState);
                }

                if(RestoreTransaction)
                {
                    DoRestoreTransactions(transport, connectionState);
                }
            }

            // Snapshot the cache under the lock so the transport is never called
            // while the lock is held, then flush the messages oldest first.
            List<Message> pending;
            lock(messageCacheLock)
            {
                pending = new List<Message>(messageCacheFIFO.Count);
                foreach(MessageId id in messageCacheFIFO)
                {
                    Message cached;
                    if(messageCache.TryGetValue(id, out cached))
                    {
                        pending.Add(cached);
                    }
                }
            }

            foreach(Message msg in pending)
            {
                transport.Oneway(msg);
            }
        }

        /// <summary>
        /// Re-sends every command recorded in each transaction of the connection.
        /// </summary>
        /// <param name="transport">The transport to send on.</param>
        /// <param name="connectionState">The connection whose transactions are replayed.</param>
        private void DoRestoreTransactions(ITransport transport, ConnectionState connectionState)
        {
            AtomicCollection<TransactionState> transactionStates = connectionState.TransactionStates;
            foreach(TransactionState transactionState in transactionStates)
            {
                foreach(Command command in transactionState.Commands)
                {
                    transport.Oneway(command);
                }
            }
        }

        /// <summary>
        /// Re-sends the info of every session of the connection, followed by the
        /// session's producers and consumers when those restores are enabled.
        /// </summary>
        /// <param name="transport">The transport to send on.</param>
        /// <param name="connectionState">The connection whose sessions are replayed.</param>
        protected void DoRestoreSessions(ITransport transport, ConnectionState connectionState)
        {
            // Restore the connection's sessions
            foreach(SessionState sessionState in connectionState.SessionStates)
            {
                transport.Oneway(sessionState.Info);

                if(RestoreProducers)
                {
                    DoRestoreProducers(transport, sessionState);
                }

                if(RestoreConsumers)
                {
                    DoRestoreConsumers(transport, sessionState);
                }
            }
        }

        /// <summary>
        /// Re-sends the info of every consumer of the session.
        /// </summary>
        /// <param name="transport">The transport to send on.</param>
        /// <param name="sessionState">The session whose consumers are replayed.</param>
        protected void DoRestoreConsumers(ITransport transport, SessionState sessionState)
        {
            // Restore the session's consumers
            foreach(ConsumerState consumerState in sessionState.ConsumerStates)
            {
                transport.Oneway(consumerState.Info);
            }
        }

        /// <summary>
        /// Re-sends the info of every producer of the session.
        /// </summary>
        /// <param name="transport">The transport to send on.</param>
        /// <param name="sessionState">The session whose producers are replayed.</param>
        protected void DoRestoreProducers(ITransport transport, SessionState sessionState)
        {
            // Restore the session's producers
            foreach(ProducerState producerState in sessionState.ProducerStates)
            {
                transport.Oneway(producerState.Info);
            }
        }

        /// <summary>
        /// Re-sends the info of every temp destination of the connection.
        /// </summary>
        /// <param name="transport">The transport to send on.</param>
        /// <param name="connectionState">The connection whose temp destinations are replayed.</param>
        protected void DoRestoreTempDestinations(ITransport transport, ConnectionState connectionState)
        {
            // Restore the connection's temp destinations.
            foreach(DestinationInfo destinationInfo in connectionState.TempDestinations)
            {
                transport.Oneway(destinationInfo);
            }
        }

        /// <summary>
        /// Returns the tracked state of the given connection, or null when the id
        /// is null or the connection is not tracked.
        /// </summary>
        private ConnectionState LookupConnectionState(ConnectionId connectionId)
        {
            if(connectionId == null)
            {
                return null;
            }

            // The Dictionary indexer throws for unknown keys; TryGetValue yields
            // the "null when missing" result the callers test for.
            ConnectionState state;
            if(connectionStates.TryGetValue(connectionId, out state))
            {
                return state;
            }

            return null;
        }

        /// <summary>
        /// Returns the tracked state of the given session, or null when the id is
        /// null or its connection is not tracked.
        /// </summary>
        private SessionState LookupSessionState(SessionId sessionId)
        {
            if(sessionId == null)
            {
                return null;
            }

            ConnectionState cs = LookupConnectionState(sessionId.ParentId);
            if(cs == null)
            {
                return null;
            }

            return cs[sessionId];
        }

        /// <summary>
        /// Appends the transaction command to the state of its transaction.
        /// </summary>
        /// <returns>true when the transaction state was found and the command recorded.</returns>
        private bool AppendToTransaction(TransactionInfo info)
        {
            ConnectionState cs = LookupConnectionState(info.ConnectionId);
            if(cs == null)
            {
                return false;
            }

            TransactionState transactionState = cs[info.TransactionId];
            if(transactionState == null)
            {
                return false;
            }

            transactionState.addCommand(info);
            return true;
        }

        /// <summary>
        /// Records a temporary destination on its connection state.
        /// Non-temporary destinations are not recorded.
        /// </summary>
        /// <returns>The tracked marker response.</returns>
        public override Response processAddDestination(DestinationInfo info)
        {
            if(info != null && info.Destination != null && info.Destination.IsTemporary)
            {
                ConnectionState cs = LookupConnectionState(info.ConnectionId);
                if(cs != null)
                {
                    cs.addTempDestination(info);
                }
            }

            return TRACKED_RESPONSE_MARKER;
        }

        /// <summary>
        /// Removes a temporary destination from its connection state.
        /// </summary>
        /// <returns>The tracked marker response.</returns>
        public override Response processRemoveDestination(DestinationInfo info)
        {
            if(info != null && info.Destination != null && info.Destination.IsTemporary)
            {
                ConnectionState cs = LookupConnectionState(info.ConnectionId);
                if(cs != null)
                {
                    cs.removeTempDestination(info.Destination);
                }
            }

            return TRACKED_RESPONSE_MARKER;
        }

        /// <summary>
        /// Records a producer on the state of its parent session, when tracked.
        /// </summary>
        /// <returns>The tracked marker response.</returns>
        public override Response processAddProducer(ProducerInfo info)
        {
            if(info != null && info.ProducerId != null)
            {
                SessionState ss = LookupSessionState(info.ProducerId.ParentId);
                if(ss != null)
                {
                    ss.addProducer(info);
                }
            }

            return TRACKED_RESPONSE_MARKER;
        }

        /// <summary>
        /// Removes a producer from the state of its parent session, when tracked.
        /// </summary>
        /// <returns>The tracked marker response.</returns>
        public override Response processRemoveProducer(ProducerId id)
        {
            if(id != null)
            {
                SessionState ss = LookupSessionState(id.ParentId);
                if(ss != null)
                {
                    ss.removeProducer(id);
                }
            }

            return TRACKED_RESPONSE_MARKER;
        }

        /// <summary>
        /// Records a consumer on the state of its parent session, when tracked.
        /// </summary>
        /// <returns>The tracked marker response.</returns>
        public override Response processAddConsumer(ConsumerInfo info)
        {
            if(info != null && info.ConsumerId != null)
            {
                SessionState ss = LookupSessionState(info.ConsumerId.ParentId);
                if(ss != null)
                {
                    ss.addConsumer(info);
                }
            }

            return TRACKED_RESPONSE_MARKER;
        }

        /// <summary>
        /// Removes a consumer from the state of its parent session, when tracked.
        /// </summary>
        /// <returns>The tracked marker response.</returns>
        public override Response processRemoveConsumer(ConsumerId id)
        {
            if(id != null)
            {
                SessionState ss = LookupSessionState(id.ParentId);
                if(ss != null)
                {
                    ss.removeConsumer(id);
                }
            }

            return TRACKED_RESPONSE_MARKER;
        }

        /// <summary>
        /// Records a session on the state of its parent connection, when tracked.
        /// </summary>
        /// <returns>The tracked marker response.</returns>
        public override Response processAddSession(SessionInfo info)
        {
            if(info != null && info.SessionId != null)
            {
                ConnectionState cs = LookupConnectionState(info.SessionId.ParentId);
                if(cs != null)
                {
                    cs.addSession(info);
                }
            }

            return TRACKED_RESPONSE_MARKER;
        }

        /// <summary>
        /// Removes a session from the state of its parent connection, when tracked.
        /// </summary>
        /// <returns>The tracked marker response.</returns>
        public override Response processRemoveSession(SessionId id)
        {
            if(id != null)
            {
                ConnectionState cs = LookupConnectionState(id.ParentId);
                if(cs != null)
                {
                    cs.removeSession(id);
                }
            }

            return TRACKED_RESPONSE_MARKER;
        }

        /// <summary>
        /// Starts tracking a connection. Re-adding an already tracked connection
        /// replaces its state with a fresh one.
        /// </summary>
        /// <returns>The tracked marker response.</returns>
        public override Response processAddConnection(ConnectionInfo info)
        {
            if(info != null)
            {
                // Indexer assignment instead of Add(): Add() throws when the id is already present.
                connectionStates[info.ConnectionId] = new ConnectionState(info);
            }

            return TRACKED_RESPONSE_MARKER;
        }

        /// <summary>
        /// Stops tracking a connection.
        /// </summary>
        /// <returns>The tracked marker response.</returns>
        public override Response processRemoveConnection(ConnectionId id)
        {
            if(id != null)
            {
                connectionStates.Remove(id);
            }

            return TRACKED_RESPONSE_MARKER;
        }

        /// <summary>
        /// Tracks a message. A transacted message is appended to its transaction
        /// state when transaction tracking is on; otherwise, when message tracking
        /// is on, a clone is placed in the bounded message cache.
        /// </summary>
        /// <returns>
        /// The tracked marker response for a transacted message tracked under
        /// <see cref="TrackTransactions"/>; null in every other case.
        /// </returns>
        public override Response processMessage(Message send)
        {
            if(send != null)
            {
                if(TrackTransactions && send.TransactionId != null)
                {
                    ConnectionState cs = LookupConnectionState(send.ProducerId.ParentId.ParentId);
                    if(cs != null)
                    {
                        TransactionState transactionState = cs[send.TransactionId];
                        if(transactionState != null)
                        {
                            transactionState.addCommand(send);
                        }
                    }

                    return TRACKED_RESPONSE_MARKER;
                }
                else if(TrackMessages)
                {
                    Message copy = (Message) send.Clone();

                    lock(messageCacheLock)
                    {
                        // Register the id in the FIFO only on first sight so eviction
                        // sees exactly one queue entry per cached message.
                        if(!messageCache.ContainsKey(send.MessageId))
                        {
                            messageCacheFIFO.Enqueue(send.MessageId);
                        }

                        // Indexer assignment tolerates a message id being tracked twice.
                        messageCache[send.MessageId] = copy;
                        RemoveEldestInCache();
                    }
                }
            }

            return null;
        }

        /// <summary>
        /// Appends a transacted acknowledgement to its transaction state when
        /// transaction tracking is on.
        /// </summary>
        /// <returns>The tracked marker response when the ack was handled; otherwise null.</returns>
        public override Response processMessageAck(MessageAck ack)
        {
            if(TrackTransactions && ack != null && ack.TransactionId != null)
            {
                ConnectionState cs = LookupConnectionState(ack.ConsumerId.ParentId.ParentId);
                if(cs != null)
                {
                    TransactionState transactionState = cs[ack.TransactionId];
                    if(transactionState != null)
                    {
                        transactionState.addCommand(ack);
                    }
                }

                return TRACKED_RESPONSE_MARKER;
            }

            return null;
        }

        /// <summary>
        /// Creates the transaction state on the owning connection and records the
        /// begin command in it, when transaction tracking is on.
        /// </summary>
        /// <returns>The tracked marker response when the command was handled; otherwise null.</returns>
        public override Response processBeginTransaction(TransactionInfo info)
        {
            if(TrackTransactions && info != null && info.TransactionId != null)
            {
                ConnectionState cs = LookupConnectionState(info.ConnectionId);
                if(cs != null)
                {
                    cs.addTransactionState(info.TransactionId);
                    TransactionState state = cs[info.TransactionId];
                    if(state != null)
                    {
                        state.addCommand(info);
                    }
                }

                return TRACKED_RESPONSE_MARKER;
            }

            return null;
        }

        /// <summary>
        /// Records the prepare command in its transaction state, when transaction
        /// tracking is on.
        /// </summary>
        /// <returns>The tracked marker response when the command was handled; otherwise null.</returns>
        public override Response processPrepareTransaction(TransactionInfo info)
        {
            if(TrackTransactions && info != null)
            {
                AppendToTransaction(info);
                return TRACKED_RESPONSE_MARKER;
            }

            return null;
        }

        /// <summary>
        /// Records the one-phase commit in its transaction state, when transaction
        /// tracking is on.
        /// </summary>
        /// <returns>
        /// A tracked response carrying an action that removes the transaction
        /// state, when the transaction is tracked; otherwise null.
        /// </returns>
        public override Response processCommitTransactionOnePhase(TransactionInfo info)
        {
            if(TrackTransactions && info != null && AppendToTransaction(info))
            {
                return new Tracked(new RemoveTransactionAction(info, this));
            }

            return null;
        }

        /// <summary>
        /// Records the two-phase commit in its transaction state, when transaction
        /// tracking is on.
        /// </summary>
        /// <returns>
        /// A tracked response carrying an action that removes the transaction
        /// state, when the transaction is tracked; otherwise null.
        /// </returns>
        public override Response processCommitTransactionTwoPhase(TransactionInfo info)
        {
            if(TrackTransactions && info != null && AppendToTransaction(info))
            {
                return new Tracked(new RemoveTransactionAction(info, this));
            }

            return null;
        }

        /// <summary>
        /// Records the rollback in its transaction state, when transaction
        /// tracking is on.
        /// </summary>
        /// <returns>
        /// A tracked response carrying an action that removes the transaction
        /// state, when the transaction is tracked; otherwise null.
        /// </returns>
        public override Response processRollbackTransaction(TransactionInfo info)
        {
            if(TrackTransactions && info != null && AppendToTransaction(info))
            {
                return new Tracked(new RemoveTransactionAction(info, this));
            }

            return null;
        }

        /// <summary>
        /// Records the end command in its transaction state, when transaction
        /// tracking is on.
        /// </summary>
        /// <returns>The tracked marker response when the command was handled; otherwise null.</returns>
        public override Response processEndTransaction(TransactionInfo info)
        {
            if(TrackTransactions && info != null)
            {
                AppendToTransaction(info);
                return TRACKED_RESPONSE_MARKER;
            }

            return null;
        }

        /// <summary>
        /// Whether <see cref="DoRestoreSessions"/> also replays each session's consumers. Defaults to true.
        /// </summary>
        public bool RestoreConsumers
        {
            get
            {
                return _restoreConsumers;
            }
            set
            {
                _restoreConsumers = value;
            }
        }

        /// <summary>
        /// Whether <see cref="DoRestoreSessions"/> also replays each session's producers. Defaults to true.
        /// </summary>
        public bool RestoreProducers
        {
            get
            {
                return _restoreProducers;
            }
            set
            {
                _restoreProducers = value;
            }
        }

        /// <summary>
        /// Whether <see cref="DoRestore"/> replays the sessions of each connection. Defaults to true.
        /// </summary>
        public bool RestoreSessions
        {
            get
            {
                return _restoreSessions;
            }
            set
            {
                _restoreSessions = value;
            }
        }

        /// <summary>
        /// Whether transaction commands, transacted messages and transacted acks
        /// are recorded in transaction states. Defaults to false.
        /// </summary>
        public bool TrackTransactions
        {
            get
            {
                return _trackTransactions;
            }
            set
            {
                _trackTransactions = value;
            }
        }

        /// <summary>
        /// Whether <see cref="DoRestore"/> replays the recorded transaction commands. Defaults to true.
        /// </summary>
        public bool RestoreTransaction
        {
            get
            {
                return _restoreTransaction;
            }
            set
            {
                _restoreTransaction = value;
            }
        }

        /// <summary>
        /// Whether messages not handled by transaction tracking are cached for replay. Defaults to true.
        /// </summary>
        public bool TrackMessages
        {
            get
            {
                return _trackMessages;
            }
            set
            {
                _trackMessages = value;
            }
        }

        /// <summary>
        /// Maximum number of messages kept in the message cache before the oldest
        /// entries are evicted. Defaults to 256.
        /// </summary>
        public int MaxCacheSize
        {
            get
            {
                return _maxCacheSize;
            }
            set
            {
                _maxCacheSize = value;
            }
        }
    }
}