blob: 635d07f355e067bd5d1be30dc5c64b793d79a0ca [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace Apache.Ignite.Core.Impl.Client
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Globalization;
using System.Net;
using Apache.Ignite.Core.Binary;
using Apache.Ignite.Core.Cache.Configuration;
using Apache.Ignite.Core.Client;
using Apache.Ignite.Core.Client.Cache;
using Apache.Ignite.Core.Client.Compute;
using Apache.Ignite.Core.Client.Datastream;
using Apache.Ignite.Core.Client.DataStructures;
using Apache.Ignite.Core.Client.Services;
using Apache.Ignite.Core.Client.Transactions;
using Apache.Ignite.Core.Datastream;
using Apache.Ignite.Core.Impl.Binary;
using Apache.Ignite.Core.Impl.Cache;
using Apache.Ignite.Core.Impl.Cache.Platform;
using Apache.Ignite.Core.Impl.Client.Cache;
using Apache.Ignite.Core.Impl.Client.Cluster;
using Apache.Ignite.Core.Impl.Client.Compute;
using Apache.Ignite.Core.Impl.Client.Datastream;
using Apache.Ignite.Core.Impl.Client.DataStructures;
using Apache.Ignite.Core.Impl.Client.Services;
using Apache.Ignite.Core.Impl.Client.Transactions;
using Apache.Ignite.Core.Impl.Cluster;
using Apache.Ignite.Core.Impl.Common;
using Apache.Ignite.Core.Impl.Handle;
using Apache.Ignite.Core.Impl.Plugin;
/// <summary>
/// Thin client implementation.
/// </summary>
internal class IgniteClient : IIgniteInternal, IIgniteClient
{
/** Failover socket; every request issued by this class goes through it. */
private readonly ClientFailoverSocket _socket;
/** Marshaller built from the binary configuration; exposed through the Marshaller property. */
private readonly Marshaller _marsh;
/** Binary processor: taken from the configuration when it provides one, otherwise backed by the socket. */
private readonly IBinaryProcessor _binProc;
/** Binary API built on top of the marshaller; returned by GetBinary. */
private readonly IBinary _binary;
/** Copy of the configuration that was passed to the constructor. */
private readonly IgniteClientConfiguration _configuration;
/** Transactions; handed to the socket on construction and disposed together with this client. */
private readonly TransactionsClient _transactions;
/** Node info cache keyed by node id; filled by SaveClientClusterNode, read by GetClientNode and ContainsNode. */
private readonly ConcurrentDictionary<Guid, IClientClusterNode> _nodes =
new ConcurrentDictionary<Guid, IClientClusterNode>();
/** Cluster; returned by GetCluster. */
private readonly ClientCluster _cluster;
/** Compute; returned by GetCompute. */
private readonly ComputeClient _compute;
/** Services; returned by GetServices. */
private readonly IServicesClient _services;
/// <summary>
/// Initializes a new instance of the <see cref="IgniteClient"/> class.
/// Copies the supplied configuration and then creates, in this order, the marshaller, the transactions
/// object, the failover socket, the binary processor and binary API, and the cluster, compute and
/// services objects.
/// </summary>
/// <param name="clientConfiguration">The client configuration. Must not be null; this is verified
/// only by a debug assertion.</param>
public IgniteClient(IgniteClientConfiguration clientConfiguration)
{
Debug.Assert(clientConfiguration != null);
// Keep our own copy of the caller's configuration.
_configuration = new IgniteClientConfiguration(clientConfiguration);
// The marshaller gets a back-reference to this client through its Ignite property.
_marsh = new Marshaller(_configuration.BinaryConfiguration)
{
Ignite = this
};
// NOTE(review): TransactionConfiguration is read from the caller's object here, not from the
// _configuration copy made above - confirm that this is intended.
_transactions = new TransactionsClient(this, clientConfiguration.TransactionConfiguration);
// The socket takes both the marshaller and the transactions object, so it is created after them.
_socket = new ClientFailoverSocket(_configuration, _marsh, _transactions);
// A binary processor supplied by the configuration wins over the default socket-backed one.
_binProc = _configuration.BinaryProcessor ?? new BinaryProcessorClient(_socket);
_binary = new Impl.Binary.Binary(_marsh);
_cluster = new ClientCluster(this);
_compute = new ComputeClient(this, ComputeClientFlags.None, TimeSpan.Zero, null);
_services = new ServicesClient(this);
}
/// <summary>
/// Gets the failover socket that this client sends its requests through.
/// </summary>
internal ClientFailoverSocket Socket => _socket;
/** <inheritDoc /> */
[SuppressMessage("Microsoft.Usage", "CA1816:CallGCSuppressFinalizeCorrectly",
    Justification = "There is no finalizer.")]
public void Dispose()
{
    // The socket is still disposed first, exactly as before.
    try
    {
        _socket.Dispose();
    }
    finally
    {
        // Dispose the transactions object even when disposing the socket throws,
        // so that a failed socket shutdown does not leave it undisposed.
        _transactions.Dispose();
    }
}
/** <inheritDoc /> */
public ICacheClient<TK, TV> GetCache<TK, TV>(string name)
{
    IgniteArgumentCheck.NotNull(name, nameof(name));

    // Wrap the name in a new cache client bound to this IgniteClient instance.
    var cache = new CacheClient<TK, TV>(this, name);

    return cache;
}
/** <inheritDoc /> */
public ICacheClient<TK, TV> GetOrCreateCache<TK, TV>(string name)
{
    IgniteArgumentCheck.NotNull(name, nameof(name));

    // Send the CacheGetOrCreateWithName request; its payload is just the cache name.
    Action<ClientRequestContext> writeName = ctx => ctx.Writer.WriteString(name);
    DoOutOp(ClientOp.CacheGetOrCreateWithName, writeName);

    return GetCache<TK, TV>(name);
}
/** <inheritDoc /> */
public ICacheClient<TK, TV> GetOrCreateCache<TK, TV>(CacheClientConfiguration configuration)
{
    IgniteArgumentCheck.NotNull(configuration, nameof(configuration));

    DoOutOp(
        ClientOp.CacheGetOrCreateWithConfiguration,
        ctx =>
        {
            // The serializer receives the request stream together with the features from the context.
            ClientCacheConfigurationSerializer.Write(ctx.Stream, configuration, ctx.Features);
        });

    // The returned cache client is looked up by the name stored in the configuration.
    return GetCache<TK, TV>(configuration.Name);
}
/** <inheritDoc /> */
public ICacheClient<TK, TV> CreateCache<TK, TV>(string name)
{
    IgniteArgumentCheck.NotNull(name, nameof(name));

    // Send the CacheCreateWithName request; its payload is just the cache name.
    Action<ClientRequestContext> writeName = ctx => ctx.Writer.WriteString(name);
    DoOutOp(ClientOp.CacheCreateWithName, writeName);

    return GetCache<TK, TV>(name);
}
/** <inheritDoc /> */
public ICacheClient<TK, TV> CreateCache<TK, TV>(CacheClientConfiguration configuration)
{
    IgniteArgumentCheck.NotNull(configuration, nameof(configuration));

    DoOutOp(
        ClientOp.CacheCreateWithConfiguration,
        ctx =>
        {
            // The serializer receives the request stream together with the features from the context.
            ClientCacheConfigurationSerializer.Write(ctx.Stream, configuration, ctx.Features);
        });

    // The returned cache client is looked up by the name stored in the configuration.
    return GetCache<TK, TV>(configuration.Name);
}
/** <inheritDoc /> */
public ICollection<string> GetCacheNames() =>
    // No request payload is written (null write action); the response is read as a string collection.
    DoOutInOp(
        opId: ClientOp.CacheGetNames,
        writeAction: null,
        readFunc: ctx => ctx.Reader.ReadStringCollection());
/** <inheritDoc /> */
public IClientCluster GetCluster() => _cluster; // Same instance on every call; created in the constructor.
/** <inheritDoc /> */
public void DestroyCache(string name)
{
    IgniteArgumentCheck.NotNull(name, nameof(name));

    DoOutOp(ClientOp.CacheDestroy, ctx =>
    {
        // The request carries the integer id that BinaryUtils derives from the name, not the name itself.
        var cacheId = BinaryUtils.GetCacheId(name);
        ctx.Stream.WriteInt(cacheId);
    });
}
/** <inheritDoc /> */
[ExcludeFromCodeCoverage]
public IIgnite GetIgnite()
{
    // Not available in thin client mode: always throws.
    NotSupportedException notSupported = GetClientNotSupportedException();

    throw notSupported;
}
/** <inheritDoc /> */
public IBinary GetBinary() => _binary; // Same instance on every call; created in the constructor.
/** <inheritDoc /> */
ITransactionsClient IIgniteClient.GetTransactions() => _transactions;
/** Transactions object exposed with its concrete type for internal callers. */
internal TransactionsClient Transactions => _transactions;
/** <inheritDoc /> */
public CacheAffinityImpl GetAffinity(string cacheName)
{
    // Not available in thin client mode: always throws, the argument is ignored.
    NotSupportedException notSupported = GetClientNotSupportedException();

    throw notSupported;
}
/** <inheritDoc /> */
public CacheAffinityManager GetAffinityManager(string cacheName)
{
    // Not available in thin client mode: always throws, the argument is ignored.
    NotSupportedException notSupported = GetClientNotSupportedException();

    throw notSupported;
}
/** <inheritDoc /> */
public CacheConfiguration GetCacheConfiguration(int cacheId)
{
    // Not available in thin client mode: always throws, the argument is ignored.
    NotSupportedException notSupported = GetClientNotSupportedException();

    throw notSupported;
}
/** <inheritDoc /> */
public object GetJavaThreadLocal()
{
    // Not available in thin client mode: always throws.
    NotSupportedException notSupported = GetClientNotSupportedException();

    throw notSupported;
}
/** <inheritDoc /> */
public IgniteClientConfiguration GetConfiguration()
{
    // Hand out a fresh copy so that the caller is free to modify what it receives.
    var configurationCopy = new IgniteClientConfiguration(_configuration);

    return configurationCopy;
}
/** <inheritDoc /> */
public EndPoint RemoteEndPoint => _socket.RemoteEndPoint; // Forwarded to the socket.
/** <inheritDoc /> */
public EndPoint LocalEndPoint => _socket.LocalEndPoint; // Forwarded to the socket.
/** <inheritDoc /> */
public IEnumerable<IClientConnection> GetConnections() => _socket.GetConnections(); // Forwarded to the socket.
/** <inheritDoc /> */
public IComputeClient GetCompute() => _compute; // Same instance on every call; created in the constructor.
/** <inheritDoc /> */
public IServicesClient GetServices() => _services; // Same instance on every call; created in the constructor.
/** <inheritDoc /> */
public IDataStreamerClient<TK, TV> GetDataStreamer<TK, TV>(string cacheName) =>
    // Same as calling the two-argument form without options.
    GetDataStreamer<TK, TV>(cacheName, null);
/** <inheritDoc /> */
public IDataStreamerClient<TK, TV> GetDataStreamer<TK, TV>(string cacheName, DataStreamerClientOptions options)
{
    // Convert the untyped options into their typed counterpart, then use the typed overload.
    var typedOptions = new DataStreamerClientOptions<TK, TV>(options);

    return GetDataStreamer(cacheName, typedOptions);
}
/** <inheritDoc /> */
public IDataStreamerClient<TK, TV> GetDataStreamer<TK, TV>(
    string cacheName,
    DataStreamerClientOptions<TK, TV> options)
{
    IgniteArgumentCheck.NotNullOrEmpty(cacheName, nameof(cacheName));

    // The streamer talks to the cluster through this client's socket; options are passed on unchanged.
    var streamer = new DataStreamerClient<TK, TV>(_socket, cacheName, options);

    return streamer;
}
/** <inheritDoc /> */
public IAtomicLongClient GetAtomicLong(string name, long initialValue, bool create) =>
    // Same as the four-argument form with a null configuration.
    GetAtomicLong(name, null, initialValue, create);
/** <inheritDoc /> */
public IAtomicLongClient GetAtomicLong(
    string name,
    AtomicClientConfiguration configuration,
    long initialValue,
    bool create)
{
    IgniteArgumentCheck.NotNullOrEmpty(name, nameof(name));

    if (create)
    {
        // The create request is sent only when the caller asked for creation; its response is not read.
        _socket.DoOutInOp<object>(
            ClientOp.AtomicLongCreate,
            ctx =>
            {
                var writer = ctx.Writer;

                writer.WriteString(name);
                writer.WriteLong(initialValue);

                // A boolean flag precedes the optional configuration fields.
                if (configuration == null)
                {
                    writer.WriteBoolean(false);
                    return;
                }

                writer.WriteBoolean(true);
                writer.WriteInt(configuration.AtomicSequenceReserveSize);
                writer.WriteByte((byte)configuration.CacheMode);
                writer.WriteInt(configuration.Backups);
                writer.WriteString(configuration.GroupName);
            },
            null);
    }

    var atomicLong = new AtomicLongClient(_socket, name, configuration?.GroupName);

    // Return null when specified atomic long does not exist to match thick API behavior.
    // IsClosed is consulted only when creation was not requested.
    var missing = !create && atomicLong.IsClosed();

    return missing ? null : atomicLong;
}
/** <inheritDoc /> */
public IIgniteSetClient<T> GetIgniteSet<T>(string name, CollectionClientConfiguration configuration)
{
    IgniteArgumentCheck.NotNullOrEmpty(name, nameof(name));

    Action<ClientRequestContext> writeRequest = ctx =>
    {
        var writer = ctx.Writer;

        writer.WriteString(name);

        // A boolean flag precedes the optional configuration fields.
        if (configuration == null)
        {
            writer.WriteBoolean(false);
            return;
        }

        writer.WriteBoolean(true);
        writer.WriteByte((byte)configuration.AtomicityMode);
        writer.WriteByte((byte)configuration.CacheMode);
        writer.WriteInt(configuration.Backups);
        writer.WriteString(configuration.GroupName);
        writer.WriteBoolean(configuration.Colocated);
    };

    Func<ClientResponseContext, IgniteSetClient<T>> readResponse = ctx =>
    {
        // The response starts with a boolean; when it is false there is nothing more to read
        // and the caller gets null (presumably the set does not exist - confirm against the protocol).
        if (!ctx.Reader.ReadBoolean())
        {
            return null;
        }

        // Read order matters: the colocated flag first, then the cache id.
        var colocated = ctx.Reader.ReadBoolean();
        var cacheId = ctx.Reader.ReadInt();

        return new IgniteSetClient<T>(_socket, name, colocated, cacheId);
    };

    return _socket.DoOutInOp(ClientOp.SetGetOrCreate, writeRequest, readResponse);
}
/** <inheritDoc /> */
public IBinaryProcessor BinaryProcessor => _binProc; // Chosen once in the constructor.
/** <inheritDoc /> */
[ExcludeFromCodeCoverage]
public IgniteConfiguration Configuration
{
    get
    {
        // Not available in thin client mode: always throws.
        NotSupportedException notSupported = GetClientNotSupportedException();

        throw notSupported;
    }
}
/** <inheritDoc /> */
[ExcludeFromCodeCoverage]
public HandleRegistry HandleRegistry
{
    get
    {
        // Not available in thin client mode: always throws.
        NotSupportedException notSupported = GetClientNotSupportedException();

        throw notSupported;
    }
}
/** <inheritDoc /> */
[ExcludeFromCodeCoverage]
public ClusterNodeImpl GetNode(Guid? id)
{
    // Not available in thin client mode: always throws, the argument is ignored.
    NotSupportedException notSupported = GetClientNotSupportedException();

    throw notSupported;
}
/// <summary>
/// Looks up a client node in the internal node cache.
/// </summary>
/// <param name="id">Node id.</param>
/// <returns>The cached node stored under <paramref name="id"/>.</returns>
/// <exception cref="ArgumentException">The cache has no node with the given id.</exception>
public IClientClusterNode GetClientNode(Guid id)
{
    IClientClusterNode cachedNode;

    if (_nodes.TryGetValue(id, out cachedNode))
    {
        return cachedNode;
    }

    var message = string.Format(CultureInfo.InvariantCulture, "Unable to find node with id='{0}'", id);

    throw new ArgumentException(message);
}
/// <summary>
/// Checks whether the internal node cache of this <see cref="IgniteClient"/> has an entry
/// for the given node id.
/// </summary>
/// <param name="id">Node id.</param>
/// <returns>True if the node is in the cache, false otherwise.</returns>
public bool ContainsNode(Guid id) => _nodes.ContainsKey(id);
/** <inheritDoc /> */
public Marshaller Marshaller => _marsh; // Created in the constructor from the binary configuration.
/** <inheritDoc /> */
[ExcludeFromCodeCoverage]
public PluginProcessor PluginProcessor
{
    get
    {
        // Not available in thin client mode: always throws.
        NotSupportedException notSupported = GetClientNotSupportedException();

        throw notSupported;
    }
}
/** <inheritDoc /> */
public PlatformCacheManager PlatformCacheManager
{
    get
    {
        // Not available in thin client mode: always throws.
        NotSupportedException notSupported = GetClientNotSupportedException();

        throw notSupported;
    }
}
/** <inheritDoc /> */
[ExcludeFromCodeCoverage]
public IDataStreamer<TK, TV> GetDataStreamer<TK, TV>(string cacheName, bool keepBinary)
{
    // Not available in thin client mode: always throws, both arguments are ignored.
    NotSupportedException notSupported = GetClientNotSupportedException();

    throw notSupported;
}
/// <summary>
/// Builds a cluster node from the reader and stores it in the internal node cache under its id.
/// </summary>
/// <param name="reader">Reader that the node is constructed from.</param>
public void SaveClientClusterNode(IBinaryRawReader reader)
{
    var clusterNode = new ClientClusterNode(reader);
    var nodeId = clusterNode.Id;

    // Indexer assignment: an entry already stored under the same id is replaced.
    _nodes[nodeId] = clusterNode;
}
/// <summary>
/// Creates (does not throw) the exception used for operations that are unavailable in thin client mode.
/// </summary>
/// <param name="info">Optional extra text; when not null it is appended to the standard message
/// after a single space.</param>
/// <returns>A new <see cref="NotSupportedException"/> carrying the resulting message.</returns>
public static NotSupportedException GetClientNotSupportedException(string info = null)
{
    const string baseMessage = "Operation is not supported in thin client mode.";

    // Only null suppresses the suffix; an empty string still adds the separating space.
    var message = info == null
        ? baseMessage
        : baseMessage + " " + info;

    return new NotSupportedException(message);
}
/// <summary>
/// Sends a request through the socket and returns what the socket's DoOutInOp produces.
/// </summary>
/// <typeparam name="T">Result type produced by <paramref name="readFunc"/>.</typeparam>
/// <param name="opId">Operation code.</param>
/// <param name="writeAction">Writes the request payload; callers in this class pass null when
/// there is none.</param>
/// <param name="readFunc">Reads the result from the response; <c>DoOutOp</c> passes null here.</param>
/// <returns>The value returned by the socket call.</returns>
private T DoOutInOp<T>(
    ClientOp opId,
    Action<ClientRequestContext> writeAction,
    Func<ClientResponseContext, T> readFunc) =>
    _socket.DoOutInOp(opId, writeAction, readFunc);
/// <summary>
/// Sends a request whose response carries nothing the caller needs: no read function is supplied
/// and the result is discarded.
/// </summary>
/// <param name="opId">Operation code.</param>
/// <param name="writeAction">Writes the request payload; may be omitted.</param>
private void DoOutOp(ClientOp opId, Action<ClientRequestContext> writeAction = null) =>
    DoOutInOp<object>(opId, writeAction, null);
}
}