| |
| // |
| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| // |
| |
| #include "qpid/management/Manageable.h" |
| #include "qpid/management/ManagementObject.h" |
| #include "qpid/log/Statement.h" |
| #include "qpid/agent/ManagementAgentImpl.h" |
| #include "qpid/amqp_0_10/Codecs.h" |
| #include <list> |
| #include <string.h> |
| #include <stdlib.h> |
| #include <sys/types.h> |
| #include <iostream> |
| #include <fstream> |
| #include <boost/lexical_cast.hpp> |
| |
| using namespace qpid::client; |
| using namespace qpid::framing; |
| using namespace qpid::management; |
| using namespace qpid::sys; |
| using namespace std; |
| using std::stringstream; |
| using std::ofstream; |
| using std::ifstream; |
| using std::string; |
| using std::endl; |
| using qpid::types::Variant; |
| using qpid::amqp_0_10::MapCodec; |
| using qpid::amqp_0_10::ListCodec; |
| |
| namespace { |
| qpid::sys::Mutex lock; |
| bool disabled = false; |
| ManagementAgent* agent = 0; |
| int refCount = 0; |
| |
| const string defaultVendorName("vendor"); |
| const string defaultProductName("product"); |
| |
| // Create a valid binding key substring by |
| // replacing all '.' chars with '_' |
| const string keyifyNameStr(const string& name) |
| { |
| string n2 = name; |
| |
| size_t pos = n2.find('.'); |
| while (pos != n2.npos) { |
| n2.replace(pos, 1, "_"); |
| pos = n2.find('.', pos); |
| } |
| return n2; |
| } |
| } |
| |
| ManagementAgent::Singleton::Singleton(bool disableManagement) |
| { |
| sys::Mutex::ScopedLock _lock(lock); |
| if (disableManagement && !disabled) { |
| disabled = true; |
| assert(refCount == 0); // can't disable after agent has been allocated |
| } |
| if (refCount == 0 && !disabled) |
| agent = new ManagementAgentImpl(); |
| refCount++; |
| } |
| |
| ManagementAgent::Singleton::~Singleton() |
| { |
| sys::Mutex::ScopedLock _lock(lock); |
| refCount--; |
| if (refCount == 0 && !disabled) { |
| delete agent; |
| agent = 0; |
| } |
| } |
| |
| ManagementAgent* ManagementAgent::Singleton::getInstance() |
| { |
| return agent; |
| } |
| |
| const string ManagementAgentImpl::storeMagicNumber("MA02"); |
| |
| ManagementAgentImpl::ManagementAgentImpl() : |
| interval(10), extThread(false), pipeHandle(0), notifyCallback(0), notifyContext(0), |
| notifyable(0), inCallback(false), |
| initialized(false), connected(false), useMapMsg(false), lastFailure("never connected"), |
| topicExchange("qmf.default.topic"), directExchange("qmf.default.direct"), |
| schemaTimestamp(Duration(EPOCH, now())), |
| publishAllData(true), requestedBrokerBank(0), requestedAgentBank(0), |
| assignedBrokerBank(0), assignedAgentBank(0), bootSequence(0), |
| maxV2ReplyObjs(10), // KAG todo: make this a tuneable parameter |
| connThreadBody(*this), connThread(connThreadBody), |
| pubThreadBody(*this), pubThread(pubThreadBody) |
| { |
| } |
| |
| ManagementAgentImpl::~ManagementAgentImpl() |
| { |
| // shutdown & cleanup all threads |
| connThreadBody.close(); |
| pubThreadBody.close(); |
| |
| connThread.join(); |
| pubThread.join(); |
| |
| if (pipeHandle) { |
| delete pipeHandle; |
| pipeHandle = 0; |
| } |
| } |
| |
| void ManagementAgentImpl::setName(const string& vendor, const string& product, const string& instance) |
| { |
| if (vendor.find(':') != vendor.npos) { |
| throw Exception("vendor string cannot contain a ':' character."); |
| } |
| if (product.find(':') != product.npos) { |
| throw Exception("product string cannot contain a ':' character."); |
| } |
| |
| attrMap["_vendor"] = vendor; |
| attrMap["_product"] = product; |
| if (!instance.empty()) { |
| attrMap["_instance"] = instance; |
| } |
| } |
| |
| |
| void ManagementAgentImpl::getName(string& vendor, string& product, string& instance) |
| { |
| vendor = std::string(attrMap["_vendor"]); |
| product = std::string(attrMap["_product"]); |
| instance = std::string(attrMap["_instance"]); |
| } |
| |
| |
| const std::string& ManagementAgentImpl::getAddress() |
| { |
| return name_address; |
| } |
| |
| |
| void ManagementAgentImpl::init(const string& brokerHost, |
| uint16_t brokerPort, |
| uint16_t intervalSeconds, |
| bool useExternalThread, |
| const string& _storeFile, |
| const string& uid, |
| const string& pwd, |
| const string& mech, |
| const string& proto) |
| { |
| management::ConnectionSettings settings; |
| settings.protocol = proto; |
| settings.host = brokerHost; |
| settings.port = brokerPort; |
| settings.username = uid; |
| settings.password = pwd; |
| settings.mechanism = mech; |
| settings.heartbeat = 10; |
| init(settings, intervalSeconds, useExternalThread, _storeFile); |
| } |
| |
| void ManagementAgentImpl::init(const qpid::management::ConnectionSettings& settings, |
| uint16_t intervalSeconds, |
| bool useExternalThread, |
| const string& _storeFile) |
| { |
| std::string cfgVendor, cfgProduct, cfgInstance; |
| |
| interval = intervalSeconds; |
| extThread = useExternalThread; |
| storeFile = _storeFile; |
| nextObjectId = 1; |
| |
| // |
| // Convert from management::ConnectionSettings to client::ConnectionSettings |
| // |
| connectionSettings.protocol = settings.protocol; |
| connectionSettings.host = settings.host; |
| connectionSettings.port = settings.port; |
| connectionSettings.virtualhost = settings.virtualhost; |
| connectionSettings.username = settings.username; |
| connectionSettings.password = settings.password; |
| connectionSettings.mechanism = settings.mechanism; |
| connectionSettings.locale = settings.locale; |
| connectionSettings.heartbeat = settings.heartbeat; |
| connectionSettings.maxChannels = settings.maxChannels; |
| connectionSettings.maxFrameSize = settings.maxFrameSize; |
| connectionSettings.bounds = settings.bounds; |
| connectionSettings.tcpNoDelay = settings.tcpNoDelay; |
| connectionSettings.service = settings.service; |
| connectionSettings.minSsf = settings.minSsf; |
| connectionSettings.maxSsf = settings.maxSsf; |
| |
| retrieveData(cfgVendor, cfgProduct, cfgInstance); |
| |
| bootSequence++; |
| if ((bootSequence & 0xF000) != 0) |
| bootSequence = 1; |
| |
| // setup the agent's name. The name may be set via a call to setName(). If setName() |
| // has not been called, the name can be read from the configuration file. If there is |
| // no name in the configuration file, a unique default name is provided. |
| if (attrMap.empty()) { |
| // setName() never called by application, so use names retrieved from config, otherwise defaults. |
| setName(cfgVendor.empty() ? defaultVendorName : cfgVendor, |
| cfgProduct.empty() ? defaultProductName : cfgProduct, |
| cfgInstance.empty() ? qpid::types::Uuid(true).str() : cfgInstance); |
| } else if (attrMap.find("_instance") == attrMap.end()) { |
| // setName() called, but instance was not specified, use config or generate a uuid |
| setName(attrMap["_vendor"].asString(), attrMap["_product"].asString(), |
| cfgInstance.empty() ? qpid::types::Uuid(true).str() : cfgInstance); |
| } |
| |
| name_address = attrMap["_vendor"].asString() + ":" + attrMap["_product"].asString() + ":" + attrMap["_instance"].asString(); |
| vendorNameKey = keyifyNameStr(attrMap["_vendor"].asString()); |
| productNameKey = keyifyNameStr(attrMap["_product"].asString()); |
| instanceNameKey = keyifyNameStr(attrMap["_instance"].asString()); |
| attrMap["_name"] = name_address; |
| |
| storeData(true); |
| |
| QPID_LOG(info, "QMF Agent Initialized: broker=" << settings.host << ":" << settings.port << |
| " interval=" << intervalSeconds << " storeFile=" << _storeFile << " name=" << name_address); |
| |
| initialized = true; |
| } |
| |
| void ManagementAgentImpl::registerClass(const string& packageName, |
| const string& className, |
| uint8_t* md5Sum, |
| ManagementObject::writeSchemaCall_t schemaCall) |
| { |
| sys::Mutex::ScopedLock lock(agentLock); |
| PackageMap::iterator pIter = findOrAddPackage(packageName); |
| addClassLocal(ManagementItem::CLASS_KIND_TABLE, pIter, className, md5Sum, schemaCall); |
| } |
| |
| void ManagementAgentImpl::registerEvent(const string& packageName, |
| const string& eventName, |
| uint8_t* md5Sum, |
| ManagementObject::writeSchemaCall_t schemaCall) |
| { |
| sys::Mutex::ScopedLock lock(agentLock); |
| PackageMap::iterator pIter = findOrAddPackage(packageName); |
| addClassLocal(ManagementItem::CLASS_KIND_EVENT, pIter, eventName, md5Sum, schemaCall); |
| } |
| |
| // old-style add object: 64bit id - deprecated |
| ObjectId ManagementAgentImpl::addObject(ManagementObject* object, |
| uint64_t persistId) |
| { |
| std::string key; |
| if (persistId) { |
| key = boost::lexical_cast<std::string>(persistId); |
| } |
| return addObject(object, key, persistId != 0); |
| } |
| |
| |
| // new style add object - use this approach! |
| ObjectId ManagementAgentImpl::addObject(ManagementObject* object, |
| const std::string& key, |
| bool persistent) |
| { |
| sys::Mutex::ScopedLock lock(addLock); |
| |
| uint16_t sequence = persistent ? 0 : bootSequence; |
| |
| ObjectId objectId(&attachment, 0, sequence); |
| if (key.empty()) |
| objectId.setV2Key(*object); // let object generate the key |
| else |
| objectId.setV2Key(key); |
| objectId.setAgentName(name_address); |
| |
| object->setObjectId(objectId); |
| newManagementObjects[objectId] = boost::shared_ptr<ManagementObject>(object); |
| return objectId; |
| } |
| |
| |
| void ManagementAgentImpl::raiseEvent(const ManagementEvent& event, severity_t severity) |
| { |
| static const std::string severityStr[] = { |
| "emerg", "alert", "crit", "error", "warn", |
| "note", "info", "debug" |
| }; |
| string content; |
| stringstream key; |
| Variant::Map headers; |
| |
| { |
| sys::Mutex::ScopedLock lock(agentLock); |
| Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE); |
| uint8_t sev = (severity == SEV_DEFAULT) ? event.getSeverity() : (uint8_t) severity; |
| |
| // key << "console.event." << assignedBrokerBank << "." << assignedAgentBank << "." << |
| // event.getPackageName() << "." << event.getEventName(); |
| key << "agent.ind.event." << keyifyNameStr(event.getPackageName()) |
| << "." << keyifyNameStr(event.getEventName()) |
| << "." << severityStr[sev] |
| << "." << vendorNameKey |
| << "." << productNameKey |
| << "." << instanceNameKey; |
| |
| Variant::Map map_; |
| Variant::Map schemaId; |
| Variant::Map values; |
| |
| map_["_schema_id"] = mapEncodeSchemaId(event.getPackageName(), |
| event.getEventName(), |
| event.getMd5Sum(), |
| ManagementItem::CLASS_KIND_EVENT); |
| event.mapEncode(values); |
| map_["_values"] = values; |
| map_["_timestamp"] = uint64_t(Duration(EPOCH, now())); |
| map_["_severity"] = sev; |
| |
| headers["method"] = "indication"; |
| headers["qmf.opcode"] = "_data_indication"; |
| headers["qmf.content"] = "_event"; |
| headers["qmf.agent"] = name_address; |
| |
| Variant::List list; |
| list.push_back(map_); |
| ListCodec::encode(list, content); |
| } |
| |
| connThreadBody.sendBuffer(content, "", headers, topicExchange, key.str(), "amqp/list"); |
| } |
| |
| uint32_t ManagementAgentImpl::pollCallbacks(uint32_t callLimit) |
| { |
| sys::Mutex::ScopedLock lock(agentLock); |
| |
| if (inCallback) { |
| QPID_LOG(critical, "pollCallbacks invoked from the agent's thread!"); |
| return 0; |
| } |
| |
| for (uint32_t idx = 0; callLimit == 0 || idx < callLimit; idx++) { |
| if (methodQueue.empty()) |
| break; |
| |
| QueuedMethod* item = methodQueue.front(); |
| methodQueue.pop_front(); |
| { |
| sys::Mutex::ScopedUnlock unlock(agentLock); |
| invokeMethodRequest(item->body, item->cid, item->replyToExchange, item->replyToKey, item->userId); |
| delete item; |
| } |
| } |
| |
| if (pipeHandle != 0) { |
| char rbuf[100]; |
| while (pipeHandle->read(rbuf, 100) > 0) ; // Consume all signaling bytes |
| } |
| return methodQueue.size(); |
| } |
| |
| int ManagementAgentImpl::getSignalFd() |
| { |
| if (extThread) { |
| if (pipeHandle == 0) |
| pipeHandle = new PipeHandle(true); |
| return pipeHandle->getReadHandle(); |
| } |
| |
| return -1; |
| } |
| |
| void ManagementAgentImpl::setSignalCallback(cb_t callback, void* context) |
| { |
| sys::Mutex::ScopedLock lock(agentLock); |
| notifyCallback = callback; |
| notifyContext = context; |
| } |
| |
| void ManagementAgentImpl::setSignalCallback(Notifyable& _notifyable) |
| { |
| sys::Mutex::ScopedLock lock(agentLock); |
| notifyable = &_notifyable; |
| } |
| |
| void ManagementAgentImpl::startProtocol() |
| { |
| sendHeartbeat(); |
| { |
| sys::Mutex::ScopedLock lock(agentLock); |
| publishAllData = true; |
| } |
| } |
| |
| void ManagementAgentImpl::storeData(bool requested) |
| { |
| if (!storeFile.empty()) { |
| ofstream outFile(storeFile.c_str()); |
| uint32_t brokerBankToWrite = requested ? requestedBrokerBank : assignedBrokerBank; |
| uint32_t agentBankToWrite = requested ? requestedAgentBank : assignedAgentBank; |
| |
| if (outFile.good()) { |
| outFile << storeMagicNumber << " " << brokerBankToWrite << " " << |
| agentBankToWrite << " " << bootSequence << endl; |
| |
| if (attrMap.find("_vendor") != attrMap.end()) |
| outFile << "vendor=" << attrMap["_vendor"] << endl; |
| if (attrMap.find("_product") != attrMap.end()) |
| outFile << "product=" << attrMap["_product"] << endl; |
| if (attrMap.find("_instance") != attrMap.end()) |
| outFile << "instance=" << attrMap["_instance"] << endl; |
| |
| outFile.close(); |
| } |
| } |
| } |
| |
| void ManagementAgentImpl::retrieveData(std::string& vendor, std::string& product, std::string& inst) |
| { |
| vendor.clear(); |
| product.clear(); |
| inst.clear(); |
| |
| if (!storeFile.empty()) { |
| ifstream inFile(storeFile.c_str()); |
| string mn; |
| |
| if (inFile.good()) { |
| inFile >> mn; |
| if (mn == storeMagicNumber) { |
| std::string inText; |
| |
| inFile >> requestedBrokerBank; |
| inFile >> requestedAgentBank; |
| inFile >> bootSequence; |
| |
| while (inFile.good()) { |
| std::getline(inFile, inText); |
| if (!inText.compare(0, 7, "vendor=")) { |
| vendor = inText.substr(7); |
| QPID_LOG(debug, "read vendor name [" << vendor << "] from configuration file."); |
| } else if (!inText.compare(0, 8, "product=")) { |
| product = inText.substr(8); |
| QPID_LOG(debug, "read product name [" << product << "] from configuration file."); |
| } else if (!inText.compare(0, 9, "instance=")) { |
| inst = inText.substr(9); |
| QPID_LOG(debug, "read instance name [" << inst << "] from configuration file."); |
| } |
| } |
| } |
| inFile.close(); |
| } |
| } |
| } |
| |
| void ManagementAgentImpl::sendHeartbeat() |
| { |
| static const string addr_key_base("agent.ind.heartbeat."); |
| |
| Variant::Map map; |
| Variant::Map headers; |
| string content; |
| std::stringstream addr_key; |
| |
| addr_key << addr_key_base << vendorNameKey |
| << "." << productNameKey |
| << "." << instanceNameKey; |
| |
| headers["method"] = "indication"; |
| headers["qmf.opcode"] = "_agent_heartbeat_indication"; |
| headers["qmf.agent"] = name_address; |
| |
| getHeartbeatContent(map); |
| MapCodec::encode(map, content); |
| |
| // Set TTL (in msecs) on outgoing heartbeat indications based on the interval |
| // time to prevent stale heartbeats from getting to the consoles. |
| |
| connThreadBody.sendBuffer(content, "", headers, topicExchange, addr_key.str(), |
| "amqp/map", interval * 2 * 1000); |
| |
| QPID_LOG(trace, "SENT AgentHeartbeat name=" << name_address); |
| } |
| |
| void ManagementAgentImpl::sendException(const string& rte, const string& rtk, const string& cid, |
| const string& text, uint32_t code) |
| { |
| Variant::Map map; |
| Variant::Map headers; |
| Variant::Map values; |
| string content; |
| |
| headers["method"] = "indication"; |
| headers["qmf.opcode"] = "_exception"; |
| headers["qmf.agent"] = name_address; |
| |
| values["error_code"] = code; |
| values["error_text"] = text; |
| map["_values"] = values; |
| |
| MapCodec::encode(map, content); |
| connThreadBody.sendBuffer(content, cid, headers, rte, rtk); |
| |
| QPID_LOG(trace, "SENT Exception code=" << code <<" text=" << text); |
| } |
| |
| void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence, const string& rte, const string& rtk) |
| { |
| string packageName; |
| SchemaClassKey key; |
| uint32_t outLen(0); |
| char localBuffer[MA_BUFFER_SIZE]; |
| Buffer outBuffer(localBuffer, MA_BUFFER_SIZE); |
| bool found(false); |
| |
| inBuffer.getShortString(packageName); |
| inBuffer.getShortString(key.name); |
| inBuffer.getBin128(key.hash); |
| |
| QPID_LOG(trace, "RCVD SchemaRequest: package=" << packageName << " class=" << key.name); |
| |
| { |
| sys::Mutex::ScopedLock lock(agentLock); |
| PackageMap::iterator pIter = packages.find(packageName); |
| if (pIter != packages.end()) { |
| ClassMap& cMap = pIter->second; |
| ClassMap::iterator cIter = cMap.find(key); |
| if (cIter != cMap.end()) { |
| SchemaClass& schema = cIter->second; |
| string body; |
| |
| encodeHeader(outBuffer, 's', sequence); |
| schema.writeSchemaCall(body); |
| outBuffer.putRawData(body); |
| outLen = MA_BUFFER_SIZE - outBuffer.available(); |
| outBuffer.reset(); |
| found = true; |
| } |
| } |
| } |
| |
| if (found) { |
| connThreadBody.sendBuffer(outBuffer, outLen, rte, rtk); |
| QPID_LOG(trace, "SENT SchemaInd: package=" << packageName << " class=" << key.name); |
| } |
| } |
| |
| void ManagementAgentImpl::handleConsoleAddedIndication() |
| { |
| sys::Mutex::ScopedLock lock(agentLock); |
| publishAllData = true; |
| |
| QPID_LOG(trace, "RCVD ConsoleAddedInd"); |
| } |
| |
| void ManagementAgentImpl::invokeMethodRequest(const string& body, const string& cid, const string& rte, const string& rtk, const string& userId) |
| { |
| string methodName; |
| bool failed = false; |
| Variant::Map inMap; |
| Variant::Map outMap; |
| Variant::Map::const_iterator oid, mid; |
| string content; |
| |
| MapCodec::decode(body, inMap); |
| |
| if ((oid = inMap.find("_object_id")) == inMap.end() || |
| (mid = inMap.find("_method_name")) == inMap.end()) { |
| sendException(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID), |
| Manageable::STATUS_PARAMETER_INVALID); |
| failed = true; |
| } else { |
| string methodName; |
| ObjectId objId; |
| Variant::Map inArgs; |
| Variant::Map callMap; |
| |
| try { |
| // conversions will throw if input is invalid. |
| objId = ObjectId(oid->second.asMap()); |
| methodName = mid->second.getString(); |
| |
| mid = inMap.find("_arguments"); |
| if (mid != inMap.end()) { |
| inArgs = (mid->second).asMap(); |
| } |
| |
| QPID_LOG(trace, "Invoking Method: name=" << methodName << " args=" << inArgs); |
| |
| boost::shared_ptr<ManagementObject> oPtr; |
| { |
| sys::Mutex::ScopedLock lock(agentLock); |
| ObjectMap::iterator iter = managementObjects.find(objId); |
| if (iter != managementObjects.end() && !iter->second->isDeleted()) |
| oPtr = iter->second; |
| } |
| |
| if (oPtr.get() == 0) { |
| sendException(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_UNKNOWN_OBJECT), |
| Manageable::STATUS_UNKNOWN_OBJECT); |
| failed = true; |
| } else { |
| oPtr->doMethod(methodName, inArgs, callMap, userId); |
| |
| if (callMap["_status_code"].asUint32() == 0) { |
| outMap["_arguments"] = Variant::Map(); |
| for (Variant::Map::const_iterator iter = callMap.begin(); |
| iter != callMap.end(); iter++) |
| if (iter->first != "_status_code" && iter->first != "_status_text") |
| outMap["_arguments"].asMap()[iter->first] = iter->second; |
| } else { |
| sendException(rte, rtk, cid, callMap["_status_text"], callMap["_status_code"]); |
| failed = true; |
| } |
| } |
| |
| } catch(types::InvalidConversion& e) { |
| sendException(rte, rtk, cid, e.what(), Manageable::STATUS_EXCEPTION); |
| failed = true; |
| } |
| } |
| |
| if (!failed) { |
| Variant::Map headers; |
| headers["method"] = "response"; |
| headers["qmf.agent"] = name_address; |
| headers["qmf.opcode"] = "_method_response"; |
| QPID_LOG(trace, "SENT MethodResponse map=" << outMap); |
| MapCodec::encode(outMap, content); |
| connThreadBody.sendBuffer(content, cid, headers, rte, rtk); |
| } |
| } |
| |
| void ManagementAgentImpl::handleGetQuery(const string& body, const string& cid, const string& rte, const string& rtk) |
| { |
| moveNewObjectsLH(); |
| |
| Variant::Map inMap; |
| Variant::Map::const_iterator i; |
| Variant::Map headers; |
| |
| MapCodec::decode(body, inMap); |
| QPID_LOG(trace, "RCVD GetQuery: map=" << inMap << " cid=" << cid); |
| |
| headers["method"] = "response"; |
| headers["qmf.opcode"] = "_query_response"; |
| headers["qmf.agent"] = name_address; |
| headers["partial"] = Variant(); |
| |
| Variant::List list_; |
| Variant::Map map_; |
| Variant::Map values; |
| Variant::Map oidMap; |
| string content; |
| |
| /* |
| * Unpack the _what element of the query. Currently we only support OBJECT queries. |
| */ |
| i = inMap.find("_what"); |
| if (i == inMap.end()) { |
| sendException(rte, rtk, cid, "_what element missing in Query"); |
| return; |
| } |
| |
| if (i->second.getType() != qpid::types::VAR_STRING) { |
| sendException(rte, rtk, cid, "_what element is not a string"); |
| return; |
| } |
| |
| if (i->second.asString() == "OBJECT") { |
| headers["qmf.content"] = "_data"; |
| /* |
| * Unpack the _object_id element of the query if it is present. If it is present, find that one |
| * object and return it. If it is not present, send a class-based result. |
| */ |
| i = inMap.find("_object_id"); |
| if (i != inMap.end() && i->second.getType() == qpid::types::VAR_MAP) { |
| ObjectId objId(i->second.asMap()); |
| boost::shared_ptr<ManagementObject> object; |
| |
| { |
| sys::Mutex::ScopedLock lock(agentLock); |
| ObjectMap::iterator iter = managementObjects.find(objId); |
| if (iter != managementObjects.end()) |
| object = iter->second; |
| } |
| |
| if (object.get() != 0) { |
| if (object->getConfigChanged() || object->getInstChanged()) |
| object->setUpdateTime(); |
| |
| object->mapEncodeValues(values, true, true); // write both stats and properties |
| objId.mapEncode(oidMap); |
| map_["_values"] = values; |
| map_["_object_id"] = oidMap; |
| object->writeTimestamps(map_); |
| map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(), |
| object->getClassName(), |
| object->getMd5Sum()); |
| list_.push_back(map_); |
| headers.erase("partial"); |
| |
| ListCodec::encode(list_, content); |
| connThreadBody.sendBuffer(content, cid, headers, rte, rtk, "amqp/list"); |
| QPID_LOG(trace, "SENT QueryResponse (query by object_id) to=" << rte << "/" << rtk); |
| return; |
| } |
| } else { // match using schema_id, if supplied |
| |
| string className; |
| string packageName; |
| |
| i = inMap.find("_schema_id"); |
| if (i != inMap.end() && i->second.getType() == qpid::types::VAR_MAP) { |
| const Variant::Map& schemaIdMap(i->second.asMap()); |
| |
| Variant::Map::const_iterator s_iter = schemaIdMap.find("_class_name"); |
| if (s_iter != schemaIdMap.end() && s_iter->second.getType() == qpid::types::VAR_STRING) |
| className = s_iter->second.asString(); |
| |
| s_iter = schemaIdMap.find("_package_name"); |
| if (s_iter != schemaIdMap.end() && s_iter->second.getType() == qpid::types::VAR_STRING) |
| packageName = s_iter->second.asString(); |
| |
| typedef list<boost::shared_ptr<ManagementObject> > StageList; |
| StageList staging; |
| |
| { |
| sys::Mutex::ScopedLock lock(agentLock); |
| for (ObjectMap::iterator iter = managementObjects.begin(); |
| iter != managementObjects.end(); |
| iter++) { |
| ManagementObject* object = iter->second.get(); |
| if (object->getClassName() == className && |
| (packageName.empty() || object->getPackageName() == packageName)) |
| staging.push_back(iter->second); |
| } |
| } |
| |
| unsigned int objCount = 0; |
| for (StageList::iterator iter = staging.begin(); iter != staging.end(); iter++) { |
| ManagementObject* object = iter->get(); |
| if (object->getClassName() == className && |
| (packageName.empty() || object->getPackageName() == packageName)) { |
| |
| values.clear(); |
| oidMap.clear(); |
| map_.clear(); |
| |
| if (object->getConfigChanged() || object->getInstChanged()) |
| object->setUpdateTime(); |
| |
| object->mapEncodeValues(values, true, true); // write both stats and properties |
| object->getObjectId().mapEncode(oidMap); |
| map_["_values"] = values; |
| map_["_object_id"] = oidMap; |
| object->writeTimestamps(map_); |
| map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(), |
| object->getClassName(), |
| object->getMd5Sum()); |
| list_.push_back(map_); |
| |
| if (++objCount >= maxV2ReplyObjs) { |
| objCount = 0; |
| ListCodec::encode(list_, content); |
| connThreadBody.sendBuffer(content, cid, headers, rte, rtk, "amqp/list"); |
| QPID_LOG(trace, "SENT QueryResponse (query by schema_id) to=" << rte << "/" << rtk); |
| content.clear(); |
| list_.clear(); |
| } |
| } |
| } |
| } |
| } |
| |
| // Send last "non-partial" message to indicate CommandComplete |
| headers.erase("partial"); |
| ListCodec::encode(list_, content); |
| connThreadBody.sendBuffer(content, cid, headers, rte, rtk, "amqp/list"); |
| QPID_LOG(trace, "SENT QueryResponse (last message, no 'partial' indicator) to=" << rte << "/" << rtk); |
| |
| } else if (i->second.asString() == "SCHEMA_ID") { |
| headers["qmf.content"] = "_schema_id"; |
| /** |
| * @todo - support for a predicate. For now, send a list of all known schema class keys. |
| */ |
| for (PackageMap::iterator pIter = packages.begin(); |
| pIter != packages.end(); pIter++) { |
| for (ClassMap::iterator cIter = pIter->second.begin(); |
| cIter != pIter->second.end(); cIter++) { |
| |
| list_.push_back(mapEncodeSchemaId( pIter->first, |
| cIter->first.name, |
| cIter->first.hash, |
| cIter->second.kind )); |
| } |
| } |
| |
| headers.erase("partial"); |
| ListCodec::encode(list_, content); |
| connThreadBody.sendBuffer(content, cid, headers, rte, rtk, "amqp/list"); |
| QPID_LOG(trace, "SENT QueryResponse (SchemaId) to=" << rte << "/" << rtk); |
| |
| } else { |
| // Unknown query target |
| sendException(rte, rtk, cid, "Query for _what => '" + i->second.asString() + "' not supported"); |
| } |
| } |
| |
| void ManagementAgentImpl::handleLocateRequest(const string&, const string& cid, const string& rte, const string& rtk) |
| { |
| QPID_LOG(trace, "RCVD AgentLocateRequest"); |
| |
| Variant::Map map; |
| Variant::Map headers; |
| string content; |
| |
| headers["method"] = "indication"; |
| headers["qmf.opcode"] = "_agent_locate_response"; |
| headers["qmf.agent"] = name_address; |
| |
| getHeartbeatContent(map); |
| MapCodec::encode(map, content); |
| connThreadBody.sendBuffer(content, cid, headers, rte, rtk); |
| |
| QPID_LOG(trace, "SENT AgentLocateResponse replyTo=" << rte << "/" << rtk); |
| |
| { |
| sys::Mutex::ScopedLock lock(agentLock); |
| publishAllData = true; |
| } |
| } |
| |
| void ManagementAgentImpl::handleMethodRequest(const string& body, const string& cid, const string& rte, const string& rtk, const string& userId) |
| { |
| if (extThread) { |
| sys::Mutex::ScopedLock lock(agentLock); |
| |
| methodQueue.push_back(new QueuedMethod(cid, rte, rtk, body, userId)); |
| if (pipeHandle != 0) { |
| pipeHandle->write("X", 1); |
| } else if (notifyable != 0) { |
| inCallback = true; |
| { |
| sys::Mutex::ScopedUnlock unlock(agentLock); |
| notifyable->notify(); |
| } |
| inCallback = false; |
| } else if (notifyCallback != 0) { |
| inCallback = true; |
| { |
| sys::Mutex::ScopedUnlock unlock(agentLock); |
| notifyCallback(notifyContext); |
| } |
| inCallback = false; |
| } |
| } else { |
| invokeMethodRequest(body, cid, rte, rtk, userId); |
| } |
| |
| QPID_LOG(trace, "RCVD MethodRequest"); |
| } |
| |
| void ManagementAgentImpl::received(Message& msg) |
| { |
| string replyToExchange; |
| string replyToKey; |
| framing::MessageProperties mp = msg.getMessageProperties(); |
| if (mp.hasReplyTo()) { |
| const framing::ReplyTo& rt = mp.getReplyTo(); |
| replyToExchange = rt.getExchange(); |
| replyToKey = rt.getRoutingKey(); |
| } |
| |
| string userId; |
| if (mp.hasUserId()) |
| userId = mp.getUserId(); |
| |
| if (mp.hasAppId() && mp.getAppId() == "qmf2") |
| { |
| string opcode = mp.getApplicationHeaders().getAsString("qmf.opcode"); |
| string cid = msg.getMessageProperties().getCorrelationId(); |
| |
| if (opcode == "_agent_locate_request") handleLocateRequest(msg.getData(), cid, replyToExchange, replyToKey); |
| else if (opcode == "_method_request") handleMethodRequest(msg.getData(), cid, replyToExchange, replyToKey, userId); |
| else if (opcode == "_query_request") handleGetQuery(msg.getData(), cid, replyToExchange, replyToKey); |
| else { |
| QPID_LOG(warning, "Support for QMF V2 Opcode [" << opcode << "] TBD!!!"); |
| } |
| return; |
| } |
| |
| // old preV2 binary messages |
| |
| uint32_t sequence; |
| string data = msg.getData(); |
| Buffer inBuffer(const_cast<char*>(data.c_str()), data.size()); |
| uint8_t opcode; |
| |
| |
| if (checkHeader(inBuffer, &opcode, &sequence)) |
| { |
| if (opcode == 'S') handleSchemaRequest(inBuffer, sequence, replyToExchange, replyToKey); |
| else if (opcode == 'x') handleConsoleAddedIndication(); |
| else |
| QPID_LOG(warning, "Ignoring old-format QMF Request! opcode=" << char(opcode)); |
| } |
| } |
| |
| |
| void ManagementAgentImpl::encodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq) |
| { |
| buf.putOctet('A'); |
| buf.putOctet('M'); |
| buf.putOctet('2'); |
| buf.putOctet(opcode); |
| buf.putLong (seq); |
| } |
| |
| Variant::Map ManagementAgentImpl::mapEncodeSchemaId(const string& pname, |
| const string& cname, |
| const uint8_t *md5Sum, |
| uint8_t type) |
| { |
| Variant::Map map_; |
| |
| map_["_package_name"] = pname; |
| map_["_class_name"] = cname; |
| map_["_hash"] = types::Uuid(md5Sum); |
| if (type == ManagementItem::CLASS_KIND_EVENT) |
| map_["_type"] = "_event"; |
| else |
| map_["_type"] = "_data"; |
| |
| return map_; |
| } |
| |
| |
| bool ManagementAgentImpl::checkHeader(Buffer& buf, uint8_t *opcode, uint32_t *seq) |
| { |
| if (buf.getSize() < 8) |
| return false; |
| |
| uint8_t h1 = buf.getOctet(); |
| uint8_t h2 = buf.getOctet(); |
| uint8_t h3 = buf.getOctet(); |
| |
| *opcode = buf.getOctet(); |
| *seq = buf.getLong(); |
| |
| return h1 == 'A' && h2 == 'M' && h3 == '2'; |
| } |
| |
| ManagementAgentImpl::PackageMap::iterator ManagementAgentImpl::findOrAddPackage(const string& name) |
| { |
| PackageMap::iterator pIter = packages.find(name); |
| if (pIter != packages.end()) |
| return pIter; |
| |
| // No such package found, create a new map entry. |
| pair<PackageMap::iterator, bool> result = |
| packages.insert(pair<string, ClassMap>(name, ClassMap())); |
| |
| return result.first; |
| } |
| |
| void ManagementAgentImpl::moveNewObjectsLH() |
| { |
| sys::Mutex::ScopedLock lock(addLock); |
| for (ObjectMap::iterator iter = newManagementObjects.begin(); |
| iter != newManagementObjects.end(); |
| iter++) |
| managementObjects[iter->first] = iter->second; |
| newManagementObjects.clear(); |
| } |
| |
| void ManagementAgentImpl::addClassLocal(uint8_t classKind, |
| PackageMap::iterator pIter, |
| const string& className, |
| uint8_t* md5Sum, |
| ManagementObject::writeSchemaCall_t schemaCall) |
| { |
| SchemaClassKey key; |
| ClassMap& cMap = pIter->second; |
| |
| key.name = className; |
| memcpy(&key.hash, md5Sum, 16); |
| |
| ClassMap::iterator cIter = cMap.find(key); |
| if (cIter != cMap.end()) |
| return; |
| |
| // No such class found, create a new class with local information. |
| cMap.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(schemaCall, classKind))); |
| schemaTimestamp = Duration(EPOCH, now()); |
| QPID_LOG(trace, "Updated schema timestamp, now=" << uint64_t(schemaTimestamp)); |
| } |
| |
| void ManagementAgentImpl::encodePackageIndication(Buffer& buf, |
| PackageMap::iterator pIter) |
| { |
| buf.putShortString((*pIter).first); |
| |
| QPID_LOG(trace, "SENT PackageInd: package=" << (*pIter).first); |
| } |
| |
| void ManagementAgentImpl::encodeClassIndication(Buffer& buf, |
| PackageMap::iterator pIter, |
| ClassMap::iterator cIter) |
| { |
| SchemaClassKey key = (*cIter).first; |
| |
| buf.putOctet((*cIter).second.kind); |
| buf.putShortString((*pIter).first); |
| buf.putShortString(key.name); |
| buf.putBin128(key.hash); |
| |
| QPID_LOG(trace, "SENT ClassInd: package=" << (*pIter).first << " class=" << key.name); |
| } |
| |
| struct MessageItem { |
| string content; |
| Variant::Map headers; |
| string key; |
| MessageItem(const Variant::Map& h, const string& k) : headers(h), key(k) {} |
| }; |
| |
| void ManagementAgentImpl::periodicProcessing() |
| { |
| string addr_key_base = "agent.ind.data."; |
| list<ObjectId> deleteList; |
| list<boost::shared_ptr<MessageItem> > message_list; |
| |
| sendHeartbeat(); |
| |
| { |
| sys::Mutex::ScopedLock lock(agentLock); |
| |
| if (!connected) |
| return; |
| |
| moveNewObjectsLH(); |
| |
| // |
| // Clear the been-here flag on all objects in the map. |
| // |
| for (ObjectMap::iterator iter = managementObjects.begin(); |
| iter != managementObjects.end(); |
| iter++) { |
| ManagementObject* object = iter->second.get(); |
| object->setFlags(0); |
| if (publishAllData) { |
| object->setForcePublish(true); |
| } |
| } |
| |
| publishAllData = false; |
| |
| // |
| // Process the entire object map. |
| // |
| uint32_t v2Objs = 0; |
| |
| for (ObjectMap::iterator baseIter = managementObjects.begin(); |
| baseIter != managementObjects.end(); |
| baseIter++) { |
| ManagementObject* baseObject = baseIter->second.get(); |
| |
| // |
| // Skip until we find a base object requiring a sent message. |
| // |
| if (baseObject->getFlags() == 1 || |
| (!baseObject->getConfigChanged() && |
| !baseObject->getInstChanged() && |
| !baseObject->getForcePublish() && |
| !baseObject->isDeleted())) |
| continue; |
| |
| std::string packageName = baseObject->getPackageName(); |
| std::string className = baseObject->getClassName(); |
| |
| Variant::List list_; |
| std::stringstream addr_key; |
| Variant::Map headers; |
| |
| addr_key << addr_key_base; |
| addr_key << keyifyNameStr(packageName) |
| << "." << keyifyNameStr(className) |
| << "." << vendorNameKey |
| << "." << productNameKey |
| << "." << instanceNameKey; |
| |
| headers["method"] = "indication"; |
| headers["qmf.opcode"] = "_data_indication"; |
| headers["qmf.content"] = "_data"; |
| headers["qmf.agent"] = name_address; |
| |
| for (ObjectMap::iterator iter = baseIter; |
| iter != managementObjects.end(); |
| iter++) { |
| ManagementObject* object = iter->second.get(); |
| bool send_stats, send_props; |
| if (baseObject->isSameClass(*object) && object->getFlags() == 0) { |
| object->setFlags(1); |
| if (object->getConfigChanged() || object->getInstChanged()) |
| object->setUpdateTime(); |
| |
| send_props = (object->getConfigChanged() || object->getForcePublish() || object->isDeleted()); |
| send_stats = (object->hasInst() && (object->getInstChanged() || object->getForcePublish())); |
| |
| if (send_stats || send_props) { |
| Variant::Map map_; |
| Variant::Map values; |
| Variant::Map oid; |
| |
| object->getObjectId().mapEncode(oid); |
| map_["_object_id"] = oid; |
| map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(), |
| object->getClassName(), |
| object->getMd5Sum()); |
| object->writeTimestamps(map_); |
| object->mapEncodeValues(values, send_props, send_stats); |
| map_["_values"] = values; |
| list_.push_back(map_); |
| |
| if (++v2Objs >= maxV2ReplyObjs) { |
| v2Objs = 0; |
| boost::shared_ptr<MessageItem> item(new MessageItem(headers, addr_key.str())); |
| ListCodec::encode(list_, item->content); |
| message_list.push_back(item); |
| list_.clear(); |
| } |
| } |
| |
| if (object->isDeleted()) |
| deleteList.push_back(iter->first); |
| object->setForcePublish(false); |
| } |
| } |
| |
| if (!list_.empty()) { |
| boost::shared_ptr<MessageItem> item(new MessageItem(headers, addr_key.str())); |
| ListCodec::encode(list_, item->content); |
| message_list.push_back(item); |
| } |
| } |
| |
| // Delete flagged objects |
| for (list<ObjectId>::reverse_iterator iter = deleteList.rbegin(); |
| iter != deleteList.rend(); |
| iter++) |
| managementObjects.erase(*iter); |
| } |
| |
| while (!message_list.empty()) { |
| boost::shared_ptr<MessageItem> item(message_list.front()); |
| message_list.pop_front(); |
| connThreadBody.sendBuffer(item->content, "", item->headers, topicExchange, item->key, "amqp/list"); |
| QPID_LOG(trace, "SENT DataIndication"); |
| } |
| } |
| |
| |
| void ManagementAgentImpl::getHeartbeatContent(qpid::types::Variant::Map& map) |
| { |
| map["_values"] = attrMap; |
| map["_values"].asMap()["_timestamp"] = uint64_t(Duration(EPOCH, now())); |
| map["_values"].asMap()["_heartbeat_interval"] = interval; |
| map["_values"].asMap()["_epoch"] = bootSequence; |
| map["_values"].asMap()["_schema_updated"] = uint64_t(schemaTimestamp); |
| } |
| |
| void ManagementAgentImpl::ConnectionThread::run() |
| { |
| static const int delayMin(1); |
| static const int delayMax(128); |
| static const int delayFactor(2); |
| int delay(delayMin); |
| string dest("qmfagent"); |
| ConnectionThread::shared_ptr tmp; |
| |
| sessionId.generate(); |
| queueName << "qmfagent-" << sessionId; |
| |
| while (true) { |
| try { |
| if (agent.initialized) { |
| QPID_LOG(debug, "QMF Agent attempting to connect to the broker..."); |
| connection.open(agent.connectionSettings); |
| session = connection.newSession(queueName.str()); |
| subscriptions.reset(new client::SubscriptionManager(session)); |
| |
| session.queueDeclare(arg::queue=queueName.str(), arg::autoDelete=true, |
| arg::exclusive=true); |
| session.exchangeBind(arg::exchange="amq.direct", arg::queue=queueName.str(), |
| arg::bindingKey=queueName.str()); |
| session.exchangeBind(arg::exchange=agent.directExchange, arg::queue=queueName.str(), |
| arg::bindingKey=agent.name_address); |
| session.exchangeBind(arg::exchange=agent.topicExchange, arg::queue=queueName.str(), |
| arg::bindingKey="console.#"); |
| |
| subscriptions->subscribe(agent, queueName.str(), dest); |
| QPID_LOG(info, "Connection established with broker"); |
| { |
| sys::Mutex::ScopedLock _lock(connLock); |
| if (shutdown) |
| return; |
| operational = true; |
| agent.connected = true; |
| agent.startProtocol(); |
| try { |
| sys::Mutex::ScopedUnlock _unlock(connLock); |
| subscriptions->run(); |
| } catch (exception) {} |
| |
| QPID_LOG(warning, "Connection to the broker has been lost"); |
| |
| operational = false; |
| agent.connected = false; |
| tmp = subscriptions; |
| subscriptions.reset(); |
| } |
| tmp.reset(); // frees the subscription outside the lock |
| delay = delayMin; |
| connection.close(); |
| } |
| } catch (exception &e) { |
| if (delay < delayMax) |
| delay *= delayFactor; |
| QPID_LOG(debug, "Connection failed: exception=" << e.what()); |
| } |
| |
| { |
| // sleep for "delay" seconds, but peridically check if the |
| // agent is shutting down so we don't hang for up to delayMax |
| // seconds during agent shutdown |
| sys::Mutex::ScopedLock _lock(connLock); |
| if (shutdown) |
| return; |
| sleeping = true; |
| int totalSleep = 0; |
| do { |
| sys::Mutex::ScopedUnlock _unlock(connLock); |
| ::sleep(delayMin); |
| totalSleep += delayMin; |
| } while (totalSleep < delay && !shutdown); |
| sleeping = false; |
| if (shutdown) |
| return; |
| } |
| } |
| } |
| |
| ManagementAgentImpl::ConnectionThread::~ConnectionThread() |
| { |
| } |
| |
| void ManagementAgentImpl::ConnectionThread::sendBuffer(Buffer& buf, |
| uint32_t length, |
| const string& exchange, |
| const string& routingKey) |
| { |
| Message msg; |
| string data; |
| |
| buf.getRawData(data, length); |
| msg.setData(data); |
| sendMessage(msg, exchange, routingKey); |
| } |
| |
| |
| |
| void ManagementAgentImpl::ConnectionThread::sendBuffer(const string& data, |
| const string& cid, |
| const Variant::Map headers, |
| const string& exchange, |
| const string& routingKey, |
| const string& contentType, |
| uint64_t ttl_msec) |
| { |
| Message msg; |
| Variant::Map::const_iterator i; |
| |
| if (!cid.empty()) |
| msg.getMessageProperties().setCorrelationId(cid); |
| |
| if (!contentType.empty()) |
| msg.getMessageProperties().setContentType(contentType); |
| |
| if (ttl_msec) |
| msg.getDeliveryProperties().setTtl(ttl_msec); |
| |
| for (i = headers.begin(); i != headers.end(); ++i) { |
| msg.getHeaders().setString(i->first, i->second.asString()); |
| } |
| |
| msg.setData(data); |
| sendMessage(msg, exchange, routingKey); |
| } |
| |
| |
| |
| |
| |
| void ManagementAgentImpl::ConnectionThread::sendMessage(Message msg, |
| const string& exchange, |
| const string& routingKey) |
| { |
| ConnectionThread::shared_ptr s; |
| { |
| sys::Mutex::ScopedLock _lock(connLock); |
| if (!operational) |
| return; |
| s = subscriptions; |
| } |
| |
| msg.getDeliveryProperties().setRoutingKey(routingKey); |
| msg.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str())); |
| msg.getMessageProperties().getApplicationHeaders().setString("qmf.agent", agent.name_address); |
| msg.getMessageProperties().setAppId("qmf2"); |
| try { |
| session.messageTransfer(arg::content=msg, arg::destination=exchange); |
| } catch(exception& e) { |
| QPID_LOG(error, "Exception caught in sendMessage: " << e.what()); |
| // Bounce the connection |
| if (s) |
| s->stop(); |
| } |
| } |
| |
| |
| |
| void ManagementAgentImpl::ConnectionThread::bindToBank(uint32_t brokerBank, uint32_t agentBank) |
| { |
| stringstream key; |
| key << "agent." << brokerBank << "." << agentBank; |
| session.exchangeBind(arg::exchange="qpid.management", arg::queue=queueName.str(), |
| arg::bindingKey=key.str()); |
| } |
| |
| void ManagementAgentImpl::ConnectionThread::close() |
| { |
| ConnectionThread::shared_ptr s; |
| { |
| sys::Mutex::ScopedLock _lock(connLock); |
| shutdown = true; |
| s = subscriptions; |
| } |
| if (s) |
| s->stop(); |
| } |
| |
| bool ManagementAgentImpl::ConnectionThread::isSleeping() const |
| { |
| sys::Mutex::ScopedLock _lock(connLock); |
| return sleeping; |
| } |
| |
| |
| void ManagementAgentImpl::PublishThread::run() |
| { |
| uint16_t totalSleep; |
| uint16_t sleepTime; |
| |
| while (!shutdown) { |
| agent.periodicProcessing(); |
| totalSleep = 0; |
| |
| // |
| // Calculate a sleep time that is no greater than 5 seconds and |
| // no less than 1 second. |
| // |
| sleepTime = agent.getInterval(); |
| if (sleepTime > 5) |
| sleepTime = 5; |
| else if (sleepTime == 0) |
| sleepTime = 1; |
| |
| while (totalSleep < agent.getInterval() && !shutdown) { |
| ::sleep(sleepTime); |
| totalSleep += sleepTime; |
| } |
| } |
| } |