blob: b57ae27ba6bf3e66cdd41fb224762fc99879a54f [file]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "SessionContext.h"
#include "SenderContext.h"
#include "ReceiverContext.h"
#include "Transaction.h"
#include "PnData.h"
#include <boost/format.hpp>
#include "qpid/messaging/Address.h"
#include "qpid/messaging/Duration.h"
#include "qpid/messaging/exceptions.h"
#include "qpid/log/Statement.h"
#include "qpid/amqp/descriptors.h"
extern "C" {
#include <proton/engine.h>
}
namespace qpid {
namespace messaging {
namespace amqp {
SessionContext::SessionContext(pn_connection_t* connection) : session(pn_session(connection)) {}
SessionContext::~SessionContext()
{
// Clear all pointers to senders and receivers before we free the session.
senders.clear();
receivers.clear();
transaction.reset(); // Transaction is a sender.
if (!error && session)
pn_session_free(session);
}
boost::shared_ptr<SenderContext> SessionContext::createSender(const qpid::messaging::Address& address, const SenderOptions& options)
{
error.raise();
std::string name = AddressHelper::getLinkName(address);
if (senders.find(name) != senders.end())
throw LinkError("Link name must be unique within the scope of the connection");
boost::shared_ptr<SenderContext> s(
new SenderContext(session, name, address, options, transaction));
senders[name] = s;
return s;
}
boost::shared_ptr<ReceiverContext> SessionContext::createReceiver(const qpid::messaging::Address& address)
{
error.raise();
std::string name = AddressHelper::getLinkName(address);
if (receivers.find(name) != receivers.end()) throw LinkError("Link name must be unique within the scope of the connection");
boost::shared_ptr<ReceiverContext> r(new ReceiverContext(session, name, address));
receivers[name] = r;
return r;
}
boost::shared_ptr<SenderContext> SessionContext::getSender(const std::string& name) const
{
error.raise();
SenderMap::const_iterator i = senders.find(name);
if (i == senders.end()) {
throw qpid::messaging::KeyError(std::string("No such sender") + name);
} else {
return i->second;
}
}
boost::shared_ptr<ReceiverContext> SessionContext::getReceiver(const std::string& name) const
{
error.raise();
ReceiverMap::const_iterator i = receivers.find(name);
if (i == receivers.end()) {
throw qpid::messaging::KeyError(std::string("No such receiver") + name);
} else {
return i->second;
}
}
void SessionContext::removeReceiver(const std::string& n)
{
error.raise();
SessionContext::ReceiverMap::iterator i = receivers.find(n);
if (i != receivers.end()) {
i->second->cleanup();
receivers.erase(i);
}
}
void SessionContext::removeSender(const std::string& n)
{
error.raise();
SessionContext::SenderMap::iterator i = senders.find(n);
if (i != senders.end()) {
i->second->cleanup();
senders.erase(i);
}
}
boost::shared_ptr<ReceiverContext> SessionContext::nextReceiver()
{
error.raise();
for (SessionContext::ReceiverMap::iterator i = receivers.begin(); i != receivers.end(); ++i) {
if (i->second->hasCurrent()) {
return i->second;
}
}
return boost::shared_ptr<ReceiverContext>();
}
uint32_t SessionContext::getReceivable()
{
error.raise();
return 0;//TODO
}
uint32_t SessionContext::getUnsettledAcks()
{
error.raise();
return 0;//TODO
}
qpid::framing::SequenceNumber SessionContext::record(pn_delivery_t* delivery)
{
error.raise();
qpid::framing::SequenceNumber id = next++;
if (!pn_delivery_settled(delivery)) {
unacked[id] = delivery;
QPID_LOG(debug, "Recorded delivery " << id << " -> " << delivery);
pn_link_advance(pn_delivery_link(delivery));
} else {
pn_delivery_settle(delivery); // Automatically advances the link.
}
return id;
}
void SessionContext::acknowledge(DeliveryMap::iterator begin, DeliveryMap::iterator end)
{
error.raise();
for (DeliveryMap::iterator i = begin; i != end; ++i) {
types::Variant txState;
if (transaction) {
QPID_LOG(trace, "Setting disposition for transactional delivery "
<< i->first << " -> " << i->second);
transaction->acknowledge(i->second);
} else {
QPID_LOG(trace, "Setting disposition for delivery " << i->first << " -> " << i->second);
pn_delivery_update(i->second, PN_ACCEPTED);
pn_delivery_settle(i->second); //TODO: different settlement modes?
}
}
unacked.erase(begin, end);
}
void SessionContext::acknowledge()
{
error.raise();
QPID_LOG(debug, "acknowledging all " << unacked.size() << " messages");
acknowledge(unacked.begin(), unacked.end());
}
void SessionContext::acknowledge(const qpid::framing::SequenceNumber& id, bool cumulative)
{
error.raise();
QPID_LOG(debug, "acknowledging selected messages, id=" << id << ", cumulative=" << cumulative);
DeliveryMap::iterator i = unacked.find(id);
if (i != unacked.end()) {
DeliveryMap::iterator start = cumulative ? unacked.begin() : i;
acknowledge(start, ++i);
} else {
QPID_LOG(debug, "selective acknowledgement failed; message not found for id " << id);
}
}
void SessionContext::nack(const qpid::framing::SequenceNumber& id, bool reject)
{
error.raise();
DeliveryMap::iterator i = unacked.find(id);
if (i != unacked.end()) {
if (reject) {
QPID_LOG(debug, "rejecting message with id=" << id);
pn_delivery_update(i->second, PN_REJECTED);
} else {
QPID_LOG(debug, "releasing message with id=" << id);
pn_delivery_update(i->second, PN_MODIFIED);
pn_disposition_set_failed(pn_delivery_local(i->second), true);
}
pn_delivery_settle(i->second);
unacked.erase(i);
}
}
bool SessionContext::settled()
{
error.raise();
bool result = true;
for (SenderMap::iterator i = senders.begin(); i != senders.end(); ++i) {
try {
if (!i->second->closed() && !i->second->settled()) result = false;
} catch (const MessageRejected&) {
throw;
} catch (const MessageReleased&) {
throw;
} catch (const std::exception&) {
senders.erase(i);
throw;
}
}
return result;
}
void SessionContext::setName(const std::string& n)
{
name = n;
}
std::string SessionContext::getName() const
{
return name;
}
void SessionContext::reset(pn_connection_t* connection)
{
unacked.clear();
if (transaction) {
if (transaction->isCommitting())
error = new TransactionUnknown("Transaction outcome unknown: transport failure");
else
error = new TransactionAborted("Transaction aborted: transport failure");
resetSession(0);
senders.clear();
receivers.clear();
transaction.reset();
return;
}
resetSession(pn_session(connection));
}
void SessionContext::resetSession(pn_session_t* session_) {
session = session_;
if (transaction) transaction->reset(session);
for (SessionContext::SenderMap::iterator i = senders.begin(); i != senders.end(); ++i) {
i->second->reset(session);
}
for (SessionContext::ReceiverMap::iterator i = receivers.begin(); i != receivers.end(); ++i) {
i->second->reset(session);
}
}
void SessionContext::cleanup() {
if (transaction) {
transaction->cleanup();
transaction = boost::shared_ptr<Transaction>();
}
for (SessionContext::SenderMap::iterator i = senders.begin(); i != senders.end(); ++i) {
i->second->cleanup();
}
senders.clear();
for (SessionContext::ReceiverMap::iterator i = receivers.begin(); i != receivers.end(); ++i) {
i->second->cleanup();
}
receivers.clear();
if (!error && session) {
error = new SessionClosed();
pn_session_free(session);
session = 0;
}
}
}}} // namespace qpid::messaging::amqp