IGNITE-22086 Thin client: initialize observableTimestamp in handshake (#3679)

Before this fix we propagated `observableTimestamp` to the client with every response, but not on handshake. As a result, the very first operation from the client has `observableTimestamp = 0`, which can lead to causality issues.

Fix Java, .NET, C++ clients.
diff --git a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerMetricsTest.java b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerMetricsTest.java
index 25e6d08..ac49c37 100644
--- a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerMetricsTest.java
+++ b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerMetricsTest.java
@@ -151,7 +151,7 @@
         ItClientHandlerTestUtils.connectAndHandshake(serverModule);
 
         assertTrue(
-                IgniteTestUtils.waitForCondition(() -> testServer.metrics().bytesSent() == 80, 1000),
+                IgniteTestUtils.waitForCondition(() -> testServer.metrics().bytesSent() == 89, 1000),
                 () -> "bytesSent: " + testServer.metrics().bytesSent());
 
         assertTrue(
@@ -161,7 +161,7 @@
         ItClientHandlerTestUtils.connectAndHandshake(serverModule, false, true);
 
         assertTrue(
-                IgniteTestUtils.waitForCondition(() -> testServer.metrics().bytesSent() == 185, 1000),
+                IgniteTestUtils.waitForCondition(() -> testServer.metrics().bytesSent() == 194, 1000),
                 () -> "bytesSent: " + testServer.metrics().bytesSent());
 
         assertTrue(
diff --git a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
index 3f8c41b..e547342 100644
--- a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
+++ b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
@@ -135,6 +135,7 @@
             final var nodeName = unpacker.unpackString();
             unpacker.skipValue(); // Cluster id.
             unpacker.skipValue(); // Cluster name.
+            unpacker.skipValue(); // Observable timestamp.
 
             unpacker.skipValue(); // Major.
             unpacker.skipValue(); // Minor.
@@ -150,7 +151,7 @@
             unpacker.skipValue(extensionsLen);
 
             assertArrayEquals(MAGIC, magic);
-            assertEquals(72, len);
+            assertEquals(81, len);
             assertEquals(3, major);
             assertEquals(0, minor);
             assertEquals(0, patch);
@@ -275,6 +276,7 @@
             final var nodeName = unpacker.unpackString();
             unpacker.skipValue(); // Cluster id.
             unpacker.skipValue(); // Cluster name.
+            unpacker.skipValue(); // Observable timestamp.
 
             unpacker.skipValue(); // Major.
             unpacker.skipValue(); // Minor.
@@ -289,7 +291,7 @@
             unpacker.skipValue(extensionsLen);
 
             assertArrayEquals(MAGIC, magic);
-            assertEquals(72, len);
+            assertEquals(81, len);
             assertEquals(3, major);
             assertEquals(0, minor);
             assertEquals(0, patch);
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index 12f4d97..5669f4d 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -375,6 +375,8 @@
             packer.packUuid(tag.clusterId());
             packer.packString(tag.clusterName());
 
+            packer.packLong(observableTimestamp(null));
+
             // Pack current version
             packer.packByte(IgniteProductVersion.CURRENT_VERSION.major());
             packer.packByte(IgniteProductVersion.CURRENT_VERSION.minor());
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientChannel.java b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientChannel.java
index c6ac9a3..6735002 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientChannel.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientChannel.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.client;
 
 import java.util.concurrent.CompletableFuture;
-import java.util.function.Consumer;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -72,18 +71,4 @@
      * @return Protocol context.
      */
     ProtocolContext protocolContext();
-
-    /**
-     * Add topology change listener.
-     *
-     * @param listener Listener.
-     */
-    void addPartitionAssignmentChangeListener(Consumer<Long> listener);
-
-    /**
-     * Add observable timestamp listener.
-     *
-     * @param listener Listener.
-     */
-    void addObservableTimestampListener(Consumer<Long> listener);
 }
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientChannelFactory.java b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientChannelFactory.java
index b517957..f8a6d50 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientChannelFactory.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientChannelFactory.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.client;
 
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
 import org.apache.ignite.internal.client.io.ClientConnectionMultiplexer;
 
 /**
@@ -36,5 +37,7 @@
     CompletableFuture<ClientChannel> create(
             ClientChannelConfiguration cfg,
             ClientConnectionMultiplexer multiplexer,
-            ClientMetricSource metrics);
+            ClientMetricSource metrics,
+            Consumer<Long> assignmentChangeListener,
+            Consumer<Long> observableTimestampListener);
 }
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
index 26336a3..e648d33 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
@@ -810,7 +810,14 @@
                             new IgniteClientConnectionException(CONNECTION_ERR, "Reconnect is not allowed due to applied throttling"));
                 }
 
-                chFut0 = chFactory.create(chCfg, connMgr, metrics).thenApply(ch -> {
+                CompletableFuture<ClientChannel> createFut = chFactory.create(
+                        chCfg,
+                        connMgr,
+                        metrics,
+                        ReliableChannel.this::onPartitionAssignmentChanged,
+                        ReliableChannel.this::onObservableTimestampReceived);
+
+                chFut0 = createFut.thenApply(ch -> {
                     var oldClusterId = clusterId.compareAndExchange(null, ch.protocolContext().clusterId());
 
                     if (oldClusterId != null && !oldClusterId.equals(ch.protocolContext().clusterId())) {
@@ -825,9 +832,6 @@
                                 "Cluster ID mismatch: expected=" + oldClusterId + ", actual=" + ch.protocolContext().clusterId());
                     }
 
-                    ch.addPartitionAssignmentChangeListener(ReliableChannel.this::onPartitionAssignmentChanged);
-                    ch.addObservableTimestampListener(ReliableChannel.this::onObservableTimestampReceived);
-
                     ClusterNode newNode = ch.protocolContext().clusterNode();
 
                     // There could be multiple holders map to the same serverNodeId if user provide the same
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
index 2ef14fb..9d235c0 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
@@ -26,13 +26,11 @@
 import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelFuture;
 import java.net.InetSocketAddress;
-import java.util.Collection;
 import java.util.Map;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.TimeUnit;
@@ -93,10 +91,10 @@
     private final Map<Long, CompletableFuture<PayloadInputChannel>> notificationHandlers = new ConcurrentHashMap<>();
 
     /** Topology change listeners. */
-    private final Collection<Consumer<Long>> assignmentChangeListeners = new CopyOnWriteArrayList<>();
+    private final Consumer<Long> assignmentChangeListener;
 
     /** Observable timestamp listeners. */
-    private final Collection<Consumer<Long>> observableTimestampListeners = new CopyOnWriteArrayList<>();
+    private final Consumer<Long> observableTimestampListener;
 
     /** Closed flag. */
     private final AtomicBoolean closed = new AtomicBoolean();
@@ -128,10 +126,16 @@
      * @param cfg Config.
      * @param metrics Metrics.
      */
-    private TcpClientChannel(ClientChannelConfiguration cfg, ClientMetricSource metrics) {
+    private TcpClientChannel(
+            ClientChannelConfiguration cfg,
+            ClientMetricSource metrics,
+            Consumer<Long> assignmentChangeListener,
+            Consumer<Long> observableTimestampListener) {
         validateConfiguration(cfg);
         this.cfg = cfg;
         this.metrics = metrics;
+        this.assignmentChangeListener = assignmentChangeListener;
+        this.observableTimestampListener = observableTimestampListener;
 
         log = ClientUtils.logger(cfg.clientConfiguration(), TcpClientChannel.class);
 
@@ -183,9 +187,12 @@
     static CompletableFuture<ClientChannel> createAsync(
             ClientChannelConfiguration cfg,
             ClientConnectionMultiplexer connMgr,
-            ClientMetricSource metrics) {
+            ClientMetricSource metrics,
+            Consumer<Long> assignmentChangeListener,
+            Consumer<Long> observableTimestampListener) {
         //noinspection resource - returned from method.
-        return new TcpClientChannel(cfg, metrics).initAsync(connMgr);
+        return new TcpClientChannel(cfg, metrics, assignmentChangeListener, observableTimestampListener)
+                .initAsync(connMgr);
     }
 
     /** {@inheritDoc} */
@@ -422,10 +429,7 @@
 
     private void handleObservableTimestamp(ClientMessageUnpacker unpacker) {
         long observableTimestamp = unpacker.unpackLong();
-
-        for (Consumer<Long> listener : observableTimestampListeners) {
-            listener.accept(observableTimestamp);
-        }
+        observableTimestampListener.accept(observableTimestamp);
     }
 
     private void handlePartitionAssignmentChange(int flags, ClientMessageUnpacker unpacker) {
@@ -435,9 +439,7 @@
             }
 
             long maxStartTime = unpacker.unpackLong();
-            for (Consumer<Long> listener : assignmentChangeListeners) {
-                listener.accept(maxStartTime);
-            }
+            assignmentChangeListener.accept(maxStartTime);
         }
     }
 
@@ -525,17 +527,6 @@
         return protocolCtx;
     }
 
-    /** {@inheritDoc} */
-    @Override
-    public void addPartitionAssignmentChangeListener(Consumer<Long> listener) {
-        assignmentChangeListeners.add(listener);
-    }
-
-    @Override
-    public void addObservableTimestampListener(Consumer<Long> listener) {
-        observableTimestampListeners.add(listener);
-    }
-
     private static void validateConfiguration(ClientChannelConfiguration cfg) {
         String error = null;
 
@@ -639,6 +630,9 @@
             var clusterId = unpacker.unpackUuid();
             var clusterName = unpacker.unpackString();
 
+            long observableTimestamp = unpacker.unpackLong();
+            observableTimestampListener.accept(observableTimestamp);
+
             unpacker.unpackByte(); // cluster version major
             unpacker.unpackByte(); // cluster version minor
             unpacker.unpackByte(); // cluster version maintenance
diff --git a/modules/client/src/test/java/org/apache/ignite/client/ClientMetricsTest.java b/modules/client/src/test/java/org/apache/ignite/client/ClientMetricsTest.java
index b7bb60f..935913f 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/ClientMetricsTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/ClientMetricsTest.java
@@ -229,12 +229,12 @@
         client = clientBuilder().build();
 
         assertEquals(15, metrics().bytesSent());
-        assertEquals(76, metrics().bytesReceived());
+        assertEquals(85, metrics().bytesReceived());
 
         client.tables().tables();
 
         assertEquals(21, metrics().bytesSent());
-        assertEquals(97, metrics().bytesReceived());
+        assertEquals(106, metrics().bytesReceived());
     }
 
     @Test
diff --git a/modules/client/src/test/java/org/apache/ignite/client/ObservableTimestampPropagationTest.java b/modules/client/src/test/java/org/apache/ignite/client/ObservableTimestampPropagationTest.java
index f8c2195..bc4a787 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/ObservableTimestampPropagationTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/ObservableTimestampPropagationTest.java
@@ -74,6 +74,12 @@
         ReliableChannel ch = IgniteTestUtils.getFieldValue(client, "ch");
         TransactionOptions roOpts = new TransactionOptions().readOnly(true);
 
+        // +2 because logical time is incremented on every call to nowLong - for replica tracker and for handshake.
+        assertEquals(
+                (currentServerTimestamp.get() << LOGICAL_TIME_BITS_SIZE) + 2,
+                ch.observableTimestamp(),
+                "Handshake should initialize observable timestamp");
+
         assertNull(lastObservableTimestamp());
 
         // RW TX does not propagate timestamp.
diff --git a/modules/client/src/test/java/org/apache/ignite/internal/client/RepeatedFinishClientTransactionTest.java b/modules/client/src/test/java/org/apache/ignite/internal/client/RepeatedFinishClientTransactionTest.java
index e893102..104616c 100644
--- a/modules/client/src/test/java/org/apache/ignite/internal/client/RepeatedFinishClientTransactionTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/internal/client/RepeatedFinishClientTransactionTest.java
@@ -32,7 +32,6 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
 import org.apache.ignite.internal.client.tx.ClientTransaction;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.junit.jupiter.api.Test;
@@ -204,16 +203,6 @@
         }
 
         @Override
