Send a 'ping' if there has been no activity on the connection for 30 seconds
diff --git a/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs b/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs
index 839b1bf..cc0db11 100644
--- a/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs
+++ b/src/DotPulsar/Abstractions/IPulsarClientBuilder.cs
@@ -43,6 +43,11 @@
         IPulsarClientBuilder ExceptionHandler(IHandleException exceptionHandler);
 
         /// <summary>
+        /// The time to wait before sending a 'ping' if there has been no activity on the connection. The default is 30 seconds.
+        /// </summary>
+        IPulsarClientBuilder KeepAliveInterval(TimeSpan interval);
+
+        /// <summary>
         /// Set the listener name. This is optional.
         /// </summary>
         IPulsarClientBuilder ListenerName(string listenerName);
diff --git a/src/DotPulsar/Internal/Connection.cs b/src/DotPulsar/Internal/Connection.cs
index b7ebc52..7534981 100644
--- a/src/DotPulsar/Internal/Connection.cs
+++ b/src/DotPulsar/Internal/Connection.cs
@@ -18,6 +18,7 @@
     using Exceptions;
     using Extensions;
     using PulsarApi;
+    using System;
     using System.Buffers;
     using System.Threading;
     using System.Threading.Tasks;
@@ -30,11 +31,11 @@
         private readonly IPulsarStream _stream;
         private int _isDisposed;
 
-        public Connection(IPulsarStream stream)
+        public Connection(IPulsarStream stream, TimeSpan keepAliveInterval)
         {
             _lock = new AsyncLock();
             _channelManager = new ChannelManager();
-            _pingPongHandler = new PingPongHandler(this);
+            _pingPongHandler = new PingPongHandler(this, keepAliveInterval);
             _stream = stream;
         }
 
@@ -278,18 +279,13 @@
                     var commandSize = frame.ReadUInt32(0, true);
                     var command = Serializer.Deserialize<BaseCommand>(frame.Slice(4, commandSize));
 
-                    switch (command.CommandType)
-                    {
-                        case BaseCommand.Type.Message:
-                            _channelManager.Incoming(command.Message, new ReadOnlySequence<byte>(frame.Slice(commandSize + 4).ToArray()));
-                            break;
-                        case BaseCommand.Type.Ping:
-                            _pingPongHandler.GotPing();
-                            break;
-                        default:
-                            _channelManager.Incoming(command);
-                            break;
-                    }
+                    if (_pingPongHandler.Incoming(command.CommandType))
+                        continue;
+
+                    if (command.CommandType == BaseCommand.Type.Message)
+                        _channelManager.Incoming(command.Message, new ReadOnlySequence<byte>(frame.Slice(commandSize + 4).ToArray()));
+                    else
+                        _channelManager.Incoming(command);
                 }
             }
             catch
@@ -303,6 +299,7 @@
             if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
                 return;
 
+            await _pingPongHandler.DisposeAsync().ConfigureAwait(false);
             await _lock.DisposeAsync().ConfigureAwait(false);
             _channelManager.Dispose();
             await _stream.DisposeAsync().ConfigureAwait(false);
diff --git a/src/DotPulsar/Internal/ConnectionPool.cs b/src/DotPulsar/Internal/ConnectionPool.cs
index dd4f366..b98a832 100644
--- a/src/DotPulsar/Internal/ConnectionPool.cs
+++ b/src/DotPulsar/Internal/ConnectionPool.cs
@@ -35,6 +35,7 @@
         private readonly CancellationTokenSource _cancellationTokenSource;
         private readonly Task _closeInactiveConnections;
         private readonly string? _listenerName;
+        private readonly TimeSpan _keepAliveInterval;
 
         public ConnectionPool(
             CommandConnect commandConnect,
@@ -42,7 +43,8 @@
             Connector connector,
             EncryptionPolicy encryptionPolicy,
             TimeSpan closeInactiveConnectionsInterval,
-            string? listenerName)
+            string? listenerName,
+            TimeSpan keepAliveInterval)
         {
             _lock = new AsyncLock();
             _commandConnect = commandConnect;
@@ -53,6 +55,7 @@
             _connections = new ConcurrentDictionary<PulsarUrl, Connection>();
             _cancellationTokenSource = new CancellationTokenSource();
             _closeInactiveConnections = CloseInactiveConnections(closeInactiveConnectionsInterval, _cancellationTokenSource.Token);
+            _keepAliveInterval = keepAliveInterval;
         }
 
         public async ValueTask DisposeAsync()
