blob: 852faefff815c4e283fd737eca10e469af6b0992 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections.Generic;
using Apache.NMS.ActiveMQ.Commands;
using Apache.NMS.ActiveMQ.Transport;
using Apache.NMS.ActiveMQ.Util;
using System.Collections;
namespace Apache.NMS.ActiveMQ.State
{
/// <summary>
/// Tracks the state of a connection so a newly established transport can be
/// re-initialized to the state that was tracked.
/// </summary>
public class ConnectionStateTracker : CommandVisitorAdapter
{
private static readonly Tracked TRACKED_RESPONSE_MARKER = new Tracked(null);
protected readonly Dictionary<ConnectionId, ConnectionState> connectionStates =
new Dictionary<ConnectionId, ConnectionState>();
private bool isTrackTransactions;
private bool isTrackTransactionProducers = true;
private bool isRestoreSessions = true;
private bool isRestoreConsumers = true;
private bool isRestoreProducers = true;
private bool isRestoreTransaction = true;
private bool isTrackMessages = true;
private int maxCacheSize = 256;
private readonly LRUCache<Object, Command> messageCache = new LRUCache<Object, Command>(256);
private class RemoveTransactionAction : ResponseHandler
{
private readonly TransactionInfo info;
private readonly ConnectionStateTracker cst;
public RemoveTransactionAction(TransactionInfo info, ConnectionStateTracker aCst)
{
this.info = info;
this.cst = aCst;
}
public override void OnResponse()
{
ConnectionState cs;
if(cst.connectionStates.TryGetValue(info.ConnectionId, out cs))
{
cs.RemoveTransactionState(info.TransactionId);
}
}
}
/// <summary>
/// </summary>
/// <param name="command"></param>
/// <returns>null if the command is not state tracked.</returns>
public Tracked Track(Command command)
{
try
{
return (Tracked) command.Visit(this);
}
catch(IOException)
{
throw;
}
catch(Exception e)
{
throw new IOException(e.Message);
}
}
public void TrackBack(Command command)
{
}
public void DoRestore(ITransport transport)
{
// Restore the connections.
foreach(ConnectionState connectionState in connectionStates.Values)
{
ConnectionInfo info = connectionState.Info;
info.FailoverReconnect = true;
if (Tracer.IsDebugEnabled)
{
Tracer.Debug("conn: " + connectionState.Info.ConnectionId);
}
transport.Oneway(info);
DoRestoreTempDestinations(transport, connectionState);
if(RestoreSessions)
{
DoRestoreSessions(transport, connectionState);
}
if(RestoreTransaction)
{
DoRestoreTransactions(transport, connectionState);
}
}
// Now flush messages
foreach(Command command in messageCache.Values)
{
if (Tracer.IsDebugEnabled)
{
Tracer.Debug("Replaying command: " + command);
}
transport.Oneway(command);
}
}
private void DoRestoreTransactions(ITransport transport, ConnectionState connectionState)
{
AtomicCollection<TransactionState> transactionStates = connectionState.TransactionStates;
List<TransactionInfo> toRollback = new List<TransactionInfo>();
foreach(TransactionState transactionState in transactionStates)
{
// rollback any completed transactions - no way to know if commit got there
// or if reply went missing
if (transactionState.Commands.Count != 0)
{
Command lastCommand = transactionState.Commands[transactionState.Commands.Count - 1];
if (lastCommand.IsTransactionInfo)
{
TransactionInfo transactionInfo = lastCommand as TransactionInfo;
if (transactionInfo.Type == TransactionInfo.COMMIT_ONE_PHASE)
{
if (Tracer.IsDebugEnabled)
{
Tracer.Debug("rolling back potentially completed tx: " +
transactionState.Id);
}
toRollback.Add(transactionInfo);
continue;
}
}
}
// replay the add and remove of short lived producers that may have been
// involved in the transaction
foreach (ProducerState producerState in transactionState.ProducerStates)
{
if (Tracer.IsDebugEnabled)
{
Tracer.Debug("tx replay producer :" + producerState.Info);
}
transport.Oneway(producerState.Info);
}
foreach (Command command in transactionState.Commands)
{
if (Tracer.IsDebugEnabled)
{
Tracer.Debug("tx replay: " + command);
}
transport.Oneway(command);
}
foreach (ProducerState producerState in transactionState.ProducerStates)
{
if (Tracer.IsDebugEnabled)
{
Tracer.Debug("tx remove replayed producer :" + producerState.Info);
}
RemoveInfo producerRemove = new RemoveInfo();
producerRemove.ObjectId = producerState.Info.ProducerId;
transport.Oneway(producerRemove);
}
}
foreach (TransactionInfo command in toRollback)
{
// respond to the outstanding commit
ExceptionResponse response = new ExceptionResponse();
response.Exception = new BrokerError();
response.Exception.Message =
"Transaction completion in doubt due to failover. Forcing rollback of " + command.TransactionId;
response.Exception.ExceptionClass = (new TransactionRolledBackException()).GetType().FullName;
response.CorrelationId = command.CommandId;
transport.Command(transport, response);
}
}
/// <summary>
/// </summary>
/// <param name="transport"></param>
/// <param name="connectionState"></param>
protected void DoRestoreSessions(ITransport transport, ConnectionState connectionState)
{
// Restore the connection's sessions
foreach(SessionState sessionState in connectionState.SessionStates)
{
if (Tracer.IsDebugEnabled)
{
Tracer.Debug("Restoring session: " + sessionState.Info.SessionId);
}
transport.Oneway(sessionState.Info);
if(RestoreProducers)
{
DoRestoreProducers(transport, sessionState);
}
if(RestoreConsumers)
{
DoRestoreConsumers(transport, sessionState);
}
}
}
/// <summary>
/// </summary>
/// <param name="transport"></param>
/// <param name="sessionState"></param>
protected void DoRestoreConsumers(ITransport transport, SessionState sessionState)
{
// Restore the session's consumers but possibly in pull only (prefetch 0 state) till
// recovery completes.
ConnectionState connectionState = null;
bool connectionInterruptionProcessingComplete = false;
if(connectionStates.TryGetValue(sessionState.Info.SessionId.ParentId, out connectionState))
{
connectionInterruptionProcessingComplete = connectionState.ConnectionInterruptProcessingComplete;
}
// Restore the session's consumers
foreach(ConsumerState consumerState in sessionState.ConsumerStates)
{
ConsumerInfo infoToSend = consumerState.Info;
if(!connectionInterruptionProcessingComplete && infoToSend.PrefetchSize > 0 && transport.WireFormat.Version > 5)
{
infoToSend = consumerState.Info.Clone() as ConsumerInfo;
lock(((ICollection) connectionState.RecoveringPullConsumers).SyncRoot)
{
if(!connectionState.RecoveringPullConsumers.ContainsKey(infoToSend.ConsumerId))
{
connectionState.RecoveringPullConsumers.Add(infoToSend.ConsumerId, consumerState.Info);
}
}
infoToSend.PrefetchSize = 0;
if(Tracer.IsDebugEnabled)
{
Tracer.Debug("restore consumer: " + infoToSend.ConsumerId +
" in pull mode pending recovery, overriding prefetch: " +
consumerState.Info.PrefetchSize);
}
}
if(Tracer.IsDebugEnabled)
{
Tracer.Debug("restore consumer: " + infoToSend.ConsumerId);
}
transport.Oneway(infoToSend);
}
}
/// <summary>
/// </summary>
/// <param name="transport"></param>
/// <param name="sessionState"></param>
protected void DoRestoreProducers(ITransport transport, SessionState sessionState)
{
// Restore the session's producers
foreach(ProducerState producerState in sessionState.ProducerStates)
{
if (Tracer.IsDebugEnabled)
{
Tracer.Debug("Restoring producer: " + producerState.Info.ProducerId);
}
transport.Oneway(producerState.Info);
}
}
/// <summary>
/// </summary>
/// <param name="transport"></param>
/// <param name="connectionState"></param>
protected void DoRestoreTempDestinations(ITransport transport, ConnectionState connectionState)
{
// Restore the connection's temp destinations.
foreach(DestinationInfo destinationInfo in connectionState.TempDestinations)
{
transport.Oneway(destinationInfo);
}
}
public override Response ProcessAddDestination(DestinationInfo info)
{
if(info != null && info.Destination.IsTemporary)
{
ConnectionState cs;
if(connectionStates.TryGetValue(info.ConnectionId, out cs))
{
cs.AddTempDestination(info);
}
}
return TRACKED_RESPONSE_MARKER;
}
public override Response ProcessRemoveDestination(DestinationInfo info)
{
if(info != null && info.Destination.IsTemporary)
{
ConnectionState cs;
if(connectionStates.TryGetValue(info.ConnectionId, out cs))
{
cs.RemoveTempDestination(info.Destination);
}
}
return TRACKED_RESPONSE_MARKER;
}
public override Response ProcessAddProducer(ProducerInfo info)
{
if(info != null && info.ProducerId != null)
{
SessionId sessionId = info.ProducerId.ParentId;
if(sessionId != null)
{
ConnectionId connectionId = sessionId.ParentId;
if(connectionId != null)
{
ConnectionState cs;
if(connectionStates.TryGetValue(connectionId, out cs))
{
SessionState ss = cs[sessionId];
if(ss != null)
{
ss.AddProducer(info);
}
}
}
}
}
return TRACKED_RESPONSE_MARKER;
}
public override Response ProcessRemoveProducer(ProducerId id)
{
if(id != null)
{
SessionId sessionId = id.ParentId;
if(sessionId != null)
{
ConnectionId connectionId = sessionId.ParentId;
if(connectionId != null)
{
ConnectionState cs = null;
if(connectionStates.TryGetValue(connectionId, out cs))
{
SessionState ss = cs[sessionId];
if(ss != null)
{
ss.RemoveProducer(id);
}
}
}
}
}
return TRACKED_RESPONSE_MARKER;
}
public override Response ProcessAddConsumer(ConsumerInfo info)
{
if(info != null)
{
SessionId sessionId = info.ConsumerId.ParentId;
if(sessionId != null)
{
ConnectionId connectionId = sessionId.ParentId;
if(connectionId != null)
{
ConnectionState cs = null;
if(connectionStates.TryGetValue(connectionId, out cs))
{
SessionState ss = cs[sessionId];
if(ss != null)
{
ss.AddConsumer(info);
}
}
}
}
}
return TRACKED_RESPONSE_MARKER;
}
public override Response ProcessRemoveConsumer(ConsumerId id)
{
if(id != null)
{
SessionId sessionId = id.ParentId;
if(sessionId != null)
{
ConnectionId connectionId = sessionId.ParentId;
if(connectionId != null)
{
ConnectionState cs = null;
if(connectionStates.TryGetValue(connectionId, out cs))
{
SessionState ss = cs[sessionId];
if(ss != null)
{
ss.RemoveConsumer(id);
}
cs.RecoveringPullConsumers.Remove(id);
}
}
}
}
return TRACKED_RESPONSE_MARKER;
}
public override Response ProcessAddSession(SessionInfo info)
{
if(info != null)
{
ConnectionId connectionId = info.SessionId.ParentId;
if(connectionId != null)
{
ConnectionState cs = null;
if(connectionStates.TryGetValue(connectionId, out cs))
{
cs.AddSession(info);
}
}
}
return TRACKED_RESPONSE_MARKER;
}
public override Response ProcessRemoveSession(SessionId id)
{
if(id != null)
{
ConnectionId connectionId = id.ParentId;
if(connectionId != null)
{
ConnectionState cs = null;
if(connectionStates.TryGetValue(connectionId, out cs))
{
cs.RemoveSession(id);
}
}
}
return TRACKED_RESPONSE_MARKER;
}
public override Response ProcessAddConnection(ConnectionInfo info)
{
if(info != null)
{
ConnectionState connState = new ConnectionState(info);
if(connectionStates.ContainsKey(info.ConnectionId))
{
connectionStates[info.ConnectionId] = connState;
}
else
{
connectionStates.Add(info.ConnectionId, connState);
}
}
return TRACKED_RESPONSE_MARKER;
}
public override Response ProcessRemoveConnection(ConnectionId id)
{
if(id != null)
{
connectionStates.Remove(id);
}
return TRACKED_RESPONSE_MARKER;
}
public override Response ProcessMessage(Message send)
{
if(send != null)
{
if(TrackTransactions && send.TransactionId != null)
{
ProducerId producerId = send.ProducerId;
ConnectionId connectionId = producerId.ParentId.ParentId;
if(connectionId != null)
{
ConnectionState cs = null;
if(connectionStates.TryGetValue(connectionId, out cs))
{
TransactionState transactionState = cs[send.TransactionId];
if(transactionState != null)
{
transactionState.AddCommand(send);
if (isTrackTransactionProducers)
{
SessionState ss = cs[producerId.ParentId];
ProducerState producerState = ss[producerId];
producerState.TransactionState = transactionState;
}
}
}
}
return TRACKED_RESPONSE_MARKER;
}
else if(TrackMessages)
{
messageCache.Add(send.MessageId, (Message) send.Clone());
}
}
return null;
}
public override Response ProcessMessageAck(MessageAck ack)
{
if(TrackTransactions && ack != null && ack.TransactionId != null)
{
ConnectionId connectionId = ack.ConsumerId.ParentId.ParentId;
if(connectionId != null)
{
ConnectionState cs = null;
if(connectionStates.TryGetValue(connectionId, out cs))
{
TransactionState transactionState = cs[ack.TransactionId];
if(transactionState != null)
{
transactionState.AddCommand(ack);
}
}
}
return TRACKED_RESPONSE_MARKER;
}
return null;
}
public override Response ProcessBeginTransaction(TransactionInfo info)
{
if(TrackTransactions && info != null && info.TransactionId != null)
{
ConnectionId connectionId = info.ConnectionId;
if(connectionId != null)
{
ConnectionState cs = null;
if(connectionStates.TryGetValue(connectionId, out cs))
{
cs.AddTransactionState(info.TransactionId);
TransactionState state = cs[info.TransactionId];
state.AddCommand(info);
}
}
return TRACKED_RESPONSE_MARKER;
}
return null;
}
public override Response ProcessPrepareTransaction(TransactionInfo info)
{
if(TrackTransactions && info != null)
{
ConnectionId connectionId = info.ConnectionId;
if(connectionId != null)
{
ConnectionState cs = null;
if(connectionStates.TryGetValue(connectionId, out cs))
{
TransactionState transactionState = cs[info.TransactionId];
if(transactionState != null)
{
transactionState.AddCommand(info);
}
}
}
return TRACKED_RESPONSE_MARKER;
}
return null;
}
public override Response ProcessCommitTransactionOnePhase(TransactionInfo info)
{
if(TrackTransactions && info != null)
{
ConnectionId connectionId = info.ConnectionId;
if(connectionId != null)
{
ConnectionState cs = null;
if(connectionStates.TryGetValue(connectionId, out cs))
{
TransactionState transactionState = cs[info.TransactionId];
if(transactionState != null)
{
transactionState.AddCommand(info);
return new Tracked(new RemoveTransactionAction(info, this));
}
}
}
}
return null;
}
public override Response ProcessCommitTransactionTwoPhase(TransactionInfo info)
{
if(TrackTransactions && info != null)
{
ConnectionId connectionId = info.ConnectionId;
if(connectionId != null)
{
ConnectionState cs = null;
if(connectionStates.TryGetValue(connectionId, out cs))
{
TransactionState transactionState = cs[info.TransactionId];
if(transactionState != null)
{
transactionState.AddCommand(info);
return new Tracked(new RemoveTransactionAction(info, this));
}
}
}
}
return null;
}
public override Response ProcessRollbackTransaction(TransactionInfo info)
{
if(TrackTransactions && info != null)
{
ConnectionId connectionId = info.ConnectionId;
if(connectionId != null)
{
ConnectionState cs = null;
if(connectionStates.TryGetValue(connectionId, out cs))
{
TransactionState transactionState = cs[info.TransactionId];
if(transactionState != null)
{
transactionState.AddCommand(info);
return new Tracked(new RemoveTransactionAction(info, this));
}
}
}
}
return null;
}
public override Response ProcessEndTransaction(TransactionInfo info)
{
if(TrackTransactions && info != null)
{
ConnectionId connectionId = info.ConnectionId;
if(connectionId != null)
{
ConnectionState cs = null;
if(connectionStates.TryGetValue(connectionId, out cs))
{
TransactionState transactionState = cs[info.TransactionId];
if(transactionState != null)
{
transactionState.AddCommand(info);
}
}
}
return TRACKED_RESPONSE_MARKER;
}
return null;
}
public override Response ProcessMessagePull(MessagePull pull)
{
if (pull != null)
{
// leave a single instance in the cache
String id = pull.Destination + "::" + pull.ConsumerId;
messageCache[id] = pull;
}
return null;
}
public bool RestoreConsumers
{
get { return isRestoreConsumers; }
set { isRestoreConsumers = value; }
}
public bool RestoreProducers
{
get { return isRestoreProducers; }
set { isRestoreProducers = value; }
}
public bool RestoreSessions
{
get { return isRestoreSessions; }
set { isRestoreSessions = value; }
}
public bool TrackTransactions
{
get { return isTrackTransactions; }
set { isTrackTransactions = value; }
}
public bool TrackTransactionProducers
{
get { return isTrackTransactionProducers; }
set { isTrackTransactionProducers = value; }
}
public bool RestoreTransaction
{
get { return isRestoreTransaction; }
set { isRestoreTransaction = value; }
}
public bool TrackMessages
{
get { return isTrackMessages; }
set { isTrackMessages = value; }
}
public int MaxCacheSize
{
get { return maxCacheSize; }
set
{
this.maxCacheSize = value;
this.messageCache.MaxCacheSize = maxCacheSize;
}
}
public void ConnectionInterruptProcessingComplete(ITransport transport, ConnectionId connectionId)
{
ConnectionState connectionState = null;
if(connectionStates.TryGetValue(connectionId, out connectionState))
{
connectionState.ConnectionInterruptProcessingComplete = true;
Dictionary<ConsumerId, ConsumerInfo> consumersToRestorePrefetchOn;
lock(((ICollection) connectionState.RecoveringPullConsumers).SyncRoot)
{
consumersToRestorePrefetchOn = new Dictionary<ConsumerId, ConsumerInfo>(connectionState.RecoveringPullConsumers);
connectionState.RecoveringPullConsumers.Clear();
}
foreach(KeyValuePair<ConsumerId, ConsumerInfo> entry in consumersToRestorePrefetchOn)
{
ConsumerControl control = new ConsumerControl();
control.ConsumerId = entry.Key;
control.Prefetch = entry.Value.PrefetchSize;
control.Destination = entry.Value.Destination;
try
{
if(Tracer.IsDebugEnabled)
{
Tracer.Debug("restored recovering consumer: " + control.ConsumerId +
" with: " + control.Prefetch);
}
transport.Oneway(control);
}
catch(Exception ex)
{
if(Tracer.IsDebugEnabled)
{
Tracer.Debug("Failed to submit control for consumer: " + control.ConsumerId +
" with: " + control.Prefetch + "Error: " + ex.Message);
}
}
}
}
}
public void TransportInterrupted(ConnectionId id)
{
ConnectionState connection = null;
if(connectionStates.TryGetValue(id, out connection))
{
connection.ConnectionInterruptProcessingComplete = false;
}
}
}
}