blob: 5fc71979fd7ec8f9e570c7e2a7d983f902068ccc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "qmf/engine/BrokerProxyImpl.h"
#include "qmf/engine/ConsoleImpl.h"
#include "qmf/engine/Protocol.h"
#include "qpid/Address.h"
#include "qpid/sys/SystemInfo.h"
#include <qpid/log/Statement.h>
#include <qpid/StringUtils.h>
#include <string.h>
#include <iostream>
#include <fstream>
using namespace std;
using namespace qmf::engine;
using namespace qpid::framing;
using namespace qpid::sys;
namespace {
    // Exchange names used by this file.
    const char* QMF_EXCHANGE("qpid.management");   // destination for QMF requests sent below
    const char* DIR_EXCHANGE("amq.direct");        // reply exchange; the private queue is bound here

    // Routing keys used for requests sent by this file.
    const char* BROKER_KEY("broker");
    const char* BROKER_AGENT_KEY("agent.1.0");

    // Package/class pair identifying the schema class whose objects feed the agent list.
    const char* BROKER_PACKAGE("org.apache.qpid.broker");
    const char* AGENT_CLASS("agent");
}
/**
 * Return the result object stored at position idx.
 *
 * @param idx  zero-based index into the result list.
 * @return     the object at that position, or 0 if idx is out of range.
 */
const Object* QueryResponseImpl::getObject(uint32_t idx) const
{
    // The previous iterator walk only tested for end() while idx was still
    // non-zero, so idx == results.size() (including idx 0 on an empty list)
    // fell through and dereferenced results.end().
    if (idx >= results.size())
        return 0;
    return results[idx].get();
}
// Point item.<s> at the character storage of the member string <s>.
// Empty strings are skipped, leaving the field as zeroed by copy().
#define STRING_REF(s) {if (!s.empty()) item.s = const_cast<char*>(s.c_str());}

/**
 * Produce a plain BrokerEvent describing this event.
 *
 * The string fields of the result point into this object's own strings and
 * the response fields are the raw pointers held by this object, so the
 * returned struct borrows from (and must not outlive) this BrokerEventImpl.
 */
BrokerEvent BrokerEventImpl::copy()
{
    BrokerEvent item;
    ::memset(&item, 0, sizeof(BrokerEvent));

    // Scalar and pointer fields.
    item.kind           = kind;
    item.context        = context;
    item.queryResponse  = queryResponse.get();
    item.methodResponse = methodResponse.get();

    // String fields (only set when non-empty).
    STRING_REF(name);
    STRING_REF(exchange);
    STRING_REF(bindingKey);

    return item;
}
/**
 * Construct the broker proxy implementation.
 *
 * Builds the private queue name as
 * "qmfc-<process name>-<local address>-<process id>" and installs a
 * StaticContext as the sequence manager's unsolicited-message context.
 */
BrokerProxyImpl::BrokerProxyImpl(BrokerProxy& pub, Console& _console) :
    publicObject(pub),
    console(_console)
{
    qpid::Address localAddr;
    SystemInfo::getLocalHostname(localAddr);

    stringstream nameBuilder;
    nameBuilder << "qmfc-" << SystemInfo::getProcessName()
                << "-" << localAddr
                << "-" << SystemInfo::getProcessId();
    queueName = nameBuilder.str();

    SequenceContext::Ptr unsolicited(new StaticContext(*this));
    seqMgr.setUnsolicitedContext(unsolicited);
}
/**
 * Reset per-session state and queue the events that set up the private
 * reply queue: declare it, bind it to the direct exchange using the queue
 * name as the binding key, then signal that setup is complete.
 */
void BrokerProxyImpl::sessionOpened(SessionHandle& /*sh*/)
{
    Mutex::ScopedLock _lock(lock);

    // Discard anything left over from a previous session.
    agentList.clear();
    eventQueue.clear();
    xmtQueue.clear();

    BrokerEventImpl::Ptr declareEvent(eventDeclareQueue(queueName));
    eventQueue.push_back(declareEvent);

    BrokerEventImpl::Ptr bindEvent(eventBind(DIR_EXCHANGE, queueName, queueName));
    eventQueue.push_back(bindEvent);

    BrokerEventImpl::Ptr completeEvent(eventSetupComplete());
    eventQueue.push_back(completeEvent);

    // TODO: Store session handle
}
/**
 * Drop all per-session state (known agents, pending events and pending
 * outbound messages) while holding the proxy lock.
 */
void BrokerProxyImpl::sessionClosed()
{
    Mutex::ScopedLock _lock(lock);

    agentList.clear();
    eventQueue.clear();
    xmtQueue.clear();
}
void BrokerProxyImpl::startProtocol()
{
AgentProxyPtr agent(AgentProxyImpl::factory(console, publicObject, 0, "Agent embedded in broker"));
{
Mutex::ScopedLock _lock(lock);
char rawbuffer[512];
Buffer buffer(rawbuffer, 512);
agentList[0] = agent;
requestsOutstanding = 1;
topicBound = false;
uint32_t sequence(seqMgr.reserve());
Protocol::encodeHeader(buffer, Protocol::OP_BROKER_REQUEST, sequence);
sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY);
QPID_LOG(trace, "SENT BrokerRequest seq=" << sequence);
}
console.impl->eventAgentAdded(agent);
}
/**
 * Queue the encoded contents of buf as an outbound message.
 *
 * The number of bytes taken is the buffer's position on entry; the buffer is
 * reset before the data is read back out. Replies are directed to the direct
 * exchange with this proxy's queue name as the key. The "LH" suffix follows
 * the file's convention for functions called with the proxy lock held.
 *
 * @param buf          buffer holding the encoded message.
 * @param destination  exchange the message is addressed to.
 * @param routingKey   routing key for the message.
 */
