blob: 0c7c8901ff13a0ab94047e9c2d719db345bb7cee [file] [log] [blame]
#ifndef QPID_BROKER_SEMANTICSTATE_H
#define QPID_BROKER_SEMANTICSTATE_H
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "qpid/broker/BrokerImportExport.h"
#include "qpid/broker/Consumer.h"
#include "qpid/broker/Deliverable.h"
#include "qpid/broker/DeliveryAdapter.h"
#include "qpid/broker/DeliveryRecord.h"
#include "qpid/broker/DtxBuffer.h"
#include "qpid/broker/DtxManager.h"
#include "qpid/broker/NameGenerator.h"
#include "qpid/broker/TxBuffer.h"
#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/SequenceSet.h"
#include "qpid/framing/Uuid.h"
#include "qpid/sys/AggregateOutput.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/AtomicValue.h"
#include "qpid/broker/AclModule.h"
#include "qmf/org/apache/qpid/broker/Subscription.h"
#include <list>
#include <map>
#include <vector>
#include <boost/enable_shared_from_this.hpp>
#include <boost/intrusive_ptr.hpp>
#include <boost/cast.hpp>
namespace qpid {
namespace broker {
class SessionContext;
/**
*
* SemanticState implements the behavior of a Session, especially the
* state of consumers subscribed to queues. The code for ConsumerImpl
* is also in SemanticState.cpp
*
* SemanticState holds the AMQP Execution and Model state of an open
* session, whether attached to a channel or suspended. It is not
* dependent on any specific AMQP version.
*
* Message delivery is driven by ConsumerImpl::doOutput(), which is
* called when a client's socket is ready to write data.
*
*/
class QPID_BROKER_CLASS_EXTERN SemanticState : private boost::noncopyable {
public:
class ConsumerImpl : public Consumer, public sys::OutputTask,
public boost::enable_shared_from_this<ConsumerImpl>,
public management::Manageable
{
mutable qpid::sys::Mutex lock;
SemanticState* const parent;
const std::string name;
const boost::shared_ptr<Queue> queue;
const bool ackExpected;
const bool acquire;
bool blocked;
bool windowing;
bool exclusive;
std::string resumeId;
uint64_t resumeTtl;
framing::FieldTable arguments;
uint32_t msgCredit;
uint32_t byteCredit;
bool notifyEnabled;
const int syncFrequency;
int deliveryCount;
qmf::org::apache::qpid::broker::Subscription* mgmtObject;
bool checkCredit(boost::intrusive_ptr<Message>& msg);
void allocateCredit(boost::intrusive_ptr<Message>& msg);
bool haveCredit();
public:
typedef boost::shared_ptr<ConsumerImpl> shared_ptr;
ConsumerImpl(SemanticState* parent,
const std::string& name, boost::shared_ptr<Queue> queue,
bool ack, bool acquire, bool exclusive,
const std::string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments);
~ConsumerImpl();
OwnershipToken* getSession();
bool deliver(QueuedMessage& msg);
bool filter(boost::intrusive_ptr<Message> msg);
bool accept(boost::intrusive_ptr<Message> msg);
void disableNotify();
void enableNotify();
void notify();
bool isNotifyEnabled() const;
void requestDispatch();
void setWindowMode();
void setCreditMode();
void addByteCredit(uint32_t value);
void addMessageCredit(uint32_t value);
void flush();
void stop();
void complete(DeliveryRecord&);
boost::shared_ptr<Queue> getQueue() const { return queue; }
bool isBlocked() const { return blocked; }
bool setBlocked(bool set) { std::swap(set, blocked); return set; }
bool doOutput();
std::string getName() const { return name; }
bool isAckExpected() const { return ackExpected; }
bool isAcquire() const { return acquire; }
bool isWindowing() const { return windowing; }
bool isExclusive() const { return exclusive; }
uint32_t getMsgCredit() const { return msgCredit; }
uint32_t getByteCredit() const { return byteCredit; }
std::string getResumeId() const { return resumeId; };
uint64_t getResumeTtl() const { return resumeTtl; }
const framing::FieldTable& getArguments() const { return arguments; }
SemanticState& getParent() { return *parent; }
const SemanticState& getParent() const { return *parent; }
// Manageable entry points
management::ManagementObject* GetManagementObject (void) const;
management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text);
};
private:
typedef std::map<std::string, ConsumerImpl::shared_ptr> ConsumerImplMap;
typedef std::map<std::string, DtxBuffer::shared_ptr> DtxBufferMap;
SessionContext& session;
DeliveryAdapter& deliveryAdapter;
ConsumerImplMap consumers;
NameGenerator tagGenerator;
DeliveryRecords unacked;
TxBuffer::shared_ptr txBuffer;
DtxBuffer::shared_ptr dtxBuffer;
bool dtxSelected;
DtxBufferMap suspendedXids;
framing::SequenceSet accumulatedAck;
boost::shared_ptr<Exchange> cacheExchange;
AclModule* acl;
const bool authMsg;
const std::string userID;
const std::string userName;
const bool isDefaultRealm;
bool closeComplete;
void route(boost::intrusive_ptr<Message> msg, Deliverable& strategy);
void checkDtxTimeout();
bool complete(DeliveryRecord&);
AckRange findRange(DeliveryId first, DeliveryId last);
void requestDispatch();
void cancel(ConsumerImpl::shared_ptr);
void unsubscribe(ConsumerImpl::shared_ptr);
void disable(ConsumerImpl::shared_ptr);
public:
SemanticState(DeliveryAdapter&, SessionContext&);
~SemanticState();
SessionContext& getSession() { return session; }
const SessionContext& getSession() const { return session; }
ConsumerImpl& find(const std::string& destination);
/**
* Get named queue, never returns 0.
* @return: named queue
* @exception: ChannelException if no queue of that name is found.
* @exception: ConnectionException if name="" and session has no default.
*/
boost::shared_ptr<Queue> getQueue(const std::string& name) const;
bool exists(const std::string& consumerTag);
void consume(const std::string& destination,
boost::shared_ptr<Queue> queue,
bool ackRequired, bool acquire, bool exclusive,
const std::string& resumeId=std::string(), uint64_t resumeTtl=0,
const framing::FieldTable& = framing::FieldTable());
bool cancel(const std::string& tag);
void setWindowMode(const std::string& destination);
void setCreditMode(const std::string& destination);
void addByteCredit(const std::string& destination, uint32_t value);
void addMessageCredit(const std::string& destination, uint32_t value);
void flush(const std::string& destination);
void stop(const std::string& destination);
void startTx();
void commit(MessageStore* const store);
void rollback();
void selectDtx();
void startDtx(const std::string& xid, DtxManager& mgr, bool join);
void endDtx(const std::string& xid, bool fail);
void suspendDtx(const std::string& xid);
void resumeDtx(const std::string& xid);
void recover(bool requeue);
void deliver(DeliveryRecord& message, bool sync);
void acquire(DeliveryId first, DeliveryId last, DeliveryIds& acquired);
void release(DeliveryId first, DeliveryId last, bool setRedelivered);
void reject(DeliveryId first, DeliveryId last);
void handle(boost::intrusive_ptr<Message> msg);
void completed(const framing::SequenceSet& commands);
void accepted(const framing::SequenceSet& commands);
void attached();
void detached();
void closed();
// Used by cluster to re-create sessions
template <class F> void eachConsumer(F f) {
for(ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); ++i)
f(i->second);
}
DeliveryRecords& getUnacked() { return unacked; }
framing::SequenceSet getAccumulatedAck() const { return accumulatedAck; }
TxBuffer::shared_ptr getTxBuffer() const { return txBuffer; }
void setTxBuffer(const TxBuffer::shared_ptr& txb) { txBuffer = txb; }
void setAccumulatedAck(const framing::SequenceSet& s) { accumulatedAck = s; }
void record(const DeliveryRecord& delivery);
};
}} // namespace qpid::broker
#endif /*!QPID_BROKER_SEMANTICSTATE_H*/