| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| |
| /**@file Tests for partial failure in a cluster. |
| * Partial failure means some nodes experience a failure while others do not. |
| * In this case the failed nodes must shut down. |
| */ |
| |
| #include "test_tools.h" |
| #include "unit_test.h" |
| #include "ClusterFixture.h" |
| #include <boost/assign.hpp> |
| #include <boost/algorithm/string.hpp> |
| #include <boost/bind.hpp> |
| |
| QPID_AUTO_TEST_SUITE(PartialFailureTestSuite) |
| |
| using namespace std; |
| using namespace qpid; |
| using namespace qpid::cluster; |
| using namespace qpid::framing; |
| using namespace qpid::client; |
| using namespace qpid::client::arg; |
| using namespace boost::assign; |
| using broker::Broker; |
| using boost::shared_ptr; |
| |
| // Timeout for tests that wait for messages |
| const sys::Duration TIMEOUT=sys::TIME_SEC/4; |
| |
/// Predicate used by updateArgs: true when command-line argument @a s begins
/// with "--log-enable" (i.e. it is a log-enable option).
static bool isLogOption(const std::string& s) {
    static const char logPrefix[] = "--log-enable";
    // compare() on the first strlen(logPrefix) characters is a prefix test;
    // a shorter string yields a shorter substring and therefore never matches.
    return s.compare(0, sizeof(logPrefix) - 1, logPrefix) == 0;
}
| |
| void updateArgs(ClusterFixture::Args& args, size_t index) { |
| ostringstream os; |
| os << "--test-store-name=s" << index; |
| args.push_back(os.str()); |
| args.push_back("--load-module=.libs/test_store.so"); |
| args.push_back("--auth=no"); |
| args.push_back("TMP_DATA_DIR"); |
| |
| // These tests generate errors deliberately, disable error logging unless a log env var is set. |
| if (!::getenv("QPID_TRACE") && !::getenv("QPID_LOG_ENABLE")) { |
| remove_if(args.begin(), args.end(), isLogOption); |
| args.push_back("--log-enable=critical+:DISABLED"); // hacky way to disable logs. |
| } |
| } |
| |
| Message pMessage(string data, string q) { |
| Message msg(data, q); |
| msg.getDeliveryProperties().setDeliveryMode(PERSISTENT); |
| return msg; |
| } |
| |
| void queueAndSub(Client& c) { |
| c.session.queueDeclare(c.name, durable=true); |
| c.subs.subscribe(c.lq, c.name); |
| } |
| |
// Verify normal cluster-wide errors: a session error, and a connection error
// raised identically on every member, must not cause any member to leave.
QPID_AUTO_TEST_CASE(testNormalErrors) {
    // FIXME aconway 2009-04-10: Would like to put a scope just around
    // the statements expected to fail (in BOOST_CHECK_THROW) but that
    // sporadically lets out messages, possibly because they're in the
    // Connection thread.
    ScopedSuppressLogging allQuiet;

    ClusterFixture cluster(3, -1, updateArgs);
    Client c0(cluster[0], "c0");
    Client c1(cluster[1], "c1");
    Client c2(cluster[2], "c2");

    // Sanity check: c0 can declare and subscribe to its own queue and receive a message.
    queueAndSub(c0);
    c0.session.messageTransfer(content=Message("x", "c0"));
    BOOST_CHECK_EQUAL(c0.lq.get(TIMEOUT).getData(), "x");

    // Session error: the argument-less exchangeBind() is expected to throw SessionException.
    BOOST_CHECK_THROW(c0.session.exchangeBind(), SessionException);
    c1.session.messageTransfer(content=Message("stay", "c0")); // Will stay on queue, session c0 is dead.

    // Connection error, kill c1 on all members.
    // Every store (s0, s1, s2) throws, so the failure is the same on all
    // members: only c1's connection is closed.
    queueAndSub(c1);
    BOOST_CHECK_THROW(
        c1.session.messageTransfer(
            content=pMessage("TEST_STORE_DO: s0[exception] s1[exception] s2[exception] testNormalErrors", "c1")),
        ConnectionException);
    c2.session.messageTransfer(content=Message("stay", "c1")); // Will stay on queue, session/connection c1 is dead.

    // All three members are still in the cluster, and each "stay" message is
    // still waiting on its queue because its subscriber died.
    BOOST_CHECK_EQUAL(3u, knownBrokerPorts(c2.connection, 3).size());
    BOOST_CHECK_EQUAL(c2.subs.get("c0", TIMEOUT).getData(), "stay");
    BOOST_CHECK_EQUAL(c2.subs.get("c1", TIMEOUT).getData(), "stay");
}
| |
| |
// Test errors after a new member joins to verify frame-sequence-numbers are ok in update.
// First a newly joined member fails alone, then the original member fails alone;
// each time the failing member leaves and exactly one member remains.
QPID_AUTO_TEST_CASE(testErrorAfterJoin) {
    ScopedSuppressLogging allQuiet;

    // Start with a single member and one persistent message on durable queue "q".
    ClusterFixture cluster(1, -1, updateArgs);
    Client c0(cluster[0]);
    c0.session.queueDeclare("q", durable=true);
    c0.session.messageTransfer(content=pMessage("a", "q"));

    // Kill the new guy: member 1 joins, then only its store (s1) throws.
    // Its client's connection is lost and member 0 is left alone.
    cluster.add();
    Client c1(cluster[1]);
    c0.session.messageTransfer(content=pMessage("TEST_STORE_DO: s1[exception] testErrorAfterJoin", "q"));
    BOOST_CHECK_THROW(c1.session.messageTransfer(content=pMessage("xxx", "q")), TransportFailure);
    BOOST_CHECK_EQUAL(1u, knownBrokerPorts(c0.connection, 1).size());

    // Kill the old guy: member 2 joins, then only the original member's store (s0) throws.
    cluster.add();
    Client c2(cluster[2]);
    c2.session.messageTransfer(content=pMessage("TEST_STORE_DO: s0[exception] testErrorAfterJoin2", "q"));
    BOOST_CHECK_THROW(c0.session.messageTransfer(content=pMessage("xxx", "q")), TransportFailure);

    // Only member 2 remains.
    BOOST_CHECK_EQUAL(1u, knownBrokerPorts(c2.connection, 1).size());
}
| |
// Test that if one member fails and others do not, the failed member leaves the cluster.
QPID_AUTO_TEST_CASE(testSinglePartialFailure) {
    ScopedSuppressLogging allQuiet;

    ClusterFixture cluster(3, -1, updateArgs);
    Client c0(cluster[0], "c0");
    Client c1(cluster[1], "c1");
    Client c2(cluster[2], "c2");

    c0.session.queueDeclare("q", durable=true);
    c0.session.messageTransfer(content=pMessage("a", "q"));
    // Cause partial failure on c1 (i.e. on member 1, store s1, which c1 is
    // connected to): member 1 leaves and c1's connection is lost.
    c0.session.messageTransfer(content=pMessage("TEST_STORE_DO: s1[exception] testSinglePartialFailure", "q"));
    BOOST_CHECK_THROW(c1.session.queueQuery("q"), TransportFailure);

    // Survivors keep working. Queue holds "a", the TEST_STORE_DO message and "b";
    // two members remain.
    c0.session.messageTransfer(content=pMessage("b", "q"));
    BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 3u);
    BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c0.connection, 2).size());

    // Cause partial failure on c2 (member 2, store s2).
    c0.session.messageTransfer(content=pMessage("TEST_STORE_DO: s2[exception] testSinglePartialFailure2", "q"));
    BOOST_CHECK_THROW(c2.session.queueQuery("q"), TransportFailure);

    // Two more messages (the second TEST_STORE_DO message and "c") make 5;
    // only member 0 remains.
    c0.session.messageTransfer(content=pMessage("c", "q"));
    BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 5u);
    BOOST_CHECK_EQUAL(1u, knownBrokerPorts(c0.connection, 1).size());
}
| |
// Test multiple partial failures: 2 members fail, 2 pass.
QPID_AUTO_TEST_CASE(testMultiPartialFailure) {
    ScopedSuppressLogging allQuiet;

    ClusterFixture cluster(4, -1, updateArgs);
    Client c0(cluster[0], "c0");
    Client c1(cluster[1], "c1");
    Client c2(cluster[2], "c2");
    Client c3(cluster[3], "c3");

    c0.session.queueDeclare("q", durable=true);
    c0.session.messageTransfer(content=pMessage("a", "q"));

    // Cause partial failure on c1, c2 (stores s1 and s2 throw; s0 and s3 do not),
    // so members 1 and 2 leave and their clients' connections are lost.
    c0.session.messageTransfer(content=pMessage("TEST_STORE_DO: s1[exception] s2[exception] testMultiPartialFailure", "q"));
    BOOST_CHECK_THROW(c1.session.queueQuery("q"), TransportFailure);
    BOOST_CHECK_THROW(c2.session.queueQuery("q"), TransportFailure);

    // Messages sent through either survivor land on the same queue:
    // "a", the TEST_STORE_DO message, "b" and "c" make 4. Two members remain.
    c0.session.messageTransfer(content=pMessage("b", "q"));
    c3.session.messageTransfer(content=pMessage("c", "q"));
    BOOST_CHECK_EQUAL(c3.session.queueQuery("q").getMessageCount(), 4u);
    BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c0.connection, 2).size());
}
| |
/** FIXME aconway 2009-04-10:
 * The current approach to shutting down a process in test_store
 * sometimes leads to assertion failures and errors in the shut-down
 * process. Need a cleaner solution.
 *
 * Until then the test below is compiled out with #if 0.
 */
