| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| |
| #include "qpid/client/SessionImpl.h" |
| |
| #include "qpid/client/ConnectionImpl.h" |
| #include "qpid/client/Future.h" |
| |
| #include "qpid/framing/all_method_bodies.h" |
| #include "qpid/framing/ClientInvoker.h" |
| #include "qpid/framing/enum.h" |
| #include "qpid/framing/FrameSet.h" |
| #include "qpid/framing/AMQFrame.h" |
| #include "qpid/framing/MethodContent.h" |
| #include "qpid/framing/SequenceSet.h" |
| #include "qpid/framing/reply_exceptions.h" |
| #include "qpid/framing/DeliveryProperties.h" |
| #include "qpid/log/Statement.h" |
| #include "qpid/sys/IntegerTypes.h" |
| |
| #include <boost/bind.hpp> |
| #include <boost/shared_ptr.hpp> |
| |
| namespace { const std::string EMPTY; } |
| |
| namespace qpid { |
| namespace client { |
| |
| using namespace qpid::framing; |
| using namespace qpid::framing::session; //for detach codes |
| |
| typedef sys::Monitor::ScopedLock Lock; |
| typedef sys::Monitor::ScopedUnlock UnLock; |
| typedef sys::ScopedLock<sys::Semaphore> Acquire; |
| |
| |
| SessionImpl::SessionImpl(const std::string& name, boost::shared_ptr<ConnectionImpl> conn) |
| : state(INACTIVE), |
| detachedLifetime(0), |
| maxFrameSize(conn->getNegotiatedSettings().maxFrameSize), |
| id(conn->getNegotiatedSettings().username, name.empty() ? Uuid(true).str() : name), |
| connection(conn), |
| ioHandler(*this), |
| proxy(ioHandler), |
| nextIn(0), |
| nextOut(0) |
| { |
| channel.next = connection.get(); |
| } |
| |
| SessionImpl::~SessionImpl() { |
| { |
| Lock l(state); |
| if (state != DETACHED && state != DETACHING) { |
| QPID_LOG(warning, "Session was not closed cleanly: " << id); |
| // Inform broker but don't wait for detached as that deadlocks. |
| // The detached will be ignored as the channel will be invalid. |
| try { detach(); } catch (...) {} // ignore errors. |
| setState(DETACHED); |
| handleClosed(); |
| state.waitWaiters(); |
| } |
| } |
| connection->erase(channel); |
| } |
| |
| |
| FrameSet::shared_ptr SessionImpl::get() // user thread |
| { |
| // No lock here: pop does a blocking wait. |
| return demux.getDefault()->pop(); |
| } |
| |
| const SessionId SessionImpl::getId() const //user thread |
| { |
| return id; //id is immutable |
| } |
| |
| void SessionImpl::open(uint32_t timeout) // user thread |
| { |
| Lock l(state); |
| if (state == INACTIVE) { |
| setState(ATTACHING); |
| proxy.attach(id.getName(), false); |
| waitFor(ATTACHED); |
| //TODO: timeout will not be set locally until get response to |
| //confirm, should we wait for that? |
| setTimeout(timeout); |
| proxy.commandPoint(nextOut, 0); |
| } else { |
| throw Exception("Open already called for this session"); |
| } |
| } |
| |
| void SessionImpl::close() //user thread |
| { |
| Lock l(state); |
| // close() must be idempotent and no-throw as it will often be called in destructors. |
| if (state != DETACHED && state != DETACHING) { |
| try { |
| if (detachedLifetime) setTimeout(0); |
| detach(); |
| waitFor(DETACHED); |
| } catch (...) {} |
| setState(DETACHED); |
| } |
| } |
| |
| void SessionImpl::resume(boost::shared_ptr<ConnectionImpl>) // user thread |
| { |
| throw NotImplementedException("Resume not yet implemented by client!"); |
| } |
| |
| void SessionImpl::suspend() //user thread |
| { |
| Lock l(state); |
| detach(); |
| } |
| |
| void SessionImpl::detach() //call with lock held |
| { |
| if (state == ATTACHED) { |
| setState(DETACHING); |
| proxy.detach(id.getName()); |
| } |
| } |
| |
| |
| uint16_t SessionImpl::getChannel() const // user thread |
| { |
| return channel; |
| } |
| |
| void SessionImpl::setChannel(uint16_t c) // user thread |
| { |
| //channel will only ever be set when session is detached (and |
| //about to be resumed) |
| channel = c; |
| } |
| |
| Demux& SessionImpl::getDemux() |
| { |
| return demux; |
| } |
| |
| void SessionImpl::waitForCompletion(const SequenceNumber& id) |
| { |
| Lock l(state); |
| sys::Waitable::ScopedWait w(state); |
| waitForCompletionImpl(id); |
| } |
| |
| void SessionImpl::waitForCompletionImpl(const SequenceNumber& id) //call with lock held |
| { |
| while (incompleteOut.contains(id)) { |
| checkOpen(); |
| state.wait(); |
| } |
| } |
| |
| bool SessionImpl::isComplete(const SequenceNumber& id) |
| { |
| Lock l(state); |
| return !incompleteOut.contains(id); |
| } |
| |
| struct IsCompleteUpTo |
| { |
| const SequenceNumber& id; |
| bool result; |
| |
| IsCompleteUpTo(const SequenceNumber& _id) : id(_id), result(true) {} |
| void operator()(const SequenceNumber& start, const SequenceNumber&) |
| { |
| if (start <= id) result = false; |
| } |
| |
| }; |
| |
| bool SessionImpl::isCompleteUpTo(const SequenceNumber& id) |
| { |
| Lock l(state); |
| //return false if incompleteOut contains anything less than id, |
| //true otherwise |
| IsCompleteUpTo f(id); |
| incompleteIn.for_each(f); |
| return f.result; |
| } |
| |
| framing::SequenceNumber SessionImpl::getCompleteUpTo() |
| { |
| SequenceNumber firstIncomplete; |
| { |
| Lock l(state); |
| firstIncomplete = incompleteIn.front(); |
| } |
| return --firstIncomplete; |
| } |
| |
| struct MarkCompleted |
| { |
| const SequenceNumber& id; |
| SequenceSet& completedIn; |
| |
| MarkCompleted(const SequenceNumber& _id, SequenceSet& set) : id(_id), completedIn(set) {} |
| |
| void operator()(const SequenceNumber& start, const SequenceNumber& end) |
| { |
| if (id >= end) { |
| completedIn.add(start, end); |
| } else if (id >= start) { |
| completedIn.add(start, id); |
| } |
| } |
| |
| }; |
| |
| void SessionImpl::markCompleted(const SequenceSet& ids, bool notifyPeer) |
| { |
| Lock l(state); |
| incompleteIn.remove(ids); |
| completedIn.add(ids); |
| if (notifyPeer) { |
| sendCompletion(); |
| } |
| } |
| |
| void SessionImpl::markCompleted(const SequenceNumber& id, bool cumulative, bool notifyPeer) |
| { |
| Lock l(state); |
| if (cumulative) { |
| //everything in incompleteIn less than or equal to id is now complete |
| MarkCompleted f(id, completedIn); |
| incompleteIn.for_each(f); |
| //make sure id itself is in |
| completedIn.add(id); |
| //then remove anything thats completed from the incomplete set |
| incompleteIn.remove(completedIn); |
| } else if (incompleteIn.contains(id)) { |
| incompleteIn.remove(id); |
| completedIn.add(id); |
| } |
| if (notifyPeer) { |
| sendCompletion(); |
| } |
| } |
| |
| void SessionImpl::setException(const sys::ExceptionHolder& ex) { |
| Lock l(state); |
| setExceptionLH(ex); |
| } |
| |
| void SessionImpl::setExceptionLH(const sys::ExceptionHolder& ex) { // Call with lock held. |
| exceptionHolder = ex; |
| setState(DETACHED); |
| } |
| |
| /** |
| * Called by ConnectionImpl to notify active sessions when connection |
| * is explictly closed |
| */ |
| void SessionImpl::connectionClosed(uint16_t code, const std::string& text) { |
| setException(createConnectionException(code, text)); |
| handleClosed(); |
| } |
| |
| /** |
| * Called by ConnectionImpl to notify active sessions when connection |
| * is disconnected |
| */ |
| void SessionImpl::connectionBroke(const std::string& _text) { |
| setException(sys::ExceptionHolder(new TransportFailure(_text))); |
| handleClosed(); |
| } |
| |
| Future SessionImpl::send(const AMQBody& command) |
| { |
| return sendCommand(command); |
| } |
| |
| Future SessionImpl::send(const AMQBody& command, const MethodContent& content) |
| { |
| return sendCommand(command, &content); |
| } |
| |
| namespace { |
| // Functor for FrameSet::map to send header + content frames but, not method frames. |
| struct SendContentFn { |
| FrameHandler& handler; |
| void operator()(const AMQFrame& f) { |
| if (!f.getMethod()) |
| handler(const_cast<AMQFrame&>(f)); |
| } |
| SendContentFn(FrameHandler& h) : handler(h) {} |
| }; |
| |
| } |
| |
| Future SessionImpl::send(const AMQBody& command, const FrameSet& content) { |
| Acquire a(sendLock); |
| SequenceNumber id = nextOut++; |
| { |
| Lock l(state); |
| checkOpen(); |
| incompleteOut.add(id); |
| } |
| Future f(id); |
| if (command.getMethod()->resultExpected()) { |
| Lock l(state); |
| //result listener must be set before the command is sent |
| f.setFutureResult(results.listenForResult(id)); |
| } |
| AMQFrame frame(command); |
| frame.setEof(false); |
| handleOut(frame); |
| SendContentFn send(out); |
| content.map(send); |
| return f; |
| } |
| |
| void SessionImpl::sendRawFrame(AMQFrame& frame) { |
| Acquire a(sendLock); |
| handleOut(frame); |
| } |
| |
| Future SessionImpl::sendCommand(const AMQBody& command, const MethodContent* content) |
| { |
| Acquire a(sendLock); |
| SequenceNumber id = nextOut++; |
| { |
| Lock l(state); |
| checkOpen(); |
| incompleteOut.add(id); |
| } |
| Future f(id); |
| if (command.getMethod()->resultExpected()) { |
| Lock l(state); |
| //result listener must be set before the command is sent |
| f.setFutureResult(results.listenForResult(id)); |
| } |
| AMQFrame frame(command); |
| if (content) { |
| frame.setEof(false); |
| } |
| handleOut(frame); |
| if (content) { |
| sendContent(*content); |
| } |
| return f; |
| } |
| |
| void SessionImpl::sendContent(const MethodContent& content) |
| { |
| AMQFrame header(content.getHeader()); |
| |
| header.setFirstSegment(false); |
| uint64_t data_length = content.getData().length(); |
| if(data_length > 0){ |
| header.setLastSegment(false); |
| handleOut(header); |
| /*Note: end of frame marker included in overhead but not in size*/ |
| const uint32_t frag_size = maxFrameSize - AMQFrame::frameOverhead(); |
| |
| if(data_length < frag_size){ |
| AMQFrame frame((AMQContentBody(content.getData()))); |
| frame.setFirstSegment(false); |
| handleOut(frame); |
| }else{ |
| uint32_t offset = 0; |
| uint32_t remaining = data_length - offset; |
| while (remaining > 0) { |
| uint32_t length = remaining > frag_size ? frag_size : remaining; |
| std::string frag(content.getData().substr(offset, length)); |
| AMQFrame frame((AMQContentBody(frag))); |
| frame.setFirstSegment(false); |
| frame.setLastSegment(true); |
| if (offset > 0) { |
| frame.setFirstFrame(false); |
| } |
| offset += length; |
| remaining = data_length - offset; |
| if (remaining) { |
| frame.setLastFrame(false); |
| } |
| handleOut(frame); |
| } |
| } |
| } else { |
| handleOut(header); |
| } |
| } |
| |
| |
| bool isMessageMethod(AMQMethodBody* method) |
| { |
| return method->isA<MessageTransferBody>(); |
| } |
| |
| bool isMessageMethod(AMQBody* body) |
| { |
| AMQMethodBody* method=body->getMethod(); |
| return method && isMessageMethod(method); |
| } |
| |
| bool isContentFrame(AMQFrame& frame) |
| { |
| AMQBody* body = frame.getBody(); |
| uint8_t type = body->type(); |
| return type == HEADER_BODY || type == CONTENT_BODY || isMessageMethod(body); |
| } |
| |
| void SessionImpl::handleIn(AMQFrame& frame) // network thread |
| { |
| try { |
| if (invoke(static_cast<SessionHandler&>(*this), *frame.getBody())) { |
| ; |
| } else if (invoke(static_cast<ExecutionHandler&>(*this), *frame.getBody())) { |
| //make sure the command id sequence and completion |
| //tracking takes account of execution commands |
| Lock l(state); |
| completedIn.add(nextIn++); |
| } else { |
| //if not handled by this class, its for the application: |
| deliver(frame); |
| } |
| } |
| catch (const SessionException& e) { |
| setException(createSessionException(e.code, e.getMessage())); |
| } |
| catch (const ChannelException& e) { |
| setException(createChannelException(e.code, e.getMessage())); |
| } |
| } |
| |
| void SessionImpl::handleOut(AMQFrame& frame) // user thread |
| { |
| sendFrame(frame, true); |
| } |
| |
| void SessionImpl::proxyOut(AMQFrame& frame) // network thread |
| { |
| //Note: this case is treated slightly differently that command |
| //frames sent by application; session controls should not be |
| //blocked by bounds checking on the outgoing frame queue. |
| sendFrame(frame, false); |
| } |
| |
| void SessionImpl::sendFrame(AMQFrame& frame, bool canBlock) |
| { |
| connection->expand(frame.encodedSize(), canBlock); |
| channel.handle(frame); |
| } |
| |
| void SessionImpl::deliver(AMQFrame& frame) // network thread |
| { |
| if (!arriving) { |
| arriving = FrameSet::shared_ptr(new FrameSet(nextIn++)); |
| } |
| arriving->append(frame); |
| if (arriving->isComplete()) { |
| //message.transfers will be marked completed only when 'acked' |
| //as completion affects flow control; other commands will be |
| //considered completed as soon as processed here |
| if (arriving->isA<MessageTransferBody>()) { |
| Lock l(state); |
| incompleteIn.add(arriving->getId()); |
| } else { |
| Lock l(state); |
| completedIn.add(arriving->getId()); |
| } |
| demux.handle(arriving); |
| arriving.reset(); |
| } |
| } |
| |
| //control handler methods (called by network thread when controls are |
| //received from peer): |
| |
| void SessionImpl::attach(const std::string& /*name*/, bool /*force*/) |
| { |
| throw NotImplementedException("Client does not support attach"); |
| } |
| |
| void SessionImpl::attached(const std::string& _name) |
| { |
| Lock l(state); |
| if (id.getName() != _name) throw InternalErrorException("Incorrect session name"); |
| setState(ATTACHED); |
| } |
| |
| void SessionImpl::detach(const std::string& _name) |
| { |
| Lock l(state); |
| if (id.getName() != _name) throw InternalErrorException("Incorrect session name"); |
| setState(DETACHED); |
| QPID_LOG(info, "Session detached by peer: " << id); |
| proxy.detached(_name, DETACH_CODE_NORMAL); |
| handleClosed(); |
| } |
| |
| void SessionImpl::detached(const std::string& _name, uint8_t _code) { |
| Lock l(state); |
| if (id.getName() != _name) throw InternalErrorException("Incorrect session name"); |
| setState(DETACHED); |
| if (_code) { |
| //TODO: make sure this works with execution.exception - don't |
| //want to overwrite the code from that |
| setExceptionLH(createChannelException(_code, "Session detached by peer")); |
| QPID_LOG(error, exceptionHolder.what()); |
| } |
| if (detachedLifetime == 0) { |
| handleClosed(); |
| } |
| } |
| |
| void SessionImpl::requestTimeout(uint32_t t) |
| { |
| Lock l(state); |
| detachedLifetime = t; |
| proxy.timeout(t); |
| } |
| |
| void SessionImpl::timeout(uint32_t t) |
| { |
| Lock l(state); |
| detachedLifetime = t; |
| } |
| |
| void SessionImpl::commandPoint(const framing::SequenceNumber& id, uint64_t offset) |
| { |
| if (offset) throw NotImplementedException("Non-zero byte offset not yet supported for command-point"); |
| |
| Lock l(state); |
| nextIn = id; |
| } |
| |
| void SessionImpl::expected(const framing::SequenceSet& commands, const framing::Array& fragments) |
| { |
| if (!commands.empty() || fragments.encodedSize()) { |
| throw NotImplementedException("Session resumption not yet supported"); |
| } |
| } |
| |
| void SessionImpl::confirmed(const framing::SequenceSet& /*commands*/, const framing::Array& /*fragments*/) |
| { |
| //don't really care too much about this yet |
| } |
| |
| void SessionImpl::completed(const framing::SequenceSet& commands, bool timelyReply) |
| { |
| Lock l(state); |
| incompleteOut.remove(commands); |
| state.notifyAll();//notify any waiters of completion |
| completedOut.add(commands); |
| //notify any waiting results of completion |
| results.completed(commands); |
| |
| if (timelyReply) { |
| proxy.knownCompleted(completedOut); |
| completedOut.clear(); |
| } |
| } |
| |
| void SessionImpl::knownCompleted(const framing::SequenceSet& commands) |
| { |
| Lock l(state); |
| completedIn.remove(commands); |
| } |
| |
| void SessionImpl::flush(bool expected, bool confirmed, bool completed) |
| { |
| Lock l(state); |
| if (expected) { |
| proxy.expected(SequenceSet(nextIn), Array()); |
| } |
| if (confirmed) { |
| proxy.confirmed(completedIn, Array()); |
| } |
| if (completed) { |
| proxy.completed(completedIn, true); |
| } |
| } |
| |
| void SessionImpl::sendCompletion() |
| { |
| Lock l(state); |
| sendCompletionImpl(); |
| } |
| |
| void SessionImpl::sendFlush() |
| { |
| Lock l(state); |
| proxy.flush(false, false, true); |
| } |
| |
| void SessionImpl::sendCompletionImpl() |
| { |
| proxy.completed(completedIn, completedIn.span() > 1000); |
| } |
| |
| void SessionImpl::gap(const framing::SequenceSet& /*commands*/) |
| { |
| throw NotImplementedException("gap not yet supported"); |
| } |
| |
| void SessionImpl::sync() {} |
| |
| void SessionImpl::result(const framing::SequenceNumber& commandId, const std::string& value) |
| { |
| Lock l(state); |
| results.received(commandId, value); |
| } |
| |
| void SessionImpl::exception(uint16_t errorCode, |
| const framing::SequenceNumber& commandId, |
| uint8_t classCode, |
| uint8_t commandCode, |
| uint8_t /*fieldIndex*/, |
| const std::string& description, |
| const framing::FieldTable& /*errorInfo*/) |
| { |
| Lock l(state); |
| setExceptionLH(createSessionException(errorCode, description)); |
| QPID_LOG(warning, "Exception received from broker: " << exceptionHolder.what() |
| << " [caused by " << commandId << " " << classCode << ":" << commandCode << "]"); |
| |
| if (detachedLifetime) |
| setTimeout(0); |
| } |
| |
| //private utility methods: |
| |
| inline void SessionImpl::setState(State s) //call with lock held |
| { |
| state = s; |
| } |
| |
| inline void SessionImpl::waitFor(State s) //call with lock held |
| { |
| // We can be DETACHED at any time |
| if (s == DETACHED) state.waitFor(DETACHED); |
| else state.waitFor(States(s, DETACHED)); |
| check(); |
| } |
| |
| void SessionImpl::check() const //call with lock held. |
| { |
| exceptionHolder.raise(); |
| } |
| |
| void SessionImpl::checkOpen() const //call with lock held. |
| { |
| check(); |
| if (state != ATTACHED) { |
| throw NotAttachedException(QPID_MSG("Session " << getId() << " isn't attached")); |
| } |
| } |
| |
| void SessionImpl::assertOpen() const |
| { |
| Lock l(state); |
| checkOpen(); |
| } |
| |
| bool SessionImpl::hasError() const |
| { |
| Lock l(state); |
| return !exceptionHolder.empty(); |
| } |
| |
| void SessionImpl::handleClosed() |
| { |
| demux.close(exceptionHolder.empty() ? |
| sys::ExceptionHolder(new ClosedException()) : exceptionHolder); |
| results.close(); |
| } |
| |
| uint32_t SessionImpl::setTimeout(uint32_t seconds) { |
| proxy.requestTimeout(seconds); |
| // FIXME aconway 2008-10-07: wait for timeout response from broker |
| // and use value retured by broker. |
| detachedLifetime = seconds; |
| return detachedLifetime; |
| } |
| |
| uint32_t SessionImpl::getTimeout() const { |
| return detachedLifetime; |
| } |
| |
| boost::shared_ptr<ConnectionImpl> SessionImpl::getConnection() |
| { |
| return connection; |
| } |
| |
| }} |