void BrokerProxyImpl::sendBufferLH(Buffer& buf, const string& destination, const string& routingKey)
{
    uint32_t encodedLen = buf.getPosition();
    MessageImpl::Ptr outbound(new MessageImpl);

    // Rewind, then copy the encoded bytes into the message body.
    buf.reset();
    buf.getRawData(outbound->body, encodedLen);

    outbound->destination   = destination;
    outbound->routingKey    = routingKey;
    outbound->replyExchange = DIR_EXCHANGE;
    outbound->replyKey      = queueName;

    xmtQueue.push_back(outbound);
}
/**
 * Process a received message: for every protocol header found in the body,
 * dispatch the opcode and sequence to the sequence manager. A null routing
 * key on the message is passed on as an empty string.
 */
void BrokerProxyImpl::handleRcvMessage(Message& message)
{
    Buffer inBuffer(message.body, message.length);
    uint8_t opcode;
    uint32_t sequence;

    while (Protocol::checkHeader(inBuffer, &opcode, &sequence)) {
        string routingKey;
        if (message.routingKey)
            routingKey = string(message.routingKey);
        seqMgr.dispatch(opcode, sequence, routingKey, inBuffer);
    }
}
/**
 * Peek at the next outbound message without removing it.
 *
 * @param item  receives a copy of the front message when one exists.
 * @return      true if a message was available, false if the queue is empty.
 */
bool BrokerProxyImpl::getXmtMessage(Message& item) const
{
    Mutex::ScopedLock _lock(lock);

    const bool havePending = !xmtQueue.empty();
    if (havePending)
        item = xmtQueue.front()->copy();
    return havePending;
}
/**
 * Remove the front outbound message, if there is one.
 */
void BrokerProxyImpl::popXmt()
{
    Mutex::ScopedLock _lock(lock);

    if (xmtQueue.empty())
        return;
    xmtQueue.pop_front();
}
/**
 * Peek at the next pending broker event without removing it.
 *
 * @param event  receives a copy of the front event when one exists.
 * @return       true if an event was available, false if the queue is empty.
 */
bool BrokerProxyImpl::getEvent(BrokerEvent& event) const
{
    Mutex::ScopedLock _lock(lock);

    const bool havePending = !eventQueue.empty();
    if (havePending)
        event = eventQueue.front()->copy();
    return havePending;
}
/**
 * Remove the front broker event, if there is one.
 */
void BrokerProxyImpl::popEvent()
{
    Mutex::ScopedLock _lock(lock);

    if (eventQueue.empty())
        return;
    eventQueue.pop_front();
}
/**
 * @return the number of entries currently in the agent list.
 */
uint32_t BrokerProxyImpl::agentCount() const
{
    Mutex::ScopedLock _lock(lock);

    uint32_t known = agentList.size();
    return known;
}
/**
 * Return the agent at position idx in the agent map's iteration order.
 *
 * @param idx  zero-based position.
 * @return     the agent proxy, or 0 if idx is past the last agent.
 */
const AgentProxy* BrokerProxyImpl::getAgent(uint32_t idx) const
{
    Mutex::ScopedLock _lock(lock);

    uint32_t remaining = idx;
    map<uint32_t, AgentProxyPtr>::const_iterator pos = agentList.begin();
    while (pos != agentList.end()) {
        if (remaining == 0)
            return pos->second.get();
        remaining--;
        pos++;
    }
    return 0;
}
void BrokerProxyImpl::sendQuery(const Query& query, void* context, const AgentProxy* agent)
{
SequenceContext::Ptr queryContext(new QueryContext(*this, context));
Mutex::ScopedLock _lock(lock);
bool sent = false;
if (agent != 0) {
if (sendGetRequestLH(queryContext, query, agent))
sent = true;
} else {
// TODO (optimization) only send queries to agents that have the requested class+package
for (map<uint32_t, AgentProxyPtr>::const_iterator iter = agentList.begin();
iter != agentList.end(); iter++) {
if (sendGetRequestLH(queryContext, query, iter->second.get()))
sent = true;
}
}
if (!sent) {
queryContext->reserve();
queryContext->release();
}
}
/**
 * Encode and queue a get-query for a single agent.
 *
 * @param queryContext  sequence context the reserved sequence is tied to.
 * @param query         the query to encode.
 * @param agent         the agent to address.
 * @return              false when the query is restricted to a single agent
 *                      bank that differs from this agent's bank; true once
 *                      the request has been queued.
 */
bool BrokerProxyImpl::sendGetRequestLH(SequenceContext::Ptr queryContext, const Query& query, const AgentProxy* agent)
{
    // Skip agents that a single-agent query does not target.
    if (query.impl->singleAgent() && query.impl->agentBank() != agent->getAgentBank())
        return false;

    stringstream keyBuilder;
    Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
    uint32_t sequence(seqMgr.reserve(queryContext));

    // Remember the sequence on the agent so it can be tracked as in-flight.
    agent->impl->addSequence(sequence);

    Protocol::encodeHeader(outBuffer, Protocol::OP_GET_QUERY, sequence);
    query.impl->encode(outBuffer);

    keyBuilder << "agent.1." << agent->impl->agentBank;
    sendBufferLH(outBuffer, QMF_EXCHANGE, keyBuilder.str());
    QPID_LOG(trace, "SENT GetQuery seq=" << sequence << " key=" << keyBuilder.str());
    return true;
}
/**
 * Encode the input arguments of a method call into buffer.
 *
 * Arguments whose direction is DIR_IN or DIR_IN_OUT are encoded in schema
 * order. An argument missing from argmap is encoded as a default-constructed
 * Value of the schema type.
 *
 * @param schema  method schema listing the arguments.
 * @param argmap  map value holding the supplied arguments.
 * @param buffer  destination buffer.
 * @return        an empty string on success, otherwise an error description.
 */
