// blob: 59dfb98596ff21964bfeb0889d4bafdcba7b0f2d [file] [log] [blame]
#ifndef _ManagementBroker_
#define _ManagementBroker_
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "qpid/Options.h"
#include "qpid/broker/Exchange.h"
#include "qpid/broker/Timer.h"
#include "qpid/framing/Uuid.h"
#include "qpid/sys/Mutex.h"
#include "qpid/broker/ConnectionToken.h"
#include "qpid/agent/ManagementAgent.h"
#include "ManagementObject.h"
#include "Manageable.h"
#include "qmf/org/apache/qpid/broker/Agent.h"
#include <qpid/framing/AMQFrame.h>
#include <cassert>
#include <cstring>
#include <map>
#include <string>
#include <vector>
namespace qpid {
namespace management {
/**
 * ManagementAgent implementation declared for use inside the broker.
 *
 * The class keeps maps of management objects, known schema packages/classes
 * and remote agents, together with fixed-size scratch buffers used by the
 * encode/dispatch helpers declared below.  The ManagementAgent entry points
 * that only apply to remote (client-side) agents are stubbed out with
 * assert(0).
 */
class ManagementBroker : public ManagementAgent
{
  private:

    int threadPoolSize;

  public:

    ManagementBroker ();
    virtual ~ManagementBroker ();

    /**
     * Supply the agent's configuration.
     *
     * @param dataDir         data directory path
     * @param interval        interval value (NOTE(review): presumably seconds,
     *                        matching Periodic's constructor — confirm)
     * @param broker          the owning broker (not owned by this object as
     *                        far as this header shows)
     * @param threadPoolSize  value later reported by getMaxThreads()
     *                        (presumably — confirm in the implementation)
     */
    void configure       (const std::string& dataDir, uint16_t interval,
                          qpid::broker::Broker* broker, int threadPoolSize);

    /** Overwrite the stored interval. */
    void setInterval     (uint16_t _interval) { interval = _interval; }

    /** Supply the management exchange and the direct exchange. */
    void setExchange     (qpid::broker::Exchange::shared_ptr mgmtExchange,
                          qpid::broker::Exchange::shared_ptr directExchange);

    /** @return the stored thread pool size. */
    int  getMaxThreads   () { return threadPoolSize; }

    /**
     * Register a schema class under a package.
     *
     * @param packageName  package the class belongs to
     * @param className    name of the class
     * @param md5Sum       schema hash (NOTE(review): presumably 16 bytes, the
     *                     size of SchemaClassKey::hash — confirm)
     * @param schemaCall   callback that writes the schema
     */
    void registerClass   (const std::string& packageName,
                          const std::string& className,
                          uint8_t*           md5Sum,
                          ManagementObject::writeSchemaCall_t schemaCall);

    /** Register an event schema; parameters parallel registerClass(). */
    void registerEvent   (const std::string& packageName,
                          const std::string& eventName,
                          uint8_t*           md5Sum,
                          ManagementObject::writeSchemaCall_t schemaCall);

    /**
     * Add a management object to the agent.
     *
     * @param object     object to add
     * @param persistId  optional persistent id; defaults to 0
     * @return the ObjectId associated with the object
     */
    ObjectId addObject   (ManagementObject* object,
                          uint64_t          persistId = 0);

    /** Raise a management event; severity defaults to SEV_DEFAULT. */
    void raiseEvent(const ManagementEvent& event, severity_t severity = SEV_DEFAULT);

    void clientAdded     (const std::string& routingKey);

    /**
     * Offer an inbound message to the agent.
     *
     * @return bool — NOTE(review): presumably true when the message was
     *         consumed as a management command; confirm in the implementation.
     */
    bool dispatchCommand (qpid::broker::Deliverable&      msg,
                          const std::string&              routingKey,
                          const framing::FieldTable*      args);

    /** @return a reference to this agent's UUID member. */
    const framing::Uuid& getUuid() const { return uuid; }

