| #!/usr/bin/env bash |
| |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| # Test script for validating the behaviour of a ring queue |
| |
| QUEUE_NAME=ring-queue |
| LIMIT=100 |
| DURABLE=0 |
| MESSAGES=10000 |
| SENDERS=1 |
| RECEIVERS=1 |
| CONCURRENT=0 |
| BROKER_URL="-b ${QPID_BROKER:-localhost}:${QPID_PORT:-5672}" |
| |
| setup() { |
| if [[ $DURABLE -gt 0 ]]; then |
| EXTRA_ARGS=" --durable" |
| fi |
| qpid-config $BROKER_URL add queue $QUEUE_NAME --max-queue-count $LIMIT --limit-policy ring $EXTRA_ARGS |
| } |
| |
| send() { |
| datagen --count $MESSAGES | tee sender_${QUEUE_NAME}_${1} | sender --durable $DURABLE --routing-key $QUEUE_NAME |
| } |
| |
| receive() { |
| #TODO: allow a variety of receiver options to be passed in (ack-frequency, credit-window etc) |
| receiver --queue $QUEUE_NAME > receiver_${QUEUE_NAME}_${1} |
| } |
| |
| cleanup() { |
| rm -f sender_${QUEUE_NAME}_* receiver_${QUEUE_NAME}_* |
| qpid-config $BROKER_URL del queue $QUEUE_NAME --force |
| } |
| |
| log() { |
| echo $1 |
| } |
| |
| fail() { |
| echo $1 |
| FAILED=1 |
| } |
| |
| validate() { |
| if [[ $RECEIVERS -eq 0 ]]; then |
| #queue should have $LIMIT messages on it, but need to send an eos also |
| sender --routing-key $QUEUE_NAME --send-eos 1 < /dev/null |
| received=$(receiver --queue $QUEUE_NAME --browse | wc -l) |
| if [[ received -eq $(( $LIMIT - 1)) ]]; then |
| log "queue contains $LIMIT messages as expected" |
| else |
| fail "queue does not contain the expected $LIMIT messages (received $received)" |
| fi |
| elif [[ $CONCURRENT -eq 0 ]]; then |
| #sum of length of all output files should be equal to $LIMIT - $RECEIVERS (1 eos message each) |
| received=$(cat receiver_${QUEUE_NAME}_* | wc -l) |
| expected=$(( $LIMIT - $RECEIVERS )) |
| if [[ $received -eq $expected ]]; then |
| log "received $LIMIT messages as expected" |
| else |
| fail "received $received messages, expected $expected" |
| fi |
| #if there were only a single sender and receiver (executed serially) we can check the |
| #actual received contents |
| if [[ $RECEIVERS -eq 1 ]] && [[ $SENDERS -eq 1 ]]; then |
| tail -n $(($LIMIT - 1)) sender_${QUEUE_NAME}_1 | diff - receiver_${QUEUE_NAME}_1 || FAILED=1 |
| if [[ $FAILED -eq 1 ]]; then |
| fail "did not receive expected messages" |
| else |
| log "received messages matched expectations" |
| fi |
| fi |
| else |
| #multiple receivers, concurrent with senders; ring queue functionality cannot be validated in this case |
| if [[ $(cat receiver_${QUEUE_NAME}_* | wc -l) -le $(cat sender_${QUEUE_NAME}_* | wc -l) ]]; then |
| log "sent at least as many messages as were received" |
| else |
| #Note: if any receiver was browsing, this would be valid (would need to add 'sort | uniq') |
| # to pipeline above |
| fail "received more messages than were sent" |
| fi |
| fi |
| |
| if [[ $FAILED ]]; then |
| echo $(basename $0): FAILED |
| exit 1 |
| else |
| cleanup |
| fi |
| } |
| |
| run_test() { |
| if [[ $CONCURRENT -eq 0 ]]; then |
| echo "Starting $SENDERS senders followed by $RECEIVERS receivers " |
| else |
| echo "Starting $SENDERS senders concurrently with $RECEIVERS receivers" |
| fi |
| for ((i=1; i <= $SENDERS; i++)); do |
| send $i & |
| sender_pids[$i]=$! |
| done |
| if [[ $CONCURRENT -eq 0 ]] && [[ $RECEIVERS -gt 0 ]]; then |
| wait |
| sender --routing-key $QUEUE_NAME --send-eos $RECEIVERS < /dev/null |
| fi |
| for ((i=1; i <= $RECEIVERS; i++)); do |
| receive $i & |
| done |
| if [[ $CONCURRENT -gt 0 ]]; then |
| for ((i=1; i <= $SENDERS; i++)); do |
| wait ${sender_pids[$i]} |
| done |
| sender --routing-key $QUEUE_NAME --send-eos $RECEIVERS < /dev/null |
| fi |
| wait |
| } |
| |
| usage() { |
| cat <<EOF |
| $(basename $0): Test script for validating the behaviour of a ring queue |
| |
| Options: |
| -q <queue> the name of the queue to use |
| -s <senders> the number of senders to start |
| -r <receivers> the number of receivers to start |
| -l <limit> the limit for the ring queue |
| -m <messages> the number of messages to send |
| -c if specified, receivers will run concurrently with senders |
| -d if specified the queue and messages will be durable |
| EOF |
| exit 1 |
| } |
| |
| while getopts "s:r:m:u:dch" opt ; do |
| case $opt in |
| q) QUEUE_NAME=$OPTARG ;; |
| l) LIMIT=$OPTARG ;; |
| s) SENDERS=$OPTARG ;; |
| r) RECEIVERS=$OPTARG ;; |
| m) MESSAGES=$OPTARG ;; |
| d) DURABLE=1 ;; |
| c) CONCURRENT=1 ;; |
| h) usage;; |
| ?) usage;; |
| esac |
| done |
| |
| if [[ $SENDERS -gt 0 ]]; then |
| setup |
| run_test |
| validate |
| else |
| echo "Nothing can be done if there are no senders" |
| fi |
| |