AMQNET-838 ActiveMQ NMS client does not support nested parameters for failover transport
Refer to the "Configuring Nested URI Options" section in https://activemq.apache.org/failover-transport-reference.html
Nested options are supported by the JMS client but not by the NMS client.
Reference implementation in the JMS client:
https://github.com/apache/activemq/blob/a2d5d28c1f24d67f57d245003f1d7f96d696dd7c/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransportFactory.java#L70
https://github.com/apache/activemq/blob/a2d5d28c1f24d67f57d245003f1d7f96d696dd7c/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java#L1431
https://github.com/apache/activemq/blob/a2d5d28c1f24d67f57d245003f1d7f96d696dd7c/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java#L1019
https://github.com/apache/activemq/blob/a2d5d28c1f24d67f57d245003f1d7f96d696dd7c/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java#L1194
This PR adds support for nested parameters to the failover transport in the NMS client.
Co-authored-by: Kumar, Vivek <vivek.kumar5@ge.com>
diff --git a/src/Transport/Failover/FailoverTransport.cs b/src/Transport/Failover/FailoverTransport.cs
index f14a927..ce78cc1 100644
--- a/src/Transport/Failover/FailoverTransport.cs
+++ b/src/Transport/Failover/FailoverTransport.cs
@@ -94,9 +94,10 @@
private List<Uri> priorityList = new List<Uri>();
private bool priorityBackupAvailable = false;
private String sslProtocol = null;
+ private string nestedExtraQueryOptions;
- // Not Sure how to work these back in with all the changes.
- //private int asyncTimeout = 45000;
+ // Not Sure how to work these back in with all the changes.
+ //private int asyncTimeout = 45000;
//private bool asyncConnect = false;
public FailoverTransport()
@@ -1177,7 +1178,7 @@
// URI from the pool until next time around.
if (transport == null)
{
- uri = iter.Current;
+ uri = AddExtraQueryOptions(iter.Current);
transport = TransportFactory.CompositeConnect(uri);
}
@@ -1310,14 +1311,14 @@
}
}
- foreach(Uri uri in connectList)
+ foreach(Uri u in connectList)
{
if (disposed)
{
break;
}
-
- if(ConnectedTransportURI != null && !ConnectedTransportURI.Equals(uri))
+ Uri uri = AddExtraQueryOptions(u);
+ if (ConnectedTransportURI != null && !ConnectedTransportURI.Equals(uri))
{
try
{
@@ -1712,5 +1713,35 @@
return result;
}
+
+ /// <summary>
+ /// Stores the query-option string that <c>AddExtraQueryOptions</c> appends
+ /// to a broker URI before it is connected.
+ /// </summary>
+ /// <param name="nestedExtraQueryOptions">
+ /// The extra options as a query string; null or empty means nothing is appended.
+ /// </param>
+ public void SetNestedExtraQueryOptions(String nestedExtraQueryOptions) =>
+     this.nestedExtraQueryOptions = nestedExtraQueryOptions;
+
+ /// <summary>
+ /// Returns <paramref name="uri"/> with the configured nested query options
+ /// appended to its query string. When no nested options are configured the
+ /// URI is returned unchanged.
+ /// </summary>
+ /// <param name="uri">Broker URI that is about to be connected.</param>
+ /// <returns>The URI to connect with, including any nested options.</returns>
+ /// <exception cref="UriFormatException">
+ /// Logged and rethrown if building the combined URI raises it.
+ /// </exception>
+ private Uri AddExtraQueryOptions(Uri uri)
+ {
+     // Nothing configured: hand the URI back untouched.
+     if (string.IsNullOrEmpty(nestedExtraQueryOptions))
+     {
+         return uri;
+     }
+     try
+     {
+         // Uri.Query is never null; it is an empty string when the URI has no
+         // query. Testing for emptiness (not null) avoids building a query
+         // that starts with a stray '&' for URIs such as mock://host:port.
+         string combinedQuery = string.IsNullOrEmpty(uri.Query)
+             ? nestedExtraQueryOptions
+             : uri.Query + "&" + nestedExtraQueryOptions;
+         uri = URISupport.CreateUriWithQuery(uri, combinedQuery);
+         Tracer.Info($"URI with nested parameter is {uri}");
+     }
+     catch (UriFormatException e)
+     {
+         Tracer.Error(e.Message);
+         throw;
+     }
+     return uri;
+ }
}
}
diff --git a/src/Transport/Failover/FailoverTransportFactory.cs b/src/Transport/Failover/FailoverTransportFactory.cs
index 504d07b..b5740f0 100644
--- a/src/Transport/Failover/FailoverTransportFactory.cs
+++ b/src/Transport/Failover/FailoverTransportFactory.cs
@@ -55,7 +55,15 @@
StringDictionary options = compositData.Parameters;
FailoverTransport transport = CreateTransport(options);
transport.Add(false, compositData.Components);
- return transport;
+ try
+ {
+ transport.SetNestedExtraQueryOptions(URISupport.CreateQueryString(URISupport.GetProperties(options, "nested.")));
+ }
+ catch (Exception e)
+ {
+ Tracer.Error($"Error in setting nested parameters {e.Message}");
+ }
+ return transport;
}
protected FailoverTransport CreateTransport(StringDictionary parameters)
diff --git a/test/Transport/failover/FailoverTransportTest.cs b/test/Transport/failover/FailoverTransportTest.cs
index ec3cc67..4bdfb51 100644
--- a/test/Transport/failover/FailoverTransportTest.cs
+++ b/test/Transport/failover/FailoverTransportTest.cs
@@ -162,8 +162,36 @@
Assert.IsTrue(failover.IsConnected);
}
}
+ [Test]
+ public void FailoverTransportWithNestedParametersTest()
+ {
+     // Options carrying the "nested." prefix are expected to end up on the
+     // wrapped mock transport rather than on the failover transport itself.
+     var failoverUri = new Uri("failover:(mock://localhost:61616)?transport.randomize=false&transport.backup=true" +
+         "&nested.transport.failOnSendMessage=true&nested.transport.numSentMessagesBeforeFail=20");
+     var transportFactory = new FailoverTransportFactory();
- [Test]
+     using (ITransport created = transportFactory.CreateTransport(failoverUri))
+     {
+         Assert.IsNotNull(created);
+         created.CommandAsync = OnCommand;
+         created.Exception = OnException;
+         created.Resumed = OnResumed;
+         created.Interrupted = OnInterrupted;
+
+         // The outer "transport." options configure the failover layer.
+         var failoverLayer = created.Narrow(typeof(FailoverTransport)) as FailoverTransport;
+         Assert.IsNotNull(failoverLayer);
+         Assert.IsFalse(failoverLayer.Randomize);
+         Assert.IsTrue(failoverLayer.Backup);
+
+         // Give the transport a moment to connect after starting it.
+         created.Start();
+         Thread.Sleep(1000);
+         Assert.IsTrue(failoverLayer.IsConnected);
+
+         // The nested options must be visible on the underlying mock transport.
+         var mockLayer = created.Narrow(typeof(MockTransport)) as MockTransport;
+         Assert.IsNotNull(mockLayer);
+         Assert.IsTrue(mockLayer.FailOnSendMessage);
+         Assert.AreEqual(20, mockLayer.NumSentMessagesBeforeFail);
+     }
+ }
+ [Test]
public void FailoverTransportCreateFailOnCreateTest()
{
Uri uri = new Uri("failover:(mock://localhost:61616?transport.failOnCreate=true)?" +