| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| namespace Apache.Ignite.Core.Impl.Client.Cache |
| { |
| using System; |
| using System.Collections.Generic; |
| using System.Diagnostics; |
| using System.Diagnostics.CodeAnalysis; |
| using System.IO; |
| using System.Threading.Tasks; |
| using Apache.Ignite.Core.Binary; |
| using Apache.Ignite.Core.Cache; |
| using Apache.Ignite.Core.Cache.Configuration; |
| using Apache.Ignite.Core.Cache.Event; |
| using Apache.Ignite.Core.Cache.Expiry; |
| using Apache.Ignite.Core.Cache.Query; |
| using Apache.Ignite.Core.Client; |
| using Apache.Ignite.Core.Client.Cache; |
| using Apache.Ignite.Core.Client.Cache.Query.Continuous; |
| using Apache.Ignite.Core.Impl.Binary; |
| using Apache.Ignite.Core.Impl.Binary.IO; |
| using Apache.Ignite.Core.Impl.Cache; |
| using Apache.Ignite.Core.Impl.Cache.Expiry; |
| using Apache.Ignite.Core.Impl.Cache.Query.Continuous; |
| using Apache.Ignite.Core.Impl.Client; |
| using Apache.Ignite.Core.Impl.Client.Cache.Query; |
| using Apache.Ignite.Core.Impl.Common; |
| using Apache.Ignite.Core.Impl.Log; |
| using Apache.Ignite.Core.Log; |
| using BinaryWriter = Apache.Ignite.Core.Impl.Binary.BinaryWriter; |
| |
| /// <summary> |
| /// Client cache implementation. |
| /// </summary> |
| internal sealed class CacheClient<TK, TV> : ICacheClient<TK, TV>, ICacheInternal |
| { |
/// <summary>
/// Flag bits that WriteRequest packs into the single flags byte of a cache request.
/// </summary>
[Flags]
private enum ClientCacheRequestFlag : byte
{
    /// <summary>
    /// No flag bits are set.
    /// </summary>
    None = 0x00,

    /// <summary>
    /// Keep-binary flag.
    /// Reserved for other thin clients.
    /// </summary>
    // ReSharper disable once UnusedMember.Local
    WithKeepBinary = 0x01,

    /// <summary>
    /// Transactional flag (IEP-34 Thin client: transactions support).
    /// Set by WriteRequest when the client has a current transaction.
    /// </summary>
    WithTransactional = 0x02,

    /// <summary>
    /// Expiry policy flag. Set by WriteRequest when this cache instance carries an expiry policy.
    /// </summary>
    WithExpiryPolicy = 0x04
}
| |
/** Name of the cache this instance operates on. */
private readonly string _name;

/** Cache id, derived from the cache name in the constructor. */
private readonly int _id;

/** Owning thin client. */
private readonly IgniteClient _ignite;

/** Marshaller taken from the owning client. */
private readonly Marshaller _marsh;

/** Whether values are unmarshalled in keep-binary mode. */
private readonly bool _keepBinary;

/** Expiry policy attached to requests; null when none is configured. */
private readonly IExpiryPolicy _expiryPolicy;

/** Logger. Created on first use by GetLogger. */
private ILogger _logger;
| |
/// <summary>
/// Initializes a new instance of the <see cref="CacheClient{TK, TV}" /> class.
/// </summary>
/// <param name="ignite">Owning client (asserted non-null); its marshaller is reused by this cache.</param>
/// <param name="name">Cache name (asserted non-null); the cache id is computed from it.</param>
/// <param name="keepBinary">Binary mode flag.</param>
/// <param name="expiryPolicy">Expiry policy, or null when no policy is attached.</param>
public CacheClient(IgniteClient ignite, string name, bool keepBinary = false, IExpiryPolicy expiryPolicy = null)
{
    Debug.Assert(ignite != null);
    Debug.Assert(name != null);

    // Client-related state.
    _ignite = ignite;
    _marsh = ignite.Marshaller;

    // Cache identity.
    _name = name;
    _id = BinaryUtils.GetCacheId(name);

    // Per-instance modes.
    _keepBinary = keepBinary;
    _expiryPolicy = expiryPolicy;
}
| |
/** <inheritDoc /> */
public string Name
{
    get
    {
        // The name is assigned once in the constructor and never changes.
        return _name;
    }
}
| |
/** <inheritDoc /> */
public TV this[TK key]
{
    // Reading is the same as calling Get.
    get
    {
        return Get(key);
    }

    // Writing is the same as calling Put.
    set
    {
        Put(key, value);
    }
}
| |
/** <inheritDoc /> */
public TV Get(TK key)
{
    IgniteArgumentCheck.NotNull(key, "key");

    _ignite.Transactions.StartTxIfNeeded();

    // UnmarshalNotNull throws KeyNotFoundException when the response starts with a null header.
    Func<ClientResponseContext, TV> readValue = resp => UnmarshalNotNull<TV>(resp);

    return DoOutInOpAffinity<TV>(ClientOp.CacheGet, key, readValue);
}
| |
/** <inheritDoc /> */
public Task<TV> GetAsync(TK key)
{
    IgniteArgumentCheck.NotNull(key, "key");

    // Request body: the key.
    Action<ClientRequestContext> writeKey = req => req.Writer.WriteObjectDetached(key);

    // Response body: the value; a null header results in KeyNotFoundException.
    Func<ClientResponseContext, TV> readValue = resp => UnmarshalNotNull<TV>(resp);

    return DoOutInOpAffinityAsync<TV>(ClientOp.CacheGet, key, writeKey, readValue);
}
| |
/** <inheritDoc /> */
public bool TryGet(TK key, out TV value)
{
    IgniteArgumentCheck.NotNull(key, "key");

    _ignite.Transactions.StartTxIfNeeded();

    // The same CacheGet operation as Get, but a missing value is reported through the result wrapper.
    CacheResult<TV> lookup =
        DoOutInOpAffinity<CacheResult<TV>>(ClientOp.CacheGet, key, UnmarshalCacheResult<TV>);

    value = lookup.Value;

    return lookup.Success;
}
| |
/** <inheritDoc /> */
public Task<CacheResult<TV>> TryGetAsync(TK key)
{
    IgniteArgumentCheck.NotNull(key, "key");

    // Request body: the key.
    Action<ClientRequestContext> writeKey = req => req.Writer.WriteObjectDetached(key);

    return DoOutInOpAffinityAsync<CacheResult<TV>>(ClientOp.CacheGet, key, writeKey, UnmarshalCacheResult<TV>);
}
| |
/** <inheritDoc /> */
public ICollection<ICacheEntry<TK, TV>> GetAll(IEnumerable<TK> keys)
{
    IgniteArgumentCheck.NotNull(keys, "keys");

    _ignite.Transactions.StartTxIfNeeded();

    // Request body: the key collection.
    Action<ClientRequestContext> writeKeys = req => req.Writer.WriteEnumerable(keys);

    // Response body: entry count followed by key-value pairs (see ReadCacheEntries).
    Func<ClientResponseContext, ICollection<ICacheEntry<TK, TV>>> readEntries =
        resp => ReadCacheEntries(resp.Stream);

    return DoOutInOp(ClientOp.CacheGetAll, writeKeys, readEntries);
}
| |
/** <inheritDoc /> */
public Task<ICollection<ICacheEntry<TK, TV>>> GetAllAsync(IEnumerable<TK> keys)
{
    IgniteArgumentCheck.NotNull(keys, "keys");

    // Request body: the key collection.
    Action<ClientRequestContext> writeKeys = req => req.Writer.WriteEnumerable(keys);

    // Response body: entry count followed by key-value pairs (see ReadCacheEntries).
    Func<ClientResponseContext, ICollection<ICacheEntry<TK, TV>>> readEntries =
        resp => ReadCacheEntries(resp.Stream);

    return DoOutInOpAsync(ClientOp.CacheGetAll, writeKeys, readEntries);
}
| |
/** <inheritDoc /> */
public void Put(TK key, TV val)
{
    IgniteArgumentCheck.NotNull(key, "key");
    IgniteArgumentCheck.NotNull(val, "val");

    _ignite.Transactions.StartTxIfNeeded();

    // No reader is supplied: nothing is read from the response body.
    Func<ClientResponseContext, object> noReader = null;

    DoOutInOpAffinity<object>(ClientOp.CachePut, key, val, noReader);
}
| |
/** <inheritDoc /> */
public Task PutAsync(TK key, TV val)
{
    IgniteArgumentCheck.NotNull(key, "key");
    IgniteArgumentCheck.NotNull(val, "val");

    // Request body: key first, then value.
    Action<ClientRequestContext> writeKeyAndValue = req =>
    {
        req.Writer.WriteObjectDetached(key);
        req.Writer.WriteObjectDetached(val);
    };

    return DoOutOpAffinityAsync(ClientOp.CachePut, key, writeKeyAndValue);
}
| |
/** <inheritDoc /> */
public bool ContainsKey(TK key)
{
    IgniteArgumentCheck.NotNull(key, "key");

    // The response body is read as a single boolean.
    Func<ClientResponseContext, bool> readFlag = resp => resp.Stream.ReadBool();

    return DoOutInOpAffinity<bool>(ClientOp.CacheContainsKey, key, readFlag);
}
| |
/** <inheritDoc /> */
public Task<bool> ContainsKeyAsync(TK key)
{
    IgniteArgumentCheck.NotNull(key, "key");

    // The response body is read as a single boolean.
    Func<ClientResponseContext, bool> readFlag = resp => resp.Stream.ReadBool();

    return DoOutInOpAffinityAsync<bool>(ClientOp.CacheContainsKey, key, readFlag);
}
| |
/** <inheritDoc /> */
public bool ContainsKeys(IEnumerable<TK> keys)
{
    IgniteArgumentCheck.NotNull(keys, "keys");

    // Request body: the key collection; response body: a single boolean.
    Action<ClientRequestContext> writeKeys = req => req.Writer.WriteEnumerable(keys);
    Func<ClientResponseContext, bool> readFlag = resp => resp.Stream.ReadBool();

    return DoOutInOp<bool>(ClientOp.CacheContainsKeys, writeKeys, readFlag);
}
| |
/** <inheritDoc /> */
public Task<bool> ContainsKeysAsync(IEnumerable<TK> keys)
{
    IgniteArgumentCheck.NotNull(keys, "keys");

    // Request body: the key collection; response body: a single boolean.
    Action<ClientRequestContext> writeKeys = req => req.Writer.WriteEnumerable(keys);
    Func<ClientResponseContext, bool> readFlag = resp => resp.Stream.ReadBool();

    return DoOutInOpAsync<bool>(ClientOp.CacheContainsKeys, writeKeys, readFlag);
}
| |
/** <inheritDoc /> */
public IQueryCursor<ICacheEntry<TK, TV>> Query(ScanQuery<TK, TV> scanQuery)
{
    IgniteArgumentCheck.NotNull(scanQuery, "scanQuery");

    // Filter is a binary object for all platforms.
    // For .NET it is a CacheEntryFilterHolder with a predefined id (BinaryTypeId.CacheEntryPredicateHolder).
    Action<ClientRequestContext> writeQuery = req => WriteScanQuery(req.Writer, scanQuery);

    // The response starts with the cursor id; the cursor is built over the same socket and stream.
    Func<ClientResponseContext, ClientQueryCursor<TK, TV>> openCursor = resp =>
        new ClientQueryCursor<TK, TV>(
            resp.Socket, resp.Stream.ReadLong(), _keepBinary, resp.Stream, ClientOp.QueryScanCursorGetPage);

    return DoOutInOp(ClientOp.QueryScan, writeQuery, openCursor);
}
| |
/** <inheritDoc /> */
[Obsolete]
public IQueryCursor<ICacheEntry<TK, TV>> Query(SqlQuery sqlQuery)
{
    // Both the SQL text and the query type are required.
    IgniteArgumentCheck.NotNull(sqlQuery, "sqlQuery");
    IgniteArgumentCheck.NotNull(sqlQuery.Sql, "sqlQuery.Sql");
    IgniteArgumentCheck.NotNull(sqlQuery.QueryType, "sqlQuery.QueryType");

    Action<ClientRequestContext> writeQuery = req => WriteSqlQuery(req.Writer, sqlQuery);

    // The response starts with the cursor id; the cursor is built over the same socket and stream.
    Func<ClientResponseContext, ClientQueryCursor<TK, TV>> openCursor = resp =>
        new ClientQueryCursor<TK, TV>(
            resp.Socket, resp.Stream.ReadLong(), _keepBinary, resp.Stream, ClientOp.QuerySqlCursorGetPage);

    return DoOutInOp(ClientOp.QuerySql, writeQuery, openCursor);
}
| |
/** <inheritDoc /> */
public IFieldsQueryCursor Query(SqlFieldsQuery sqlFieldsQuery)
{
    IgniteArgumentCheck.NotNull(sqlFieldsQuery, "sqlFieldsQuery");
    IgniteArgumentCheck.NotNull(sqlFieldsQuery.Sql, "sqlFieldsQuery.Sql");

    // Column names are requested (WriteSqlFieldsQuery default) and read back by GetFieldsCursor.
    Action<ClientRequestContext> writeQuery = req => WriteSqlFieldsQuery(req.Writer, sqlFieldsQuery);
    Func<ClientResponseContext, ClientFieldsQueryCursor> openCursor = resp => GetFieldsCursor(resp);

    return DoOutInOp(ClientOp.QuerySqlFields, writeQuery, openCursor);
}
| |
/** <inheritDoc /> */
public IQueryCursor<T> Query<T>(SqlFieldsQuery sqlFieldsQuery, Func<IBinaryRawReader, int, T> readerFunc)
{
    // Validate eagerly, consistently with Query(SqlFieldsQuery): otherwise a null query or reader
    // only surfaces later as a NullReferenceException while writing the request or reading rows.
    IgniteArgumentCheck.NotNull(sqlFieldsQuery, "sqlFieldsQuery");
    IgniteArgumentCheck.NotNull(sqlFieldsQuery.Sql, "sqlFieldsQuery.Sql");
    IgniteArgumentCheck.NotNull(readerFunc, "readerFunc");

    // Column names are not requested (includeColumns = false); only the column count is read
    // from the response and handed to readerFunc for every row.
    return DoOutInOp(ClientOp.QuerySqlFields,
        ctx => WriteSqlFieldsQuery(ctx.Writer, sqlFieldsQuery, false),
        ctx => GetFieldsCursorNoColumnNames(ctx, readerFunc));
}
| |
/** <inheritDoc /> */
public CacheResult<TV> GetAndPut(TK key, TV val)
{
    IgniteArgumentCheck.NotNull(key, "key");
    IgniteArgumentCheck.NotNull(val, "val");

    _ignite.Transactions.StartTxIfNeeded();

    // The returned value is wrapped in a CacheResult; a null header yields a default (empty) result.
    CacheResult<TV> previous =
        DoOutInOpAffinity<CacheResult<TV>>(ClientOp.CacheGetAndPut, key, val, UnmarshalCacheResult<TV>);

    return previous;
}
| |
/** <inheritDoc /> */
public Task<CacheResult<TV>> GetAndPutAsync(TK key, TV val)
{
    IgniteArgumentCheck.NotNull(key, "key");
    IgniteArgumentCheck.NotNull(val, "val");

    // The returned value is wrapped in a CacheResult; a null header yields a default (empty) result.
    Task<CacheResult<TV>> pending =
        DoOutInOpAffinityAsync<CacheResult<TV>>(ClientOp.CacheGetAndPut, key, val, UnmarshalCacheResult<TV>);

    return pending;
}
| |
/** <inheritDoc /> */
public CacheResult<TV> GetAndReplace(TK key, TV val)
{
    IgniteArgumentCheck.NotNull(key, "key");
    IgniteArgumentCheck.NotNull(val, "val");

    _ignite.Transactions.StartTxIfNeeded();

    // The returned value is wrapped in a CacheResult; a null header yields a default (empty) result.
    CacheResult<TV> previous =
        DoOutInOpAffinity<CacheResult<TV>>(ClientOp.CacheGetAndReplace, key, val, UnmarshalCacheResult<TV>);

    return previous;
}
| |
/** <inheritDoc /> */
public Task<CacheResult<TV>> GetAndReplaceAsync(TK key, TV val)
{
    IgniteArgumentCheck.NotNull(key, "key");
    IgniteArgumentCheck.NotNull(val, "val");

    // The returned value is wrapped in a CacheResult; a null header yields a default (empty) result.
    Task<CacheResult<TV>> pending =
        DoOutInOpAffinityAsync<CacheResult<TV>>(ClientOp.CacheGetAndReplace, key, val,
            UnmarshalCacheResult<TV>);

    return pending;
}
| |
/** <inheritDoc /> */
public CacheResult<TV> GetAndRemove(TK key)
{
    IgniteArgumentCheck.NotNull(key, "key");

    _ignite.Transactions.StartTxIfNeeded();

    // The returned value is wrapped in a CacheResult; a null header yields a default (empty) result.
    CacheResult<TV> removed =
        DoOutInOpAffinity<CacheResult<TV>>(ClientOp.CacheGetAndRemove, key, UnmarshalCacheResult<TV>);

    return removed;
}
| |
/** <inheritDoc /> */
public Task<CacheResult<TV>> GetAndRemoveAsync(TK key)
{
    IgniteArgumentCheck.NotNull(key, "key");

    // The returned value is wrapped in a CacheResult; a null header yields a default (empty) result.
    Task<CacheResult<TV>> pending =
        DoOutInOpAffinityAsync<CacheResult<TV>>(ClientOp.CacheGetAndRemove, key, UnmarshalCacheResult<TV>);

    return pending;
}
| |
/** <inheritDoc /> */
public bool PutIfAbsent(TK key, TV val)
{
    IgniteArgumentCheck.NotNull(key, "key");
    IgniteArgumentCheck.NotNull(val, "val");

    _ignite.Transactions.StartTxIfNeeded();

    // The response body is read as a single boolean.
    Func<ClientResponseContext, bool> readFlag = resp => resp.Stream.ReadBool();

    return DoOutInOpAffinity<bool>(ClientOp.CachePutIfAbsent, key, val, readFlag);
}
| |
/** <inheritDoc /> */
public Task<bool> PutIfAbsentAsync(TK key, TV val)
{
    IgniteArgumentCheck.NotNull(key, "key");
    IgniteArgumentCheck.NotNull(val, "val");

    // The response body is read as a single boolean.
    Func<ClientResponseContext, bool> readFlag = resp => resp.Stream.ReadBool();

    return DoOutInOpAffinityAsync<bool>(ClientOp.CachePutIfAbsent, key, val, readFlag);
}
| |
/** <inheritDoc /> */
public CacheResult<TV> GetAndPutIfAbsent(TK key, TV val)
{
    IgniteArgumentCheck.NotNull(key, "key");
    IgniteArgumentCheck.NotNull(val, "val");

    _ignite.Transactions.StartTxIfNeeded();

    // The returned value is wrapped in a CacheResult; a null header yields a default (empty) result.
    CacheResult<TV> existing =
        DoOutInOpAffinity<CacheResult<TV>>(ClientOp.CacheGetAndPutIfAbsent, key, val,
            UnmarshalCacheResult<TV>);

    return existing;
}
| |
/** <inheritDoc /> */
public Task<CacheResult<TV>> GetAndPutIfAbsentAsync(TK key, TV val)
{
    IgniteArgumentCheck.NotNull(key, "key");
    IgniteArgumentCheck.NotNull(val, "val");

    // The returned value is wrapped in a CacheResult; a null header yields a default (empty) result.
    Task<CacheResult<TV>> pending =
        DoOutInOpAffinityAsync<CacheResult<TV>>(ClientOp.CacheGetAndPutIfAbsent, key, val,
            UnmarshalCacheResult<TV>);

    return pending;
}
| |
/** <inheritDoc /> */
public bool Replace(TK key, TV val)
{
    IgniteArgumentCheck.NotNull(key, "key");
    IgniteArgumentCheck.NotNull(val, "val");

    _ignite.Transactions.StartTxIfNeeded();

    // The response body is read as a single boolean.
    Func<ClientResponseContext, bool> readFlag = resp => resp.Stream.ReadBool();

    return DoOutInOpAffinity<bool>(ClientOp.CacheReplace, key, val, readFlag);
}
| |
/** <inheritDoc /> */
public Task<bool> ReplaceAsync(TK key, TV val)
{
    IgniteArgumentCheck.NotNull(key, "key");
    IgniteArgumentCheck.NotNull(val, "val");

    // The response body is read as a single boolean.
    Func<ClientResponseContext, bool> readFlag = resp => resp.Stream.ReadBool();

    return DoOutInOpAffinityAsync<bool>(ClientOp.CacheReplace, key, val, readFlag);
}
| |
/** <inheritDoc /> */
public bool Replace(TK key, TV oldVal, TV newVal)
{
    IgniteArgumentCheck.NotNull(key, "key");
    IgniteArgumentCheck.NotNull(oldVal, "oldVal");
    IgniteArgumentCheck.NotNull(newVal, "newVal");

    _ignite.Transactions.StartTxIfNeeded();

    // Request body: key, expected current value, replacement value - in that order.
    Action<ClientRequestContext> writeArgs = req =>
    {
        req.Writer.WriteObjectDetached(key);
        req.Writer.WriteObjectDetached(oldVal);
        req.Writer.WriteObjectDetached(newVal);
    };

    // The response body is read as a single boolean.
    Func<ClientResponseContext, bool> readFlag = resp => resp.Stream.ReadBool();

    return DoOutInOpAffinity<bool>(ClientOp.CacheReplaceIfEquals, key, writeArgs, readFlag);
}
| |
/** <inheritDoc /> */
public Task<bool> ReplaceAsync(TK key, TV oldVal, TV newVal)
{
    IgniteArgumentCheck.NotNull(key, "key");
    IgniteArgumentCheck.NotNull(oldVal, "oldVal");
    IgniteArgumentCheck.NotNull(newVal, "newVal");

    // Request body: key, expected current value, replacement value - in that order.
    Action<ClientRequestContext> writeArgs = req =>
    {
        req.Writer.WriteObjectDetached(key);
        req.Writer.WriteObjectDetached(oldVal);
        req.Writer.WriteObjectDetached(newVal);
    };

    // The response body is read as a single boolean.
    Func<ClientResponseContext, bool> readFlag = resp => resp.Stream.ReadBool();

    return DoOutInOpAffinityAsync<bool>(ClientOp.CacheReplaceIfEquals, key, writeArgs, readFlag);
}
| |
/** <inheritDoc /> */
public void PutAll(IEnumerable<KeyValuePair<TK, TV>> vals)
{
    IgniteArgumentCheck.NotNull(vals, "vals");

    _ignite.Transactions.StartTxIfNeeded();

    // Request body: the key-value pairs. Nothing is read from the response.
    Action<ClientRequestContext> writePairs = req => req.Writer.WriteDictionary(vals);

    DoOutInOp<object>(ClientOp.CachePutAll, writePairs, null);
}
| |
/** <inheritDoc /> */
public Task PutAllAsync(IEnumerable<KeyValuePair<TK, TV>> vals)
{
    IgniteArgumentCheck.NotNull(vals, "vals");

    // Request body: the key-value pairs. Nothing is read from the response.
    Action<ClientRequestContext> writePairs = req => req.Writer.WriteDictionary(vals);

    return DoOutInOpAsync<object>(ClientOp.CachePutAll, writePairs, null);
}
| |
/** <inheritDoc /> */
public void Clear()
{
    // No extra request payload and no response reader.
    DoOutInOp<object>(ClientOp.CacheClear, null, null);
}
| |
/** <inheritDoc /> */
public Task ClearAsync()
{
    // No extra request payload and no response reader.
    return DoOutInOpAsync<object>(ClientOp.CacheClear, null, null);
}
| |
/** <inheritDoc /> */
public void Clear(TK key)
{
    IgniteArgumentCheck.NotNull(key, "key");

    // The key is written by the affinity helper; nothing is read from the response.
    DoOutInOpAffinity<object>(ClientOp.CacheClearKey, key, null);
}
| |
/** <inheritDoc /> */
public Task ClearAsync(TK key)
{
    IgniteArgumentCheck.NotNull(key, "key");

    // Request body: the key. Nothing is read from the response.
    Action<ClientRequestContext> writeKey = req => req.Writer.WriteObjectDetached(key);

    return DoOutInOpAffinityAsync<object>(ClientOp.CacheClearKey, key, writeKey, null);
}
| |
/** <inheritDoc /> */
public void ClearAll(IEnumerable<TK> keys)
{
    IgniteArgumentCheck.NotNull(keys, "keys");

    // Request body: the key collection. Nothing is read from the response.
    Action<ClientRequestContext> writeKeys = req => req.Writer.WriteEnumerable(keys);

    DoOutInOp<object>(ClientOp.CacheClearKeys, writeKeys, null);
}
| |
/** <inheritDoc /> */
public Task ClearAllAsync(IEnumerable<TK> keys)
{
    IgniteArgumentCheck.NotNull(keys, "keys");

    // Request body: the key collection. Nothing is read from the response.
    Action<ClientRequestContext> writeKeys = req => req.Writer.WriteEnumerable(keys);

    return DoOutInOpAsync<object>(ClientOp.CacheClearKeys, writeKeys, null);
}
| |
/** <inheritDoc /> */
public bool Remove(TK key)
{
    IgniteArgumentCheck.NotNull(key, "key");

    _ignite.Transactions.StartTxIfNeeded();

    // The response body is read as a single boolean.
    Func<ClientResponseContext, bool> readFlag = resp => resp.Stream.ReadBool();

    return DoOutInOpAffinity<bool>(ClientOp.CacheRemoveKey, key, readFlag);
}
| |
/** <inheritDoc /> */
public Task<bool> RemoveAsync(TK key)
{
    IgniteArgumentCheck.NotNull(key, "key");

    // The response body is read as a single boolean.
    Func<ClientResponseContext, bool> readFlag = resp => resp.Stream.ReadBool();

    return DoOutInOpAffinityAsync<bool>(ClientOp.CacheRemoveKey, key, readFlag);
}
| |
/** <inheritDoc /> */
public bool Remove(TK key, TV val)
{
    IgniteArgumentCheck.NotNull(key, "key");
    IgniteArgumentCheck.NotNull(val, "val");

    _ignite.Transactions.StartTxIfNeeded();

    // The response body is read as a single boolean.
    Func<ClientResponseContext, bool> readFlag = resp => resp.Stream.ReadBool();

    return DoOutInOpAffinity<bool>(ClientOp.CacheRemoveIfEquals, key, val, readFlag);
}
| |
/** <inheritDoc /> */
public Task<bool> RemoveAsync(TK key, TV val)
{
    IgniteArgumentCheck.NotNull(key, "key");
    IgniteArgumentCheck.NotNull(val, "val");

    // The response body is read as a single boolean.
    Func<ClientResponseContext, bool> readFlag = resp => resp.Stream.ReadBool();

    return DoOutInOpAffinityAsync<bool>(ClientOp.CacheRemoveIfEquals, key, val, readFlag);
}
| |
/** <inheritDoc /> */
public void RemoveAll(IEnumerable<TK> keys)
{
    IgniteArgumentCheck.NotNull(keys, "keys");

    _ignite.Transactions.StartTxIfNeeded();

    // Request body: the key collection. Nothing is read from the response.
    Action<ClientRequestContext> writeKeys = req => req.Writer.WriteEnumerable(keys);

    DoOutInOp<object>(ClientOp.CacheRemoveKeys, writeKeys, null);
}
| |
/** <inheritDoc /> */
public Task RemoveAllAsync(IEnumerable<TK> keys)
{
    IgniteArgumentCheck.NotNull(keys, "keys");

    // Request body: the key collection. Nothing is read from the response.
    Action<ClientRequestContext> writeKeys = req => req.Writer.WriteEnumerable(keys);

    return DoOutInOpAsync<object>(ClientOp.CacheRemoveKeys, writeKeys, null);
}
| |
/** <inheritDoc /> */
public void RemoveAll()
{
    _ignite.Transactions.StartTxIfNeeded();

    // No extra request payload and no response reader.
    DoOutInOp<object>(ClientOp.CacheRemoveAll, null, null);
}
| |
/** <inheritDoc /> */
public Task RemoveAllAsync()
{
    // No extra request payload and no response reader.
    return DoOutInOpAsync<object>(ClientOp.CacheRemoveAll, null, null);
}
| |
/** <inheritDoc /> */
public long GetSize(params CachePeekMode[] modes)
{
    // Request body: peek modes encoded by WritePeekModes; response body: a single long.
    Action<ClientRequestContext> writeModes = req => WritePeekModes(modes, req.Stream);
    Func<ClientResponseContext, long> readSize = resp => resp.Stream.ReadLong();

    return DoOutInOp<long>(ClientOp.CacheGetSize, writeModes, readSize);
}
| |
/** <inheritDoc /> */
public Task<long> GetSizeAsync(params CachePeekMode[] modes)
{
    // Request body: peek modes encoded by WritePeekModes; response body: a single long.
    Action<ClientRequestContext> writeModes = req => WritePeekModes(modes, req.Stream);
    Func<ClientResponseContext, long> readSize = resp => resp.Stream.ReadLong();

    return DoOutInOpAsync<long>(ClientOp.CacheGetSize, writeModes, readSize);
}
| |
/** <inheritDoc /> */
public CacheClientConfiguration GetConfiguration()
{
    // The configuration is deserialized straight from the response stream,
    // taking the features reported by the response context into account.
    Func<ClientResponseContext, CacheClientConfiguration> readConfig =
        resp => new CacheClientConfiguration(resp.Stream, resp.Features);

    return DoOutInOp(ClientOp.CacheGetConfiguration, null, readConfig);
}
| |
/** <inheritDoc /> */
CacheConfiguration ICacheInternal.GetConfiguration()
{
    // Fetch the thin client configuration and convert it to the full cache configuration type.
    var clientCfg = GetConfiguration();

    return clientCfg.ToCacheConfiguration();
}
| |
/** <inheritDoc /> */
public ICacheClient<TK1, TV1> WithKeepBinary<TK1, TV1>()
{
    if (!_keepBinary)
    {
        // Not in binary mode yet: create a binary-mode view of the same cache, keeping the expiry policy.
        return new CacheClient<TK1, TV1>(_ignite, _name, true, _expiryPolicy);
    }

    // Already in binary mode: the same instance can be reused only when the generic arguments match.
    var sameCache = this as ICacheClient<TK1, TV1>;

    if (sameCache != null)
    {
        return sameCache;
    }

    throw new InvalidOperationException(
        "Can't change type of binary cache. WithKeepBinary has been called on an instance of " +
        "binary cache with incompatible generic arguments.");
}
| |
/** <inheritDoc /> */
public ICacheClient<TK, TV> WithExpiryPolicy(IExpiryPolicy plc)
{
    IgniteArgumentCheck.NotNull(plc, "plc");

    // Protocols older than 1.5.0 do not support WithExpiryPolicy.
    // That can't be verified here, though: with partition awareness, reconnect and so on,
    // the connection that will serve the request is unknown and may not even exist yet.
    // The check is performed in WriteRequest instead.
    var withPolicy = new CacheClient<TK, TV>(_ignite, _name, _keepBinary, plc);

    return withPolicy;
}
| |
/** <inheritDoc /> */
public IContinuousQueryHandleClient QueryContinuous(ContinuousQueryClient<TK, TV> continuousQuery)
{
    // A continuous query without a listener is rejected up front.
    IgniteArgumentCheck.NotNull(continuousQuery, "continuousQuery");
    IgniteArgumentCheck.NotNull(continuousQuery.Listener, "continuousQuery.Listener");

    ClientContinuousQueryHandle handle = QueryContinuousInternal(continuousQuery);

    return handle;
}
| |
/** <inheritDoc /> */
[ExcludeFromCodeCoverage]
public T DoOutInOpExtension<T>(int extensionId, int opCode, Action<IBinaryRawWriter> writeAction,
    Func<IBinaryRawReader, T> readFunc)
{
    // The thin client has no usages of this method, so it always throws
    // the exception produced by IgniteClient.GetClientNotSupportedException.
    throw IgniteClient.GetClientNotSupportedException();
}
| |
/// <summary>
/// Performs an operation that sends a request and reads nothing from the response.
/// </summary>
/// <param name="opId">Operation code.</param>
/// <param name="writeAction">Optional writer for the request payload.</param>
private void DoOutOp(ClientOp opId, Action<ClientRequestContext> writeAction = null)
{
    Func<ClientResponseContext, object> noReader = null;

    DoOutInOp(opId, writeAction, noReader);
}
| |
/// <summary>
/// Performs a key-based operation with partition awareness that reads nothing from the response.
/// </summary>
/// <param name="opId">Operation code.</param>
/// <param name="key">Key, written as the request payload and used for routing.</param>
private void DoOutOpAffinity(ClientOp opId, TK key)
{
    Func<ClientResponseContext, object> noReader = null;

    DoOutInOpAffinity(opId, key, noReader);
}
| |
/// <summary>
/// Asynchronously performs an operation that sends a request and reads nothing from the response.
/// </summary>
/// <param name="opId">Operation code.</param>
/// <param name="writeAction">Optional writer for the request payload.</param>
/// <returns>Task representing the operation.</returns>
private Task DoOutOpAsync(ClientOp opId, Action<ClientRequestContext> writeAction = null)
{
    Func<ClientResponseContext, object> noReader = null;

    return DoOutInOpAsync(opId, writeAction, noReader);
}
| |
/// <summary>
/// Asynchronously performs a key-based operation with partition awareness
/// that reads nothing from the response.
/// </summary>
/// <param name="opId">Operation code.</param>
/// <param name="key">Key used for routing.</param>
/// <param name="writeAction">Optional writer for the request payload.</param>
/// <returns>Task representing the operation.</returns>
private Task DoOutOpAffinityAsync(ClientOp opId, TK key, Action<ClientRequestContext> writeAction = null)
{
    Func<ClientResponseContext, object> noReader = null;

    return DoOutInOpAffinityAsync(opId, key, writeAction, noReader);
}
| |
/// <summary>
/// Sends a cache request through the client socket and reads the response.
/// </summary>
/// <param name="opId">Operation code.</param>
/// <param name="writeAction">Writer for the operation payload; may be null.</param>
/// <param name="readFunc">Response reader.</param>
/// <returns>Value produced by the socket call.</returns>
private T DoOutInOp<T>(ClientOp opId, Action<ClientRequestContext> writeAction,
    Func<ClientResponseContext, T> readFunc)
{
    var socket = _ignite.Socket;

    // WriteRequest puts the cache header (id, flags) in front of the operation payload.
    return socket.DoOutInOp(
        opId,
        reqCtx => WriteRequest(writeAction, reqCtx),
        readFunc,
        HandleError<T>);
}
| |
/// <summary>
/// Sends a single-key cache request with partition awareness and reads the response.
/// The key is the whole operation payload.
/// </summary>
/// <param name="opId">Operation code.</param>
/// <param name="key">Key to write and to route by.</param>
/// <param name="readFunc">Response reader.</param>
/// <returns>Value produced by the socket call.</returns>
private T DoOutInOpAffinity<T>(ClientOp opId, TK key, Func<ClientResponseContext, T> readFunc)
{
    var socket = _ignite.Socket;

    // WriteRequest puts the cache header (id, flags) in front of the key.
    return socket.DoOutInOpAffinity(opId,
        reqCtx => WriteRequest(w => w.Writer.WriteObjectDetached(key), reqCtx),
        readFunc, _id, key, HandleError<T>);
}
| |
/// <summary>
/// Sends a cache request with partition awareness, using a caller-supplied payload writer,
/// and reads the response.
/// </summary>
/// <param name="opId">Operation code.</param>
/// <param name="key">Key to route by.</param>
/// <param name="writeAction">Writer for the operation payload; may be null.</param>
/// <param name="readFunc">Response reader.</param>
/// <returns>Value produced by the socket call.</returns>
private T DoOutInOpAffinity<T>(ClientOp opId, TK key, Action<ClientRequestContext> writeAction,
    Func<ClientResponseContext, T> readFunc)
{
    var socket = _ignite.Socket;

    // WriteRequest puts the cache header (id, flags) in front of the operation payload.
    return socket.DoOutInOpAffinity(opId,
        reqCtx => WriteRequest(writeAction, reqCtx),
        readFunc, _id, key, HandleError<T>);
}
| |
/// <summary>
/// Sends a key-value cache request with partition awareness and reads the response.
/// The payload is the key followed by the value.
/// </summary>
/// <param name="opId">Operation code.</param>
/// <param name="key">Key to write and to route by.</param>
/// <param name="val">Value to write after the key.</param>
/// <param name="readFunc">Response reader.</param>
/// <returns>Value produced by the socket call.</returns>
private T DoOutInOpAffinity<T>(ClientOp opId, TK key, TV val, Func<ClientResponseContext, T> readFunc)
{
    var socket = _ignite.Socket;

    // WriteRequest puts the cache header (id, flags) in front of the key-value payload.
    return socket.DoOutInOpAffinity(opId,
        reqCtx => WriteRequest(w =>
        {
            w.Writer.WriteObjectDetached(key);
            w.Writer.WriteObjectDetached(val);
        }, reqCtx),
        readFunc, _id, key, HandleError<T>);
}
| |
/// <summary>
/// Asynchronously sends a cache request through the client socket and reads the response.
/// </summary>
/// <param name="opId">Operation code.</param>
/// <param name="writeAction">Writer for the operation payload; may be null.</param>
/// <param name="readFunc">Response reader.</param>
/// <returns>Task returned by the socket call.</returns>
private Task<T> DoOutInOpAsync<T>(ClientOp opId, Action<ClientRequestContext> writeAction,
    Func<ClientResponseContext, T> readFunc)
{
    var socket = _ignite.Socket;

    // WriteRequest puts the cache header (id, flags) in front of the operation payload.
    return socket.DoOutInOpAsync(
        opId,
        reqCtx => WriteRequest(writeAction, reqCtx),
        readFunc,
        HandleError<T>);
}
| |
/// <summary>
/// Asynchronously sends a cache request with partition awareness, using a caller-supplied
/// payload writer, and reads the response.
/// </summary>
/// <param name="opId">Operation code.</param>
/// <param name="key">Key to route by.</param>
/// <param name="writeAction">Writer for the operation payload; may be null.</param>
/// <param name="readFunc">Response reader.</param>
/// <returns>Task returned by the socket call.</returns>
private Task<T> DoOutInOpAffinityAsync<T>(ClientOp opId, TK key, Action<ClientRequestContext> writeAction,
    Func<ClientResponseContext, T> readFunc)
{
    var socket = _ignite.Socket;

    // WriteRequest puts the cache header (id, flags) in front of the operation payload.
    return socket.DoOutInOpAffinityAsync(
        opId,
        reqCtx => WriteRequest(writeAction, reqCtx),
        readFunc,
        _id,
        key,
        HandleError<T>);
}
| |
/// <summary>
/// Asynchronously sends a key-value cache request with partition awareness and reads the response.
/// The payload is the key followed by the value.
/// </summary>
/// <param name="opId">Operation code.</param>
/// <param name="key">Key to write and to route by.</param>
/// <param name="val">Value to write after the key.</param>
/// <param name="readFunc">Response reader.</param>
/// <returns>Task returned by the socket call.</returns>
private Task<T> DoOutInOpAffinityAsync<T>(ClientOp opId, TK key, TV val, Func<ClientResponseContext, T> readFunc)
{
    var socket = _ignite.Socket;

    // WriteRequest puts the cache header (id, flags) in front of the key-value payload.
    return socket.DoOutInOpAffinityAsync(opId,
        reqCtx => WriteRequest(w =>
        {
            w.Writer.WriteObjectDetached(key);
            w.Writer.WriteObjectDetached(val);
        }, reqCtx),
        readFunc, _id, key, HandleError<T>);
}
| |
/// <summary>
/// Asynchronously sends a single-key cache request with partition awareness and reads the response.
/// The key is the whole operation payload.
/// </summary>
/// <param name="opId">Operation code.</param>
/// <param name="key">Key to write and to route by.</param>
/// <param name="readFunc">Response reader.</param>
/// <returns>Task returned by the socket call.</returns>
private Task<T> DoOutInOpAffinityAsync<T>(ClientOp opId, TK key, Func<ClientResponseContext, T> readFunc)
{
    var socket = _ignite.Socket;

    // WriteRequest puts the cache header (id, flags) in front of the key.
    return socket.DoOutInOpAffinityAsync(
        opId,
        reqCtx => WriteRequest(w => w.Writer.WriteObjectDetached(key), reqCtx),
        readFunc,
        _id,
        key,
        HandleError<T>);
}
| |
/// <summary>
/// Writes the common cache request header followed by the operation payload.
/// Layout: cache id (int), flags (byte), expiry policy (only with the expiry flag),
/// transaction id (only with the transactional flag), then whatever <paramref name="writeAction"/> writes.
/// </summary>
/// <param name="writeAction">Writer for the operation payload; skipped when null.</param>
/// <param name="ctx">Request context to write to.</param>
private void WriteRequest(Action<ClientRequestContext> writeAction, ClientRequestContext ctx)
{
    ctx.Stream.WriteInt(_id);

    var hasExpiryPolicy = _expiryPolicy != null;

    if (hasExpiryPolicy)
    {
        // Validation is delegated to the features object of the context that serves this request.
        ctx.Features.ValidateWithExpiryPolicyFlag();
    }

    var tx = _ignite.Transactions.Tx;
    var hasTx = tx != null;

    var flags = ClientCacheRequestFlag.None;

    if (hasExpiryPolicy)
    {
        flags |= ClientCacheRequestFlag.WithExpiryPolicy;
    }

    if (hasTx)
    {
        flags |= ClientCacheRequestFlag.WithTransactional;
    }

    ctx.Stream.WriteByte((byte) flags);

    // Optional header parts go in a fixed order: expiry policy first, then transaction id.
    if (hasExpiryPolicy)
    {
        ExpiryPolicySerializer.WritePolicy(ctx.Writer, _expiryPolicy);
    }

    if (hasTx)
    {
        ctx.Writer.WriteInt(tx.Id);
    }

    if (writeAction == null)
    {
        return;
    }

    writeAction(ctx);
}
| |
/// <summary>
/// Reads a value from the response, treating a null header as a missing key.
/// </summary>
/// <param name="ctx">Response context.</param>
/// <returns>Unmarshalled value.</returns>
/// <exception cref="KeyNotFoundException">The response starts with a null header.</exception>
private T UnmarshalNotNull<T>(ClientResponseContext ctx)
{
    var stream = ctx.Stream;

    // Peek at the header byte.
    if (stream.ReadByte() == BinaryUtils.HdrNull)
    {
        throw GetKeyNotFoundException();
    }

    // Step back one byte so that the marshaller reads the value from its header.
    stream.Seek(-1, SeekOrigin.Current);

    return _marsh.Unmarshal<T>(stream, _keepBinary);
}
| |
/// <summary>
/// Reads a value from the response and wraps it in a cache result.
/// A null header produces a default-constructed result.
/// </summary>
/// <param name="ctx">Response context.</param>
/// <returns>Cache result.</returns>
private CacheResult<T> UnmarshalCacheResult<T>(ClientResponseContext ctx)
{
    var stream = ctx.Stream;

    // Peek at the header byte.
    var isNull = stream.ReadByte() == BinaryUtils.HdrNull;

    if (isNull)
    {
        return new CacheResult<T>();
    }

    // Step back one byte so that the marshaller reads the value from its header.
    stream.Seek(-1, SeekOrigin.Current);

    var value = _marsh.Unmarshal<T>(stream, _keepBinary);

    return new CacheResult<T>(value);
}
| |
/// <summary>
/// Writes the scan query: filter (or a null header), then page size, partition and local flag.
/// </summary>
/// <param name="writer">Writer.</param>
/// <param name="qry">Scan query; must not be null.</param>
private void WriteScanQuery(BinaryWriter writer, ScanQuery<TK, TV> qry)
{
    Debug.Assert(qry != null);

    if (qry.Filter != null)
    {
        // The filter travels inside a holder object, followed by the platform id byte.
        var filterHolder = new CacheEntryFilterHolder(
            qry.Filter,
            (k, v) => qry.Filter.Invoke(new CacheEntry<TK, TV>((TK)k, (TV)v)),
            writer.Marshaller,
            _keepBinary);

        writer.WriteObject(filterHolder);

        writer.WriteByte(ClientPlatformId.Dotnet);
    }
    else
    {
        writer.WriteByte(BinaryUtils.HdrNull);
    }

    writer.WriteInt(qry.PageSize);

    // -1 stands for "no partition specified".
    writer.WriteInt(qry.Partition ?? -1);

    writer.WriteBoolean(qry.Local);
}
| |
/// <summary>
/// Writes the SQL query fields in the order the request expects them.
/// </summary>
/// <param name="writer">Writer.</param>
/// <param name="qry">SQL query; must not be null.</param>
[Obsolete]
private static void WriteSqlQuery(IBinaryRawWriter writer, SqlQuery qry)
{
    Debug.Assert(qry != null);

    // Query identity: type, text, arguments.
    writer.WriteString(qry.QueryType);
    writer.WriteString(qry.Sql);
    QueryBase.WriteQueryArgs(writer, qry.Arguments);

    // Execution flags.
    writer.WriteBoolean(qry.EnableDistributedJoins);
    writer.WriteBoolean(qry.Local);
#pragma warning disable 618
    writer.WriteBoolean(qry.ReplicatedOnly);
#pragma warning restore 618

    // Paging and timeout.
    writer.WriteInt(qry.PageSize);
    writer.WriteTimeSpanAsLong(qry.Timeout);
}
| |
/// <summary>
/// Writes the SQL fields query in the order the request expects it.
/// </summary>
/// <param name="writer">Writer.</param>
/// <param name="qry">SQL fields query; must not be null.</param>
/// <param name="includeColumns">Value of the "include columns" flag written to the request.</param>
private static void WriteSqlFieldsQuery(IBinaryRawWriter writer, SqlFieldsQuery qry,
    bool includeColumns = true)
{
    Debug.Assert(qry != null);

    const int unlimitedRows = -1;
    const int noPartitions = -1;

    writer.WriteString(qry.Schema);
    writer.WriteInt(qry.PageSize);
    writer.WriteInt(unlimitedRows); // maxRows: unlimited
    writer.WriteString(qry.Sql);
    QueryBase.WriteQueryArgs(writer, qry.Arguments);

    // .NET client does not discern between different statements for now.
    // We could have ExecuteNonQuery method, which uses StatementType.Update, for example.
    writer.WriteByte((byte)StatementType.Any);

    // Execution flags.
    writer.WriteBoolean(qry.EnableDistributedJoins);
    writer.WriteBoolean(qry.Local);
#pragma warning disable 618
    writer.WriteBoolean(qry.ReplicatedOnly);
#pragma warning restore 618
    writer.WriteBoolean(qry.EnforceJoinOrder);
    writer.WriteBoolean(qry.Colocated);
    writer.WriteBoolean(qry.Lazy);
    writer.WriteTimeSpanAsLong(qry.Timeout);
    writer.WriteBoolean(includeColumns);

    // Partitions: length-prefixed list, or -1 when not specified.
    var partitions = qry.Partitions;

    if (partitions == null)
    {
        writer.WriteInt(noPartitions);
    }
    else
    {
        writer.WriteInt(partitions.Length);

        for (var i = 0; i < partitions.Length; i++)
        {
            writer.WriteInt(partitions[i]);
        }
    }

    writer.WriteInt(qry.UpdateBatchSize);
}
| |
/// <summary>
/// Creates a fields cursor from the response: reads the cursor id, then the column names.
/// </summary>
/// <param name="ctx">Response context.</param>
/// <returns>Fields query cursor bound to the response socket.</returns>
private ClientFieldsQueryCursor GetFieldsCursor(ClientResponseContext ctx)
{
    // Read order matters: cursor id comes first, columns follow.
    var id = ctx.Stream.ReadLong();
    var columns = ClientFieldsQueryCursor.ReadColumns(ctx.Reader);

    var cursor = new ClientFieldsQueryCursor(
        ctx.Socket, id, _keepBinary, ctx.Stream, ClientOp.QuerySqlFieldsCursorGetPage, columns);

    return cursor;
}
| |
/// <summary>
/// Creates a fields cursor from a response that carries only the column count
/// (no column names): reads the cursor id, then the count.
/// </summary>
/// <param name="ctx">Response context.</param>
/// <param name="readerFunc">Row reader; receives the raw reader and the column count.</param>
/// <returns>Query cursor bound to the response socket.</returns>
private ClientQueryCursorBase<T> GetFieldsCursorNoColumnNames<T>(ClientResponseContext ctx,
    Func<IBinaryRawReader, int, T> readerFunc)
{
    // Read order matters: cursor id comes first, column count follows.
    var id = ctx.Stream.ReadLong();
    var colCount = ctx.Stream.ReadInt();

    // The column count is captured once and passed to every row read.
    var cursor = new ClientQueryCursorBase<T>(
        ctx.Socket, id, _keepBinary, ctx.Stream, ClientOp.QuerySqlFieldsCursorGetPage,
        rawReader => readerFunc(rawReader, colCount));

    return cursor;
}
| |
/// <summary>
/// Converts an error status into an exception. Always throws.
/// </summary>
/// <param name="status">Status code.</param>
/// <param name="msg">Error message accompanying the status.</param>
/// <returns>Never returns; the return type only satisfies the error callback signature.</returns>
/// <exception cref="IgniteClientException">Always.</exception>
private T HandleError<T>(ClientStatusCode status, string msg)
{
    // A missing cache gets a message that includes the cache name; other statuses keep their message.
    var text = status == ClientStatusCode.CacheDoesNotExist
        ? "Cache doesn't exist: " + Name
        : msg;

    throw new IgniteClientException(text, null, status);
}
| |
/// <summary>
/// Creates the exception that reports a key missing from the cache.
/// </summary>
/// <returns>New exception instance; the caller is responsible for throwing it.</returns>
private static KeyNotFoundException GetKeyNotFoundException()
{
    const string message = "The given key was not present in the cache.";

    return new KeyNotFoundException(message);
}
| |
/// <summary>
/// Writes peek modes: the count, then one byte per mode holding the ordinal
/// (bit position) of the mode's flag value. A null collection is written as zero count.
/// </summary>
/// <param name="modes">Peek modes; may be null.</param>
/// <param name="w">Target stream.</param>
private static void WritePeekModes(ICollection<CachePeekMode> modes, IBinaryStream w)
{
    if (modes == null)
    {
        w.WriteInt(0);

        return;
    }

    w.WriteInt(modes.Count);

    foreach (var mode in modes)
    {
        // Convert bit flag to ordinal: count how many right shifts keep the value positive.
        byte ordinal = 0;

        for (var bits = (int)mode >> 1; bits > 0; bits >>= 1)
        {
            ordinal++;
        }

        w.WriteByte(ordinal);
    }
}
| |
/// <summary>
/// Reads cache entries from the stream: an entry count followed by that many key-value pairs.
/// </summary>
/// <param name="stream">Source stream.</param>
/// <returns>Entries in the order they were read.</returns>
private ICollection<ICacheEntry<TK, TV>> ReadCacheEntries(IBinaryStream stream)
{
    var reader = _marsh.StartUnmarshal(stream, _keepBinary);

    var count = reader.ReadInt();
    var entries = new List<ICacheEntry<TK, TV>>(count);

    for (var remaining = count; remaining > 0; remaining--)
    {
        // Key precedes value in the stream.
        var key = reader.ReadObject<TK>();
        var val = reader.ReadObject<TV>();

        entries.Add(new CacheEntry<TK, TV>(key, val));
    }

    return entries;
}
| |
/// <summary>
/// Starts the continuous query: sends the request, then registers a notification handler
/// for the returned query id on the socket that served the request.
/// </summary>
/// <param name="continuousQuery">Query with a non-null listener.</param>
/// <returns>Query handle.</returns>
private ClientContinuousQueryHandle QueryContinuousInternal(
    ContinuousQueryClient<TK, TV> continuousQuery)
{
    Debug.Assert(continuousQuery != null);
    Debug.Assert(continuousQuery.Listener != null);

    // The listener is captured now; the handler below keeps using this instance.
    var listener = continuousQuery.Listener;

    Action<ClientRequestContext> writeQuery = req => WriteContinuousQuery(req, continuousQuery);

    Func<ClientResponseContext, ClientContinuousQueryHandle> startListening = resp =>
    {
        var queryId = resp.Stream.ReadLong();

        var handle = new ClientContinuousQueryHandle(resp.Socket, queryId);

        // Notifications (and errors) for this query id are routed to HandleContinuousQueryEvents.
        resp.Socket.AddNotificationHandler(queryId,
            (stream, err) => HandleContinuousQueryEvents(stream, err, listener, handle));

        return handle;
    };

    return DoOutInOp(ClientOp.QueryContinuous, writeQuery, startListening);
}
| |
/// <summary>
/// Writes the continuous query: buffer size, time interval in milliseconds, include-expired flag,
/// and the filter (null, or a filter object followed by its platform id).
/// Finally tells the socket to expect notifications.
/// </summary>
/// <param name="ctx">Request context.</param>
/// <param name="continuousQuery">Query.</param>
private void WriteContinuousQuery(ClientRequestContext ctx, ContinuousQueryClient<TK, TV> continuousQuery)
{
    var writer = ctx.Writer;

    writer.WriteInt(continuousQuery.BufferSize);
    writer.WriteLong((long) continuousQuery.TimeInterval.TotalMilliseconds);
    writer.WriteBoolean(continuousQuery.IncludeExpired);

    var filter = continuousQuery.Filter;

    if (filter == null)
    {
        writer.WriteObject<object>(null);
    }
    else
    {
        var javaProxy = filter as PlatformJavaObjectFactoryProxy;

        if (javaProxy == null)
        {
            // Regular .NET filter: wrapped in a holder and tagged with the .NET platform id.
            writer.WriteObject(new ContinuousQueryFilterHolder(filter, _keepBinary));
            writer.WriteByte(ClientPlatformId.Dotnet);
        }
        else
        {
            // Java filter proxy: its raw proxy is written and tagged with the Java platform id.
            writer.WriteObject(javaProxy.GetRawProxy());
            writer.WriteByte(ClientPlatformId.Java);
        }
    }

    ctx.Socket.ExpectNotifications();
}
| |
/// <summary>
/// Handles a continuous query notification: forwards errors to the query handle,
/// delivers events to the listener, and logs unexpected operation codes.
/// </summary>
/// <param name="stream">Notification stream.</param>
/// <param name="err">Error passed by the notification source; null when there is none.</param>
/// <param name="listener">Listener that receives the events.</param>
/// <param name="qryHandle">Query handle that receives errors.</param>
private void HandleContinuousQueryEvents(IBinaryStream stream, Exception err,
    ICacheEntryEventListener<TK, TV> listener, ClientContinuousQueryHandle qryHandle)
{
    if (err != null)
    {
        // The stream is not touched when an error is passed in.
        qryHandle.OnError(err);
        return;
    }

    // Notification header: flags (short), then operation code (short).
    var flags = (ClientFlags) stream.ReadShort();
    var opCode = (ClientOp) stream.ReadShort();

    var isError = (flags & ClientFlags.Error) == ClientFlags.Error;

    if (isError)
    {
        // Error payload: status code followed by a message.
        var status = (ClientStatusCode) stream.ReadInt();
        var msg = _marsh.Unmarshal<string>(stream);

        GetLogger().Error("Error while handling Continuous Query notification ({0}): {1}", status, msg);

        qryHandle.OnError(new IgniteClientException(msg, null, status));

        return;
    }

    if (opCode != ClientOp.QueryContinuousEventNotification)
    {
        // Unknown notifications are only logged; neither the listener nor the handle is notified.
        GetLogger().Error("Error while handling Continuous Query notification: unexpected op '{0}'", opCode);

        return;
    }

    var events = ContinuousQueryUtils.ReadEvents<TK, TV>(stream, _marsh, _keepBinary);

    listener.OnEvent(events);
}
| |
/// <summary>
/// Gets the logger, creating it on first use from the client configuration.
/// Falls back to the no-op logger when the configuration has no logger.
/// </summary>
/// <returns>Logger instance.</returns>
private ILogger GetLogger()
{
    // Don't care about thread safety here, it is ok to initialize multiple times.
    if (_logger != null)
    {
        return _logger;
    }

    if (_ignite.Configuration.Logger != null)
    {
        _logger = _ignite.Configuration.Logger.GetLogger(GetType());
    }
    else
    {
        _logger = NoopLogger.Instance;
    }

    return _logger;
}
| } |
| } |