<?xml version="1.0"?>
<!--
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements.  See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License.  You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
-->
<mx:Application xmlns:mx="http://www.adobe.com/2006/mxml" creationComplete="creationCompleteHandler()">
<!-- Main panel: consumer test inputs, live statistics and control buttons. -->
<mx:Panel id="mainPanel" height="100%" width="100%">
<!-- defaultButton makes the Enter key trigger btnStart. -->
<mx:Form id="inputForm" defaultButton="{btnStart}" >
<mx:Label text="Consumer" fontWeight="bold"/>
<!-- Changing the destination runs setupChannelCB(), which points the consumer at it and refills channelsCB. -->
<mx:FormItem label="Specify destination to send messages to:" >
<mx:ComboBox id="destinationCB" change="setupChannelCB()" width="300" selectedIndex="0" />
<!-- Channel ids for the selected destination; setupChannel() reads the selected entry. -->
<mx:FormItem label="Specify channel/end point to communicate over:" >
<mx:ComboBox id="channelsCB" selectedIndex="0" width="150" />
<!-- Keyword that destinationsFilter() searches for in each destination id. -->
<mx:FormItem label="Filter destination on key words:" >
<mx:ComboBox id="destinationCriteria" dataProvider="['Error', 'Ignore', 'In', 'None', 'Out']"
change="applyDestinationCriteria()" width="150" selectedIndex="0"/>
<!-- Read by _waitForPollInterval(): a value above 0 replaces the default wait of 3. -->
<mx:FormItem label="Override waitForPollInterval.">
<mx:ComboBox width="50" id="increaseWaitForPollIntervalCB" dataProvider="[0,1,2,3,4,5,6,7,8,9]" />
<!-- Statistics labels bound to script variables. -->
<mx:FormItem label="Total Message Count:" >
<mx:Label id="msgCounts" text="{globalMessages.toString()}" width="100" textAlign="right"/>
<!-- Visible only while the consumer is subscribed. -->
<mx:FormItem label="Current Message Rate:" visible="{con.subscribed}">
<mx:Label id="msgCountPerPass" text="{Math.round(currentRate).toString()}" width="100" textAlign="right"/>
<mx:FormItem label="Global Average Rate:" >
<mx:Label id="msgAverage" text="{Math.round(globalMessageAverage).toString()}" width="100" textAlign="right"/>
<!-- Status text written by measurementTimerHandler() and cleared by startTest()/stopTest(). -->
<mx:Label id="elapsedTimeSinceFirstReceiptInSeconds" width="300" height="100" textAlign="left"/>
<!-- Start is disabled while the consumer is subscribed; Stop is always enabled. -->
<mx:Button id="btnStart" label="Start" click="startTest()" enabled="{!con.subscribed}" />
<mx:Button id="btnStop" label="Stop" click="stopTest()" enabled="true" />
<!-- Starts disabled; consumerSubscribe() enables it when isManualPolling is true. -->
<mx:Button id="btnPoll" label="Begin Manual Polling" click="poll()" enabled="false" />
<!-- Passes msgEventCountArray to displayCountsPerSecondIncrements() when clicked. -->
<mx:Button id="btnMsgArrayCountDisplay" label="Display Message Count 'Per Second' Increments" click="displayCountsPerSecondIncrements(msgEventCountArray);"/>
import mx.messaging.channels.AMFChannel;
import flash.utils.getTimer;
import mx.controls.Alert;
import mx.collections.SortField;
import mx.collections.Sort;
import mx.collections.XMLListCollection;
import mx.messaging.messages.*;
import mx.messaging.config.*;
import mx.messaging.channels.PollingChannel;
import mx.messaging.Channel;
import mx.messaging.ChannelSet;
import mx.collections.ArrayCollection;
import mx.messaging.Consumer;
import mx.utils.ObjectUtil;
import flash.utils.Timer;
// Timers created in creationCompleteHandler(); both tick every MEASUREMENT_TIME_MILLIS.
private var testRunTimer:Timer;
private var measurementTimer:Timer;
private var receiveAck:Boolean = false;
// Fault bookkeeping: conFault() and chanFault() clear noConFault and store the fault string.
private var noConFault:Boolean = true;
private var conFaultString:String = "";
private var chanFaultString:String = "";
private var result:String = "";
// Consumer under test; instantiated in creationCompleteHandler().
private var con:Consumer;
// Channel set handed to the consumer in consumerSubscribe(); rebuilt by setupChannel().
private var channelSet:ChannelSet = new ChannelSet();
private var expected:ArrayCollection;
private var ac:ArrayCollection = new ArrayCollection();
private var failureCount:int = 0;
// Server configuration XML; creationCompleteHandler() reads the destination ids from it.
private var xml:XML = ServerConfig.serverConfigData;
private var sortDestinations:Sort = new Sort();
private var sortChannels:Sort = new Sort();
// Id of the destination currently selected in destinationCB (set by setupChannelCB()).
private var dest:String;
private var channel:Channel;
// True until msMessageHandler() sees a message; stopTest() sets it back to true.
private var isFirstMessage:Boolean = true;
// Set to true in testRunTimerHandler() and back to false by every received message.
private var isStopTest:Boolean = true;
// True until the first testRunTimer tick, which records globalStartTime.
private var isNotStartTest:Boolean = true;
private var msgEventCountArray:Array = new Array();
private var finalMessageCountArray:Array = new Array();
private var elapsedTimeSinceFirstMessageReceivedSeconds:Number;
// Reset to 0 by every received message; compared against waitForPollInterval to end the test.
private var counter:int = 0;
private var stopWatch:int = 0;
// Countdown shown by measurementTimerHandler(); reloaded from waitForPollInterval.
private var pollWatch:int = 0;
// Set by _waitForPollInterval() when the AMF channel does not have polling enabled.
private var isManualPolling:Boolean = false;
// Used to keep track of global message receive rate since subscription.
public var globalRate:Number = 0;
private var globalStartTime:int;
private var globalMessages:Number = 0;
private var globalMessageAverage:Number = 0;
// Used to keep track of message receive rate in the last second.
private var currentRate:Number = 0;
private var startTime:int = 0;
private var currentMessages:Number = 0;
// Idle allowance computed by _waitForPollInterval() in consumerSubscribe().
private var waitForPollInterval:int = 0;
// How often to measure the message rate.
public const MEASUREMENT_TIME_MILLIS:int = 1000;
* XMLListCollection to hold names of destinations that will be tested
private var destinations:XMLListCollection;
* ArrayCollection to hold names of channels that will be tested
public var channelNames:ArrayCollection = new ArrayCollection();
// Application creationComplete handler: creates the two timers, loads the
// destination ids from the server configuration into destinationCB, and
// creates the Consumer with its event listeners.
public function creationCompleteHandler():void
//use this timer to stop test
// A repeat count of 0 lets the timer run until it is stopped explicitly.
testRunTimer = new Timer(MEASUREMENT_TIME_MILLIS,0);
testRunTimer.addEventListener(TimerEvent.TIMER, testRunTimerHandler);
measurementTimer = new Timer(MEASUREMENT_TIME_MILLIS,0);
measurementTimer.addEventListener(TimerEvent.TIMER, measurementTimerHandler);
//retrieve list of destinations
// E4X query: the id attribute of every destination under the service whose id is "message-service".
destinations = new XMLListCollection(new XMLList(xml.service.(@id == "message-service").destination.attribute("id")));
destinationCB.dataProvider = destinations;
//filter and sort destinations
destinations.filterFunction = destinationsFilter;
// SortField(null, true): compare the items themselves, case-insensitively.
sortDestinations.fields = [new SortField(null, true)];
destinations.sort = sortDestinations;
//initialize Consumer
con = new Consumer();
//add all event listener methods
con.addEventListener(MessageEvent.MESSAGE, msMessageHandler);
con.addEventListener(MessageFaultEvent.FAULT, conFault);
con.addEventListener(ChannelEvent.DISCONNECT, chanDisconnect);
con.addEventListener(ChannelFaultEvent.FAULT, chanFault);
//force first refresh on destination combo
//complete setup
// Change handler for destinationCB: points the consumer at the selected
// destination and fills channelsCB with that destination's channel ids.
private function setupChannelCB():void
// NOTE(review): selectedItem is dereferenced unchecked; confirm the combo can never be empty here (e.g. when the filter matches nothing).
dest = destinationCB.selectedItem.toString();
con.destination = dest;
//get the list of channel names for the destination; use as dataProvider for
channelNames = new ArrayCollection((ServerConfig.getChannelSet(dest) as ChannelSet).channelIds);
//sort channels
// SortField(null, true): compare the ids themselves, case-insensitively.
sortChannels.fields = [new SortField(null, true)];
channelNames.sort = sortChannels;
channelsCB.dataProvider = channelNames;
channelsCB.enabled = true;
destinationCriteria.enabled = true;
// Discards the current channel and channel set, creates a fresh ChannelSet and
// looks up the channel selected in channelsCB from the server configuration.
private function setupChannel():void
//reset channel and channelSet
if(channelSet != null)
channelSet = null;
channel = null;
channelSet = new ChannelSet();
// NOTE(review): the looked-up channel is not added to channelSet in the visible code; confirm where that happens.
channel = ServerConfig.getChannel(channelsCB.selectedItem.toString());
// Picks the idle allowance for the test from the consumer's channel and the
// override combo box, and sets isManualPolling when an AMF channel does not
// have polling enabled.
// consumer: the Consumer whose current channel is inspected.
// Returns the value selected in increaseWaitForPollIntervalCB when it is above 0,
// otherwise 3 in the branches visible here.
private function _waitForPollInterval(consumer:Consumer):int
* The 'waitForPollInterval' variable is a number that we use to control how long we want the Consumer to continue, in seconds, to remain in a run state
* once the MessageEvent.MESSAGE event stops firing. This is to compensate for clients or consumers connected over polling channels that may be in
* between polls. For example, when a polling-interval is 3000 millis we want to gaurantee that we do not stop the test prematurely or before the
* last poll intervel is reached. The test run time is kept track of using the counter variable. The counter is increased by 1 each time the
* testRunTimer TimerEvent.TIMER event listener is fired (every 1000 millis). The counter is reset to 0 each time the MessageEvent.MESSAGE event
* is fired indicating that we're still recieving messages. So, if the counter is > 1 that means no more messages are coming at the moment.
* For persistent connections such as RTMP and AMFStreaming as well as channels configured as long-polling channels this may be true but not
* for polling channels. There is code below to determine what the best waitForPollInterval is by default based on channel type but once
* there are more Clients connected to the destination we may need to override those settings and this method enables that.
// Override chosen in the UI; 0 means "use the default".
var selectedItem:int = increaseWaitForPollIntervalCB.selectedItem as int;
var _waitForPollInterval:int;
if(channelType(consumer) == "AMFChannel")
// Override supplied by the user.
if(selectedItem >0)
// channelProps(..., "pollingInterval", 3000) is true when the interval is below 3000 ms.
if(channelProps(consumer,"pollingEnabled") && channelProps(consumer,"pollingInterval", 3000))
_waitForPollInterval = selectedItem;
trace("polling is enabled at channel level selectedItem >0 " + _waitForPollInterval.toString());
}//polling not enabled for this channel
else if (!channelProps(consumer,"pollingEnabled"))
//enable manual polling
isManualPolling = true;
_waitForPollInterval = selectedItem;
trace("enabling manual polling: selectedItem >0" + _waitForPollInterval.toString());
// No override: same checks with the default of 3.
// NOTE(review): no visible branch handles polling enabled with an interval of 3000 ms or more; confirm the intended value for that case.
if(channelProps(consumer,"pollingEnabled") && channelProps(consumer,"pollingInterval", 3000))
{trace("channelType(consumer) == AMFChannel");
_waitForPollInterval = 3;
trace("polling is enabled at channel level selectedItem = 0" + _waitForPollInterval.toString());
}//polling not enabled for this channel
else if(!channelProps(consumer,"pollingEnabled"))
//enable manual polling
isManualPolling = true;
_waitForPollInterval = 3;
trace("enabling manual polling: selectedItem = 0" + _waitForPollInterval.toString());
// Presumably the fallback for channels that are not AMFChannel; verify against the full branch structure.
_waitForPollInterval = 3;
return _waitForPollInterval;
/**
 * Tests one polling-related property of the consumer's current channel.
 *
 * @param consumer Consumer whose channelSet.currentChannel is inspected.
 * @param prop     "pollingEnabled" or "pollingInterval"; any other name yields false.
 * @param interval Exclusive upper bound used by the "pollingInterval" check.
 * @return For "pollingEnabled", whether polling is enabled on the channel; for
 *         "pollingInterval", whether the channel's pollingInterval is strictly
 *         below interval. False when there is no current channel or it is not
 *         an AMFChannel.
 */
