blob: b1f18747f33c96bbf63250af751e619165c01e90 [file]
#ifndef QPID_BROKER_SESSION_H
#define QPID_BROKER_SESSION_H
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "qpid/SessionState.h"
#include "qpid/framing/enum.h"
#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/SequenceSet.h"
#include "qpid/sys/Time.h"
#include "qpid/management/Manageable.h"
#include "qmf/org/apache/qpid/broker/Session.h"
#include "qpid/broker/SessionAdapter.h"
#include "qpid/broker/AsyncCompletion.h"
#include "qpid/broker/MessageBuilder.h"
#include "qpid/broker/SessionContext.h"
#include "qpid/broker/SemanticState.h"
#include "qpid/broker/amqp_0_10/MessageTransfer.h"
#include "qpid/sys/Monitor.h"
#include <boost/noncopyable.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/intrusive_ptr.hpp>
#include <queue>
#include <set>
#include <vector>
#include <ostream>
namespace qpid {
namespace framing {
class AMQP_ClientProxy;
}
namespace sys {
class TimerTask;
}
namespace broker {
class Broker;
class ConnectionState;
class SessionHandler;
class SessionManager;
/**
* Broker-side session state includes session's handler chains, which
* may themselves have state.
*/
class SessionState : public qpid::SessionState,
public SessionContext,
public management::Manageable,
public framing::FrameHandler::InOutHandler
{
public:
SessionState(Broker&, SessionHandler&, const SessionId&,
const SessionState::Configuration&);
~SessionState();
bool isAttached() const { return handler; }
void detach();
void attach(SessionHandler& handler);
void disableOutput();
SessionHandler* getHandler() { return handler; }
/** @pre isAttached() */
framing::AMQP_ClientProxy& getProxy();
/** @pre isAttached() */
uint16_t getChannel() const;
/** @pre isAttached() */
amqp_0_10::Connection& getConnection();
bool isLocal(const OwnershipToken* t) const;
Broker& getBroker();
void setTimeout(uint32_t seconds);
void senderCompleted(const framing::SequenceSet& ranges);
void sendCompletion();
DeliveryId deliver(const qpid::broker::amqp_0_10::MessageTransfer& message,
const std::string& destination, bool isRedelivered, uint64_t ttl,
qpid::framing::message::AcceptMode, qpid::framing::message::AcquireMode,
const qpid::types::Variant::Map& annotations, bool sync);
// Manageable entry points
management::ManagementObject::shared_ptr GetManagementObject(void) const;
management::Manageable::status_t
ManagementMethod (uint32_t methodId, management::Args& args, std::string&);
void readyToSend();
const SessionId& getSessionId() const { return getId(); }
/**
* Used by ExecutionHandler sync command processing. Notifies
* the SessionState of a received Execution.Sync command.
* Return true if there are incomplete commands before the execution sync.
*/
bool addPendingExecutionSync();
/**
* Mark commannd ID as an execution sync point, completions will be sent
* when all commands up to that point are completed.
*/
bool addPendingExecutionSync(SequenceNumber id);
void setUnackedCount(uint64_t count) {
if (mgmtObject)
mgmtObject->set_unackedMessages(count);
}
// Used to delay creation of management object for sessions
// belonging to inter-broker bridges
void addManagementObject();
// transaction-related methods just to update statistics
void startTx();
void commitTx();
void rollbackTx();
/** Send result and completion for a given command to the client. */
void completeCommand(SequenceNumber id, bool requiresAccept, bool requiresSync,
const std::string& result);
MessageBuilder& getMessageBuilder() { return msgBuilder; }
private:
void handleCommand(framing::AMQMethodBody* method);
void handleContent(framing::AMQFrame& frame);
void handleIn(framing::AMQFrame& frame);
void handleOut(framing::AMQFrame& frame);
// End of the input & output chains.
void handleInLast(framing::AMQFrame& frame);
void handleOutLast(framing::AMQFrame& frame);
void sendAcceptAndCompletion();
Broker& broker;
SessionHandler* handler;
sys::AbsTime expiry; // Used by SessionManager.
SemanticState semanticState;
SessionAdapter adapter;
MessageBuilder msgBuilder;
qmf::org::apache::qpid::broker::Session::shared_ptr mgmtObject;
qpid::framing::SequenceSet accepted;
// sequence numbers for pending received Execution.Sync commands
std::queue<SequenceNumber> pendingExecutionSyncs;
public:
/** Information about the currently executing command.
* Can only be used in the IO thread during command execution.
*/
class CurrentCommand {
public:
CurrentCommand(
SequenceNumber id_=0, bool syncRequired_=false, bool completeSync_=true ) :
id(id_), syncRequired(syncRequired_), completeSync(completeSync_)
{}
SequenceNumber getId() const { return id; }
/**@return true if the sync flag was set for the command. */
bool isSyncRequired() const { return syncRequired; }
/**@return true if the command should be completed synchronously
* in the handling thread.
*/
bool isCompleteSync() const { return completeSync; }
void setCompleteSync(bool b) { completeSync = b; }
private:
SequenceNumber id; ///< Command identifier.
bool syncRequired; ///< True if sync flag set for the command.
bool completeSync; ///< Will be completed by handCommand.
};
CurrentCommand& getCurrentCommand() { return currentCommand; }
/** This class provides a context for completing asynchronous commands in a thread
* safe manner. Asynchronous commands save their completion state in this class.
* This class then schedules the completeCommands() method in the IO thread.
* While running in the IO thread, completeCommands() may safely complete all
* saved commands without the risk of colliding with other operations on this
* SessionState.
*/
class AsyncCommandCompleter : public RefCounted {
private:
SessionState *session;
bool isAttached;
qpid::sys::Mutex completerLock;
struct CommandInfo {
SequenceNumber cmd; // message.transfer command id
bool requiresAccept;
bool requiresSync;
CommandInfo(
SequenceNumber c, bool a, bool s)
: cmd(c), requiresAccept(a), requiresSync(s) {}
};
std::vector<CommandInfo> completedCmds;
// If an ingress message does not require a Sync, we need to
// hold a reference to it in case an Execution.Sync command is received and we
// have to manually flush the message.
std::map<SequenceNumber, boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> > pendingMsgs;
/** complete all pending commands, runs in IO thread */
void completeCommands();
public:
AsyncCommandCompleter(SessionState *s) : session(s), isAttached(s->isAttached()) {};
~AsyncCommandCompleter() {};
/** track a message pending ingress completion */
void addPendingMessage(boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> m);
void deletePendingMessage(SequenceNumber id);
void flushPendingMessages();
/** schedule the processing of command completion. */
void scheduleCommandCompletion(SequenceNumber cmd,
bool requiresAccept,
bool requiresSync);
void schedule(boost::function<void()>);
void cancel(); // called by SessionState destructor.
void attached(); // called by SessionState on attach()
void detached(); // called by SessionState on detach()
SessionState* getSession() const { return session; }
};
boost::intrusive_ptr<AsyncCommandCompleter> getAsyncCommandCompleter() {
return asyncCommandCompleter;
}
/** Abstract class that represents a single asynchronous command that is
* pending completion.
*/
class AsyncCommandContext : public AsyncCompletion::Callback
{
public:
AsyncCommandContext(SessionState& ss )
: id(ss.getCurrentCommand().getId()),
requiresSync(ss.getCurrentCommand().isSyncRequired()),
completerContext(ss.getAsyncCommandCompleter())
{}
virtual ~AsyncCommandContext() {}
protected:
SequenceNumber id;
bool requiresSync;
boost::intrusive_ptr<AsyncCommandCompleter> completerContext;
};
private:
boost::intrusive_ptr<AsyncCommandCompleter> asyncCommandCompleter;
CurrentCommand currentCommand;
/** incomplete Message.transfer commands - inbound to broker from client
*/
class IncompleteIngressMsgXfer : public SessionState::AsyncCommandContext
{
public:
IncompleteIngressMsgXfer( SessionState *ss,
boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> m)
: AsyncCommandContext(*ss),
session(ss),
msg(m),
requiresAccept(m->requiresAccept()),
requiresSync(m->getFrames().getMethod()->isSync()),
pending(false)
{
assert(id == m->getCommandId());
}
virtual ~IncompleteIngressMsgXfer() {}
virtual void completed(bool);
virtual boost::intrusive_ptr<AsyncCompletion::Callback> clone();
private:
SessionState *session; // only valid if sync flag in callback is true
boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> msg;
bool requiresAccept;
bool requiresSync;
bool pending; // true if msg saved on pending list...
};
friend class SessionManager;
};
inline std::ostream& operator<<(std::ostream& out, const SessionState& session) {
return out << session.getId();
}
}} // namespace qpid::broker
#endif /*!QPID_BROKER_SESSION_H*/