blob: 410ea30eb8c216d1b4634655a2eb770579245fab [file] [log] [blame]
#!/usr/bin/env bash
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# Test script for validating the behaviour of a ring queue
QUEUE_NAME=ring-queue
LIMIT=100
DURABLE=0
MESSAGES=10000
SENDERS=1
RECEIVERS=1
CONCURRENT=0
BROKER_URL="-b ${QPID_BROKER:-localhost}:${QPID_PORT:-5672}"
setup() {
if [[ $DURABLE -gt 0 ]]; then
EXTRA_ARGS=" --durable"
fi
qpid-config $BROKER_URL add queue $QUEUE_NAME --max-queue-count $LIMIT --limit-policy ring $EXTRA_ARGS
}
send() {
datagen --count $MESSAGES | tee sender_${QUEUE_NAME}_${1} | sender --durable $DURABLE --routing-key $QUEUE_NAME
}
receive() {
#TODO: allow a variety of receiver options to be passed in (ack-frequency, credit-window etc)
receiver --queue $QUEUE_NAME > receiver_${QUEUE_NAME}_${1}
}
cleanup() {
rm -f sender_${QUEUE_NAME}_* receiver_${QUEUE_NAME}_*
qpid-config $BROKER_URL del queue $QUEUE_NAME --force
}
log() {
echo $1
}
fail() {
echo $1
FAILED=1
}
validate() {
if [[ $RECEIVERS -eq 0 ]]; then
#queue should have $LIMIT messages on it, but need to send an eos also
sender --routing-key $QUEUE_NAME --send-eos 1 < /dev/null
received=$(receiver --queue $QUEUE_NAME --browse | wc -l)
if [[ received -eq $(( $LIMIT - 1)) ]]; then
log "queue contains $LIMIT messages as expected"
else
fail "queue does not contain the expected $LIMIT messages (received $received)"
fi
elif [[ $CONCURRENT -eq 0 ]]; then
#sum of length of all output files should be equal to $LIMIT - $RECEIVERS (1 eos message each)
received=$(cat receiver_${QUEUE_NAME}_* | wc -l)
expected=$(( $LIMIT - $RECEIVERS ))
if [[ $received -eq $expected ]]; then
log "received $LIMIT messages as expected"
else
fail "received $received messages, expected $expected"
fi
#if there were only a single sender and receiver (executed serially) we can check the
#actual received contents
if [[ $RECEIVERS -eq 1 ]] && [[ $SENDERS -eq 1 ]]; then
tail -n $(($LIMIT - 1)) sender_${QUEUE_NAME}_1 | diff - receiver_${QUEUE_NAME}_1 || FAILED=1
if [[ $FAILED -eq 1 ]]; then
fail "did not receive expected messages"
else
log "received messages matched expectations"
fi
fi
else
#multiple receivers, concurrent with senders; ring queue functionality cannot be validated in this case
if [[ $(cat receiver_${QUEUE_NAME}_* | wc -l) -le $(cat sender_${QUEUE_NAME}_* | wc -l) ]]; then
log "sent at least as many messages as were received"
else
#Note: if any receiver was browsing, this would be valid (would need to add 'sort | uniq')
# to pipeline above
fail "received more messages than were sent"
fi
fi
if [[ $FAILED ]]; then
echo $(basename $0): FAILED
exit 1
else
cleanup
fi
}
run_test() {
if [[ $CONCURRENT -eq 0 ]]; then
echo "Starting $SENDERS senders followed by $RECEIVERS receivers "
else
echo "Starting $SENDERS senders concurrently with $RECEIVERS receivers"
fi
for ((i=1; i <= $SENDERS; i++)); do
send $i &
sender_pids[$i]=$!
done
if [[ $CONCURRENT -eq 0 ]] && [[ $RECEIVERS -gt 0 ]]; then
wait
sender --routing-key $QUEUE_NAME --send-eos $RECEIVERS < /dev/null
fi
for ((i=1; i <= $RECEIVERS; i++)); do
receive $i &
done
if [[ $CONCURRENT -gt 0 ]]; then
for ((i=1; i <= $SENDERS; i++)); do
wait ${sender_pids[$i]}
done
sender --routing-key $QUEUE_NAME --send-eos $RECEIVERS < /dev/null
fi
wait
}
usage() {
cat <<EOF
$(basename $0): Test script for validating the behaviour of a ring queue
Options:
-q <queue> the name of the queue to use
-s <senders> the number of senders to start
-r <receivers> the number of receivers to start
-l <limit> the limit for the ring queue
-m <messages> the number of messages to send
-c if specified, receivers will run concurrently with senders
-d if specified the queue and messages will be durable
EOF
exit 1
}
while getopts "s:r:m:u:dch" opt ; do
case $opt in
q) QUEUE_NAME=$OPTARG ;;
l) LIMIT=$OPTARG ;;
s) SENDERS=$OPTARG ;;
r) RECEIVERS=$OPTARG ;;
m) MESSAGES=$OPTARG ;;
d) DURABLE=1 ;;
c) CONCURRENT=1 ;;
h) usage;;
?) usage;;
esac
done
if [[ $SENDERS -gt 0 ]]; then
setup
run_test
validate
else
echo "Nothing can be done if there are no senders"
fi