| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| #include "qmf/engine/Agent.h" |
| #include "qmf/engine/SchemaImpl.h" |
| #include "qmf/engine/DataImpl.h" |
| #include "qmf/engine/QueryImpl.h" |
| #include "qmf/Protocol.h" |
| #include <qpid/sys/Mutex.h> |
| #include <qpid/sys/Condition.h> |
| #include <qpid/log/Statement.h> |
| #include <qpid/sys/Time.h> |
| #include <qpid/sys/Thread.h> |
| #include <qpid/sys/Runnable.h> |
| #include <qpid/messaging/Session.h> |
| #include <qpid/messaging/Receiver.h> |
| #include <qpid/messaging/Sender.h> |
| #include <qpid/messaging/Address.h> |
| #include <qpid/messaging/Message.h> |
| #include <qpid/messaging/MapContent.h> |
| #include <qpid/messaging/ListContent.h> |
| #include <qpid/messaging/MapView.h> |
| #include <qpid/messaging/ListView.h> |
| #include <string> |
| #include <deque> |
| #include <map> |
| #include <iostream> |
| #include <fstream> |
| #include <boost/shared_ptr.hpp> |
| #include <boost/noncopyable.hpp> |
| |
| using namespace std; |
| using namespace qmf::engine; |
| using namespace qpid::sys; |
| using namespace qpid::messaging; |
| |
| namespace qmf { |
| namespace engine { |
| |
| class AgentImpl; |
| |
| struct AgentEventImpl { |
| typedef boost::shared_ptr<AgentEventImpl> Ptr; |
| AgentEvent::EventKind kind; |
| uint32_t sequence; |
| string authUserId; |
| string authToken; |
| string name; |
| Data* object; |
| string objectKey; |
| boost::shared_ptr<Query> query; |
| boost::shared_ptr<Variant::Map> arguments; |
| const SchemaClass* objectClass; |
| |
| AgentEventImpl(AgentEvent::EventKind k) : |
| kind(k), sequence(0), object(0), objectClass(0) {} |
| ~AgentEventImpl() {} |
| AgentEvent copy(); |
| }; |
| |
| /** |
| * AsyncContext is used to track asynchronous requests (Query, Sync, or Method) |
| * sent up to the application. |
| */ |
| struct AsyncContext { |
| typedef boost::shared_ptr<AsyncContext> Ptr; |
| string correlationId; |
| Address replyTo; |
| AgentEventImpl::Ptr authorizedEvent; |
| const SchemaMethod* schemaMethod; |
| AsyncContext(const string& cid, const Address& rt) : correlationId(cid), replyTo(rt), schemaMethod(0) {} |
| }; |
| |
| /** |
| * StoreThread is used only when the Agent runs in internal-store mode. |
| * This class keeps track of stored objects and can perform queries and |
| * subscription queries on the data. |
| */ |
| class StoreThread : public boost::noncopyable, public qpid::sys::Runnable, public DataManager { |
| public: |
| StoreThread(AgentImpl& a) : agent(a), running(true), thread(*this) {} |
| ~StoreThread() { stop(); } |
| |
| void addObject(const Data& data); |
| |
| // Methods from Runnable |
| void run(); |
| void stop(); |
| |
| // Methods from DataManager |
| void modifyStart(DataPtr data); |
| void modifyDone(DataPtr data); |
| void destroy(DataPtr data); |
| |
| private: |
| AgentImpl& agent; |
| bool running; |
| qpid::sys::Thread thread; |
| }; |
| |
| class AgentImpl : public boost::noncopyable, public qpid::sys::Runnable { |
| public: |
| AgentImpl(const char* vendor, const char* product, const char* name, const char* domain, bool internalStore); |
| ~AgentImpl(); |
| |
| void setNotifyCallback(Agent::notifyCb handler); |
| void setNotifyCallback(Notifiable* handler); |
| void setAttr(const char* key, const Variant& value); |
| void setStoreDir(const char* path); |
| void setTransferDir(const char* path); |
| bool getEvent(AgentEvent& event) const; |
| void popEvent(); |
| void setConnection(Connection& conn); |
| void authAllow(uint32_t sequence); |
| void authDeny(uint32_t sequence, const Data&); |
| void authDeny(uint32_t sequence, const string&); |
| void methodResponse(uint32_t sequence, uint32_t status, char* text, const Variant::Map& arguments); |
| void queryResponse(uint32_t sequence, Data& object); |
| void queryComplete(uint32_t sequence); |
| void registerClass(SchemaClass* cls); |
| const char* addObject(Data& obj, const char* key); |
| void raiseEvent(Data& event); |
| |
| void run(); |
| void stop(); |
| |
| // This blocking call is used by the internal store thread(s) to get work to do. |
| AgentEventImpl::Ptr nextInternalEvent(); |
| void signalInternal() { cond.notify(); } |
| |
| private: |
| mutable Mutex lock; |
| Condition cond; |
| const string vendor; |
| const string product; |
| const string name; |
| const string domain; |
| string directAddr; |
| string directAddrParams; |
| string topicAddr; |
| string topicAddrParams; |
| string eventSendAddr; |
| Variant::Map attrMap; |
| string storeDir; |
| string transferDir; |
| bool internalStore; |
| Agent::notifyCb notifyHandler; |
| Notifiable* notifiable; |
| Uuid systemId; |
| uint16_t bootSequence; |
| uint32_t nextContextNum; |
| bool running; |
| deque<AgentEventImpl::Ptr> eventQueue; |
| deque<AgentEventImpl::Ptr> internalEventQueue; |
| map<uint32_t, AsyncContext::Ptr> contextMap; |
| Connection connection; |
| Session session; |
| Receiver directReceiver; |
| Receiver topicReceiver; |
| Sender sender; |
| qpid::sys::Thread* thread; |
| StoreThread* storeThread; |
| |
| struct AgentClassKey { |
| string name; |
| uint8_t hash[16]; |
| AgentClassKey(const string& n, const uint8_t* h) : name(n) { |
| memcpy(hash, h, 16); |
| } |
| string repr() { |
| return name; |
| } |
| }; |
| |
| struct AgentClassKeyComp { |
| bool operator() (const AgentClassKey& lhs, const AgentClassKey& rhs) const |
| { |
| if (lhs.name != rhs.name) |
| return lhs.name < rhs.name; |
| else |
| for (int i = 0; i < 16; i++) |
| if (lhs.hash[i] != rhs.hash[i]) |
| return lhs.hash[i] < rhs.hash[i]; |
| return false; |
| } |
| }; |
| |
| typedef map<AgentClassKey, SchemaClass*, AgentClassKeyComp> ClassMap; |
| map<string, ClassMap> packages; |
| |
| AgentEventImpl::Ptr eventQuery(uint32_t num, const string& userId, const string& package, const string& cls, |
| const string& key); |
| AgentEventImpl::Ptr eventMethod(uint32_t num, const string& userId, const string& method, |
| const string& key, boost::shared_ptr<Variant::Map> argMap, |
| const SchemaClass* cls); |
| void notify(); |
| void handleRcvMessageLH(const Message& message); |
| void handleAgentLocateLH(const Message& message); |
| void handleQueryRequestLH(const Message& message); |
| void handleSubscribeRequest(const Message& message); |
| void handleSubscribeCancel(const Message& message); |
| void handleSubscribeRefresh(const Message& message); |
| void handleMethodRequest(const Message& message); |
| void sendResponse(const Message& request, const string& opcode, const Data& data); |
| void send(const Address& address, const string& correlationId, const string& opcode, |
| const string& cType, const Data& data); |
| void send(const Address& address, const string& correlationId, const string& opcode, |
| const string& cType, const Variant::List& list, bool partial=false); |
| |
| void sendPackageIndicationLH(const string& packageName); |
| void sendClassIndicationLH(ClassKind kind, const string& packageName, const AgentClassKey& key); |
| void sendCommandCompleteLH(const string& exchange, const string& key, uint32_t seq, |
| uint32_t code = 0, const string& text = "OK"); |
| void sendMethodErrorLH(uint32_t sequence, const string& key, uint32_t code, const string& text=""); |
| void handlePackageRequest(Message& msg); |
| void handleClassQuery(Message& msg); |
| void handleSchemaRequest(Message& msg, uint32_t sequence, |
| const string& replyToExchange, const string& replyToKey); |
| void handleGetQuery(Message& msg, uint32_t sequence, const string& replyTo, const string& userId); |
| void handleMethodRequest(Message& msg, uint32_t sequence, const string& replyTo, const string& userId); |
| }; |
| } |
| } |
| |
| #define STRING_REF(s) {if (!s.empty()) item.s = const_cast<char*>(s.c_str());} |
| |
| AgentEvent AgentEventImpl::copy() |
| { |
| AgentEvent item; |
| |
| ::memset(&item, 0, sizeof(AgentEvent)); |
| item.kind = kind; |
| item.sequence = sequence; |
| item.object = object; |
| item.query = query.get(); |
| item.arguments = arguments.get(); |
| item.objectClass = objectClass; |
| |
| STRING_REF(objectKey); |
| STRING_REF(authUserId); |
| STRING_REF(authToken); |
| STRING_REF(name); |
| |
| return item; |
| } |
| |
| void StoreThread::addObject(const Data& data) |
| { |
| DataPtr stored(new Data(data)); |
| } |
| |
| void StoreThread::run() |
| { |
| while (running) { |
| AgentEventImpl::Ptr ptr(agent.nextInternalEvent()); |
| } |
| } |
| |
| void StoreThread::stop() |
| { |
| running = false; |
| agent.signalInternal(); |
| } |
| |
| void StoreThread::modifyStart(DataPtr) |
| { |
| // Algorithm: |
| // Make a copy of the indicated object as a delta base if there |
| // isn't already one in place. If there is, do nothing. |
| } |
| |
| void StoreThread::modifyDone(DataPtr) |
| { |
| // Algorithm: |
| // If any deltas between the current and the stored base are discrete, |
| // send an immediate update. Otherwise, mark the object as modified. |
| // |
| // If an update is sent, delete the base copy. If not, leave the base copy |
| // in place for the later periodic update. |
| } |
| |
| void StoreThread::destroy(DataPtr) |
| { |
| // Algorithm: |
| // Send an immediate full-update for this object with the delete time set. |
| // Remove the object and any copies from the data store. |
| } |
| |
| AgentImpl::AgentImpl(const char* _v, const char* _p, const char* _n, const char* _d, bool _i) : |
| vendor(_v), product(_p), name(_n), domain(_d ? _d : "default"), internalStore(_i), |
| notifyHandler(0), notifiable(0), |
| bootSequence(1), nextContextNum(1), running(true), thread(0) |
| { |
| directAddr = "qmf." + domain + ".direct/" + vendor + ":" + product + ":" + name; |
| topicAddr = "qmf." + domain + ".topic/console.ind.#"; |
| eventSendAddr = "qmf." + domain + ".topic/agent.event"; |
| if (_d != 0) { |
| directAddrParams = " {create: always, type: topic, x-properties: {type: direct}}"; |
| topicAddrParams = " {create: always, type: topic, x-properties: {type: topic}}"; |
| } |
| attrMap["_vendor"] = vendor; |
| attrMap["_product"] = product; |
| attrMap["_instance"] = name; |
| attrMap["_name"] = vendor + ":" + product + ":" + name; |
| |
| if (internalStore) { |
| storeThread = new StoreThread(*this); |
| } |
| } |
| |
| |
| AgentImpl::~AgentImpl() |
| { |
| } |
| |
| void AgentImpl::setNotifyCallback(Agent::notifyCb handler) |
| { |
| Mutex::ScopedLock _lock(lock); |
| notifyHandler = handler; |
| } |
| |
| void AgentImpl::setNotifyCallback(Notifiable* handler) |
| { |
| Mutex::ScopedLock _lock(lock); |
| notifiable = handler; |
| } |
| |
| void AgentImpl::setAttr(const char* key, const Variant& value) |
| { |
| Mutex::ScopedLock _lock(lock); |
| attrMap[key] = value; |
| } |
| |
| void AgentImpl::setStoreDir(const char* path) |
| { |
| Mutex::ScopedLock _lock(lock); |
| if (path) |
| storeDir = path; |
| else |
| storeDir.clear(); |
| } |
| |
| void AgentImpl::setTransferDir(const char* path) |
| { |
| Mutex::ScopedLock _lock(lock); |
| if (path) |
| transferDir = path; |
| else |
| transferDir.clear(); |
| } |
| |
| |
| bool AgentImpl::getEvent(AgentEvent& event) const |
| { |
| Mutex::ScopedLock _lock(lock); |
| if (eventQueue.empty()) |
| return false; |
| event = eventQueue.front()->copy(); |
| return true; |
| } |
| |
| void AgentImpl::popEvent() |
| { |
| Mutex::ScopedLock _lock(lock); |
| if (!eventQueue.empty()) |
| eventQueue.pop_front(); |
| } |
| |
| void AgentImpl::setConnection(Connection& conn) |
| { |
| Mutex::ScopedLock _lock(lock); |
| |
| // |
| // Don't permit the overwriting of an existing connection |
| // TODO: return an error or throw an exception if an overwrite is attempted. |
| // |
| if (connection == 0) |
| return; |
| connection = conn; |
| |
| // |
| // Start the Agent thread now that we have a connection to work with. |
| // |
| thread = new qpid::sys::Thread(*this); |
| } |
| |
| void AgentImpl::authAllow(uint32_t sequence) |
| { |
| Mutex::ScopedLock _lock(lock); |
| |
| // Find the context associated with the sequence number |
| map<uint32_t, AsyncContext::Ptr>::iterator iter = contextMap.find(sequence); |
| if (iter == contextMap.end()) |
| return; |
| AsyncContext::Ptr context = iter->second; |
| |
| // Transform the authorize event into the real event |
| switch (context->authorizedEvent->kind) { |
| case AgentEvent::GET_AUTHORIZE : context->authorizedEvent->kind = AgentEvent::GET_QUERY; break; |
| case AgentEvent::METHOD_AUTHORIZE : context->authorizedEvent->kind = AgentEvent::METHOD_CALL; break; |
| case AgentEvent::SYNC_AUTHORIZE : context->authorizedEvent->kind = AgentEvent::START_SYNC; break; |
| default: |
| contextMap.erase(iter); |
| return; |
| } |
| |
| // Re-issue the now-authorized action. If this is a data query (get or subscribe), |
| // and the agent is handling storage internally, redirect to the internal event |
| // queue for processing by the internal-storage thread. |
| if (internalStore && context->authorizedEvent->kind != AgentEvent::METHOD_CALL) { |
| internalEventQueue.push_back(context->authorizedEvent); |
| cond.notify(); |
| } else { |
| eventQueue.push_back(context->authorizedEvent); |
| notify(); |
| } |
| } |
| |
| void AgentImpl::authDeny(uint32_t sequence, const Data& exception) |
| { |
| Mutex::ScopedLock _lock(lock); |
| map<uint32_t, AsyncContext::Ptr>::iterator iter = contextMap.find(sequence); |
| if (iter == contextMap.end()) |
| return; |
| AsyncContext::Ptr context = iter->second; |
| contextMap.erase(iter); |
| |
| // Return an exception message to the requestor |
| send(context->replyTo, context->correlationId, Protocol::OP_EXCEPTION, Protocol::CONTENT_NONE, exception); |
| } |
| |
| void AgentImpl::authDeny(uint32_t sequence, const string& error) |
| { |
| Data exception; |
| exception.getValues()["status"] = "Access to this Operation Denied"; |
| exception.getValues()["text"] = error; |
| authDeny(sequence, exception); |
| } |
| |
| void AgentImpl::methodResponse(uint32_t sequence, uint32_t status, char* text, const Variant::Map& /*argMap*/) |
| { |
| Mutex::ScopedLock _lock(lock); |
| map<uint32_t, AsyncContext::Ptr>::iterator iter = contextMap.find(sequence); |
| if (iter == contextMap.end()) |
| return; |
| AsyncContext::Ptr context = iter->second; |
| contextMap.erase(iter); |
| |
| // TODO: Encode method response |
| QPID_LOG(trace, "SENT MethodResponse corr=" << context->correlationId << " status=" << status << " text=" << text); |
| } |
| |
| void AgentImpl::queryResponse(uint32_t sequence, Data& data) |
| { |
| AsyncContext::Ptr context; |
| { |
| Mutex::ScopedLock _lock(lock); |
| map<uint32_t, AsyncContext::Ptr>::iterator iter = contextMap.find(sequence); |
| if (iter == contextMap.end()) |
| return; |
| context = iter->second; |
| } |
| |
| Variant::List list; |
| list.push_back(data.impl->asMap()); |
| send(context->replyTo, context->correlationId, Protocol::OP_QUERY_RESPONSE, Protocol::CONTENT_NONE, list, true); |
| QPID_LOG(trace, "SENT QueryResponse to=" << context->replyTo); |
| } |
| |
| void AgentImpl::queryComplete(uint32_t sequence) |
| { |
| AsyncContext::Ptr context; |
| { |
| Mutex::ScopedLock _lock(lock); |
| map<uint32_t, AsyncContext::Ptr>::iterator iter = contextMap.find(sequence); |
| if (iter == contextMap.end()) |
| return; |
| context = iter->second; |
| contextMap.erase(iter); |
| } |
| |
| send(context->replyTo, context->correlationId, Protocol::OP_QUERY_RESPONSE, Protocol::CONTENT_NONE, Variant::List()); |
| QPID_LOG(trace, "SENT QueryResponse to=" << context->replyTo << " final response message"); |
| } |
| |
| void AgentImpl::registerClass(SchemaClass* cls) |
| { |
| Mutex::ScopedLock _lock(lock); |
| |
| map<string, ClassMap>::iterator iter = packages.find(cls->getClassKey()->getPackageName()); |
| if (iter == packages.end()) { |
| packages[cls->getClassKey()->getPackageName()] = ClassMap(); |
| iter = packages.find(cls->getClassKey()->getPackageName()); |
| // TODO: Indicate this package if connected |
| } |
| |
| AgentClassKey key(cls->getClassKey()->getClassName(), cls->getClassKey()->getHashData()); |
| iter->second[key] = cls; |
| |
| // TODO: Indicate this schema if connected. |
| } |
| |
| const char* AgentImpl::addObject(Data&, const char*) |
| { |
| // TODO: Implement |
| // |
| // Determine a key for this object: |
| // if supplied, use the supplied key |
| // else: |
| // if the data is described (has a schema), use the schema primary-key to generate a key |
| // else make something up (a guid) |
| // |
| |
| Mutex::ScopedLock _lock(lock); |
| return 0; |
| } |
| |
| void AgentImpl::raiseEvent(Data& data) |
| { |
| Variant::List list; |
| list.push_back(data.impl->asMap()); |
| send(eventSendAddr, "", Protocol::OP_DATA_INDICATION, Protocol::CONTENT_EVENT, list); |
| } |
| |
| void AgentImpl::run() |
| { |
| qpid::sys::Duration duration = qpid::sys::TIME_MSEC * 500; |
| |
| session = connection.newSession(); |
| QPID_LOG(trace, "Creating direct receiver to address: " << directAddr << directAddrParams); |
| directReceiver = session.createReceiver(directAddr + directAddrParams); |
| directReceiver.setCapacity(10); |
| |
| QPID_LOG(trace, "Creating topic receiver to address: " << topicAddr << topicAddrParams); |
| topicReceiver = session.createReceiver(topicAddr + topicAddrParams); |
| topicReceiver.setCapacity(10); |
| |
| Mutex::ScopedLock _lock(lock); |
| while (running) { |
| Receiver rcvr; |
| bool available; |
| { |
| Mutex::ScopedUnlock _unlock(lock); |
| available = session.nextReceiver(rcvr, duration); |
| } |
| |
| if (available) { |
| Message msg(rcvr.get()); |
| handleRcvMessageLH(msg); |
| } |
| } |
| |
| directReceiver.close(); |
| session.close(); |
| } |
| |
| void AgentImpl::stop() |
| { |
| Mutex::ScopedLock _lock(lock); |
| running = false; |
| } |
| |
| AgentEventImpl::Ptr AgentImpl::nextInternalEvent() |
| { |
| Mutex::ScopedLock _lock(lock); |
| while (internalEventQueue.empty()) |
| cond.wait(lock); |
| |
| AgentEventImpl::Ptr event(internalEventQueue.front()); |
| internalEventQueue.pop_front(); |
| return event; |
| |
| // TODO: make sure this function returns with a null pointer when the thread needs to stop. |
| } |
| |
| |
| void AgentImpl::handleRcvMessageLH(const Message& message) |
| { |
| Variant::Map headers(message.getHeaders()); |
| cout << "AgentImpl::handleRcvMessageLH contentType=" << message.getContentType() << |
| " replyTo=" << message.getReplyTo() << |
| " headers=" << headers << endl; |
| |
| if (message.getContentType() != Protocol::AMQP_CONTENT_MAP && |
| message.getContentType() != Protocol::AMQP_CONTENT_LIST) |
| return; |
| |
| Variant::Map::const_iterator iter = headers.find(Protocol::APP_OPCODE); |
| if (iter == headers.end()) |
| return; |
| string opcode = iter->second.asString(); |
| |
| if (opcode == Protocol::OP_AGENT_LOCATE_REQUEST) handleAgentLocateLH(message); |
| if (opcode == Protocol::OP_QUERY_REQUEST) handleQueryRequestLH(message); |
| if (opcode == Protocol::OP_SUBSCRIBE_REQUEST) handleSubscribeRequest(message); |
| if (opcode == Protocol::OP_SUBSCRIBE_CANCEL_INDICATION) handleSubscribeCancel(message); |
| if (opcode == Protocol::OP_SUBSCRIBE_REFRESH_REQUEST) handleSubscribeRefresh(message); |
| if (opcode == Protocol::OP_METHOD_REQUEST) handleMethodRequest(message); |
| } |
| |
| void AgentImpl::handleAgentLocateLH(const Message& message) |
| { |
| QPID_LOG(trace, "RCVD AgentLocateRequest replyTo=" << message.getReplyTo()); |
| auto_ptr<Query> query(QueryImpl::factory(ListView(message))); |
| if (query->matches(attrMap)) { |
| Data data(0, attrMap); |
| sendResponse(message, Protocol::OP_AGENT_LOCATE_RESPONSE, data); |
| QPID_LOG(trace, "SENT AgentLocateResponse"); |
| } |
| } |
| |
| void AgentImpl::handleQueryRequestLH(const Message& message) |
| { |
| uint32_t contextNum = nextContextNum++; |
| AsyncContext::Ptr context(new AsyncContext(message.getCorrelationId(), message.getReplyTo())); |
| contextMap[contextNum] = context; |
| |
| // Build the event for the get request |
| AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::GET_AUTHORIZE)); |
| event->sequence = contextNum; |
| event->authUserId = message.getUserId(); |
| event->query.reset(QueryImpl::factory(MapView(message))); |
| |
| // Put the not-yet-authorized event into the context for possible later use |
| context->authorizedEvent = event; |
| |
| // Enqueue the event |
| eventQueue.push_back(event); |
| notify(); |
| } |
| |
| void AgentImpl::handleSubscribeRequest(const Message& message) |
| { |
| const MapView map(message); |
| } |
| |
| void AgentImpl::handleSubscribeCancel(const Message& message) |
| { |
| const MapView map(message); |
| } |
| |
| void AgentImpl::handleSubscribeRefresh(const Message& message) |
| { |
| const MapView map(message); |
| } |
| |
| void AgentImpl::handleMethodRequest(const Message& message) |
| { |
| const MapView map(message); |
| } |
| |
| void AgentImpl::sendResponse(const Message& request, const string& opcode, const Data& data) |
| { |
| send(request.getReplyTo(), request.getCorrelationId(), opcode, Protocol::CONTENT_NONE, data); |
| } |
| |
| void AgentImpl::send(const Address& address, const string& correlationId, const string& opcode, const string& cType, const Data& data) |
| { |
| Message message; |
| MapContent content(message, data.impl->asMap()); |
| |
| if (!correlationId.empty()) |
| message.setCorrelationId(correlationId); |
| if (!cType.empty()) |
| message.getHeaders()[Protocol::APP_CONTENT] = cType; |
| message.getHeaders()[Protocol::APP_OPCODE] = opcode; |
| content.encode(); |
| session.createSender(address).send(message); |
| } |
| |
| void AgentImpl::send(const Address& address, const string& correlationId, const string& opcode, const string& cType, const Variant::List& list, bool partial) |
| { |
| Message message; |
| ListContent content(message, list); |
| |
| if (!correlationId.empty()) |
| message.setCorrelationId(correlationId); |
| if (!cType.empty()) |
| message.getHeaders()[Protocol::APP_CONTENT] = cType; |
| message.getHeaders()[Protocol::APP_OPCODE] = opcode; |
| if (partial) |
| message.getHeaders()[Protocol::APP_PARTIAL] = Variant(); |
| content.encode(); |
| session.createSender(address).send(message); |
| } |
| |
| AgentEventImpl::Ptr AgentImpl::eventQuery(uint32_t num, const string& userId, const string&, const string&, const string& key) |
| { |
| AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::GET_QUERY)); |
| event->sequence = num; |
| event->authUserId = userId; |
| event->objectKey = key; |
| return event; |
| } |
| |
| AgentEventImpl::Ptr AgentImpl::eventMethod(uint32_t num, const string& userId, const string& method, |
| const string& key, boost::shared_ptr<Variant::Map> argMap, |
| const SchemaClass* cls) |
| { |
| AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::METHOD_CALL)); |
| event->sequence = num; |
| event->authUserId = userId; |
| event->name = method; |
| event->objectKey = key; |
| event->arguments = argMap; |
| event->objectClass = cls; |
| return event; |
| } |
| |
| void AgentImpl::notify() |
| { |
| if (notifyHandler != 0) |
| notifyHandler(); |
| if (notifiable != 0) |
| notifiable->notify(); |
| } |
| |
| void AgentImpl::sendPackageIndicationLH(const string& packageName) |
| { |
| // TODO |
| QPID_LOG(trace, "SENT PackageIndication: package_name=" << packageName); |
| } |
| |
| void AgentImpl::sendClassIndicationLH(ClassKind /*kind*/, const string& packageName, const AgentClassKey& key) |
| { |
| // TODO |
| QPID_LOG(trace, "SENT ClassIndication: package_name=" << packageName << " class_name=" << key.name); |
| } |
| |
| void AgentImpl::sendCommandCompleteLH(const string&, const string&, uint32_t sequence, uint32_t code, const string& text) |
| { |
| // TODO |
| QPID_LOG(trace, "SENT CommandComplete: seq=" << sequence << " code=" << code << " text=" << text); |
| } |
| |
| void AgentImpl::sendMethodErrorLH(uint32_t /*sequence*/, const string& /*key*/, uint32_t code, const string& text) |
| { |
| // TODO |
| QPID_LOG(trace, "SENT MethodResponse: errorCode=" << code << " text=" << text); |
| } |
| |
| void AgentImpl::handlePackageRequest(Message&) |
| { |
| Mutex::ScopedLock _lock(lock); |
| } |
| |
| void AgentImpl::handleClassQuery(Message&) |
| { |
| Mutex::ScopedLock _lock(lock); |
| } |
| |
| void AgentImpl::handleSchemaRequest(Message&, uint32_t, const string&, const string&) |
| { |
| Mutex::ScopedLock _lock(lock); |
| } |
| |
| void AgentImpl::handleGetQuery(Message&, uint32_t, const string&, const string&) |
| { |
| Mutex::ScopedLock _lock(lock); |
| } |
| |
| void AgentImpl::handleMethodRequest(Message& /*msg*/, uint32_t, const string& /*replyTo*/, const string& /*userId*/) |
| { |
| } |
| |
| //================================================================== |
| // Wrappers |
| //================================================================== |
| |
| Agent::Agent(const char* v, const char* p, const char* n, const char* d, bool i) { impl = new AgentImpl(v, p, n, d, i); } |
| Agent::~Agent() { delete impl; } |
| void Agent::setNotifyCallback(notifyCb handler) { impl->setNotifyCallback(handler); } |
| void Agent::setNotifyCallback(Notifiable* handler) { impl->setNotifyCallback(handler); } |
| void Agent::setAttr(const char* key, const Variant& value) { impl->setAttr(key, value); } |
| void Agent::setStoreDir(const char* path) { impl->setStoreDir(path); } |
| void Agent::setTransferDir(const char* path) { impl->setTransferDir(path); } |
| bool Agent::getEvent(AgentEvent& event) const { return impl->getEvent(event); } |
| void Agent::popEvent() { impl->popEvent(); } |
| void Agent::setConnection(Connection& conn) { impl->setConnection(conn); } |
| void Agent::authAllow(uint32_t sequence) { impl->authAllow(sequence); } |
| void Agent::authDeny(uint32_t sequence, const Data& ex) { impl->authDeny(sequence, ex); } |
| void Agent::authDeny(uint32_t sequence, const char* ex) { impl->authDeny(sequence, string(ex)); } |
| void Agent::methodResponse(uint32_t sequence, uint32_t status, char* text, const Variant::Map& arguments) { impl->methodResponse(sequence, status, text, arguments); } |
| void Agent::queryResponse(uint32_t sequence, Data& object) { impl->queryResponse(sequence, object); } |
| void Agent::queryComplete(uint32_t sequence) { impl->queryComplete(sequence); } |
| void Agent::registerClass(SchemaClass* cls) { impl->registerClass(cls); } |
| const char* Agent::addObject(Data& obj, const char* key) { return impl->addObject(obj, key); } |
| void Agent::raiseEvent(Data& event) { impl->raiseEvent(event); } |
| |