blob: 872eea81da8ca01cdb918b197c2ad4f705cd818d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections.Generic;
using Apache.NMS.ActiveMQ.Commands;
using Apache.NMS.ActiveMQ.Transport;
namespace Apache.NMS.ActiveMQ.State
{
/// <summary>
/// Tracks the state of a connection so a newly established transport can be
/// re-initialized to the state that was tracked.
/// </summary>
public class ConnectionStateTracker : CommandVisitorAdapter
{
private static Tracked TRACKED_RESPONSE_MARKER = new Tracked(null);
protected Dictionary<ConnectionId, ConnectionState> connectionStates = new Dictionary<ConnectionId, ConnectionState>();
private bool _trackTransactions;
private bool _restoreSessions = true;
private bool _restoreConsumers = true;
private bool _restoreProducers = true;
private bool _restoreTransaction = true;
private bool _trackMessages = true;
private int _maxCacheSize = 256;
private int currentCacheSize;
private Dictionary<MessageId, Message> messageCache = new Dictionary<MessageId, Message>();
private Queue<MessageId> messageCacheFIFO = new Queue<MessageId>();
protected void RemoveEldestInCache()
{
System.Collections.ICollection ic = messageCacheFIFO;
lock(ic.SyncRoot)
{
while(messageCacheFIFO.Count > MaxCacheSize)
{
messageCache.Remove(messageCacheFIFO.Dequeue());
currentCacheSize = currentCacheSize - 1;
}
}
}
private class RemoveTransactionAction : ThreadSimulator
{
private TransactionInfo info;
private ConnectionStateTracker cst;
public RemoveTransactionAction(TransactionInfo info, ConnectionStateTracker aCst)
{
this.info = info;
this.cst = aCst;
}
public override void Run()
{
ConnectionId connectionId = info.ConnectionId;
ConnectionState cs = cst.connectionStates[connectionId];
cs.removeTransactionState(info.TransactionId);
}
}
/// <summary>
/// </summary>
/// <param name="command"></param>
/// <returns>null if the command is not state tracked.</returns>
public Tracked track(Command command)
{
try
{
return (Tracked) command.visit(this);
}
catch(IOException e)
{
throw e;
}
catch(Exception e)
{
throw new IOException(e.Message);
}
}
public void trackBack(Command command)
{
if(TrackMessages && command != null && command.IsMessage)
{
Message message = (Message) command;
if(message.TransactionId == null)
{
currentCacheSize = currentCacheSize + 1;
}
}
}
public void DoRestore(ITransport transport)
{
// Restore the connections.
foreach(ConnectionState connectionState in connectionStates.Values)
{
transport.Oneway(connectionState.Info);
DoRestoreTempDestinations(transport, connectionState);
if(RestoreSessions)
{
DoRestoreSessions(transport, connectionState);
}
if(RestoreTransaction)
{
DoRestoreTransactions(transport, connectionState);
}
}
//now flush messages
foreach(Message msg in messageCache.Values)
{
transport.Oneway(msg);
}
}
private void DoRestoreTransactions(ITransport transport, ConnectionState connectionState)
{
AtomicCollection<TransactionState> transactionStates = connectionState.TransactionStates;
foreach(TransactionState transactionState in transactionStates)
{
foreach(Command command in transactionState.Commands)
{
transport.Oneway(command);
}
}
}
/// <summary>
/// </summary>
/// <param name="transport"></param>
/// <param name="connectionState"></param>
protected void DoRestoreSessions(ITransport transport, ConnectionState connectionState)
{
// Restore the connection's sessions
foreach(SessionState sessionState in connectionState.SessionStates)
{
transport.Oneway(sessionState.Info);
if(RestoreProducers)
{
DoRestoreProducers(transport, sessionState);
}
if(RestoreConsumers)
{
DoRestoreConsumers(transport, sessionState);
}
}
}
/// <summary>
/// </summary>
/// <param name="transport"></param>
/// <param name="sessionState"></param>
protected void DoRestoreConsumers(ITransport transport, SessionState sessionState)
{
// Restore the session's consumers
foreach(ConsumerState consumerState in sessionState.ConsumerStates)
{
transport.Oneway(consumerState.Info);
}
}
/// <summary>
/// </summary>
/// <param name="transport"></param>
/// <param name="sessionState"></param>
protected void DoRestoreProducers(ITransport transport, SessionState sessionState)
{
// Restore the session's producers
foreach(ProducerState producerState in sessionState.ProducerStates)
{
transport.Oneway(producerState.Info);
}
}
/// <summary>
/// </summary>
/// <param name="transport"></param>
/// <param name="connectionState"></param>
protected void DoRestoreTempDestinations(ITransport transport, ConnectionState connectionState)
{
// Restore the connection's temp destinations.
foreach(DestinationInfo destinationInfo in connectionState.TempDestinations)
{
transport.Oneway(destinationInfo);
}
}
public override Response processAddDestination(DestinationInfo info)
{
if(info != null)
{
ConnectionState cs = connectionStates[info.ConnectionId];
if(cs != null && info.Destination.IsTemporary)
{
cs.addTempDestination(info);
}
}
return TRACKED_RESPONSE_MARKER;
}
public override Response processRemoveDestination(DestinationInfo info)
{
if(info != null)
{
ConnectionState cs = connectionStates[info.ConnectionId];
if(cs != null && info.Destination.IsTemporary)
{
cs.removeTempDestination(info.Destination);
}
}
return TRACKED_RESPONSE_MARKER;
}
public override Response processAddProducer(ProducerInfo info)
{
if(info != null && info.ProducerId != null)
{
SessionId sessionId = info.ProducerId.ParentId;
if(sessionId != null)
{
ConnectionId connectionId = sessionId.ParentId;
if(connectionId != null)
{
ConnectionState cs = connectionStates[connectionId];
if(cs != null)
{
SessionState ss = cs[sessionId];
if(ss != null)
{
ss.addProducer(info);
}
}
}
}
}
return TRACKED_RESPONSE_MARKER;
}
public override Response processRemoveProducer(ProducerId id)
{
if(id != null)
{
SessionId sessionId = id.ParentId;
if(sessionId != null)
{
ConnectionId connectionId = sessionId.ParentId;
if(connectionId != null)
{
ConnectionState cs = connectionStates[connectionId];
if(cs != null)
{
SessionState ss = cs[sessionId];
if(ss != null)
{
ss.removeProducer(id);
}
}
}
}
}
return TRACKED_RESPONSE_MARKER;
}
public override Response processAddConsumer(ConsumerInfo info)
{
if(info != null)
{
SessionId sessionId = info.ConsumerId.ParentId;
if(sessionId != null)
{
ConnectionId connectionId = sessionId.ParentId;
if(connectionId != null)
{
ConnectionState cs = connectionStates[connectionId];
if(cs != null)
{
SessionState ss = cs[sessionId];
if(ss != null)
{
ss.addConsumer(info);
}
}
}
}
}
return TRACKED_RESPONSE_MARKER;
}
public override Response processRemoveConsumer(ConsumerId id)
{
if(id != null)
{
SessionId sessionId = id.ParentId;
if(sessionId != null)
{
ConnectionId connectionId = sessionId.ParentId;
if(connectionId != null)
{
ConnectionState cs = connectionStates[connectionId];
if(cs != null)
{
SessionState ss = cs[sessionId];
if(ss != null)
{
ss.removeConsumer(id);
}
}
}
}
}
return TRACKED_RESPONSE_MARKER;
}
public override Response processAddSession(SessionInfo info)
{
if(info != null)
{
ConnectionId connectionId = info.SessionId.ParentId;
if(connectionId != null)
{
ConnectionState cs = connectionStates[connectionId];
if(cs != null)
{
cs.addSession(info);
}
}
}
return TRACKED_RESPONSE_MARKER;
}
public override Response processRemoveSession(SessionId id)
{
if(id != null)
{
ConnectionId connectionId = id.ParentId;
if(connectionId != null)
{
ConnectionState cs = connectionStates[connectionId];
if(cs != null)
{
cs.removeSession(id);
}
}
}
return TRACKED_RESPONSE_MARKER;
}
public override Response processAddConnection(ConnectionInfo info)
{
if(info != null)
{
connectionStates.Add(info.ConnectionId, new ConnectionState(info));
}
return TRACKED_RESPONSE_MARKER;
}
public override Response processRemoveConnection(ConnectionId id)
{
if(id != null)
{
connectionStates.Remove(id);
}
return TRACKED_RESPONSE_MARKER;
}
public override Response processMessage(Message send)
{
if(send != null)
{
if(TrackTransactions && send.TransactionId != null)
{
ConnectionId connectionId = send.ProducerId.ParentId.ParentId;
if(connectionId != null)
{
ConnectionState cs = connectionStates[connectionId];
if(cs != null)
{
TransactionState transactionState = cs[send.TransactionId];
if(transactionState != null)
{
transactionState.addCommand(send);
}
}
}
return TRACKED_RESPONSE_MARKER;
}
else if(TrackMessages)
{
messageCache.Add(send.MessageId, (Message) send.Clone());
RemoveEldestInCache();
}
}
return null;
}
public override Response processMessageAck(MessageAck ack)
{
if(TrackTransactions && ack != null && ack.TransactionId != null)
{
ConnectionId connectionId = ack.ConsumerId.ParentId.ParentId;
if(connectionId != null)
{
ConnectionState cs = connectionStates[connectionId];
if(cs != null)
{
TransactionState transactionState = cs[ack.TransactionId];
if(transactionState != null)
{
transactionState.addCommand(ack);
}
}
}
return TRACKED_RESPONSE_MARKER;
}
return null;
}
public override Response processBeginTransaction(TransactionInfo info)
{
if(TrackTransactions && info != null && info.TransactionId != null)
{
ConnectionId connectionId = info.ConnectionId;
if(connectionId != null)
{
ConnectionState cs = connectionStates[connectionId];
if(cs != null)
{
cs.addTransactionState(info.TransactionId);
TransactionState state = cs[info.TransactionId];
state.addCommand(info);
}
}
return TRACKED_RESPONSE_MARKER;
}
return null;
}
public override Response processPrepareTransaction(TransactionInfo info)
{
if(TrackTransactions && info != null)
{
ConnectionId connectionId = info.ConnectionId;
if(connectionId != null)
{
ConnectionState cs = connectionStates[connectionId];
if(cs != null)
{
TransactionState transactionState = cs[info.TransactionId];
if(transactionState != null)
{
transactionState.addCommand(info);
}
}
}
return TRACKED_RESPONSE_MARKER;
}
return null;
}
public override Response processCommitTransactionOnePhase(TransactionInfo info)
{
if(TrackTransactions && info != null)
{
ConnectionId connectionId = info.ConnectionId;
if(connectionId != null)
{
ConnectionState cs = connectionStates[connectionId];
if(cs != null)
{
TransactionState transactionState = cs[info.TransactionId];
if(transactionState != null)
{
transactionState.addCommand(info);
return new Tracked(new RemoveTransactionAction(info, this));
}
}
}
}
return null;
}
public override Response processCommitTransactionTwoPhase(TransactionInfo info)
{
if(TrackTransactions && info != null)
{
ConnectionId connectionId = info.ConnectionId;
if(connectionId != null)
{
ConnectionState cs = connectionStates[connectionId];
if(cs != null)
{
TransactionState transactionState = cs[info.TransactionId];
if(transactionState != null)
{
transactionState.addCommand(info);
return new Tracked(new RemoveTransactionAction(info, this));
}
}
}
}
return null;
}
public override Response processRollbackTransaction(TransactionInfo info)
{
if(TrackTransactions && info != null)
{
ConnectionId connectionId = info.ConnectionId;
if(connectionId != null)
{
ConnectionState cs = connectionStates[connectionId];
if(cs != null)
{
TransactionState transactionState = cs[info.TransactionId];
if(transactionState != null)
{
transactionState.addCommand(info);
return new Tracked(new RemoveTransactionAction(info, this));
}
}
}
}
return null;
}
public override Response processEndTransaction(TransactionInfo info)
{
if(TrackTransactions && info != null)
{
ConnectionId connectionId = info.ConnectionId;
if(connectionId != null)
{
ConnectionState cs = connectionStates[connectionId];
if(cs != null)
{
TransactionState transactionState = cs[info.TransactionId];
if(transactionState != null)
{
transactionState.addCommand(info);
}
}
}
return TRACKED_RESPONSE_MARKER;
}
return null;
}
public bool RestoreConsumers
{
get
{
return _restoreConsumers;
}
set
{
_restoreConsumers = value;
}
}
public bool RestoreProducers
{
get
{
return _restoreProducers;
}
set
{
_restoreProducers = value;
}
}
public bool RestoreSessions
{
get
{
return _restoreSessions;
}
set
{
_restoreSessions = value;
}
}
public bool TrackTransactions
{
get
{
return _trackTransactions;
}
set
{
_trackTransactions = value;
}
}
public bool RestoreTransaction
{
get
{
return _restoreTransaction;
}
set
{
_restoreTransaction = value;
}
}
public bool TrackMessages
{
get
{
return _trackMessages;
}
set
{
_trackMessages = value;
}
}
public int MaxCacheSize
{
get
{
return _maxCacheSize;
}
set
{
_maxCacheSize = value;
}
}
}
}