@@ -154,7 +157,7 @@
         private async Task<Connection> EstablishNewConnection(PulsarUrl url, CancellationToken cancellationToken)
         {
             var stream = await _connector.Connect(url.Physical).ConfigureAwait(false);
-            var connection = new Connection(new PulsarStream(stream));
+            var connection = new Connection(new PulsarStream(stream), _keepAliveInterval);
             DotPulsarEventSource.Log.ConnectionCreated();
             _connections[url] = connection;
             _ = connection.ProcessIncommingFrames().ContinueWith(t => DisposeConnection(url));
diff --git a/src/DotPulsar/Internal/PingPongHandler.cs b/src/DotPulsar/Internal/PingPongHandler.cs
index da406aa..9c4f1ca 100644
--- a/src/DotPulsar/Internal/PingPongHandler.cs
+++ b/src/DotPulsar/Internal/PingPongHandler.cs
@@ -16,21 +16,73 @@
 {
     using Abstractions;
     using PulsarApi;
+    using System;
+    using System.Diagnostics;
+    using System.Threading;
     using System.Threading.Tasks;
 
-    public sealed class PingPongHandler
+    public sealed class PingPongHandler : IAsyncDisposable
     {
         private readonly IConnection _connection;
+        private readonly TimeSpan _keepAliveInterval;
+        private readonly Timer _timer;
+        private readonly CommandPing _ping;
         private readonly CommandPong _pong;
+        private long _lastCommand;
 
-        public PingPongHandler(IConnection connection)
+        public PingPongHandler(IConnection connection, TimeSpan keepAliveInterval)
         {
             _connection = connection;
+            _keepAliveInterval = keepAliveInterval;
+            _timer = new Timer(Watch);
+            _timer.Change(_keepAliveInterval, TimeSpan.Zero);
+            _ping = new CommandPing();
             _pong = new CommandPong();
+            _lastCommand = Stopwatch.GetTimestamp();
         }
 
-        public void GotPing()
-            => Task.Factory.StartNew(() => SendPong());
+        public bool Incoming(BaseCommand.Type commandType)
+        {
+            Interlocked.Exchange(ref _lastCommand, Stopwatch.GetTimestamp());
+
+            if (commandType == BaseCommand.Type.Ping)
+            {
+                Task.Factory.StartNew(() => SendPong());
+                return true;
+            }
+
+            return commandType == BaseCommand.Type.Pong;
+        }
+
+        private void Watch(object? state)
+        {
+            try
+            {
+                var lastCommand = Interlocked.Read(ref _lastCommand);
+                var now = Stopwatch.GetTimestamp();
+                var elapsed = TimeSpan.FromSeconds((now - lastCommand) / Stopwatch.Frequency);
+                if (elapsed >= _keepAliveInterval)
+                {
+                    Task.Factory.StartNew(() => SendPing());
+                    _timer.Change(_keepAliveInterval, TimeSpan.Zero);
+                }
+                else
+                    _timer.Change(_keepAliveInterval.Subtract(elapsed), TimeSpan.Zero);
+            }
+            catch
+            {
+                // Ignore
+            }
+        }
+
+        private async Task SendPing()
+        {
+            try
+            {
+                await _connection.Send(_ping, default).ConfigureAwait(false);
+            }
+            catch { }
+        }
 
         private async Task SendPong()
         {
@@ -40,5 +92,18 @@
             }
             catch { }
         }
+
+#if NETSTANDARD2_0
+        public ValueTask DisposeAsync()
+        {
+            _timer.Dispose();
+            return new ValueTask();
+        }
+#else
+        public async ValueTask DisposeAsync()
+        {
+            await _timer.DisposeAsync().ConfigureAwait(false);
+        }
+#endif
     }
 }
diff --git a/src/DotPulsar/Internal/PulsarClientBuilder.cs b/src/DotPulsar/Internal/PulsarClientBuilder.cs
index 77b4afb..bc5066f 100644
--- a/src/DotPulsar/Internal/PulsarClientBuilder.cs
+++ b/src/DotPulsar/Internal/PulsarClientBuilder.cs
@@ -27,6 +27,7 @@
         private readonly CommandConnect _commandConnect;
         private readonly List<IHandleException> _exceptionHandlers;
         private EncryptionPolicy? _encryptionPolicy;
+        private TimeSpan _keepAliveInterval;
         private string? _listenerName;
         private TimeSpan _retryInterval;
         private Uri _serviceUrl;
@@ -45,6 +46,7 @@
             };
 
             _exceptionHandlers = new List<IHandleException>();
+            _keepAliveInterval = TimeSpan.FromSeconds(30);
             _retryInterval = TimeSpan.FromSeconds(3);
             _serviceUrl = new Uri($"{Constants.PulsarScheme}://localhost:{Constants.DefaultPulsarPort}");
             _clientCertificates = new X509Certificate2Collection();
@@ -79,6 +81,12 @@
             return this;
         }
 
+        public IPulsarClientBuilder KeepAliveInterval(TimeSpan interval)
+        {
+            _keepAliveInterval = interval;
+            return this;
+        }
+
         public IPulsarClientBuilder ListenerName(string listenerName)
         {
             _listenerName = listenerName;
@@ -146,7 +154,7 @@
 
 
             var connector = new Connector(_clientCertificates, _trustedCertificateAuthority, _verifyCertificateAuthority, _verifyCertificateName);
-            var connectionPool = new ConnectionPool(_commandConnect, _serviceUrl, connector, _encryptionPolicy.Value, _closeInactiveConnectionsInterval, _listenerName);
+            var connectionPool = new ConnectionPool(_commandConnect, _serviceUrl, connector, _encryptionPolicy.Value, _closeInactiveConnectionsInterval, _listenerName, _keepAliveInterval);
             var processManager = new ProcessManager(connectionPool);
             var exceptionHandlers = new List<IHandleException>(_exceptionHandlers) { new DefaultExceptionHandler(_retryInterval) };
             var exceptionHandlerPipeline = new ExceptionHandlerPipeline(exceptionHandlers);