blob: 506af85c475e5ce3e71a64ff08295109230fe9d8 [file] [log] [blame]
#ifndef QPID_BROKER_SESSION_H
#define QPID_BROKER_SESSION_H
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "qpid/SessionState.h"
#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/SequenceSet.h"
#include "qpid/sys/Time.h"
#include "qpid/management/Manageable.h"
#include "qmf/org/apache/qpid/broker/Session.h"
#include "qpid/broker/SessionAdapter.h"
#include "qpid/broker/DeliveryAdapter.h"
#include "qpid/broker/AsyncCompletion.h"
#include "qpid/broker/MessageBuilder.h"
#include "qpid/broker/SessionContext.h"
#include "qpid/broker/SemanticState.h"
#include "qpid/sys/Monitor.h"
#include <boost/noncopyable.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/intrusive_ptr.hpp>
#include <set>
#include <vector>
#include <ostream>
namespace qpid {
namespace framing {
class AMQP_ClientProxy;
}
namespace sys {
class TimerTask;
}
namespace broker {
class Broker;
class ConnectionState;
class Message;
class SessionHandler;
class SessionManager;
class RateFlowcontrol;
/**
* Broker-side session state includes session's handler chains, which
* may themselves have state.
*/
class SessionState : public qpid::SessionState,
public SessionContext,
public DeliveryAdapter,
public management::Manageable,
public framing::FrameHandler::InOutHandler
{
public:
SessionState(Broker&, SessionHandler&, const SessionId&,
const SessionState::Configuration&, bool delayManagement=false);
~SessionState();
bool isAttached() const { return handler; }
void detach();
void attach(SessionHandler& handler);
void disableOutput();
/** @pre isAttached() */
framing::AMQP_ClientProxy& getProxy();
/** @pre isAttached() */
uint16_t getChannel() const;
/** @pre isAttached() */
ConnectionState& getConnection();
bool isLocal(const ConnectionToken* t) const;
Broker& getBroker();
void setTimeout(uint32_t seconds);
/** OutputControl **/
void abort();
void activateOutput();
void giveReadCredit(int32_t);
void senderCompleted(const framing::SequenceSet& ranges);
void sendCompletion();
//delivery adapter methods:
void deliver(DeliveryRecord&, bool sync);
// Manageable entry points
management::ManagementObject* GetManagementObject (void) const;
management::Manageable::status_t
ManagementMethod (uint32_t methodId, management::Args& args, std::string&);
void readyToSend();
// Used by cluster to create replica sessions.
SemanticState& getSemanticState() { return semanticState; }
boost::intrusive_ptr<Message> getMessageInProgress() { return msgBuilder.getMessage(); }
SessionAdapter& getSessionAdapter() { return adapter; }
bool processSendCredit(uint32_t msgs);
const SessionId& getSessionId() const { return getId(); }
// Used by ExecutionHandler sync command processing. Notifies
// the SessionState of a received Execution.Sync command.
void addPendingExecutionSync();
// Used to delay creation of management object for sessions
// belonging to inter-broker bridges
void addManagementObject();
private:
void handleCommand(framing::AMQMethodBody* method, const framing::SequenceNumber& id);
void handleContent(framing::AMQFrame& frame, const framing::SequenceNumber& id);
// indicate that the given ingress msg has been completely received by the
// broker, and the msg's message.transfer command can be considered completed.
void completeRcvMsg(SequenceNumber id, bool requiresAccept, bool requiresSync);
void handleIn(framing::AMQFrame& frame);
void handleOut(framing::AMQFrame& frame);
// End of the input & output chains.
void handleInLast(framing::AMQFrame& frame);
void handleOutLast(framing::AMQFrame& frame);
void sendAcceptAndCompletion();
/**
* If commands are sent based on the local time (e.g. in timers), they don't have
* a well-defined ordering across cluster nodes.
* This proxy is for sending such commands. In a clustered broker it will take steps
* to synchronize command order across the cluster. In a stand-alone broker
* it is just a synonym for getProxy()
*/
framing::AMQP_ClientProxy& getClusterOrderProxy();
Broker& broker;
SessionHandler* handler;
sys::AbsTime expiry; // Used by SessionManager.
SemanticState semanticState;
SessionAdapter adapter;
MessageBuilder msgBuilder;
qmf::org::apache::qpid::broker::Session* mgmtObject;
qpid::framing::SequenceSet accepted;
// State used for producer flow control (rate limited)
qpid::sys::Mutex rateLock;
boost::scoped_ptr<RateFlowcontrol> rateFlowcontrol;
boost::intrusive_ptr<sys::TimerTask> flowControlTimer;
// sequence numbers for pending received Execution.Sync commands
std::queue<SequenceNumber> pendingExecutionSyncs;
bool currentCommandComplete;
/** This class provides a context for completing asynchronous commands in a thread
* safe manner. Asynchronous commands save their completion state in this class.
* This class then schedules the completeCommands() method in the IO thread.
* While running in the IO thread, completeCommands() may safely complete all
* saved commands without the risk of colliding with other operations on this
* SessionState.
*/
class AsyncCommandCompleter : public RefCounted {
private:
SessionState *session;
bool isAttached;
qpid::sys::Mutex completerLock;
// special-case message.transfer commands for optimization
struct MessageInfo {
SequenceNumber cmd; // message.transfer command id
bool requiresAccept;
bool requiresSync;
MessageInfo(SequenceNumber c, bool a, bool s)
: cmd(c), requiresAccept(a), requiresSync(s) {}
};
std::vector<MessageInfo> completedMsgs;
// If an ingress message does not require a Sync, we need to
// hold a reference to it in case an Execution.Sync command is received and we
// have to manually flush the message.
std::map<SequenceNumber, boost::intrusive_ptr<Message> > pendingMsgs;
/** complete all pending commands, runs in IO thread */
void completeCommands();
/** for scheduling a run of "completeCommands()" on the IO thread */
static void schedule(boost::intrusive_ptr<AsyncCommandCompleter>);
public:
AsyncCommandCompleter(SessionState *s) : session(s), isAttached(s->isAttached()) {};
~AsyncCommandCompleter() {};
/** track a message pending ingress completion */
void addPendingMessage(boost::intrusive_ptr<Message> m);
void deletePendingMessage(SequenceNumber id);
void flushPendingMessages();
/** schedule the processing of a completed ingress message.transfer command */
void scheduleMsgCompletion(SequenceNumber cmd,
bool requiresAccept,
bool requiresSync);
void cancel(); // called by SessionState destructor.
void attached(); // called by SessionState on attach()
void detached(); // called by SessionState on detach()
};
boost::intrusive_ptr<AsyncCommandCompleter> asyncCommandCompleter;
/** Abstract class that represents a single asynchronous command that is
* pending completion.
*/
class AsyncCommandContext : public AsyncCompletion::Callback
{
public:
AsyncCommandContext( SessionState *ss, SequenceNumber _id )
: id(_id), completerContext(ss->asyncCommandCompleter) {}
virtual ~AsyncCommandContext() {}
protected:
SequenceNumber id;
boost::intrusive_ptr<AsyncCommandCompleter> completerContext;
};
/** incomplete Message.transfer commands - inbound to broker from client
*/
class IncompleteIngressMsgXfer : public SessionState::AsyncCommandContext
{
public:
IncompleteIngressMsgXfer( SessionState *ss,
boost::intrusive_ptr<Message> m )
: AsyncCommandContext(ss, m->getCommandId()),
session(ss),
msg(m),
requiresAccept(m->requiresAccept()),
requiresSync(m->getFrames().getMethod()->isSync()),
pending(false) {}
IncompleteIngressMsgXfer( const IncompleteIngressMsgXfer& x )
: AsyncCommandContext(x.session, x.msg->getCommandId()),
session(x.session),
msg(x.msg),
requiresAccept(x.requiresAccept),
requiresSync(x.requiresSync),
pending(x.pending) {}
virtual ~IncompleteIngressMsgXfer() {};
virtual void completed(bool);
virtual boost::intrusive_ptr<AsyncCompletion::Callback> clone();
private:
SessionState *session; // only valid if sync flag in callback is true
boost::intrusive_ptr<Message> msg;
bool requiresAccept;
bool requiresSync;
bool pending; // true if msg saved on pending list...
};
friend class SessionManager;
};
inline std::ostream& operator<<(std::ostream& out, const SessionState& session) {
return out << session.getId();
}
}} // namespace qpid::broker
#endif /*!QPID_BROKER_SESSION_H*/