string BrokerProxyImpl::encodeMethodArguments(const SchemaMethod* schema, const Value* argmap, Buffer& buffer)
{
    int argCount = schema->getArgumentCount();

    if (argmap == 0 || !argmap->isMap())
        return string("Arguments must be in a map value");

    for (int pos = 0; pos < argCount; pos++) {
        const SchemaArgument* arg(schema->getArgument(pos));

        // Output-only arguments are not part of the request.
        if (!(arg->getDirection() == DIR_IN || arg->getDirection() == DIR_IN_OUT))
            continue;

        if (!argmap->keyInMap(arg->getName())) {
            Value defaultValue(arg->getType());
            defaultValue.impl->encode(buffer);
            continue;
        }

        const Value* supplied(argmap->byKey(arg->getName()));
        if (supplied->getType() != arg->getType())
            return string("Argument is the wrong type: ") + arg->getName();
        supplied->impl->encode(buffer);
    }

    return string();
}
/**
 * Add the encoded size of a method call's input arguments to size.
 *
 * Mirrors encodeMethodArguments: DIR_IN and DIR_IN_OUT arguments are counted
 * in schema order, and an argument missing from argmap is counted as a
 * default-constructed Value of the schema type.
 *
 * @param schema  method schema listing the arguments.
 * @param argmap  map value holding the supplied arguments.
 * @param size    running total; incremented, not reset.
 * @return        an empty string on success, otherwise an error description.
 */
string BrokerProxyImpl::encodedSizeMethodArguments(const SchemaMethod* schema, const Value* argmap, uint32_t& size)
{
    int argCount = schema->getArgumentCount();

    if (argmap == 0 || !argmap->isMap())
        return string("Arguments must be in a map value");

    for (int pos = 0; pos < argCount; pos++) {
        const SchemaArgument* arg(schema->getArgument(pos));

        // Output-only arguments are not part of the request.
        if (!(arg->getDirection() == DIR_IN || arg->getDirection() == DIR_IN_OUT))
            continue;

        if (!argmap->keyInMap(arg->getName())) {
            Value defaultValue(arg->getType());
            size += defaultValue.impl->encodedSize();
            continue;
        }

        const Value* supplied(argmap->byKey(arg->getName()));
        if (supplied->getType() != arg->getType())
            return string("Argument is the wrong type: ") + arg->getName();
        size += supplied->impl->encodedSize();
    }

    return string();
}
/**
 * Encode and queue a method request for the object identified by oid.
 *
 * @param oid          id of the target object; its agent bank selects the
 *                     routing key ("agent.1.<bank>").
 * @param cls          schema class that is searched for methodName.
 * @param methodName   name of the method to invoke.
 * @param args         map value holding the input arguments.
 * @param userContext  caller context returned with the method response event.
 *
 * If the method is unknown, or the arguments fail validation, a method
 * response event with status 1 and a descriptive text is queued instead of
 * sending anything.
 */
void BrokerProxyImpl::sendMethodRequest(ObjectId* oid, const SchemaObjectClass* cls,
                                        const string& methodName, const Value* args, void* userContext)
{
    int methodCount = cls->getMethodCount();
    for (int idx = 0; idx < methodCount; idx++) {
        const SchemaMethod* method = cls->getMethod(idx);
        if (string(method->getName()) != methodName)
            continue;

        Mutex::ScopedLock _lock(lock);
        SequenceContext::Ptr methodContext(new MethodContext(*this, userContext, method));
        stringstream key;

        // Start with a fixed 1024-byte allowance and add the encoded size of
        // the arguments; this pass also validates them.
        uint32_t bufLen(1024);
        string argErrorString = encodedSizeMethodArguments(method, args, bufLen);
        if (!argErrorString.empty()) {
            MethodResponsePtr argError(MethodResponseImpl::factory(1, argErrorString));
            eventQueue.push_back(eventMethodResponse(userContext, argError));
            return;
        }

        // Use the shared output buffer when the request fits; otherwise fall
        // back to a heap buffer. The vector owns the heap storage, so it is
        // released on every exit path (including an exception thrown while
        // encoding) and an allocation failure surfaces as std::bad_alloc
        // rather than a null buffer, which the earlier malloc/free pair did
        // not guarantee.
        vector<char> heapBuffer;
        char* buf(outputBuffer);
        if (bufLen > MA_BUFFER_SIZE) {
            heapBuffer.resize(bufLen);
            buf = &heapBuffer[0];
        }

        Buffer outBuffer(buf, bufLen);
        uint32_t sequence(seqMgr.reserve(methodContext));
        Protocol::encodeHeader(outBuffer, Protocol::OP_METHOD_REQUEST, sequence);
        oid->impl->encode(outBuffer);
        cls->getClassKey()->impl->encode(outBuffer);
        outBuffer.putShortString(methodName);
        // Arguments were already validated by the sizing pass above, so the
        // error string returned here is not inspected.
        encodeMethodArguments(method, args, outBuffer);
        key << "agent.1." << oid->impl->getAgentBank();
        sendBufferLH(outBuffer, QMF_EXCHANGE, key.str());
        QPID_LOG(trace, "SENT MethodRequest seq=" << sequence << " method=" << methodName << " key=" << key.str());
        return;
    }

    MethodResponsePtr error(MethodResponseImpl::factory(1, string("Unknown method: ") + methodName));
    Mutex::ScopedLock _lock(lock);
    eventQueue.push_back(eventMethodResponse(userContext, error));
}
void BrokerProxyImpl::addBinding(const string& exchange, const string& key)
{
Mutex::ScopedLock _lock(lock);
eventQueue.push_back(eventBind(exchange, queueName, key));
}
/**
 * Create a DECLARE_QUEUE event whose name field is the given queue name.
 */
