blob: c2976cf5fa52535b0b1ac2f84088925244133ff4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace Apache.Ignite.Internal.Transactions;
using System;
using System.Threading;
using System.Threading.Tasks;
using Common;
using Ignite.Transactions;
using Proto;
using Tx;
/// <summary>
/// Lazy Ignite transaction.
/// <para />
/// Creating an instance does not contact the server: the underlying <see cref="Transaction"/> is begun
/// by the first <c>EnsureStartedAsync</c> call, and the resulting task is cached and shared by all later calls.
/// Commit and rollback are no-ops on the wire when the transaction was never started.
/// </summary>
internal sealed class LazyTransaction : ITransaction
{
    /// <summary>
    /// Transaction ID placeholder. Uses MaxValue to reserve bytes in varint format.
    /// It will also work correctly if the actual tx id matches the placeholder.
    /// </summary>
    public const long TxIdPlaceholder = long.MaxValue;

    /// <summary>State: neither committed nor rolled back yet.</summary>
    private const int StateOpen = 0;

    /// <summary>State: <see cref="CommitAsync"/> won the state transition.</summary>
    private const int StateCommitted = 1;

    /// <summary>State: <see cref="RollbackAsync"/> won the state transition.</summary>
    private const int StateRolledBack = 2;

    /// <summary>Guards the one-time creation of <see cref="_tx"/>.</summary>
    private readonly object _syncRoot = new();

    /// <summary>Options passed to the constructor; <c>ReadOnly</c> is sent to the server on begin.</summary>
    private readonly TransactionOptions _options;

    /// <summary>One of the <c>State*</c> constants; changed only through <see cref="TrySetState"/>.</summary>
    private int _state = StateOpen;

    /// <summary>Task of the underlying transaction; <c>null</c> until the transaction start is requested.</summary>
    private volatile Task<Transaction>? _tx;

    /// <summary>
    /// Initializes a new instance of the <see cref="LazyTransaction"/> class.
    /// </summary>
    /// <param name="options">Options.</param>
    public LazyTransaction(TransactionOptions options) => _options = options;

    /// <inheritdoc/>
    public bool IsReadOnly => _options.ReadOnly;

    /// <summary>
    /// Gets the transaction ID, or <see cref="TxIdPlaceholder"/> when the underlying transaction
    /// has not started successfully (start not requested, still in progress, failed, or canceled).
    /// </summary>
    /// <remarks>
    /// Checks <see cref="Task.IsCompletedSuccessfully"/> rather than <see cref="Task.IsCompleted"/>:
    /// a faulted or canceled task is also "completed", and reading <see cref="Task{TResult}.Result"/>
    /// on it throws, which would in turn make <see cref="ToString"/> throw.
    /// The field is captured into a local so that the check and the read use the same task instance.
    /// </remarks>
    internal long Id =>
        _tx is { IsCompletedSuccessfully: true } startedTx
            ? startedTx.Result.Id
            : TxIdPlaceholder;

    /// <summary>
    /// Gets the transaction state as a display string.
    /// </summary>
    private string State => _state switch
    {
        StateOpen => "Open",
        StateCommitted => "Committed",
        _ => "RolledBack"
    };

    /// <inheritdoc/>
    public async Task CommitAsync()
    {
        // Only the first commit/rollback call performs the operation; later calls do nothing.
        if (TrySetState(StateCommitted))
        {
            await DoOpAsync(_tx, ClientOp.TxCommit).ConfigureAwait(false);
        }
    }

    /// <inheritdoc/>
    public async Task RollbackAsync()
    {
        // Only the first commit/rollback call performs the operation; later calls do nothing.
        if (TrySetState(StateRolledBack))
        {
            await DoOpAsync(_tx, ClientOp.TxRollback).ConfigureAwait(false);
        }
    }

    /// <inheritdoc/>
    public async ValueTask DisposeAsync() => await RollbackAsync().ConfigureAwait(false);

    /// <inheritdoc/>
    public override string ToString()
    {
        // NOTE(review): typeof(Transaction) (not typeof(LazyTransaction)) is passed on purpose as far as can be told,
        // presumably so the lazy wrapper prints like a regular transaction - confirm before changing.
        var builder = new IgniteToStringBuilder(typeof(Transaction));

        builder.Append(Id);
        builder.Append(State);
        builder.Append(IsReadOnly);

        return builder.Build();
    }

    /// <summary>
    /// Ensures that the lazy transaction is actually started on the server.
    /// </summary>
    /// <param name="tx">Lazy transaction.</param>
    /// <param name="socket">Socket.</param>
    /// <param name="preferredNode">Preferred target node.</param>
    /// <returns>
    /// Task that will be completed when the transaction is started;
    /// the result is <c>null</c> when <paramref name="tx"/> is <c>null</c>.
    /// </returns>
    internal static async ValueTask<Transaction?> EnsureStartedAsync(ITransaction? tx, ClientFailoverSocket socket, PreferredNode preferredNode) =>
        Get(tx) is { } lazyTx
            ? await lazyTx.EnsureStartedAsync(socket, preferredNode).ConfigureAwait(false)
            : null;

