// blob: 81bf54276670ff651e8e126cb06ebc63be1a05b8 [file] [log] [blame]
#ifndef _ManagementAgent_
#define _ManagementAgent_
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "qpid/broker/BrokerImportExport.h"
#include "qpid/Options.h"
#include "qpid/broker/Exchange.h"
#include "qpid/framing/Uuid.h"
#include "qpid/sys/Mutex.h"
#include "qpid/management/ManagementObject.h"
#include "qpid/management/ManagementEvent.h"
#include "qpid/management/Manageable.h"
#include "qmf/org/apache/qpid/broker/Agent.h"
#include "qmf/org/apache/qpid/broker/Memory.h"
#include "qpid/sys/MemStat.h"
#include "qpid/sys/PollableQueue.h"
#include "qpid/types/Variant.h"
#include <qpid/framing/AMQFrame.h>
#include <qpid/framing/ResizableBuffer.h>
#include <boost/shared_ptr.hpp>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>
namespace qpid {
// Forward declarations: this header refers to these types only through
// pointers or references, so their full definitions are not required here.
namespace broker {
class Connection;
class ProtocolRegistry;
}
namespace sys {
class Timer;
}
namespace management {
class ManagementAgent
{
private:
int threadPoolSize;
public:
typedef enum {
SEV_EMERG = 0,
SEV_ALERT = 1,
SEV_CRIT = 2,
SEV_ERROR = 3,
SEV_WARN = 4,
SEV_NOTE = 5,
SEV_INFO = 6,
SEV_DEBUG = 7,
SEV_DEFAULT = 8
} severity_t;
ManagementAgent (const bool qmfV1, const bool qmfV2);
virtual ~ManagementAgent ();
/** Called before plugins are initialized */
void configure (const std::string& dataDir, bool publish, uint16_t interval,
qpid::broker::Broker* broker, int threadPoolSize);
void setName(const std::string& vendor,
const std::string& product,
const std::string& instance="");
void getName(std::string& vendor, std::string& product, std::string& instance);
const std::string& getAddress();
void setInterval(uint16_t _interval) { interval = _interval; }
void setExchange(qpid::broker::Exchange::shared_ptr mgmtExchange,
qpid::broker::Exchange::shared_ptr directExchange);
void setExchangeV2(qpid::broker::Exchange::shared_ptr topicExchange,
qpid::broker::Exchange::shared_ptr directExchange);
int getMaxThreads () { return threadPoolSize; }
QPID_BROKER_EXTERN void registerClass (const std::string& packageName,
const std::string& className,
uint8_t* md5Sum,
ManagementObject::writeSchemaCall_t schemaCall);
QPID_BROKER_EXTERN void registerEvent (const std::string& packageName,
const std::string& eventName,
uint8_t* md5Sum,
ManagementObject::writeSchemaCall_t schemaCall);
QPID_BROKER_EXTERN ObjectId addObject (ManagementObject::shared_ptr object,
uint64_t persistId = 0,
bool persistent = false);
QPID_BROKER_EXTERN ObjectId addObject (ManagementObject::shared_ptr object,
const std::string& key,
bool persistent = false);
QPID_BROKER_EXTERN void raiseEvent(const ManagementEvent& event,
severity_t severity = SEV_DEFAULT);
QPID_BROKER_EXTERN void clientAdded (const std::string& routingKey);
bool dispatchCommand (qpid::broker::Deliverable& msg,
const std::string& routingKey,
const framing::FieldTable* args,
const bool topic,
int qmfVersion);
/** Disallow a method. Attempts to call it will receive an exception with message. */
void disallow(const std::string& className, const std::string& methodName, const std::string& message);
uint16_t getBootSequence(void) { return bootSequence; }
void setBootSequence(uint16_t b) { bootSequence = b; writeData(); }
const framing::Uuid& getUuid() const { return uuid; }
void setUuid(const framing::Uuid& id) { uuid = id; writeData(); }
static types::Variant::Map toMap(const framing::FieldTable& from);
class DeletedObject {
public:
typedef boost::shared_ptr<DeletedObject> shared_ptr;
DeletedObject(ManagementObject::shared_ptr, bool v1, bool v2);
~DeletedObject() {};
const std::string getKey() const {
// used to batch up objects of the same class type
return std::string(packageName + std::string(":") + className);
}
private:
friend class ManagementAgent;
std::string packageName;
std::string className;
std::string objectId;
std::string encodedV1Config; // qmfv1 properties
std::string encodedV1Inst; // qmfv1 statistics
qpid::types::Variant::Map encodedV2;
};
typedef std::vector<DeletedObject::shared_ptr> DeletedObjectList;
private:
// Storage for tracking remote management agents, attached via the client
// management agent API.
//
struct RemoteAgent : public Manageable
{
ManagementAgent& agent;
uint32_t brokerBank;
uint32_t agentBank;
std::string routingKey;
ObjectId connectionRef;
qmf::org::apache::qpid::broker::Agent::shared_ptr mgmtObject;
RemoteAgent(ManagementAgent& _agent) : agent(_agent) {}
ManagementObject::shared_ptr GetManagementObject (void) const { return mgmtObject; }
virtual ~RemoteAgent ();
void mapEncode(qpid::types::Variant::Map& _map) const;
void mapDecode(const qpid::types::Variant::Map& _map);
};
typedef std::map<ObjectId, boost::shared_ptr<RemoteAgent> > RemoteAgentMap;
// Storage for known schema classes:
//
// SchemaClassKey -- Key elements for map lookups
// SchemaClassKeyComp -- Comparison class for SchemaClassKey
// SchemaClass -- Non-key elements for classes
//
struct SchemaClassKey
{
std::string name;
uint8_t hash[16];
void mapEncode(qpid::types::Variant::Map& _map) const;
void mapDecode(const qpid::types::Variant::Map& _map);
void encode(framing::Buffer& buffer) const;
void decode(framing::Buffer& buffer);
uint32_t encodedBufSize() const;
};
struct SchemaClassKeyComp
{
bool operator() (const SchemaClassKey& lhs, const SchemaClassKey& rhs) const
{
if (lhs.name != rhs.name)
return lhs.name < rhs.name;
else
for (int i = 0; i < 16; i++)
if (lhs.hash[i] != rhs.hash[i])
return lhs.hash[i] < rhs.hash[i];
return false;
}
};
struct SchemaClass
{
uint8_t kind;
ManagementObject::writeSchemaCall_t writeSchemaCall;
std::string data;
uint32_t pendingSequence;
SchemaClass(uint8_t _kind=0, uint32_t seq=0) :
kind(_kind), writeSchemaCall(0), pendingSequence(seq) {}
SchemaClass(uint8_t _kind, ManagementObject::writeSchemaCall_t call) :
kind(_kind), writeSchemaCall(call), pendingSequence(0) {}
bool hasSchema () { return (writeSchemaCall != 0) || !data.empty(); }
void appendSchema (framing::Buffer& buf);
void mapEncode(qpid::types::Variant::Map& _map) const;
void mapDecode(const qpid::types::Variant::Map& _map);
};
typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap;
typedef std::map<std::string, ClassMap> PackageMap;
RemoteAgentMap remoteAgents;
PackageMap packages;
//
// Protected by objectLock
//
ManagementObjectMap managementObjects;
//
// Protected by addLock
//
ManagementObjectVector newManagementObjects;
framing::Uuid uuid;
//
// Lock ordering: userLock -> addLock -> objectLock
//
sys::Mutex userLock;
sys::Mutex addLock;
sys::Mutex objectLock;
qpid::broker::Exchange::shared_ptr mExchange;
qpid::broker::Exchange::shared_ptr dExchange;
qpid::broker::Exchange::shared_ptr v2Topic;
qpid::broker::Exchange::shared_ptr v2Direct;
std::string dataDir;
bool publish;
uint16_t interval;
qpid::broker::Broker* broker;
qpid::sys::Timer* timer;
qpid::broker::ProtocolRegistry* protocols;
uint16_t bootSequence;
uint32_t nextObjectId;
uint32_t brokerBank;
uint32_t nextRemoteBank;
uint32_t nextRequestSequence;
bool clientWasAdded;
const qpid::sys::AbsTime startTime;
bool suppressed;
typedef std::pair<std::string,std::string> MethodName;
typedef std::map<MethodName, std::string> DisallowedMethods;
DisallowedMethods disallowed;
bool disallowAllV1Methods;
// Agent name and address
qpid::types::Variant::Map attrMap;
std::string name_address;
std::string vendorNameKey; // "." --> "_"
std::string productNameKey; // "." --> "_"
std::string instanceNameKey; // "." --> "_"
// supported management protocol
bool qmf1Support;
bool qmf2Support;
// Maximum # of objects allowed in a single V2 response
// message.
uint32_t maxReplyObjs;
// list of objects that have been deleted, but have yet to be published
// one final time.
// Indexed by a string composed of the object's package and class name.
// Protected by objectLock.
typedef std::map<std::string, DeletedObjectList> PendingDeletedObjsMap;
PendingDeletedObjsMap pendingDeletedObjs;
// Pollable queue to serialize event messages
typedef std::pair<boost::shared_ptr<broker::Exchange>,
broker::Message> ExchangeAndMessage;
typedef sys::PollableQueue<ExchangeAndMessage> EventQueue;
//
// Memory statistics object
//
qmf::org::apache::qpid::broker::Memory::shared_ptr memstat;
void writeData ();
void periodicProcessing (void);
void deleteObjectNow(const ObjectId& oid);
void encodeHeader (framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0);
bool checkHeader (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq);
EventQueue::Batch::const_iterator sendEvents(const EventQueue::Batch& batch);
void sendBuffer(framing::Buffer& buf,
qpid::broker::Exchange::shared_ptr exchange,
const std::string& routingKey);
void sendBuffer(framing::Buffer& buf,
const std::string& exchange,
const std::string& routingKey);
void sendBuffer(const std::string& data,
const std::string& cid,
const qpid::types::Variant::Map& headers,
const std::string& content_type,
qpid::broker::Exchange::shared_ptr exchange,
const std::string& routingKey,
uint64_t ttl_msec = 0);
void sendBuffer(const std::string& data,
const std::string& cid,
const qpid::types::Variant::Map& headers,
const std::string& content_type,
const std::string& exchange,
const std::string& routingKey,
uint64_t ttl_msec = 0);
void moveNewObjects();
bool moveDeletedObjects();
bool authorizeAgentMessage(qpid::broker::Message& msg);
void dispatchAgentCommand(qpid::broker::Message& msg, bool viaLocal=false);
PackageMap::iterator findOrAddPackageLH(std::string name);
void addClassLH(uint8_t kind,
PackageMap::iterator pIter,
const std::string& className,
uint8_t* md5Sum,
ManagementObject::writeSchemaCall_t schemaCall);
void encodePackageIndication (framing::Buffer& buf,
PackageMap::iterator pIter);
void encodeClassIndication (framing::Buffer& buf,
const std::string packageName,
const struct SchemaClassKey key,
uint8_t kind);
bool bankInUse (uint32_t bank);
uint32_t allocateNewBank ();
uint32_t assignBankLH (uint32_t requestedPrefix);
void deleteOrphanedAgentsLH();
void sendCommandComplete(const std::string& replyToKey, uint32_t sequence,
uint32_t code = 0, const std::string& text = "OK");
void sendException(const std::string& rte, const std::string& rtk, const std::string& cid, const std::string& text, uint32_t code=1, bool viaLocal=false);
void handleBrokerRequest (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
void handlePackageQuery (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
void handlePackageInd (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
void handleClassQuery (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
void handleClassInd (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
void handleSchemaRequest (framing::Buffer& inBuffer, const std::string& replyToEx, const std::string& replyToKey, uint32_t sequence);
void handleSchemaResponse (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
void handleAttachRequest (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const ObjectId& objectId);
void handleGetQuery (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const std::string& userId);
void handleMethodRequest (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const std::string& userId);
void handleGetQuery (const std::string& body, const std::string& replyToEx, const std::string& replyToKey, const std::string& cid, const std::string& userId, bool viaLocal);
void handleMethodRequest (const std::string& body, const std::string& replyToEx, const std::string& replyToKey, const std::string& cid, const std::string& userId, bool viaLocal);
void handleLocateRequest (const std::string& body, const std::string& replyToEx, const std::string &replyToKey, const std::string& cid);
size_t validateSchema(framing::Buffer&, uint8_t kind);
size_t validateTableSchema(framing::Buffer&);
size_t validateEventSchema(framing::Buffer&);
ManagementObjectMap::iterator numericFind(const ObjectId& oid);
std::string summarizeAgents();
void debugSnapshot(const char* title);
std::auto_ptr<EventQueue> sendQueue;
};
// Free functions dealing with the "management execution context"; all three
// are only declared here and defined elsewhere.
// NOTE(review): judging by the names and signatures, set/reset presumably
// record and clear a broker::Connection, and getCurrentPublisher() presumably
// returns the recorded connection (it returns a pointer, so possibly null) --
// confirm against the implementation.
void setManagementExecutionContext(const broker::Connection&);
void resetManagementExecutionContext();
const broker::Connection* getCurrentPublisher();
}}
#endif /*!_ManagementAgent_*/