BrokerEventImpl::Ptr BrokerProxyImpl::eventDeclareQueue(const string& queueName)
{
    BrokerEventImpl::Ptr declareEvent(new BrokerEventImpl(BrokerEvent::DECLARE_QUEUE));

    declareEvent->name = queueName;

    return declareEvent;
}
/**
 * Create a BIND event.
 *
 * @param exchange  exchange to bind to.
 * @param queue     queue being bound (stored in the event's name field).
 * @param key       binding key.
 */
BrokerEventImpl::Ptr BrokerProxyImpl::eventBind(const string& exchange, const string& queue, const string& key)
{
    BrokerEventImpl::Ptr bindEvent(new BrokerEventImpl(BrokerEvent::BIND));

    bindEvent->name       = queue;
    bindEvent->exchange   = exchange;
    bindEvent->bindingKey = key;

    return bindEvent;
}
/**
 * Create a SETUP_COMPLETE event; it carries no additional fields.
 */
BrokerEventImpl::Ptr BrokerProxyImpl::eventSetupComplete()
{
    return BrokerEventImpl::Ptr(new BrokerEventImpl(BrokerEvent::SETUP_COMPLETE));
}
/**
 * Log that the console link is stable and create a STABLE event.
 */
BrokerEventImpl::Ptr BrokerProxyImpl::eventStable()
{
    QPID_LOG(trace, "Console Link to Broker Stable");

    return BrokerEventImpl::Ptr(new BrokerEventImpl(BrokerEvent::STABLE));
}
/**
 * Create a QUERY_COMPLETE event carrying the caller's context and the
 * accumulated query response.
 */
BrokerEventImpl::Ptr BrokerProxyImpl::eventQueryComplete(void* context, QueryResponsePtr response)
{
    BrokerEventImpl::Ptr doneEvent(new BrokerEventImpl(BrokerEvent::QUERY_COMPLETE));

    doneEvent->context       = context;
    doneEvent->queryResponse = response;

    return doneEvent;
}
/**
 * Create a METHOD_RESPONSE event carrying the caller's context and the
 * method response.
 */
BrokerEventImpl::Ptr BrokerProxyImpl::eventMethodResponse(void* context, MethodResponsePtr response)
{
    BrokerEventImpl::Ptr responseEvent(new BrokerEventImpl(BrokerEvent::METHOD_RESPONSE));

    responseEvent->context        = context;
    responseEvent->methodResponse = response;

    return responseEvent;
}
/**
 * Handle a broker response: decode the broker id, then queue a package
 * request and count it as outstanding.
 *
 * @param inBuffer  buffer positioned at the response body.
 * @param seq       sequence number of the response (used for logging).
 */
void BrokerProxyImpl::handleBrokerResponse(Buffer& inBuffer, uint32_t seq)
{
    brokerId.decode(inBuffer);
    QPID_LOG(trace, "RCVD BrokerResponse seq=" << seq << " brokerId=" << brokerId);

    Mutex::ScopedLock _lock(lock);

    Buffer request(outputBuffer, MA_BUFFER_SIZE);
    uint32_t requestSeq(seqMgr.reserve());
    incOutstandingLH();

    Protocol::encodeHeader(request, Protocol::OP_PACKAGE_REQUEST, requestSeq);
    sendBufferLH(request, QMF_EXCHANGE, BROKER_KEY);
    QPID_LOG(trace, "SENT PackageRequest seq=" << requestSeq);
}
/**
 * Handle a package indication: record the package with the console, then
 * queue a class query for that package and count it as outstanding.
 *
 * @param inBuffer  buffer positioned at the indication body.
 * @param seq       sequence number of the indication (used for logging).
 */
void BrokerProxyImpl::handlePackageIndication(Buffer& inBuffer, uint32_t seq)
{
    string package;
    inBuffer.getShortString(package);
    QPID_LOG(trace, "RCVD PackageIndication seq=" << seq << " package=" << package);

    console.impl->learnPackage(package);

    Mutex::ScopedLock _lock(lock);

    Buffer request(outputBuffer, MA_BUFFER_SIZE);
    uint32_t requestSeq(seqMgr.reserve());
    incOutstandingLH();

    Protocol::encodeHeader(request, Protocol::OP_CLASS_QUERY, requestSeq);
    request.putShortString(package);
    sendBufferLH(request, QMF_EXCHANGE, BROKER_KEY);
    QPID_LOG(trace, "SENT ClassQuery seq=" << requestSeq << " package=" << package);
}
/**
 * Handle a command-complete message: decode its code and text and log them.
 *
 * @param inBuffer  buffer positioned at the message body.
 * @param seq       sequence number of the message (used for logging).
 */
void BrokerProxyImpl::handleCommandComplete(Buffer& inBuffer, uint32_t seq)
{
    uint32_t code = inBuffer.getLong();

    string text;
    inBuffer.getShortString(text);

    QPID_LOG(trace, "RCVD CommandComplete seq=" << seq << " code=" << code << " text=" << text);
}
/**
 * Handle a class indication: if the console does not already have the
 * indicated class, queue a schema request for it and count it as
 * outstanding.
 *
 * @param inBuffer  buffer positioned at the indication body.
 * @param seq       sequence number of the indication (used for logging).
 */