private function channelProps(consumer:Consumer, prop:String, interval:int=0):Boolean
{
    // "as" yields null for a missing or non-AMF channel; dereferencing that
    // null would throw a TypeError, so report false instead.
    var channels:ChannelSet = consumer.channelSet;
    var amf:AMFChannel = (channels == null) ? null : (channels.currentChannel as AMFChannel);
    if (amf == null)
    {
        return false;
    }

    switch (prop)
    {
        case "pollingEnabled":
            return amf.pollingEnabled;
        case "pollingInterval":
            return amf.pollingInterval < interval;
        default:
            return false;
    }
}
/**
 * Classifies the consumer's current channel.
 *
 * @param consumer Consumer whose channelSet.currentChannel is examined.
 * @return "AMFChannel" when the current channel is an AMFChannel, otherwise "unknown".
 */
private function channelType(consumer:Consumer):String
{
    var current:Channel = consumer.channelSet.currentChannel;
    return (current is AMFChannel) ? "AMFChannel" : "unknown";
}
// Attaches the channel set to the consumer, computes the idle allowance for
// this run and enables the manual-poll button when manual polling is required.
// NOTE(review): the subscribe call itself is not visible in this block; confirm where it happens.
private function consumerSubscribe():void
con.channelSet = channelSet;
// _waitForPollInterval() may also set isManualPolling as a side effect.
waitForPollInterval = _waitForPollInterval(con);
pollWatch = waitForPollInterval;
//enable this button only when manual polling is true
btnPoll.enabled = isManualPolling;
// Counterpart to consumerSubscribe(). NOTE(review): presumably unsubscribes `con`; confirm against the body.
private function consumerUnsubscribe():void
// Click handler for btnStart: clears the per-run collections, the status label
// and the message counters, and records the run's start time.
// NOTE(review): no timer start or subscribe call is visible in this block; confirm where the run is actually started.
private function startTest():void
//reset collections and other values
msgEventCountArray = null;
msgEventCountArray = new Array();
finalMessageCountArray = null;
finalMessageCountArray = new Array();
elapsedTimeSinceFirstReceiptInSeconds.text = "";
globalStartTime = getTimer();
globalMessages = 0;
currentMessages = 0;
// 0 marks "no measurement window open yet" for msMessageHandler().
startTime = 0;
// Click handler for btnStop: computes the final average from msgEventCountArray
// and resets the run state and the manual-poll button.
private function stopTest():void
//reset vars, stop timers, unsubscribe and compute final results
trace("stop test called");
globalMessageAverage = calcMessageReceiptAverage(msgEventCountArray);
elapsedTimeSinceFirstReceiptInSeconds.text = "";
stopWatch = 0;
globalRate = 0;
currentRate = 0;
channel = null;
counter = 0;
pollWatch = 0;
elapsedTimeSinceFirstMessageReceivedSeconds = 0;
isNotStartTest = false;
isStopTest = false;
failureCount = 0;
//reset this when stop event occurs
isFirstMessage = true;
btnPoll.enabled = false;
btnPoll.label = 'Begin Manual Polling';
// MessageEvent.MESSAGE listener: updates the global and per-window receive
// rates and marks the test as still receiving messages.
public function msMessageHandler(e:MessageEvent):void
var now:int = getTimer();
// Increments globalMessages, then computes messages per second since globalStartTime.
// NOTE(review): when now equals globalStartTime the divisor is 0; confirm that case is acceptable.
globalRate = 1000.0 * ++globalMessages / Number(now - globalStartTime);
//push message count
// startTime == 0 means no measurement window is open yet.
if (startTime == 0)
startTime = now;
// Once the window exceeds MEASUREMENT_TIME_MILLIS, publish its rate and open a new window.
if (now - startTime > MEASUREMENT_TIME_MILLIS)
currentRate = 1000.0 * currentMessages / Number(now - startTime);
//trace("msgEventCountArray " + msgEventCountArray[msgEventCountArray.length -1].toString());
currentMessages = 0;
startTime = now;
//if we're receiving messages then the test is not over
isStopTest = false;
//As long as we're receiving messages keep this set to 0
counter = 0;
//now that we got first message since successfully subscribing set this to false indicating so
isFirstMessage = false;
// testRunTimer tick handler: records the start time on the first tick and
// detects the end of the run, i.e. no message received for more than
// waitForPollInterval ticks after at least one message has arrived.
public function testRunTimerHandler(e:TimerEvent):void
if(isNotStartTest)//First time so reset bit, capture start time
isNotStartTest = false;
globalStartTime = getTimer();
//If no longer receiving messages from producer (destination may be emptying queue if/when buffered) then stop test
if(isStopTest && !isFirstMessage && counter > waitForPollInterval)
trace(" waitForP: " + waitForPollInterval.toString());
//Set to true each time through. This will be reset to false in msMessageHandler
//meaning that we're still receiving msgs. If this value remains true, meaning that it was not reset back to false
//in the msMessageHandler then the test is over
isStopTest = true;
//increment counter variable by 1
// measurementTimer tick handler: once a first message has been received,
// refreshes the status label (latest count, elapsed seconds, poll countdown)
// and recomputes the global average.
// NOTE(review): finalMessageCountArray[length - 1] is read unchecked; confirm the array cannot be empty here.
public function measurementTimerHandler(e:TimerEvent):void
if(!isFirstMessage)//First time begin keeping track of elapsed time
// While the countdown is running, show it and decrement pollWatch.
if(pollWatch > 0)
elapsedTimeSinceFirstReceiptInSeconds.text = "\nReceived " + finalMessageCountArray[finalMessageCountArray.length -1].toString()
+ " in " + stopWatch.toString() + " seconds. \nUsing wait measure of " + waitForPollInterval.toString()
+ " poll again in " + (--pollWatch).toString() + " seconds";
trace("measurementTimer************************* " + pollWatch.toString());
// Reload the countdown and show the status without the countdown suffix.
pollWatch = waitForPollInterval;
elapsedTimeSinceFirstReceiptInSeconds.text = "\nReceived " + finalMessageCountArray[finalMessageCountArray.length -1].toString()
+ " in " + stopWatch.toString() + " seconds. \nUsing wait measure of " + waitForPollInterval.toString();
globalMessageAverage = calcMessageReceiptAverage(msgEventCountArray);
// Click handler for btnPoll: issues a manual poll on the consumer's current
// channel when that channel is a PollingChannel, and relabels the button.
private function poll():void
// NOTE(review): the statement guarded by this check is not visible here; confirm what happens when the consumer is not subscribed.
if (!con.subscribed)
var c:Channel = con.channelSet.currentChannel;
if (c is PollingChannel)
(c as PollingChannel).poll();
//pollWatch = waitForPollInterval;
btnPoll.label = 'Polling';
// Click handler for btnMsgArrayCountDisplay; receives msgEventCountArray as `a`.
// NOTE(review): presumably displays the per-second counts; confirm against the body.
public function displayCountsPerSecondIncrements(a:Array):void
/**
 * MessageFaultEvent.FAULT listener for the consumer: stores the fault text
 * and flags that a fault occurred.
 *
 * @param e Fault event whose faultString is recorded.
 */
