blob: 618f3cbc92a59c3f2d40515d2a003c8f5ae32561 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include <algorithm>
#include <boost/format.hpp>
#include <boost/bind.hpp>
#include <Connection.h>
#include <ClientChannel.h>
#include <ClientMessage.h>
#include <QpidError.h>
#include <iostream>
#include <sstream>
#include <MethodBodyInstances.h>
#include <functional>
using namespace qpid::framing;
using namespace qpid::sys;
namespace qpid {
namespace client {
const std::string Connection::OK("OK");
Connection::Connection(
bool _debug, uint32_t _max_frame_size,
framing::ProtocolVersion _version
) : channelIdCounter(0), version(_version), max_frame_size(_max_frame_size),
defaultConnector(version, _debug, _max_frame_size),
isOpen(false), debug(_debug)
{
setConnector(defaultConnector);
}
Connection::~Connection(){}
void Connection::setConnector(Connector& con)
{
connector = &con;
connector->setInputHandler(this);
connector->setTimeoutHandler(this);
connector->setShutdownHandler(this);
out = connector->getOutputHandler();
}
void Connection::open(
const std::string& host, int port,
const std::string& uid, const std::string& pwd, const std::string& vhost)
{
if (isOpen)
THROW_QPID_ERROR(INTERNAL_ERROR, "Channel object is already open");
connector->connect(host, port);
channels[0] = &channel0;
channel0.open(0, *this);
channel0.protocolInit(uid, pwd, vhost);
isOpen = true;
}
void Connection::shutdown() {
close();
}
void Connection::close(
ReplyCode code, const string& msg, ClassId classId, MethodId methodId
)
{
if(isOpen) {
// TODO aconway 2007-01-29: Exception handling - could end up
// partly closed with threads left unjoined.
isOpen = false;
channel0.sendAndReceive<ConnectionCloseOkBody>(
new ConnectionCloseBody(
getVersion(), code, msg, classId, methodId));
using boost::bind;
for_each(channels.begin(), channels.end(),
bind(&Channel::closeInternal,
bind(&ChannelMap::value_type::second, _1)));
channels.clear();
connector->close();
}
}
void Connection::openChannel(Channel& channel) {
ChannelId id = ++channelIdCounter;
assert (channels.find(id) == channels.end());
assert(out);
channels[id] = &channel;
channel.open(id, *this);
}
void Connection::erase(ChannelId id) {
channels.erase(id);
}
void Connection::received(AMQFrame* frame){
// FIXME aconway 2007-01-25: Mutex
ChannelId id = frame->getChannel();
Channel* channel = channels[id];
// FIXME aconway 2007-01-26: Exception thrown here is hanging the
// client. Need to review use of exceptions.
if (channel == 0)
THROW_QPID_ERROR(
PROTOCOL_ERROR+504,
(boost::format("Invalid channel number %g") % id).str());
try{
channel->handleBody(frame->getBody());
}catch(const qpid::QpidError& e){
channelException(
*channel, dynamic_cast<AMQMethodBody*>(frame->getBody().get()), e);
}
}
void Connection::send(AMQFrame* frame) {
out->send(frame);
}
void Connection::channelException(
Channel& channel, AMQMethodBody* method, const QpidError& e)
{
int code = (e.code >= PROTOCOL_ERROR) ? e.code - PROTOCOL_ERROR : 500;
string msg = e.msg;
if(method == 0)
channel.close(code, msg);
else
channel.close(
code, msg, method->amqpClassId(), method->amqpMethodId());
}
void Connection::idleIn(){
connector->close();
}
void Connection::idleOut(){
out->send(new AMQFrame(version, 0, new AMQHeartbeatBody()));
}
}} // namespace qpid::client