blob: 453c6bd27bc987f3c30e7b72c5e1179796e4e096 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
#include "qmf/engine/Agent.h"
#include "qmf/engine/SchemaImpl.h"
#include "qmf/engine/DataImpl.h"
#include "qmf/engine/QueryImpl.h"
#include "qmf/Protocol.h"
#include <qpid/sys/Mutex.h>
#include <qpid/sys/Condition.h>
#include <qpid/log/Statement.h>
#include <qpid/sys/Time.h>
#include <qpid/sys/Thread.h>
#include <qpid/sys/Runnable.h>
#include <qpid/messaging/Session.h>
#include <qpid/messaging/Receiver.h>
#include <qpid/messaging/Sender.h>
#include <qpid/messaging/Address.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/MapContent.h>
#include <qpid/messaging/ListContent.h>
#include <qpid/messaging/MapView.h>
#include <qpid/messaging/ListView.h>
#include <string>
#include <deque>
#include <map>
#include <iostream>
#include <fstream>
#include <boost/shared_ptr.hpp>
#include <boost/noncopyable.hpp>
using namespace std;
using namespace qmf::engine;
using namespace qpid::sys;
using namespace qpid::messaging;
namespace qmf {
namespace engine {
class AgentImpl;
// Internal form of one event queued for the application or for the
// internal-store thread. copy() converts it to the plain AgentEvent handed out.
struct AgentEventImpl {
typedef boost::shared_ptr<AgentEventImpl> Ptr;
// What the event represents (e.g. GET_AUTHORIZE, GET_QUERY, METHOD_CALL).
AgentEvent::EventKind kind;
// Context number used to find the matching AsyncContext in AgentImpl::contextMap.
uint32_t sequence;
string authUserId;
string authToken;
string name;
// Raw pointer, initialized to null by the constructor.
// NOTE(review): ownership of the pointee is not established anywhere in this view.
Data* object;
string objectKey;
boost::shared_ptr<Query> query;
boost::shared_ptr<Variant::Map> arguments;
const SchemaClass* objectClass;
// Sets the kind; sequence and the raw pointers start at zero/null.
AgentEventImpl(AgentEvent::EventKind k) :
kind(k), sequence(0), object(0), objectClass(0) {}
~AgentEventImpl() {}
// Returns an AgentEvent populated from this object's fields.
AgentEvent copy();
/**
 * AsyncContext is used to track asynchronous requests (Query, Sync, or Method)
 * sent up to the application.
 */
// Per-request bookkeeping stored in AgentImpl::contextMap, keyed by context number.
struct AsyncContext {
typedef boost::shared_ptr<AsyncContext> Ptr;
// Correlation id and reply address of the originating request; the response
// paths pass both to AgentImpl::send().
string correlationId;
Address replyTo;
// The not-yet-authorized event, kept so authAllow() can rewrite its kind.
AgentEventImpl::Ptr authorizedEvent;
// Initialized to null; no assignment is visible in this view.
const SchemaMethod* schemaMethod;
AsyncContext(const string& cid, const Address& rt) : correlationId(cid), replyTo(rt), schemaMethod(0) {}
/**
 * StoreThread is used only when the Agent runs in internal-store mode.
 * This class keeps track of stored objects and can perform queries and
 * subscription queries on the data.
 */
// Worker that pulls events from AgentImpl::nextInternalEvent() in run() and
// implements the DataManager callbacks.
class StoreThread : public boost::noncopyable, public qpid::sys::Runnable, public DataManager {
// `thread` is the last member declared, so agent and running are initialized
// before the Thread object is constructed on this Runnable.
StoreThread(AgentImpl& a) : agent(a), running(true), thread(*this) {}
// NOTE(review): the destructor only clears `running`; no join on `thread`
// is visible in this view.
~StoreThread() { stop(); }
void addObject(const Data& data);
// Methods from Runnable
void run();
void stop();
// Methods from DataManager
void modifyStart(DataPtr data);
void modifyDone(DataPtr data);
void destroy(DataPtr data);
AgentImpl& agent;
// Loop flag read by run() and cleared by stop().
// NOTE(review): plain bool with no visible synchronization — confirm.
bool running;
qpid::sys::Thread thread;
// Implementation object behind the public Agent class; every Agent method
// forwards to the same-named method here. It is also the Runnable executed by
// the thread created in setConnection().
class AgentImpl : public boost::noncopyable, public qpid::sys::Runnable {
AgentImpl(const char* vendor, const char* product, const char* name, const char* domain, bool internalStore);
// Two alternative notification hooks: a function pointer or a Notifiable object.
void setNotifyCallback(Agent::notifyCb handler);
void setNotifyCallback(Notifiable* handler);
void setAttr(const char* key, const Variant& value);
void setStoreDir(const char* path);
void setTransferDir(const char* path);
// Event-queue access for the application: peek at the front, then pop.
bool getEvent(AgentEvent& event) const;
void popEvent();
void setConnection(Connection& conn);
// Application responses to *_AUTHORIZE events, identified by sequence number.
void authAllow(uint32_t sequence);
void authDeny(uint32_t sequence, const Data&);
void authDeny(uint32_t sequence, const string&);
void methodResponse(uint32_t sequence, uint32_t status, char* text, const Variant::Map& arguments);
void queryResponse(uint32_t sequence, Data& object);
void queryComplete(uint32_t sequence);
void registerClass(SchemaClass* cls);
const char* addObject(Data& obj, const char* key);
void raiseEvent(Data& event);
// Runnable entry point and its stop request.
void run();
void stop();
// This blocking call is used by the internal store thread(s) to get work to do.
AgentEventImpl::Ptr nextInternalEvent();
void signalInternal() { cond.notify(); }
// Mutex taken by the public methods; mutable so getEvent() const can lock it.
mutable Mutex lock;
Condition cond;
const string vendor;
const string product;
const string name;
const string domain;
// Address strings composed in the constructor from domain/vendor/product/name.
string directAddr;
string directAddrParams;
string topicAddr;
string topicAddrParams;
string eventSendAddr;
// Agent attributes; matched against incoming locate queries in handleAgentLocateLH().
Variant::Map attrMap;
string storeDir;
string transferDir;
bool internalStore;
Agent::notifyCb notifyHandler;
Notifiable* notifiable;
Uuid systemId;
uint16_t bootSequence;
// Source of context numbers (post-incremented in handleQueryRequestLH()).
uint32_t nextContextNum;
bool running;
deque<AgentEventImpl::Ptr> eventQueue;
deque<AgentEventImpl::Ptr> internalEventQueue;
map<uint32_t, AsyncContext::Ptr> contextMap;
Connection connection;
Session session;
Receiver directReceiver;
Receiver topicReceiver;
Sender sender;
// Both pointers are assigned with `new` (setConnection() and the constructor).
// NOTE(review): no destructor or matching delete is visible in this view.
qpid::sys::Thread* thread;
StoreThread* storeThread;
// Key identifying a registered schema class inside a package: the class name
// together with its 16-byte schema hash.
struct AgentClassKey {
    std::string name;
    uint8_t hash[16];

