Send a 'ping' if there has been no activity on the connection for 30 seconds
diff --git a/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs b/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs
index 839b1bf..cc0db11 100644
--- a/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs
+++ b/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs
@@ -43,6 +43,11 @@
IPulsarClientBuilder ExceptionHandler(IHandleException exceptionHandler);
/// <summary>
+ /// The time to wait before sending a 'ping' if there has been no activity on the connection. The default is 30 seconds.
+ /// </summary>
+ IPulsarClientBuilder KeepAliveInterval(TimeSpan interval);
+
+ /// <summary>
/// Set the listener name. This is optional.
/// </summary>
IPulsarClientBuilder ListenerName(string listenerName);
diff --git a/src/DotPulsar/Internal/Connection.cs b/src/DotPulsar/Internal/Connection.cs
index b7ebc52..7534981 100644
--- a/src/DotPulsar/Internal/Connection.cs
+++ b/src/DotPulsar/Internal/Connection.cs
@@ -18,6 +18,7 @@
using Exceptions;
using Extensions;
using PulsarApi;
+ using System;
using System.Buffers;
using System.Threading;
using System.Threading.Tasks;
@@ -30,11 +31,11 @@
private readonly IPulsarStream _stream;
private int _isDisposed;
- public Connection(IPulsarStream stream)
+ public Connection(IPulsarStream stream, TimeSpan keepAliveInterval)
{
_lock = new AsyncLock();
_channelManager = new ChannelManager();
- _pingPongHandler = new PingPongHandler(this);
+ _pingPongHandler = new PingPongHandler(this, keepAliveInterval);
_stream = stream;
}
@@ -278,18 +279,13 @@
var commandSize = frame.ReadUInt32(0, true);
var command = Serializer.Deserialize<BaseCommand>(frame.Slice(4, commandSize));
- switch (command.CommandType)
- {
- case BaseCommand.Type.Message:
- _channelManager.Incoming(command.Message, new ReadOnlySequence<byte>(frame.Slice(commandSize + 4).ToArray()));
- break;
- case BaseCommand.Type.Ping:
- _pingPongHandler.GotPing();
- break;
- default:
- _channelManager.Incoming(command);
- break;
- }
+ if (_pingPongHandler.Incoming(command.CommandType))
+ continue;
+
+ if (command.CommandType == BaseCommand.Type.Message)
+ _channelManager.Incoming(command.Message, new ReadOnlySequence<byte>(frame.Slice(commandSize + 4).ToArray()));
+ else
+ _channelManager.Incoming(command);
}
}
catch
@@ -303,6 +299,7 @@
if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
return;
+ await _pingPongHandler.DisposeAsync().ConfigureAwait(false);
await _lock.DisposeAsync().ConfigureAwait(false);
_channelManager.Dispose();
await _stream.DisposeAsync().ConfigureAwait(false);
diff --git a/src/DotPulsar/Internal/ConnectionPool.cs b/src/DotPulsar/Internal/ConnectionPool.cs
index dd4f366..b98a832 100644
--- a/src/DotPulsar/Internal/ConnectionPool.cs
+++ b/src/DotPulsar/Internal/ConnectionPool.cs
@@ -35,6 +35,7 @@
private readonly CancellationTokenSource _cancellationTokenSource;
private readonly Task _closeInactiveConnections;
private readonly string? _listenerName;
+ private readonly TimeSpan _keepAliveInterval;
public ConnectionPool(
CommandConnect commandConnect,
@@ -42,7 +43,8 @@
Connector connector,
EncryptionPolicy encryptionPolicy,
TimeSpan closeInactiveConnectionsInterval,
- string? listenerName)
+ string? listenerName,
+ TimeSpan keepAliveInterval)
{
_lock = new AsyncLock();
_commandConnect = commandConnect;
@@ -53,6 +55,7 @@
_connections = new ConcurrentDictionary<PulsarUrl, Connection>();
_cancellationTokenSource = new CancellationTokenSource();
_closeInactiveConnections = CloseInactiveConnections(closeInactiveConnectionsInterval, _cancellationTokenSource.Token);
+ _keepAliveInterval = keepAliveInterval;
}
public async ValueTask DisposeAsync()
@@ -154,7 +157,7 @@
private async Task<Connection> EstablishNewConnection(PulsarUrl url, CancellationToken cancellationToken)
{
var stream = await _connector.Connect(url.Physical).ConfigureAwait(false);
- var connection = new Connection(new PulsarStream(stream));
+ var connection = new Connection(new PulsarStream(stream), _keepAliveInterval);
DotPulsarEventSource.Log.ConnectionCreated();
_connections[url] = connection;
_ = connection.ProcessIncommingFrames().ContinueWith(t => DisposeConnection(url));
diff --git a/src/DotPulsar/Internal/PingPongHandler.cs b/src/DotPulsar/Internal/PingPongHandler.cs
index da406aa..9c4f1ca 100644
--- a/src/DotPulsar/Internal/PingPongHandler.cs
+++ b/src/DotPulsar/Internal/PingPongHandler.cs
@@ -16,21 +16,73 @@
{
using Abstractions;
using PulsarApi;
+ using System;
+ using System.Diagnostics;
+ using System.Threading;
using System.Threading.Tasks;
- public sealed class PingPongHandler
+ public sealed class PingPongHandler : IAsyncDisposable
{
private readonly IConnection _connection;
+ private readonly TimeSpan _keepAliveInterval;
+ private readonly Timer _timer;
+ private readonly CommandPing _ping;
private readonly CommandPong _pong;
+ private long _lastCommand;
- public PingPongHandler(IConnection connection)
+ public PingPongHandler(IConnection connection, TimeSpan keepAliveInterval)
{
_connection = connection;
+ _keepAliveInterval = keepAliveInterval;
+ _timer = new Timer(Watch);
+ _timer.Change(_keepAliveInterval, TimeSpan.Zero);
+ _ping = new CommandPing();
_pong = new CommandPong();
+ _lastCommand = Stopwatch.GetTimestamp();
}
- public void GotPing()
- => Task.Factory.StartNew(() => SendPong());
+ public bool Incoming(BaseCommand.Type commandType)
+ {
+ Interlocked.Exchange(ref _lastCommand, Stopwatch.GetTimestamp());
+
+ if (commandType == BaseCommand.Type.Ping)
+ {
+ Task.Factory.StartNew(() => SendPong());
+ return true;
+ }
+
+ return commandType == BaseCommand.Type.Pong;
+ }
+
+ private void Watch(object? state)
+ {
+ try
+ {
+ var lastCommand = Interlocked.Read(ref _lastCommand);
+ var now = Stopwatch.GetTimestamp();
+ var elapsed = TimeSpan.FromSeconds((now - lastCommand) / Stopwatch.Frequency);
+ if (elapsed >= _keepAliveInterval)
+ {
+ Task.Factory.StartNew(() => SendPing());
+ _timer.Change(_keepAliveInterval, TimeSpan.Zero);
+ }
+ else
+ _timer.Change(_keepAliveInterval.Subtract(elapsed), TimeSpan.Zero);
+ }
+ catch
+ {
+ // Ignore
+ }
+ }
+
+ private async Task SendPing()
+ {
+ try
+ {
+ await _connection.Send(_ping, default).ConfigureAwait(false);
+ }
+ catch { }
+ }
private async Task SendPong()
{
@@ -40,5 +92,18 @@
}
catch { }
}
+
+#if NETSTANDARD2_0
+ public ValueTask DisposeAsync()
+ {
+ _timer.Dispose();
+ return new ValueTask();
+ }
+#else
+ public async ValueTask DisposeAsync()
+ {
+ await _timer.DisposeAsync().ConfigureAwait(false);
+ }
+#endif
}
}
diff --git a/src/DotPulsar/Internal/PulsarClientBuilder.cs b/src/DotPulsar/Internal/PulsarClientBuilder.cs
index 77b4afb..bc5066f 100644
--- a/src/DotPulsar/Internal/PulsarClientBuilder.cs
+++ b/src/DotPulsar/Internal/PulsarClientBuilder.cs
@@ -27,6 +27,7 @@
private readonly CommandConnect _commandConnect;
private readonly List<IHandleException> _exceptionHandlers;
private EncryptionPolicy? _encryptionPolicy;
+ private TimeSpan _keepAliveInterval;
private string? _listenerName;
private TimeSpan _retryInterval;
private Uri _serviceUrl;
@@ -45,6 +46,7 @@
};
_exceptionHandlers = new List<IHandleException>();
+ _keepAliveInterval = TimeSpan.FromSeconds(30);
_retryInterval = TimeSpan.FromSeconds(3);
_serviceUrl = new Uri($"{Constants.PulsarScheme}://localhost:{Constants.DefaultPulsarPort}");
_clientCertificates = new X509Certificate2Collection();
@@ -79,6 +81,12 @@
return this;
}
+ public IPulsarClientBuilder KeepAliveInterval(TimeSpan interval)
+ {
+ _keepAliveInterval = interval;
+ return this;
+ }
+
public IPulsarClientBuilder ListenerName(string listenerName)
{
_listenerName = listenerName;
@@ -146,7 +154,7 @@
var connector = new Connector(_clientCertificates, _trustedCertificateAuthority, _verifyCertificateAuthority, _verifyCertificateName);
- var connectionPool = new ConnectionPool(_commandConnect, _serviceUrl, connector, _encryptionPolicy.Value, _closeInactiveConnectionsInterval, _listenerName);
+ var connectionPool = new ConnectionPool(_commandConnect, _serviceUrl, connector, _encryptionPolicy.Value, _closeInactiveConnectionsInterval, _listenerName, _keepAliveInterval);
var processManager = new ProcessManager(connectionPool);
var exceptionHandlers = new List<IHandleException>(_exceptionHandlers) { new DefaultExceptionHandler(_retryInterval) };
var exceptionHandlerPipeline = new ExceptionHandlerPipeline(exceptionHandlers);