// blob: 7e631b4505954bd70d2c9b86d6328dc4cb77c5d8 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#ifndef _SessionHandlerImpl_
#define _SessionHandlerImpl_
#include <map>
#include <sstream>
#include <string>
#include <vector>
#include <exception>
#include <AMQFrame.h>
#include <AMQP_ClientProxy.h>
#include <AMQP_ServerOperations.h>
#include <AutoDelete.h>
#include <ExchangeRegistry.h>
#include <BrokerChannel.h>
#include <ConnectionToken.h>
#include <DirectExchange.h>
#include <OutputHandler.h>
#include <ProtocolInitiation.h>
#include <QueueRegistry.h>
#include <sys/SessionContext.h>
#include <sys/SessionHandler.h>
#include <sys/TimeoutHandler.h>
#include <TopicExchange.h>
namespace qpid {
namespace broker {
/**
 * Exception carrying a numeric code and a human-readable message for an
 * error raised against a channel.
 *
 * The message is stored by value, so the pointer returned by what() remains
 * valid for as long as this exception object (or the copy being inspected)
 * is alive.
 *
 * NOTE(review): the code is presumably an AMQP reply code - confirm against
 * the throw sites.
 */
struct ChannelException : public std::exception {
    /** Numeric code supplied by the thrower. */
    u_int16_t code;
    /** Message text supplied by the thrower; returned by what(). */
    std::string text;

    /**
     * @param _code numeric code to report.
     * @param _text message text; taken by const reference so that it is
     *              copied exactly once, into the text member.
     */
    ChannelException(u_int16_t _code, const std::string& _text) : code(_code), text(_text) {}

    ~ChannelException() throw() {}

    /** @return the message text as a NUL-terminated C string. */
    const char* what() const throw() { return text.c_str(); }
};
/**
 * Exception carrying a numeric code and a human-readable message for an
 * error raised against the connection as a whole.
 *
 * The message is stored by value, so the pointer returned by what() remains
 * valid for as long as this exception object (or the copy being inspected)
 * is alive.
 *
 * NOTE(review): the code is presumably an AMQP reply code - confirm against
 * the throw sites.
 */
struct ConnectionException : public std::exception {
    /** Numeric code supplied by the thrower. */
    u_int16_t code;
    /** Message text supplied by the thrower; returned by what(). */
    std::string text;

    /**
     * @param _code numeric code to report.
     * @param _text message text; taken by const reference so that it is
     *              copied exactly once, into the text member.
     */
    ConnectionException(u_int16_t _code, const std::string& _text) : code(_code), text(_text) {}

    ~ConnectionException() throw() {}

    /** @return the message text as a NUL-terminated C string. */
    const char* what() const throw() { return text.c_str(); }
};
/**
 * Read-only bundle of configuration values.
 *
 * Both fields are const and are fixed by the constructor, so an instance
 * cannot be modified or assigned to after it has been built.
 */
class Settings {
public:
    /**
     * @param _timeout          value stored in timeout (milliseconds).
     * @param _stagingThreshold value stored in stagingThreshold.
     */
    Settings(u_int32_t _timeout, u_int64_t _stagingThreshold)
        : timeout(_timeout),
          stagingThreshold(_stagingThreshold)
    {}

    /** Timeout for auto-deleted queues, in milliseconds. */
    const u_int32_t timeout;