#if 0
// Member 0's store throws while member 1's store exits its process at the same
// time. The test expects member 0 to survive: c0 gets only a ConnectionException,
// the cluster shrinks to one member, and the queue still holds message "a".
QPID_AUTO_TEST_CASE(testPartialFailureMemberLeaves) {
    ScopedSuppressLogging allQuiet;

    ClusterFixture cluster(2, -1, updateArgs);
    Client c0(cluster[0], "c0");
    Client c1(cluster[1], "c1");

    c0.session.queueDeclare("q", durable=true);
    c0.session.messageTransfer(content=pMessage("a", "q"));

    // Cause failure on member 0 and simultaneous crash on member 1.
    BOOST_CHECK_THROW(
        c0.session.messageTransfer(
            content=pMessage("TEST_STORE_DO: s0[exception] s1[exit_process] testPartialFailureMemberLeaves", "q")),
        ConnectionException);
    cluster.wait(1);

    Client c00(cluster[0], "c00"); // Old connection is dead.
    BOOST_CHECK_EQUAL(c00.session.queueQuery("q").getMessageCount(), 1u);
    BOOST_CHECK_EQUAL(1u, knownBrokerPorts(c00.connection, 1).size());
}
#endif
| |
| |
| QPID_AUTO_TEST_SUITE_END() |