blob: 9bdc658bc7ae87665d54df7849fcd3d3c8526a9b [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "SessionContext.h"
#include "SenderContext.h"
#include "ReceiverContext.h"
#include <boost/format.hpp>
#include "qpid/messaging/Address.h"
#include "qpid/messaging/Duration.h"
#include "qpid/messaging/exceptions.h"
#include "qpid/log/Statement.h"
extern "C" {
#include <proton/engine.h>
}
namespace qpid {
namespace messaging {
namespace amqp {
SessionContext::SessionContext(pn_connection_t* connection) : session(pn_session(connection)) {}
SessionContext::~SessionContext()
{
senders.clear(); receivers.clear();
pn_session_free(session);
}
boost::shared_ptr<SenderContext> SessionContext::createSender(const qpid::messaging::Address& address)
{
std::string name = address.getName();
int count = 1;
for (SenderMap::const_iterator i = senders.find(name); i != senders.end(); i = senders.find(name)) {
name = (boost::format("%1%_%2%") % address.getName() % ++count).str();
}
boost::shared_ptr<SenderContext> s(new SenderContext(session, name, address));
senders[name] = s;
return s;
}
boost::shared_ptr<ReceiverContext> SessionContext::createReceiver(const qpid::messaging::Address& address)
{
std::string name = address.getName();
int count = 1;
for (ReceiverMap::const_iterator i = receivers.find(name); i != receivers.end(); i = receivers.find(name)) {
name = (boost::format("%1%_%2%") % address.getName() % ++count).str();
}
boost::shared_ptr<ReceiverContext> r(new ReceiverContext(session, name, address));
receivers[name] = r;
return r;
}
boost::shared_ptr<SenderContext> SessionContext::getSender(const std::string& name) const
{
SenderMap::const_iterator i = senders.find(name);
if (i == senders.end()) {
throw qpid::messaging::KeyError(std::string("No such sender") + name);
} else {
return i->second;
}
}
boost::shared_ptr<ReceiverContext> SessionContext::getReceiver(const std::string& name) const
{
ReceiverMap::const_iterator i = receivers.find(name);
if (i == receivers.end()) {
throw qpid::messaging::KeyError(std::string("No such receiver") + name);
} else {
return i->second;
}
}
void SessionContext::closeReceiver(const std::string&)
{
}
void SessionContext::closeSender(const std::string&)
{
}
boost::shared_ptr<ReceiverContext> SessionContext::nextReceiver(qpid::messaging::Duration /*timeout*/)
{
return boost::shared_ptr<ReceiverContext>();
}
uint32_t SessionContext::getReceivable()
{
return 0;//TODO
}
uint32_t SessionContext::getUnsettledAcks()
{
return 0;//TODO
}
qpid::framing::SequenceNumber SessionContext::record(pn_delivery_t* delivery)
{
qpid::framing::SequenceNumber id = next++;
unacked[id] = delivery;
QPID_LOG(debug, "Recorded delivery " << id << " -> " << delivery);
return id;
}
void SessionContext::acknowledge(DeliveryMap::iterator begin, DeliveryMap::iterator end)
{
for (DeliveryMap::iterator i = begin; i != end; ++i) {
QPID_LOG(debug, "Setting disposition for delivery " << i->first << " -> " << i->second);
pn_delivery_update(i->second, PN_ACCEPTED);
pn_delivery_settle(i->second);//TODO: different settlement modes?
}
unacked.erase(begin, end);
}
void SessionContext::acknowledge()
{
QPID_LOG(debug, "acknowledging all " << unacked.size() << " messages");
acknowledge(unacked.begin(), unacked.end());
}
void SessionContext::acknowledge(const qpid::framing::SequenceNumber& id, bool cumulative)
{
DeliveryMap::iterator i = unacked.find(id);
if (i != unacked.end()) {
acknowledge(cumulative ? unacked.begin() : i, ++i);
}
}
bool SessionContext::settled()
{
bool result = true;
for (SenderMap::iterator i = senders.begin(); i != senders.end(); ++i) {
if (!i->second->settled()) result = false;
}
return result;
}
}}} // namespace qpid::messaging::amqp