-        public void addPartitionAssignmentChangeListener(Consumer<Long> listener) {
-            // No-op.
-        }
-
-        @Override
-        public void addObservableTimestampListener(Consumer<Long> listener) {
-            // No-op.
-        }
-
-        @Override
         public void close() {
             // No-op.
         }
diff --git a/modules/platforms/cpp/ignite/client/detail/node_connection.cpp b/modules/platforms/cpp/ignite/client/detail/node_connection.cpp
index afc2101..88e4598 100644
--- a/modules/platforms/cpp/ignite/client/detail/node_connection.cpp
+++ b/modules/platforms/cpp/ignite/client/detail/node_connection.cpp
@@ -72,10 +72,7 @@
     }
 
     auto observable_timestamp = reader.read_int64();
-    auto event_handler = m_event_handler.lock();
-    if (event_handler) {
-        event_handler->on_observable_timestamp_changed(observable_timestamp);
-    }
+    on_observable_timestamp_changed(observable_timestamp);
 
     std::optional<ignite_error> err{};
     if (test_flag(flags, protocol::response_flag::ERROR_FLAG)) {
@@ -112,6 +109,13 @@
     }
 }
 
+void node_connection::on_observable_timestamp_changed(int64_t observable_timestamp) const {
+    auto event_handler = m_event_handler.lock();
+    if (event_handler) {
+        event_handler->on_observable_timestamp_changed(observable_timestamp);
+    }
+}
+
 ignite_result<void> node_connection::process_handshake_rsp(bytes_view msg) {
     m_logger->log_debug("Got handshake response");
 
@@ -129,6 +133,8 @@
         return {ignite_error(*response.error)};
     }
 
