| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| |
| |
| #include <stdio.h> |
| #include <stdlib.h> |
| #include <unistd.h> |
| #include <sys/wait.h> |
| #include <sys/time.h> |
| #include <string.h> |
| #include <sys/types.h> |
| #include <signal.h> |
| |
| #include <string> |
| #include <iostream> |
| #include <sstream> |
| #include <vector> |
| |
| #include <boost/assign.hpp> |
| |
| #include "qpid/framing/Uuid.h" |
| |
| #include <ForkedBroker.h> |
| #include <qpid/client/Connection.h> |
| |
| |
| |
| |
| |
| using namespace std; |
| using boost::assign::list_of; |
| using namespace qpid::framing; |
| using namespace qpid::client; |
| |
| |
| namespace qpid { |
| namespace tests { |
| |
| vector<pid_t> pids; |
| |
| typedef vector<ForkedBroker *> brokerVector; |
| |
// Lifecycle state recorded for each forked client process.
enum childStatus
{
    NO_STATUS = 0,
    RUNNING   = 1,   // set when a child record is constructed
    COMPLETED = 2    // set by child::done()
};
| |
| |
// Role of a forked client process within the soak test.
enum childType
{
    NO_TYPE          = 0,
    DECLARING_CLIENT = 1,   // the declareQueues helper
    SENDING_CLIENT   = 2,   // a message sender
    RECEIVING_CLIENT = 3    // a message receiver
};
| |
| |
| ostream& operator<< ( ostream& os, const childType& ct ) { |
| switch ( ct ) { |
| case DECLARING_CLIENT: os << "Declaring Client"; break; |
| case SENDING_CLIENT: os << "Sending Client"; break; |
| case RECEIVING_CLIENT: os << "Receiving Client"; break; |
| default: os << "No Client"; break; |
| } |
| |
| return os; |
| } |
| |
| |
| |
| |
| struct child |
| { |
| child ( string & name, pid_t pid, childType type ) |
| : name(name), pid(pid), retval(-999), status(RUNNING), type(type) |
| { |
| gettimeofday ( & startTime, 0 ); |
| } |
| |
| |
| void |
| done ( int _retval ) |
| { |
| retval = _retval; |
| status = COMPLETED; |
| gettimeofday ( & stopTime, 0 ); |
| } |
| |
| |
| void |
| setType ( childType t ) |
| { |
| type = t; |
| } |
| |
| |
| string name; |
| pid_t pid; |
| int retval; |
| childStatus status; |
| childType type; |
| struct timeval startTime, |
| stopTime; |
| }; |
| |
| |
| |
| |
| struct children : public vector<child *> |
| { |
| |
| void |
| add ( string & name, pid_t pid, childType type ) |
| { |
| push_back ( new child ( name, pid, type ) ); |
| } |
| |
| |
| child * |
| get ( pid_t pid ) |
| { |
| vector<child *>::iterator i; |
| for ( i = begin(); i != end(); ++ i ) |
| if ( pid == (*i)->pid ) |
| return *i; |
| |
| return 0; |
| } |
| |
| |
| void |
| exited ( pid_t pid, int retval ) |
| { |
| child * kid = get ( pid ); |
| if(! kid) |
| { |
| if ( verbosity > 1 ) |
| { |
| cerr << "children::exited warning: Can't find child with pid " |
| << pid |
| << endl; |
| } |
| return; |
| } |
| |
| kid->done ( retval ); |
| } |
| |
| |
| int |
| unfinished ( ) |
| { |
| int count = 0; |
| |
| vector<child *>::iterator i; |
| for ( i = begin(); i != end(); ++ i ) |
| if ( COMPLETED != (*i)->status ) |
| ++ count; |
| |
| return count; |
| } |
| |
| |
| int |
| checkChildren ( ) |
| { |
| for ( unsigned int i = 0; i < pids.size(); ++ i ) |
| { |
| int pid = pids[i]; |
| int returned_pid; |
| int status; |
| |
| child * kid = get ( pid ); |
| |
| if ( kid->status != COMPLETED ) |
| { |
| returned_pid = waitpid ( pid, &status, WNOHANG ); |
| |
| if ( returned_pid == pid ) |
| { |
| int exit_status = WEXITSTATUS(status); |
| exited ( pid, exit_status ); |
| if ( exit_status ) // this is a child error. |
| return exit_status; |
| } |
| } |
| } |
| |
| return 0; |
| } |
| |
| |
| void |
| killEverybody ( ) |
| { |
| vector<child *>::iterator i; |
| for ( i = begin(); i != end(); ++ i ) |
| kill ( (*i)->pid, 9 ); |
| } |
| |
| |
| |
| void |
| print ( ) |
| { |
| cout << "--- status of all children --------------\n"; |
| vector<child *>::iterator i; |
| for ( i = begin(); i != end(); ++ i ) |
| cout << "child: " << (*i)->name |
| << " status: " << (*i)->status |
| << endl; |
| cout << "\n\n\n\n"; |
| } |
| |
| int verbosity; |
| }; |
| |
| |
// Process-wide registry of the client processes forked by this test.
// It is updated by the client-starting functions, by the SIGCHLD handler
// childExit() and by main().
children allMyChildren;
| |
| |
| void |
| childExit ( int ) |
| { |
| int childReturnCode; |
| pid_t pid = waitpid ( 0, & childReturnCode, WNOHANG); |
| |
| if ( pid > 0 ) |
| allMyChildren.exited ( pid, childReturnCode ); |
| } |
| |
| |
| |
// Return a pseudo-random integer scaled from rand() into the range
// 0 .. maxDesiredVal. The upper bound is reached only when rand()
// returns RAND_MAX.
int
mrand ( int maxDesiredVal ) {
    const double fraction = static_cast<double> ( rand() )
                          / static_cast<double> ( RAND_MAX );
    const double scaled   = fraction * static_cast<double> ( maxDesiredVal );
    return static_cast<int> ( scaled );
}
| |
| |
| |
| int |
| mrand ( int minDesiredVal, int maxDesiredVal ) { |
| int interval = maxDesiredVal - minDesiredVal; |
| return minDesiredVal + mrand ( interval ); |
| } |
| |
| |
| |
| void |
| makeClusterName ( string & s ) { |
| stringstream ss; |
| ss << "soakTestCluster_" << Uuid(true).str(); |
| s = ss.str(); |
| } |
| |
| |
| |
| |
| |
| void |
| printBrokers ( brokerVector & brokers ) |
| { |
| cout << "Broker List ------------ size: " << brokers.size() << "\n"; |
| for ( brokerVector::iterator i = brokers.begin(); i != brokers.end(); ++ i) { |
| cout << "pid: " |
| << (*i)->getPID() |
| << " port: " |
| << (*i)->getPort() |
| << endl; |
| } |
| cout << "end Broker List ------------\n"; |
| } |
| |
| |
| |
| |
// The most recently started broker. startNewBroker() sets it, and
// wait_for_newbie() resets it to 0 once a connection to it has succeeded.
ForkedBroker * newbie = 0;
// Port of the most recently started broker, as reported by getPort().
int newbie_port = 0;
| |
| |
| |
| bool |
| wait_for_newbie ( ) |
| { |
| if ( ! newbie ) |
| return true; |
| |
| try |
| { |
| Connection connection; |
| connection.open ( "127.0.0.1", newbie_port ); |
| connection.close(); |
| newbie = 0; // He's no newbie anymore! |
| return true; |
| } |
| catch ( const std::exception& error ) |
| { |
| std::cerr << "wait_for_newbie error: " |
| << error.what() |
| << endl; |
| return false; |
| } |
| } |
| |
// Return true when the C string `str` ends with `suffix`.
// A string that is exactly equal to the suffix counts as a match, and the
// empty suffix matches every string. A null argument yields false.
bool endsWith(const char* str, const char* suffix) {
    if (!str || !suffix)
        return false;

    const size_t strLen    = strlen(str);
    const size_t suffixLen = strlen(suffix);

    // <= (not <): "cluster.so" must be recognised as ending in "cluster.so".
    return suffixLen <= strLen
        && 0 == strcmp(str + (strLen - suffixLen), suffix);
}
| |
| |
| void |
| startNewBroker ( brokerVector & brokers, |
| char const * moduleOrDir, |
| string const clusterName, |
| int verbosity, |
| int durable ) |
| { |
| static int brokerId = 0; |
| stringstream path, prefix; |
| prefix << "soak-" << brokerId; |
| std::vector<std::string> argv = list_of<string> |
| ("qpidd") |
| ("--cluster-name")(clusterName) |
| ("--auth=no") |
| ("--mgmt-enable=no") |
| ("--log-prefix")(prefix.str()) |
| ("--log-to-file")(prefix.str()+".log") |
| ("--log-enable=info+") |
| ("--log-enable=debug+:cluster") |
| ("TMP_DATA_DIR"); |
| |
| if (endsWith(moduleOrDir, "cluster.so")) { |
| // Module path specified, load only that module. |
| argv.push_back(string("--load-module=")+moduleOrDir); |
| argv.push_back("--no-module-dir"); |
| if ( durable ) { |
| std::cerr << "failover_soak warning: durable arg hass no effect. Use \"dir\" option of \"moduleOrDir\".\n"; |
| } |
| } |
| else { |
| // Module directory specified, load all modules in dir. |
| argv.push_back(string("--module-dir=")+moduleOrDir); |
| } |
| |
| newbie = new ForkedBroker (argv); |
| newbie_port = newbie->getPort(); |
| ForkedBroker * broker = newbie; |
| |
| if ( verbosity > 0 ) |
| std::cerr << "new broker created: pid == " |
| << broker->getPID() |
| << " log-prefix == " |
| << "soak-" << brokerId |
| << endl; |
| brokers.push_back ( broker ); |
| |
| ++ brokerId; |
| } |
| |
| |
| |
| |
| |
| bool |
| killFrontBroker ( brokerVector & brokers, int verbosity ) |
| { |
| cerr << "killFrontBroker: waiting for newbie sync...\n"; |
| if ( ! wait_for_newbie() ) |
| return false; |
| cerr << "killFrontBroker: newbie synced.\n"; |
| |
| if ( verbosity > 0 ) |
| cout << "killFrontBroker pid: " << brokers[0]->getPID() << " on port " << brokers[0]->getPort() << endl; |
| try { brokers[0]->kill(9); } |
| catch ( const exception& error ) { |
| if ( verbosity > 0 ) |
| { |
| cout << "error killing broker: " |
| << error.what() |
| << endl; |
| } |
| |
| return false; |
| } |
| delete brokers[0]; |
| brokers.erase ( brokers.begin() ); |
| return true; |
| } |
| |
| |
| |
| |
| |
| /* |
| * The optional delay is to avoid killing newbie brokers that have just |
| * been added and are still in the process of updating. This causes |
| * spurious, test-generated errors that scare everybody. |
| */ |
| void |
| killAllBrokers ( brokerVector & brokers, int delay ) |
| { |
| if ( delay > 0 ) |
| { |
| std::cerr << "Killing all brokers after delay of " << delay << endl; |
| sleep ( delay ); |
| } |
| |
| for ( uint i = 0; i < brokers.size(); ++ i ) |
| try { brokers[i]->kill(9); } |
| catch ( const exception& error ) |
| { |
| std::cerr << "killAllBrokers Warning: exception during kill on broker " |
| << i |
| << " " |
| << error.what() |
| << endl; |
| } |
| } |
| |
| |
| |
| |
| |
| pid_t |
| runDeclareQueuesClient ( brokerVector brokers, |
| char const * host, |
| char const * path, |
| int verbosity, |
| int durable, |
| char const * queue_prefix, |
| int n_queues |
| ) |
| { |
| string name("declareQueues"); |
| int port = brokers[0]->getPort ( ); |
| |
| if ( verbosity > 1 ) |
| cout << "startDeclareQueuesClient: host: " |
| << host |
| << " port: " |
| << port |
| << endl; |
| stringstream portSs; |
| portSs << port; |
| |
| vector<const char*> argv; |
| argv.push_back ( "declareQueues" ); |
| argv.push_back ( host ); |
| string portStr = portSs.str(); |
| argv.push_back ( portStr.c_str() ); |
| if ( durable ) |
| argv.push_back ( "1" ); |
| else |
| argv.push_back ( "0" ); |
| |
| argv.push_back ( queue_prefix ); |
| |
| char n_queues_str[20]; |
| sprintf ( n_queues_str, "%d", n_queues ); |
| argv.push_back ( n_queues_str ); |
| |
| argv.push_back ( 0 ); |
| pid_t pid = fork(); |
| |
| if ( ! pid ) { |
| execv ( path, const_cast<char * const *>(&argv[0]) ); |
| perror ( "error executing declareQueues: " ); |
| return 0; |
| } |
| |
| allMyChildren.add ( name, pid, DECLARING_CLIENT ); |
| return pid; |
| } |
| |
| |
| |
| |
| |
| pid_t |
| startReceivingClient ( brokerVector brokers, |
| char const * host, |
| char const * receiverPath, |
| char const * reportFrequency, |
| int verbosity, |
| char const * queue_name |
| ) |
| { |
| string name("receiver"); |
| int port = brokers[0]->getPort ( ); |
| |
| if ( verbosity > 1 ) |
| cout << "startReceivingClient: port " << port << endl; |
| |
| // verbosity has to be > 1 to let clients talk. |
| int client_verbosity = (verbosity > 1 ) ? 1 : 0; |
| |
| char portStr[100]; |
| char verbosityStr[100]; |
| sprintf(portStr, "%d", port); |
| sprintf(verbosityStr, "%d", client_verbosity); |
| |
| |
| vector<const char*> argv; |
| argv.push_back ( "resumingReceiver" ); |
| argv.push_back ( host ); |
| argv.push_back ( portStr ); |
| argv.push_back ( reportFrequency ); |
| argv.push_back ( verbosityStr ); |
| argv.push_back ( queue_name ); |
| argv.push_back ( 0 ); |
| |
| pid_t pid = fork(); |
| pids.push_back ( pid ); |
| |
| if ( ! pid ) { |
| execv ( receiverPath, const_cast<char * const *>(&argv[0]) ); |
| perror ( "error executing receiver: " ); |
| return 0; |
| } |
| |
| allMyChildren.add ( name, pid, RECEIVING_CLIENT ); |
| return pid; |
| } |
| |
| |
| |
| |
| |
| pid_t |
| startSendingClient ( brokerVector brokers, |
| char const * host, |
| char const * senderPath, |
| char const * nMessages, |
| char const * reportFrequency, |
| int verbosity, |
| int durability, |
| char const * queue_name |
| ) |
| { |
| string name("sender"); |
| int port = brokers[0]->getPort ( ); |
| |
| if ( verbosity > 1) |
| cout << "startSenderClient: port " << port << endl; |
| char portStr[100]; |
| char verbosityStr[100]; |
| // |
| // verbosity has to be > 1 to let clients talk. |
| int client_verbosity = (verbosity > 1 ) ? 1 : 0; |
| |
| sprintf ( portStr, "%d", port); |
| sprintf ( verbosityStr, "%d", client_verbosity); |
| |
| vector<const char*> argv; |
| argv.push_back ( "replayingSender" ); |
| argv.push_back ( host ); |
| argv.push_back ( portStr ); |
| argv.push_back ( nMessages ); |
| argv.push_back ( reportFrequency ); |
| argv.push_back ( verbosityStr ); |
| if ( durability ) |
| argv.push_back ( "1" ); |
| else |
| argv.push_back ( "0" ); |
| argv.push_back ( queue_name ); |
| argv.push_back ( 0 ); |
| |
| pid_t pid = fork(); |
| pids.push_back ( pid ); |
| |
| if ( ! pid ) { |
| execv ( senderPath, const_cast<char * const *>(&argv[0]) ); |
| perror ( "error executing sender: " ); |
| return 0; |
| } |
| |
| allMyChildren.add ( name, pid, SENDING_CLIENT ); |
| return pid; |
| } |
| |
| |
| |
/*
 * Exit codes returned by main():
 *   HUNKY_DORY            the test finished successfully
 *   BAD_ARGS              wrong command line
 *   CANT_FORK_DQ          the declareQueues client could not be started
 *   CANT_FORK_RECEIVER    a receiving client could not be started
 *   CANT_FORK_SENDER      a sending client could not be started
 *   DQ_FAILED             the declareQueues step failed after the client
 *                         was forked
 *   ERROR_ON_CHILD        checkChildren() reported a failed client
 *   HANGING               not used in this file
 *   ERROR_KILLING_BROKER  killFrontBroker() failed
 */
#define HUNKY_DORY            0
#define BAD_ARGS              1
#define CANT_FORK_DQ          2
#define CANT_FORK_RECEIVER    3
#define CANT_FORK_SENDER      4
#define DQ_FAILED             5
#define ERROR_ON_CHILD        6
#define HANGING               7
#define ERROR_KILLING_BROKER  8
| |
| }} // namespace qpid::tests |
| |
| using namespace qpid::tests; |
| |
| // If you want durability, use the "dir" option of "moduleOrDir" . |
| int |
| main ( int argc, char const ** argv ) |
| { |
| int brokerKills = 0; |
| if ( argc != 11 ) { |
| cerr << "Usage: " |
| << argv[0] |
| << "moduleOrDir declareQueuesPath senderPath receiverPath nMessages reportFrequency verbosity durable n_queues n_brokers" |
| << endl; |
| cerr << "\tverbosity is an integer, durable is 0 or 1\n"; |
| return BAD_ARGS; |
| } |
| signal ( SIGCHLD, childExit ); |
| |
| int i = 1; |
| char const * moduleOrDir = argv[i++]; |
| char const * declareQueuesPath = argv[i++]; |
| char const * senderPath = argv[i++]; |
| char const * receiverPath = argv[i++]; |
| char const * nMessages = argv[i++]; |
| char const * reportFrequency = argv[i++]; |
| int verbosity = atoi(argv[i++]); |
| int durable = atoi(argv[i++]); |
| int n_queues = atoi(argv[i++]); |
| int n_brokers = atoi(argv[i++]); |
| |
| char const * host = "127.0.0.1"; |
| |
| allMyChildren.verbosity = verbosity; |
| |
| string clusterName; |
| |
| srand ( getpid() ); |
| |
| makeClusterName ( clusterName ); |
| |
| brokerVector brokers; |
| |
| if ( verbosity > 1 ) |
| cout << "Starting initial cluster...\n"; |
| |
| for ( int i = 0; i < n_brokers; ++ i ) { |
| startNewBroker ( brokers, |
| moduleOrDir, |
| clusterName, |
| verbosity, |
| durable ); |
| } |
| |
| |
| if ( verbosity > 0 ) |
| printBrokers ( brokers ); |
| |
| // Get prefix for each queue name. |
| stringstream queue_prefix; |
| queue_prefix << "failover_soak_" << getpid(); |
| string queue_prefix_str(queue_prefix.str()); |
| |
| // Run the declareQueues child. |
| int childStatus; |
| pid_t dqClientPid = |
| runDeclareQueuesClient ( brokers, |
| host, |
| declareQueuesPath, |
| verbosity, |
| durable, |
| queue_prefix_str.c_str(), |
| n_queues |
| ); |
| if ( -1 == dqClientPid ) { |
| cerr << "END_OF_TEST ERROR_START_DECLARE_1\n"; |
| return CANT_FORK_DQ; |
| } |
| |
| // Don't continue until declareQueues is finished. |
| pid_t retval = waitpid ( dqClientPid, & childStatus, 0); |
| if ( retval != dqClientPid) { |
| cerr << "END_OF_TEST ERROR_START_DECLARE_2\n"; |
| return DQ_FAILED; |
| } |
| allMyChildren.exited ( dqClientPid, childStatus ); |
| |
| |
| /* |
| Start one receiving and one sending client for each queue. |
| */ |
| for ( int i = 0; i < n_queues; ++ i ) { |
| |
| stringstream queue_name; |
| queue_name << queue_prefix.str() << '_' << i; |
| string queue_name_str(queue_name.str()); |
| |
| // Receiving client --------------------------- |
| pid_t receivingClientPid = |
| startReceivingClient ( brokers, |
| host, |
| receiverPath, |
| reportFrequency, |
| verbosity, |
| queue_name_str.c_str() ); |
| if ( -1 == receivingClientPid ) { |
| cerr << "END_OF_TEST ERROR_START_RECEIVER\n"; |
| return CANT_FORK_RECEIVER; |
| } |
| |
| |
| // Sending client --------------------------- |
| pid_t sendingClientPid = |
| startSendingClient ( brokers, |
| host, |
| senderPath, |
| nMessages, |
| reportFrequency, |
| verbosity, |
| durable, |
| queue_name_str.c_str() ); |
| if ( -1 == sendingClientPid ) { |
| cerr << "END_OF_TEST ERROR_START_SENDER\n"; |
| return CANT_FORK_SENDER; |
| } |
| } |
| |
| |
| int minSleep = 2, |
| maxSleep = 6; |
| |
| int totalBrokers = n_brokers; |
| |
| int loop = 0; |
| |
| while ( 1 ) |
| { |
| ++ loop; |
| |
| /* |
| if ( verbosity > 1 ) |
| std::cerr << "------- loop " << loop << " --------\n"; |
| |
| if ( verbosity > 0 ) |
| cout << totalBrokers << " brokers have been added to the cluster.\n\n\n"; |
| */ |
| |
| // Sleep for a while. ------------------------- |
| int sleepyTime = mrand ( minSleep, maxSleep ); |
| sleep ( sleepyTime ); |
| |
| int bullet = mrand ( 100 ); |
| if ( bullet >= 95 ) |
| { |
| fprintf ( stderr, "Killing oldest broker...\n" ); |
| |
| // Kill the oldest broker. -------------------------- |
| if ( ! killFrontBroker ( brokers, verbosity ) ) |
| { |
| allMyChildren.killEverybody(); |
| killAllBrokers ( brokers, 5 ); |
| std::cerr << "END_OF_TEST ERROR_BROKER\n"; |
| return ERROR_KILLING_BROKER; |
| } |
| ++ brokerKills; |
| |
| // Start a new broker. -------------------------- |
| if ( verbosity > 0 ) |
| cout << "Starting new broker.\n\n"; |
| |
| startNewBroker ( brokers, |
| moduleOrDir, |
| clusterName, |
| verbosity, |
| durable ); |
| ++ totalBrokers; |
| printBrokers ( brokers ); |
| cerr << brokerKills << " brokers have been killed.\n\n\n"; |
| } |
| |
| int retval = allMyChildren.checkChildren(); |
| if ( retval ) |
| { |
| std::cerr << "END_OF_TEST ERROR_CLIENT\n"; |
| allMyChildren.killEverybody(); |
| killAllBrokers ( brokers, 5 ); |
| return ERROR_ON_CHILD; |
| } |
| |
| // If all children have exited, quit. |
| int unfinished = allMyChildren.unfinished(); |
| if ( unfinished == 0 ) { |
| killAllBrokers ( brokers, 5 ); |
| |
| if ( verbosity > 1 ) |
| cout << "failoverSoak: all children have exited.\n"; |
| |
| std::cerr << "END_OF_TEST SUCCESSFUL\n"; |
| return HUNKY_DORY; |
| } |
| |
| } |
| |
| allMyChildren.killEverybody(); |
| killAllBrokers ( brokers, 5 ); |
| |
| std::cerr << "END_OF_TEST SUCCESSFUL\n"; |
| |
| return HUNKY_DORY; |
| } |
| |
| |
| |