    // Stubs for remote management agent calls.
    //
    // These must never be reached on the broker-side agent.  Note that when
    // assertions are compiled out (NDEBUG) the init() stubs silently do
    // nothing, pollCallbacks() returns 0 and getSignalFd() returns -1.
    void init(const std::string&, uint16_t, uint16_t, bool,
              const std::string&, const std::string&, const std::string&,
              const std::string&, const std::string&) { assert(0); }
    void init(const client::ConnectionSettings&, uint16_t, bool, const std::string&) { assert(0); }
    uint32_t pollCallbacks (uint32_t) { assert(0); return 0; }
    int getSignalFd () { assert(0); return -1; }

  private:

    // Gives the base class access to the private members declared here.
    friend class ManagementAgent;

    // Timer task that keeps a reference back to its ManagementBroker.
    // NOTE(review): fire() presumably triggers periodicProcessing() — confirm.
    struct Periodic : public qpid::broker::TimerTask
    {
        ManagementBroker& broker;

        Periodic (ManagementBroker& broker, uint32_t seconds);
        virtual ~Periodic ();
        void fire ();
    };

    //  Storage for tracking remote management agents, attached via the client
    //  management agent API.
    //
    struct RemoteAgent : public Manageable
    {
        uint32_t          brokerBank;
        uint32_t          agentBank;
        std::string       routingKey;
        ObjectId          connectionRef;
        qmf::org::apache::qpid::broker::Agent* mgmtObject;

        // Start from a well-defined state: zero banks and a null management
        // object, so GetManagementObject() never returns an indeterminate
        // pointer before the creator has filled the record in.
        RemoteAgent () : brokerBank(0), agentBank(0), mgmtObject(0) {}

        ManagementObject* GetManagementObject (void) const { return mgmtObject; }
        virtual ~RemoteAgent ();
    };

    // TODO: Eventually replace string with entire reply-to structure.  reply-to
    //       currently assumes that the exchange is "amq.direct" even though it could
    //       in theory be specified differently.
    typedef std::map<ObjectId, RemoteAgent*> RemoteAgentMap;
    typedef std::vector<std::string>         ReplyToVector;

    //  Storage for known schema classes:
    //
    //  SchemaClassKey     -- Key elements for map lookups
    //  SchemaClassKeyComp -- Comparison class for SchemaClassKey
    //  SchemaClass        -- Non-key elements for classes
    //
    struct SchemaClassKey
    {
        std::string name;
        uint8_t     hash[16];
    };

    // Strict weak ordering for SchemaClassKey: order by name first, then by
    // the hash bytes compared lexicographically as unsigned values.
    struct SchemaClassKeyComp
    {
        bool operator() (const SchemaClassKey& lhs, const SchemaClassKey& rhs) const
        {
            if (lhs.name != rhs.name)
                return lhs.name < rhs.name;

            // memcmp compares bytes as unsigned char, which matches an
            // element-by-element '<' over the uint8_t hash array.
            return std::memcmp(lhs.hash, rhs.hash, sizeof(lhs.hash)) < 0;
        }
    };

    struct SchemaClass
    {
        uint8_t  kind;
        ManagementObject::writeSchemaCall_t writeSchemaCall;
        uint32_t pendingSequence;
        size_t   bufferLen;
        uint8_t* buffer;

        // Entry with no schema yet; records the given sequence number.
        SchemaClass(uint8_t _kind, uint32_t seq) :
            kind(_kind), writeSchemaCall(0), pendingSequence(seq), bufferLen(0), buffer(0) {}

        // Entry whose schema is produced by the supplied callback.
        SchemaClass(uint8_t _kind, ManagementObject::writeSchemaCall_t call) :
            kind(_kind), writeSchemaCall(call), pendingSequence(0), bufferLen(0), buffer(0) {}

        // True when a schema is available, either through the callback or
        // through a non-null buffer.
        bool hasSchema () const { return (writeSchemaCall != 0) || (buffer != 0); }
        void appendSchema (framing::Buffer& buf);
    };

    typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap;
    typedef std::map<std::string, ClassMap> PackageMap;

    RemoteAgentMap               remoteAgents;
    PackageMap                   packages;
    ManagementObjectMap          managementObjects;
    ManagementObjectMap          newManagementObjects;

    static ManagementAgent*      agent;
    static bool                  enabled;

    framing::Uuid                uuid;
    sys::Mutex                   addLock;
    sys::Mutex                   userLock;
    qpid::broker::Timer          timer;
    qpid::broker::Exchange::shared_ptr mExchange;
    qpid::broker::Exchange::shared_ptr dExchange;
    std::string                  dataDir;
    uint16_t                     interval;
    qpid::broker::Broker*        broker;
    uint16_t                     bootSequence;
    uint32_t                     nextObjectId;
    uint32_t                     brokerBank;
    uint32_t                     nextRemoteBank;
    uint32_t                     nextRequestSequence;
    bool                         clientWasAdded;

    // Size in bytes of each of the three scratch buffers below.  Kept as a
    // macro because code outside this header may refer to it.
#   define MA_BUFFER_SIZE 65536
    char inputBuffer[MA_BUFFER_SIZE];
    char outputBuffer[MA_BUFFER_SIZE];
    char eventBuffer[MA_BUFFER_SIZE];

    void writeData ();
    void periodicProcessing (void);

    // Message framing helpers.
    void encodeHeader       (framing::Buffer& buf, uint8_t  opcode, uint32_t  seq = 0);
    bool checkHeader        (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq);
    void sendBuffer         (framing::Buffer&                   buf,
                             uint32_t                           length,
                             qpid::broker::Exchange::shared_ptr exchange,
                             std::string                        routingKey);

    // NOTE(review): the "LH" suffix used below presumably means the caller
    // must already hold the relevant lock — confirm against the implementation.
    void moveNewObjectsLH();

    bool authorizeAgentMessageLH(qpid::broker::Message& msg);
    void dispatchAgentCommandLH(qpid::broker::Message& msg);

    // Schema package/class bookkeeping.
    PackageMap::iterator findOrAddPackageLH(std::string name);
    void addClassLH(uint8_t                      kind,
                    PackageMap::iterator         pIter,
                    const std::string&           className,
                    uint8_t*                     md5Sum,
                    ManagementObject::writeSchemaCall_t schemaCall);
    void encodePackageIndication (framing::Buffer&     buf,
                                  PackageMap::iterator pIter);
    void encodeClassIndication (framing::Buffer&     buf,
                                PackageMap::iterator pIter,
                                ClassMap::iterator   cIter);

    // Bank allocation for remote agents.
    bool     bankInUse (uint32_t bank);
    uint32_t allocateNewBank ();
    uint32_t assignBankLH (uint32_t requestedPrefix);
    void deleteOrphanedAgentsLH();

    // Sends a completion reply; defaults to code 0 with the text "OK".
    void sendCommandComplete (std::string replyToKey, uint32_t sequence,
                              uint32_t code = 0, std::string text = std::string("OK"));

    // Handlers for the individual request/indication message types.  Each
    // takes the inbound buffer, the reply-to routing key and the request
    // sequence number; two of them additionally take the connection token.
    void handleBrokerRequestLH  (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
    void handlePackageQueryLH   (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
    void handlePackageIndLH     (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
    void handleClassQueryLH     (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
    void handleClassIndLH       (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
    void handleSchemaRequestLH  (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
    void handleSchemaResponseLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
    void handleAttachRequestLH  (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken);
    void handleGetQueryLH       (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
    void handleMethodRequestLH  (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken);

    // Schema validation helpers.
    // NOTE(review): the size_t result is presumably the validated schema
    // length, with some value signalling failure — confirm in the implementation.
    size_t validateSchema(framing::Buffer&, uint8_t kind);
    size_t validateTableSchema(framing::Buffer&);
    size_t validateEventSchema(framing::Buffer&);
};
}}
#endif /*!_ManagementBroker_*/