| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| #include "qmf/engine/ConsoleImpl.h" |
| #include "qmf/engine/MessageImpl.h" |
| #include "qmf/engine/SchemaImpl.h" |
| #include "qmf/engine/Typecode.h" |
| #include "qmf/engine/ObjectImpl.h" |
| #include "qmf/engine/ObjectIdImpl.h" |
| #include "qmf/engine/QueryImpl.h" |
| #include "qmf/engine/ValueImpl.h" |
| #include "qmf/engine/Protocol.h" |
| #include "qmf/engine/SequenceManager.h" |
| #include "qmf/engine/BrokerProxyImpl.h" |
| #include <qpid/framing/Buffer.h> |
| #include <qpid/framing/Uuid.h> |
| #include <qpid/framing/FieldTable.h> |
| #include <qpid/framing/FieldValue.h> |
| #include <qpid/log/Statement.h> |
| #include <qpid/sys/Time.h> |
| #include <qpid/sys/SystemInfo.h> |
| #include <string.h> |
| #include <iostream> |
| #include <fstream> |
| |
| using namespace std; |
| using namespace qmf::engine; |
| using namespace qpid::framing; |
| using namespace qpid::sys; |
| |
namespace {
    // Name of the exchange passed to BrokerProxyImpl::addBinding() for every
    // binding this file issues.  Both the characters and the pointer itself
    // are const so the constant can never be reseated by accident.
    const char* const QMF_EXCHANGE = "qpid.management";
}
| |
| #define STRING_REF(s) {if (!s.empty()) item.s = const_cast<char*>(s.c_str());} |
| |
| ConsoleEvent ConsoleEventImpl::copy() |
| { |
| ConsoleEvent item; |
| |
| ::memset(&item, 0, sizeof(ConsoleEvent)); |
| item.kind = kind; |
| item.agent = agent.get(); |
| item.classKey = classKey; |
| item.object = object.get(); |
| item.context = context; |
| item.event = event.get(); |
| item.timestamp = timestamp; |
| item.hasProps = hasProps; |
| item.hasStats = hasStats; |
| |
| STRING_REF(name); |
| |
| return item; |
| } |
| |
| ConsoleImpl::ConsoleImpl(const ConsoleSettings& s) : settings(s) |
| { |
| bindingList.push_back(pair<string, string>(string(), "schema.#")); |
| if (settings.rcvObjects && settings.rcvEvents && settings.rcvHeartbeats && !settings.userBindings) { |
| bindingList.push_back(pair<string, string>(string(), "console.#")); |
| } else { |
| if (settings.rcvObjects && !settings.userBindings) |
| bindingList.push_back(pair<string, string>(string(), "console.obj.#")); |
| else |
| bindingList.push_back(pair<string, string>(string(), "console.obj.*.*.org.apache.qpid.broker.agent")); |
| if (settings.rcvEvents) |
| bindingList.push_back(pair<string, string>(string(), "console.event.#")); |
| if (settings.rcvHeartbeats) |
| bindingList.push_back(pair<string, string>(string(), "console.heartbeat.#")); |
| } |
| } |
| |
| ConsoleImpl::~ConsoleImpl() |
| { |
| // This function intentionally left blank. |
| } |
| |
| bool ConsoleImpl::getEvent(ConsoleEvent& event) const |
| { |
| Mutex::ScopedLock _lock(lock); |
| if (eventQueue.empty()) |
| return false; |
| event = eventQueue.front()->copy(); |
| return true; |
| } |
| |
| void ConsoleImpl::popEvent() |
| { |
| Mutex::ScopedLock _lock(lock); |
| if (!eventQueue.empty()) |
| eventQueue.pop_front(); |
| } |
| |
| void ConsoleImpl::addConnection(BrokerProxy& broker, void* /*context*/) |
| { |
| Mutex::ScopedLock _lock(lock); |
| brokerList.push_back(broker.impl); |
| } |
| |
| void ConsoleImpl::delConnection(BrokerProxy& broker) |
| { |
| Mutex::ScopedLock _lock(lock); |
| for (vector<BrokerProxyImpl*>::iterator iter = brokerList.begin(); |
| iter != brokerList.end(); iter++) |
| if (*iter == broker.impl) { |
| brokerList.erase(iter); |
| break; |
| } |
| } |
| |
| uint32_t ConsoleImpl::packageCount() const |
| { |
| Mutex::ScopedLock _lock(lock); |
| return packages.size(); |
| } |
| |
| const string& ConsoleImpl::getPackageName(uint32_t idx) const |
| { |
| const static string empty; |
| |
| Mutex::ScopedLock _lock(lock); |
| if (idx >= packages.size()) |
| return empty; |
| |
| PackageList::const_iterator iter = packages.begin(); |
| for (uint32_t i = 0; i < idx; i++) iter++; |
| return iter->first; |
| } |
| |
| uint32_t ConsoleImpl::classCount(const char* packageName) const |
| { |
| Mutex::ScopedLock _lock(lock); |
| PackageList::const_iterator pIter = packages.find(packageName); |
| if (pIter == packages.end()) |
| return 0; |
| |
| const ObjectClassList& oList = pIter->second.first; |
| const EventClassList& eList = pIter->second.second; |
| |
| return oList.size() + eList.size(); |
| } |
| |
| const SchemaClassKey* ConsoleImpl::getClass(const char* packageName, uint32_t idx) const |
| { |
| Mutex::ScopedLock _lock(lock); |
| PackageList::const_iterator pIter = packages.find(packageName); |
| if (pIter == packages.end()) |
| return 0; |
| |
| const ObjectClassList& oList = pIter->second.first; |
| const EventClassList& eList = pIter->second.second; |
| uint32_t count = 0; |
| |
| for (ObjectClassList::const_iterator oIter = oList.begin(); |
| oIter != oList.end(); oIter++) { |
| if (count == idx) |
| return oIter->second->getClassKey(); |
| count++; |
| } |
| |
| for (EventClassList::const_iterator eIter = eList.begin(); |
| eIter != eList.end(); eIter++) { |
| if (count == idx) |
| return eIter->second->getClassKey(); |
| count++; |
| } |
| |
| return 0; |
| } |
| |
| ClassKind ConsoleImpl::getClassKind(const SchemaClassKey* key) const |
| { |
| Mutex::ScopedLock _lock(lock); |
| PackageList::const_iterator pIter = packages.find(key->getPackageName()); |
| if (pIter == packages.end()) |
| return CLASS_OBJECT; |
| |
| const EventClassList& eList = pIter->second.second; |
| if (eList.find(key) != eList.end()) |
| return CLASS_EVENT; |
| return CLASS_OBJECT; |
| } |
| |
| const SchemaObjectClass* ConsoleImpl::getObjectClass(const SchemaClassKey* key) const |
| { |
| Mutex::ScopedLock _lock(lock); |
| PackageList::const_iterator pIter = packages.find(key->getPackageName()); |
| if (pIter == packages.end()) |
| return 0; |
| |
| const ObjectClassList& oList = pIter->second.first; |
| ObjectClassList::const_iterator iter = oList.find(key); |
| if (iter == oList.end()) |
| return 0; |
| return iter->second; |
| } |
| |
| const SchemaEventClass* ConsoleImpl::getEventClass(const SchemaClassKey* key) const |
| { |
| Mutex::ScopedLock _lock(lock); |
| PackageList::const_iterator pIter = packages.find(key->getPackageName()); |
| if (pIter == packages.end()) |
| return 0; |
| |
| const EventClassList& eList = pIter->second.second; |
| EventClassList::const_iterator iter = eList.find(key); |
| if (iter == eList.end()) |
| return 0; |
| return iter->second; |
| } |
| |
| void ConsoleImpl::bindPackage(const char* packageName) |
| { |
| stringstream key; |
| key << "console.obj.*.*." << packageName << ".#"; |
| Mutex::ScopedLock _lock(lock); |
| bindingList.push_back(pair<string, string>(string(), key.str())); |
| for (vector<BrokerProxyImpl*>::iterator iter = brokerList.begin(); |
| iter != brokerList.end(); iter++) |
| (*iter)->addBinding(QMF_EXCHANGE, key.str()); |
| } |
| |
| void ConsoleImpl::bindClass(const SchemaClassKey* classKey) |
| { |
| stringstream key; |
| key << "console.obj.*.*." << classKey->getPackageName() << "." << classKey->getClassName() << ".#"; |
| Mutex::ScopedLock _lock(lock); |
| bindingList.push_back(pair<string, string>(string(), key.str())); |
| for (vector<BrokerProxyImpl*>::iterator iter = brokerList.begin(); |
| iter != brokerList.end(); iter++) |
| (*iter)->addBinding(QMF_EXCHANGE, key.str()); |
| } |
| |
| void ConsoleImpl::bindClass(const char* packageName, const char* className) |
| { |
| stringstream key; |
| key << "console.obj.*.*." << packageName << "." << className << ".#"; |
| Mutex::ScopedLock _lock(lock); |
| bindingList.push_back(pair<string, string>(string(), key.str())); |
| for (vector<BrokerProxyImpl*>::iterator iter = brokerList.begin(); |
| iter != brokerList.end(); iter++) |
| (*iter)->addBinding(QMF_EXCHANGE, key.str()); |
| } |
| |
| |
| void ConsoleImpl::bindEvent(const SchemaClassKey* classKey) |
| { |
| bindEvent(classKey->getPackageName(), classKey->getClassName()); |
| } |
| |
| void ConsoleImpl::bindEvent(const char* packageName, const char* eventName) |
| { |
| if (!settings.userBindings) throw qpid::Exception("Console not configured for userBindings."); |
| if (settings.rcvEvents) throw qpid::Exception("Console already configured to receive all events."); |
| |
| stringstream key; |
| key << "console.event.*.*." << packageName; |
| if (eventName && *eventName) { |
| key << "." << eventName << ".#"; |
| } else { |
| key << ".#"; |
| } |
| |
| Mutex::ScopedLock _lock(lock); |
| bindingList.push_back(pair<string, string>(string(), key.str())); |
| for (vector<BrokerProxyImpl*>::iterator iter = brokerList.begin(); |
| iter != brokerList.end(); iter++) |
| (*iter)->addBinding(QMF_EXCHANGE, key.str()); |
| } |
| |
| /* |
| void ConsoleImpl::startSync(const Query& query, void* context, SyncQuery& sync) |
| { |
| } |
| |
| void ConsoleImpl::touchSync(SyncQuery& sync) |
| { |
| } |
| |
| void ConsoleImpl::endSync(SyncQuery& sync) |
| { |
| } |
| */ |
| |
| void ConsoleImpl::learnPackage(const string& packageName) |
| { |
| Mutex::ScopedLock _lock(lock); |
| if (packages.find(packageName) == packages.end()) { |
| packages.insert(pair<string, pair<ObjectClassList, EventClassList> > |
| (packageName, pair<ObjectClassList, EventClassList>(ObjectClassList(), EventClassList()))); |
| eventNewPackage(packageName); |
| } |
| } |
| |
| void ConsoleImpl::learnClass(SchemaObjectClass* cls) |
| { |
| Mutex::ScopedLock _lock(lock); |
| const SchemaClassKey* key = cls->getClassKey(); |
| PackageList::iterator pIter = packages.find(key->getPackageName()); |
| if (pIter == packages.end()) |
| return; |
| |
| ObjectClassList& list = pIter->second.first; |
| if (list.find(key) == list.end()) { |
| list[key] = cls; |
| eventNewClass(key); |
| } |
| } |
| |
| void ConsoleImpl::learnClass(SchemaEventClass* cls) |
| { |
| Mutex::ScopedLock _lock(lock); |
| const SchemaClassKey* key = cls->getClassKey(); |
| PackageList::iterator pIter = packages.find(key->getPackageName()); |
| if (pIter == packages.end()) |
| return; |
| |
| EventClassList& list = pIter->second.second; |
| if (list.find(key) == list.end()) { |
| list[key] = cls; |
| eventNewClass(key); |
| } |
| } |
| |
| bool ConsoleImpl::haveClass(const SchemaClassKey* key) const |
| { |
| Mutex::ScopedLock _lock(lock); |
| PackageList::const_iterator pIter = packages.find(key->getPackageName()); |
| if (pIter == packages.end()) |
| return false; |
| |
| const ObjectClassList& oList = pIter->second.first; |
| const EventClassList& eList = pIter->second.second; |
| |
| return oList.find(key) != oList.end() || eList.find(key) != eList.end(); |
| } |
| |
| SchemaObjectClass* ConsoleImpl::getSchema(const SchemaClassKey* key) const |
| { |
| Mutex::ScopedLock _lock(lock); |
| PackageList::const_iterator pIter = packages.find(key->getPackageName()); |
| if (pIter == packages.end()) |
| return 0; |
| |
| const ObjectClassList& oList = pIter->second.first; |
| ObjectClassList::const_iterator iter = oList.find(key); |
| if (iter == oList.end()) |
| return 0; |
| |
| return iter->second; |
| } |
| |
| void ConsoleImpl::eventAgentAdded(boost::shared_ptr<AgentProxy> agent) |
| { |
| ConsoleEventImpl::Ptr event(new ConsoleEventImpl(ConsoleEvent::AGENT_ADDED)); |
| event->agent = agent; |
| Mutex::ScopedLock _lock(lock); |
| eventQueue.push_back(event); |
| } |
| |
| void ConsoleImpl::eventAgentDeleted(boost::shared_ptr<AgentProxy> agent) |
| { |
| ConsoleEventImpl::Ptr event(new ConsoleEventImpl(ConsoleEvent::AGENT_DELETED)); |
| event->agent = agent; |
| Mutex::ScopedLock _lock(lock); |
| eventQueue.push_back(event); |
| } |
| |
| void ConsoleImpl::eventNewPackage(const string& packageName) |
| { |
| ConsoleEventImpl::Ptr event(new ConsoleEventImpl(ConsoleEvent::NEW_PACKAGE)); |
| event->name = packageName; |
| Mutex::ScopedLock _lock(lock); |
| eventQueue.push_back(event); |
| } |
| |
| void ConsoleImpl::eventNewClass(const SchemaClassKey* key) |
| { |
| ConsoleEventImpl::Ptr event(new ConsoleEventImpl(ConsoleEvent::NEW_CLASS)); |
| event->classKey = key; |
| Mutex::ScopedLock _lock(lock); |
| eventQueue.push_back(event); |
| } |
| |
| void ConsoleImpl::eventObjectUpdate(ObjectPtr object, bool prop, bool stat) |
| { |
| ConsoleEventImpl::Ptr event(new ConsoleEventImpl(ConsoleEvent::OBJECT_UPDATE)); |
| event->object = object; |
| event->hasProps = prop; |
| event->hasStats = stat; |
| Mutex::ScopedLock _lock(lock); |
| eventQueue.push_back(event); |
| } |
| |
| void ConsoleImpl::eventAgentHeartbeat(boost::shared_ptr<AgentProxy> agent, uint64_t timestamp) |
| { |
| ConsoleEventImpl::Ptr event(new ConsoleEventImpl(ConsoleEvent::AGENT_HEARTBEAT)); |
| event->agent = agent; |
| event->timestamp = timestamp; |
| Mutex::ScopedLock _lock(lock); |
| eventQueue.push_back(event); |
| } |
| |
| |
| void ConsoleImpl::eventEventReceived(EventPtr event) |
| { |
| ConsoleEventImpl::Ptr console_event(new ConsoleEventImpl(ConsoleEvent::EVENT_RECEIVED)); |
| console_event->event = event; |
| Mutex::ScopedLock _lock(lock); |
| eventQueue.push_back(console_event); |
| } |
| |
| //================================================================== |
| // Wrappers |
| //================================================================== |
| |
| Console::Console(const ConsoleSettings& settings) : impl(new ConsoleImpl(settings)) {} |
| Console::~Console() { delete impl; } |
| bool Console::getEvent(ConsoleEvent& event) const { return impl->getEvent(event); } |
| void Console::popEvent() { impl->popEvent(); } |
| void Console::addConnection(BrokerProxy& broker, void* context) { impl->addConnection(broker, context); } |
| void Console::delConnection(BrokerProxy& broker) { impl->delConnection(broker); } |
| uint32_t Console::packageCount() const { return impl->packageCount(); } |
| const char* Console::getPackageName(uint32_t idx) const { return impl->getPackageName(idx).c_str(); } |
| uint32_t Console::classCount(const char* packageName) const { return impl->classCount(packageName); } |
| const SchemaClassKey* Console::getClass(const char* packageName, uint32_t idx) const { return impl->getClass(packageName, idx); } |
| ClassKind Console::getClassKind(const SchemaClassKey* key) const { return impl->getClassKind(key); } |
| const SchemaObjectClass* Console::getObjectClass(const SchemaClassKey* key) const { return impl->getObjectClass(key); } |
| const SchemaEventClass* Console::getEventClass(const SchemaClassKey* key) const { return impl->getEventClass(key); } |
| void Console::bindPackage(const char* packageName) { impl->bindPackage(packageName); } |
| void Console::bindClass(const SchemaClassKey* key) { impl->bindClass(key); } |
| void Console::bindClass(const char* packageName, const char* className) { impl->bindClass(packageName, className); } |
| |
| void Console::bindEvent(const SchemaClassKey *key) { impl->bindEvent(key); } |
| void Console::bindEvent(const char* packageName, const char* eventName) { impl->bindEvent(packageName, eventName); } |
| |
| //void Console::startSync(const Query& query, void* context, SyncQuery& sync) { impl->startSync(query, context, sync); } |
| //void Console::touchSync(SyncQuery& sync) { impl->touchSync(sync); } |
| //void Console::endSync(SyncQuery& sync) { impl->endSync(sync); } |
| |
| |