blob: 9c4bfebaf1c077bc6d997e8a7bf0929cf87c4202 [file]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "Transaction.h"
#include "SessionContext.h"
#include "ConnectionContext.h"
#include "PnData.h"
#include <proton/engine.h>
#include <qpid/Exception.h>
#include <qpid/amqp/descriptors.h>
#include <qpid/messaging/exceptions.h>
#include <qpid/log/Statement.h>
#include "qpid/messaging/Message.h"
namespace qpid {
namespace messaging {
namespace amqp {
using namespace types;
using types::Exception;
namespace {
const std::string LOCAL_TRANSACTIONS("amqp:local-transactions");
const std::string TX_COORDINATOR("tx-transaction");
const std::string ADDRESS("tx-transaction;{link:{reliability:at-least-once}}");
}
Transaction::Transaction(pn_session_t* session) :
SenderContext(session, TX_COORDINATOR, Address(ADDRESS), SenderOptions(false, true, true)), committing(false)
{}
void Transaction::clear() {
id.clear();
sendState.reset();
acceptState.reset();
}
void Transaction::configure() {
SenderContext::configure();
pn_terminus_t* target = pn_link_target(sender);
pn_terminus_set_type(target, PN_COORDINATOR);
PnData(pn_terminus_capabilities(target)).putSymbol(LOCAL_TRANSACTIONS);
}
void Transaction::verify() {}
const std::string& Transaction::getTarget() const { return getName(); }
void Transaction::declare(SendFunction send, const SessionPtr& session) {
committing = false;
error.raise();
clear();
Variant declare = Variant::described(qpid::amqp::transaction::DECLARE_CODE, Variant::List());
SenderContext::Delivery* delivery = 0;
send(session, shared_from_this(), Message(declare), true, &delivery);
setId(*delivery);
}
void Transaction::discharge(SendFunction send, const SessionPtr& session, bool fail) {
error.raise();
committing = !fail;
try {
// Send a discharge message to the remote coordinator.
Variant::List dischargeList;
dischargeList.push_back(Variant(id));
dischargeList.push_back(Variant(fail));
Variant discharge(dischargeList);
discharge.setDescriptor(qpid::amqp::transaction::DISCHARGE_CODE);
SenderContext::Delivery* delivery = 0;
send(session, shared_from_this(), Message(discharge), true, &delivery);
if (!delivery->accepted())
throw TransactionAborted(delivery->error());
committing = false;
}
catch(const TransactionError&) {
throw;
}
catch(const Exception& e) {
committing = false;
throw TransactionAborted(e.what());
}
}
// Set the transaction ID from the delivery returned by the remote coordinator.
void Transaction::setId(const SenderContext::Delivery& delivery)
{
if (delivery.getToken() &&
pn_delivery_remote_state(delivery.getToken()) == qpid::amqp::transaction::DECLARED_CODE)
{
pn_data_t* data = pn_disposition_data(pn_delivery_remote(delivery.getToken()));
if (data && pn_data_next(data)) {
size_t count = pn_data_get_list(data);
if (count > 0) {
pn_data_enter(data);
pn_data_next(data);
setId(PnData::string(pn_data_get_binary(data)));
pn_data_exit(data);
return;
}
}
}
throw TransactionError("No transaction ID returned by remote coordinator.");
}
void Transaction::setId(const std::string& id_) {
id = id_;
if (id.empty()) {
clear();
}
else {
// NOTE: The send and accept states are NOT described, the descriptor
// is added in pn_delivery_update.
Variant::List list;
list.push_back(Variant(id, "binary"));
sendState = Variant(list);
Variant accepted = Variant::described(qpid::amqp::message::ACCEPTED_CODE, Variant::List());
list.push_back(accepted);
acceptState = Variant(list);
}
}
types::Variant Transaction::getSendState() const {
error.raise();
return sendState;
}
void Transaction::acknowledge(pn_delivery_t* delivery)
{
error.raise();
PnData data(pn_disposition_data(pn_delivery_local(delivery)));
data.put(acceptState);
pn_delivery_update(delivery, qpid::amqp::transaction::TRANSACTIONAL_STATE_CODE);
pn_delivery_settle(delivery);
}
}}} // namespace qpid::messaging::amqp