    /// <summary>
    /// Gets the underlying lazy transaction or throws if the transaction type is not supported.
    /// </summary>
    /// <param name="tx">Public transaction.</param>
    /// <returns>Internal lazy transaction, or <c>null</c> when <paramref name="tx"/> is <c>null</c>.</returns>
    /// <exception cref="TransactionException">
    /// <paramref name="tx"/> is not <c>null</c> and is not a <see cref="LazyTransaction"/>.
    /// </exception>
    internal static LazyTransaction? Get(ITransaction? tx) => tx switch
    {
        null => null,
        LazyTransaction t => t,
        _ => throw new TransactionException(
            Guid.NewGuid(),
            ErrorGroups.Common.Internal,
            "Unsupported transaction implementation: " + tx.GetType())
    };

    /// <summary>
    /// Gets a value indicating whether the underlying lazy transaction is started.
    /// </summary>
    /// <param name="tx">Transaction.</param>
    /// <returns>
    /// True when the start of the underlying transaction has been requested (the task exists, it may still be
    /// in progress); false otherwise, including when <paramref name="tx"/> is <c>null</c>.
    /// </returns>
    internal static bool IsStarted(ITransaction? tx) => Get(tx)?._tx != null;

    /// <summary>
    /// Sends a commit or rollback request for the underlying transaction, if there is one.
    /// </summary>
    /// <param name="txTask">Task of the underlying transaction; <c>null</c> when it was never started.</param>
    /// <param name="op">Operation to send (commit or rollback).</param>
    /// <returns>Task that completes when the server has responded.</returns>
    private static async Task DoOpAsync(Task<Transaction>? txTask, ClientOp op)
    {
        if (txTask == null)
        {
            // No operations were performed, nothing to commit or roll back.
            return;
        }

        // Awaiting here means a failure to begin the transaction surfaces from commit/rollback as well.
        var tx = await txTask.ConfigureAwait(false);

        using var writer = ProtoCommon.GetMessageWriter();
        writer.MessageWriter.Write(tx.Id);

        // The request goes through the socket stored in the transaction; the response buffer is only disposed.
        using var buffer = await tx.Socket.DoOutInOpAsync(op, writer).ConfigureAwait(false);
    }

    /// <summary>
    /// Returns the cached task of the underlying transaction, creating it on the first call.
    /// </summary>
    /// <param name="socket">Socket used to begin the transaction on the first call.</param>
    /// <param name="preferredNode">Preferred target node used on the first call.</param>
    /// <returns>Task of the underlying transaction; the same instance for every call.</returns>
    private Task<Transaction> EnsureStartedAsync(ClientFailoverSocket socket, PreferredNode preferredNode)
    {
        // The lock makes sure BeginAsync is invoked at most once, even with concurrent callers.
        lock (_syncRoot)
        {
            var txTask = _tx;

            if (txTask != null)
            {
                return txTask;
            }

            txTask = BeginAsync(socket, preferredNode);
            _tx = txTask;

            return txTask;
        }
    }

    /// <summary>
    /// Sends the begin-transaction request and wraps the response into a <see cref="Transaction"/>.
    /// </summary>
    /// <param name="failoverSocket">Failover socket used to send the request.</param>
    /// <param name="preferredNode">Preferred target node.</param>
    /// <returns>Started transaction bound to the socket that served the request.</returns>
    private async Task<Transaction> BeginAsync(ClientFailoverSocket failoverSocket, PreferredNode preferredNode)
    {
        using var writer = ProtoCommon.GetMessageWriter();
        Write();

        // Transaction and all corresponding operations must be performed using the same connection.
        var (resBuf, socket) = await failoverSocket.DoOutInOpAndGetSocketAsync(
            ClientOp.TxBegin, request: writer, preferredNode: preferredNode).ConfigureAwait(false);

        using (resBuf)
        {
            var txId = resBuf.GetReader().ReadInt64();

            return new Transaction(txId, socket, failoverSocket);
        }

        // Request payload: read-only flag followed by the observable timestamp.
        void Write()
        {
            var w = writer.MessageWriter;
            w.Write(_options.ReadOnly);
            w.Write(failoverSocket.ObservableTimestamp);
        }
    }

    /// <summary>
    /// Atomically moves the state from open to <paramref name="state"/>.
    /// </summary>
    /// <param name="state">Target state.</param>
    /// <returns>True when this call performed the transition; false when the state was no longer open.</returns>
    private bool TrySetState(int state) => Interlocked.CompareExchange(ref _state, state, StateOpen) == StateOpen;
}