Support service url being an ip and not only a host name. Also reworked the service discovery logic.
diff --git a/src/DotPulsar/Internal/ConnectionPool.cs b/src/DotPulsar/Internal/ConnectionPool.cs
index de3ac0b..68a2642 100644
--- a/src/DotPulsar/Internal/ConnectionPool.cs
+++ b/src/DotPulsar/Internal/ConnectionPool.cs
@@ -2,8 +2,8 @@
using DotPulsar.Internal.PulsarApi;
using System;
using System.Collections.Concurrent;
-using System.ComponentModel;
using System.Linq;
+using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
@@ -46,34 +46,33 @@
public async Task<Connection> FindConnectionForTopic(string topic, CancellationToken cancellationToken)
{
- var connection = await CreateConnection(_serviceUrl, cancellationToken);
+ var lookup = new CommandLookupTopic
+ {
+ Topic = topic,
+ Authoritative = false
+ };
- var authoritative = false;
+ Uri serviceUrl = _serviceUrl;
while (true)
{
- var lookup = new CommandLookupTopic
- {
- Topic = topic,
- Authoritative = authoritative
- };
+ var connection = await CreateConnection(serviceUrl, cancellationToken);
var response = await connection.Send(lookup);
response.Expect(BaseCommand.Type.LookupResponse);
- switch (response.LookupTopicResponse.Response)
- {
- case CommandLookupTopicResponse.LookupType.Connect:
- return connection;
- case CommandLookupTopicResponse.LookupType.Redirect:
- authoritative = response.LookupTopicResponse.Authoritative;
- connection = await CreateConnection(new Uri(response.LookupTopicResponse.BrokerServiceUrl), cancellationToken);
- continue;
- case CommandLookupTopicResponse.LookupType.Failed:
- response.LookupTopicResponse.Throw();
- break;
- default:
- throw new InvalidEnumArgumentException("LookupType", (int)response.LookupTopicResponse.Response, typeof(CommandLookupTopicResponse.LookupType));
- }
+ if (response.LookupTopicResponse.Response == CommandLookupTopicResponse.LookupType.Failed)
+ response.LookupTopicResponse.Throw();
+
+ lookup.Authoritative = response.LookupTopicResponse.Authoritative;
+ serviceUrl = new Uri(response.LookupTopicResponse.BrokerServiceUrl);
+
+ if (response.LookupTopicResponse.Response == CommandLookupTopicResponse.LookupType.Redirect || !response.LookupTopicResponse.Authoritative)
+ continue;
+
+ if (_serviceUrl.IsLoopback) // LookupType is Connect, ServiceUrl is local and response is authoritative. Assume the Pulsar server is a standalone docker
+ return connection;
+ else
+ return await CreateConnection(serviceUrl, cancellationToken);
}
}
@@ -85,7 +84,17 @@
return connection;
var tcpClient = new TcpClient();
- await tcpClient.ConnectAsync(serviceUrl.Host, serviceUrl.Port);
+
+ switch (Uri.CheckHostName(serviceUrl.Host))
+ {
+ case UriHostNameType.IPv4:
+ case UriHostNameType.IPv6:
+ await tcpClient.ConnectAsync(IPAddress.Parse(serviceUrl.Host), serviceUrl.Port);
+ break;
+ default:
+ await tcpClient.ConnectAsync(serviceUrl.Host, serviceUrl.Port);
+ break;
+ }
connection = new Connection(tcpClient.GetStream());
Register(serviceUrl, connection);