blob: b6b9d7d0f873d8cf41a23771b8d19a479d901157 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
namespace Apache.Ignite.Core.Tests.Client
using System;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using Apache.Ignite.Core.Cache.Configuration;
using Apache.Ignite.Core.Cache.Query;
using Apache.Ignite.Core.Client;
using Apache.Ignite.Core.Client.Cache;
using Apache.Ignite.Core.Configuration;
using Apache.Ignite.Core.Impl.Common;
using Apache.Ignite.Core.Log;
using Apache.Ignite.Core.Tests.Client.Cache;
using NUnit.Framework;
/// <summary>
/// Tests client connection: port ranges, version checks, etc.
/// </summary>
public class ClientConnectionTest
/** Temp dir for WAL. */
private readonly string _tempDir = PathUtils.GetTempDirectoryName();
/// <summary>
/// Sets up the test.
/// </summary>
public void SetUp()
/// <summary>
/// Test tear down.
/// </summary>
public void TearDown()
if (Directory.Exists(_tempDir))
Directory.Delete(_tempDir, true);
/// <summary>
/// Tests that missing server yields connection refused error.
/// </summary>
public void TestNoServerConnectionRefused()
var ex = Assert.Throws<AggregateException>(() => StartClient());
var socketEx = ex.InnerExceptions.OfType<SocketException>().First();
Assert.AreEqual(SocketError.ConnectionRefused, socketEx.SocketErrorCode);
/// <summary>
/// Tests that empty username or password are not allowed.
/// </summary>
public void TestAuthenticationEmptyCredentials()
using (Ignition.Start(SecureServerConfig()))
var cliCfg = GetSecureClientConfig();
cliCfg.Password = null;
var ex = Assert.Throws<IgniteClientException>(() => { Ignition.StartClient(cliCfg); });
Assert.IsTrue(ex.Message.StartsWith("IgniteClientConfiguration.Password cannot be null"));
cliCfg.Password = "";
ex = Assert.Throws<IgniteClientException>(() => { Ignition.StartClient(cliCfg); });
Assert.IsTrue(ex.Message.StartsWith("IgniteClientConfiguration.Password cannot be empty"));
cliCfg.Password = "ignite";
cliCfg.UserName = null;
ex = Assert.Throws<IgniteClientException>(() => { Ignition.StartClient(cliCfg); });
Assert.IsTrue(ex.Message.StartsWith("IgniteClientConfiguration.UserName cannot be null"));
cliCfg.UserName = "";
ex = Assert.Throws<IgniteClientException>(() => { Ignition.StartClient(cliCfg); });
Assert.IsTrue(ex.Message.StartsWith("IgniteClientConfiguration.Username cannot be empty"));
/// <summary>
/// Test invalid username or password.
/// </summary>
public void TestAuthenticationInvalidCredentials()
using (Ignition.Start(SecureServerConfig()))
var cliCfg = GetSecureClientConfig();
cliCfg.UserName = "invalid";
var ex = Assert.Throws<IgniteClientException>(() => { Ignition.StartClient(cliCfg); });
Assert.True(ex.StatusCode == ClientStatusCode.AuthenticationFailed);
cliCfg.UserName = "ignite";
cliCfg.Password = "invalid";
ex = Assert.Throws<IgniteClientException>(() => { Ignition.StartClient(cliCfg); });
Assert.True(ex.StatusCode == ClientStatusCode.AuthenticationFailed);
/// <summary>
/// Test authentication.
/// </summary>
public void TestAuthentication()
CreateNewUserAndAuthenticate("my_User", "my_Password");
/// <summary>
/// Test authentication.
/// </summary>
public void TestAuthenticationLongToken()
string user = new string('G', 59);
string pass = new string('q', 16 * 1024);
CreateNewUserAndAuthenticate(user, pass);
/// <summary>
/// Tests that multiple clients can connect to one server.
/// </summary>
public void TestMultipleClients()
using (var ignite = Ignition.Start(TestUtils.GetTestConfiguration()))
Assert.AreEqual(0, GetThinClientConnections(ignite).Length);
var client1 = StartClient();
var thinClientConnections = GetThinClientConnections(ignite);
Assert.AreEqual(1, thinClientConnections.Length);
"rmtAddr=" + client1.GetConnections().Single().LocalEndPoint,
var client2 = StartClient();
Assert.AreEqual(2, GetThinClientConnections(ignite).Length);
var client3 = StartClient();
Assert.AreEqual(3, GetThinClientConnections(ignite).Length);
// ReSharper disable AccessToDisposedClosure
TestUtils.WaitForTrueCondition(() => 2 == GetThinClientConnections(ignite).Length);
TestUtils.WaitForTrueCondition(() => 1 == GetThinClientConnections(ignite).Length);
TestUtils.WaitForTrueCondition(() => 0 == GetThinClientConnections(ignite).Length);
// ReSharper restore AccessToDisposedClosure
/// <summary>
/// Tests custom connector and client configuration.
/// </summary>
public void TestCustomConfig()
var servCfg = new IgniteConfiguration(TestUtils.GetTestConfiguration())
ClientConnectorConfiguration = new ClientConnectorConfiguration
Host = "localhost",
Port = 2000,
PortRange = 1,
SocketSendBufferSize = 100,
SocketReceiveBufferSize = 50
var clientCfg = new IgniteClientConfiguration
Endpoints = new[] {"localhost:2000"},
Logger = new ConsoleLogger()
using (Ignition.Start(servCfg))
using (var client = Ignition.StartClient(clientCfg))
Assert.AreNotEqual(clientCfg, client.GetConfiguration());
Assert.AreNotEqual(client.GetConfiguration(), client.GetConfiguration());
Assert.AreEqual(clientCfg.ToXml(), client.GetConfiguration().ToXml());
var conn = client.GetConnections().Single();
Assert.AreEqual(servCfg.ClientConnectorConfiguration.Port, ((IPEndPoint) conn.RemoteEndPoint).Port);
/// <summary>
/// Tests client config with EndPoints property.
/// </summary>
public void TestEndPoints()
using (var ignite = Ignition.Start(TestUtils.GetTestConfiguration()))
ignite.CreateCache<int, int>("foo");
const int port = IgniteClientConfiguration.DefaultPort;
// DnsEndPoint.
var cfg = new IgniteClientConfiguration
Endpoints = new[] { "localhost" }
using (var client = Ignition.StartClient(cfg))
Assert.AreEqual("foo", client.GetCacheNames().Single());
// IPEndPoint.
cfg = new IgniteClientConfiguration
Endpoints = new[] { "" + port }
using (var client = Ignition.StartClient(cfg))
Assert.AreEqual("foo", client.GetCacheNames().Single());
// Port range.
cfg = new IgniteClientConfiguration("");
using (var client = Ignition.StartClient(cfg))
Assert.AreEqual("foo", client.GetCacheNames().Single());
/// <summary>
/// Tests that empty port range causes an exception.
/// </summary>
public void TestEmptyPortRangeThrows()
var cfg = new IgniteClientConfiguration("");
var ex = Assert.Throws<IgniteClientException>(() => Ignition.StartClient(cfg));
"Invalid format of IgniteClientConfiguration.Endpoint, port range is empty:",
/// <summary>
/// Tests that default configuration throws.
/// </summary>
public void TestDefaultConfigThrows()
Assert.Throws<IgniteClientException>(() => Ignition.StartClient(new IgniteClientConfiguration()));
/// <summary>
/// Tests that connector can be disabled.
/// </summary>
public void TestDisabledConnector()
var servCfg = new IgniteConfiguration(TestUtils.GetTestConfiguration())
ClientConnectorConfigurationEnabled = false
var clientCfg = new IgniteClientConfiguration
Endpoints = new[] {"localhost"}
using (Ignition.Start(servCfg))
var ex = Assert.Throws<AggregateException>(() => Ignition.StartClient(clientCfg));
Assert.AreEqual("Failed to establish Ignite thin client connection, " +
"examine inner exceptions for details.", ex.Message.Substring(0, 88));
// Disable only thin client.
servCfg = new IgniteConfiguration(TestUtils.GetTestConfiguration())
ClientConnectorConfiguration = new ClientConnectorConfiguration
ThinClientEnabled = false
using (Ignition.Start(servCfg))
var ex = Assert.Throws<IgniteClientException>(() => Ignition.StartClient(clientCfg));
Assert.AreEqual("Client handshake failed: 'Thin client connection is not allowed, " +
"see ClientConnectorConfiguration.thinClientEnabled.'.",
ex.Message.Substring(0, 118));
/// <summary>
/// Tests that we get a proper exception when server disconnects (node shutdown, network issues, etc).
/// </summary>
public void TestServerConnectionAborted()
var evt = new ManualResetEventSlim();
var ignite = Ignition.Start(TestUtils.GetTestConfiguration());
var putGetTask = TaskRunner.Run(() =>
using (var client = StartClient())
var cache = client.GetOrCreateCache<int, int>("foo");
for (var i = 0; i < 100000; i++)
cache[i] = i;
Assert.AreEqual(i, cache.GetAsync(i).Result);
var ex = Assert.Throws<AggregateException>(() => putGetTask.Wait());
var socketEx = ex.GetInnermostException() as SocketException;
if (socketEx != null)
new[] {SocketError.ConnectionAborted, SocketError.ConnectionReset});
Assert.Fail("Unexpected exception: " + ex);
/// <summary>
/// Tests the operation timeout.
/// </summary>
public void TestOperationTimeout()
var data = Enumerable.Range(1, 500000).ToDictionary(x => x, x => x.ToString());
var cfg = GetClientConfiguration();
cfg.SocketTimeout = TimeSpan.FromMilliseconds(500);
var client = Ignition.StartClient(cfg);
var cache = client.CreateCache<int, string>("s");
Assert.AreEqual(cfg.SocketTimeout, client.GetConfiguration().SocketTimeout);
// Async.
var task = cache.PutAllAsync(data);
var ex = Assert.Catch(() => task.Wait());
Assert.AreEqual(SocketError.TimedOut, GetSocketException(ex).SocketErrorCode);
// Sync (reconnect for clean state).
client = Ignition.StartClient(cfg);
cache = client.CreateCache<int, string>("s");
ex = Assert.Catch(() => cache.PutAll(data));
Assert.AreEqual(SocketError.TimedOut, GetSocketException(ex).SocketErrorCode);
/// <summary>
/// Tests the client dispose while operations are in progress.
/// </summary>
public void TestClientDisposeWhileOperationsAreInProgress()
const int count = 10000;
var ops = new Task[count];
var clientCfg = new IgniteClientConfiguration(GetClientConfiguration())
SocketTimeout = TimeSpan.FromSeconds(30)
using (var client = Ignition.StartClient(clientCfg))
var cache = client.GetOrCreateCache<int, int>("foo");
Parallel.For(0, count, new ParallelOptions {MaxDegreeOfParallelism = Environment.ProcessorCount},
i =>
ops[i] = cache.PutAllAsync(Enumerable.Range(i*100, 100).ToDictionary(x => x, x => x));
var failed = ops.Where(x => x.Status == TaskStatus.Faulted);
foreach (var task in failed)
var ex = task.Exception;
var baseEx = ex.GetBaseException();
Assert.IsNotNull((object) (baseEx as SocketException) ?? baseEx as ObjectDisposedException,
/// <summary>
/// Tests the <see cref="ClientConnectorConfiguration.IdleTimeout"/> property.
/// </summary>
public void TestIdleTimeout()
var cfg = new IgniteConfiguration(TestUtils.GetTestConfiguration())
ClientConnectorConfiguration = new ClientConnectorConfiguration
IdleTimeout = TimeSpan.FromMilliseconds(100)
var ignite = Ignition.Start(cfg);
Assert.AreEqual(100, ignite.GetConfiguration().ClientConnectorConfiguration.IdleTimeout.TotalMilliseconds);
using (var client = StartClient())
var cache = client.GetOrCreateCache<int, int>("foo");
cache[1] = 1;
Assert.AreEqual(1, cache[1]);
Assert.AreEqual(1, cache[1]);
// Idle check frequency is 2 seconds.
var ex = Assert.Catch(() => cache.Get(1));
Assert.AreEqual(SocketError.ConnectionAborted, GetSocketException(ex).SocketErrorCode);
/// <summary>
/// Tests the protocol mismatch behavior: attempt to connect to an HTTP endpoint.
/// </summary>
public void TestProtocolMismatch()
using (Ignition.Start(TestUtils.GetTestConfiguration()))
// Connect to Ignite REST endpoint.
var cfg = new IgniteClientConfiguration("");
var ex = GetSocketException(Assert.Catch(() => Ignition.StartClient(cfg)));
Assert.AreEqual(SocketError.ConnectionAborted, ex.SocketErrorCode);
/// <summary>
/// Tests reconnect logic with single server.
/// </summary>
public void TestReconnect()
// Connect client and check.
var client = Ignition.StartClient(new IgniteClientConfiguration(""));
Assert.AreEqual(0, client.GetCacheNames().Count);
var ep = client.RemoteEndPoint as IPEndPoint;
Assert.AreEqual(IgniteClientConfiguration.DefaultPort, ep.Port);
Assert.AreEqual("", ep.Address.ToString());
ep = client.LocalEndPoint as IPEndPoint;
Assert.AreNotEqual(IgniteClientConfiguration.DefaultPort, ep.Port);
Assert.AreEqual("", ep.Address.ToString());
// Stop server.
// First request fails, error is detected.
var ex = Assert.Catch(() => client.GetCacheNames());
// Second request causes reconnect attempt which fails (server is stopped).
Assert.Catch(() => client.GetCacheNames());
// Start server, next operation succeeds.
Assert.AreEqual(0, client.GetCacheNames().Count);
/// <summary>
/// Tests disabled reconnect behavior.
/// </summary>
public void TestReconnectDisabled()
// Connect client and check.
using (var client = Ignition.StartClient(new IgniteClientConfiguration("")
ReconnectDisabled = true
Assert.AreEqual(0, client.GetCacheNames().Count);
// Stop server.
// Request fails, error is detected.
var ex = Assert.Catch(() => client.GetCacheNames());
// Restart server, client does not reconnect.
ex = Assert.Catch(() => client.GetCacheNames());
/// <summary>
/// Tests reconnect logic with multiple servers.
/// </summary>
public void TestFailover()
// Start 3 nodes.
Ignition.Start(TestUtils.GetTestConfiguration(name: "0"));
Ignition.Start(TestUtils.GetTestConfiguration(name: "1"));
Ignition.Start(TestUtils.GetTestConfiguration(name: "2"));
// Connect client.
var port = IgniteClientConfiguration.DefaultPort;
var cfg = new IgniteClientConfiguration
Endpoints = new[]
string.Format("{0}..{1}", port + 1, port + 2)
using (var client = Ignition.StartClient(cfg))
Assert.AreEqual(0, client.GetCacheNames().Count);
// Stop target node.
var nodeId = ((IPEndPoint) client.RemoteEndPoint).Port - port;
Ignition.Stop(nodeId.ToString(), true);
// Check failure.
Assert.IsNotNull(GetSocketException(Assert.Catch(() => client.GetCacheNames())));
// Check reconnect.
Assert.AreEqual(0, client.GetCacheNames().Count);
// Stop target node.
nodeId = ((IPEndPoint) client.RemoteEndPoint).Port - port;
Ignition.Stop(nodeId.ToString(), true);
// Check failure.
Assert.IsNotNull(GetSocketException(Assert.Catch(() => client.GetCacheNames())));
// Check reconnect.
Assert.AreEqual(0, client.GetCacheNames().Count);
// Stop all nodes.
Assert.IsNotNull(GetSocketException(Assert.Catch(() => client.GetCacheNames())));
Assert.IsNotNull(GetSocketException(Assert.Catch(() => client.GetCacheNames())));
/// <summary>
/// Tests that client stops it's receiver thread upon disposal.
/// </summary>
public void TestClientDisposalStopsReceiverThread([Values(true, false)] bool async)
var logger = new ListLogger {EnabledLevels = new[] {LogLevel.Trace}};
var cfg = new IgniteClientConfiguration(GetClientConfiguration())
Logger = logger
using (var client = Ignition.StartClient(cfg))
var cache = client.GetOrCreateCache<int, int>("c");
if (async)
cache.PutAsync(1, 1);
cache.Put(1, 1);
var threadId = logger.Entries
.Select(e => Regex.Match(e.Message, "Receiver thread #([0-9]+) started."))
.Where(m => m.Success)
.Select(m => int.Parse(m.Groups[1].Value))
TestUtils.WaitForTrueCondition(() => logger.Entries.Any(
e => e.Message == string.Format("Receiver thread #{0} stopped.", threadId)));
/// <summary>
/// Starts the client.
/// </summary>
private static IIgniteClient StartClient()
return Ignition.StartClient(GetClientConfiguration());
/// <summary>
/// Gets the client configuration.
/// </summary>
private static IgniteClientConfiguration GetClientConfiguration()
return new IgniteClientConfiguration(IPAddress.Loopback.ToString());
/// <summary>
/// Finds SocketException in the hierarchy.
/// </summary>
private static SocketException GetSocketException(Exception ex)
var origEx = ex;
while (ex != null)
var socketEx = ex as SocketException;
if (socketEx != null)
return socketEx;
ex = ex.InnerException;
throw new Exception("SocketException not found.", origEx);
/// <summary>
/// Create server configuration with enabled authentication.
/// </summary>
/// <returns>Server configuration.</returns>
private IgniteConfiguration SecureServerConfig()
return new IgniteConfiguration(TestUtils.GetTestConfiguration())
AuthenticationEnabled = true,
DataStorageConfiguration = new DataStorageConfiguration
StoragePath = Path.Combine(_tempDir, "Store"),
WalPath = Path.Combine(_tempDir, "WalStore"),
WalArchivePath = Path.Combine(_tempDir, "WalArchive"),
DefaultDataRegionConfiguration = new DataRegionConfiguration
Name = "default",
PersistenceEnabled = true
/// <summary>
/// Create client configuration with enabled authentication.
/// </summary>
/// <returns>Client configuration.</returns>
private static IgniteClientConfiguration GetSecureClientConfig()
return new IgniteClientConfiguration("localhost")
UserName = "ignite",
Password = "ignite",
SocketTimeout = TimeSpan.FromSeconds(10)
/// <summary>
/// Gets thin client connections for the given server node.
/// </summary>
/// <param name="ignite">Ignite server instance.</param>
/// <returns>Active thin client connections.</returns>
private static string[] GetThinClientConnections(IIgnite ignite)
return ignite.GetCompute().ExecuteJavaTask<string[]>(
"org.apache.ignite.platform.PlatformThinClientConnectionsTask", ignite.Name);
/// <summary>
/// Start new node, create new user with given credentials and try to authenticate.
/// </summary>
/// <param name="user">Username</param>
/// <param name="pass">Password</param>
private void CreateNewUserAndAuthenticate(string user, string pass)
using (var srv = Ignition.Start(SecureServerConfig()))
using (var cli = Ignition.StartClient(GetSecureClientConfig()))
CacheClientConfiguration ccfg = new CacheClientConfiguration
Name = "TestCache",
QueryEntities = new[]
new QueryEntity
KeyType = typeof(string),
ValueType = typeof(string),
ICacheClient<string, string> cache = cli.GetOrCreateCache<string, string>(ccfg);
cache.Put("key1", "val1");
cache.Query(new SqlFieldsQuery("CREATE USER \"" + user + "\" WITH PASSWORD '" + pass + "'")).GetAll();
var cliCfg = GetSecureClientConfig();
cliCfg.UserName = user;
cliCfg.Password = pass;
using (var cli = Ignition.StartClient(cliCfg))
ICacheClient<string, string> cache = cli.GetCache<string, string>("TestCache");
string val = cache.Get("key1");
Assert.True(val == "val1");