blob: 2182b438459ded773343c99f3d36564e7c29458a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace Apache.Ignite.Internal.Table;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Apache.Ignite.Transactions;
using Common;
using Ignite.Sql;
using Ignite.Table;
using Linq;
using Serialization;
/// <summary>
/// Generic key-value view.
/// <para />
/// Thin adapter over a <see cref="RecordView{T}"/> of <see cref="KvPair{TK, TV}"/>: every operation wraps its
/// key (and value, when present) into a <see cref="KvPair{TK, TV}"/> via the private <c>ToKv</c> helpers and
/// delegates to the underlying record view. The <c>ToKv</c> helpers validate keys with
/// <c>IgniteArgumentCheck.NotNull</c>, so all key-taking operations share the same null-key check.
/// </summary>
/// <typeparam name="TK">Key type.</typeparam>
/// <typeparam name="TV">Value type.</typeparam>
internal sealed class KeyValueView<TK, TV> : IKeyValueView<TK, TV>
    where TK : notnull
{
    /** Record view that performs the actual table operations on key-value pairs. */
    private readonly RecordView<KvPair<TK, TV>> _recordView;

    /// <summary>
    /// Initializes a new instance of the <see cref="KeyValueView{TK, TV}"/> class.
    /// </summary>
    /// <param name="recordView">Record view.</param>
    public KeyValueView(RecordView<KvPair<TK, TV>> recordView)
    {
        _recordView = recordView;
    }

    /// <inheritdoc/>
    public async Task<Option<TV>> GetAsync(ITransaction? transaction, TK key) =>
        (await _recordView.GetAsync(transaction, ToKv(key)).ConfigureAwait(false))
        .Select(static x => x.Val);

    /// <inheritdoc/>
    public async Task<IDictionary<TK, TV>> GetAllAsync(ITransaction? transaction, IEnumerable<TK> keys)
    {
        // Validate eagerly so that a null argument is reported against this method's own parameter
        // instead of surfacing from the LINQ Select call below (consistent with RemoveAllAsync).
        IgniteArgumentCheck.NotNull(keys);

        return await _recordView.GetAllAsync(
            transaction,
            keys.Select(static k => ToKv(k)),
            count => new Dictionary<TK, TV>(count),
            (dict, item) =>
            {
                var ((key, val), hasVal) = item;

                // Only entries that actually carry a value end up in the resulting dictionary.
                if (hasVal)
                {
                    dict[key] = val;
                }
            }).ConfigureAwait(false);
    }

    /// <inheritdoc/>
    public async Task<bool> ContainsAsync(ITransaction? transaction, TK key) =>
        await _recordView.ContainsKeyAsync(transaction, ToKv(key)).ConfigureAwait(false);

    /// <inheritdoc/>
    public async Task PutAsync(ITransaction? transaction, TK key, TV val) =>
        await _recordView.UpsertAsync(transaction, ToKv(key, val)).ConfigureAwait(false);

    /// <inheritdoc/>
    public async Task PutAllAsync(ITransaction? transaction, IEnumerable<KeyValuePair<TK, TV>> pairs)
    {
        // Same eager validation as RemoveAllAsync: fail with the caller's parameter, not LINQ's.
        IgniteArgumentCheck.NotNull(pairs);

        await _recordView.UpsertAllAsync(transaction, pairs.Select(static x => ToKv(x))).ConfigureAwait(false);
    }

    /// <inheritdoc/>
    public async Task<Option<TV>> GetAndPutAsync(ITransaction? transaction, TK key, TV val) =>

        // Go through ToKv (not the KvPair constructor directly) so the key gets the same null check
        // as in every other key-taking operation of this view.
        (await _recordView.GetAndUpsertAsync(transaction, ToKv(key, val)).ConfigureAwait(false))
        .Select(static x => x.Val);

    /// <inheritdoc/>
    public async Task<bool> PutIfAbsentAsync(ITransaction? transaction, TK key, TV val) =>
        await _recordView.InsertAsync(transaction, ToKv(key, val)).ConfigureAwait(false);

    /// <inheritdoc/>
    public async Task<bool> RemoveAsync(ITransaction? transaction, TK key) =>
        await _recordView.DeleteAsync(transaction, ToKv(key)).ConfigureAwait(false);

    /// <inheritdoc/>
    public async Task<bool> RemoveAsync(ITransaction? transaction, TK key, TV val) =>
        await _recordView.DeleteExactAsync(transaction, ToKv(key, val)).ConfigureAwait(false);

    /// <inheritdoc/>
    public async Task<IList<TK>> RemoveAllAsync(ITransaction? transaction, IEnumerable<TK> keys)
    {
        IgniteArgumentCheck.NotNull(keys);

        return await _recordView.DeleteAllAsync(
            transaction,
            keys.Select(static k => ToKv(k)),

            // Avoid allocating a list when the reported count is zero.
            resultFactory: static count => count == 0
                ? (IList<TK>)Array.Empty<TK>()
                : new List<TK>(count),
            addAction: static (res, item) => res.Add(item.Key),
            exact: false).ConfigureAwait(false);
    }

    /// <inheritdoc/>
    public async Task<IList<TK>> RemoveAllAsync(ITransaction? transaction, IEnumerable<KeyValuePair<TK, TV>> pairs)
    {
        IgniteArgumentCheck.NotNull(pairs);

        return await _recordView.DeleteAllAsync(
            transaction,
            pairs.Select(static k => ToKv(k)),

            // Avoid allocating a list when the reported count is zero.
            resultFactory: static count => count == 0
                ? (IList<TK>)Array.Empty<TK>()
                : new List<TK>(count),
            addAction: static (res, item) => res.Add(item.Key),
            exact: true).ConfigureAwait(false);
    }

    /// <inheritdoc/>
    public async Task<Option<TV>> GetAndRemoveAsync(ITransaction? transaction, TK key) =>
        (await _recordView.GetAndDeleteAsync(transaction, ToKv(key)).ConfigureAwait(false))
        .Select(static x => x.Val);

    /// <inheritdoc/>
    public async Task<bool> ReplaceAsync(ITransaction? transaction, TK key, TV val) =>
        await _recordView.ReplaceAsync(transaction, ToKv(key, val)).ConfigureAwait(false);

    /// <inheritdoc/>
    public async Task<bool> ReplaceAsync(ITransaction? transaction, TK key, TV oldVal, TV newVal) =>
        await _recordView.ReplaceAsync(transaction, ToKv(key, oldVal), ToKv(key, newVal)).ConfigureAwait(false);

    /// <inheritdoc/>
    public async Task<Option<TV>> GetAndReplaceAsync(ITransaction? transaction, TK key, TV val) =>
        (await _recordView.GetAndReplaceAsync(transaction, ToKv(key, val)).ConfigureAwait(false))
        .Select(static x => x.Val);

    /// <inheritdoc/>
    public IQueryable<KeyValuePair<TK, TV>> AsQueryable(ITransaction? transaction = null, QueryableOptions? options = null)
    {
        // The query pipeline is built from the record view's SQL API, its table name and socket configuration.
        var executor = new IgniteQueryExecutor(_recordView.Sql, transaction, options, _recordView.Table.Socket.Configuration);
        var provider = new IgniteQueryProvider(IgniteQueryParser.Instance, executor, _recordView.Table.Name);

        return new IgniteQueryable<KeyValuePair<TK, TV>>(provider);
    }

    /// <inheritdoc/>
    public async Task StreamDataAsync(
        IAsyncEnumerable<DataStreamerItem<KeyValuePair<TK, TV>>> data,
        DataStreamerOptions? options = null,
        CancellationToken cancellationToken = default)
    {
        // ToKv(data) is a lazy async iterator: without this check a null stream would only fail
        // once enumeration starts, and not as an argument error.
        IgniteArgumentCheck.NotNull(data);

        await _recordView.StreamDataAsync(ToKv(data), options, cancellationToken).ConfigureAwait(false);
    }

    /// <inheritdoc/>
    public override string ToString() =>
        new IgniteToStringBuilder(GetType())
            .Append(_recordView.Table, "Table")
            .Build();

    /// <summary>
    /// Converts a <see cref="KeyValuePair{TKey, TValue}"/> to a <see cref="KvPair{TK, TV}"/>, validating the key.
    /// </summary>
    /// <param name="x">Source pair.</param>
    /// <returns>Pair with the same key and value.</returns>
    private static KvPair<TK, TV> ToKv(KeyValuePair<TK, TV> x)
    {
        IgniteArgumentCheck.NotNull(x.Key);

        return new(x.Key, x.Value);
    }

    /// <summary>
    /// Creates a key-only <see cref="KvPair{TK, TV}"/>, validating the key.
    /// </summary>
    /// <param name="key">Key.</param>
    /// <returns>Pair constructed from the key alone.</returns>
    private static KvPair<TK, TV> ToKv(TK key)
    {
        IgniteArgumentCheck.NotNull(key);

        return new(key);
    }

    /// <summary>
    /// Creates a <see cref="KvPair{TK, TV}"/> from a key and a value, validating the key.
    /// </summary>
    /// <param name="key">Key.</param>
    /// <param name="val">Value.</param>
    /// <returns>Pair with the given key and value.</returns>
    private static KvPair<TK, TV> ToKv(TK key, TV val)
    {
        IgniteArgumentCheck.NotNull(key);

        return new(key, val);
    }

    /// <summary>
    /// Lazily converts a stream of key-value streamer items to a stream of <see cref="KvPair{TK, TV}"/> items,
    /// preserving each item's operation type.
    /// </summary>
    /// <param name="pairs">Source items.</param>
    /// <returns>Converted items.</returns>
    private static async IAsyncEnumerable<DataStreamerItem<KvPair<TK, TV>>> ToKv(
        IAsyncEnumerable<DataStreamerItem<KeyValuePair<TK, TV>>> pairs)
    {
        // Library code: do not resume on the captured context, same as every other await in this class.
        await foreach (var pair in pairs.ConfigureAwait(false))
        {
            yield return DataStreamerItem.Create(ToKv(pair.Data), pair.OperationType);
        }
    }
}