void BrokerProxyImpl::handleClassIndication(Buffer& inBuffer, uint32_t seq)
{
    uint8_t kind = inBuffer.getOctet();
    auto_ptr<SchemaClassKey> classKey(SchemaClassKeyImpl::factory(inBuffer));
    QPID_LOG(trace, "RCVD ClassIndication seq=" << seq << " kind=" << (int) kind << " key=" << classKey->impl->str());

    // Nothing to request when the schema is already known.
    if (console.impl->haveClass(classKey.get()))
        return;

    Mutex::ScopedLock _lock(lock);
    incOutstandingLH();

    Buffer request(outputBuffer, MA_BUFFER_SIZE);
    uint32_t requestSeq(seqMgr.reserve());
    Protocol::encodeHeader(request, Protocol::OP_SCHEMA_REQUEST, requestSeq);
    classKey->impl->encode(request);
    sendBufferLH(request, QMF_EXCHANGE, BROKER_KEY);
    QPID_LOG(trace, "SENT SchemaRequest seq=" << requestSeq <<" key=" << classKey->impl->str());
}
/**
 * Decode a method response from inBuffer using the method's schema, log its
 * status and exception text, and return it.
 *
 * @param inBuffer  buffer positioned at the response body.
 * @param seq       sequence number of the response (used for logging).
 * @param schema    schema of the method that was invoked.
 */
MethodResponsePtr BrokerProxyImpl::handleMethodResponse(Buffer& inBuffer, uint32_t seq, const SchemaMethod* schema)
{
    MethodResponsePtr decoded(MethodResponseImpl::factory(inBuffer, schema));

    QPID_LOG(trace, "RCVD MethodResponse seq=" << seq
             << " status=" << decoded->getStatus()
             << " text=" << decoded->getException()->asString());

    return decoded;
}
/**
 * Handle a heartbeat indication and report it to the console if the sending
 * agent is known.
 *
 * The agent bank is taken from the fourth dot-separated token of the routing
 * key; if the key is empty or does not have exactly four tokens, bank 0 is
 * assumed.
 *
 * @param inBuffer    buffer positioned at the indication body (a 64-bit
 *                    timestamp).
 * @param seq         sequence number of the indication (used for logging).
 * @param routingKey  routing key the message arrived with.
 */
void BrokerProxyImpl::handleHeartbeatIndication(Buffer& inBuffer, uint32_t seq, const string& routingKey)
{
    vector<string> tokens = qpid::split(routingKey, ".");
    uint32_t agentBank;
    uint64_t timestamp;

    if (routingKey.empty() || tokens.size() != 4)
        agentBank = 0;
    else
        agentBank = ::atoi(tokens[3].c_str());

    timestamp = inBuffer.getLongLong();

    // agentList is guarded by 'lock' everywhere else in this file; the lookup
    // previously ran unlocked here. Take a reference to the agent under the
    // lock, then notify the console after releasing it (as startProtocol
    // does for eventAgentAdded).
    bool found = false;
    AgentProxyPtr agent;
    {
        Mutex::ScopedLock _lock(lock);
        map<uint32_t, AgentProxyPtr>::const_iterator iter = agentList.find(agentBank);
        if (iter != agentList.end()) {
            agent = iter->second;
            found = true;
        }
    }

    if (found)
        console.impl->eventAgentHeartbeat(agent, timestamp);

    QPID_LOG(trace, "RCVD HeartbeatIndication seq=" << seq << " agentBank=" << agentBank);
}
/**
 * Handle an event indication: look up the event's schema by class key and,
 * if found, decode the event and pass it to the console. Events with no
 * known schema are logged and dropped.
 *
 * @param inBuffer  buffer positioned at the indication body.
 * @param seq       sequence number of the indication (used for logging).
 */
void BrokerProxyImpl::handleEventIndication(Buffer& inBuffer, uint32_t seq)
{
    auto_ptr<SchemaClassKey> classKey(SchemaClassKeyImpl::factory(inBuffer));
    const SchemaEventClass *schema = console.impl->getEventClass(classKey.get());

    if (schema != 0) {
        EventPtr decodedEvent(EventImpl::factory(schema, inBuffer));
        console.impl->eventEventReceived(decodedEvent);
        QPID_LOG(trace, "RCVD EventIndication seq=" << seq << " key=" << classKey->impl->str());
    } else {
        QPID_LOG(trace, "No Schema Found for EventIndication. seq=" << seq << " key=" << classKey->impl->str());
    }
}
/**
 * Handle a schema response: decode the object or event class it carries and
 * hand it to the console. An unrecognised class kind is logged as an error.
 *
 * @param inBuffer  buffer positioned at the response body.
 * @param seq       sequence number of the response (used for logging).
 */
