IGNITE-22086 Thin client: initialize observableTimestamp in handshake (#3679)
Before this fix we propagated `observableTimestamp` to the client with every response, but not on handshake. As a result, the very first operation from the client has `observableTimestamp = 0`, which can lead to causality issues.
Fix Java, .NET, C++ clients.
diff --git a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerMetricsTest.java b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerMetricsTest.java
index 25e6d08..ac49c37 100644
--- a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerMetricsTest.java
+++ b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerMetricsTest.java
@@ -151,7 +151,7 @@
ItClientHandlerTestUtils.connectAndHandshake(serverModule);
assertTrue(
- IgniteTestUtils.waitForCondition(() -> testServer.metrics().bytesSent() == 80, 1000),
+ IgniteTestUtils.waitForCondition(() -> testServer.metrics().bytesSent() == 89, 1000),
() -> "bytesSent: " + testServer.metrics().bytesSent());
assertTrue(
@@ -161,7 +161,7 @@
ItClientHandlerTestUtils.connectAndHandshake(serverModule, false, true);
assertTrue(
- IgniteTestUtils.waitForCondition(() -> testServer.metrics().bytesSent() == 185, 1000),
+ IgniteTestUtils.waitForCondition(() -> testServer.metrics().bytesSent() == 194, 1000),
() -> "bytesSent: " + testServer.metrics().bytesSent());
assertTrue(
diff --git a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
index 3f8c41b..e547342 100644
--- a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
+++ b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
@@ -135,6 +135,7 @@
final var nodeName = unpacker.unpackString();
unpacker.skipValue(); // Cluster id.
unpacker.skipValue(); // Cluster name.
+ unpacker.skipValue(); // Observable timestamp.
unpacker.skipValue(); // Major.
unpacker.skipValue(); // Minor.
@@ -150,7 +151,7 @@
unpacker.skipValue(extensionsLen);
assertArrayEquals(MAGIC, magic);
- assertEquals(72, len);
+ assertEquals(81, len);
assertEquals(3, major);
assertEquals(0, minor);
assertEquals(0, patch);
@@ -275,6 +276,7 @@
final var nodeName = unpacker.unpackString();
unpacker.skipValue(); // Cluster id.
unpacker.skipValue(); // Cluster name.
+ unpacker.skipValue(); // Observable timestamp.
unpacker.skipValue(); // Major.
unpacker.skipValue(); // Minor.
@@ -289,7 +291,7 @@
unpacker.skipValue(extensionsLen);
assertArrayEquals(MAGIC, magic);
- assertEquals(72, len);
+ assertEquals(81, len);
assertEquals(3, major);
assertEquals(0, minor);
assertEquals(0, patch);
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index 12f4d97..5669f4d 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -375,6 +375,8 @@
packer.packUuid(tag.clusterId());
packer.packString(tag.clusterName());
+ packer.packLong(observableTimestamp(null));
+
// Pack current version
packer.packByte(IgniteProductVersion.CURRENT_VERSION.major());
packer.packByte(IgniteProductVersion.CURRENT_VERSION.minor());
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientChannel.java b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientChannel.java
index c6ac9a3..6735002 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientChannel.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientChannel.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.client;
import java.util.concurrent.CompletableFuture;
-import java.util.function.Consumer;
import org.jetbrains.annotations.Nullable;
/**
@@ -72,18 +71,4 @@
* @return Protocol context.
*/
ProtocolContext protocolContext();
-
- /**
- * Add topology change listener.
- *
- * @param listener Listener.
- */
- void addPartitionAssignmentChangeListener(Consumer<Long> listener);
-
- /**
- * Add observable timestamp listener.
- *
- * @param listener Listener.
- */
- void addObservableTimestampListener(Consumer<Long> listener);
}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientChannelFactory.java b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientChannelFactory.java
index b517957..f8a6d50 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientChannelFactory.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientChannelFactory.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.client;
import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
import org.apache.ignite.internal.client.io.ClientConnectionMultiplexer;
/**
@@ -36,5 +37,7 @@
CompletableFuture<ClientChannel> create(
ClientChannelConfiguration cfg,
ClientConnectionMultiplexer multiplexer,
- ClientMetricSource metrics);
+ ClientMetricSource metrics,
+ Consumer<Long> assignmentChangeListener,
+ Consumer<Long> observableTimestampListener);
}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
index 26336a3..e648d33 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
@@ -810,7 +810,14 @@
new IgniteClientConnectionException(CONNECTION_ERR, "Reconnect is not allowed due to applied throttling"));
}
- chFut0 = chFactory.create(chCfg, connMgr, metrics).thenApply(ch -> {
+ CompletableFuture<ClientChannel> createFut = chFactory.create(
+ chCfg,
+ connMgr,
+ metrics,
+ ReliableChannel.this::onPartitionAssignmentChanged,
+ ReliableChannel.this::onObservableTimestampReceived);
+
+ chFut0 = createFut.thenApply(ch -> {
var oldClusterId = clusterId.compareAndExchange(null, ch.protocolContext().clusterId());
if (oldClusterId != null && !oldClusterId.equals(ch.protocolContext().clusterId())) {
@@ -825,9 +832,6 @@
"Cluster ID mismatch: expected=" + oldClusterId + ", actual=" + ch.protocolContext().clusterId());
}
- ch.addPartitionAssignmentChangeListener(ReliableChannel.this::onPartitionAssignmentChanged);
- ch.addObservableTimestampListener(ReliableChannel.this::onObservableTimestampReceived);
-
ClusterNode newNode = ch.protocolContext().clusterNode();
// There could be multiple holders map to the same serverNodeId if user provide the same
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
index 2ef14fb..9d235c0 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
@@ -26,13 +26,11 @@
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import java.net.InetSocketAddress;
-import java.util.Collection;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
@@ -93,10 +91,10 @@
private final Map<Long, CompletableFuture<PayloadInputChannel>> notificationHandlers = new ConcurrentHashMap<>();
/** Topology change listeners. */
- private final Collection<Consumer<Long>> assignmentChangeListeners = new CopyOnWriteArrayList<>();
+ private final Consumer<Long> assignmentChangeListener;
/** Observable timestamp listeners. */
- private final Collection<Consumer<Long>> observableTimestampListeners = new CopyOnWriteArrayList<>();
+ private final Consumer<Long> observableTimestampListener;
/** Closed flag. */
private final AtomicBoolean closed = new AtomicBoolean();
@@ -128,10 +126,16 @@
* @param cfg Config.
* @param metrics Metrics.
*/
- private TcpClientChannel(ClientChannelConfiguration cfg, ClientMetricSource metrics) {
+ private TcpClientChannel(
+ ClientChannelConfiguration cfg,
+ ClientMetricSource metrics,
+ Consumer<Long> assignmentChangeListener,
+ Consumer<Long> observableTimestampListener) {
validateConfiguration(cfg);
this.cfg = cfg;
this.metrics = metrics;
+ this.assignmentChangeListener = assignmentChangeListener;
+ this.observableTimestampListener = observableTimestampListener;
log = ClientUtils.logger(cfg.clientConfiguration(), TcpClientChannel.class);
@@ -183,9 +187,12 @@
static CompletableFuture<ClientChannel> createAsync(
ClientChannelConfiguration cfg,
ClientConnectionMultiplexer connMgr,
- ClientMetricSource metrics) {
+ ClientMetricSource metrics,
+ Consumer<Long> assignmentChangeListener,
+ Consumer<Long> observableTimestampListener) {
//noinspection resource - returned from method.
- return new TcpClientChannel(cfg, metrics).initAsync(connMgr);
+ return new TcpClientChannel(cfg, metrics, assignmentChangeListener, observableTimestampListener)
+ .initAsync(connMgr);
}
/** {@inheritDoc} */
@@ -422,10 +429,7 @@
private void handleObservableTimestamp(ClientMessageUnpacker unpacker) {
long observableTimestamp = unpacker.unpackLong();
-
- for (Consumer<Long> listener : observableTimestampListeners) {
- listener.accept(observableTimestamp);
- }
+ observableTimestampListener.accept(observableTimestamp);
}
private void handlePartitionAssignmentChange(int flags, ClientMessageUnpacker unpacker) {
@@ -435,9 +439,7 @@
}
long maxStartTime = unpacker.unpackLong();
- for (Consumer<Long> listener : assignmentChangeListeners) {
- listener.accept(maxStartTime);
- }
+ assignmentChangeListener.accept(maxStartTime);
}
}
@@ -525,17 +527,6 @@
return protocolCtx;
}
- /** {@inheritDoc} */
- @Override
- public void addPartitionAssignmentChangeListener(Consumer<Long> listener) {
- assignmentChangeListeners.add(listener);
- }
-
- @Override
- public void addObservableTimestampListener(Consumer<Long> listener) {
- observableTimestampListeners.add(listener);
- }
-
private static void validateConfiguration(ClientChannelConfiguration cfg) {
String error = null;
@@ -639,6 +630,9 @@
var clusterId = unpacker.unpackUuid();
var clusterName = unpacker.unpackString();
+ long observableTimestamp = unpacker.unpackLong();
+ observableTimestampListener.accept(observableTimestamp);
+
unpacker.unpackByte(); // cluster version major
unpacker.unpackByte(); // cluster version minor
unpacker.unpackByte(); // cluster version maintenance
diff --git a/modules/client/src/test/java/org/apache/ignite/client/ClientMetricsTest.java b/modules/client/src/test/java/org/apache/ignite/client/ClientMetricsTest.java
index b7bb60f..935913f 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/ClientMetricsTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/ClientMetricsTest.java
@@ -229,12 +229,12 @@
client = clientBuilder().build();
assertEquals(15, metrics().bytesSent());
- assertEquals(76, metrics().bytesReceived());
+ assertEquals(85, metrics().bytesReceived());
client.tables().tables();
assertEquals(21, metrics().bytesSent());
- assertEquals(97, metrics().bytesReceived());
+ assertEquals(106, metrics().bytesReceived());
}
@Test
diff --git a/modules/client/src/test/java/org/apache/ignite/client/ObservableTimestampPropagationTest.java b/modules/client/src/test/java/org/apache/ignite/client/ObservableTimestampPropagationTest.java
index f8c2195..bc4a787 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/ObservableTimestampPropagationTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/ObservableTimestampPropagationTest.java
@@ -74,6 +74,12 @@
ReliableChannel ch = IgniteTestUtils.getFieldValue(client, "ch");
TransactionOptions roOpts = new TransactionOptions().readOnly(true);
+ // +2 because logical time is incremented on every call to nowLong - for replica tracker and for handshake.
+ assertEquals(
+ (currentServerTimestamp.get() << LOGICAL_TIME_BITS_SIZE) + 2,
+ ch.observableTimestamp(),
+ "Handshake should initialize observable timestamp");
+
assertNull(lastObservableTimestamp());
// RW TX does not propagate timestamp.
diff --git a/modules/client/src/test/java/org/apache/ignite/internal/client/RepeatedFinishClientTransactionTest.java b/modules/client/src/test/java/org/apache/ignite/internal/client/RepeatedFinishClientTransactionTest.java
index e893102..104616c 100644
--- a/modules/client/src/test/java/org/apache/ignite/internal/client/RepeatedFinishClientTransactionTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/internal/client/RepeatedFinishClientTransactionTest.java
@@ -32,7 +32,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
import org.apache.ignite.internal.client.tx.ClientTransaction;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.junit.jupiter.api.Test;
@@ -204,16 +203,6 @@
}
@Override
- public void addPartitionAssignmentChangeListener(Consumer<Long> listener) {
- // No-op.
- }
-
- @Override
- public void addObservableTimestampListener(Consumer<Long> listener) {
- // No-op.
- }
-
- @Override
public void close() {
// No-op.
}
diff --git a/modules/platforms/cpp/ignite/client/detail/node_connection.cpp b/modules/platforms/cpp/ignite/client/detail/node_connection.cpp
index afc2101..88e4598 100644
--- a/modules/platforms/cpp/ignite/client/detail/node_connection.cpp
+++ b/modules/platforms/cpp/ignite/client/detail/node_connection.cpp
@@ -72,10 +72,7 @@
}
auto observable_timestamp = reader.read_int64();
- auto event_handler = m_event_handler.lock();
- if (event_handler) {
- event_handler->on_observable_timestamp_changed(observable_timestamp);
- }
+ on_observable_timestamp_changed(observable_timestamp);
std::optional<ignite_error> err{};
if (test_flag(flags, protocol::response_flag::ERROR_FLAG)) {
@@ -112,6 +109,13 @@
}
}
+void node_connection::on_observable_timestamp_changed(int64_t observable_timestamp) const {
+ auto event_handler = m_event_handler.lock();
+ if (event_handler) {
+ event_handler->on_observable_timestamp_changed(observable_timestamp);
+ }
+}
+
ignite_result<void> node_connection::process_handshake_rsp(bytes_view msg) {
m_logger->log_debug("Got handshake response");
@@ -129,6 +133,8 @@
return {ignite_error(*response.error)};
}
+ on_observable_timestamp_changed(response.observable_timestamp);
+
m_protocol_context = response.context;
m_handshake_complete = true;
diff --git a/modules/platforms/cpp/ignite/client/detail/node_connection.h b/modules/platforms/cpp/ignite/client/detail/node_connection.h
index bca5f69..f33a94e 100644
--- a/modules/platforms/cpp/ignite/client/detail/node_connection.h
+++ b/modules/platforms/cpp/ignite/client/detail/node_connection.h
@@ -232,6 +232,13 @@
*/
std::shared_ptr<response_handler> find_handler_unsafe(std::int64_t req_id);
+ /**
+ * Notify event handler about observable timestamp change.
+ *
+ * @param observable_timestamp New observable timestamp.
+ */
+ void on_observable_timestamp_changed(int64_t observable_timestamp) const;
+
/** Handshake complete. */
bool m_handshake_complete{false};
diff --git a/modules/platforms/cpp/ignite/protocol/messages.cpp b/modules/platforms/cpp/ignite/protocol/messages.cpp
index 0e3344f..921d4f5 100644
--- a/modules/platforms/cpp/ignite/protocol/messages.cpp
+++ b/modules/platforms/cpp/ignite/protocol/messages.cpp
@@ -69,6 +69,8 @@
res.context.set_cluster_id(reader.read_uuid());
res.context.set_cluster_name(reader.read_string());
+ res.observable_timestamp = reader.read_int64();
+
auto dbms_ver_major = reader.read_uint8();
auto dbms_ver_minor = reader.read_uint8();
auto dbms_ver_maintenance = reader.read_uint8();
diff --git a/modules/platforms/cpp/ignite/protocol/messages.h b/modules/platforms/cpp/ignite/protocol/messages.h
index 3a195e2..f2fd8d7 100644
--- a/modules/platforms/cpp/ignite/protocol/messages.h
+++ b/modules/platforms/cpp/ignite/protocol/messages.h
@@ -62,6 +62,9 @@
/** Protocol context. */
protocol_context context{};
+
+ /** Observable timestamp. */
+ int64_t observable_timestamp;
};
/**
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
index 6dfe0f9..13a1ac3 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
@@ -93,6 +93,8 @@
public Guid ClusterId { get; set; }
+ public string ClusterName { get; set; } = "fake-cluster";
+
public string[] PartitionAssignment { get; set; }
public long PartitionAssignmentTimestamp { get; set; }
@@ -164,7 +166,19 @@
handshakeWriter.Write(0); // Idle timeout.
handshakeWriter.Write(Node.Id); // Node id.
handshakeWriter.Write(Node.Name); // Node name (consistent id).
+
handshakeWriter.Write(ClusterId);
+ handshakeWriter.Write(ClusterName);
+
+ handshakeWriter.Write(ObservableTimestamp);
+
+ // Cluster version.
+ handshakeWriter.Write(1);
+ handshakeWriter.Write(2);
+ handshakeWriter.Write(3);
+ handshakeWriter.Write(4);
+ handshakeWriter.Write("-abcd");
+
handshakeWriter.WriteBinaryHeader(0); // Features.
handshakeWriter.Write(0); // Extensions.
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/MetricsTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/MetricsTests.cs
index 68b9cae..c0cb4eb 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/MetricsTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/MetricsTests.cs
@@ -92,15 +92,15 @@
using var client = await server.ConnectClientAsync();
AssertMetric(MetricNames.BytesSent, 15);
- AssertMetric(MetricNames.BytesReceived, 63);
+ AssertMetric(MetricNames.BytesReceived, 88);
await client.Tables.GetTablesAsync();
AssertMetric(MetricNames.BytesSent, 21);
- AssertMetric(MetricNames.BytesReceived, 72);
+ AssertMetric(MetricNames.BytesReceived, 97);
AssertTaggedMetric(MetricNames.BytesSent, 21, server, client);
- AssertTaggedMetric(MetricNames.BytesReceived, 72, server, client);
+ AssertTaggedMetric(MetricNames.BytesReceived, 97, server, client);
}
[Test]
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Transactions/TransactionsTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Transactions/TransactionsTests.cs
index 00acc0f..9dbc313 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Transactions/TransactionsTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Transactions/TransactionsTests.cs
@@ -23,6 +23,7 @@
using System.Threading.Tasks;
using System.Transactions;
using Ignite.Transactions;
+ using Internal;
using NUnit.Framework;
using Table;
using TransactionOptions = Ignite.Transactions.TransactionOptions;
@@ -284,11 +285,29 @@
}
[Test]
+ public async Task TestObservableTimestampIsInitializedFromHandshake()
+ {
+ using var client = await IgniteClient.StartAsync(new() { Endpoints = { "127.0.0.1:" + ServerPort } });
+ var observableTimestamp = client.GetFieldValue<ClientFailoverSocket>("_socket").ObservableTimestamp;
+
+ Assert.Greater(observableTimestamp, 0);
+ }
+
+ [Test]
public async Task TestObservableTimestampPropagation([Values(true, false)] bool sql)
{
- using var server = new FakeServer();
+ using var server = new FakeServer
+ {
+ ObservableTimestamp = 111
+ };
+
using var client = await server.ConnectClientAsync();
+ Assert.AreEqual(
+ server.ObservableTimestamp,
+ client.GetFieldValue<ClientFailoverSocket>("_socket").ObservableTimestamp,
+ "Handshake should initialize observable timestamp");
+
server.ObservableTimestamp = 123;
// Non-transactional operations do not propagate timestamp.
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
index 90d183f..471923d 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
@@ -213,7 +213,7 @@
logger.LogSslConnectionEstablishedDebug(socket.RemoteEndPoint, sslStream.NegotiatedCipherSuite);
}
- var context = await HandshakeAsync(stream, endPoint, configuration, cts.Token)
+ var context = await HandshakeAsync(stream, endPoint, configuration, listener, cts.Token)
.WaitAsync(configuration.SocketTimeout, cts.Token)
.ConfigureAwait(false);
@@ -287,11 +287,13 @@
/// <param name="stream">Network stream.</param>
/// <param name="endPoint">Endpoint.</param>
/// <param name="configuration">Configuration.</param>
+ /// <param name="listener">Client socket event listener.</param>
/// <param name="cancellationToken">Cancellation token.</param>
private static async Task<ConnectionContext> HandshakeAsync(
Stream stream,
SocketEndpoint endPoint,
IgniteClientConfiguration configuration,
+ IClientSocketEventListener listener,
CancellationToken cancellationToken)
{
await stream.WriteAsync(ProtoCommon.MagicBytes, cancellationToken).ConfigureAwait(false);
@@ -305,7 +307,7 @@
using var response = await ReadResponseAsync(stream, new byte[4], endPoint.MetricsContext, CancellationToken.None)
.ConfigureAwait(false);
- return ReadHandshakeResponse(response.GetReader(), endPoint, GetSslInfo(stream));
+ return ReadHandshakeResponse(response.GetReader(), endPoint, GetSslInfo(stream), listener);
}
private static async ValueTask CheckMagicBytesAsync(
@@ -336,7 +338,11 @@
}
}
- private static ConnectionContext ReadHandshakeResponse(MsgPackReader reader, SocketEndpoint endPoint, ISslInfo? sslInfo)
+ private static ConnectionContext ReadHandshakeResponse(
+ MsgPackReader reader,
+ SocketEndpoint endPoint,
+ ISslInfo? sslInfo,
+ IClientSocketEventListener listener)
{
var serverVer = new ClientProtocolVersion(reader.ReadInt16(), reader.ReadInt16(), reader.ReadInt16());
@@ -353,7 +359,19 @@
var idleTimeoutMs = reader.ReadInt64();
var clusterNodeId = reader.ReadString();
var clusterNodeName = reader.ReadString();
+
var clusterId = reader.ReadGuid();
+ var clusterName = reader.ReadString();
+
+ var observableTimestamp = reader.ReadInt64();
+ listener.OnObservableTimestampChanged(observableTimestamp);
+
+ // Cluster version.
+ reader.Skip(); // Major.
+ reader.Skip(); // Minor.
+ reader.Skip(); // Maintenance.
+ reader.Skip(); // Patch.
+ reader.Skip(); // Pre-release.
reader.Skip(); // Features.
reader.Skip(); // Extensions.
@@ -363,6 +381,7 @@
TimeSpan.FromMilliseconds(idleTimeoutMs),
new ClusterNode(clusterNodeId, clusterNodeName, endPoint.EndPoint, endPoint.MetricsContext),
clusterId,
+ clusterName,
sslInfo);
}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/ConnectionContext.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/ConnectionContext.cs
index eb88008..f3583bd 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/ConnectionContext.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/ConnectionContext.cs
@@ -28,11 +28,13 @@
/// <param name="IdleTimeout">Server idle timeout.</param>
/// <param name="ClusterNode">Cluster node.</param>
/// <param name="ClusterId">Cluster id.</param>
+ /// <param name="ClusterName">Cluster name.</param>
/// <param name="SslInfo">SSL info.</param>
internal record ConnectionContext(
ClientProtocolVersion Version,
TimeSpan IdleTimeout,
ClusterNode ClusterNode,
Guid ClusterId,
+ string ClusterName,
ISslInfo? SslInfo);
}