blob: 910ae22be8dc2dc5c5c73c715dc721e981a04963 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "qpid/console/SessionManager.h"
#include "qpid/console/Schema.h"
#include "qpid/console/Agent.h"
#include "qpid/console/ConsoleListener.h"
#include "qpid/log/Statement.h"
#include "qpid/sys/Time.h"
#include "qpid/framing/Buffer.h"
#include "qpid/framing/Uuid.h"
#include "qpid/framing/FieldTable.h"
using namespace qpid::console;
using namespace qpid::sys;
using namespace qpid;
using namespace std;
using qpid::framing::Buffer;
using qpid::framing::FieldTable;
SessionManager::SessionManager(ConsoleListener* _listener, Settings _settings) :
listener(_listener), settings(_settings)
{
bindingKeys();
}
SessionManager::~SessionManager()
{
for (vector<Broker*>::iterator iter = brokers.begin();
iter != brokers.end(); iter++)
delete *iter;
for (map<string, Package*>::iterator iter = packages.begin();
iter != packages.end(); iter++) {
for (Package::ClassMap::iterator citer = iter->second->classes.begin();
citer != iter->second->classes.end();
citer++)
delete citer->second;
delete iter->second;
}
}
Broker* SessionManager::addBroker(client::ConnectionSettings& settings)
{
Broker* broker(new Broker(*this, settings));
{
Mutex::ScopedLock l(brokerListLock);
brokers.push_back(broker);
}
return broker;
}
void SessionManager::delBroker(Broker* broker)
{
Mutex::ScopedLock l(brokerListLock);
for (vector<Broker*>::iterator iter = brokers.begin();
iter != brokers.end(); iter++)
if (*iter == broker) {
brokers.erase(iter);
delete broker;
return;
}
}
void SessionManager::getPackages(NameVector& packageNames)
{
allBrokersStable();
packageNames.clear();
{
Mutex::ScopedLock l(lock);
for (map<string, Package*>::iterator iter = packages.begin();
iter != packages.end(); iter++)
packageNames.push_back(iter->first);
}
}
void SessionManager::getClasses(KeyVector& classKeys, const std::string& packageName)
{
allBrokersStable();
classKeys.clear();
map<string, Package*>::iterator iter = packages.find(packageName);
if (iter == packages.end())
return;
Package& package = *(iter->second);
for (Package::ClassMap::const_iterator piter = package.classes.begin();
piter != package.classes.end(); piter++) {
ClassKey key(piter->second->getClassKey());
classKeys.push_back(key);
}
}
SchemaClass& SessionManager::getSchema(const ClassKey& classKey)
{
allBrokersStable();
map<string, Package*>::iterator iter = packages.find(classKey.getPackageName());
if (iter == packages.end())
throw Exception("Unknown package");
Package& package = *(iter->second);
Package::NameHash key(classKey.getClassName(), classKey.getHash());
Package::ClassMap::iterator cIter = package.classes.find(key);
if (cIter == package.classes.end())
throw Exception("Unknown class");
return *(cIter->second);
}
void SessionManager::bindPackage(const std::string& packageName)
{
stringstream key;
key << "console.obj.*.*." << packageName << ".#";
bindingKeyList.push_back(key.str());
for (vector<Broker*>::iterator iter = brokers.begin(); iter != brokers.end(); iter++)
(*iter)->addBinding(key.str());
}
void SessionManager::bindClass(const ClassKey& classKey)
{
bindClass(classKey.getPackageName(), classKey.getClassName());
}
void SessionManager::bindClass(const std::string& packageName, const std::string& className)
{
stringstream key;
key << "console.obj.*.*." << packageName << "." << className << ".#";
bindingKeyList.push_back(key.str());
for (vector<Broker*>::iterator iter = brokers.begin();
iter != brokers.end(); iter++)
(*iter)->addBinding(key.str());
}
void SessionManager::bindEvent(const ClassKey& classKey)
{
bindEvent(classKey.getPackageName(), classKey.getClassName());
}
void SessionManager::bindEvent(const std::string& packageName, const std::string& eventName)
{
if (!settings.userBindings) throw Exception("Session not configured for userBindings.");
if (settings.rcvEvents) throw Exception("Session already configured to receive all events.");
stringstream key;
key << "console.event.*.*." << packageName;
if (eventName.length()) {
key << "." << eventName << ".#";
} else {
key << ".#";
}
bindingKeyList.push_back(key.str());
for (vector<Broker*>::iterator iter = brokers.begin();
iter != brokers.end(); iter++)
(*iter)->addBinding(key.str());
}
void SessionManager::getAgents(Agent::Vector& agents, Broker* broker)
{
agents.clear();
if (broker != 0) {
broker->appendAgents(agents);
} else {
for (vector<Broker*>::iterator iter = brokers.begin(); iter != brokers.end(); iter++) {
(*iter)->appendAgents(agents);
}
}
}
void SessionManager::getObjects(Object::Vector& objects, const std::string& className,
Broker* _broker, Agent* _agent)
{
Agent::Vector agentList;
if (_agent != 0) {
agentList.push_back(_agent);
_agent->getBroker()->waitForStable();
} else {
if (_broker != 0) {
_broker->appendAgents(agentList);
_broker->waitForStable();
} else {
allBrokersStable();
Mutex::ScopedLock _lock(brokerListLock);
for (vector<Broker*>::iterator iter = brokers.begin(); iter != brokers.end(); iter++) {
(*iter)->appendAgents(agentList);
}
}
}
FieldTable ft;
uint32_t sequence;
ft.setString("_class", className);
getResult.clear();
syncSequenceList.clear();
error = string();
if (agentList.empty()) {
objects = getResult;
return;
}
for (Agent::Vector::iterator iter = agentList.begin(); iter != agentList.end(); iter++) {
Agent* agent = *iter;
char rawbuffer[512];
Buffer buffer(rawbuffer, 512);
stringstream routingKey;
routingKey << "agent." << agent->getBrokerBank() << "." << agent->getAgentBank();
{
Mutex::ScopedLock _lock(lock);
sequence = sequenceManager.reserve("multiget");
syncSequenceList.insert(sequence);
}
agent->getBroker()->encodeHeader(buffer, 'G', sequence);
ft.encode(buffer);
uint32_t length = buffer.getPosition();
buffer.reset();
agent->getBroker()->connThreadBody.sendBuffer(buffer, length, "qpid.management", routingKey.str());
}
{
Mutex::ScopedLock _lock(lock);
sys::AbsTime startTime = sys::now();
while (!syncSequenceList.empty() && error.empty()) {
cv.wait(lock, AbsTime(now(), settings.getTimeout * TIME_SEC));
sys::AbsTime currTime = sys::now();
if (sys::Duration(startTime, currTime) > settings.getTimeout * TIME_SEC)
break;
}
}
objects = getResult;
}
void SessionManager::bindingKeys()
{
bindingKeyList.push_back("schema.#");
if (settings.rcvObjects && settings.rcvEvents && settings.rcvHeartbeats && !settings.userBindings) {
bindingKeyList.push_back("console.#");
} else {
if (settings.rcvObjects && !settings.userBindings)
bindingKeyList.push_back("console.obj.#");
else
bindingKeyList.push_back("console.obj.*.*.org.apache.qpid.broker.agent");
if (settings.rcvEvents)
bindingKeyList.push_back("console.event.#");
if (settings.rcvHeartbeats)
bindingKeyList.push_back("console.heartbeat");
}
}
void SessionManager::allBrokersStable()
{
Mutex::ScopedLock l(brokerListLock);
for (vector<Broker*>::iterator iter = brokers.begin();
iter != brokers.end(); iter++)
if ((*iter)->isConnected())
(*iter)->waitForStable();
}
void SessionManager::startProtocol(Broker* broker)
{
char rawbuffer[512];
Buffer buffer(rawbuffer, 512);
broker->encodeHeader(buffer, 'B');
uint32_t length = 512 - buffer.available();
buffer.reset();
broker->connThreadBody.sendBuffer(buffer, length);
}
void SessionManager::handleBrokerResp(Broker* broker, Buffer& inBuffer, uint32_t)
{
framing::Uuid brokerId;
brokerId.decode(inBuffer);
broker->setBrokerId(brokerId);
char rawbuffer[512];
Buffer buffer(rawbuffer, 512);
uint32_t sequence = sequenceManager.reserve("startup");
broker->encodeHeader(buffer, 'P', sequence);
uint32_t length = 512 - buffer.available();
buffer.reset();
broker->connThreadBody.sendBuffer(buffer, length);
if (listener != 0) {
listener->brokerInfo(*broker);
}
}
void SessionManager::handlePackageInd(Broker* broker, Buffer& inBuffer, uint32_t)
{
string packageName;
inBuffer.getShortString(packageName);
{
Mutex::ScopedLock l(lock);
map<string, Package*>::iterator iter = packages.find(packageName);
if (iter == packages.end()) {
packages[packageName] = new Package(packageName);
if (listener != 0)
listener->newPackage(packageName);
}
}
broker->incOutstanding();
char rawbuffer[512];
Buffer buffer(rawbuffer, 512);
uint32_t sequence = sequenceManager.reserve("startup");
broker->encodeHeader(buffer, 'Q', sequence);
buffer.putShortString(packageName);
uint32_t length = 512 - buffer.available();
buffer.reset();
broker->connThreadBody.sendBuffer(buffer, length);
}
void SessionManager::handleCommandComplete(Broker* broker, Buffer& inBuffer, uint32_t sequence)
{
Mutex::ScopedLock l(lock);
uint32_t resultCode = inBuffer.getLong();
string resultText;
inBuffer.getShortString(resultText);
string context = sequenceManager.release(sequence);
if (resultCode != 0)
QPID_LOG(debug, "Received error in completion: " << resultCode << " " << resultText);
if (context == "startup") {
broker->decOutstanding();
} else if (context == "multiget") {
if (syncSequenceList.count(sequence) == 1) {
syncSequenceList.erase(sequence);
if (syncSequenceList.empty()) {
cv.notify();
}
}
}
// TODO: Other context cases
}
void SessionManager::handleClassInd(Broker* broker, Buffer& inBuffer, uint32_t)
{
string packageName;
string className;
uint8_t hash[16];
/*kind*/ (void) inBuffer.getOctet();
inBuffer.getShortString(packageName);
inBuffer.getShortString(className);
inBuffer.getBin128(hash);
{
Mutex::ScopedLock l(lock);
map<string, Package*>::iterator pIter = packages.find(packageName);
if (pIter == packages.end() || pIter->second->getClass(className, hash))
return;
}
broker->incOutstanding();
char rawbuffer[512];
Buffer buffer(rawbuffer, 512);
uint32_t sequence = sequenceManager.reserve("startup");
broker->encodeHeader(buffer, 'S', sequence);
buffer.putShortString(packageName);
buffer.putShortString(className);
buffer.putBin128(hash);
uint32_t length = 512 - buffer.available();
buffer.reset();
broker->connThreadBody.sendBuffer(buffer, length);
}
void SessionManager::handleMethodResp(Broker* broker, Buffer& buffer, uint32_t sequence)
{
if (broker->methodObject) {
broker->methodObject->handleMethodResp(buffer, sequence);
}
}
void SessionManager::handleHeartbeatInd(Broker* /*broker*/, Buffer& /*inBuffer*/, uint32_t /*sequence*/)
{
}
void SessionManager::handleEventInd(Broker* broker, Buffer& buffer, uint32_t /*sequence*/)
{
string packageName;
string className;
uint8_t hash[16];
SchemaClass* schemaClass;
buffer.getShortString(packageName);
buffer.getShortString(className);
buffer.getBin128(hash);
{
Mutex::ScopedLock l(lock);
map<string, Package*>::iterator pIter = packages.find(packageName);
if (pIter == packages.end())
return;
schemaClass = pIter->second->getClass(className, hash);
if (schemaClass == 0)
return;
}
Event event(broker, schemaClass, buffer);
if (listener)
listener->event(event);
}
void SessionManager::handleSchemaResp(Broker* broker, Buffer& inBuffer, uint32_t sequence)
{
uint8_t kind;
string packageName;
string className;
uint8_t hash[16];
kind = inBuffer.getOctet();
inBuffer.getShortString(packageName);
inBuffer.getShortString(className);
inBuffer.getBin128(hash);
{
Mutex::ScopedLock l(lock);
map<string, Package*>::iterator pIter = packages.find(packageName);
if (pIter != packages.end() && !pIter->second->getClass(className, hash)) {
ClassKey key(packageName, className, hash);
SchemaClass* schemaClass(new SchemaClass(kind, key, inBuffer));
pIter->second->addClass(className, hash, schemaClass);
if (listener != 0) {
listener->newClass(schemaClass->getClassKey());
}
}
}
sequenceManager.release(sequence);
broker->decOutstanding();
}
void SessionManager::handleContentInd(Broker* broker, Buffer& buffer, uint32_t sequence, bool prop, bool stat)
{
string packageName;
string className;
uint8_t hash[16];
SchemaClass* schemaClass;
buffer.getShortString(packageName);
buffer.getShortString(className);
buffer.getBin128(hash);
{
Mutex::ScopedLock l(lock);
map<string, Package*>::iterator pIter = packages.find(packageName);
if (pIter == packages.end())
return;
schemaClass = pIter->second->getClass(className, hash);
if (schemaClass == 0)
return;
}
Object object(broker, schemaClass, buffer, prop, stat);
if (prop && className == "agent" && packageName == "org.apache.qpid.broker")
broker->updateAgent(object);
{
Mutex::ScopedLock l(lock);
if (syncSequenceList.count(sequence) == 1) {
if (!object.isDeleted())
getResult.push_back(object);
return;
}
}
if (listener) {
if (prop)
listener->objectProps(*broker, object);
if (stat)
listener->objectStats(*broker, object);
}
}
void SessionManager::handleBrokerConnect(Broker* broker)
{
if (listener != 0)
listener->brokerConnected(*broker);
}
void SessionManager::handleBrokerDisconnect(Broker* broker)
{
if (listener != 0)
listener->brokerDisconnected(*broker);
}