Revert proxy support and prepare for the 0.8.4 release
diff --git a/.gitignore b/.gitignore
index 4590ca5..35c74e6 100644
--- a/.gitignore
+++ b/.gitignore
@@ -287,6 +287,3 @@
Network Trash Folder
Temporary Items
.apdisk
-
-### Others ###
-FodyWeavers.xsd
\ No newline at end of file
diff --git a/src/DotPulsar/DotPulsar.csproj b/src/DotPulsar/DotPulsar.csproj
index bb74643..9b9211b 100644
--- a/src/DotPulsar/DotPulsar.csproj
+++ b/src/DotPulsar/DotPulsar.csproj
@@ -2,7 +2,7 @@
<PropertyGroup>
<TargetFrameworks>netstandard2.0;netstandard2.1</TargetFrameworks>
- <Version>0.8.3</Version>
+ <Version>0.8.4</Version>
<AssemblyVersion>$(Version)</AssemblyVersion>
<FileVersion>$(Version)</FileVersion>
<Authors>DanskeCommodities;dblank</Authors>
@@ -11,7 +11,7 @@
<Title>DotPulsar</Title>
<PackageTags>Apache;Pulsar</PackageTags>
<PackageLicenseExpression>Apache-2.0</PackageLicenseExpression>
- <PackageReleaseNotes>Beta release - Support connecting via a proxy and a minor bug fix</PackageReleaseNotes>
+ <PackageReleaseNotes>Beta release - Revert support for proxies. Will be added again later</PackageReleaseNotes>
<Description>.NET/C# client library for Apache Pulsar</Description>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<PublishRepositoryUrl>true</PublishRepositoryUrl>
diff --git a/src/DotPulsar/Internal/ConnectionPool.cs b/src/DotPulsar/Internal/ConnectionPool.cs
index c4159de..8e5bc8c 100644
--- a/src/DotPulsar/Internal/ConnectionPool.cs
+++ b/src/DotPulsar/Internal/ConnectionPool.cs
@@ -69,14 +69,12 @@
Authoritative = false
};
- var logicalUrl = _serviceUrl;
- var physicalUrl = _serviceUrl;
+ var serviceUrl = _serviceUrl;
while (true)
{
- var connection = await GetConnection(logicalUrl, physicalUrl, cancellationToken).ConfigureAwait(false);
+ var connection = await GetConnection(serviceUrl, cancellationToken).ConfigureAwait(false);
var response = await connection.Send(lookup, cancellationToken).ConfigureAwait(false);
-
response.Expect(BaseCommand.Type.LookupResponse);
if (response.LookupTopicResponse.Response == CommandLookupTopicResponse.LookupType.Failed)
@@ -84,12 +82,15 @@
lookup.Authoritative = response.LookupTopicResponse.Authoritative;
- logicalUrl = new Uri(GetBrokerServiceUrl(response.LookupTopicResponse));
+ serviceUrl = new Uri(GetBrokerServiceUrl(response.LookupTopicResponse));
if (response.LookupTopicResponse.Response == CommandLookupTopicResponse.LookupType.Redirect || !response.LookupTopicResponse.Authoritative)
continue;
- return await GetConnection(logicalUrl, physicalUrl, cancellationToken).ConfigureAwait(false);
+ if (_serviceUrl.IsLoopback) // LookupType is 'Connect', ServiceUrl is local and response is authoritative. Assume the Pulsar server is a standalone docker.
+ return connection;
+ else
+ return await GetConnection(serviceUrl, cancellationToken).ConfigureAwait(false);
}
}
@@ -116,46 +117,32 @@
}
}
- // The logical Url differs from the physical Url when you are
- // connecting through a Pulsar proxy. We create 1 physical connection to
- // the Proxy for each logical broker connection we require according to
- // the topic lookup.
- private async ValueTask<Connection> GetConnection(Uri logicalUrl, Uri physicalUrl, CancellationToken cancellationToken)
+ private async ValueTask<Connection> GetConnection(Uri serviceUrl, CancellationToken cancellationToken)
{
using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
{
- if (_connections.TryGetValue(logicalUrl, out Connection connection))
+ if (_connections.TryGetValue(serviceUrl, out Connection connection))
return connection;
- return await EstablishNewConnection(logicalUrl, physicalUrl, cancellationToken).ConfigureAwait(false);
+ return await EstablishNewConnection(serviceUrl, cancellationToken).ConfigureAwait(false);
}
}
- private async Task<Connection> EstablishNewConnection(Uri logicalUrl, Uri physicalUrl, CancellationToken cancellationToken)
+ private async Task<Connection> EstablishNewConnection(Uri serviceUrl, CancellationToken cancellationToken)
{
- var stream = await _connector.Connect(physicalUrl).ConfigureAwait(false);
+ var stream = await _connector.Connect(serviceUrl).ConfigureAwait(false);
var connection = new Connection(new PulsarStream(stream));
DotPulsarEventSource.Log.ConnectionCreated();
- _connections[logicalUrl] = connection;
- _ = connection.ProcessIncommingFrames(cancellationToken).ContinueWith(t => DisposeConnection(logicalUrl));
-
- if (logicalUrl != physicalUrl)
- {
- // DirectProxyHandler expects the Url with no scheme provided
- _commandConnect.ProxyToBrokerUrl = $"{logicalUrl.Host}:{logicalUrl.Port}";
- }
-
+ _connections[serviceUrl] = connection;
+ _ = connection.ProcessIncommingFrames(cancellationToken).ContinueWith(t => DisposeConnection(serviceUrl));
var response = await connection.Send(_commandConnect, cancellationToken).ConfigureAwait(false);
response.Expect(BaseCommand.Type.Connected);
-
- _commandConnect.ResetProxyToBrokerUrl(); // reset so we can re-use this object
-
return connection;
}
- private async ValueTask DisposeConnection(Uri logicalUrl)
+ private async ValueTask DisposeConnection(Uri serviceUrl)
{
- if (_connections.TryRemove(logicalUrl, out var connection))
+ if (_connections.TryRemove(serviceUrl, out Connection connection))
{
await connection.DisposeAsync().ConfigureAwait(false);
DotPulsarEventSource.Log.ConnectionDisposed();