blob: e366def8071641511b1173937a811214f3d79c06 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#ifndef _SessionImpl_
#define _SessionImpl_
#include "qpid/client/ClientImportExport.h"
#include "qpid/client/Demux.h"
#include "qpid/client/Execution.h"
#include "qpid/client/Results.h"
#include "qpid/client/ClientImportExport.h"
#include "qpid/SessionId.h"
#include "qpid/SessionState.h"
#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/ChannelHandler.h"
#include "qpid/framing/SequenceNumber.h"
#include "qpid/framing/AMQP_ClientOperations.h"
#include "qpid/framing/AMQP_ServerProxy.h"
#include "qpid/sys/Semaphore.h"
#include "qpid/sys/StateMonitor.h"
#include "qpid/sys/ExceptionHolder.h"
#include <boost/weak_ptr.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/optional.hpp>
namespace qpid {
namespace framing {
class FrameSet;
class MethodContent;
class SequenceSet;
}
namespace client {
class Future;
class ConnectionImpl;
class SessionHandler;
///@internal
// TODO aconway 2011-04-05: exposed for use by cluster::UpdateClient,
// clean up dependencies.
class QPID_CLIENT_CLASS_EXTERN SessionImpl : public framing::FrameHandler::InOutHandler,
public Execution,
private framing::AMQP_ClientOperations::SessionHandler,
private framing::AMQP_ClientOperations::ExecutionHandler,
private framing::AMQP_ClientOperations::MessageHandler
{
public:
SessionImpl(const std::string& name, boost::shared_ptr<ConnectionImpl>);
~SessionImpl();
//NOTE: Public functions called in user thread.
framing::FrameSet::shared_ptr get();
const SessionId getId() const;
uint16_t getChannel() const;
void setChannel(uint16_t channel);
void open(uint32_t detachedLifetime);
void close();
void resume(boost::shared_ptr<ConnectionImpl>);
void suspend();
QPID_CLIENT_EXTERN void assertOpen() const;
QPID_CLIENT_EXTERN bool hasError() const;
Future send(const framing::AMQBody& command);
Future send(const framing::AMQBody& command, const framing::MethodContent& content);
/**
* This method takes the content as a FrameSet; if reframe=false,
* the caller is resposnible for ensuring that the header and
* content frames in that set are correct for this connection
* (right flags, right fragmentation etc). If reframe=true, then
* the header and content from the frameset will be copied and
* reframed correctly for the connection.
*/
QPID_CLIENT_EXTERN Future send(const framing::AMQBody& command, const framing::FrameSet& content, bool reframe=false);
void sendRawFrame(framing::AMQFrame& frame);
Demux& getDemux();
void markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer);
void markCompleted(const framing::SequenceSet& ids, bool notifyPeer);
bool isComplete(const framing::SequenceNumber& id);
bool isCompleteUpTo(const framing::SequenceNumber& id);
framing::SequenceNumber getCompleteUpTo();
void waitForCompletion(const framing::SequenceNumber& id);
void sendCompletion();
void sendFlush();
void setException(const sys::ExceptionHolder&);
//NOTE: these are called by the network thread when the connection is closed or dies
void connectionClosed(uint16_t code, const std::string& text);
void connectionBroke(const std::string& text);
/** Set timeout in seconds, returns actual timeout allowed by broker */
uint32_t setTimeout(uint32_t requestedSeconds);
/** Get timeout in seconds. */
uint32_t getTimeout() const;
/**
* get the Connection associated with this connection
*/
boost::shared_ptr<ConnectionImpl> getConnection();
void setDoClearDeliveryPropertiesExchange(bool b=true) { doClearDeliveryPropertiesExchange = b; }
/** Suppress sending detach in destructor. Used by cluster to build session state */
void disableAutoDetach();
private:
enum State {
INACTIVE,
ATTACHING,
ATTACHED,
DETACHING,
DETACHED
};
typedef framing::AMQP_ClientOperations::SessionHandler SessionHandler;
typedef framing::AMQP_ClientOperations::ExecutionHandler ExecutionHandler;
typedef framing::AMQP_ClientOperations::MessageHandler MessageHandler;
typedef sys::StateMonitor<State, DETACHED> StateMonitor;
typedef StateMonitor::Set States;
inline void setState(State s);
inline void waitFor(State);
void setExceptionLH(const sys::ExceptionHolder&); // LH = lock held when called.
void detach();
void check() const;
void checkOpen() const;
void handleClosed();
void handleIn(framing::AMQFrame& frame);
void handleOut(framing::AMQFrame& frame);
/**
* Sends session controls. This case is treated slightly
* differently than command frames sent by the application via
* handleOut(); session controlsare not subject to bounds checking
* on the outgoing frame queue.
*/
void proxyOut(framing::AMQFrame& frame);
void sendFrame(framing::AMQFrame& frame, bool canBlock);
void deliver(framing::AMQFrame& frame);
Future sendCommand(const framing::AMQBody&, const framing::MethodContent* = 0);
void sendContent(const framing::MethodContent&);
void waitForCompletionImpl(const framing::SequenceNumber& id);
void sendCompletionImpl();
// Note: Following methods are called by network thread in
// response to session controls from the broker
void attach(const std::string& name, bool force);
void attached(const std::string& name);
void detach(const std::string& name);
void detached(const std::string& name, uint8_t detachCode);
void requestTimeout(uint32_t timeout);
void timeout(uint32_t timeout);
void commandPoint(const framing::SequenceNumber& commandId, uint64_t commandOffset);
void expected(const framing::SequenceSet& commands, const framing::Array& fragments);
void confirmed(const framing::SequenceSet& commands, const framing::Array& fragments);
void completed(const framing::SequenceSet& commands, bool timelyReply);
void knownCompleted(const framing::SequenceSet& commands);
void flush(bool expected, bool confirmed, bool completed);
void gap(const framing::SequenceSet& commands);
// Note: Following methods are called by network thread in
// response to execution commands from the broker
void sync();
void result(const framing::SequenceNumber& commandId, const std::string& value);
void exception(uint16_t errorCode,
const framing::SequenceNumber& commandId,
uint8_t classCode,
uint8_t commandCode,
uint8_t fieldIndex,
const std::string& description,
const framing::FieldTable& errorInfo);
// Note: Following methods are called by network thread in
// response to message commands from the broker
// EXCEPT Message.Transfer
void accept(const qpid::framing::SequenceSet&);
void reject(const qpid::framing::SequenceSet&, uint16_t, const std::string&);
void release(const qpid::framing::SequenceSet&, bool);
qpid::framing::MessageResumeResult resume(const std::string&, const std::string&);
void setFlowMode(const std::string&, uint8_t);
void flow(const std::string&, uint8_t, uint32_t);
void stop(const std::string&);
sys::ExceptionHolder exceptionHolder;
mutable StateMonitor state;
mutable sys::Semaphore sendLock;
uint32_t detachedLifetime;
const uint64_t maxFrameSize;
const SessionId id;
boost::shared_ptr<ConnectionImpl> connection;
framing::FrameHandler::MemFunRef<SessionImpl, &SessionImpl::proxyOut> ioHandler;
framing::ChannelHandler channel;
framing::AMQP_ServerProxy::Session proxy;
Results results;
Demux demux;
framing::FrameSet::shared_ptr arriving;
framing::SequenceSet incompleteIn;//incoming commands that are as yet incomplete
framing::SequenceSet completedIn;//incoming commands that are have completed
framing::SequenceSet incompleteOut;//outgoing commands not yet known to be complete
framing::SequenceSet completedOut;//outgoing commands that we know to be completed
framing::SequenceNumber nextIn;
framing::SequenceNumber nextOut;
SessionState sessionState;
// Only keep track of message credit
sys::Semaphore* sendMsgCredit;
bool doClearDeliveryPropertiesExchange;
bool autoDetach;
friend class client::SessionHandler;
};
}} // namespace qpid::client
#endif