| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| #include "qmf/engine/Agent.h" |
| #include "qmf/engine/MessageImpl.h" |
| #include "qmf/engine/SchemaImpl.h" |
| #include "qmf/engine/Typecode.h" |
| #include "qmf/engine/EventImpl.h" |
| #include "qmf/engine/ObjectImpl.h" |
| #include "qmf/engine/ObjectIdImpl.h" |
| #include "qmf/engine/QueryImpl.h" |
| #include "qmf/engine/ValueImpl.h" |
| #include "qmf/engine/Protocol.h" |
| #include <qpid/framing/Buffer.h> |
| #include <qpid/framing/Uuid.h> |
| #include <qpid/framing/FieldTable.h> |
| #include <qpid/framing/FieldValue.h> |
| #include <qpid/sys/Mutex.h> |
| #include <qpid/log/Statement.h> |
| #include <qpid/sys/Time.h> |
| #include <string.h> |
| #include <string> |
| #include <deque> |
| #include <map> |
| #include <iostream> |
| #include <fstream> |
| #include <boost/shared_ptr.hpp> |
| #include <boost/noncopyable.hpp> |
| |
| using namespace std; |
| using namespace qmf::engine; |
| using namespace qpid::framing; |
| using namespace qpid::sys; |
| |
| namespace qmf { |
| namespace engine { |
| |
| struct AgentEventImpl { |
| typedef boost::shared_ptr<AgentEventImpl> Ptr; |
| AgentEvent::EventKind kind; |
| uint32_t sequence; |
| string authUserId; |
| string authToken; |
| string name; |
| Object* object; |
| boost::shared_ptr<ObjectId> objectId; |
| boost::shared_ptr<Query> query; |
| boost::shared_ptr<Value> arguments; |
| string exchange; |
| string bindingKey; |
| const SchemaObjectClass* objectClass; |
| |
| AgentEventImpl(AgentEvent::EventKind k) : |
| kind(k), sequence(0), object(0), objectClass(0) {} |
| ~AgentEventImpl() {} |
| AgentEvent copy(); |
| }; |
| |
| struct AgentQueryContext { |
| typedef boost::shared_ptr<AgentQueryContext> Ptr; |
| uint32_t sequence; |
| string exchange; |
| string key; |
| const SchemaMethod* schemaMethod; |
| AgentQueryContext() : schemaMethod(0) {} |
| }; |
| |
| class AgentImpl : public boost::noncopyable { |
| public: |
| AgentImpl(char* label, bool internalStore); |
| ~AgentImpl(); |
| |
| void setStoreDir(const char* path); |
| void setTransferDir(const char* path); |
| void handleRcvMessage(Message& message); |
| bool getXmtMessage(Message& item) const; |
| void popXmt(); |
| bool getEvent(AgentEvent& event) const; |
| void popEvent(); |
| void newSession(); |
| void startProtocol(); |
| void heartbeat(); |
| void methodResponse(uint32_t sequence, uint32_t status, char* text, const Value& arguments); |
| void queryResponse(uint32_t sequence, Object& object, bool prop, bool stat); |
| void queryComplete(uint32_t sequence); |
| void registerClass(SchemaObjectClass* cls); |
| void registerClass(SchemaEventClass* cls); |
| const ObjectId* addObject(Object& obj, uint64_t persistId); |
| const ObjectId* allocObjectId(uint64_t persistId); |
| const ObjectId* allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi); |
| void raiseEvent(Event& event); |
| |
| private: |
| mutable Mutex lock; |
| Mutex addLock; |
| string label; |
| string queueName; |
| string storeDir; |
| string transferDir; |
| bool internalStore; |
| uint64_t nextTransientId; |
| Uuid systemId; |
| uint32_t requestedBrokerBank; |
| uint32_t requestedAgentBank; |
| uint32_t assignedBrokerBank; |
| uint32_t assignedAgentBank; |
| AgentAttachment attachment; |
| uint16_t bootSequence; |
| uint64_t nextObjectId; |
| uint32_t nextContextNum; |
| deque<AgentEventImpl::Ptr> eventQueue; |
| deque<MessageImpl::Ptr> xmtQueue; |
| map<uint32_t, AgentQueryContext::Ptr> contextMap; |
| bool attachComplete; |
| |
| static const char* QMF_EXCHANGE; |
| static const char* DIR_EXCHANGE; |
| static const char* BROKER_KEY; |
| static const uint32_t MERR_UNKNOWN_METHOD = 2; |
| static const uint32_t MERR_UNKNOWN_PACKAGE = 8; |
| static const uint32_t MERR_UNKNOWN_CLASS = 9; |
| static const uint32_t MERR_INTERNAL_ERROR = 10; |
| # define MA_BUFFER_SIZE 65536 |
| char outputBuffer[MA_BUFFER_SIZE]; |
| |
| struct AgentClassKey { |
| string name; |
| uint8_t hash[16]; |
| AgentClassKey(const string& n, const uint8_t* h) : name(n) { |
| memcpy(hash, h, 16); |
| } |
| AgentClassKey(Buffer& buffer) { |
| buffer.getShortString(name); |
| buffer.getBin128(hash); |
| } |
| string repr() { |
| return name; |
| } |
| }; |
| |
| struct AgentClassKeyComp { |
| bool operator() (const AgentClassKey& lhs, const AgentClassKey& rhs) const |
| { |
| if (lhs.name != rhs.name) |
| return lhs.name < rhs.name; |
| else |
| for (int i = 0; i < 16; i++) |
| if (lhs.hash[i] != rhs.hash[i]) |
| return lhs.hash[i] < rhs.hash[i]; |
| return false; |
| } |
| }; |
| |
| typedef map<AgentClassKey, SchemaObjectClass*, AgentClassKeyComp> ObjectClassMap; |
| typedef map<AgentClassKey, SchemaEventClass*, AgentClassKeyComp> EventClassMap; |
| |
| struct ClassMaps { |
| ObjectClassMap objectClasses; |
| EventClassMap eventClasses; |
| }; |
| |
| map<string, ClassMaps> packages; |
| |
| AgentEventImpl::Ptr eventDeclareQueue(const string& queueName); |
| AgentEventImpl::Ptr eventBind(const string& exchange, const string& queue, const string& key); |
| AgentEventImpl::Ptr eventSetupComplete(); |
| AgentEventImpl::Ptr eventQuery(uint32_t num, const string& userId, const string& package, const string& cls, |
| boost::shared_ptr<ObjectId> oid); |
| AgentEventImpl::Ptr eventMethod(uint32_t num, const string& userId, const string& method, |
| boost::shared_ptr<ObjectId> oid, boost::shared_ptr<Value> argMap, |
| const SchemaObjectClass* objectClass); |
| void sendBufferLH(Buffer& buf, const string& destination, const string& routingKey); |
| |
| void sendPackageIndicationLH(const string& packageName); |
| void sendClassIndicationLH(ClassKind kind, const string& packageName, const AgentClassKey& key); |
| void sendCommandCompleteLH(const string& exchange, const string& key, uint32_t seq, |
| uint32_t code = 0, const string& text = "OK"); |
| void sendMethodErrorLH(uint32_t sequence, const string& key, uint32_t code, const string& text=""); |
| void handleAttachResponse(Buffer& inBuffer); |
| void handlePackageRequest(Buffer& inBuffer); |
| void handleClassQuery(Buffer& inBuffer); |
| void handleSchemaRequest(Buffer& inBuffer, uint32_t sequence, |
| const string& replyToExchange, const string& replyToKey); |
| void handleGetQuery(Buffer& inBuffer, uint32_t sequence, const string& replyTo, const string& userId); |
| void handleMethodRequest(Buffer& inBuffer, uint32_t sequence, const string& replyTo, const string& userId); |
| void handleConsoleAddedIndication(); |
| }; |
| } |
| } |
| |
| const char* AgentImpl::QMF_EXCHANGE = "qpid.management"; |
| const char* AgentImpl::DIR_EXCHANGE = "amq.direct"; |
| const char* AgentImpl::BROKER_KEY = "broker"; |
| |
| #define STRING_REF(s) {if (!s.empty()) item.s = const_cast<char*>(s.c_str());} |
| |
| AgentEvent AgentEventImpl::copy() |
| { |
| AgentEvent item; |
| |
| ::memset(&item, 0, sizeof(AgentEvent)); |
| item.kind = kind; |
| item.sequence = sequence; |
| item.object = object; |
| item.objectId = objectId.get(); |
| item.query = query.get(); |
| item.arguments = arguments.get(); |
| item.objectClass = objectClass; |
| |
| STRING_REF(authUserId); |
| STRING_REF(authToken); |
| STRING_REF(name); |
| STRING_REF(exchange); |
| STRING_REF(bindingKey); |
| |
| return item; |
| } |
| |
| AgentImpl::AgentImpl(char* _label, bool i) : |
| label(_label), queueName("qmfa-"), internalStore(i), nextTransientId(1), |
| requestedBrokerBank(0), requestedAgentBank(0), |
| assignedBrokerBank(0), assignedAgentBank(0), |
| bootSequence(1), nextObjectId(1), nextContextNum(1), attachComplete(false) |
| { |
| queueName += Uuid(true).str(); |
| } |
| |
| AgentImpl::~AgentImpl() |
| { |
| } |
| |
| void AgentImpl::setStoreDir(const char* path) |
| { |
| Mutex::ScopedLock _lock(lock); |
| if (path) |
| storeDir = path; |
| else |
| storeDir.clear(); |
| } |
| |
| void AgentImpl::setTransferDir(const char* path) |
| { |
| Mutex::ScopedLock _lock(lock); |
| if (path) |
| transferDir = path; |
| else |
| transferDir.clear(); |
| } |
| |
| void AgentImpl::handleRcvMessage(Message& message) |
| { |
| Buffer inBuffer(message.body, message.length); |
| uint8_t opcode; |
| uint32_t sequence; |
| string replyToExchange(message.replyExchange ? message.replyExchange : ""); |
| string replyToKey(message.replyKey ? message.replyKey : ""); |
| string userId(message.userId ? message.userId : ""); |
| |
| while (Protocol::checkHeader(inBuffer, &opcode, &sequence)) { |
| if (opcode == Protocol::OP_ATTACH_RESPONSE) handleAttachResponse(inBuffer); |
| else if (opcode == Protocol::OP_SCHEMA_REQUEST) handleSchemaRequest(inBuffer, sequence, replyToExchange, replyToKey); |
| else if (opcode == Protocol::OP_CONSOLE_ADDED_INDICATION) handleConsoleAddedIndication(); |
| else if (opcode == Protocol::OP_GET_QUERY) handleGetQuery(inBuffer, sequence, replyToKey, userId); |
| else if (opcode == Protocol::OP_METHOD_REQUEST) handleMethodRequest(inBuffer, sequence, replyToKey, userId); |
| else { |
| QPID_LOG(error, "AgentImpl::handleRcvMessage invalid opcode=" << opcode); |
| break; |
| } |
| } |
| } |
| |
| bool AgentImpl::getXmtMessage(Message& item) const |
| { |
| Mutex::ScopedLock _lock(lock); |
| if (xmtQueue.empty()) |
| return false; |
| item = xmtQueue.front()->copy(); |
| return true; |
| } |
| |
| void AgentImpl::popXmt() |
| { |
| Mutex::ScopedLock _lock(lock); |
| if (!xmtQueue.empty()) |
| xmtQueue.pop_front(); |
| } |
| |
| bool AgentImpl::getEvent(AgentEvent& event) const |
| { |
| Mutex::ScopedLock _lock(lock); |
| if (eventQueue.empty()) |
| return false; |
| event = eventQueue.front()->copy(); |
| return true; |
| } |
| |
| void AgentImpl::popEvent() |
| { |
| Mutex::ScopedLock _lock(lock); |
| if (!eventQueue.empty()) |
| eventQueue.pop_front(); |
| } |
| |
| void AgentImpl::newSession() |
| { |
| Mutex::ScopedLock _lock(lock); |
| eventQueue.clear(); |
| xmtQueue.clear(); |
| eventQueue.push_back(eventDeclareQueue(queueName)); |
| eventQueue.push_back(eventBind("amq.direct", queueName, queueName)); |
| eventQueue.push_back(eventSetupComplete()); |
| } |
| |
| void AgentImpl::startProtocol() |
| { |
| Mutex::ScopedLock _lock(lock); |
| char rawbuffer[512]; |
| Buffer buffer(rawbuffer, 512); |
| |
| Protocol::encodeHeader(buffer, Protocol::OP_ATTACH_REQUEST); |
| buffer.putShortString(label); |
| systemId.encode(buffer); |
| buffer.putLong(requestedBrokerBank); |
| buffer.putLong(requestedAgentBank); |
| sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY); |
| QPID_LOG(trace, "SENT AttachRequest: reqBroker=" << requestedBrokerBank << |
| " reqAgent=" << requestedAgentBank); |
| } |
| |
| void AgentImpl::heartbeat() |
| { |
| Mutex::ScopedLock _lock(lock); |
| Buffer buffer(outputBuffer, MA_BUFFER_SIZE); |
| |
| Protocol::encodeHeader(buffer, Protocol::OP_HEARTBEAT_INDICATION); |
| buffer.putLongLong(uint64_t(Duration(EPOCH, now()))); |
| stringstream key; |
| key << "console.heartbeat." << assignedBrokerBank << "." << assignedAgentBank; |
| sendBufferLH(buffer, QMF_EXCHANGE, key.str()); |
| QPID_LOG(trace, "SENT HeartbeatIndication"); |
| } |
| |
| void AgentImpl::methodResponse(uint32_t sequence, uint32_t status, char* text, const Value& argMap) |
| { |
| Mutex::ScopedLock _lock(lock); |
| map<uint32_t, AgentQueryContext::Ptr>::iterator iter = contextMap.find(sequence); |
| if (iter == contextMap.end()) |
| return; |
| AgentQueryContext::Ptr context = iter->second; |
| contextMap.erase(iter); |
| |
| char* buf(outputBuffer); |
| uint32_t bufLen(114 + strlen(text)); // header(8) + status(4) + mstring(2 + size) + margin(100) |
| bool allocated(false); |
| |
| if (status == 0) { |
| for (vector<const SchemaArgument*>::const_iterator aIter = context->schemaMethod->impl->arguments.begin(); |
| aIter != context->schemaMethod->impl->arguments.end(); aIter++) { |
| const SchemaArgument* schemaArg = *aIter; |
| if (schemaArg->getDirection() == DIR_OUT || schemaArg->getDirection() == DIR_IN_OUT) { |
| if (argMap.keyInMap(schemaArg->getName())) { |
| const Value* val = argMap.byKey(schemaArg->getName()); |
| bufLen += val->impl->encodedSize(); |
| } else { |
| Value val(schemaArg->getType()); |
| bufLen += val.impl->encodedSize(); |
| } |
| } |
| } |
| } |
| |
| if (bufLen > MA_BUFFER_SIZE) { |
| buf = (char*) malloc(bufLen); |
| allocated = true; |
| } |
| |
| Buffer buffer(buf, bufLen); |
| Protocol::encodeHeader(buffer, Protocol::OP_METHOD_RESPONSE, context->sequence); |
| buffer.putLong(status); |
| buffer.putMediumString(text); |
| if (status == 0) { |
| for (vector<const SchemaArgument*>::const_iterator aIter = context->schemaMethod->impl->arguments.begin(); |
| aIter != context->schemaMethod->impl->arguments.end(); aIter++) { |
| const SchemaArgument* schemaArg = *aIter; |
| if (schemaArg->getDirection() == DIR_OUT || schemaArg->getDirection() == DIR_IN_OUT) { |
| if (argMap.keyInMap(schemaArg->getName())) { |
| const Value* val = argMap.byKey(schemaArg->getName()); |
| val->impl->encode(buffer); |
| } else { |
| Value val(schemaArg->getType()); |
| val.impl->encode(buffer); |
| } |
| } |
| } |
| } |
| sendBufferLH(buffer, context->exchange, context->key); |
| if (allocated) |
| free(buf); |
| QPID_LOG(trace, "SENT MethodResponse seq=" << context->sequence << " status=" << status << " text=" << text); |
| } |
| |
| void AgentImpl::queryResponse(uint32_t sequence, Object& object, bool prop, bool stat) |
| { |
| Mutex::ScopedLock _lock(lock); |
| map<uint32_t, AgentQueryContext::Ptr>::iterator iter = contextMap.find(sequence); |
| if (iter == contextMap.end()) |
| return; |
| AgentQueryContext::Ptr context = iter->second; |
| |
| Buffer buffer(outputBuffer, MA_BUFFER_SIZE); |
| Protocol::encodeHeader(buffer, Protocol::OP_OBJECT_INDICATION, context->sequence); |
| |
| object.impl->encodeSchemaKey(buffer); |
| object.impl->encodeManagedObjectData(buffer); |
| if (prop) |
| object.impl->encodeProperties(buffer); |
| if (stat) |
| object.impl->encodeStatistics(buffer); |
| |
| sendBufferLH(buffer, context->exchange, context->key); |
| QPID_LOG(trace, "SENT ContentIndication seq=" << context->sequence); |
| } |
| |
| void AgentImpl::queryComplete(uint32_t sequence) |
| { |
| Mutex::ScopedLock _lock(lock); |
| map<uint32_t, AgentQueryContext::Ptr>::iterator iter = contextMap.find(sequence); |
| if (iter == contextMap.end()) |
| return; |
| |
| AgentQueryContext::Ptr context = iter->second; |
| contextMap.erase(iter); |
| sendCommandCompleteLH(context->exchange, context->key, context->sequence, 0, "OK"); |
| } |
| |
| void AgentImpl::registerClass(SchemaObjectClass* cls) |
| { |
| Mutex::ScopedLock _lock(lock); |
| bool newPackage = false; |
| |
| map<string, ClassMaps>::iterator iter = packages.find(cls->getClassKey()->getPackageName()); |
| if (iter == packages.end()) { |
| packages[cls->getClassKey()->getPackageName()] = ClassMaps(); |
| iter = packages.find(cls->getClassKey()->getPackageName()); |
| newPackage = true; |
| } |
| |
| AgentClassKey key(cls->getClassKey()->getClassName(), cls->getClassKey()->getHash()); |
| iter->second.objectClasses[key] = cls; |
| |
| // Indicate this new schema if connected. |
| |
| if (attachComplete) { |
| |
| if (newPackage) { |
| sendPackageIndicationLH(iter->first); |
| } |
| sendClassIndicationLH(CLASS_OBJECT, iter->first, key); |
| } |
| } |
| |
| void AgentImpl::registerClass(SchemaEventClass* cls) |
| { |
| Mutex::ScopedLock _lock(lock); |
| bool newPackage = false; |
| |
| map<string, ClassMaps>::iterator iter = packages.find(cls->getClassKey()->getPackageName()); |
| if (iter == packages.end()) { |
| packages[cls->getClassKey()->getPackageName()] = ClassMaps(); |
| iter = packages.find(cls->getClassKey()->getPackageName()); |
| newPackage = true; |
| } |
| |
| AgentClassKey key(cls->getClassKey()->getClassName(), cls->getClassKey()->getHash()); |
| iter->second.eventClasses[key] = cls; |
| |
| // Indicate this new schema if connected. |
| |
| if (attachComplete) { |
| |
| if (newPackage) { |
| sendPackageIndicationLH(iter->first); |
| } |
| sendClassIndicationLH(CLASS_EVENT, iter->first, key); |
| } |
| } |
| |
| const ObjectId* AgentImpl::addObject(Object&, uint64_t) |
| { |
| Mutex::ScopedLock _lock(lock); |
| return 0; |
| } |
| |
| const ObjectId* AgentImpl::allocObjectId(uint64_t persistId) |
| { |
| Mutex::ScopedLock _lock(lock); |
| uint16_t sequence = persistId ? 0 : bootSequence; |
| uint64_t objectNum = persistId ? persistId : nextObjectId++; |
| |
| ObjectId* oid = ObjectIdImpl::factory(&attachment, 0, sequence, objectNum); |
| return oid; |
| } |
| |
| const ObjectId* AgentImpl::allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi) |
| { |
| return allocObjectId(((uint64_t) persistIdHi) << 32 | (uint64_t) persistIdLo); |
| } |
| |
| void AgentImpl::raiseEvent(Event& event) |
| { |
| Mutex::ScopedLock _lock(lock); |
| Buffer buffer(outputBuffer, MA_BUFFER_SIZE); |
| Protocol::encodeHeader(buffer, Protocol::OP_EVENT_INDICATION); |
| |
| event.impl->encodeSchemaKey(buffer); |
| buffer.putLongLong(uint64_t(Duration(EPOCH, now()))); |
| event.impl->encode(buffer); |
| string key(event.impl->getRoutingKey(assignedBrokerBank, assignedAgentBank)); |
| |
| sendBufferLH(buffer, QMF_EXCHANGE, key); |
| QPID_LOG(trace, "SENT EventIndication"); |
| } |
| |
| AgentEventImpl::Ptr AgentImpl::eventDeclareQueue(const string& name) |
| { |
| AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::DECLARE_QUEUE)); |
| event->name = name; |
| |
| return event; |
| } |
| |
| AgentEventImpl::Ptr AgentImpl::eventBind(const string& exchange, const string& queue, |
| const string& key) |
| { |
| AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::BIND)); |
| event->name = queue; |
| event->exchange = exchange; |
| event->bindingKey = key; |
| |
| return event; |
| } |
| |
| AgentEventImpl::Ptr AgentImpl::eventSetupComplete() |
| { |
| AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::SETUP_COMPLETE)); |
| return event; |
| } |
| |
| AgentEventImpl::Ptr AgentImpl::eventQuery(uint32_t num, const string& userId, const string& package, |
| const string& cls, boost::shared_ptr<ObjectId> oid) |
| { |
| AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::GET_QUERY)); |
| event->sequence = num; |
| event->authUserId = userId; |
| if (oid.get()) |
| event->query.reset(new Query(oid.get())); |
| else |
| event->query.reset(new Query(cls.c_str(), package.c_str())); |
| return event; |
| } |
| |
| AgentEventImpl::Ptr AgentImpl::eventMethod(uint32_t num, const string& userId, const string& method, |
| boost::shared_ptr<ObjectId> oid, boost::shared_ptr<Value> argMap, |
| const SchemaObjectClass* objectClass) |
| { |
| AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::METHOD_CALL)); |
| event->sequence = num; |
| event->authUserId = userId; |
| event->name = method; |
| event->objectId = oid; |
| event->arguments = argMap; |
| event->objectClass = objectClass; |
| return event; |
| } |
| |
| void AgentImpl::sendBufferLH(Buffer& buf, const string& destination, const string& routingKey) |
| { |
| uint32_t length = buf.getPosition(); |
| MessageImpl::Ptr message(new MessageImpl); |
| |
| buf.reset(); |
| buf.getRawData(message->body, length); |
| message->destination = destination; |
| message->routingKey = routingKey; |
| message->replyExchange = "amq.direct"; |
| message->replyKey = queueName; |
| |
| xmtQueue.push_back(message); |
| } |
| |
| void AgentImpl::sendPackageIndicationLH(const string& packageName) |
| { |
| Buffer buffer(outputBuffer, MA_BUFFER_SIZE); |
| Protocol::encodeHeader(buffer, Protocol::OP_PACKAGE_INDICATION); |
| buffer.putShortString(packageName); |
| sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY); |
| QPID_LOG(trace, "SENT PackageIndication: package_name=" << packageName); |
| } |
| |
| void AgentImpl::sendClassIndicationLH(ClassKind kind, const string& packageName, const AgentClassKey& key) |
| { |
| Buffer buffer(outputBuffer, MA_BUFFER_SIZE); |
| Protocol::encodeHeader(buffer, Protocol::OP_CLASS_INDICATION); |
| buffer.putOctet((int) kind); |
| buffer.putShortString(packageName); |
| buffer.putShortString(key.name); |
| buffer.putBin128(const_cast<uint8_t*>(key.hash)); // const_cast needed for older Qpid libraries |
| sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY); |
| QPID_LOG(trace, "SENT ClassIndication: package_name=" << packageName << " class_name=" << key.name); |
| } |
| |
| void AgentImpl::sendCommandCompleteLH(const string& exchange, const string& replyToKey, |
| uint32_t sequence, uint32_t code, const string& text) |
| { |
| Buffer buffer(outputBuffer, MA_BUFFER_SIZE); |
| Protocol::encodeHeader(buffer, Protocol::OP_COMMAND_COMPLETE, sequence); |
| buffer.putLong(code); |
| buffer.putShortString(text); |
| sendBufferLH(buffer, exchange, replyToKey); |
| QPID_LOG(trace, "SENT CommandComplete: seq=" << sequence << " code=" << code << " text=" << text); |
| } |
| |
| void AgentImpl::sendMethodErrorLH(uint32_t sequence, const string& key, uint32_t code, const string& text) |
| { |
| Buffer buffer(outputBuffer, MA_BUFFER_SIZE); |
| Protocol::encodeHeader(buffer, Protocol::OP_METHOD_RESPONSE, sequence); |
| buffer.putLong(code); |
| |
| string fulltext; |
| switch (code) { |
| case MERR_UNKNOWN_PACKAGE: fulltext = "Unknown Package"; break; |
| case MERR_UNKNOWN_CLASS: fulltext = "Unknown Class"; break; |
| case MERR_UNKNOWN_METHOD: fulltext = "Unknown Method"; break; |
| case MERR_INTERNAL_ERROR: fulltext = "Internal Error"; break; |
| default: fulltext = "Unspecified Error"; break; |
| } |
| |
| if (!text.empty()) { |
| fulltext += " ("; |
| fulltext += text; |
| fulltext += ")"; |
| } |
| |
| buffer.putMediumString(fulltext); |
| sendBufferLH(buffer, DIR_EXCHANGE, key); |
| QPID_LOG(trace, "SENT MethodResponse: errorCode=" << code << " text=" << fulltext); |
| } |
| |
| void AgentImpl::handleAttachResponse(Buffer& inBuffer) |
| { |
| Mutex::ScopedLock _lock(lock); |
| |
| assignedBrokerBank = inBuffer.getLong(); |
| assignedAgentBank = inBuffer.getLong(); |
| |
| QPID_LOG(trace, "RCVD AttachResponse: broker=" << assignedBrokerBank << " agent=" << assignedAgentBank); |
| |
| if ((assignedBrokerBank != requestedBrokerBank) || |
| (assignedAgentBank != requestedAgentBank)) { |
| if (requestedAgentBank == 0) { |
| QPID_LOG(notice, "Initial object-id bank assigned: " << assignedBrokerBank << "." << |
| assignedAgentBank); |
| } else { |
| QPID_LOG(warning, "Collision in object-id! New bank assigned: " << assignedBrokerBank << |
| "." << assignedAgentBank); |
| } |
| //storeData(); // TODO |
| requestedBrokerBank = assignedBrokerBank; |
| requestedAgentBank = assignedAgentBank; |
| } |
| |
| attachment.setBanks(assignedBrokerBank, assignedAgentBank); |
| |
| // Bind to qpid.management to receive commands |
| stringstream key; |
| key << "agent." << assignedBrokerBank << "." << assignedAgentBank; |
| eventQueue.push_back(eventBind(QMF_EXCHANGE, queueName, key.str())); |
| |
| // Send package indications for all local packages |
| for (map<string, ClassMaps>::iterator pIter = packages.begin(); |
| pIter != packages.end(); |
| pIter++) { |
| sendPackageIndicationLH(pIter->first); |
| |
| // Send class indications for all local classes |
| ClassMaps cMap = pIter->second; |
| for (ObjectClassMap::iterator cIter = cMap.objectClasses.begin(); |
| cIter != cMap.objectClasses.end(); cIter++) |
| sendClassIndicationLH(CLASS_OBJECT, pIter->first, cIter->first); |
| for (EventClassMap::iterator cIter = cMap.eventClasses.begin(); |
| cIter != cMap.eventClasses.end(); cIter++) |
| sendClassIndicationLH(CLASS_EVENT, pIter->first, cIter->first); |
| } |
| |
| attachComplete = true; |
| } |
| |
| void AgentImpl::handlePackageRequest(Buffer&) |
| { |
| Mutex::ScopedLock _lock(lock); |
| } |
| |
| void AgentImpl::handleClassQuery(Buffer&) |
| { |
| Mutex::ScopedLock _lock(lock); |
| } |
| |
| void AgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence, |
| const string& replyExchange, const string& replyKey) |
| { |
| Mutex::ScopedLock _lock(lock); |
| string rExchange(replyExchange); |
| string rKey(replyKey); |
| string packageName; |
| inBuffer.getShortString(packageName); |
| AgentClassKey key(inBuffer); |
| |
| if (rExchange.empty()) |
| rExchange = QMF_EXCHANGE; |
| if (rKey.empty()) |
| rKey = BROKER_KEY; |
| |
| QPID_LOG(trace, "RCVD SchemaRequest: package=" << packageName << " class=" << key.name); |
| |
| map<string, ClassMaps>::iterator pIter = packages.find(packageName); |
| if (pIter == packages.end()) { |
| sendCommandCompleteLH(rExchange, rKey, sequence, 1, "package not found"); |
| return; |
| } |
| |
| ClassMaps cMap = pIter->second; |
| ObjectClassMap::iterator ocIter = cMap.objectClasses.find(key); |
| if (ocIter != cMap.objectClasses.end()) { |
| SchemaObjectClass* oImpl = ocIter->second; |
| Buffer buffer(outputBuffer, MA_BUFFER_SIZE); |
| Protocol::encodeHeader(buffer, Protocol::OP_SCHEMA_RESPONSE, sequence); |
| oImpl->impl->encode(buffer); |
| sendBufferLH(buffer, rExchange, rKey); |
| QPID_LOG(trace, "SENT SchemaResponse: (object) package=" << packageName << " class=" << key.name); |
| return; |
| } |
| |
| EventClassMap::iterator ecIter = cMap.eventClasses.find(key); |
| if (ecIter != cMap.eventClasses.end()) { |
| SchemaEventClass* eImpl = ecIter->second; |
| Buffer buffer(outputBuffer, MA_BUFFER_SIZE); |
| Protocol::encodeHeader(buffer, Protocol::OP_SCHEMA_RESPONSE, sequence); |
| eImpl->impl->encode(buffer); |
| sendBufferLH(buffer, rExchange, rKey); |
| QPID_LOG(trace, "SENT SchemaResponse: (event) package=" << packageName << " class=" << key.name); |
| return; |
| } |
| |
| sendCommandCompleteLH(rExchange, rKey, sequence, 1, "class not found"); |
| } |
| |
| void AgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, const string& replyTo, const string& userId) |
| { |
| Mutex::ScopedLock _lock(lock); |
| FieldTable ft; |
| FieldTable::ValuePtr value; |
| map<string, ClassMaps>::const_iterator pIter = packages.end(); |
| string pname; |
| string cname; |
| string oidRepr; |
| boost::shared_ptr<ObjectId> oid; |
| |
| ft.decode(inBuffer); |
| |
| QPID_LOG(trace, "RCVD GetQuery: seq=" << sequence << " map=" << ft); |
| |
| value = ft.get("_package"); |
| if (value.get() && value->convertsTo<string>()) { |
| pname = value->get<string>(); |
| pIter = packages.find(pname); |
| if (pIter == packages.end()) { |
| sendCommandCompleteLH(DIR_EXCHANGE, replyTo, sequence); |
| return; |
| } |
| } |
| |
| value = ft.get("_class"); |
| if (value.get() && value->convertsTo<string>()) { |
| cname = value->get<string>(); |
| // TODO - check for validity of class (in package or any package) |
| if (pIter == packages.end()) { |
| } else { |
| |
| } |
| } |
| |
| value = ft.get("_objectid"); |
| if (value.get() && value->convertsTo<string>()) { |
| oidRepr = value->get<string>(); |
| oid.reset(new ObjectId()); |
| oid->impl->fromString(oidRepr); |
| } |
| |
| AgentQueryContext::Ptr context(new AgentQueryContext); |
| uint32_t contextNum = nextContextNum++; |
| context->sequence = sequence; |
| context->exchange = DIR_EXCHANGE; |
| context->key = replyTo; |
| contextMap[contextNum] = context; |
| |
| eventQueue.push_back(eventQuery(contextNum, userId, pname, cname, oid)); |
| } |
| |
| void AgentImpl::handleMethodRequest(Buffer& buffer, uint32_t sequence, const string& replyTo, const string& userId) |
| { |
| Mutex::ScopedLock _lock(lock); |
| string pname; |
| string method; |
| boost::shared_ptr<ObjectId> oid(ObjectIdImpl::factory(buffer)); |
| buffer.getShortString(pname); |
| AgentClassKey classKey(buffer); |
| buffer.getShortString(method); |
| |
| QPID_LOG(trace, "RCVD MethodRequest seq=" << sequence << " method=" << method); |
| |
| map<string, ClassMaps>::const_iterator pIter = packages.find(pname); |
| if (pIter == packages.end()) { |
| sendMethodErrorLH(sequence, replyTo, MERR_UNKNOWN_PACKAGE, pname); |
| return; |
| } |
| |
| ObjectClassMap::const_iterator cIter = pIter->second.objectClasses.find(classKey); |
| if (cIter == pIter->second.objectClasses.end()) { |
| sendMethodErrorLH(sequence, replyTo, MERR_UNKNOWN_CLASS, classKey.repr()); |
| return; |
| } |
| |
| const SchemaObjectClass* schema = cIter->second; |
| vector<const SchemaMethod*>::const_iterator mIter = schema->impl->methods.begin(); |
| for (; mIter != schema->impl->methods.end(); mIter++) { |
| if ((*mIter)->getName() == method) |
| break; |
| } |
| |
| if (mIter == schema->impl->methods.end()) { |
| sendMethodErrorLH(sequence, replyTo, MERR_UNKNOWN_METHOD, method); |
| return; |
| } |
| |
| const SchemaMethod* schemaMethod = *mIter; |
| boost::shared_ptr<Value> argMap(new Value(TYPE_MAP)); |
| Value* value; |
| for (vector<const SchemaArgument*>::const_iterator aIter = schemaMethod->impl->arguments.begin(); |
| aIter != schemaMethod->impl->arguments.end(); aIter++) { |
| const SchemaArgument* schemaArg = *aIter; |
| if (schemaArg->getDirection() == DIR_IN || schemaArg->getDirection() == DIR_IN_OUT) |
| value = ValueImpl::factory(schemaArg->getType(), buffer); |
| else |
| value = ValueImpl::factory(schemaArg->getType()); |
| argMap->insert(schemaArg->getName(), value); |
| } |
| |
| AgentQueryContext::Ptr context(new AgentQueryContext); |
| uint32_t contextNum = nextContextNum++; |
| context->sequence = sequence; |
| context->exchange = DIR_EXCHANGE; |
| context->key = replyTo; |
| context->schemaMethod = schemaMethod; |
| contextMap[contextNum] = context; |
| |
| eventQueue.push_back(eventMethod(contextNum, userId, method, oid, argMap, schema)); |
| } |
| |
| void AgentImpl::handleConsoleAddedIndication() |
| { |
| Mutex::ScopedLock _lock(lock); |
| } |
| |
| //================================================================== |
| // Wrappers |
| //================================================================== |
| |
| Agent::Agent(char* label, bool internalStore) { impl = new AgentImpl(label, internalStore); } |
| Agent::~Agent() { delete impl; } |
| void Agent::setStoreDir(const char* path) { impl->setStoreDir(path); } |
| void Agent::setTransferDir(const char* path) { impl->setTransferDir(path); } |
| void Agent::handleRcvMessage(Message& message) { impl->handleRcvMessage(message); } |
| bool Agent::getXmtMessage(Message& item) const { return impl->getXmtMessage(item); } |
| void Agent::popXmt() { impl->popXmt(); } |
| bool Agent::getEvent(AgentEvent& event) const { return impl->getEvent(event); } |
| void Agent::popEvent() { impl->popEvent(); } |
| void Agent::newSession() { impl->newSession(); } |
| void Agent::startProtocol() { impl->startProtocol(); } |
| void Agent::heartbeat() { impl->heartbeat(); } |
| void Agent::methodResponse(uint32_t sequence, uint32_t status, char* text, const Value& arguments) { impl->methodResponse(sequence, status, text, arguments); } |
| void Agent::queryResponse(uint32_t sequence, Object& object, bool prop, bool stat) { impl->queryResponse(sequence, object, prop, stat); } |
| void Agent::queryComplete(uint32_t sequence) { impl->queryComplete(sequence); } |
| void Agent::registerClass(SchemaObjectClass* cls) { impl->registerClass(cls); } |
| void Agent::registerClass(SchemaEventClass* cls) { impl->registerClass(cls); } |
| const ObjectId* Agent::addObject(Object& obj, uint64_t persistId) { return impl->addObject(obj, persistId); } |
| const ObjectId* Agent::allocObjectId(uint64_t persistId) { return impl->allocObjectId(persistId); } |
| const ObjectId* Agent::allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi) { return impl->allocObjectId(persistIdLo, persistIdHi); } |
| void Agent::raiseEvent(Event& event) { impl->raiseEvent(event); } |
| |