| #!/bin/bash |
| |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| # Test reliability of the replication feature in the face of link |
| # failures: |
| srcdir=`dirname $0` |
| source ./test_env.sh |
| |
| trap stop_brokers EXIT |
| |
| fail() { |
| echo $1 |
| exit 1 |
| } |
| |
| stop_brokers() { |
| if [[ $BROKER_A ]] ; then |
| ../qpidd --no-module-dir -q --port $BROKER_A |
| unset BROKER_A |
| fi |
| if [[ $NODE_1 ]] ; then |
| ../qpidd --no-module-dir -q --port $NODE_1 |
| unset NODE_1 |
| fi |
| if [[ $NODE_2 ]] ; then |
| ../qpidd --no-module-dir -q --port $NODE_2 |
| unset NODE_2 |
| fi |
| if [ -f cluster.ports ]; then |
| rm cluster.ports |
| fi |
| } |
| |
| start_brokers() { |
| #start single node... |
| BROKER_A=`../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no --log-enable info+` || fail "BROKER_A failed to start" |
| |
| #...and start cluster |
| $srcdir/start_cluster 2 || fail "Could not start cluster" |
| NODE_1=$(head -1 cluster.ports) |
| NODE_2=$(tail -1 cluster.ports) |
| test -n "$NODE_1" || fail "NODE_1 failed to start" |
| test -n "$NODE_2" || fail "NODE_2 failed to start" |
| } |
| |
| setup() { |
| #create exchange on both cluster and single broker |
| $PYTHON_COMMANDS/qpid-config -b "localhost:$BROKER_A" add exchange direct test-exchange |
| $PYTHON_COMMANDS/qpid-config -b "localhost:$NODE_1" add exchange direct test-exchange |
| |
| #create dynamic routes for test exchange |
| $PYTHON_COMMANDS/qpid-route dynamic add "localhost:$NODE_2" "localhost:$BROKER_A" test-exchange |
| $PYTHON_COMMANDS/qpid-route dynamic add "localhost:$BROKER_A" "localhost:$NODE_2" test-exchange |
| |
| #create test queue on cluster and bind it to the test exchange |
| $PYTHON_COMMANDS/qpid-config -b "localhost:$NODE_1" add queue test-queue |
| $PYTHON_COMMANDS/qpid-config -b "localhost:$NODE_1" bind test-exchange test-queue to-cluster |
| |
| #create test queue on single broker and bind it to the test exchange |
| $PYTHON_COMMANDS/qpid-config -b "localhost:$BROKER_A" add queue test-queue |
| $PYTHON_COMMANDS/qpid-config -b "localhost:$BROKER_A" bind test-exchange test-queue from-cluster |
| } |
| |
| run_test_pull_to_cluster_two_consumers() { |
| #start consumers on each of the two nodes of the cluster |
| ./receiver --port $NODE_1 --queue test-queue --credit-window 1 > fed1.out.tmp & |
| ./receiver --port $NODE_2 --queue test-queue --credit-window 1 > fed2.out.tmp & |
| |
| #send stream of messages to test exchange on single broker |
| for i in `seq 1 1000`; do echo Message $i >> fed.in.tmp; done |
| ./sender --port $BROKER_A --exchange test-exchange --routing-key to-cluster --send-eos 2 < fed.in.tmp |
| |
| #combine output of the two consumers, sort it and compare with the expected stream |
| wait |
| sort -g -k 2 fed1.out.tmp fed2.out.tmp > fed.out.tmp |
| diff fed.in.tmp fed.out.tmp || fail "federated link to cluster failed: expectations not met!" |
| |
| rm -f fed*.tmp #cleanup |
| } |
| |
| run_test_pull_to_cluster() { |
| #send stream of messages to test exchange on single broker |
| for i in `seq 1 1000`; do echo Message $i >> fed.in.tmp; done |
| ./sender --port $BROKER_A --exchange test-exchange --routing-key to-cluster --send-eos 1 < fed.in.tmp |
| |
| #consume from remaining node of the cluster |
| ./receiver --port $NODE_2 --queue test-queue > fed.out.tmp |
| |
| #verify all messages are received |
| diff fed.in.tmp fed.out.tmp || fail "federated link to cluster failed: expectations not met!" |
| |
| rm -f fed*.tmp #cleanup |
| } |
| |
| run_test_pull_from_cluster() { |
| #start consumer on single broker |
| ./receiver --port $BROKER_A --queue test-queue --credit-window 1 > fed.out.tmp & |
| |
| #send stream of messages to test exchange on cluster |
| for i in `seq 1 1000`; do echo Message $i >> fed.in.tmp; done |
| ./sender --port $NODE_2 --exchange test-exchange --routing-key from-cluster --send-eos 1 < fed.in.tmp |
| |
| #verify all messages are received |
| wait |
| diff fed.in.tmp fed.out.tmp || fail "federated link from cluster failed: expectations not met!" |
| |
| rm -f fed*.tmp #cleanup |
| } |
| |
| |
| if test -d ${PYTHON_DIR}; then |
| . cpg_check.sh |
| cpg_enabled || exit 0 |
| |
| rm -f fed*.tmp #cleanup any files left from previous run |
| start_brokers |
| echo "brokers started" |
| setup |
| echo "setup completed" |
| run_test_pull_to_cluster_two_consumers |
| echo "federated link to cluster verified" |
| run_test_pull_from_cluster |
| echo "federated link from cluster verified" |
| if [[ $TEST_NODE_FAILURE ]] ; then |
| #kill first cluster node and retest |
| kill -9 $(../qpidd --check --port $NODE_1) && unset NODE_1 |
| echo "killed first cluster node; waiting for links to re-establish themselves..." |
| sleep 5 |
| echo "retesting..." |
| run_test_pull_to_cluster |
| echo "federated link to cluster verified" |
| run_test_pull_from_cluster |
| echo "federated link from cluster verified" |
| fi |
| fi |