blob: 01e614e041201200c61e9ecbfa70c552d8cfc918 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "qpid/client/SessionImpl.h"
#include "qpid/client/ConnectionImpl.h"
#include "qpid/client/Future.h"
#include "qpid/framing/all_method_bodies.h"
#include "qpid/framing/ClientInvoker.h"
#include "qpid/framing/enum.h"
#include "qpid/framing/FrameSet.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/MethodContent.h"
#include "qpid/framing/SequenceSet.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/DeliveryProperties.h"
#include "qpid/log/Statement.h"
#include "qpid/sys/IntegerTypes.h"
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
namespace { const std::string EMPTY; }
namespace qpid {
namespace client {
using namespace qpid::framing;
using namespace qpid::framing::session; //for detach codes
typedef sys::Monitor::ScopedLock Lock;
typedef sys::Monitor::ScopedUnlock UnLock;
typedef sys::ScopedLock<sys::Semaphore> Acquire;
SessionImpl::SessionImpl(const std::string& name, boost::shared_ptr<ConnectionImpl> conn)
: state(INACTIVE),
detachedLifetime(0),
maxFrameSize(conn->getNegotiatedSettings().maxFrameSize),
id(conn->getNegotiatedSettings().username, name.empty() ? Uuid(true).str() : name),
connection(conn),
ioHandler(*this),
proxy(ioHandler),
nextIn(0),
nextOut(0)
{
channel.next = connection.get();
}
SessionImpl::~SessionImpl() {
{
Lock l(state);
if (state != DETACHED && state != DETACHING) {
QPID_LOG(warning, "Session was not closed cleanly: " << id);
// Inform broker but don't wait for detached as that deadlocks.
// The detached will be ignored as the channel will be invalid.
try { detach(); } catch (...) {} // ignore errors.
setState(DETACHED);
handleClosed();
state.waitWaiters();
}
}
connection->erase(channel);
}
FrameSet::shared_ptr SessionImpl::get() // user thread
{
// No lock here: pop does a blocking wait.
return demux.getDefault()->pop();
}
const SessionId SessionImpl::getId() const //user thread
{
return id; //id is immutable
}
void SessionImpl::open(uint32_t timeout) // user thread
{
Lock l(state);
if (state == INACTIVE) {
setState(ATTACHING);
proxy.attach(id.getName(), false);
waitFor(ATTACHED);
//TODO: timeout will not be set locally until get response to
//confirm, should we wait for that?
setTimeout(timeout);
proxy.commandPoint(nextOut, 0);
} else {
throw Exception("Open already called for this session");
}
}
void SessionImpl::close() //user thread
{
Lock l(state);
// close() must be idempotent and no-throw as it will often be called in destructors.
if (state != DETACHED && state != DETACHING) {
try {
if (detachedLifetime) setTimeout(0);
detach();
waitFor(DETACHED);
} catch (...) {}
setState(DETACHED);
}
}
void SessionImpl::resume(boost::shared_ptr<ConnectionImpl>) // user thread
{
throw NotImplementedException("Resume not yet implemented by client!");
}
void SessionImpl::suspend() //user thread
{
Lock l(state);
detach();
}
void SessionImpl::detach() //call with lock held
{
if (state == ATTACHED) {
setState(DETACHING);
proxy.detach(id.getName());
}
}
uint16_t SessionImpl::getChannel() const // user thread
{
return channel;
}
void SessionImpl::setChannel(uint16_t c) // user thread
{
//channel will only ever be set when session is detached (and
//about to be resumed)
channel = c;
}
Demux& SessionImpl::getDemux()
{
return demux;
}
void SessionImpl::waitForCompletion(const SequenceNumber& id)
{
Lock l(state);
sys::Waitable::ScopedWait w(state);
waitForCompletionImpl(id);
}
void SessionImpl::waitForCompletionImpl(const SequenceNumber& id) //call with lock held
{
while (incompleteOut.contains(id)) {
checkOpen();
state.wait();
}
}
bool SessionImpl::isComplete(const SequenceNumber& id)
{
Lock l(state);
return !incompleteOut.contains(id);
}
struct IsCompleteUpTo
{
const SequenceNumber& id;
bool result;
IsCompleteUpTo(const SequenceNumber& _id) : id(_id), result(true) {}
void operator()(const SequenceNumber& start, const SequenceNumber&)
{
if (start <= id) result = false;
}
};
bool SessionImpl::isCompleteUpTo(const SequenceNumber& id)
{
Lock l(state);
//return false if incompleteOut contains anything less than id,
//true otherwise
IsCompleteUpTo f(id);
incompleteIn.for_each(f);
return f.result;
}
framing::SequenceNumber SessionImpl::getCompleteUpTo()
{
SequenceNumber firstIncomplete;
{
Lock l(state);
firstIncomplete = incompleteIn.front();
}
return --firstIncomplete;
}
struct MarkCompleted
{
const SequenceNumber& id;
SequenceSet& completedIn;
MarkCompleted(const SequenceNumber& _id, SequenceSet& set) : id(_id), completedIn(set) {}
void operator()(const SequenceNumber& start, const SequenceNumber& end)
{
if (id >= end) {
completedIn.add(start, end);
} else if (id >= start) {
completedIn.add(start, id);
}
}
};
void SessionImpl::markCompleted(const SequenceSet& ids, bool notifyPeer)
{
Lock l(state);
incompleteIn.remove(ids);
completedIn.add(ids);
if (notifyPeer) {
sendCompletion();
}
}
void SessionImpl::markCompleted(const SequenceNumber& id, bool cumulative, bool notifyPeer)
{
Lock l(state);
if (cumulative) {
//everything in incompleteIn less than or equal to id is now complete
MarkCompleted f(id, completedIn);
incompleteIn.for_each(f);
//make sure id itself is in
completedIn.add(id);
//then remove anything thats completed from the incomplete set
incompleteIn.remove(completedIn);
} else if (incompleteIn.contains(id)) {
incompleteIn.remove(id);
completedIn.add(id);
}
if (notifyPeer) {
sendCompletion();
}
}
void SessionImpl::setException(const sys::ExceptionHolder& ex) {
Lock l(state);
setExceptionLH(ex);
}
void SessionImpl::setExceptionLH(const sys::ExceptionHolder& ex) { // Call with lock held.
exceptionHolder = ex;
setState(DETACHED);
}
/**
* Called by ConnectionImpl to notify active sessions when connection
* is explictly closed
*/
void SessionImpl::connectionClosed(uint16_t code, const std::string& text) {
setException(createConnectionException(code, text));
handleClosed();
}
/**
* Called by ConnectionImpl to notify active sessions when connection
* is disconnected
*/
void SessionImpl::connectionBroke(const std::string& _text) {
setException(sys::ExceptionHolder(new TransportFailure(_text)));
handleClosed();
}
Future SessionImpl::send(const AMQBody& command)
{
return sendCommand(command);
}
Future SessionImpl::send(const AMQBody& command, const MethodContent& content)
{
return sendCommand(command, &content);
}
namespace {
// Functor for FrameSet::map to send header + content frames but, not method frames.
struct SendContentFn {
FrameHandler& handler;
void operator()(const AMQFrame& f) {
if (!f.getMethod())
handler(const_cast<AMQFrame&>(f));
}
SendContentFn(FrameHandler& h) : handler(h) {}
};
}
Future SessionImpl::send(const AMQBody& command, const FrameSet& content) {
Acquire a(sendLock);
SequenceNumber id = nextOut++;
{
Lock l(state);
checkOpen();
incompleteOut.add(id);
}
Future f(id);
if (command.getMethod()->resultExpected()) {
Lock l(state);
//result listener must be set before the command is sent
f.setFutureResult(results.listenForResult(id));
}
AMQFrame frame(command);
frame.setEof(false);
handleOut(frame);
SendContentFn send(out);
content.map(send);
return f;
}
void SessionImpl::sendRawFrame(AMQFrame& frame) {
Acquire a(sendLock);
handleOut(frame);
}
Future SessionImpl::sendCommand(const AMQBody& command, const MethodContent* content)
{
Acquire a(sendLock);
SequenceNumber id = nextOut++;
{
Lock l(state);
checkOpen();
incompleteOut.add(id);
}
Future f(id);
if (command.getMethod()->resultExpected()) {
Lock l(state);
//result listener must be set before the command is sent
f.setFutureResult(results.listenForResult(id));
}
AMQFrame frame(command);
if (content) {
frame.setEof(false);
}
handleOut(frame);
if (content) {
sendContent(*content);
}
return f;
}
void SessionImpl::sendContent(const MethodContent& content)
{
AMQFrame header(content.getHeader());
header.setFirstSegment(false);
uint64_t data_length = content.getData().length();
if(data_length > 0){
header.setLastSegment(false);
handleOut(header);
/*Note: end of frame marker included in overhead but not in size*/
const uint32_t frag_size = maxFrameSize - AMQFrame::frameOverhead();
if(data_length < frag_size){
AMQFrame frame((AMQContentBody(content.getData())));
frame.setFirstSegment(false);
handleOut(frame);
}else{
uint32_t offset = 0;
uint32_t remaining = data_length - offset;
while (remaining > 0) {
uint32_t length = remaining > frag_size ? frag_size : remaining;
std::string frag(content.getData().substr(offset, length));
AMQFrame frame((AMQContentBody(frag)));
frame.setFirstSegment(false);
frame.setLastSegment(true);
if (offset > 0) {
frame.setFirstFrame(false);
}
offset += length;
remaining = data_length - offset;
if (remaining) {
frame.setLastFrame(false);
}
handleOut(frame);
}
}
} else {
handleOut(header);
}
}
bool isMessageMethod(AMQMethodBody* method)
{
return method->isA<MessageTransferBody>();
}
bool isMessageMethod(AMQBody* body)
{
AMQMethodBody* method=body->getMethod();
return method && isMessageMethod(method);
}
bool isContentFrame(AMQFrame& frame)
{
AMQBody* body = frame.getBody();
uint8_t type = body->type();
return type == HEADER_BODY || type == CONTENT_BODY || isMessageMethod(body);
}
void SessionImpl::handleIn(AMQFrame& frame) // network thread
{
try {
if (invoke(static_cast<SessionHandler&>(*this), *frame.getBody())) {
;
} else if (invoke(static_cast<ExecutionHandler&>(*this), *frame.getBody())) {
//make sure the command id sequence and completion
//tracking takes account of execution commands
Lock l(state);
completedIn.add(nextIn++);
} else {
//if not handled by this class, its for the application:
deliver(frame);
}
}
catch (const SessionException& e) {
setException(createSessionException(e.code, e.getMessage()));
}
catch (const ChannelException& e) {
setException(createChannelException(e.code, e.getMessage()));
}
}
void SessionImpl::handleOut(AMQFrame& frame) // user thread
{
sendFrame(frame, true);
}
void SessionImpl::proxyOut(AMQFrame& frame) // network thread
{
//Note: this case is treated slightly differently that command
//frames sent by application; session controls should not be
//blocked by bounds checking on the outgoing frame queue.
sendFrame(frame, false);
}
void SessionImpl::sendFrame(AMQFrame& frame, bool canBlock)
{
connection->expand(frame.encodedSize(), canBlock);
channel.handle(frame);
}
void SessionImpl::deliver(AMQFrame& frame) // network thread
{
if (!arriving) {
arriving = FrameSet::shared_ptr(new FrameSet(nextIn++));
}
arriving->append(frame);
if (arriving->isComplete()) {
//message.transfers will be marked completed only when 'acked'
//as completion affects flow control; other commands will be
//considered completed as soon as processed here
if (arriving->isA<MessageTransferBody>()) {
Lock l(state);
incompleteIn.add(arriving->getId());
} else {
Lock l(state);
completedIn.add(arriving->getId());
}
demux.handle(arriving);
arriving.reset();
}
}
//control handler methods (called by network thread when controls are
//received from peer):
void SessionImpl::attach(const std::string& /*name*/, bool /*force*/)
{
throw NotImplementedException("Client does not support attach");
}
void SessionImpl::attached(const std::string& _name)
{
Lock l(state);
if (id.getName() != _name) throw InternalErrorException("Incorrect session name");
setState(ATTACHED);
}
void SessionImpl::detach(const std::string& _name)
{
Lock l(state);
if (id.getName() != _name) throw InternalErrorException("Incorrect session name");
setState(DETACHED);
QPID_LOG(info, "Session detached by peer: " << id);
proxy.detached(_name, DETACH_CODE_NORMAL);
handleClosed();
}
void SessionImpl::detached(const std::string& _name, uint8_t _code) {
Lock l(state);
if (id.getName() != _name) throw InternalErrorException("Incorrect session name");
setState(DETACHED);
if (_code) {
//TODO: make sure this works with execution.exception - don't
//want to overwrite the code from that
setExceptionLH(createChannelException(_code, "Session detached by peer"));
QPID_LOG(error, exceptionHolder.what());
}
if (detachedLifetime == 0) {
handleClosed();
}
}
void SessionImpl::requestTimeout(uint32_t t)
{
Lock l(state);
detachedLifetime = t;
proxy.timeout(t);
}
void SessionImpl::timeout(uint32_t t)
{
Lock l(state);
detachedLifetime = t;
}
void SessionImpl::commandPoint(const framing::SequenceNumber& id, uint64_t offset)
{
if (offset) throw NotImplementedException("Non-zero byte offset not yet supported for command-point");
Lock l(state);
nextIn = id;
}
void SessionImpl::expected(const framing::SequenceSet& commands, const framing::Array& fragments)
{
if (!commands.empty() || fragments.encodedSize()) {
throw NotImplementedException("Session resumption not yet supported");
}
}
void SessionImpl::confirmed(const framing::SequenceSet& /*commands*/, const framing::Array& /*fragments*/)
{
//don't really care too much about this yet
}
void SessionImpl::completed(const framing::SequenceSet& commands, bool timelyReply)
{
Lock l(state);
incompleteOut.remove(commands);
state.notifyAll();//notify any waiters of completion
completedOut.add(commands);
//notify any waiting results of completion
results.completed(commands);
if (timelyReply) {
proxy.knownCompleted(completedOut);
completedOut.clear();
}
}
void SessionImpl::knownCompleted(const framing::SequenceSet& commands)
{
Lock l(state);
completedIn.remove(commands);
}
void SessionImpl::flush(bool expected, bool confirmed, bool completed)
{
Lock l(state);
if (expected) {
proxy.expected(SequenceSet(nextIn), Array());
}
if (confirmed) {
proxy.confirmed(completedIn, Array());
}
if (completed) {
proxy.completed(completedIn, true);
}
}
void SessionImpl::sendCompletion()
{
Lock l(state);
sendCompletionImpl();
}
void SessionImpl::sendFlush()
{
Lock l(state);
proxy.flush(false, false, true);
}
void SessionImpl::sendCompletionImpl()
{
proxy.completed(completedIn, completedIn.span() > 1000);
}
void SessionImpl::gap(const framing::SequenceSet& /*commands*/)
{
throw NotImplementedException("gap not yet supported");
}
void SessionImpl::sync() {}
void SessionImpl::result(const framing::SequenceNumber& commandId, const std::string& value)
{
Lock l(state);
results.received(commandId, value);
}
void SessionImpl::exception(uint16_t errorCode,
const framing::SequenceNumber& commandId,
uint8_t classCode,
uint8_t commandCode,
uint8_t /*fieldIndex*/,
const std::string& description,
const framing::FieldTable& /*errorInfo*/)
{
Lock l(state);
setExceptionLH(createSessionException(errorCode, description));
QPID_LOG(warning, "Exception received from broker: " << exceptionHolder.what()
<< " [caused by " << commandId << " " << classCode << ":" << commandCode << "]");
if (detachedLifetime)
setTimeout(0);
}
//private utility methods:
inline void SessionImpl::setState(State s) //call with lock held
{
state = s;
}
inline void SessionImpl::waitFor(State s) //call with lock held
{
// We can be DETACHED at any time
if (s == DETACHED) state.waitFor(DETACHED);
else state.waitFor(States(s, DETACHED));
check();
}
void SessionImpl::check() const //call with lock held.
{
exceptionHolder.raise();
}
void SessionImpl::checkOpen() const //call with lock held.
{
check();
if (state != ATTACHED) {
throw NotAttachedException(QPID_MSG("Session " << getId() << " isn't attached"));
}
}
void SessionImpl::assertOpen() const
{
Lock l(state);
checkOpen();
}
bool SessionImpl::hasError() const
{
Lock l(state);
return !exceptionHolder.empty();
}
void SessionImpl::handleClosed()
{
demux.close(exceptionHolder.empty() ?
sys::ExceptionHolder(new ClosedException()) : exceptionHolder);
results.close();
}
uint32_t SessionImpl::setTimeout(uint32_t seconds) {
proxy.requestTimeout(seconds);
// FIXME aconway 2008-10-07: wait for timeout response from broker
// and use value retured by broker.
detachedLifetime = seconds;
return detachedLifetime;
}
uint32_t SessionImpl::getTimeout() const {
return detachedLifetime;
}
boost::shared_ptr<ConnectionImpl> SessionImpl::getConnection()
{
return connection;
}
}}