void BrokerProxyImpl::handleSchemaResponse(Buffer& inBuffer, uint32_t seq)
{
    uint8_t kind = inBuffer.getOctet();

    if (kind == CLASS_EVENT) {
        SchemaEventClass* eventClass = SchemaEventClassImpl::factory(inBuffer);
        console.impl->learnClass(eventClass);
        const SchemaClassKey* eventKey = eventClass->getClassKey();
        QPID_LOG(trace, "RCVD SchemaResponse seq=" << seq << " kind=event key=" << eventKey->impl->str());
        return;
    }

    if (kind != CLASS_OBJECT) {
        QPID_LOG(error, "BrokerProxyImpl::handleSchemaResponse received unknown class kind: " << (int) kind);
        return;
    }

    SchemaObjectClass* objectClass = SchemaObjectClassImpl::factory(inBuffer);
    console.impl->learnClass(objectClass);
    const SchemaClassKey* key = objectClass->getClassKey();
    QPID_LOG(trace, "RCVD SchemaResponse seq=" << seq << " kind=object key=" << key->impl->str());

    //
    // Once the org.apache.qpid.broker:agent class has been learned, ask for
    // the current list of agents so that it is available before this session
    // is declared "stable".
    //
    if (!(key->impl->getClassName() == AGENT_CLASS && key->impl->getPackageName() == BROKER_PACKAGE))
        return;

    Mutex::ScopedLock _lock(lock);
    incOutstandingLH();

    Buffer request(outputBuffer, MA_BUFFER_SIZE);
    uint32_t requestSeq(seqMgr.reserve());
    Protocol::encodeHeader(request, Protocol::OP_GET_QUERY, requestSeq);

    FieldTable selector;
    selector.setString("_class", AGENT_CLASS);
    selector.setString("_package", BROKER_PACKAGE);
    selector.encode(request);

    sendBufferLH(request, QMF_EXCHANGE, BROKER_AGENT_KEY);
    QPID_LOG(trace, "SENT GetQuery seq=" << requestSeq << " key=" << BROKER_AGENT_KEY);
}
/**
 * Decode an object indication.
 *
 * @param inBuffer  buffer positioned at the indication body.
 * @param seq       sequence number of the indication (used for logging).
 * @param prop      whether the indication carries properties.
 * @param stat      whether the indication carries statistics.
 * @return          the decoded object, or an empty ObjectPtr when the
 *                  console has no schema for the object's class.
 *
 * When properties are present and the object is an
 * org.apache.qpid.broker:agent, the agent list is updated from it.
 */
ObjectPtr BrokerProxyImpl::handleObjectIndication(Buffer& inBuffer, uint32_t seq, bool prop, bool stat)
{
    auto_ptr<SchemaClassKey> classKey(SchemaClassKeyImpl::factory(inBuffer));
    QPID_LOG(trace, "RCVD ObjectIndication seq=" << seq << " key=" << classKey->impl->str());

    SchemaObjectClass* schema = console.impl->getSchema(classKey.get());
    if (schema == 0) {
        QPID_LOG(trace, "No Schema Found for ObjectIndication. seq=" << seq << " key=" << classKey->impl->str());
        return ObjectPtr();
    }

    ObjectPtr decoded(ObjectImpl::factory(schema, this, inBuffer, prop, stat, true));

    //
    // Information about a remote agent has been intercepted; bring the agent
    // list up to date with it.
    //
    if (prop && classKey->impl->getPackageName() == BROKER_PACKAGE && classKey->impl->getClassName() == AGENT_CLASS)
        updateAgentList(decoded);

    return decoded;
}
/**
 * Update the agent list from an agent object.
 *
 * The object's "agentBank" value identifies the agent; objects without an
 * unsigned "agentBank" are ignored. A deleted object removes the agent (if
 * known) and releases its in-flight sequences; otherwise an unknown agent is
 * added using the object's "label" string. The console is notified of
 * additions and deletions.
 */
void BrokerProxyImpl::updateAgentList(ObjectPtr obj)
{
    Value* bankValue = obj->getValue("agentBank");
    Mutex::ScopedLock _lock(lock);

    if (bankValue == 0 || !bankValue->isUint())
        return;

    uint32_t agentBank = bankValue->asUint();

    if (obj->isDeleted()) {
        map<uint32_t, AgentProxyPtr>::iterator known = agentList.find(agentBank);
        if (known == agentList.end())
            return;

        AgentProxyPtr agent(known->second);
        console.impl->eventAgentDeleted(agent);
        agentList.erase(agentBank);
        QPID_LOG(trace, "Agent at bank " << agentBank << " removed from agent list");

        //
        // The agent is no longer connected, so requests in flight to it
        // would never complete on their own; release their sequence numbers.
        //
        agent->impl->releaseInFlight(seqMgr);
        return;
    }

    Value* labelValue = obj->getValue("label");
    string label;
    if (labelValue != 0 && labelValue->isString())
        label = labelValue->asString();

    map<uint32_t, AgentProxyPtr>::const_iterator existing = agentList.find(agentBank);
    if (existing != agentList.end())
        return;

    AgentProxyPtr agent(AgentProxyImpl::factory(console, publicObject, agentBank, label));
    agentList[agentBank] = agent;
    console.impl->eventAgentAdded(agent);
    QPID_LOG(trace, "Agent '" << label << "' found at bank " << agentBank);
}
/**
 * Count one more outstanding request. Takes no lock itself; every caller in
 * this file invokes it while holding the proxy lock.
 */
void BrokerProxyImpl::incOutstandingLH()
{
    ++requestsOutstanding;
}
void BrokerProxyImpl::decOutstanding()
{
Mutex::ScopedLock _lock(lock);
requestsOutstanding--;
if (requestsOutstanding == 0 && !topicBound) {
topicBound = true;
for (vector<pair<string, string> >::const_iterator iter = console.impl->bindingList.begin();
iter != console.impl->bindingList.end(); iter++) {
string exchange(iter->first.empty() ? QMF_EXCHANGE : iter->first);
string key(iter->second);
eventQueue.push_back(eventBind(exchange, queueName, key));
}
eventQueue.push_back(eventStable());
}
}
/**
 * Copy constructor: copies status and schema, and makes new copies of the
 * exception and arguments values when the source has them.
 */
MethodResponseImpl::MethodResponseImpl(const MethodResponseImpl& from) :
    status(from.status),
    schema(from.schema)
{
    if (from.exception.get()) {
        exception.reset(new Value(*(from.exception)));
    }

    if (from.arguments.get()) {
        arguments.reset(new Value(*(from.arguments)));
    }
}
/**
 * Decode a method response from buf.
 *
 * Reads the status and the text (stored as the exception value). When the
 * status is zero, the method's DIR_OUT and DIR_IN_OUT arguments are decoded
 * in schema order into the arguments map; for a non-zero status no
 * arguments are decoded.
 *
 * @param buf  buffer positioned at the response body.
 * @param s    schema of the method that was invoked.
 */
