blob: d71134548a03c39eee0164ccb73980cb1b39ee45 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "qpid/broker/SessionState.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/ConnectionState.h"
#include "qpid/broker/DeliverableMessage.h"
#include "qpid/broker/DeliveryRecord.h"
#include "qpid/broker/SessionManager.h"
#include "qpid/broker/SessionHandler.h"
#include "qpid/framing/AMQContentBody.h"
#include "qpid/framing/AMQHeaderBody.h"
#include "qpid/framing/AMQMethodBody.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/ServerInvoker.h"
#include "qpid/log/Statement.h"
#include "qpid/management/ManagementAgent.h"
#include <boost/bind.hpp>
#include <boost/lexical_cast.hpp>
namespace qpid {
namespace broker {
using namespace framing;
using sys::Mutex;
using boost::intrusive_ptr;
using qpid::management::ManagementAgent;
using qpid::management::ManagementObject;
using qpid::management::Manageable;
using qpid::management::Args;
using qpid::sys::AbsTime;
//using qpid::sys::Timer;
namespace _qmf = qmf::org::apache::qpid::broker;
SessionState::SessionState(
Broker& b, SessionHandler& h, const SessionId& id,
const SessionState::Configuration& config)
: qpid::SessionState(id, config),
broker(b), handler(&h),
semanticState(*this),
adapter(semanticState),
asyncCommandCompleter(new AsyncCommandCompleter(this))
{
addManagementObject();
attach(h);
}
void SessionState::addManagementObject() {
if (GetManagementObject()) return; // Already added.
Manageable* parent = broker.GetVhostObject ();
if (parent != 0) {
ManagementAgent* agent = getBroker().getManagementAgent();
if (agent != 0) {
mgmtObject = _qmf::Session::shared_ptr(new _qmf::Session
(agent, this, parent, getId().getName()));
mgmtObject->set_attached (0);
mgmtObject->set_detachedLifespan (0);
mgmtObject->clr_expireTime();
agent->addObject(mgmtObject);
}
}
}
SessionState::~SessionState() {
asyncCommandCompleter->cancel();
semanticState.closed();
if (mgmtObject != 0)
mgmtObject->resourceDestroy ();
}
AMQP_ClientProxy& SessionState::getProxy() {
assert(isAttached());
return handler->getProxy();
}
uint16_t SessionState::getChannel() const {
assert(isAttached());
return handler->getChannel();
}
ConnectionState& SessionState::getConnection() {
assert(isAttached());
return handler->getConnection();
}
bool SessionState::isLocal(const ConnectionToken* t) const
{
return isAttached() && &(handler->getConnection()) == t;
}
void SessionState::detach() {
QPID_LOG(debug, getId() << ": detached on broker.");
asyncCommandCompleter->detached();
disableOutput();
handler = 0;
if (mgmtObject != 0)
mgmtObject->set_attached (0);
}
void SessionState::disableOutput()
{
semanticState.detached(); //prevents further activateOutput calls until reattached
}
void SessionState::attach(SessionHandler& h) {
QPID_LOG(debug, getId() << ": attached on broker.");
handler = &h;
if (mgmtObject != 0)
{
mgmtObject->set_attached (1);
mgmtObject->set_connectionRef (h.getConnection().GetManagementObject()->getObjectId());
mgmtObject->set_channelId (h.getChannel());
}
asyncCommandCompleter->attached();
}
void SessionState::abort() {
if (isAttached())
getConnection().outputTasks.abort();
}
void SessionState::activateOutput() {
if (isAttached())
getConnection().outputTasks.activateOutput();
}
ManagementObject::shared_ptr SessionState::GetManagementObject(void) const
{
return mgmtObject;
}
Manageable::status_t SessionState::ManagementMethod (uint32_t methodId,
Args& /*args*/,
std::string& /*text*/)
{
Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
switch (methodId)
{
case _qmf::Session::METHOD_DETACH :
if (handler != 0) {
handler->sendDetach();
}
status = Manageable::STATUS_OK;
break;
case _qmf::Session::METHOD_CLOSE :
/*
if (handler != 0)
{
handler->getConnection().closeChannel(handler->getChannel());
}
status = Manageable::STATUS_OK;
break;
*/
case _qmf::Session::METHOD_SOLICITACK :
case _qmf::Session::METHOD_RESETLIFESPAN :
status = Manageable::STATUS_NOT_IMPLEMENTED;
break;
}
return status;
}
void SessionState::handleCommand(framing::AMQMethodBody* method, const SequenceNumber& id) {
currentCommandComplete = true; // assumed, can be overridden by invoker method (this sucks).
Invoker::Result invocation = invoke(adapter, *method);
if (currentCommandComplete) receiverCompleted(id);
if (!invocation.wasHandled()) {
throw NotImplementedException(QPID_MSG("Not implemented: " << *method));
} else if (invocation.hasResult()) {
getProxy().getExecution().result(id, invocation.getResult());
}
if (method->isSync() && currentCommandComplete) {
sendAcceptAndCompletion();
}
}
void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id)
{
if (frame.getBof() && frame.getBos()) //start of frameset
msgBuilder.start(id);
intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> msg(msgBuilder.getMessage());
msgBuilder.handle(frame);
if (frame.getEof() && frame.getEos()) {//end of frameset
if (frame.getBof()) {
//i.e this is a just a command frame, add a dummy header
AMQFrame header((AMQHeaderBody()));
header.setBof(false);
header.setEof(false);
msg->getFrames().append(header);
}
DeliverableMessage deliverable(Message(msg, msg), semanticState.getTxBuffer());
if (broker.isTimestamping())
deliverable.getMessage().setTimestamp();
deliverable.getMessage().setPublisher(&getConnection());
IncompleteIngressMsgXfer xfer(this, msg);
msg->getIngressCompletion().begin();
semanticState.route(deliverable.getMessage(), deliverable);
msgBuilder.end();
msg->getIngressCompletion().end(xfer); // allows msg to complete xfer
}
}
void SessionState::sendAcceptAndCompletion()
{
if (!accepted.empty()) {
getProxy().getMessage().accept(accepted);
accepted.clear();
}
sendCompletion();
}
/** Invoked when the given inbound message is finished being processed
* by all interested parties (eg. it is done being enqueued to all queues,
* its credit has been accounted for, etc). At this point, msg is considered
* by this receiver as 'completed' (as defined by AMQP 0_10)
*/
void SessionState::completeRcvMsg(SequenceNumber id,
bool requiresAccept,
bool requiresSync)
{
bool callSendCompletion = false;
receiverCompleted(id);
if (requiresAccept)
// will cause msg's seq to appear in the next message.accept we send.
accepted.add(id);
// Are there any outstanding Execution.Sync commands pending the
// completion of this msg? If so, complete them.
while (!pendingExecutionSyncs.empty() &&
receiverGetIncomplete().front() >= pendingExecutionSyncs.front()) {
const SequenceNumber id = pendingExecutionSyncs.front();
pendingExecutionSyncs.pop();
QPID_LOG(debug, getId() << ": delayed execution.sync " << id << " is completed.");
receiverCompleted(id);
callSendCompletion = true; // likely peer is pending for this completion.
}
// if the sender has requested immediate notification of the completion...
if (requiresSync || callSendCompletion) {
sendAcceptAndCompletion();
}
}
void SessionState::handleIn(AMQFrame& frame) {
SequenceNumber commandId = receiverGetCurrent();
//TODO: make command handling more uniform, regardless of whether
//commands carry content.
AMQMethodBody* m = frame.getMethod();
if (m == 0 || m->isContentBearing()) {
handleContent(frame, commandId);
} else if (frame.getBof() && frame.getEof()) {
handleCommand(frame.getMethod(), commandId);
} else {
throw InternalErrorException("Cannot handle multi-frame command segments yet");
}
}
void SessionState::handleOut(AMQFrame& frame) {
assert(handler);
handler->out(frame);
}
DeliveryId SessionState::deliver(const qpid::broker::amqp_0_10::MessageTransfer& message,
const std::string& destination, bool isRedelivered, uint64_t ttl, uint64_t timestamp,
qpid::framing::message::AcceptMode acceptMode, qpid::framing::message::AcquireMode acquireMode,
const qpid::types::Variant::Map& annotations, bool sync)
{
uint32_t maxFrameSize = getConnection().getFrameMax();
assert(senderGetCommandPoint().offset == 0);
SequenceNumber commandId = senderGetCommandPoint().command;
framing::AMQFrame method((framing::MessageTransferBody(framing::ProtocolVersion(), destination, acceptMode, acquireMode)));
method.setEof(false);
getProxy().getHandler().handle(method);
message.sendHeader(getProxy().getHandler(), maxFrameSize, isRedelivered, ttl, timestamp, annotations);
message.sendContent(getProxy().getHandler(), maxFrameSize);
assert(senderGetCommandPoint() == SessionPoint(commandId+1, 0)); // Delivery has moved sendPoint.
if (sync) {
AMQP_ClientProxy::Execution& p(getProxy().getExecution());
Proxy::ScopedSync s(p);
p.sync();
}
return commandId;
}
void SessionState::sendCompletion() {
handler->sendCompletion();
}
void SessionState::senderCompleted(const SequenceSet& commands) {
qpid::SessionState::senderCompleted(commands);
semanticState.completed(commands);
}
void SessionState::readyToSend() {
QPID_LOG(debug, getId() << ": ready to send, activating output.");
assert(handler);
semanticState.attached();
}
Broker& SessionState::getBroker() { return broker; }
// Session resume is not fully implemented so it is useless to set a
// non-0 timeout.
void SessionState::setTimeout(uint32_t) { }
// Current received command is an execution.sync command.
// Complete this command only when all preceding commands have completed.
// (called via the invoker() in handleCommand() above)
void SessionState::addPendingExecutionSync()
{
SequenceNumber syncCommandId = receiverGetCurrent();
if (receiverGetIncomplete().front() < syncCommandId) {
currentCommandComplete = false;
pendingExecutionSyncs.push(syncCommandId);
asyncCommandCompleter->flushPendingMessages();
QPID_LOG(debug, getId() << ": delaying completion of execution.sync " << syncCommandId);
}
}
/** factory for creating a reference-counted IncompleteIngressMsgXfer object
* which will be attached to a message that will be completed asynchronously.
*/
boost::intrusive_ptr<AsyncCompletion::Callback>
SessionState::IncompleteIngressMsgXfer::clone()
{
// Optimization: this routine is *only* invoked when the message needs to be asynchronously completed.
// If the client is pending the message.transfer completion, flush now to force immediate write to journal.
if (requiresSync)
msg->flush();
else {
// otherwise, we need to track this message in order to flush it if an execution.sync arrives
// before it has been completed (see flushPendingMessages())
pending = true;
completerContext->addPendingMessage(msg);
}
return boost::intrusive_ptr<SessionState::IncompleteIngressMsgXfer>(new SessionState::IncompleteIngressMsgXfer(*this));
}
/** Invoked by the asynchronous completer associated with a received
* msg that is pending Completion. May be invoked by the IO thread
* (sync == true), or some external thread (!sync).
*/
void SessionState::IncompleteIngressMsgXfer::completed(bool sync)
{
if (pending) completerContext->deletePendingMessage(id);
if (!sync) {
/** note well: this path may execute in any thread. It is safe to access
* the scheduledCompleterContext, since *this has a shared pointer to it.
* but not session!
*/
session = 0;
QPID_LOG(debug, ": async completion callback scheduled for msg seq=" << id);
completerContext->scheduleMsgCompletion(id, requiresAccept, requiresSync);
} else {
// this path runs directly from the ac->end() call in handleContent() above,
// so *session is definitely valid.
if (session->isAttached()) {
QPID_LOG(debug, ": receive completed for msg seq=" << id);
session->completeRcvMsg(id, requiresAccept, requiresSync);
}
}
completerContext = boost::intrusive_ptr<AsyncCommandCompleter>();
}
/** Scheduled from an asynchronous command's completed callback to run on
* the IO thread.
*/
void SessionState::AsyncCommandCompleter::schedule(boost::intrusive_ptr<AsyncCommandCompleter> ctxt)
{
ctxt->completeCommands();
}
/** Track an ingress message that is pending completion */
void SessionState::AsyncCommandCompleter::addPendingMessage(boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> msg)
{
qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
std::pair<SequenceNumber, boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> > item(msg->getCommandId(), msg);
bool unique = pendingMsgs.insert(item).second;
if (!unique) {
assert(false);
}
}
/** pending message has completed */
void SessionState::AsyncCommandCompleter::deletePendingMessage(SequenceNumber id)
{
qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
pendingMsgs.erase(id);
}
/** done when an execution.sync arrives */
void SessionState::AsyncCommandCompleter::flushPendingMessages()
{
std::map<SequenceNumber, boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> > copy;
{
qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
pendingMsgs.swap(copy); // we've only tracked these in case a flush is needed, so nuke 'em now.
}
// drop lock, so it is safe to call "flush()"
for (std::map<SequenceNumber, boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> >::iterator i = copy.begin();
i != copy.end(); ++i) {
i->second->flush();
}
}
/** mark an ingress Message.Transfer command as completed.
* This method must be thread safe - it may run on any thread.
*/
void SessionState::AsyncCommandCompleter::scheduleMsgCompletion(SequenceNumber cmd,
bool requiresAccept,
bool requiresSync)
{
qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
if (session && isAttached) {
MessageInfo msg(cmd, requiresAccept, requiresSync);
completedMsgs.push_back(msg);
if (completedMsgs.size() == 1) {
session->getConnection().requestIOProcessing(boost::bind(&schedule,
session->asyncCommandCompleter));
}
}
}
/** Cause the session to complete all completed commands.
* Executes on the IO thread.
*/
void SessionState::AsyncCommandCompleter::completeCommands()
{
qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
// when session is destroyed, it clears the session pointer via cancel().
if (session && session->isAttached()) {
for (std::vector<MessageInfo>::iterator msg = completedMsgs.begin();
msg != completedMsgs.end(); ++msg) {
session->completeRcvMsg(msg->cmd, msg->requiresAccept, msg->requiresSync);
}
}
completedMsgs.clear();
}
/** cancel any pending calls to scheduleComplete */
void SessionState::AsyncCommandCompleter::cancel()
{
qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
session = 0;
}
/** inform the completer that the session has attached,
* allows command completion scheduling from any thread */
void SessionState::AsyncCommandCompleter::attached()
{
qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
isAttached = true;
}
/** inform the completer that the session has detached,
* disables command completion scheduling from any thread */
void SessionState::AsyncCommandCompleter::detached()
{
qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
isAttached = false;
}
}} // namespace qpid::broker