blob: f0b998086166fed72eff1ebfd3c6f1263f8f0f39 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "Connection.h"
#include "SessionState.h"
#include "Bridge.h"
#include "qpid/log/Statement.h"
#include "qpid/ptr_map.h"
#include "qpid/framing/AMQP_ClientProxy.h"
#include "qpid/framing/enum.h"
#include "qmf/org/apache/qpid/broker/EventClientConnect.h"
#include "qmf/org/apache/qpid/broker/EventClientDisconnect.h"
#include <boost/bind.hpp>
#include <boost/ptr_container/ptr_vector.hpp>
#include <algorithm>
#include <iostream>
#include <assert.h>
using namespace qpid::sys;
using namespace qpid::framing;
using namespace qpid::sys;
using qpid::ptr_map_ptr;
using qpid::management::ManagementAgent;
using qpid::management::ManagementObject;
using qpid::management::Manageable;
using qpid::management::Args;
namespace _qmf = qmf::org::apache::qpid::broker;
namespace qpid {
namespace broker {
Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, bool isLink_) :
ConnectionState(out_, broker_),
adapter(*this, isLink_),
isLink(isLink_),
mgmtClosing(false),
mgmtId(mgmtId_),
mgmtObject(0),
links(broker_.getLinks()),
agent(0)
{
Manageable* parent = broker.GetVhostObject();
if (isLink)
links.notifyConnection(mgmtId, this);
if (parent != 0)
{
agent = ManagementAgent::Singleton::getInstance();
// TODO set last bool true if system connection
if (agent != 0)
mgmtObject = new _qmf::Connection(agent, this, parent, mgmtId, !isLink, false);
agent->addObject(mgmtObject);
ConnectionState::setUrl(mgmtId);
}
}
void Connection::requestIOProcessing(boost::function0<void> callback)
{
ioCallback = callback;
out.activateOutput();
}
Connection::~Connection()
{
if (mgmtObject != 0) {
mgmtObject->resourceDestroy();
if (!isLink)
agent->raiseEvent(_qmf::EventClientDisconnect(mgmtId, ConnectionState::getUserId()));
}
if (isLink)
links.notifyClosed(mgmtId);
}
void Connection::received(framing::AMQFrame& frame) {
if (frame.getChannel() == 0 && frame.getMethod()) {
adapter.handle(frame);
} else {
getChannel(frame.getChannel()).in(frame);
}
if (isLink)
recordFromServer(frame);
else
recordFromClient(frame);
}
void Connection::recordFromServer(framing::AMQFrame& frame)
{
if (mgmtObject != 0)
{
mgmtObject->inc_framesToClient();
mgmtObject->inc_bytesToClient(frame.encodedSize());
}
}
void Connection::recordFromClient(framing::AMQFrame& frame)
{
if (mgmtObject != 0)
{
mgmtObject->inc_framesFromClient();
mgmtObject->inc_bytesFromClient(frame.encodedSize());
}
}
string Connection::getAuthMechanism()
{
if (!isLink)
return string("ANONYMOUS");
return links.getAuthMechanism(mgmtId);
}
string Connection::getAuthCredentials()
{
if (!isLink)
return string();
if (mgmtObject != 0)
{
if (links.getAuthMechanism(mgmtId) == "ANONYMOUS")
mgmtObject->set_authIdentity("anonymous");
else
mgmtObject->set_authIdentity(links.getAuthIdentity(mgmtId));
}
return links.getAuthCredentials(mgmtId);
}
void Connection::notifyConnectionForced(const string& text)
{
if (isLink)
links.notifyConnectionForced(mgmtId, text);
}
void Connection::setUserId(const string& userId)
{
ConnectionState::setUserId(userId);
if (mgmtObject != 0) {
mgmtObject->set_authIdentity(userId);
agent->raiseEvent(_qmf::EventClientConnect(mgmtId, userId));
}
}
void Connection::setFederationLink(bool b)
{
ConnectionState::setFederationLink(b);
if (mgmtObject != 0)
mgmtObject->set_federationLink(b);
}
void Connection::close(connection::CloseCode code, const string& text)
{
QPID_LOG_IF(error, code != connection::CLOSE_CODE_NORMAL, "Connection " << mgmtId << " closed by error: " << text << "(" << code << ")");
adapter.close(code, text);
//make sure we delete dangling pointers from outputTasks before deleting sessions
outputTasks.removeAll();
channels.clear();
getOutput().close();
}
// Send a close to the client but keep the channels. Used by cluster.
void Connection::sendClose() {
adapter.close(connection::CLOSE_CODE_NORMAL, "OK");
getOutput().close();
}
void Connection::idleOut(){}
void Connection::idleIn(){}
void Connection::closed(){ // Physically closed, suspend open sessions.
try {
while (!channels.empty())
ptr_map_ptr(channels.begin())->handleDetach();
while (!exclusiveQueues.empty()) {
Queue::shared_ptr q(exclusiveQueues.front());
q->releaseExclusiveOwnership();
if (q->canAutoDelete()) {
Queue::tryAutoDelete(broker, q);
}
exclusiveQueues.erase(exclusiveQueues.begin());
}
} catch(std::exception& e) {
QPID_LOG(error, QPID_MSG("While closing connection: " << e.what()));
assert(0);
}
}
bool Connection::hasOutput() { return outputTasks.hasOutput(); }
bool Connection::doOutput() {
try{
if (ioCallback)
ioCallback(); // Lend the IO thread for management processing
ioCallback = 0;
if (mgmtClosing)
close(connection::CLOSE_CODE_CONNECTION_FORCED, "Closed by Management Request");
else
//then do other output as needed:
return outputTasks.doOutput();
}catch(ConnectionException& e){
close(e.code, e.getMessage());
}catch(std::exception& e){
close(connection::CLOSE_CODE_CONNECTION_FORCED, e.what());
}
return false;
}
void Connection::closeChannel(uint16_t id) {
ChannelMap::iterator i = channels.find(id);
if (i != channels.end()) channels.erase(i);
}
SessionHandler& Connection::getChannel(ChannelId id) {
ChannelMap::iterator i=channels.find(id);
if (i == channels.end()) {
i = channels.insert(id, new SessionHandler(*this, id)).first;
}
return *ptr_map_ptr(i);
}
ManagementObject* Connection::GetManagementObject(void) const
{
return (ManagementObject*) mgmtObject;
}
Manageable::status_t Connection::ManagementMethod(uint32_t methodId, Args&, string&)
{
Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
QPID_LOG(debug, "Connection::ManagementMethod [id=" << methodId << "]");
switch (methodId)
{
case _qmf::Connection::METHOD_CLOSE :
mgmtClosing = true;
if (mgmtObject != 0) mgmtObject->set_closing(1);
out.activateOutput();
status = Manageable::STATUS_OK;
break;
}
return status;
}
}}