MethodResponseImpl::MethodResponseImpl(Buffer& buf, const SchemaMethod* s) : schema(s)
{
    status = buf.getLong();

    string text;
    buf.getMediumString(text);
    exception.reset(new Value(TYPE_LSTR));
    exception->setString(text.c_str());

    if (status != 0)
        return;

    arguments.reset(new Value(TYPE_MAP));

    int argCount(schema->getArgumentCount());
    for (int pos = 0; pos < argCount; pos++) {
        const SchemaArgument* arg = schema->getArgument(pos);

        // Input-only arguments are not part of the response.
        if (!(arg->getDirection() == DIR_OUT || arg->getDirection() == DIR_IN_OUT))
            continue;

        Value* decoded(ValueImpl::factory(arg->getType(), buf));
        arguments->insert(arg->getName(), decoded);
    }
}
/**
 * Build a response from a status code and text, with no schema and no
 * arguments. The text is stored as the exception value.
 */
MethodResponseImpl::MethodResponseImpl(uint32_t s, const string& text) :
    status(s),
    schema(0)
{
    exception.reset(new Value(TYPE_LSTR));
    exception->setString(text.c_str());
}
/**
 * Create a heap-allocated MethodResponse wrapping an implementation decoded
 * from buf with the given method schema.
 */
MethodResponse* MethodResponseImpl::factory(Buffer& buf, const SchemaMethod* schema)
{
    MethodResponseImpl* decodedImpl = new MethodResponseImpl(buf, schema);
    MethodResponse* wrapper = new MethodResponse(decodedImpl);
    return wrapper;
}
/**
 * Create a heap-allocated MethodResponse wrapping an implementation built
 * from a status code and text.
 */
MethodResponse* MethodResponseImpl::factory(uint32_t status, const std::string& text)
{
    MethodResponseImpl* statusImpl = new MethodResponseImpl(status, text);
    MethodResponse* wrapper = new MethodResponse(statusImpl);
    return wrapper;
}
/**
 * Dispatch a message received in the static (unsolicited) context to the
 * matching broker-proxy handler.
 *
 * @param opcode      protocol opcode of the message.
 * @param sequence    sequence number of the message.
 * @param routingKey  routing key the message arrived with.
 * @param buffer      buffer positioned at the message body.
 * @return            true for broker responses, command-completes, schema
 *                    responses and unrecognised opcodes; false for
 *                    indications.
 */
bool StaticContext::handleMessage(uint8_t opcode, uint32_t sequence, const string& routingKey, Buffer& buffer)
{
    ObjectPtr object;
    bool completeContext = false;

    if (opcode == Protocol::OP_BROKER_RESPONSE) {
        broker.handleBrokerResponse(buffer, sequence);
        completeContext = true;
    }
    else if (opcode == Protocol::OP_COMMAND_COMPLETE) {
        broker.handleCommandComplete(buffer, sequence);
        completeContext = true;
    }
    else if (opcode == Protocol::OP_SCHEMA_RESPONSE) {
        broker.handleSchemaResponse(buffer, sequence);
        completeContext = true;
    }
    else if (opcode == Protocol::OP_PACKAGE_INDICATION)
        broker.handlePackageIndication(buffer, sequence);
    else if (opcode == Protocol::OP_CLASS_INDICATION)
        broker.handleClassIndication(buffer, sequence);
    else if (opcode == Protocol::OP_HEARTBEAT_INDICATION)
        broker.handleHeartbeatIndication(buffer, sequence, routingKey);
    else if (opcode == Protocol::OP_EVENT_INDICATION)
        broker.handleEventIndication(buffer, sequence);
    //
    // handleObjectIndication returns an empty ObjectPtr when no schema is
    // known for the object's class. Such results are dropped rather than
    // forwarded as an object update with a null object, matching the check
    // QueryContext::handleMessage already performs.
    //
    else if (opcode == Protocol::OP_PROPERTY_INDICATION) {
        object = broker.handleObjectIndication(buffer, sequence, true, false);
        if (object.get() != 0)
            broker.console.impl->eventObjectUpdate(object, true, false);
    }
    else if (opcode == Protocol::OP_STATISTIC_INDICATION) {
        object = broker.handleObjectIndication(buffer, sequence, false, true);
        if (object.get() != 0)
            broker.console.impl->eventObjectUpdate(object, false, true);
    }
    else if (opcode == Protocol::OP_OBJECT_INDICATION) {
        object = broker.handleObjectIndication(buffer, sequence, true, true);
        if (object.get() != 0)
            broker.console.impl->eventObjectUpdate(object, true, true);
    }
    else {
        QPID_LOG(trace, "StaticContext::handleMessage invalid opcode: " << opcode);
        completeContext = true;
    }

    return completeContext;
}
/**
 * Count one more outstanding request against this query, under the
 * context's own lock.
 */