+    on_observable_timestamp_changed(response.observable_timestamp);
+
     m_protocol_context = response.context;
     m_handshake_complete = true;
 
diff --git a/modules/platforms/cpp/ignite/client/detail/node_connection.h b/modules/platforms/cpp/ignite/client/detail/node_connection.h
index bca5f69..f33a94e 100644
--- a/modules/platforms/cpp/ignite/client/detail/node_connection.h
+++ b/modules/platforms/cpp/ignite/client/detail/node_connection.h
@@ -232,6 +232,13 @@
      */
     std::shared_ptr<response_handler> find_handler_unsafe(std::int64_t req_id);
 
+    /**
+     * Notify event handler about observable timestamp change.
+     *
+     * @param observable_timestamp New observable timestamp.
+     */
+    void on_observable_timestamp_changed(int64_t observable_timestamp) const;
+
     /** Handshake complete. */
     bool m_handshake_complete{false};
 
diff --git a/modules/platforms/cpp/ignite/protocol/messages.cpp b/modules/platforms/cpp/ignite/protocol/messages.cpp
index 0e3344f..921d4f5 100644
--- a/modules/platforms/cpp/ignite/protocol/messages.cpp
+++ b/modules/platforms/cpp/ignite/protocol/messages.cpp
@@ -69,6 +69,8 @@
     res.context.set_cluster_id(reader.read_uuid());
     res.context.set_cluster_name(reader.read_string());
 
