blob: 46052854486a3cac1257f762387b7702713bf9d7 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "qmf/AgentSessionImpl.h"
#include <iostream>
#include <memory>
namespace qmf {
using std::string;
using std::map;
using qpid::messaging::Address;
using qpid::messaging::Connection;
using qpid::messaging::Duration;
using qpid::messaging::Message;
using qpid::messaging::Receiver;
using qpid::messaging::Sender;
using qpid::types::Variant;
AgentSession::AgentSession(AgentSessionImpl* impl) { PI::ctor(*this, impl); }
AgentSession::AgentSession(const AgentSession& s) : qmf::Handle<AgentSessionImpl>() { PI::copy(*this, s); }
AgentSession::~AgentSession() { PI::dtor(*this); }
AgentSession& AgentSession::operator=(const AgentSession& s) { return PI::assign(*this, s); }
AgentSession::AgentSession(Connection& c, const string& o) { PI::ctor(*this, new AgentSessionImpl(c, o)); }
void AgentSession::setDomain(const string& d) { impl->setDomain(d); }
void AgentSession::setVendor(const string& v) { impl->setVendor(v); }
void AgentSession::setProduct(const string& p) { impl->setProduct(p); }
void AgentSession::setInstance(const string& i) { impl->setInstance(i); }
void AgentSession::setAttribute(const string& k, const qpid::types::Variant& v) { impl->setAttribute(k, v); }
const string& AgentSession::getName() const { return impl->getName(); }
void AgentSession::open() { impl->open(); }
void AgentSession::close() { impl->close(); }
bool AgentSession::nextEvent(AgentEvent& e, Duration t) { return impl->nextEvent(e, t); }
int AgentSession::pendingEvents() const { return impl->pendingEvents(); }
void AgentSession::registerSchema(Schema& s) { impl->registerSchema(s); }
DataAddr AgentSession::addData(Data& d, const string& n, bool p) { return impl->addData(d, n, p); }
void AgentSession::delData(const DataAddr& a) { impl->delData(a); }
void AgentSession::authAccept(AgentEvent& e) { impl->authAccept(e); }
void AgentSession::authReject(AgentEvent& e, const string& m) { impl->authReject(e, m); }
void AgentSession::raiseException(AgentEvent& e, const string& s) { impl->raiseException(e, s); }
void AgentSession::raiseException(AgentEvent& e, const Data& d) { impl->raiseException(e, d); }
void AgentSession::response(AgentEvent& e, const Data& d) { impl->response(e, d); }
void AgentSession::complete(AgentEvent& e) { impl->complete(e); }
void AgentSession::methodSuccess(AgentEvent& e) { impl->methodSuccess(e); }
void AgentSession::raiseEvent(const Data& d) { impl->raiseEvent(d); }
void AgentSession::raiseEvent(const Data& d, int s) { impl->raiseEvent(d, s); }
//========================================================================================
// Impl Method Bodies
//========================================================================================
AgentSessionImpl::AgentSessionImpl(Connection& c, const string& options) :
connection(c), domain("default"), opened(false), eventNotifier(0), thread(0), threadCanceled(false),
bootSequence(1), interval(60), lastHeartbeat(0), lastVisit(0), forceHeartbeat(false),
externalStorage(false), autoAllowQueries(true), autoAllowMethods(true),
maxSubscriptions(64), minSubInterval(3000), subLifetime(300), publicEvents(true),
listenOnDirect(true), strictSecurity(false), maxThreadWaitTime(5),
schemaUpdateTime(uint64_t(qpid::sys::Duration::FromEpoch()))
{
//
// Set Agent Capability Level
//
attributes["qmf.agent_capability"] = AGENT_CAPABILITY_0_8;
if (!options.empty()) {
qpid::messaging::AddressParser parser(options);
Variant::Map optMap;
Variant::Map::const_iterator iter;
parser.parseMap(optMap);
iter = optMap.find("domain");
if (iter != optMap.end())
domain = iter->second.asString();
iter = optMap.find("interval");
if (iter != optMap.end()) {
interval = iter->second.asUint32();
if (interval < 1)
interval = 1;
}
iter = optMap.find("external");
if (iter != optMap.end())
externalStorage = iter->second.asBool();
iter = optMap.find("allow-queries");
if (iter != optMap.end())
autoAllowQueries = iter->second.asBool();
iter = optMap.find("allow-methods");
if (iter != optMap.end())
autoAllowMethods = iter->second.asBool();
iter = optMap.find("max-subscriptions");
if (iter != optMap.end())
maxSubscriptions = iter->second.asUint32();
iter = optMap.find("min-sub-interval");
if (iter != optMap.end())
minSubInterval = iter->second.asUint32();
iter = optMap.find("sub-lifetime");
if (iter != optMap.end())
subLifetime = iter->second.asUint32();
iter = optMap.find("public-events");
if (iter != optMap.end())
publicEvents = iter->second.asBool();
iter = optMap.find("listen-on-direct");
if (iter != optMap.end())
listenOnDirect = iter->second.asBool();
iter = optMap.find("strict-security");
if (iter != optMap.end())
strictSecurity = iter->second.asBool();
iter = optMap.find("max-thread-wait-time");
if (iter != optMap.end())
maxThreadWaitTime = iter->second.asUint32();
}
if (maxThreadWaitTime > interval)
maxThreadWaitTime = interval;
}
AgentSessionImpl::~AgentSessionImpl()
{
if (opened)
close();
if (thread) {
thread->join();
delete thread;
}
}
void AgentSessionImpl::open()
{
if (opened)
throw QmfException("The session is already open");
// If the thread exists, join and delete it before creating a new one.
if (thread) {
thread->join();
delete thread;
}
const string addrArgs(";{create:never,node:{type:topic}}");
const string routableAddr("direct-agent.route." + qpid::types::Uuid(true).str());
attributes["_direct_subject"] = routableAddr;
// Establish messaging addresses
setAgentName();
directBase = "qmf." + domain + ".direct";
topicBase = "qmf." + domain + ".topic";
// Create AMQP session, receivers, and senders
session = connection.createSession();
Receiver directRx;
Receiver routableDirectRx = session.createReceiver(topicBase + "/" + routableAddr + addrArgs);
Receiver topicRx = session.createReceiver(topicBase + "/console.#" + addrArgs);
if (listenOnDirect && !strictSecurity) {
directRx = session.createReceiver(directBase + "/" + agentName + addrArgs);
directRx.setCapacity(64);
}
routableDirectRx.setCapacity(64);
topicRx.setCapacity(64);
if (!strictSecurity)
directSender = session.createSender(directBase + addrArgs);
topicSender = session.createSender(topicBase + addrArgs);
// Start the receiver thread
threadCanceled = false;
opened = true;
thread = new qpid::sys::Thread(*this);
// Send an initial agent heartbeat message
sendHeartbeat();
}
void AgentSessionImpl::closeAsync()
{
if (!opened)
return;
// Stop the receiver thread. Don't join it until the destructor is called or open() is called.
threadCanceled = true;
opened = false;
}
void AgentSessionImpl::close()
{
closeAsync();
if (thread) {
thread->join();
delete thread;
thread = 0;
}
}
bool AgentSessionImpl::nextEvent(AgentEvent& event, Duration timeout)
{
uint64_t milliseconds = timeout.getMilliseconds();
qpid::sys::Mutex::ScopedLock l(lock);
if (eventQueue.empty() && milliseconds > 0) {
int64_t nsecs(qpid::sys::TIME_INFINITE);
if ((uint64_t)(nsecs / 1000000) > milliseconds)
nsecs = (int64_t) milliseconds * 1000000;
qpid::sys::Duration then(nsecs);
cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(), then));
}
if (!eventQueue.empty()) {
event = eventQueue.front();
eventQueue.pop();
if (eventQueue.empty())
alertEventNotifierLH(false);
return true;
}
return false;
}
int AgentSessionImpl::pendingEvents() const
{
qpid::sys::Mutex::ScopedLock l(lock);
return eventQueue.size();
}
void AgentSessionImpl::setEventNotifier(EventNotifierImpl* notifier)
{
qpid::sys::Mutex::ScopedLock l(lock);
eventNotifier = notifier;
}
EventNotifierImpl* AgentSessionImpl::getEventNotifier() const
{
qpid::sys::Mutex::ScopedLock l(lock);
return eventNotifier;
}
void AgentSessionImpl::registerSchema(Schema& schema)
{
if (!schema.isFinalized())
schema.finalize();
const SchemaId& schemaId(schema.getSchemaId());
qpid::sys::Mutex::ScopedLock l(lock);
schemata[schemaId] = schema;
schemaIndex[schemaId] = DataIndex();
//
// Get the news out at the next periodic interval that there is new schema information.
//
schemaUpdateTime = uint64_t(qpid::sys::Duration::FromEpoch());
forceHeartbeat = true;
}
DataAddr AgentSessionImpl::addData(Data& data, const string& name, bool persistent)
{
if (externalStorage)
throw QmfException("addData() must not be called when the 'external' option is enabled.");
string dataName;
if (name.empty())
dataName = qpid::types::Uuid(true).str();
else
dataName = name;
DataAddr addr(dataName, agentName, persistent ? 0 : bootSequence);
data.setAddr(addr);
{
qpid::sys::Mutex::ScopedLock l(lock);
DataIndex::const_iterator iter = globalIndex.find(addr);
if (iter != globalIndex.end())
throw QmfException("Duplicate Data Address");
globalIndex[addr] = data;
if (data.hasSchema())
schemaIndex[data.getSchemaId()][addr] = data;
}
//
// TODO: Survey active subscriptions to see if they need to hear about this new data.
//
return addr;
}
void AgentSessionImpl::delData(const DataAddr& addr)
{
{
qpid::sys::Mutex::ScopedLock l(lock);
DataIndex::iterator iter = globalIndex.find(addr);
if (iter == globalIndex.end())
return;
if (iter->second.hasSchema()) {
const SchemaId& schemaId(iter->second.getSchemaId());
schemaIndex[schemaId].erase(addr);
}
globalIndex.erase(iter);
}
//
// TODO: Survey active subscriptions to see if they need to hear about this deleted data.
//
}
void AgentSessionImpl::authAccept(AgentEvent& authEvent)
{
std::auto_ptr<AgentEventImpl> eventImpl(new AgentEventImpl(AGENT_QUERY));
eventImpl->setQuery(authEvent.getQuery());
eventImpl->setUserId(authEvent.getUserId());
eventImpl->setReplyTo(AgentEventImplAccess::get(authEvent).getReplyTo());
eventImpl->setCorrelationId(AgentEventImplAccess::get(authEvent).getCorrelationId());
AgentEvent event(eventImpl.release());
if (externalStorage) {
enqueueEvent(event);
return;
}
const Query& query(authEvent.getQuery());
if (query.getDataAddr().isValid()) {
{
qpid::sys::Mutex::ScopedLock l(lock);
DataIndex::const_iterator iter = globalIndex.find(query.getDataAddr());
if (iter != globalIndex.end())
response(event, iter->second);
}
complete(event);
return;
}
if (query.getSchemaId().isValid()) {
{
qpid::sys::Mutex::ScopedLock l(lock);
map<SchemaId, DataIndex, SchemaIdCompareNoHash>::const_iterator iter = schemaIndex.find(query.getSchemaId());
if (iter != schemaIndex.end())
for (DataIndex::const_iterator dIter = iter->second.begin(); dIter != iter->second.end(); dIter++)
if (query.matchesPredicate(dIter->second.getProperties()))
response(event, dIter->second);
}
complete(event);
return;
}
raiseException(event, "Query is Invalid");
}
void AgentSessionImpl::authReject(AgentEvent& event, const string& error)
{
raiseException(event, "Action Forbidden - " + error);
}
void AgentSessionImpl::raiseException(AgentEvent& event, const string& error)
{
Data exception(new DataImpl());
exception.setProperty("error_text", error);
raiseException(event, exception);
}
void AgentSessionImpl::raiseException(AgentEvent& event, const Data& data)
{
Message msg;
Variant::Map map;
Variant::Map& headers(msg.getProperties());
headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_RESPONSE;
headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_EXCEPTION;
headers[protocol::HEADER_KEY_CONTENT] = protocol::HEADER_CONTENT_DATA;
headers[protocol::HEADER_KEY_AGENT] = agentName;
headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF;
AgentEventImpl& eventImpl(AgentEventImplAccess::get(event));
const DataImpl& dataImpl(DataImplAccess::get(data));
msg.setCorrelationId(eventImpl.getCorrelationId());
encode(dataImpl.asMap(), msg);
send(msg, eventImpl.getReplyTo());
QPID_LOG(trace, "SENT Exception to=" << eventImpl.getReplyTo());
}
void AgentSessionImpl::response(AgentEvent& event, const Data& data)
{
AgentEventImpl& impl(AgentEventImplAccess::get(event));
uint32_t count = impl.enqueueData(data);
if (count >= 8)
flushResponses(event, false);
}
void AgentSessionImpl::complete(AgentEvent& event)
{
flushResponses(event, true);
}
void AgentSessionImpl::methodSuccess(AgentEvent& event)
{
Message msg;
Variant::Map map;
Variant::Map& headers(msg.getProperties());
headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_RESPONSE;
headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_METHOD_RESPONSE;
headers[protocol::HEADER_KEY_AGENT] = agentName;
headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF;
AgentEventImpl& eventImpl(AgentEventImplAccess::get(event));
const Variant::Map& outArgs(eventImpl.getReturnArguments());
const Variant::Map& outSubtypes(eventImpl.getReturnArgumentSubtypes());
map["_arguments"] = outArgs;
if (!outSubtypes.empty())
map["_subtypes"] = outSubtypes;
msg.setCorrelationId(eventImpl.getCorrelationId());
encode(map, msg);
send(msg, eventImpl.getReplyTo());
QPID_LOG(trace, "SENT MethodResponse to=" << eventImpl.getReplyTo());
}
void AgentSessionImpl::raiseEvent(const Data& data)
{
int severity(SEV_NOTICE);
if (data.hasSchema()) {
const Schema& schema(DataImplAccess::get(data).getSchema());
if (schema.isValid())
severity = schema.getDefaultSeverity();
}
raiseEvent(data, severity);
}
void AgentSessionImpl::raiseEvent(const Data& data, int severity)
{
Message msg;
Variant::Map map;
Variant::Map& headers(msg.getProperties());
string subject("agent.ind.event");
if (data.hasSchema()) {
const SchemaId& schemaId(data.getSchemaId());
if (schemaId.getType() != SCHEMA_TYPE_EVENT)
throw QmfException("Cannot call raiseEvent on data that is not an Event");
subject = subject + "." + schemaId.getPackageName() + "." + schemaId.getName();
}
if (severity < SEV_EMERG || severity > SEV_DEBUG)
throw QmfException("Invalid severity value");
headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_INDICATION;
headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_DATA_INDICATION;
headers[protocol::HEADER_KEY_CONTENT] = protocol::HEADER_CONTENT_EVENT;
headers[protocol::HEADER_KEY_AGENT] = agentName;
headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF;
msg.setSubject(subject);
Variant::List list;
Variant::Map dataAsMap(DataImplAccess::get(data).asMap());
dataAsMap["_severity"] = severity;
dataAsMap["_timestamp"] = uint64_t(qpid::sys::Duration::FromEpoch());
list.push_back(dataAsMap);
encode(list, msg);
topicSender.send(msg);
QPID_LOG(trace, "SENT EventIndication to=" << topicSender.getName() << "/" << subject);
}
void AgentSessionImpl::checkOpen()
{
if (opened)
throw QmfException("Operation must be performed before calling open()");
}
void AgentSessionImpl::enqueueEvent(const AgentEvent& event)
{
qpid::sys::Mutex::ScopedLock l(lock);
bool notify = eventQueue.empty();
eventQueue.push(event);
if (notify) {
cond.notify();
alertEventNotifierLH(true);
}
}
void AgentSessionImpl::setAgentName()
{
Variant::Map::iterator iter;
string vendor;
string product;
string instance;
iter = attributes.find("_vendor");
if (iter == attributes.end())
attributes["_vendor"] = vendor;
else
vendor = iter->second.asString();
iter = attributes.find("_product");
if (iter == attributes.end())
attributes["_product"] = product;
else
product = iter->second.asString();
iter = attributes.find("_instance");
if (iter == attributes.end()) {
instance = qpid::types::Uuid(true).str();
attributes["_instance"] = instance;
} else
instance = iter->second.asString();
agentName = vendor + ":" + product + ":" + instance;
attributes["_name"] = agentName;
}
void AgentSessionImpl::handleLocateRequest(const Variant::List& predicate, const Message& msg)
{
QPID_LOG(trace, "RCVD AgentLocateRequest from=" << msg.getReplyTo());
if (!predicate.empty()) {
Query agentQuery(QUERY_OBJECT);
agentQuery.setPredicate(predicate);
if (!agentQuery.matchesPredicate(attributes)) {
QPID_LOG(trace, "AgentLocate predicate does not match this agent, ignoring");
return;
}
}
Message reply;
Variant::Map map;
Variant::Map& headers(reply.getProperties());
headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_INDICATION;
headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_AGENT_LOCATE_RESPONSE;
headers[protocol::HEADER_KEY_AGENT] = agentName;
headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF;
map["_values"] = attributes;
map["_values"].asMap()[protocol::AGENT_ATTR_TIMESTAMP] = uint64_t(qpid::sys::Duration::FromEpoch());
map["_values"].asMap()[protocol::AGENT_ATTR_HEARTBEAT_INTERVAL] = interval;
map["_values"].asMap()[protocol::AGENT_ATTR_EPOCH] = bootSequence;
map["_values"].asMap()[protocol::AGENT_ATTR_SCHEMA_UPDATED_TIMESTAMP] = schemaUpdateTime;
encode(map, reply);
send(reply, msg.getReplyTo());
QPID_LOG(trace, "SENT AgentLocateResponse to=" << msg.getReplyTo());
}
void AgentSessionImpl::handleMethodRequest(const Variant::Map& content, const Message& msg)
{
QPID_LOG(trace, "RCVD MethodRequest map=" << content << " from=" << msg.getReplyTo() << " cid=" << msg.getCorrelationId());
//
// Construct an AgentEvent to be sent to the application.
//
std::auto_ptr<AgentEventImpl> eventImpl(new AgentEventImpl(AGENT_METHOD));
eventImpl->setUserId(msg.getUserId());
eventImpl->setReplyTo(msg.getReplyTo());
eventImpl->setCorrelationId(msg.getCorrelationId());
Variant::Map::const_iterator iter;
iter = content.find("_method_name");
if (iter == content.end()) {
AgentEvent event(eventImpl.release());
raiseException(event, "Malformed MethodRequest: missing _method_name field");
return;
}
eventImpl->setMethodName(iter->second.asString());
iter = content.find("_arguments");
if (iter != content.end())
eventImpl->setArguments(iter->second.asMap());
iter = content.find("_subtypes");
if (iter != content.end())
eventImpl->setArgumentSubtypes(iter->second.asMap());
iter = content.find("_object_id");
if (iter != content.end()) {
DataAddr addr(new DataAddrImpl(iter->second.asMap()));
eventImpl->setDataAddr(addr);
if (!externalStorage) {
DataIndex::const_iterator iter(globalIndex.find(addr));
if (iter == globalIndex.end()) {
AgentEvent event(eventImpl.release());
raiseException(event, "No data object found with the specified address");
return;
}
const Schema& schema(DataImplAccess::get(iter->second).getSchema());
if (schema.isValid()) {
eventImpl->setSchema(schema);
for (Variant::Map::const_iterator aIter = eventImpl->getArguments().begin();
aIter != eventImpl->getArguments().end(); aIter++) {
const Schema& schema(DataImplAccess::get(iter->second).getSchema());
if (!SchemaImplAccess::get(schema).isValidMethodInArg(eventImpl->getMethodName(), aIter->first, aIter->second)) {
AgentEvent event(eventImpl.release());
raiseException(event, "Invalid argument: " + aIter->first);
return;
}
}
}
}
}
enqueueEvent(AgentEvent(eventImpl.release()));
}
void AgentSessionImpl::handleQueryRequest(const Variant::Map& content, const Message& msg)
{
QPID_LOG(trace, "RCVD QueryRequest query=" << content << " from=" << msg.getReplyTo() << " cid=" << msg.getCorrelationId());
//
// Construct an AgentEvent to be sent to the application or directly handled by the agent.
//
std::auto_ptr<QueryImpl> queryImpl(new QueryImpl(content));
std::auto_ptr<AgentEventImpl> eventImpl(new AgentEventImpl(AGENT_AUTH_QUERY));
eventImpl->setUserId(msg.getUserId());
eventImpl->setReplyTo(msg.getReplyTo());
eventImpl->setCorrelationId(msg.getCorrelationId());
eventImpl->setQuery(queryImpl.release());
AgentEvent ae(eventImpl.release());
if (ae.getQuery().getTarget() == QUERY_SCHEMA_ID || ae.getQuery().getTarget() == QUERY_SCHEMA) {
handleSchemaRequest(ae);
return;
}
if (autoAllowQueries)
authAccept(ae);
else
enqueueEvent(ae);
}
void AgentSessionImpl::handleSchemaRequest(AgentEvent& event)
{
SchemaMap::const_iterator iter;
string error;
const Query& query(event.getQuery());
Message msg;
Variant::List content;
Variant::Map map;
Variant::Map& headers(msg.getProperties());
headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_RESPONSE;
headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_QUERY_RESPONSE;
headers[protocol::HEADER_KEY_AGENT] = agentName;
headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF;
{
qpid::sys::Mutex::ScopedLock l(lock);
if (query.getTarget() == QUERY_SCHEMA_ID) {
headers[protocol::HEADER_KEY_CONTENT] = "_schema_id";
for (iter = schemata.begin(); iter != schemata.end(); iter++)
content.push_back(SchemaIdImplAccess::get(iter->first).asMap());
} else if (query.getSchemaId().isValid()) {
headers[protocol::HEADER_KEY_CONTENT] = "_schema";
iter = schemata.find(query.getSchemaId());
if (iter != schemata.end())
content.push_back(SchemaImplAccess::get(iter->second).asMap());
} else {
error = "Invalid Schema Query: Requests for SCHEMA must supply a valid schema ID.";
}
}
if (!error.empty()) {
raiseException(event, error);
return;
}
AgentEventImpl& eventImpl(AgentEventImplAccess::get(event));
msg.setCorrelationId(eventImpl.getCorrelationId());
encode(content, msg);
send(msg, eventImpl.getReplyTo());
QPID_LOG(trace, "SENT QueryResponse(Schema) to=" << eventImpl.getReplyTo());
}
void AgentSessionImpl::handleV1SchemaRequest(qpid::management::Buffer& buffer, uint32_t seq, const Message& msg)
{
string packageName;
string className;
uint8_t hashBits[16];
buffer.getShortString(packageName);
buffer.getShortString(className);
buffer.getBin128(hashBits);
QPID_LOG(trace, "RCVD QMFv1 SchemaRequest for " << packageName << ":" << className);
qpid::types::Uuid hash(hashBits);
map<SchemaId, Schema, SchemaIdCompare>::const_iterator iter;
string replyContent;
SchemaId dataId(SCHEMA_TYPE_DATA, packageName, className);
dataId.setHash(hash);
{
qpid::sys::Mutex::ScopedLock l(lock);
iter = schemata.find(dataId);
if (iter != schemata.end())
replyContent = SchemaImplAccess::get(iter->second).asV1Content(seq);
else {
SchemaId eventId(SCHEMA_TYPE_EVENT, packageName, className);
eventId.setHash(hash);
iter = schemata.find(dataId);
if (iter != schemata.end())
replyContent = SchemaImplAccess::get(iter->second).asV1Content(seq);
else
return;
}
}
Message reply;
Variant::Map& headers(reply.getProperties());
headers[protocol::HEADER_KEY_AGENT] = agentName;
reply.setContent(replyContent);
send(reply, msg.getReplyTo());
QPID_LOG(trace, "SENT QMFv1 SchemaResponse to=" << msg.getReplyTo());
}
void AgentSessionImpl::dispatch(Message msg)
{
const Variant::Map& properties(msg.getProperties());
Variant::Map::const_iterator iter;
//
// If strict-security is enabled, make sure that reply-to address complies with the
// strict-security addressing pattern (i.e. start with 'qmf.<domain>.topic/direct-console.').
//
if (strictSecurity && msg.getReplyTo()) {
if (msg.getReplyTo().getName() != topicBase || msg.getReplyTo().getSubject().find("direct-console.") != 0) {
QPID_LOG(warning, "Reply-to violates strict-security policy: " << msg.getReplyTo().str());
return;
}
}
iter = properties.find(protocol::HEADER_KEY_APP_ID);
if (iter != properties.end() && iter->second.asString() == protocol::HEADER_APP_ID_QMF) {
//
// Dispatch a QMFv2 formatted message
//
iter = properties.find(protocol::HEADER_KEY_OPCODE);
if (iter == properties.end()) {
QPID_LOG(trace, "Message received with no 'qmf.opcode' header");
return;
}
const string& opcode = iter->second.asString();
if (msg.getContentType() == "amqp/list") {
Variant::List content;
decode(msg, content);
if (opcode == protocol::HEADER_OPCODE_AGENT_LOCATE_REQUEST) handleLocateRequest(content, msg);
else {
QPID_LOG(trace, "Unexpected QMFv2 opcode with 'amqp/list' content: " << opcode);
}
} else if (msg.getContentType() == "amqp/map") {
Variant::Map content;
decode(msg, content);
if (opcode == protocol::HEADER_OPCODE_METHOD_REQUEST) handleMethodRequest(content, msg);
else if (opcode == protocol::HEADER_OPCODE_QUERY_REQUEST) handleQueryRequest(content, msg);
else {
QPID_LOG(trace, "Unexpected QMFv2 opcode with 'amqp/map' content: " << opcode);
}
} else {
QPID_LOG(trace, "Unexpected QMFv2 content type. Expected amqp/list or amqp/map");
}
} else {
//
// Dispatch a QMFv1 formatted message
//
const string& body(msg.getContent());
if (body.size() < 8)
return;
qpid::management::Buffer buffer(const_cast<char*>(body.c_str()), body.size());
if (buffer.getOctet() != 'A') return;
if (buffer.getOctet() != 'M') return;
if (buffer.getOctet() != '2') return;
char v1Opcode(buffer.getOctet());
uint32_t seq(buffer.getLong());
if (v1Opcode == 'S') handleV1SchemaRequest(buffer, seq, msg);
else {
QPID_LOG(trace, "Unknown or Unsupported QMFv1 opcode: " << v1Opcode);
}
}
}
void AgentSessionImpl::sendHeartbeat()
{
Message msg;
Variant::Map map;
Variant::Map& headers(msg.getProperties());
std::stringstream address;
address << "agent.ind.heartbeat";
// append .<vendor>.<product> to address key if present.
Variant::Map::const_iterator v;
if ((v = attributes.find("_vendor")) != attributes.end() && !v->second.getString().empty()) {
address << "." << v->second.getString();
if ((v = attributes.find("_product")) != attributes.end() && !v->second.getString().empty()) {
address << "." << v->second.getString();
}
}
headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_INDICATION;
headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_AGENT_HEARTBEAT_INDICATION;
headers[protocol::HEADER_KEY_AGENT] = agentName;
headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF;
msg.setSubject(address.str());
map["_values"] = attributes;
map["_values"].asMap()[protocol::AGENT_ATTR_TIMESTAMP] = uint64_t(qpid::sys::Duration::FromEpoch());
map["_values"].asMap()[protocol::AGENT_ATTR_HEARTBEAT_INTERVAL] = interval;
map["_values"].asMap()[protocol::AGENT_ATTR_EPOCH] = bootSequence;
map["_values"].asMap()[protocol::AGENT_ATTR_SCHEMA_UPDATED_TIMESTAMP] = schemaUpdateTime;
encode(map, msg);
topicSender.send(msg);
QPID_LOG(trace, "SENT AgentHeartbeat name=" << agentName);
}
void AgentSessionImpl::send(Message msg, const Address& to)
{
Sender sender;
if (strictSecurity && to.getName() != topicBase) {
QPID_LOG(warning, "Address violates strict-security policy: " << to);
return;
}
if (to.getName() == directBase) {
msg.setSubject(to.getSubject());
sender = directSender;
} else if (to.getName() == topicBase) {
msg.setSubject(to.getSubject());
sender = topicSender;
} else
sender = session.createSender(to);
sender.send(msg);
}
void AgentSessionImpl::flushResponses(AgentEvent& event, bool final)
{
Message msg;
Variant::Map map;
Variant::Map& headers(msg.getProperties());
headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_RESPONSE;
headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_QUERY_RESPONSE;
headers[protocol::HEADER_KEY_CONTENT] = protocol::HEADER_CONTENT_DATA;
headers[protocol::HEADER_KEY_AGENT] = agentName;
headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF;
if (!final)
headers[protocol::HEADER_KEY_PARTIAL] = Variant();
Variant::List body;
AgentEventImpl& eventImpl(AgentEventImplAccess::get(event));
Data data(eventImpl.dequeueData());
while (data.isValid()) {
DataImpl& dataImpl(DataImplAccess::get(data));
body.push_back(dataImpl.asMap());
data = eventImpl.dequeueData();
}
msg.setCorrelationId(eventImpl.getCorrelationId());
encode(body, msg);
send(msg, eventImpl.getReplyTo());
QPID_LOG(trace, "SENT QueryResponse to=" << eventImpl.getReplyTo());
}
void AgentSessionImpl::periodicProcessing(uint64_t seconds)
{
//
// The granularity of this timer is seconds. Don't waste time looking for work if
// it's been less than a second since we last visited.
//
if (seconds == lastVisit)
return;
//uint64_t thisInterval(seconds - lastVisit);
lastVisit = seconds;
//
// First time through, set lastHeartbeat to the current time.
//
if (lastHeartbeat == 0)
lastHeartbeat = seconds;
//
// If the hearbeat interval has elapsed, send a heartbeat.
//
if (forceHeartbeat || (seconds - lastHeartbeat >= interval)) {
lastHeartbeat = seconds;
forceHeartbeat = false;
sendHeartbeat();
}
//
// TODO: process any active subscriptions on their intervals.
//
}
void AgentSessionImpl::alertEventNotifierLH(bool readable)
{
if (eventNotifier)
eventNotifier->setReadable(readable);
}
void AgentSessionImpl::run()
{
QPID_LOG(debug, "AgentSession thread started for agent " << agentName);
try {
while (!threadCanceled) {
periodicProcessing((uint64_t) qpid::sys::Duration::FromEpoch() / qpid::sys::TIME_SEC);
Receiver rx;
bool valid = session.nextReceiver(rx, Duration::SECOND * maxThreadWaitTime);
if (threadCanceled)
break;
if (valid) {
try {
dispatch(rx.fetch());
} catch (qpid::types::Exception& e) {
QPID_LOG(error, "Exception caught in message dispatch: " << e.what());
}
session.acknowledge();
}
}
} catch (qpid::types::Exception& e) {
QPID_LOG(error, "Exception caught in message thread - exiting: " << e.what());
enqueueEvent(AgentEvent(new AgentEventImpl(AGENT_THREAD_FAILED)));
}
session.close();
QPID_LOG(debug, "AgentSession thread exiting for agent " << agentName);
}
AgentSessionImpl& AgentSessionImplAccess::get(AgentSession& session)
{
return *session.impl;
}
const AgentSessionImpl& AgentSessionImplAccess::get(const AgentSession& session)
{
return *session.impl;
}
}