blob: 53eb690ba80289f61f6ad55944269147f09493c2 [file] [log] [blame]
#ifndef _qpid_agent_ManagementAgentImpl_
#define _qpid_agent_ManagementAgentImpl_
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
#include "ManagementAgent.h"
#include "qpid/client/Connection.h"
#include "qpid/client/ConnectionSettings.h"
#include "qpid/client/SubscriptionManager.h"
#include "qpid/client/Session.h"
#include "qpid/client/AsyncSession.h"
#include "qpid/client/Message.h"
#include "qpid/client/MessageListener.h"
#include "qpid/sys/Thread.h"
#include "qpid/sys/Runnable.h"
#include "qpid/sys/Mutex.h"
#include "qpid/framing/Uuid.h"
#include <iostream>
#include <sstream>
#include <deque>
namespace qpid {
namespace management {
class ManagementAgentImpl : public ManagementAgent, public client::MessageListener
{
public:
ManagementAgentImpl();
virtual ~ManagementAgentImpl();
//
// Methods from ManagementAgent
//
int getMaxThreads() { return 1; }
void init(const std::string& brokerHost = "localhost",
uint16_t brokerPort = 5672,
uint16_t intervalSeconds = 10,
bool useExternalThread = false,
const std::string& storeFile = "",
const std::string& uid = "guest",
const std::string& pwd = "guest",
const std::string& mech = "PLAIN",
const std::string& proto = "tcp");
void init(const client::ConnectionSettings& settings,
uint16_t intervalSeconds = 10,
bool useExternalThread = false,
const std::string& storeFile = "");
bool isConnected() { return connected; }
std::string& getLastFailure() { return lastFailure; }
void registerClass(const std::string& packageName,
const std::string& className,
uint8_t* md5Sum,
management::ManagementObject::writeSchemaCall_t schemaCall);
void registerEvent(const std::string& packageName,
const std::string& eventName,
uint8_t* md5Sum,
management::ManagementObject::writeSchemaCall_t schemaCall);
ObjectId addObject(management::ManagementObject* objectPtr, uint64_t persistId = 0);
void raiseEvent(const management::ManagementEvent& event, severity_t severity = SEV_DEFAULT);
uint32_t pollCallbacks(uint32_t callLimit = 0);
int getSignalFd();
uint16_t getInterval() { return interval; }
void periodicProcessing();
private:
struct SchemaClassKey {
std::string name;
uint8_t hash[16];
};
struct SchemaClassKeyComp {
bool operator() (const SchemaClassKey& lhs, const SchemaClassKey& rhs) const
{
if (lhs.name != rhs.name)
return lhs.name < rhs.name;
else
for (int i = 0; i < 16; i++)
if (lhs.hash[i] != rhs.hash[i])
return lhs.hash[i] < rhs.hash[i];
return false;
}
};
struct SchemaClass {
management::ManagementObject::writeSchemaCall_t writeSchemaCall;
uint8_t kind;
SchemaClass(const management::ManagementObject::writeSchemaCall_t call,
const uint8_t _kind) : writeSchemaCall(call), kind(_kind) {}
};
struct QueuedMethod {
QueuedMethod(uint32_t _seq, std::string _reply, std::string _body) :
sequence(_seq), replyTo(_reply), body(_body) {}
uint32_t sequence;
std::string replyTo;
std::string body;
};
typedef std::deque<QueuedMethod*> MethodQueue;
typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap;
typedef std::map<std::string, ClassMap> PackageMap;
PackageMap packages;
AgentAttachment attachment;
management::ManagementObjectMap managementObjects;
management::ManagementObjectMap newManagementObjects;
MethodQueue methodQueue;
void received (client::Message& msg);
uint16_t interval;
bool extThread;
int writeFd;
int readFd;
uint64_t nextObjectId;
std::string storeFile;
sys::Mutex agentLock;
sys::Mutex addLock;
framing::Uuid systemId;
client::ConnectionSettings connectionSettings;
bool initialized;
bool connected;
std::string lastFailure;
bool clientWasAdded;
uint32_t requestedBrokerBank;
uint32_t requestedAgentBank;
uint32_t assignedBrokerBank;
uint32_t assignedAgentBank;
uint16_t bootSequence;
static const uint8_t DEBUG_OFF = 0;
static const uint8_t DEBUG_CONN = 1;
static const uint8_t DEBUG_PROTO = 2;
static const uint8_t DEBUG_PUBLISH = 3;
# define MA_BUFFER_SIZE 65536
char outputBuffer[MA_BUFFER_SIZE];
char eventBuffer[MA_BUFFER_SIZE];
friend class ConnectionThread;
class ConnectionThread : public sys::Runnable
{
bool operational;
ManagementAgentImpl& agent;
framing::Uuid sessionId;
client::Connection connection;
client::Session session;
client::SubscriptionManager* subscriptions;
std::stringstream queueName;
mutable sys::Mutex connLock;
bool shutdown;
bool sleeping;
void run();
public:
ConnectionThread(ManagementAgentImpl& _agent) :
operational(false), agent(_agent), subscriptions(0),
shutdown(false), sleeping(false) {}
~ConnectionThread();
void sendBuffer(qpid::framing::Buffer& buf,
uint32_t length,
const std::string& exchange,
const std::string& routingKey);
void bindToBank(uint32_t brokerBank, uint32_t agentBank);
void close();
bool isSleeping() const;
};
class PublishThread : public sys::Runnable
{
ManagementAgentImpl& agent;
void run();
public:
PublishThread(ManagementAgentImpl& _agent) : agent(_agent) {}
};
ConnectionThread connThreadBody;
sys::Thread connThread;
PublishThread pubThreadBody;
sys::Thread pubThread;
static const std::string storeMagicNumber;
void startProtocol();
void storeData(bool requested=false);
void retrieveData();
PackageMap::iterator findOrAddPackage(const std::string& name);
void moveNewObjectsLH();
void addClassLocal (uint8_t classKind,
PackageMap::iterator pIter,
const std::string& className,
uint8_t* md5Sum,
management::ManagementObject::writeSchemaCall_t schemaCall);
void encodePackageIndication (framing::Buffer& buf,
PackageMap::iterator pIter);
void encodeClassIndication (framing::Buffer& buf,
PackageMap::iterator pIter,
ClassMap::iterator cIter);
void encodeHeader (framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0);
bool checkHeader (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq);
void sendCommandComplete (std::string replyToKey, uint32_t sequence,
uint32_t code = 0, std::string text = std::string("OK"));
void handleAttachResponse (qpid::framing::Buffer& inBuffer);
void handlePackageRequest (qpid::framing::Buffer& inBuffer);
void handleClassQuery (qpid::framing::Buffer& inBuffer);
void handleSchemaRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence);
void invokeMethodRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo);
void handleGetQuery (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo);
void handleMethodRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo);
void handleConsoleAddedIndication();
};
}}
#endif /*!_qpid_agent_ManagementAgentImpl_*/