+    res.observable_timestamp = reader.read_int64();
+
     auto dbms_ver_major = reader.read_uint8();
     auto dbms_ver_minor = reader.read_uint8();
     auto dbms_ver_maintenance = reader.read_uint8();
diff --git a/modules/platforms/cpp/ignite/protocol/messages.h b/modules/platforms/cpp/ignite/protocol/messages.h
index 3a195e2..f2fd8d7 100644
--- a/modules/platforms/cpp/ignite/protocol/messages.h
+++ b/modules/platforms/cpp/ignite/protocol/messages.h
@@ -62,6 +62,9 @@
 
     /** Protocol context. */
     protocol_context context{};
+
+    /** Observable timestamp. */
+    int64_t observable_timestamp;
 };
 
 /**
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
index 6dfe0f9..13a1ac3 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
@@ -93,6 +93,8 @@
 
         public Guid ClusterId { get; set; }
 
+        public string ClusterName { get; set; } = "fake-cluster";
+
         public string[] PartitionAssignment { get; set; }
 
         public long PartitionAssignmentTimestamp { get; set; }
@@ -164,7 +166,19 @@
             handshakeWriter.Write(0); // Idle timeout.
             handshakeWriter.Write(Node.Id); // Node id.
             handshakeWriter.Write(Node.Name); // Node name (consistent id).
+
             handshakeWriter.Write(ClusterId);
+            handshakeWriter.Write(ClusterName);
+
+            handshakeWriter.Write(ObservableTimestamp);
+
+            // Cluster version.
+            handshakeWriter.Write(1);
+            handshakeWriter.Write(2);
+            handshakeWriter.Write(3);
+            handshakeWriter.Write(4);
+            handshakeWriter.Write("-abcd");
+
             handshakeWriter.WriteBinaryHeader(0); // Features.
             handshakeWriter.Write(0); // Extensions.
 
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/MetricsTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/MetricsTests.cs
index 68b9cae..c0cb4eb 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/MetricsTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/MetricsTests.cs
@@ -92,15 +92,15 @@
         using var client = await server.ConnectClientAsync();
 
         AssertMetric(MetricNames.BytesSent, 15);
-        AssertMetric(MetricNames.BytesReceived, 63);
+        AssertMetric(MetricNames.BytesReceived, 88);
 
         await client.Tables.GetTablesAsync();
 
         AssertMetric(MetricNames.BytesSent, 21);
-        AssertMetric(MetricNames.BytesReceived, 72);
+        AssertMetric(MetricNames.BytesReceived, 97);
 
         AssertTaggedMetric(MetricNames.BytesSent, 21, server, client);
-        AssertTaggedMetric(MetricNames.BytesReceived, 72, server, client);
+        AssertTaggedMetric(MetricNames.BytesReceived, 97, server, client);
     }
 
     [Test]
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Transactions/TransactionsTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Transactions/TransactionsTests.cs
index 00acc0f..9dbc313 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Transactions/TransactionsTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Transactions/TransactionsTests.cs
@@ -23,6 +23,7 @@
     using System.Threading.Tasks;
     using System.Transactions;
     using Ignite.Transactions;
+    using Internal;
     using NUnit.Framework;
     using Table;
     using TransactionOptions = Ignite.Transactions.TransactionOptions;
@@ -284,11 +285,29 @@
         }
 
         [Test]
+        public async Task TestObservableTimestampIsInitializedFromHandshake()
+        {
+            using var client = await IgniteClient.StartAsync(new() { Endpoints = { "127.0.0.1:" + ServerPort } });
+            var observableTimestamp = client.GetFieldValue<ClientFailoverSocket>("_socket").ObservableTimestamp;
+
+            Assert.Greater(observableTimestamp, 0);
+        }
+
+        [Test]
         public async Task TestObservableTimestampPropagation([Values(true, false)] bool sql)
         {
-            using var server = new FakeServer();
+            using var server = new FakeServer
+            {
+                ObservableTimestamp = 111
+            };
+
             using var client = await server.ConnectClientAsync();
 
+            Assert.AreEqual(
+                server.ObservableTimestamp,
+                client.GetFieldValue<ClientFailoverSocket>("_socket").ObservableTimestamp,
+                "Handshake should initialize observable timestamp");
+
             server.ObservableTimestamp = 123;
 
             // Non-transactional operations do not propagate timestamp.
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
index 90d183f..471923d 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
@@ -213,7 +213,7 @@
                     logger.LogSslConnectionEstablishedDebug(socket.RemoteEndPoint, sslStream.NegotiatedCipherSuite);
                 }
 
-                var context = await HandshakeAsync(stream, endPoint, configuration, cts.Token)
+                var context = await HandshakeAsync(stream, endPoint, configuration, listener, cts.Token)
                     .WaitAsync(configuration.SocketTimeout, cts.Token)
                     .ConfigureAwait(false);
 
@@ -287,11 +287,13 @@
         /// <param name="stream">Network stream.</param>
         /// <param name="endPoint">Endpoint.</param>
         /// <param name="configuration">Configuration.</param>
+        /// <param name="listener">Client socket event listener.</param>
         /// <param name="cancellationToken">Cancellation token.</param>
         private static async Task<ConnectionContext> HandshakeAsync(
             Stream stream,
             SocketEndpoint endPoint,
             IgniteClientConfiguration configuration,
+            IClientSocketEventListener listener,
             CancellationToken cancellationToken)
         {
             await stream.WriteAsync(ProtoCommon.MagicBytes, cancellationToken).ConfigureAwait(false);
@@ -305,7 +307,7 @@
             using var response = await ReadResponseAsync(stream, new byte[4], endPoint.MetricsContext, CancellationToken.None)
                 .ConfigureAwait(false);
 
-            return ReadHandshakeResponse(response.GetReader(), endPoint, GetSslInfo(stream));
+            return ReadHandshakeResponse(response.GetReader(), endPoint, GetSslInfo(stream), listener);
         }
 
         private static async ValueTask CheckMagicBytesAsync(
@@ -336,7 +338,11 @@
             }
         }
 
-        private static ConnectionContext ReadHandshakeResponse(MsgPackReader reader, SocketEndpoint endPoint, ISslInfo? sslInfo)
+        private static ConnectionContext ReadHandshakeResponse(
+            MsgPackReader reader,
+            SocketEndpoint endPoint,
+            ISslInfo? sslInfo,
+            IClientSocketEventListener listener)
         {
             var serverVer = new ClientProtocolVersion(reader.ReadInt16(), reader.ReadInt16(), reader.ReadInt16());
 
@@ -353,7 +359,19 @@
             var idleTimeoutMs = reader.ReadInt64();
             var clusterNodeId = reader.ReadString();
             var clusterNodeName = reader.ReadString();
+
             var clusterId = reader.ReadGuid();
+            var clusterName = reader.ReadString();
+
+            var observableTimestamp = reader.ReadInt64();
+            listener.OnObservableTimestampChanged(observableTimestamp);
+
+            // Cluster version.
+            reader.Skip(); // Major.
+            reader.Skip(); // Minor.
+            reader.Skip(); // Maintenance.
+            reader.Skip(); // Patch.
+            reader.Skip(); // Pre-release.
 
             reader.Skip(); // Features.
             reader.Skip(); // Extensions.
@@ -363,6 +381,7 @@
                 TimeSpan.FromMilliseconds(idleTimeoutMs),
                 new ClusterNode(clusterNodeId, clusterNodeName, endPoint.EndPoint, endPoint.MetricsContext),
                 clusterId,
+                clusterName,
                 sslInfo);
         }
 
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/ConnectionContext.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/ConnectionContext.cs
index eb88008..f3583bd 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/ConnectionContext.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/ConnectionContext.cs
@@ -28,11 +28,13 @@
     /// <param name="IdleTimeout">Server idle timeout.</param>
     /// <param name="ClusterNode">Cluster node.</param>
     /// <param name="ClusterId">Cluster id.</param>
+    /// <param name="ClusterName">Cluster name.</param>
     /// <param name="SslInfo">SSL info.</param>
     internal record ConnectionContext(
         ClientProtocolVersion Version,
         TimeSpan IdleTimeout,
         ClusterNode ClusterNode,
         Guid ClusterId,
+        string ClusterName,
         ISslInfo? SslInfo);
 }