blob: fb15dc6ed12d538be5ac41e16f2d5fe3cb29a033 [file] [log] [blame]
#ifndef _ManagementAgent_
#define _ManagementAgent_
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "qpid/broker/BrokerImportExport.h"
#include "qpid/Options.h"
#include "qpid/broker/Exchange.h"
#include "qpid/framing/Uuid.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/Timer.h"
#include "qpid/broker/ConnectionToken.h"
#include "qpid/management/ManagementObject.h"
#include "qpid/management/ManagementEvent.h"
#include "qpid/management/Manageable.h"
#include "qmf/org/apache/qpid/broker/Agent.h"
#include "qpid/types/Variant.h"
#include <qpid/framing/AMQFrame.h>
#include <qpid/framing/FieldValue.h>
#include <qpid/framing/ResizableBuffer.h>
#include <memory>
#include <string>
#include <map>
namespace qpid {
namespace broker {
class ConnectionState;
}
namespace management {
class ManagementAgent
{
private:
int threadPoolSize;
public:
typedef enum {
SEV_EMERG = 0,
SEV_ALERT = 1,
SEV_CRIT = 2,
SEV_ERROR = 3,
SEV_WARN = 4,
SEV_NOTE = 5,
SEV_INFO = 6,
SEV_DEBUG = 7,
SEV_DEFAULT = 8
} severity_t;
ManagementAgent (const bool qmfV1, const bool qmfV2);
virtual ~ManagementAgent ();
/** Called before plugins are initialized */
void configure (const std::string& dataDir, uint16_t interval,
qpid::broker::Broker* broker, int threadPoolSize);
/** Called after plugins are initialized. */
void pluginsInitialized();
/** Called by cluster to suppress management output during update. */
void suppress(bool s) { suppressed = s; }
void setName(const std::string& vendor,
const std::string& product,
const std::string& instance="");
void getName(std::string& vendor, std::string& product, std::string& instance);
const std::string& getAddress();
void setInterval(uint16_t _interval) { interval = _interval; }
void setExchange(qpid::broker::Exchange::shared_ptr mgmtExchange,
qpid::broker::Exchange::shared_ptr directExchange);
void setExchangeV2(qpid::broker::Exchange::shared_ptr topicExchange,
qpid::broker::Exchange::shared_ptr directExchange);
int getMaxThreads () { return threadPoolSize; }
QPID_BROKER_EXTERN void registerClass (const std::string& packageName,
const std::string& className,
uint8_t* md5Sum,
ManagementObject::writeSchemaCall_t schemaCall);
QPID_BROKER_EXTERN void registerEvent (const std::string& packageName,
const std::string& eventName,
uint8_t* md5Sum,
ManagementObject::writeSchemaCall_t schemaCall);
QPID_BROKER_EXTERN ObjectId addObject (ManagementObject* object,
uint64_t persistId = 0,
bool persistent = false);
QPID_BROKER_EXTERN ObjectId addObject (ManagementObject* object,
const std::string& key,
bool persistent = false);
QPID_BROKER_EXTERN void raiseEvent(const ManagementEvent& event,
severity_t severity = SEV_DEFAULT);
QPID_BROKER_EXTERN void clientAdded (const std::string& routingKey);
QPID_BROKER_EXTERN void clusterUpdate();
bool dispatchCommand (qpid::broker::Deliverable& msg,
const std::string& routingKey,
const framing::FieldTable* args,
const bool topic,
int qmfVersion);
/** Disallow a method. Attempts to call it will receive an exception with message. */
void disallow(const std::string& className, const std::string& methodName, const std::string& message);
/** Disallow all QMFv1 methods (used in clustered brokers). */
void disallowV1Methods() { disallowAllV1Methods = true; }
/** Serialize my schemas as a binary blob into schemaOut */
void exportSchemas(std::string& schemaOut);
/** Serialize my remote-agent map as a binary blob into agentsOut */
void exportAgents(std::string& agentsOut);
/** Decode a serialized schemas and add to my schema cache */
void importSchemas(framing::Buffer& inBuf);
/** Decode a serialized agent map */
void importAgents(framing::Buffer& inBuf);
// these are in support of the managementSetup-state stuff, for synch'ing clustered brokers
uint64_t getNextObjectId(void) { return nextObjectId; }
void setNextObjectId(uint64_t o) { nextObjectId = o; }
uint16_t getBootSequence(void) { return bootSequence; }
void setBootSequence(uint16_t b) { bootSequence = b; writeData(); }
const framing::Uuid& getUuid() const { return uuid; }
void setUuid(const framing::Uuid& id) { uuid = id; writeData(); }
// TODO: remove these when Variant API moved into common library.
static types::Variant::Map toMap(const framing::FieldTable& from);
static framing::FieldTable fromMap(const types::Variant::Map& from);
static types::Variant::List toList(const framing::List& from);
static framing::List fromList(const types::Variant::List& from);
static boost::shared_ptr<framing::FieldValue> toFieldValue(const types::Variant& in);
static types::Variant toVariant(const boost::shared_ptr<framing::FieldValue>& val);
// For Clustering: management objects that have been marked as
// "deleted", but are waiting for their last published object
// update are not visible to the cluster replication code. These
// interfaces allow clustering to gather up all the management
// objects that are deleted in order to allow all clustered
// brokers to publish the same set of deleted objects.
class DeletedObject {
public:
typedef boost::shared_ptr<DeletedObject> shared_ptr;
DeletedObject(ManagementObject *, bool v1, bool v2);
DeletedObject( const std::string &encoded );
~DeletedObject() {};
void encode( std::string& toBuffer );
const std::string getKey() const {
// used to batch up objects of the same class type
return std::string(packageName + std::string(":") + className);
}
private:
friend class ManagementAgent;
std::string packageName;
std::string className;
std::string objectId;
std::string encodedV1Config; // qmfv1 properties
std::string encodedV1Inst; // qmfv1 statistics
qpid::types::Variant::Map encodedV2;
};
typedef std::vector<DeletedObject::shared_ptr> DeletedObjectList;
/** returns a snapshot of all currently deleted management objects. */
void exportDeletedObjects( DeletedObjectList& outList );
/** Import a list of deleted objects to send on next publish interval. */
void importDeletedObjects( const DeletedObjectList& inList );
private:
struct Periodic : public qpid::sys::TimerTask
{
ManagementAgent& agent;
Periodic (ManagementAgent& agent, uint32_t seconds);
virtual ~Periodic ();
void fire ();
};
// Storage for tracking remote management agents, attached via the client
// management agent API.
//
struct RemoteAgent : public Manageable
{
ManagementAgent& agent;
uint32_t brokerBank;
uint32_t agentBank;
std::string routingKey;
ObjectId connectionRef;
qmf::org::apache::qpid::broker::Agent* mgmtObject;
RemoteAgent(ManagementAgent& _agent) : agent(_agent), mgmtObject(0) {}
ManagementObject* GetManagementObject (void) const { return mgmtObject; }
virtual ~RemoteAgent ();
void mapEncode(qpid::types::Variant::Map& _map) const;
void mapDecode(const qpid::types::Variant::Map& _map);
};
typedef std::map<ObjectId, boost::shared_ptr<RemoteAgent> > RemoteAgentMap;
// Storage for known schema classes:
//
// SchemaClassKey -- Key elements for map lookups
// SchemaClassKeyComp -- Comparison class for SchemaClassKey
// SchemaClass -- Non-key elements for classes
//
struct SchemaClassKey
{
std::string name;
uint8_t hash[16];
void mapEncode(qpid::types::Variant::Map& _map) const;
void mapDecode(const qpid::types::Variant::Map& _map);
void encode(framing::Buffer& buffer) const;
void decode(framing::Buffer& buffer);
uint32_t encodedBufSize() const;
};
struct SchemaClassKeyComp
{
bool operator() (const SchemaClassKey& lhs, const SchemaClassKey& rhs) const
{
if (lhs.name != rhs.name)
return lhs.name < rhs.name;
else
for (int i = 0; i < 16; i++)
if (lhs.hash[i] != rhs.hash[i])
return lhs.hash[i] < rhs.hash[i];
return false;
}
};
struct SchemaClass
{
uint8_t kind;
ManagementObject::writeSchemaCall_t writeSchemaCall;
std::string data;
uint32_t pendingSequence;
SchemaClass(uint8_t _kind=0, uint32_t seq=0) :
kind(_kind), writeSchemaCall(0), pendingSequence(seq) {}
SchemaClass(uint8_t _kind, ManagementObject::writeSchemaCall_t call) :
kind(_kind), writeSchemaCall(call), pendingSequence(0) {}
bool hasSchema () { return (writeSchemaCall != 0) || !data.empty(); }
void appendSchema (framing::Buffer& buf);
void mapEncode(qpid::types::Variant::Map& _map) const;
void mapDecode(const qpid::types::Variant::Map& _map);
};
typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap;
typedef std::map<std::string, ClassMap> PackageMap;
RemoteAgentMap remoteAgents;
PackageMap packages;
//
// Protected by userLock
//
ManagementObjectMap managementObjects;
//
// Protected by addLock
//
ManagementObjectVector newManagementObjects;
framing::Uuid uuid;
//
// Lock hierarchy: If a thread needs to take both addLock and userLock,
// it MUST take userLock first, then addLock.
//
sys::Mutex userLock;
sys::Mutex addLock;
qpid::broker::Exchange::shared_ptr mExchange;
qpid::broker::Exchange::shared_ptr dExchange;
qpid::broker::Exchange::shared_ptr v2Topic;
qpid::broker::Exchange::shared_ptr v2Direct;
std::string dataDir;
uint16_t interval;
qpid::broker::Broker* broker;
qpid::sys::Timer* timer;
uint16_t bootSequence;
uint32_t nextObjectId;
uint32_t brokerBank;
uint32_t nextRemoteBank;
uint32_t nextRequestSequence;
bool clientWasAdded;
const qpid::sys::AbsTime startTime;
bool suppressed;
typedef std::pair<std::string,std::string> MethodName;
typedef std::map<MethodName, std::string> DisallowedMethods;
DisallowedMethods disallowed;
bool disallowAllV1Methods;
// Agent name and address
qpid::types::Variant::Map attrMap;
std::string name_address;
std::string vendorNameKey; // "." --> "_"
std::string productNameKey; // "." --> "_"
std::string instanceNameKey; // "." --> "_"
// supported management protocol
bool qmf1Support;
bool qmf2Support;
// Maximum # of objects allowed in a single V2 response
// message.
uint32_t maxReplyObjs;
// list of objects that have been deleted, but have yet to be published
// one final time.
// Indexed by a string composed of the object's package and class name.
// Protected by userLock.
typedef std::map<std::string, DeletedObjectList> PendingDeletedObjsMap;
PendingDeletedObjsMap pendingDeletedObjs;
# define MA_BUFFER_SIZE 65536
char inputBuffer[MA_BUFFER_SIZE];
char outputBuffer[MA_BUFFER_SIZE];
char eventBuffer[MA_BUFFER_SIZE];
framing::ResizableBuffer msgBuffer;
void writeData ();
void periodicProcessing (void);
void deleteObjectNowLH(const ObjectId& oid);
void encodeHeader (framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0);
bool checkHeader (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq);
void sendBufferLH(framing::Buffer& buf,
uint32_t length,
qpid::broker::Exchange::shared_ptr exchange,
const std::string& routingKey);
void sendBufferLH(framing::Buffer& buf,
uint32_t length,
const std::string& exchange,
const std::string& routingKey);
void sendBufferLH(const std::string& data,
const std::string& cid,
const qpid::types::Variant::Map& headers,
const std::string& content_type,
qpid::broker::Exchange::shared_ptr exchange,
const std::string& routingKey,
uint64_t ttl_msec = 0);
void sendBufferLH(const std::string& data,
const std::string& cid,
const qpid::types::Variant::Map& headers,
const std::string& content_type,
const std::string& exchange,
const std::string& routingKey,
uint64_t ttl_msec = 0);
void moveNewObjectsLH();
bool moveDeletedObjectsLH();
bool authorizeAgentMessageLH(qpid::broker::Message& msg);
void dispatchAgentCommandLH(qpid::broker::Message& msg, bool viaLocal=false);
PackageMap::iterator findOrAddPackageLH(std::string name);
void addClassLH(uint8_t kind,
PackageMap::iterator pIter,
const std::string& className,
uint8_t* md5Sum,
ManagementObject::writeSchemaCall_t schemaCall);
void encodePackageIndication (framing::Buffer& buf,
PackageMap::iterator pIter);
void encodeClassIndication (framing::Buffer& buf,
const std::string packageName,
const struct SchemaClassKey key,
uint8_t kind);
bool bankInUse (uint32_t bank);
uint32_t allocateNewBank ();
uint32_t assignBankLH (uint32_t requestedPrefix);
void deleteOrphanedAgentsLH();
void sendCommandCompleteLH(const std::string& replyToKey, uint32_t sequence,
uint32_t code = 0, const std::string& text = "OK");
void sendExceptionLH(const std::string& rte, const std::string& rtk, const std::string& cid, const std::string& text, uint32_t code=1, bool viaLocal=false);
void handleBrokerRequestLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
void handlePackageQueryLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
void handlePackageIndLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
void handleClassQueryLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
void handleClassIndLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
void handleSchemaRequestLH (framing::Buffer& inBuffer, const std::string& replyToEx, const std::string& replyToKey, uint32_t sequence);
void handleSchemaResponseLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
void handleAttachRequestLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken);
void handleGetQueryLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
void handleMethodRequestLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken);
void handleGetQueryLH (const std::string& body, const std::string& replyToEx, const std::string& replyToKey, const std::string& cid, bool viaLocal);
void handleMethodRequestLH (const std::string& body, const std::string& replyToEx, const std::string& replyToKey, const std::string& cid, const qpid::broker::ConnectionToken* connToken, bool viaLocal);
void handleLocateRequestLH (const std::string& body, const std::string& replyToEx, const std::string &replyToKey, const std::string& cid);
size_t validateSchema(framing::Buffer&, uint8_t kind);
size_t validateTableSchema(framing::Buffer&);
size_t validateEventSchema(framing::Buffer&);
ManagementObjectMap::iterator numericFind(const ObjectId& oid);
std::string summarizeAgents();
void debugSnapshot(const char* title);
};
void setManagementExecutionContext(const qpid::broker::ConnectionState*);
const qpid::broker::ConnectionState* getManagementExecutionContext();
}}
#endif /*!_ManagementAgent_*/