    /** Staging threshold; its meaning is defined by the code that reads it. */
    const u_int64_t stagingThreshold;
};
/**
 * Session handler that derives from qpid::sys::SessionHandler,
 * qpid::framing::AMQP_ServerOperations and ConnectionToken.
 *
 * It keeps non-owning-looking raw pointers to the session context, the client
 * proxy, the queue and exchange registries and the auto-delete cleaner, a copy
 * of the Settings it was constructed with, a map from channel number to
 * Channel*, and one handler object per supported method class (basic, channel,
 * connection, exchange, queue, tx). The get*Handler() accessors hand out those
 * handler objects; the accessors for the access, file, stream, dtx, tunnel and
 * test classes always throw ConnectionException(540, "... not implemented").
 *
 * String arguments of the nested handlers are taken as const string& to match
 * the code-generator function signatures (kpvdr 2006-11-20).
 *
 * NOTE(review): who deletes the Channel objects and the objects behind the
 * raw pointers is decided in the implementation file - confirm there.
 */
class SessionHandlerImpl : public virtual qpid::sys::SessionHandler,
                           public virtual qpid::framing::AMQP_ServerOperations,
                           public virtual ConnectionToken
{
private:
    typedef std::map<u_int16_t, Channel*>::iterator channel_iterator;
    typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;

    // Collaborators supplied from outside.
    qpid::sys::SessionContext* context;
    qpid::framing::AMQP_ClientProxy* client;
    QueueRegistry* queues;
    ExchangeRegistry* const exchanges;
    AutoDelete* const cleaner;
    const Settings settings;

    // One handler per method class; returned by the get*Handler() accessors.
    std::auto_ptr<BasicHandler> basicHandler;
    std::auto_ptr<ChannelHandler> channelHandler;
    std::auto_ptr<ConnectionHandler> connectionHandler;
    std::auto_ptr<ExchangeHandler> exchangeHandler;
    std::auto_ptr<QueueHandler> queueHandler;
    std::auto_ptr<TxHandler> txHandler;

    // Per-session state.
    std::map<u_int16_t, Channel*> channels;           // keyed by channel number
    std::vector<Queue::shared_ptr> exclusiveQueues;
    u_int32_t framemax;
    u_int16_t heartbeat;

    // Handling of the non-method frame bodies.
    void handleHeader(u_int16_t channel, qpid::framing::AMQHeaderBody::shared_ptr body);
    void handleContent(u_int16_t channel, qpid::framing::AMQContentBody::shared_ptr body);
    void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body);

    Channel* getChannel(u_int16_t channel);

    /**
     * Look up a queue by name; the result is never 0.
     * @return the named queue, or the channel's default queue when name is "".
     * @exception ChannelException if no queue with that name exists.
     * @exception ConnectionException if no name is given and the channel has
     *            not declared a queue.
     */
    Queue::shared_ptr getQueue(const string& name, u_int16_t channel);

    Exchange::shared_ptr findExchange(const string& name);

public:
    SessionHandlerImpl(qpid::sys::SessionContext* context, QueueRegistry* queues,
                       ExchangeRegistry* exchanges, AutoDelete* cleaner, const Settings& settings);

    virtual void received(qpid::framing::AMQFrame* frame);
    virtual void initiated(qpid::framing::ProtocolInitiation* header);
    virtual void idleOut();
    virtual void idleIn();
    virtual void closed();
    virtual ~SessionHandlerImpl();

    /** ConnectionHandler implementation that keeps a pointer to its session. */
    class ConnectionHandlerImpl : public virtual ConnectionHandler {
        SessionHandlerImpl* parent;
    public:
        ConnectionHandlerImpl(SessionHandlerImpl* owner) : parent(owner) {}

        virtual void startOk(u_int16_t channel, const qpid::framing::FieldTable& clientProperties,
                             const string& mechanism, const string& response, const string& locale);

        virtual void secureOk(u_int16_t channel, const string& response);

        virtual void tuneOk(u_int16_t channel, u_int16_t channelMax, u_int32_t frameMax, u_int16_t heartbeat);

        virtual void open(u_int16_t channel, const string& virtualHost, const string& capabilities, bool insist);

        virtual void close(u_int16_t channel, u_int16_t replyCode, const string& replyText,
                           u_int16_t classId, u_int16_t methodId);

        virtual void closeOk(u_int16_t channel);

        virtual ~ConnectionHandlerImpl() {}
    };

    /** ChannelHandler implementation that keeps a pointer to its session. */
    class ChannelHandlerImpl : public virtual ChannelHandler {
        SessionHandlerImpl* parent;
    public:
        ChannelHandlerImpl(SessionHandlerImpl* owner) : parent(owner) {}

        virtual void open(u_int16_t channel, const string& outOfBand);

        virtual void flow(u_int16_t channel, bool active);

        virtual void flowOk(u_int16_t channel, bool active);

        virtual void close(u_int16_t channel, u_int16_t replyCode, const string& replyText,
                           u_int16_t classId, u_int16_t methodId);

        virtual void closeOk(u_int16_t channel);

        virtual ~ChannelHandlerImpl() {}
    };

    /** ExchangeHandler implementation that keeps a pointer to its session. */
    class ExchangeHandlerImpl : public virtual ExchangeHandler {
        SessionHandlerImpl* parent;
    public:
        ExchangeHandlerImpl(SessionHandlerImpl* owner) : parent(owner) {}

