blob: 6fd911589f6f656da513d7213b151a816b370e72 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "impl/message.h"
#include "impl/transactions/transaction_impl.h"
#include "impl/transactions/transactions_impl.h"
#include "impl/response_status.h"
using namespace ignite::common::concurrent;
using namespace ignite::impl::thin;
using namespace ignite::thin::transactions;
namespace ignite
{
namespace impl
{
namespace thin
{
namespace transactions
{
template<typename ReqT, typename RspT>
void TransactionImpl::SendTxMessage(const ReqT& req, RspT& rsp)
{
channel.Get()->SyncMessage(req, rsp, static_cast<int32_t>(timeout / 1000) + ioTimeout);
if (rsp.GetStatus() != ResponseStatus::SUCCESS)
throw IgniteError(IgniteError::IGNITE_ERR_TX, rsp.GetError().c_str());
}
/**
 * Start a new transaction on the server and register it as the current one.
 *
 * @param txs Transactions facade queried for, and updated with, the current
 *     transaction.
 * @param router Data router used to send the start request.
 * @param concurrency Concurrency mode placed into the start request.
 * @param isolation Isolation level placed into the start request.
 * @param timeout Timeout placed into the start request and stored in the
 *     created transaction.
 * @param txSize Forwarded to the TransactionImpl constructor only; it is not
 *     part of the start request.
 * @param label Label placed into the start request.
 * @return Shared pointer to the newly created transaction.
 *
 * @throw IgniteError with code IGNITE_ERR_TX_THIS_THREAD if txs already
 *     reports a current transaction that is not closed.
 * @throw IgniteError with code IGNITE_ERR_TX if the start response status is
 *     not ResponseStatus::SUCCESS.
 */
SP_TransactionImpl TransactionImpl::Create(
    TransactionsImpl& txs,
    SP_DataRouter& router,
    TransactionConcurrency::Type concurrency,
    TransactionIsolation::Type isolation,
    int64_t timeout,
    int32_t txSize,
    SharedPointer<common::FixedSizeArray<char> > label)
{
    SP_TransactionImpl txHandle = txs.GetCurrent();

    // A current transaction that is still open blocks starting another one.
    TransactionImpl* active = txHandle.Get();

    if (active && !active->IsClosed())
        throw IgniteError(IgniteError::IGNITE_ERR_TX_THIS_THREAD, TX_ALREADY_STARTED);

    TxStartRequest startReq(concurrency, isolation, timeout, label);
    Int32Response startRsp;

    // Keep the channel that served the request; the new transaction is bound to it.
    SP_DataChannel txChannel = router.Get()->SyncMessage(startReq, startRsp);

    if (startRsp.GetStatus() != ResponseStatus::SUCCESS)
        throw IgniteError(IgniteError::IGNITE_ERR_TX, startRsp.GetError().c_str());

    // The response value is used as the identifier of the started transaction.
    int32_t startedTxId = startRsp.GetValue();

    txHandle = SP_TransactionImpl(new TransactionImpl(
        txs,
        txChannel,
        startedTxId,
        concurrency,
        isolation,
        timeout,
        router.Get()->GetIoTimeout(),
        txSize));

    txs.SetCurrent(txHandle);

    return txHandle;
}
bool TransactionImpl::IsClosed() const
{
return closed;
}
void TransactionImpl::Commit()
{
ThreadCheck();
TxEndRequest req(txId, true);
Response rsp;
SendTxMessage(req, rsp);
ThreadEnd();
}
void TransactionImpl::Rollback()
{
ThreadCheck();
TxEndRequest req(txId, false);
Response rsp;
SendTxMessage(req, rsp);
ThreadEnd();
}
/**
 * Close this transaction, rolling it back if it is not closed yet.
 *
 * ThreadCheck() runs first, so its errors are raised before the closed flag
 * is consulted. When the transaction is already closed nothing else is done.
 *
 * @throw IgniteError raised by ThreadCheck() or by Rollback().
 */
void TransactionImpl::Close()
{
    ThreadCheck();

    if (IsClosed())
        return;

    // Rollback() calls ThreadEnd() itself once the request succeeds and
    // throws otherwise, so a second ThreadEnd() here would only repeat the
    // same cleanup.
    Rollback();
}
void TransactionImpl::SetClosed()
{
closed = true;
}
void TransactionImpl::ThreadEnd()
{
this->SetClosed();
txs.ResetCurrent();
}
void TransactionImpl::ThreadCheck()
{
SP_TransactionImpl tx = txs.GetCurrent();
TransactionImpl* ptr = tx.Get();
if (!ptr)
throw IgniteError(IgniteError::IGNITE_ERR_TX_THIS_THREAD, TX_ALREADY_CLOSED);
if (ptr->TxId() != this->TxId())
throw IgniteError(IgniteError::IGNITE_ERR_TX_THIS_THREAD, TX_DIFFERENT_THREAD);
}
}
}
}
}