    // n: class name.
    // h: pointer to 16 bytes of hash data, copied into this key. A null
    //    pointer produces an all-zero hash instead of reading through it.
    AgentClassKey(const std::string& n, const uint8_t* h) : name(n) {
        if (h)
            memcpy(hash, h, sizeof(hash));
        else
            memset(hash, 0, sizeof(hash));
    }

    // Returns the printable form of the key, which is the class name only.
    std::string repr() const {
        return name;
    }
};

// Strict weak ordering over AgentClassKey so it can serve as a std::map key:
// keys are ordered by name first and, for equal names, bytewise by hash.
struct AgentClassKeyComp {
    bool operator() (const AgentClassKey& lhs, const AgentClassKey& rhs) const
    {
        if (lhs.name != rhs.name)
            return lhs.name < rhs.name;
        // memcmp compares as unsigned char, which matches comparing the
        // uint8_t elements one by one from index 0.
        return memcmp(lhs.hash, rhs.hash, sizeof(lhs.hash)) < 0;
    }
};
// Registered schema classes: package name -> (class key -> schema class).
typedef map<AgentClassKey, SchemaClass*, AgentClassKeyComp> ClassMap;
map<string, ClassMap> packages;
// Factories for the events placed on the queues.
AgentEventImpl::Ptr eventQuery(uint32_t num, const string& userId, const string& package, const string& cls,
const string& key);
AgentEventImpl::Ptr eventMethod(uint32_t num, const string& userId, const string& method,
const string& key, boost::shared_ptr<Variant::Map> argMap,
const SchemaClass* cls);
void notify();
// Inbound message handlers.
// NOTE(review): the "LH" suffix presumably means "lock held by caller" — confirm.
void handleRcvMessageLH(const Message& message);
void handleAgentLocateLH(const Message& message);
void handleQueryRequestLH(const Message& message);
void handleSubscribeRequest(const Message& message);
void handleSubscribeCancel(const Message& message);
void handleSubscribeRefresh(const Message& message);
void handleMethodRequest(const Message& message);
// Outbound helpers: build a Message with the opcode/content headers and a map
// (Data) or list body.
void sendResponse(const Message& request, const string& opcode, const Data& data);
void send(const Address& address, const string& correlationId, const string& opcode,
const string& cType, const Data& data);
void send(const Address& address, const string& correlationId, const string& opcode,
const string& cType, const Variant::List& list, bool partial=false);
void sendPackageIndicationLH(const string& packageName);
void sendClassIndicationLH(ClassKind kind, const string& packageName, const AgentClassKey& key);
void sendCommandCompleteLH(const string& exchange, const string& key, uint32_t seq,
uint32_t code = 0, const string& text = "OK");
void sendMethodErrorLH(uint32_t sequence, const string& key, uint32_t code, const string& text="");
// Handlers taking a mutable Message plus explicit sequence/reply arguments.
// Their definitions in this file have no functional body beyond taking the lock.
void handlePackageRequest(Message& msg);
void handleClassQuery(Message& msg);
void handleSchemaRequest(Message& msg, uint32_t sequence,
const string& replyToExchange, const string& replyToKey);
void handleGetQuery(Message& msg, uint32_t sequence, const string& replyTo, const string& userId);
// Second overload of handleMethodRequest (the first takes only const Message&).
void handleMethodRequest(Message& msg, uint32_t sequence, const string& replyTo, const string& userId);
// STRING_REF(s): when the string member `s` is non-empty, points `item.s` at
// its character data (constness cast away); otherwise leaves `item.s` untouched.
// Expects a variable named `item` and a string named `s` in the expansion scope.
// NOTE(review): expands to a bare { } block, so `if (c) STRING_REF(x); else ...`
// would not compile.
#define STRING_REF(s) {if (!s.empty()) item.s = const_cast<char*>(s.c_str());}
// Builds the plain AgentEvent handed to the application from this object.
// The result is zero-filled, then kind, sequence and the pointer fields are
// copied in. query and arguments are the raw pointers held by this object's
// shared_ptr members (get()), so the returned item does not own them.
// NOTE(review): no use of STRING_REF for the string members is visible here.
AgentEvent AgentEventImpl::copy()
AgentEvent item;
::memset(&item, 0, sizeof(AgentEvent));
item.kind = kind;
item.sequence = sequence;
item.object = object;
item.query = query.get();
item.arguments = arguments.get();
item.objectClass = objectClass;
return item;
// Makes a heap copy of `data` held by a DataPtr.
// NOTE(review): nothing visible here inserts `stored` into any container.
void StoreThread::addObject(const Data& data)
DataPtr stored(new Data(data));
// Thread body: while `running` is true, fetches the next internal event from
// the agent via nextInternalEvent().
// NOTE(review): no processing of `ptr` is visible in this view.
void StoreThread::run()
while (running) {
AgentEventImpl::Ptr ptr(agent.nextInternalEvent());
// Requests loop exit by clearing the flag that run() tests.
// NOTE(review): no wake-up of a thread waiting in nextInternalEvent() is visible here.
void StoreThread::stop()
running = false;
// DataManager callback; not implemented yet — the body holds only the plan below.
void StoreThread::modifyStart(DataPtr)
// Algorithm:
// Make a copy of the indicated object as a delta base if there
// isn't already one in place. If there is, do nothing.
// DataManager callback; not implemented yet — the body holds only the plan below.
void StoreThread::modifyDone(DataPtr)
// Algorithm:
// If any deltas between the current and the stored base are discrete,
// send an immediate update. Otherwise, mark the object as modified.
// If an update is sent, delete the base copy. If not, leave the base copy
// in place for the later periodic update.
// DataManager callback; not implemented yet — the body holds only the plan below.
void StoreThread::destroy(DataPtr)
// Algorithm:
// Send an immediate full-update for this object with the delete time set.
// Remove the object and any copies from the data store.
// Constructs the agent implementation.
//   _v, _p, _n : vendor, product and instance name. They are passed straight
//                to std::string, so they must be non-null.
//   _d         : domain; a null pointer selects the domain "default".
//   _i         : true to run in internal-store mode, which creates a StoreThread.
// Composes the direct/topic/event addresses from the domain and identity,
// and seeds attrMap with the _vendor/_product/_instance/_name attributes.
AgentImpl::AgentImpl(const char* _v, const char* _p, const char* _n, const char* _d, bool _i) :
    vendor(_v), product(_p), name(_n), domain(_d ? _d : "default"), internalStore(_i),
    notifyHandler(0), notifiable(0),
    bootSequence(1), nextContextNum(1), running(true), thread(0),
    // Start null so the pointer is well-defined when internalStore is false.
    storeThread(0)
{
    directAddr = "qmf." + domain + ".direct/" + vendor + ":" + product + ":" + name;
    topicAddr = "qmf." + domain + ".topic/console.ind.#";
    eventSendAddr = "qmf." + domain + ".topic/agent.event";

    // Address options are only appended when the caller named a domain explicitly.
    if (_d != 0) {
        directAddrParams = " {create: always, type: topic, x-properties: {type: direct}}";
        topicAddrParams = " {create: always, type: topic, x-properties: {type: topic}}";
    }

    attrMap["_vendor"] = vendor;
    attrMap["_product"] = product;
    attrMap["_instance"] = name;
    attrMap["_name"] = vendor + ":" + product + ":" + name;

    if (internalStore) {
        storeThread = new StoreThread(*this);
    }
}
// Stores the function-pointer notification callback while holding the agent lock.
void AgentImpl::setNotifyCallback(Agent::notifyCb handler)
Mutex::ScopedLock _lock(lock);
notifyHandler = handler;
// Stores the Notifiable-object notification target while holding the agent lock.
void AgentImpl::setNotifyCallback(Notifiable* handler)
Mutex::ScopedLock _lock(lock);
notifiable = handler;
// Sets (or overwrites) one entry of the agent attribute map under the lock.
// `key` is converted to a map key, so it must be a valid C string.
void AgentImpl::setAttr(const char* key, const Variant& value)
Mutex::ScopedLock _lock(lock);
attrMap[key] = value;
// Records the store directory under the lock when `path` is non-null.
// NOTE(review): no handling of a null `path` is visible in this view.
void AgentImpl::setStoreDir(const char* path)
Mutex::ScopedLock _lock(lock);
if (path)
storeDir = path;
// Records the transfer directory under the lock when `path` is non-null.
// NOTE(review): no handling of a null `path` is visible in this view.
void AgentImpl::setTransferDir(const char* path)
Mutex::ScopedLock _lock(lock);
if (path)
transferDir = path;
// Peeks at the oldest queued event without removing it.
// Returns false (leaving `event` untouched) when the queue is empty; otherwise
// writes a copy() of the front event into `event` and returns true.
bool AgentImpl::getEvent(AgentEvent& event) const
Mutex::ScopedLock _lock(lock);
if (eventQueue.empty())
return false;
event = eventQueue.front()->copy();
return true;
// Takes the lock and tests that the event queue is non-empty.
// NOTE(review): the statement guarded by the check (presumably removing the
// front element) is not visible in this view.
void AgentImpl::popEvent()
Mutex::ScopedLock _lock(lock);
if (!eventQueue.empty())
// Attaches the messaging connection and creates the agent thread, which runs
// this object's run().
// NOTE(review): with the braces missing from this view it is unclear whether
// the thread creation is guarded by the same `connection == 0` check; if it is
// not, a second call would allocate another Thread and overwrite `thread`.
void AgentImpl::setConnection(Connection& conn)
Mutex::ScopedLock _lock(lock);
// Don't permit the overwriting of an existing connection
// TODO: return an error or throw an exception if an overwrite is attempted.
if (connection == 0)
connection = conn;
// Start the Agent thread now that we have a connection to work with.
thread = new qpid::sys::Thread(*this);
// Called when the application authorizes the request identified by `sequence`.
// Looks up the request's context and rewrites the stored event's kind from its
// *_AUTHORIZE form to the corresponding action (GET_QUERY, METHOD_CALL, START_SYNC).
// NOTE(review): the not-found branch and the bodies of the final if/else
// (re-queueing the event) are not visible in this view.
void AgentImpl::authAllow(uint32_t sequence)
Mutex::ScopedLock _lock(lock);
// Find the context associated with the sequence number
map<uint32_t, AsyncContext::Ptr>::iterator iter = contextMap.find(sequence);
if (iter == contextMap.end())
AsyncContext::Ptr context = iter->second;
// Transform the authorize event into the real event
switch (context->authorizedEvent->kind) {
case AgentEvent::GET_AUTHORIZE : context->authorizedEvent->kind = AgentEvent::GET_QUERY; break;
case AgentEvent::METHOD_AUTHORIZE : context->authorizedEvent->kind = AgentEvent::METHOD_CALL; break;
case AgentEvent::SYNC_AUTHORIZE : context->authorizedEvent->kind = AgentEvent::START_SYNC; break;
// Re-issue the now-authorized action. If this is a data query (get or subscribe),
// and the agent is handling storage internally, redirect to the internal event
// queue for processing by the internal-storage thread.
if (internalStore && context->authorizedEvent->kind != AgentEvent::METHOD_CALL) {
} else {
// Called when the application denies the request identified by `sequence`.
// Looks up the request's context and sends `exception` to its reply address
// with the OP_EXCEPTION opcode and the original correlation id.
// NOTE(review): the not-found branch is not visible in this view, and nothing
// visible removes the context from contextMap.
void AgentImpl::authDeny(uint32_t sequence, const Data& exception)
Mutex::ScopedLock _lock(lock);
map<uint32_t, AsyncContext::Ptr>::iterator iter = contextMap.find(sequence);
if (iter == contextMap.end())
AsyncContext::Ptr context = iter->second;
// Return an exception message to the requestor
send(context->replyTo, context->correlationId, Protocol::OP_EXCEPTION, Protocol::CONTENT_NONE, exception);
// Convenience overload: wraps `error` in a Data object carrying a fixed
// "status" value and the caller's text, then delegates to the Data overload.
void AgentImpl::authDeny(uint32_t sequence, const string& error)
Data exception;
exception.getValues()["status"] = "Access to this Operation Denied";
exception.getValues()["text"] = error;
authDeny(sequence, exception);
// Reports the outcome of a method call identified by `sequence`.
// Currently only looks up the context and logs; the response is not encoded
// or sent yet (see TODO). The argument map is unused.
// NOTE(review): the not-found branch is not visible in this view.
// NOTE(review): `text` is streamed into the log as a char*; a null pointer
// there is undefined behavior — confirm callers never pass null.
void AgentImpl::methodResponse(uint32_t sequence, uint32_t status, char* text, const Variant::Map& /*argMap*/)
Mutex::ScopedLock _lock(lock);
map<uint32_t, AsyncContext::Ptr>::iterator iter = contextMap.find(sequence);
if (iter == contextMap.end())
AsyncContext::Ptr context = iter->second;
// TODO: Encode method response
QPID_LOG(trace, "SENT MethodResponse corr=" << context->correlationId << " status=" << status << " text=" << text);
// Sends one partial OP_QUERY_RESPONSE for the query identified by `sequence`
// to the requester's reply address (partial flag set to true).
// NOTE(review): the not-found branch and any code adding `data` to `list` are
// not visible in this view; as shown, the list sent is empty.
void AgentImpl::queryResponse(uint32_t sequence, Data& data)
AsyncContext::Ptr context;
Mutex::ScopedLock _lock(lock);
map<uint32_t, AsyncContext::Ptr>::iterator iter = contextMap.find(sequence);
if (iter == contextMap.end())
context = iter->second;
Variant::List list;
send(context->replyTo, context->correlationId, Protocol::OP_QUERY_RESPONSE, Protocol::CONTENT_NONE, list, true);
QPID_LOG(trace, "SENT QueryResponse to=" << context->replyTo);
// Sends the final OP_QUERY_RESPONSE for the query identified by `sequence`:
// an empty list with the partial flag left at its default (false).
// NOTE(review): the not-found branch is not visible in this view, and nothing
// visible removes the context from contextMap.
void AgentImpl::queryComplete(uint32_t sequence)
AsyncContext::Ptr context;
Mutex::ScopedLock _lock(lock);
map<uint32_t, AsyncContext::Ptr>::iterator iter = contextMap.find(sequence);
if (iter == contextMap.end())
context = iter->second;
send(context->replyTo, context->correlationId, Protocol::OP_QUERY_RESPONSE, Protocol::CONTENT_NONE, Variant::List());
QPID_LOG(trace, "SENT QueryResponse to=" << context->replyTo << " final response message");
// Registers a schema class under its package, creating the package's ClassMap
// on first use. The class is keyed by (class name, hash data); registering the
// same key again overwrites the stored pointer. The pointer is stored as given,
// not copied. `cls` is dereferenced unconditionally, so it must be non-null.
void AgentImpl::registerClass(SchemaClass* cls)
Mutex::ScopedLock _lock(lock);
map<string, ClassMap>::iterator iter = packages.find(cls->getClassKey()->getPackageName());
if (iter == packages.end()) {
packages[cls->getClassKey()->getPackageName()] = ClassMap();
iter = packages.find(cls->getClassKey()->getPackageName());
// TODO: Indicate this package if connected
AgentClassKey key(cls->getClassKey()->getClassName(), cls->getClassKey()->getHashData());
iter->second[key] = cls;
// TODO: Indicate this schema if connected.
// Placeholder: takes the lock and returns a null pointer; both parameters are
// ignored. The intended key-selection policy is described in the TODO below.
const char* AgentImpl::addObject(Data&, const char*)
// TODO: Implement
// Determine a key for this object:
// if supplied, use the supplied key
// else:
// if the data is described (has a schema), use the schema primary-key to generate a key
// else make something up (a guid)
Mutex::ScopedLock _lock(lock);
return 0;
// Publishes an OP_DATA_INDICATION with CONTENT_EVENT content to the event
// topic address, using an empty correlation id.
// NOTE(review): no code adding `data` to `list` is visible in this view; as
// shown, the list sent is empty. No lock is taken here.
void AgentImpl::raiseEvent(Data& data)
Variant::List list;
send(eventSendAddr, "", Protocol::OP_DATA_INDICATION, Protocol::CONTENT_EVENT, list);
// Agent thread body. Opens a session on the connection, creates the direct and
// topic receivers from the address strings built in the constructor, then
// loops while `running` is true waiting up to 500 ms for a receiver with a message.
// The agent lock is taken before the loop; a ScopedUnlock is declared ahead of
// the nextReceiver() wait.
// NOTE(review): the scope of the ScopedUnlock and what is done with `msg` are
// not visible in this view.
void AgentImpl::run()
qpid::sys::Duration duration = qpid::sys::TIME_MSEC * 500;
session = connection.newSession();
QPID_LOG(trace, "Creating direct receiver to address: " << directAddr << directAddrParams);
directReceiver = session.createReceiver(directAddr + directAddrParams);
QPID_LOG(trace, "Creating topic receiver to address: " << topicAddr << topicAddrParams);
topicReceiver = session.createReceiver(topicAddr + topicAddrParams);
Mutex::ScopedLock _lock(lock);
while (running) {
Receiver rcvr;
bool available;
Mutex::ScopedUnlock _unlock(lock);
available = session.nextReceiver(rcvr, duration);
if (available) {
Message msg(rcvr.get());
// Requests that run() leave its loop by clearing `running` under the lock.
// The loop re-tests the flag after each nextReceiver() wait.
void AgentImpl::stop()
Mutex::ScopedLock _lock(lock);
running = false;
// Returns the event at the front of the internal event queue, looping while
// the queue is empty.
// NOTE(review): the loop body (presumably a wait on `cond`) and the removal of
// the front element are not visible in this view.
AgentEventImpl::Ptr AgentImpl::nextInternalEvent()
Mutex::ScopedLock _lock(lock);
while (internalEventQueue.empty())
AgentEventImpl::Ptr event(internalEventQueue.front());
return event;
// TODO: make sure this function returns with a null pointer when the thread needs to stop.
// Dispatches one received message to the handler matching its opcode.
// Checks that the content type is the AMQP map or list type, reads the
// Protocol::APP_OPCODE header, and compares it against each known opcode in turn.
// NOTE(review): the statements guarded by the content-type check and by the
// missing-opcode check are not visible in this view.
// NOTE(review): the cout dump looks like leftover debugging; the rest of the
// file logs through QPID_LOG.
void AgentImpl::handleRcvMessageLH(const Message& message)
Variant::Map headers(message.getHeaders());
cout << "AgentImpl::handleRcvMessageLH contentType=" << message.getContentType() <<
" replyTo=" << message.getReplyTo() <<
" headers=" << headers << endl;
if (message.getContentType() != Protocol::AMQP_CONTENT_MAP &&
message.getContentType() != Protocol::AMQP_CONTENT_LIST)
Variant::Map::const_iterator iter = headers.find(Protocol::APP_OPCODE);
if (iter == headers.end())
string opcode = iter->second.asString();
if (opcode == Protocol::OP_AGENT_LOCATE_REQUEST) handleAgentLocateLH(message);
if (opcode == Protocol::OP_QUERY_REQUEST) handleQueryRequestLH(message);
if (opcode == Protocol::OP_SUBSCRIBE_REQUEST) handleSubscribeRequest(message);
if (opcode == Protocol::OP_SUBSCRIBE_CANCEL_INDICATION) handleSubscribeCancel(message);
if (opcode == Protocol::OP_SUBSCRIBE_REFRESH_REQUEST) handleSubscribeRefresh(message);
if (opcode == Protocol::OP_METHOD_REQUEST) handleMethodRequest(message);
// Handles an agent-locate request: builds a Query from the message's list
// content and, when it matches this agent's attribute map, replies with the
// attributes as an OP_AGENT_LOCATE_RESPONSE. The query object is released by
// the auto_ptr when it goes out of scope.
// NOTE(review): query is dereferenced without a null check — confirm that
// QueryImpl::factory never returns null.
void AgentImpl::handleAgentLocateLH(const Message& message)
QPID_LOG(trace, "RCVD AgentLocateRequest replyTo=" << message.getReplyTo());
auto_ptr<Query> query(QueryImpl::factory(ListView(message)));
if (query->matches(attrMap)) {
Data data(0, attrMap);
sendResponse(message, Protocol::OP_AGENT_LOCATE_RESPONSE, data);
QPID_LOG(trace, "SENT AgentLocateResponse");
// Handles a query request: allocates a new context number, records the
// request's correlation id and reply address in contextMap, and builds a
// GET_AUTHORIZE event carrying that number and the sender's user id. The event
// is also stored in the context so authAllow() can later convert it.
// NOTE(review): the enqueue statement announced by the last comment is not
// visible in this view.
void AgentImpl::handleQueryRequestLH(const Message& message)
uint32_t contextNum = nextContextNum++;
AsyncContext::Ptr context(new AsyncContext(message.getCorrelationId(), message.getReplyTo()));
contextMap[contextNum] = context;
// Build the event for the get request
AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::GET_AUTHORIZE));
event->sequence = contextNum;
event->authUserId = message.getUserId();
// Put the not-yet-authorized event into the context for possible later use
context->authorizedEvent = event;
// Enqueue the event
// Stub: wraps the message content in a MapView; nothing further is done with it here.
void AgentImpl::handleSubscribeRequest(const Message& message)
const MapView map(message);
// Stub: wraps the message content in a MapView; nothing further is done with it here.
void AgentImpl::handleSubscribeCancel(const Message& message)
const MapView map(message);
// Stub: wraps the message content in a MapView; nothing further is done with it here.
void AgentImpl::handleSubscribeRefresh(const Message& message)
const MapView map(message);
// Stub (single-argument overload): wraps the message content in a MapView;
// nothing further is done with it here.
void AgentImpl::handleMethodRequest(const Message& message)
const MapView map(message);
// Replies to `request`: sends `data` to the request's reply-to address with
// the request's correlation id, the given opcode and CONTENT_NONE.
void AgentImpl::sendResponse(const Message& request, const string& opcode, const Data& data)
send(request.getReplyTo(), request.getCorrelationId(), opcode, Protocol::CONTENT_NONE, data);
// Builds a map-bodied message from `data` and sets its protocol headers:
// APP_OPCODE always, APP_CONTENT as written under the cType check.
// NOTE(review): the statement guarded by the correlationId check, any
// encoding of `content`, and the actual transmission to `address` are not
// visible in this view.
void AgentImpl::send(const Address& address, const string& correlationId, const string& opcode, const string& cType, const Data& data)
Message message;
MapContent content(message, data.impl->asMap());
if (!correlationId.empty())
if (!cType.empty())
message.getHeaders()[Protocol::APP_CONTENT] = cType;
message.getHeaders()[Protocol::APP_OPCODE] = opcode;
// Builds a list-bodied message from `list` and sets its protocol headers:
// APP_OPCODE always, APP_CONTENT as written under the cType check, and
// APP_PARTIAL (with an empty Variant value) when `partial` is true.
// NOTE(review): the statement guarded by the correlationId check, any
// encoding of `content`, and the actual transmission to `address` are not
// visible in this view.
void AgentImpl::send(const Address& address, const string& correlationId, const string& opcode, const string& cType, const Variant::List& list, bool partial)
Message message;
ListContent content(message, list);
if (!correlationId.empty())
if (!cType.empty())
message.getHeaders()[Protocol::APP_CONTENT] = cType;
message.getHeaders()[Protocol::APP_OPCODE] = opcode;
if (partial)
message.getHeaders()[Protocol::APP_PARTIAL] = Variant();
// Creates a GET_QUERY event carrying the sequence number, the requesting user
// id and the object key. The package and class parameters are accepted but unused.
AgentEventImpl::Ptr AgentImpl::eventQuery(uint32_t num, const string& userId, const string&, const string&, const string& key)
AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::GET_QUERY));
event->sequence = num;
event->authUserId = userId;
event->objectKey = key;
return event;
// Creates a METHOD_CALL event carrying the sequence number, requesting user id,
// method name, object key, argument map and schema class. The argument map is
// shared with the caller through the shared_ptr, not copied.
AgentEventImpl::Ptr AgentImpl::eventMethod(uint32_t num, const string& userId, const string& method,
const string& key, boost::shared_ptr<Variant::Map> argMap,
const SchemaClass* cls)
AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::METHOD_CALL));
event->sequence = num;
event->authUserId = userId;
event->name = method;
event->objectKey = key;
event->arguments = argMap;
event->objectClass = cls;
return event;
// Tests each registered notification target (function pointer, Notifiable object).
// NOTE(review): the calls guarded by the two checks are not visible in this view.
void AgentImpl::notify()
if (notifyHandler != 0)
if (notifiable != 0)
// Currently only writes a trace log entry naming the package; no message
// construction or send is visible here.
void AgentImpl::sendPackageIndicationLH(const string& packageName)
QPID_LOG(trace, "SENT PackageIndication: package_name=" << packageName);
// Currently only writes a trace log entry naming the package and the class
// (key.name); no message construction or send is performed here. The class
// kind parameter is unused.
void AgentImpl::sendClassIndicationLH(ClassKind /*kind*/, const string& packageName, const AgentClassKey& key)
{
    QPID_LOG(trace, "SENT ClassIndication: package_name=" << packageName << " class_name=" << key.name);
}
// Currently only writes a trace log entry with the sequence, code and text;
// the exchange and key parameters are unused.
void AgentImpl::sendCommandCompleteLH(const string&, const string&, uint32_t sequence, uint32_t code, const string& text)
QPID_LOG(trace, "SENT CommandComplete: seq=" << sequence << " code=" << code << " text=" << text);
// Currently only writes a trace log entry with the error code and text;
// the sequence and key parameters are unused.
void AgentImpl::sendMethodErrorLH(uint32_t /*sequence*/, const string& /*key*/, uint32_t code, const string& text)
QPID_LOG(trace, "SENT MethodResponse: errorCode=" << code << " text=" << text);
// Stub: takes the agent lock and does nothing else visible; the message is ignored.
void AgentImpl::handlePackageRequest(Message&)
Mutex::ScopedLock _lock(lock);
// Stub: takes the agent lock and does nothing else visible; the message is ignored.
void AgentImpl::handleClassQuery(Message&)
Mutex::ScopedLock _lock(lock);
// Stub: takes the agent lock and does nothing else visible; all parameters are ignored.
void AgentImpl::handleSchemaRequest(Message&, uint32_t, const string&, const string&)
Mutex::ScopedLock _lock(lock);
// Stub: takes the agent lock and does nothing else visible; all parameters are ignored.
void AgentImpl::handleGetQuery(Message&, uint32_t, const string&, const string&)
Mutex::ScopedLock _lock(lock);
// Four-argument overload of handleMethodRequest; all parameters are unnamed or
// commented out.
// NOTE(review): no body statements are visible in this view.
void AgentImpl::handleMethodRequest(Message& /*msg*/, uint32_t, const string& /*replyTo*/, const string& /*userId*/)
// Wrappers
// Creates the implementation object, passing every argument through unchanged.
Agent::Agent(const char* v, const char* p, const char* n, const char* d, bool i)
{
    impl = new AgentImpl(v, p, n, d, i);
}
// Destroys the implementation object created by the constructor.
Agent::~Agent()
{
    delete impl;
}
// Forwards the function-pointer callback to the implementation.
void Agent::setNotifyCallback(notifyCb handler)
{
    impl->setNotifyCallback(handler);
}
// Forwards the Notifiable-object callback to the implementation.
void Agent::setNotifyCallback(Notifiable* handler)
{
    impl->setNotifyCallback(handler);
}
// Forwards an attribute assignment to the implementation.
void Agent::setAttr(const char* key, const Variant& value)
{
    impl->setAttr(key, value);
}
// Forwards the store-directory path to the implementation.
void Agent::setStoreDir(const char* path)
{
    impl->setStoreDir(path);
}
// Forwards the transfer-directory path to the implementation.
void Agent::setTransferDir(const char* path)
{
    impl->setTransferDir(path);
}
// Forwards to the implementation; returns its result (true when an event was
// written into `event`).
bool Agent::getEvent(AgentEvent& event) const
{
    const bool haveEvent = impl->getEvent(event);
    return haveEvent;
}
// Forwards the pop request to the implementation.
void Agent::popEvent()
{
    impl->popEvent();
}
// Forwards the messaging connection to the implementation.
void Agent::setConnection(Connection& conn)
{
    impl->setConnection(conn);
}
// Forwards the authorization grant for `sequence` to the implementation.
void Agent::authAllow(uint32_t sequence)
{
    impl->authAllow(sequence);
}
// Forwards the authorization denial, with a caller-built exception object,
// to the implementation.
void Agent::authDeny(uint32_t sequence, const Data& ex)
{
    impl->authDeny(sequence, ex);
}
// Forwards the authorization denial with a text reason to the implementation's
// string overload.
//   sequence : sequence number of the authorize event being denied.
//   ex       : reason text; a null pointer is treated as an empty reason.
void Agent::authDeny(uint32_t sequence, const char* ex)
{
    // Constructing std::string from a null pointer is undefined behavior,
    // so substitute an empty string.
    impl->authDeny(sequence, string(ex ? ex : ""));
}
// Forwards a method-call result to the implementation.
void Agent::methodResponse(uint32_t sequence, uint32_t status, char* text, const Variant::Map& arguments)
{
    impl->methodResponse(sequence, status, text, arguments);
}
// Forwards one query result object to the implementation.
void Agent::queryResponse(uint32_t sequence, Data& object)
{
    impl->queryResponse(sequence, object);
}
// Forwards the end-of-query signal for `sequence` to the implementation.
void Agent::queryComplete(uint32_t sequence)
{
    impl->queryComplete(sequence);
}
// Forwards the schema class registration to the implementation.
void Agent::registerClass(SchemaClass* cls)
{
    impl->registerClass(cls);
}
// Forwards to the implementation and returns whatever key pointer it yields.
const char* Agent::addObject(Data& obj, const char* key)
{
    const char* assignedKey = impl->addObject(obj, key);
    return assignedKey;
}
// Forwards the event data to the implementation for publication.
void Agent::raiseEvent(Data& event)
{
    impl->raiseEvent(event);
}