        virtual void declare(u_int16_t channel, u_int16_t ticket, const string& exchange, const string& type,
                             bool passive, bool durable, bool autoDelete, bool internal, bool nowait,
                             const qpid::framing::FieldTable& arguments);

        virtual void delete_(u_int16_t channel, u_int16_t ticket, const string& exchange,
                             bool ifUnused, bool nowait);

        virtual ~ExchangeHandlerImpl() {}
    };

    /** QueueHandler implementation that keeps a pointer to its session. */
    class QueueHandlerImpl : public virtual QueueHandler {
        SessionHandlerImpl* parent;
    public:
        QueueHandlerImpl(SessionHandlerImpl* owner) : parent(owner) {}

        virtual void declare(u_int16_t channel, u_int16_t ticket, const string& queue,
                             bool passive, bool durable, bool exclusive,
                             bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments);

        virtual void bind(u_int16_t channel, u_int16_t ticket, const string& queue,
                          const string& exchange, const string& routingKey, bool nowait,
                          const qpid::framing::FieldTable& arguments);

        virtual void purge(u_int16_t channel, u_int16_t ticket, const string& queue,
                           bool nowait);

        virtual void delete_(u_int16_t channel, u_int16_t ticket, const string& queue,
                             bool ifUnused, bool ifEmpty, bool nowait);

        virtual ~QueueHandlerImpl() {}
    };

    /** BasicHandler implementation that keeps a pointer to its session. */
    class BasicHandlerImpl : public virtual BasicHandler {
        SessionHandlerImpl* parent;
    public:
        BasicHandlerImpl(SessionHandlerImpl* owner) : parent(owner) {}

        virtual void qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool global);

        virtual void consume(u_int16_t channel, u_int16_t ticket, const string& queue,
                             const string& consumerTag, bool noLocal, bool noAck,
                             bool exclusive, bool nowait,
                             const qpid::framing::FieldTable& fields);

        virtual void cancel(u_int16_t channel, const string& consumerTag, bool nowait);

        virtual void publish(u_int16_t channel, u_int16_t ticket, const string& exchange,
                             const string& routingKey, bool mandatory, bool immediate);

        virtual void get(u_int16_t channel, u_int16_t ticket, const string& queue, bool noAck);

        virtual void ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple);

        virtual void reject(u_int16_t channel, u_int64_t deliveryTag, bool requeue);

        virtual void recover(u_int16_t channel, bool requeue);

        virtual ~BasicHandlerImpl() {}
    };

    /** TxHandler implementation that keeps a pointer to its session. */
    class TxHandlerImpl : public virtual TxHandler {
        SessionHandlerImpl* parent;
    public:
        TxHandlerImpl(SessionHandlerImpl* owner) : parent(owner) {}
        virtual ~TxHandlerImpl() {}

        virtual void select(u_int16_t channel);
        virtual void commit(u_int16_t channel);
        virtual void rollback(u_int16_t channel);
    };

    // Accessors for the implemented method classes: each returns the pointer
    // held by the matching auto_ptr member (ownership stays with this object).
    virtual ChannelHandler* getChannelHandler() { return channelHandler.get(); }
    virtual ConnectionHandler* getConnectionHandler() { return connectionHandler.get(); }
    virtual BasicHandler* getBasicHandler() { return basicHandler.get(); }
    virtual ExchangeHandler* getExchangeHandler() { return exchangeHandler.get(); }
    virtual QueueHandler* getQueueHandler() { return queueHandler.get(); }
    virtual TxHandler* getTxHandler() { return txHandler.get(); }

    // Accessors for method classes without an implementation here: each one
    // throws instead of returning a handler.
    virtual AccessHandler* getAccessHandler() {
        throw ConnectionException(540, "Access class not implemented");
    }
    virtual FileHandler* getFileHandler() {
        throw ConnectionException(540, "File class not implemented");
    }
    virtual StreamHandler* getStreamHandler() {
        throw ConnectionException(540, "Stream class not implemented");
    }
    virtual DtxHandler* getDtxHandler() {
        throw ConnectionException(540, "Dtx class not implemented");
    }
    virtual TunnelHandler* getTunnelHandler() {
        throw ConnectionException(540, "Tunnel class not implemented");
    }

    // Temporary add-in to resolve version conflicts: AMQP v8.0 still defines
    // class Test, whereas v0.9 will not - kpvdr 2006-11-17
    virtual TestHandler* getTestHandler() {
        throw ConnectionException(540, "Test class not implemented");
    }
};
}
}
#endif