void QueryContext::reserve()
{
    Mutex::ScopedLock _lock(lock);
    ++requestsOutstanding;
}
void QueryContext::release()
{
{
Mutex::ScopedLock _lock(lock);
if (--requestsOutstanding > 0)
return;
}
Mutex::ScopedLock _block(broker.lock);
broker.eventQueue.push_back(broker.eventQueryComplete(userContext, queryResponse));
}
bool QueryContext::handleMessage(uint8_t opcode, uint32_t sequence, const string& /*routingKey*/, Buffer& buffer)
{
bool completeContext = false;
ObjectPtr object;
if (opcode == Protocol::OP_COMMAND_COMPLETE) {
broker.handleCommandComplete(buffer, sequence);
completeContext = true;
//
// Visit each agent and remove the sequence from that agent's in-flight list.
// This could be made more efficient because only one agent will have this sequence
// in its list.
//
map<uint32_t, AgentProxyPtr> copy;
{
Mutex::ScopedLock _block(broker.lock);
copy = broker.agentList;
}
for (map<uint32_t, AgentProxyPtr>::iterator iter = copy.begin(); iter != copy.end(); iter++)
iter->second->impl->delSequence(sequence);
}
else if (opcode == Protocol::OP_OBJECT_INDICATION) {
object = broker.handleObjectIndication(buffer, sequence, true, true);
if (object.get() != 0)
queryResponse->impl->results.push_back(object);
}
else {
QPID_LOG(trace, "QueryContext::handleMessage invalid opcode: " << opcode);
completeContext = true;
}
return completeContext;
}
void MethodContext::release()
{
Mutex::ScopedLock _block(broker.lock);
broker.eventQueue.push_back(broker.eventMethodResponse(userContext, methodResponse));
}
/**
 * Handle a message received in the context of an outstanding method call.
 *
 * A method response is decoded and stored; any other opcode is logged.
 *
 * @param opcode    protocol opcode of the message.
 * @param sequence  sequence number of the message.
 * @param buffer    buffer positioned at the message body.
 * @return          always true.
 */
bool MethodContext::handleMessage(uint8_t opcode, uint32_t sequence, const string& /*routingKey*/, Buffer& buffer)
{
    if (opcode == Protocol::OP_METHOD_RESPONSE)
        methodResponse = broker.handleMethodResponse(buffer, sequence, schema);
    else
        // The log text previously named QueryContext, which misattributed
        // the message to the wrong context.
        QPID_LOG(trace, "MethodContext::handleMessage invalid opcode: " << opcode);
    return true;
}
//==================================================================
// Wrappers
//==================================================================
// AgentProxy: public wrapper that owns an AgentProxyImpl and forwards to it.

AgentProxy::AgentProxy(AgentProxyImpl* i) :
    impl(i)
{
}

// Copying makes a new implementation object copied from the source's.
AgentProxy::AgentProxy(const AgentProxy& from) :
    impl(new AgentProxyImpl(*(from.impl)))
{
}

AgentProxy::~AgentProxy()
{
    delete impl;
}

// NOTE(review): the returned pointer is only valid if impl->getLabel()
// returns a reference to a stored string rather than a temporary — confirm
// against AgentProxyImpl.
const char* AgentProxy::getLabel() const
{
    return impl->getLabel().c_str();
}

uint32_t AgentProxy::getBrokerBank() const
{
    return impl->getBrokerBank();
}

uint32_t AgentProxy::getAgentBank() const
{
    return impl->getAgentBank();
}
// BrokerProxy: public wrapper that owns a BrokerProxyImpl and forwards every
// call to it unchanged.

BrokerProxy::BrokerProxy(Console& console) :
    impl(new BrokerProxyImpl(*this, console))
{
}

BrokerProxy::~BrokerProxy()
{
    delete impl;
}

void BrokerProxy::sessionOpened(SessionHandle& sh)
{
    impl->sessionOpened(sh);
}

void BrokerProxy::sessionClosed()
{
    impl->sessionClosed();
}

void BrokerProxy::startProtocol()
{
    impl->startProtocol();
}

void BrokerProxy::handleRcvMessage(Message& message)
{
    impl->handleRcvMessage(message);
}

bool BrokerProxy::getXmtMessage(Message& item) const
{
    return impl->getXmtMessage(item);
}

void BrokerProxy::popXmt()
{
    impl->popXmt();
}

bool BrokerProxy::getEvent(BrokerEvent& event) const
{
    return impl->getEvent(event);
}

void BrokerProxy::popEvent()
{
    impl->popEvent();
}

uint32_t BrokerProxy::agentCount() const
{
    return impl->agentCount();
}

const AgentProxy* BrokerProxy::getAgent(uint32_t idx) const
{
    return impl->getAgent(idx);
}

void BrokerProxy::sendQuery(const Query& query, void* context, const AgentProxy* agent)
{
    impl->sendQuery(query, context, agent);
}
// MethodResponse: public wrapper that forwards to a MethodResponseImpl.

// Copying makes a new implementation object copied from the source's.
MethodResponse::MethodResponse(const MethodResponse& from) :
    impl(new MethodResponseImpl(*(from.impl)))
{
}

MethodResponse::MethodResponse(MethodResponseImpl* i) :
    impl(i)
{
}

// NOTE(review): impl is not deleted here; if impl is a raw pointer this
// leaks the implementation object — confirm against the class declaration.
MethodResponse::~MethodResponse()
{
}

uint32_t MethodResponse::getStatus() const
{
    return impl->getStatus();
}

const Value* MethodResponse::getException() const
{
    return impl->getException();
}

const Value* MethodResponse::getArgs() const
{
    return impl->getArgs();
}
// QueryResponse: public wrapper that forwards to a QueryResponseImpl.

QueryResponse::QueryResponse(QueryResponseImpl* i) :
    impl(i)
{
}

// NOTE(review): impl is not deleted here; if impl is a raw pointer this
// leaks the implementation object — confirm against the class declaration.
QueryResponse::~QueryResponse()
{
}

uint32_t QueryResponse::getStatus() const
{
    return impl->getStatus();
}

const Value* QueryResponse::getException() const
{
    return impl->getException();
}

uint32_t QueryResponse::getObjectCount() const
{
    return impl->getObjectCount();
}

const Object* QueryResponse::getObject(uint32_t idx) const
{
    return impl->getObject(idx);
}