public function conFault(e:MessageFaultEvent):void
{
    conFaultString = e.faultString;
    noConFault = false;
}
/**
 * ChannelFaultEvent.FAULT listener: stores the channel fault text, clears the
 * shared noConFault flag and traces the fault string and detail.
 *
 * @param e Channel fault event being reported.
 */
public function chanFault(e:ChannelFaultEvent):void
{
    chanFaultString = e.faultString;
    noConFault = false;
    //trace(e.faultString + " : " + ObjectUtil.toString(e));
    var summary:String = " : " + e.faultString + " : " + e.faultDetail;
    trace(summary);
}
// Acknowledgement handler: traces a success line when the acknowledged message
// is a subscribe command, otherwise dumps the correlated message's headers.
// NOTE(review): no listener registration for this handler is visible; confirm it is wired up.
private function consumerAck(event:MessageAckEvent):void
//trace("event : " + ObjectUtil.toString(event.correlation.headers));
if((event.correlation is CommandMessage) && (CommandMessage(event.correlation).operation == CommandMessage.SUBSCRIBE_OPERATION))
trace("MessageAckEvent: successfully subscribed!");
} else {
trace("event.correlation : " + ObjectUtil.toString(event.correlation.headers));
// ChannelEvent.DISCONNECT listener (registered in creationCompleteHandler()):
// traces the connection state and a dump of the event.
// NOTE(review): the condition guarding the "connected" trace is not visible; presumably it tests event.connected.
private function chanDisconnect(event:ChannelEvent):void
//trace("event : " + ObjectUtil.toString(event.correlation.headers));
trace("ChannelEvent: connected");
else if (!event.connected)
trace("ChannelEvent: disconnect");
trace("ChannelEvent : " + ObjectUtil.toString(event));
// Property-change handler; the only visible statement is a disabled trace.
// NOTE(review): no listener registration for this handler is visible; confirm it is still used.
public function consumerPropertyChangeEventHandler(pe:PropertyChangeEvent):void
//trace("property changed: " + ObjectUtil.toString(;
/**
 * Filter function for the destinations collection: keeps an item when its
 * string form contains the keyword selected in destinationCriteria.
 *
 * @param obj Candidate item from the collection.
 * @return true when the keyword is found in the item's string form, false otherwise.
 */
public function destinationsFilter(obj:Object):Boolean
{
    var keyword:String = destinationCriteria.selectedItem.toString();
    // search() returns the starting position of the match, or -1 when there is
    // none. Return that comparison directly so the result never depends on
    // the truthiness of the item itself.
    return String(ObjectUtil.toString(obj)).search(keyword) != -1;
}
// Change handler for destinationCriteria: moves the destination combo back to its first entry.
// NOTE(review): no collection refresh is visible here; confirm the filter is re-applied somewhere.
private function applyDestinationCriteria():void
//filter and sort destinations and channels
destinationCB.selectedIndex = 0;
/**
 * Averages the non-zero entries of an array of message counts.
 *
 * @param arg Array of numeric counts; zero entries add nothing to the sum and
 *            are not counted in the divisor.
 * @return Sum of the entries divided by the number of non-zero entries, or 0
 *         when the array is null, empty or holds only zeros (instead of the
 *         NaN that 0/0 would produce).
 */
public function calcMessageReceiptAverage(arg:Array):Number
{
    //trace("arg: " + ObjectUtil.toString(arg));
    // Named "total" so it does not shadow the String member "result".
    var total:Number = 0;
    var iteration:Number = 0;
    //add up all array elements
    for each (var item:Number in arg)
    {
        total += item;
        if (item != 0)
        {
            iteration++;
        }
    }
    // Guard the division: with no non-zero samples the average is defined as 0.
    var average:Number = (iteration == 0) ? 0 : total / iteration;
    trace("result/iteration: " + average.toString());
    return average;
}