blob: e7d2259c80c98250be2ed908a225ef35ccf54e0e [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "qpid/broker/SessionState.h"
#include "qpid/broker/Connection.h"
#include "qpid/broker/DeliverableMessage.h"
#include "qpid/broker/DtxAck.h"
#include "qpid/broker/DtxTimeout.h"
#include "qpid/broker/Message.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/SessionContext.h"
#include "qpid/broker/SessionOutputException.h"
#include "qpid/broker/TxAccept.h"
#include "qpid/broker/TxPublish.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/framing/SequenceSet.h"
#include "qpid/framing/IsInSequenceSet.h"
#include "qpid/log/Statement.h"
#include "qpid/sys/ClusterSafe.h"
#include "qpid/ptr_map.h"
#include "qpid/broker/AclModule.h"
#include <boost/bind.hpp>
#include <boost/format.hpp>
#include <iostream>
#include <sstream>
#include <algorithm>
#include <functional>
#include <assert.h>
namespace qpid {
namespace broker {
using namespace std;
using boost::intrusive_ptr;
using boost::bind;
using namespace qpid::broker;
using namespace qpid::framing;
using namespace qpid::sys;
using qpid::ptr_map_ptr;
using qpid::management::ManagementAgent;
using qpid::management::ManagementObject;
using qpid::management::Manageable;
using qpid::management::Args;
namespace _qmf = qmf::org::apache::qpid::broker;
SemanticState::SemanticState(DeliveryAdapter& da, SessionContext& ss)
: session(ss),
deliveryAdapter(da),
tagGenerator("sgen"),
dtxSelected(false),
authMsg(getSession().getBroker().getOptions().auth && !getSession().getConnection().isUserProxyAuth()),
userID(getSession().getConnection().getUserId()),
closeComplete(false)
{}
SemanticState::~SemanticState() {
closed();
}
void SemanticState::closed() {
if (!closeComplete) {
//prevent requeued messages being redelivered to consumers
for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
disable(i->second);
}
if (dtxBuffer.get()) {
dtxBuffer->fail();
}
recover(true);
//now unsubscribe, which may trigger queue deletion and thus
//needs to occur after the requeueing of unacked messages
for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
cancel(i->second);
}
closeComplete = true;
}
}
bool SemanticState::exists(const string& consumerTag){
return consumers.find(consumerTag) != consumers.end();
}
namespace {
const std::string SEPARATOR("::");
}
void SemanticState::consume(const string& tag,
Queue::shared_ptr queue, bool ackRequired, bool acquire,
bool exclusive, const string& resumeId, uint64_t resumeTtl,
const FieldTable& arguments)
{
// "tag" is only guaranteed to be unique to this session (see AMQP 0-10 Message.subscribe, destination).
// Create a globally unique name so the broker can identify individual consumers
std::string name = session.getSessionId().str() + SEPARATOR + tag;
const ConsumerFactories::Factories& cf(
session.getBroker().getConsumerFactories().get());
ConsumerImpl::shared_ptr c;
for (ConsumerFactories::Factories::const_iterator i = cf.begin(); i != cf.end() && !c; ++i)
c = (*i)->create(this, name, queue, ackRequired, acquire, exclusive, tag,
resumeId, resumeTtl, arguments);
if (!c) // Create plain consumer
c = ConsumerImpl::shared_ptr(
new ConsumerImpl(this, name, queue, ackRequired, acquire, exclusive, tag,
resumeId, resumeTtl, arguments));
queue->consume(c, exclusive);//may throw exception
consumers[tag] = c;
}
bool SemanticState::cancel(const string& tag)
{
ConsumerImplMap::iterator i = consumers.find(tag);
if (i != consumers.end()) {
cancel(i->second);
consumers.erase(i);
//should cancel all unacked messages for this consumer so that
//they are not redelivered on recovery
for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::cancel, _1, tag));
//can also remove any records that are now redundant
DeliveryRecords::iterator removed =
remove_if(unacked.begin(), unacked.end(), bind(&DeliveryRecord::isRedundant, _1));
unacked.erase(removed, unacked.end());
return true;
} else {
return false;
}
}
void SemanticState::startTx()
{
txBuffer = TxBuffer::shared_ptr(new TxBuffer());
}
void SemanticState::commit(MessageStore* const store)
{
if (!txBuffer) throw
CommandInvalidException(QPID_MSG("Session has not been selected for use with transactions"));
TxOp::shared_ptr txAck(static_cast<TxOp*>(new TxAccept(accumulatedAck, unacked)));
txBuffer->enlist(txAck);
if (txBuffer->commitLocal(store)) {
accumulatedAck.clear();
} else {
throw InternalErrorException(QPID_MSG("Commit failed"));
}
}
void SemanticState::rollback()
{
if (!txBuffer)
throw CommandInvalidException(QPID_MSG("Session has not been selected for use with transactions"));
txBuffer->rollback();
accumulatedAck.clear();
}
void SemanticState::selectDtx()
{
dtxSelected = true;
}
void SemanticState::startDtx(const std::string& xid, DtxManager& mgr, bool join)
{
if (!dtxSelected) {
throw CommandInvalidException(QPID_MSG("Session has not been selected for use with dtx"));
}
dtxBuffer.reset(new DtxBuffer(xid));
txBuffer = dtxBuffer;
if (join) {
mgr.join(xid, dtxBuffer);
} else {
mgr.start(xid, dtxBuffer);
}
}
void SemanticState::endDtx(const std::string& xid, bool fail)
{
if (!dtxBuffer) {
throw IllegalStateException(QPID_MSG("xid " << xid << " not associated with this session"));
}
if (dtxBuffer->getXid() != xid) {
throw CommandInvalidException(
QPID_MSG("xid specified on start was " << dtxBuffer->getXid() << ", but " << xid << " specified on end"));
}
txBuffer.reset();//ops on this session no longer transactional
checkDtxTimeout();
if (fail) {
dtxBuffer->fail();
} else {
dtxBuffer->markEnded();
}
dtxBuffer.reset();
}
void SemanticState::suspendDtx(const std::string& xid)
{
if (dtxBuffer->getXid() != xid) {
throw CommandInvalidException(
QPID_MSG("xid specified on start was " << dtxBuffer->getXid() << ", but " << xid << " specified on suspend"));
}
txBuffer.reset();//ops on this session no longer transactional
checkDtxTimeout();
dtxBuffer->setSuspended(true);
suspendedXids[xid] = dtxBuffer;
dtxBuffer.reset();
}
void SemanticState::resumeDtx(const std::string& xid)
{
if (!dtxSelected) {
throw CommandInvalidException(QPID_MSG("Session has not been selected for use with dtx"));
}
dtxBuffer = suspendedXids[xid];
if (!dtxBuffer) {
throw CommandInvalidException(QPID_MSG("xid " << xid << " not attached"));
} else {
suspendedXids.erase(xid);
}
if (dtxBuffer->getXid() != xid) {
throw CommandInvalidException(
QPID_MSG("xid specified on start was " << dtxBuffer->getXid() << ", but " << xid << " specified on resume"));
}
if (!dtxBuffer->isSuspended()) {
throw CommandInvalidException(QPID_MSG("xid " << xid << " not suspended"));
}
checkDtxTimeout();
dtxBuffer->setSuspended(false);
txBuffer = dtxBuffer;
}
void SemanticState::checkDtxTimeout()
{
if (dtxBuffer->isExpired()) {
dtxBuffer.reset();
throw DtxTimeoutException();
}
}
void SemanticState::record(const DeliveryRecord& delivery)
{
unacked.push_back(delivery);
}
const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency");
SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
const string& _name,
Queue::shared_ptr _queue,
bool ack,
bool _acquire,
bool _exclusive,
const string& _tag,
const string& _resumeId,
uint64_t _resumeTtl,
const framing::FieldTable& _arguments
) :
Consumer(_name, _acquire),
parent(_parent),
queue(_queue),
ackExpected(ack),
acquire(_acquire),
blocked(true),
exclusive(_exclusive),
resumeId(_resumeId),
tag(_tag),
resumeTtl(_resumeTtl),
arguments(_arguments),
notifyEnabled(true),
syncFrequency(_arguments.getAsInt(QPID_SYNC_FREQUENCY)),
deliveryCount(0),
mgmtObject(0)
{
if (parent != 0 && queue.get() != 0 && queue->GetManagementObject() !=0)
{
ManagementAgent* agent = parent->session.getBroker().getManagementAgent();
qpid::management::Manageable* ms = dynamic_cast<qpid::management::Manageable*> (&(parent->session));
if (agent != 0)
{
mgmtObject = new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId(), getTag(),
!acquire, ackExpected, exclusive, ManagementAgent::toMap(arguments));
agent->addObject (mgmtObject);
mgmtObject->set_creditMode("WINDOW");
}
}
}
ManagementObject* SemanticState::ConsumerImpl::GetManagementObject (void) const
{
return (ManagementObject*) mgmtObject;
}
Manageable::status_t SemanticState::ConsumerImpl::ManagementMethod (uint32_t methodId, Args&, string&)
{
Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
QPID_LOG (debug, "Queue::ManagementMethod [id=" << methodId << "]");
return status;
}
OwnershipToken* SemanticState::ConsumerImpl::getSession()
{
return &(parent->session);
}
bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
{
assertClusterSafe();
allocateCredit(msg.payload);
DeliveryRecord record(msg, msg.queue->shared_from_this(), getTag(),
shared_from_this(), acquire, !ackExpected, credit.isWindowMode(), 0);
bool sync = syncFrequency && ++deliveryCount >= syncFrequency;
if (sync) deliveryCount = 0;//reset
parent->deliver(record, sync);
if (credit.isWindowMode() || ackExpected || !acquire) {
parent->record(record);
}
if (acquire && !ackExpected) { // auto acquire && auto accept
msg.queue->dequeue(0, msg);
record.setEnded();
}
if (mgmtObject) { mgmtObject->inc_delivered(); }
return true;
}
bool SemanticState::ConsumerImpl::filter(intrusive_ptr<Message>)
{
return true;
}
bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg)
{
assertClusterSafe();
// TODO aconway 2009-06-08: if we have byte & message credit but
// checkCredit fails because the message is to big, we should
// remain on queue's listener list for possible smaller messages
// in future.
//
blocked = !(filter(msg) && checkCredit(msg));
return !blocked;
}
namespace {
struct ConsumerName {
const SemanticState::ConsumerImpl& consumer;
ConsumerName(const SemanticState::ConsumerImpl& ci) : consumer(ci) {}
};
ostream& operator<<(ostream& o, const ConsumerName& pc) {
return o << pc.consumer.getTag() << " on "
<< pc.consumer.getParent().getSession().getSessionId();
}
}
void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg)
{
assertClusterSafe();
Credit original = credit;
credit.consume(1, msg->getRequiredCredit());
QPID_LOG(debug, "Credit allocated for " << ConsumerName(*this)
<< ", was " << original << " now " << credit);
}
bool SemanticState::ConsumerImpl::checkCredit(intrusive_ptr<Message>& msg)
{
bool enoughCredit = credit.check(1, msg->getRequiredCredit());
QPID_LOG(debug, "Subscription " << ConsumerName(*this) << " has " << (enoughCredit ? "sufficient " : "insufficient")
<< " credit for message of " << msg->getRequiredCredit() << " bytes: "
<< credit);
return enoughCredit;
}
SemanticState::ConsumerImpl::~ConsumerImpl()
{
if (mgmtObject != 0)
mgmtObject->resourceDestroy ();
}
void SemanticState::disable(ConsumerImpl::shared_ptr c)
{
c->disableNotify();
if (session.isAttached())
session.getConnection().outputTasks.removeOutputTask(c.get());
}
void SemanticState::cancel(ConsumerImpl::shared_ptr c)
{
disable(c);
Queue::shared_ptr queue = c->getQueue();
if(queue) {
queue->cancel(c);
if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) {
Queue::tryAutoDelete(session.getBroker(), queue);
}
}
c->cancel();
}
void SemanticState::handle(intrusive_ptr<Message> msg) {
if (txBuffer.get()) {
TxPublish* deliverable(new TxPublish(msg));
TxOp::shared_ptr op(deliverable);
route(msg, *deliverable);
txBuffer->enlist(op);
} else {
DeliverableMessage deliverable(msg);
route(msg, deliverable);
if (msg->isContentReleaseRequested()) {
// NOTE: The log messages in this section are used for flow-to-disk testing (which checks the log for the
// presence of these messages). Do not change these without also checking these tests.
if (msg->isContentReleaseBlocked()) {
QPID_LOG(debug, "Message id=\"" << msg->getProperties<MessageProperties>()->getMessageId() << "\"; pid=0x" <<
std::hex << msg->getPersistenceId() << std::dec << ": Content release blocked");
} else {
msg->releaseContent();
QPID_LOG(debug, "Message id=\"" << msg->getProperties<MessageProperties>()->getMessageId() << "\"; pid=0x" <<
std::hex << msg->getPersistenceId() << std::dec << ": Content released");
}
}
}
}
namespace
{
const std::string nullstring;
}
void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
msg->computeExpiration(getSession().getBroker().getExpiryPolicy());
std::string exchangeName = msg->getExchangeName();
if (!cacheExchange || cacheExchange->getName() != exchangeName
|| cacheExchange->isDestroyed())
{
cacheExchange = session.getBroker().getExchanges().get(exchangeName);
}
cacheExchange->setProperties(msg);
/* verify the userid if specified: */
std::string id =
msg->hasProperties<MessageProperties>() ? msg->getProperties<MessageProperties>()->getUserId() : nullstring;
if (authMsg && !id.empty() && !session.getConnection().isAuthenticatedUser(id))
{
QPID_LOG(debug, "authorised user id : " << userID << " but user id in message declared as " << id);
throw UnauthorizedAccessException(QPID_MSG("authorised user id : " << userID << " but user id in message declared as " << id));
}
AclModule* acl = getSession().getBroker().getAcl();
if (acl && acl->doTransferAcl())
{
if (!acl->authorise(getSession().getConnection().getUserId(),acl::ACT_PUBLISH,acl::OBJ_EXCHANGE,exchangeName, msg->getRoutingKey() ))
throw UnauthorizedAccessException(QPID_MSG(userID << " cannot publish to " <<
exchangeName << " with routing-key " << msg->getRoutingKey()));
}
cacheExchange->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders());
if (!strategy.delivered) {
//TODO:if discard-unroutable, just drop it
//TODO:else if accept-mode is explicit, reject it
//else route it to alternate exchange
if (cacheExchange->getAlternate()) {
cacheExchange->getAlternate()->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders());
}
if (!strategy.delivered) {
msg->destroy();
}
}
}
void SemanticState::requestDispatch()
{
for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++)
i->second->requestDispatch();
}
void SemanticState::ConsumerImpl::requestDispatch()
{
assertClusterSafe();
if (blocked) {
parent->session.getConnection().outputTasks.addOutputTask(this);
parent->session.getConnection().outputTasks.activateOutput();
blocked = false;
}
}
bool SemanticState::complete(DeliveryRecord& delivery)
{
ConsumerImplMap::iterator i = consumers.find(delivery.getTag());
if (i != consumers.end()) {
i->second->complete(delivery);
}
return delivery.isRedundant();
}
void SemanticState::ConsumerImpl::complete(DeliveryRecord& delivery)
{
if (!delivery.isComplete()) {
delivery.complete();
if (credit.isWindowMode()) {
credit.moveWindow(1, delivery.getCredit());
}
}
}
void SemanticState::recover(bool requeue)
{
if(requeue){
//take copy and clear unacked as requeue may result in redelivery to this session
//which will in turn result in additions to unacked
DeliveryRecords copy = unacked;
unacked.clear();
for_each(copy.rbegin(), copy.rend(), mem_fun_ref(&DeliveryRecord::requeue));
}else{
for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::redeliver, _1, this));
//unconfirmed messages re redelivered and therefore have their
//id adjusted, confirmed messages are not and so the ordering
//w.r.t id is lost
sort(unacked.begin(), unacked.end());
}
}
void SemanticState::deliver(DeliveryRecord& msg, bool sync)
{
return deliveryAdapter.deliver(msg, sync);
}
const SemanticState::ConsumerImpl::shared_ptr SemanticState::find(const std::string& destination) const
{
ConsumerImpl::shared_ptr consumer;
if (!find(destination, consumer)) {
throw NotFoundException(QPID_MSG("Unknown destination " << destination << " session=" << session.getSessionId()));
} else {
return consumer;
}
}
bool SemanticState::find(const std::string& destination, ConsumerImpl::shared_ptr& consumer) const
{
// @todo KAG gsim: shouldn't the consumers map be locked????
ConsumerImplMap::const_iterator i = consumers.find(destination);
if (i == consumers.end()) {
return false;
}
consumer = i->second;
return true;
}
void SemanticState::setWindowMode(const std::string& destination)
{
find(destination)->setWindowMode();
}
void SemanticState::setCreditMode(const std::string& destination)
{
find(destination)->setCreditMode();
}
void SemanticState::addByteCredit(const std::string& destination, uint32_t value)
{
ConsumerImpl::shared_ptr c = find(destination);
c->addByteCredit(value);
c->requestDispatch();
}
void SemanticState::addMessageCredit(const std::string& destination, uint32_t value)
{
ConsumerImpl::shared_ptr c = find(destination);
c->addMessageCredit(value);
c->requestDispatch();
}
void SemanticState::flush(const std::string& destination)
{
find(destination)->flush();
}
void SemanticState::stop(const std::string& destination)
{
find(destination)->stop();
}
void SemanticState::ConsumerImpl::setWindowMode()
{
assertClusterSafe();
credit.setWindowMode(true);
if (mgmtObject){
mgmtObject->set_creditMode("WINDOW");
}
}
void SemanticState::ConsumerImpl::setCreditMode()
{
assertClusterSafe();
credit.setWindowMode(false);
if (mgmtObject){
mgmtObject->set_creditMode("CREDIT");
}
}
void SemanticState::ConsumerImpl::addByteCredit(uint32_t value)
{
assertClusterSafe();
credit.addByteCredit(value);
}
void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value)
{
assertClusterSafe();
credit.addMessageCredit(value);
}
bool SemanticState::ConsumerImpl::haveCredit()
{
if (credit) {
return true;
} else {
blocked = true;
return false;
}
}
bool SemanticState::ConsumerImpl::doDispatch()
{
return queue->dispatch(shared_from_this());
}
void SemanticState::ConsumerImpl::flush()
{
while(haveCredit() && doDispatch())
;
credit.cancel();
}
void SemanticState::ConsumerImpl::stop()
{
assertClusterSafe();
credit.cancel();
}
Queue::shared_ptr SemanticState::getQueue(const string& name) const {
Queue::shared_ptr queue;
if (name.empty()) {
throw NotAllowedException(QPID_MSG("No queue name specified."));
} else {
queue = session.getBroker().getQueues().find(name);
if (!queue)
throw NotFoundException(QPID_MSG("Queue not found: "<<name));
}
return queue;
}
AckRange SemanticState::findRange(DeliveryId first, DeliveryId last)
{
return DeliveryRecord::findRange(unacked, first, last);
}
void SemanticState::acquire(DeliveryId first, DeliveryId last, DeliveryIds& acquired)
{
AckRange range = findRange(first, last);
for_each(range.start, range.end, AcquireFunctor(acquired));
}
void SemanticState::release(DeliveryId first, DeliveryId last, bool setRedelivered)
{
AckRange range = findRange(first, last);
//release results in the message being added to the head so want
//to release in reverse order to keep the original transfer order
DeliveryRecords::reverse_iterator start(range.end);
DeliveryRecords::reverse_iterator end(range.start);
for_each(start, end, boost::bind(&DeliveryRecord::release, _1, setRedelivered));
DeliveryRecords::iterator removed =
remove_if(range.start, range.end, bind(&DeliveryRecord::isRedundant, _1));
unacked.erase(removed, range.end);
}
void SemanticState::reject(DeliveryId first, DeliveryId last)
{
AckRange range = findRange(first, last);
for_each(range.start, range.end, mem_fun_ref(&DeliveryRecord::reject));
//may need to remove the delivery records as well
for (DeliveryRecords::iterator i = range.start; i != unacked.end() && i->getId() <= last; ) {
if (i->isRedundant()) i = unacked.erase(i);
else i++;
}
}
bool SemanticState::ConsumerImpl::doOutput()
{
try {
return haveCredit() && doDispatch();
} catch (const SessionException& e) {
throw SessionOutputException(e, parent->session.getChannel());
}
}
void SemanticState::ConsumerImpl::enableNotify()
{
Mutex::ScopedLock l(lock);
assertClusterSafe();
notifyEnabled = true;
}
void SemanticState::ConsumerImpl::disableNotify()
{
Mutex::ScopedLock l(lock);
notifyEnabled = false;
}
bool SemanticState::ConsumerImpl::isNotifyEnabled() const {
Mutex::ScopedLock l(lock);
return notifyEnabled;
}
void SemanticState::ConsumerImpl::notify()
{
Mutex::ScopedLock l(lock);
assertClusterSafe();
if (notifyEnabled) {
parent->session.getConnection().outputTasks.addOutputTask(this);
parent->session.getConnection().outputTasks.activateOutput();
}
}
// Test that a DeliveryRecord's ID is in a sequence set and some other
// predicate on DeliveryRecord holds.
template <class Predicate> struct IsInSequenceSetAnd {
IsInSequenceSet isInSet;
Predicate predicate;
IsInSequenceSetAnd(const SequenceSet& s, Predicate p) : isInSet(s), predicate(p) {}
bool operator()(DeliveryRecord& dr) {
return isInSet(dr.getId()) && predicate(dr);
}
};
template<class Predicate> IsInSequenceSetAnd<Predicate>
isInSequenceSetAnd(const SequenceSet& s, Predicate p) {
return IsInSequenceSetAnd<Predicate>(s,p);
}
void SemanticState::accepted(const SequenceSet& commands) {
assertClusterSafe();
if (txBuffer.get()) {
//in transactional mode, don't dequeue or remove, just
//maintain set of acknowledged messages:
accumulatedAck.add(commands);
if (dtxBuffer.get()) {
//if enlisted in a dtx, copy the relevant slice from
//unacked and record it against that transaction
TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked));
accumulatedAck.clear();
dtxBuffer->enlist(txAck);
//mark the relevant messages as 'ended' in unacked
//if the messages are already completed, they can be
//removed from the record
DeliveryRecords::iterator removed =
remove_if(unacked.begin(), unacked.end(),
isInSequenceSetAnd(commands,
bind(&DeliveryRecord::setEnded, _1)));
unacked.erase(removed, unacked.end());
}
} else {
DeliveryRecords::iterator removed =
remove_if(unacked.begin(), unacked.end(),
isInSequenceSetAnd(commands,
bind(&DeliveryRecord::accept, _1,
(TransactionContext*) 0)));
unacked.erase(removed, unacked.end());
}
}
void SemanticState::completed(const SequenceSet& commands) {
DeliveryRecords::iterator removed =
remove_if(unacked.begin(), unacked.end(),
isInSequenceSetAnd(commands,
bind(&SemanticState::complete, this, _1)));
unacked.erase(removed, unacked.end());
requestDispatch();
}
void SemanticState::attached()
{
for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
i->second->enableNotify();
session.getConnection().outputTasks.addOutputTask(i->second.get());
}
session.getConnection().outputTasks.activateOutput();
}
void SemanticState::detached()
{
for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
i->second->disableNotify();
session.getConnection().outputTasks.removeOutputTask(i->second.get());
}
}
}} // namespace qpid::broker