| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| |
| #include "unit_test.h" |
| #include "MessagingFixture.h" |
| #include "qpid/management/Buffer.h" |
| #include "qpid/management/ManagementAgent.h" |
| #include "qpid/messaging/Message.h" |
| #include "qpid/amqp_0_10/Codecs.h" |
| #include "qpid/log/Logger.h" |
| #include "qpid/log/Options.h" |
| |
| #include "qmf/org/apache/qpid/broker/mgmt/test/TestObject.h" |
| |
| #include <iomanip> |
| |
| |
| using qpid::management::Mutex; |
| using qpid::management::Manageable; |
| using qpid::management::Buffer; |
| using namespace qpid::messaging; |
| using namespace qpid::types; |
| |
| |
| |
| namespace qpid { |
| namespace tests { |
| |
| namespace _qmf = qmf::org::apache::qpid::broker::mgmt::test; |
| namespace { |
| |
| typedef boost::shared_ptr<_qmf::TestObject> TestObjectPtr; |
| typedef std::vector<TestObjectPtr> TestObjectVector; |
| |
| // Instantiates a broker and its internal management agent. Provides |
| // factories for constructing Receivers for object indication messages. |
| // |
| class AgentFixture |
| { |
| MessagingFixture *mFix; |
| |
| public: |
| AgentFixture( unsigned int pubInterval=10, |
| bool qmfV2=false, |
| qpid::broker::Broker::Options opts = qpid::broker::Broker::Options()) |
| { |
| opts.enableMgmt=true; |
| opts.qmf1Support=!qmfV2; |
| opts.qmf2Support=qmfV2; |
| opts.mgmtPubInterval=pubInterval; |
| mFix = new MessagingFixture(opts, true); |
| |
| _qmf::TestObject::registerSelf(getBrokerAgent()); |
| }; |
| ~AgentFixture() |
| { |
| delete mFix; |
| }; |
| ::qpid::management::ManagementAgent *getBrokerAgent() { return mFix->broker->getManagementAgent(); } |
| Receiver createV1DataIndRcvr( const std::string package, const std::string klass ) |
| { |
| return mFix->session.createReceiver(std::string("kqueue; {create: always, delete: always, " |
| "node: {type: queue, " |
| "x-bindings: [{exchange: qpid.management, " |
| "key: 'console.obj.1.0.") |
| + package + std::string(".") + klass |
| + std::string("'}]}}")); |
| }; |
| Receiver createV2DataIndRcvr( const std::string package, const std::string klass ) |
| { |
| std::string p(package); |
| std::replace(p.begin(), p.end(), '.', '_'); |
| std::string k(klass); |
| std::replace(k.begin(), k.end(), '.', '_'); |
| |
| return mFix->session.createReceiver(std::string("kqueue; {create: always, delete: always, " |
| "node: {type: queue, " |
| "x-bindings: [{exchange: qmf.default.topic, " |
| "key: 'agent.ind.data.") |
| + p + std::string(".") + k |
| + std::string("'}]}}")); |
| }; |
| }; |
| |
| |
| // A "management object" that supports the TestObject |
| // |
| class TestManageable : public qpid::management::Manageable |
| { |
| management::ManagementObject::shared_ptr mgmtObj; |
| const std::string key; |
| public: |
| TestManageable(management::ManagementAgent *agent, std::string _key) |
| : key(_key) |
| { |
| _qmf::TestObject::shared_ptr tmp(new _qmf::TestObject(agent, this)); |
| |
| // seed it with some default values... |
| tmp->set_string1(key); |
| tmp->set_bool1(true); |
| qpid::types::Variant::Map vMap; |
| vMap["one"] = qpid::types::Variant(1); |
| vMap["two"] = qpid::types::Variant("two"); |
| vMap["three"] = qpid::types::Variant("whatever"); |
| tmp->set_map1(vMap); |
| |
| mgmtObj = tmp; |
| }; |
| ~TestManageable() { mgmtObj.reset(); } |
| management::ManagementObject::shared_ptr GetManagementObject() const { return mgmtObj; }; |
| static void validateTestObjectProperties(_qmf::TestObject& to) |
| { |
| // verify the default values are as expected. We don't check 'string1', |
| // as it is the object key, and is unique for each object (no default value). |
| BOOST_CHECK(to.get_bool1() == true); |
| BOOST_CHECK(to.get_map1().size() == 3); |
| qpid::types::Variant::Map mappy = to.get_map1(); |
| BOOST_CHECK(1 == (unsigned int)mappy["one"]); |
| BOOST_CHECK(mappy["two"].asString() == std::string("two")); |
| BOOST_CHECK(mappy["three"].asString() == std::string("whatever")); |
| }; |
| }; |
| |
| |
| // decode a V1 Content Indication message |
| // |
| void decodeV1ObjectUpdates(const Message& inMsg, TestObjectVector& objs, const size_t objLen) |
| { |
| const size_t MAX_BUFFER_SIZE=65536; |
| char tmp[MAX_BUFFER_SIZE]; |
| |
| objs.clear(); |
| |
| BOOST_CHECK(inMsg.getContent().size() <= MAX_BUFFER_SIZE); |
| |
| ::memcpy(tmp, inMsg.getContent().data(), inMsg.getContent().size()); |
| Buffer buf(tmp, inMsg.getContent().size()); |
| |
| while (buf.available() > 8) { // 8 == qmf v1 header size |
| BOOST_CHECK_EQUAL(buf.getOctet(), 'A'); |
| BOOST_CHECK_EQUAL(buf.getOctet(), 'M'); |
| BOOST_CHECK_EQUAL(buf.getOctet(), '2'); |
| BOOST_CHECK_EQUAL(buf.getOctet(), 'c'); // opcode == content indication |
| // @@todo: kag: how do we skip 'i' entries??? |
| buf.getLong(); // ignore sequence |
| |
| std::string str1; // decode content body as string |
| buf.getRawData(str1, objLen); |
| |
| TestObjectPtr fake(new _qmf::TestObject(0,0)); |
| fake->readProperties( str1 ); |
| objs.push_back(fake); |
| } |
| } |
| |
| |
| // decode a V2 Content Indication message |
| // |
| void decodeV2ObjectUpdates(const qpid::messaging::Message& inMsg, TestObjectVector& objs) |
| { |
| objs.clear(); |
| |
| BOOST_CHECK_EQUAL(inMsg.getContentType(), std::string("amqp/list")); |
| |
| const ::qpid::types::Variant::Map& m = inMsg.getProperties(); |
| Variant::Map::const_iterator iter = m.find(std::string("qmf.opcode")); |
| BOOST_CHECK(iter != m.end()); |
| BOOST_CHECK_EQUAL(iter->second.asString(), std::string("_data_indication")); |
| |
| Variant::List vList; |
| ::qpid::amqp_0_10::ListCodec::decode(inMsg.getContent(), vList); |
| |
| for (Variant::List::iterator lIter = vList.begin(); lIter != vList.end(); lIter++) { |
| TestObjectPtr fake(new _qmf::TestObject(0,0)); |
| fake->readTimestamps(lIter->asMap()); |
| fake->mapDecodeValues((lIter->asMap())["_values"].asMap()); |
| objs.push_back(fake); |
| } |
| } |
| } |
| |
| QPID_AUTO_TEST_SUITE(BrokerMgmtAgent) |
| |
| // verify that an object that is added to the broker's management database is |
| // published correctly. Furthermore, verify that it is published once after |
| // it has been deleted. |
| // |
| QPID_AUTO_TEST_CASE(v1ObjPublish) |
| { |
| AgentFixture* fix = new AgentFixture(3); |
| management::ManagementAgent* agent; |
| agent = fix->getBrokerAgent(); |
| |
| // create a manageable test object |
| TestManageable *tm = new TestManageable(agent, std::string("obj1")); |
| uint32_t objLen = tm->GetManagementObject()->writePropertiesSize(); |
| |
| Receiver r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#"); |
| |
| agent->addObject(tm->GetManagementObject(), 1); |
| |
| // wait for the object to be published |
| Message m1; |
| BOOST_CHECK(r1.fetch(m1, Duration::SECOND * 6)); |
| |
| TestObjectVector objs; |
| decodeV1ObjectUpdates(m1, objs, objLen); |
| BOOST_CHECK(objs.size() > 0); |
| |
| for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) { |
| |
| TestManageable::validateTestObjectProperties(**oIter); |
| |
| qpid::types::Variant::Map mappy; |
| (*oIter)->writeTimestamps(mappy); |
| BOOST_CHECK(0 == mappy["_delete_ts"].asUint64()); // not deleted |
| } |
| |
| // destroy the object |
| |
| tm->GetManagementObject()->resourceDestroy(); |
| |
| // wait for the deleted object to be published |
| |
| bool isDeleted = false; |
| while (!isDeleted && r1.fetch(m1, Duration::SECOND * 6)) { |
| |
| decodeV1ObjectUpdates(m1, objs, objLen); |
| BOOST_CHECK(objs.size() > 0); |
| |
| for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) { |
| |
| TestManageable::validateTestObjectProperties(**oIter); |
| |
| qpid::types::Variant::Map mappy; |
| (*oIter)->writeTimestamps(mappy); |
| if (mappy["_delete_ts"].asUint64() != 0) |
| isDeleted = true; |
| } |
| } |
| |
| BOOST_CHECK(isDeleted); |
| |
| r1.close(); |
| delete fix; |
| delete tm; |
| } |
| |
| // Repeat the previous test, but with V2-based object support |
| // |
| QPID_AUTO_TEST_CASE(v2ObjPublish) |
| { |
| AgentFixture* fix = new AgentFixture(3, true); |
| management::ManagementAgent* agent; |
| agent = fix->getBrokerAgent(); |
| |
| TestManageable *tm = new TestManageable(agent, std::string("obj2")); |
| |
| Receiver r1 = fix->createV2DataIndRcvr(tm->GetManagementObject()->getPackageName(), "#"); |
| |
| agent->addObject(tm->GetManagementObject(), "testobj-1"); |
| |
| // wait for the object to be published |
| Message m1; |
| BOOST_CHECK(r1.fetch(m1, Duration::SECOND * 6)); |
| |
| TestObjectVector objs; |
| decodeV2ObjectUpdates(m1, objs); |
| BOOST_CHECK(objs.size() > 0); |
| |
| for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) { |
| |
| TestManageable::validateTestObjectProperties(**oIter); |
| |
| qpid::types::Variant::Map mappy; |
| (*oIter)->writeTimestamps(mappy); |
| BOOST_CHECK(0 == mappy["_delete_ts"].asUint64()); |
| } |
| |
| // destroy the object |
| |
| tm->GetManagementObject()->resourceDestroy(); |
| |
| // wait for the deleted object to be published |
| |
| bool isDeleted = false; |
| while (!isDeleted && r1.fetch(m1, Duration::SECOND * 6)) { |
| |
| decodeV2ObjectUpdates(m1, objs); |
| BOOST_CHECK(objs.size() > 0); |
| |
| for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) { |
| |
| TestManageable::validateTestObjectProperties(**oIter); |
| |
| qpid::types::Variant::Map mappy; |
| (*oIter)->writeTimestamps(mappy); |
| if (mappy["_delete_ts"].asUint64() != 0) |
| isDeleted = true; |
| } |
| } |
| |
| BOOST_CHECK(isDeleted); |
| |
| r1.close(); |
| delete fix; |
| delete tm; |
| } |
| |
| // See QPID-2997 |
| QPID_AUTO_TEST_CASE(v2RapidRestoreObj) |
| { |
| AgentFixture* fix = new AgentFixture(3, true); |
| management::ManagementAgent* agent; |
| agent = fix->getBrokerAgent(); |
| |
| // two objects, same ObjID |
| TestManageable *tm1 = new TestManageable(agent, std::string("obj2")); |
| TestManageable *tm2 = new TestManageable(agent, std::string("obj2")); |
| |
| Receiver r1 = fix->createV2DataIndRcvr(tm1->GetManagementObject()->getPackageName(), "#"); |
| |
| // add, then immediately delete and re-add a copy of the object |
| agent->addObject(tm1->GetManagementObject(), "testobj-1"); |
| tm1->GetManagementObject()->resourceDestroy(); |
| agent->addObject(tm2->GetManagementObject(), "testobj-1"); |
| |
| // expect: a delete notification, then an update notification |
| TestObjectVector objs; |
| bool isDeleted = false; |
| bool isAdvertised = false; |
| size_t count = 0; |
| Message m1; |
| while (r1.fetch(m1, Duration::SECOND * 6)) { |
| |
| decodeV2ObjectUpdates(m1, objs); |
| BOOST_CHECK(objs.size() > 0); |
| |
| for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) { |
| count++; |
| TestManageable::validateTestObjectProperties(**oIter); |
| |
| qpid::types::Variant::Map mappy; |
| (*oIter)->writeTimestamps(mappy); |
| if (mappy["_delete_ts"].asUint64() != 0) { |
| isDeleted = true; |
| BOOST_CHECK(isAdvertised == false); // delete must be first |
| } else { |
| isAdvertised = true; |
| BOOST_CHECK(isDeleted == true); // delete must be first |
| } |
| } |
| } |
| |
| BOOST_CHECK(isDeleted); |
| BOOST_CHECK(isAdvertised); |
| BOOST_CHECK(count == 2); |
| |
| r1.close(); |
| delete fix; |
| delete tm1; |
| delete tm2; |
| } |
| |
| QPID_AUTO_TEST_SUITE_END() |
| } |
| } |
| |
| |