Reimplement fix for zombie consumers to update the failover state tracker.
Fixes https://issues.apache.org/jira/browse/AMQNET-394
diff --git a/src/main/csharp/Connection.cs b/src/main/csharp/Connection.cs
index 83a9122..5284f86 100755
--- a/src/main/csharp/Connection.cs
+++ b/src/main/csharp/Connection.cs
@@ -907,8 +907,7 @@
}
}
- Tracer.ErrorFormat("Connection[{0}]: No such consumer active: {1}. Removing...", this.ConnectionId, dispatch.ConsumerId);
- transport.Oneway(new RemoveInfo() { ObjectId = dispatch.ConsumerId });
+ Tracer.ErrorFormat("Connection[{0}]: No such consumer active: {1}", this.ConnectionId, dispatch.ConsumerId);
}
protected void OnKeepAliveCommand(ITransport commandTransport, KeepAliveInfo info)
diff --git a/src/main/csharp/Transport/Failover/FailoverTransport.cs b/src/main/csharp/Transport/Failover/FailoverTransport.cs
index 0516f4d..ba50922 100644
--- a/src/main/csharp/Transport/Failover/FailoverTransport.cs
+++ b/src/main/csharp/Transport/Failover/FailoverTransport.cs
@@ -563,7 +563,8 @@
return;
}
else if(command.IsRemoveInfo || command.IsMessageAck)
- {
+ {
+ stateTracker.Track(command);
// Simulate response to RemoveInfo command or a MessageAck
// since it would be stale at this point.
if(command.ResponseRequired)
@@ -596,23 +597,23 @@
// Wait for transport to be connected.
ITransport transport = ConnectedTransport;
DateTime start = DateTime.Now;
- bool timedout = false;
+ bool timedout = false;
while(transport == null && !disposed && connectionFailure == null)
{
Tracer.Info("Waiting for transport to reconnect.");
int elapsed = (int) (DateTime.Now - start).TotalMilliseconds;
- if(this.timeout > 0 && elapsed > timeout)
+ if(this.timeout > 0 && elapsed > this.timeout)
{
timedout = true;
Tracer.DebugFormat("FailoverTransport.oneway - timed out after {0} mills", elapsed);
break;
- }
-
+ }
+
// Release so that the reconnect task can run
try
{
- // Wait for something
+ // Wait for something. The mutex will be pulsed if we connect.
Monitor.Wait(reconnectMutex, 100);
}
catch(ThreadInterruptedException e)