<?xml version="1.0"?> | |
<!-- | |
Licensed to the Apache Software Foundation (ASF) under one or more | |
contributor license agreements. See the NOTICE file distributed with | |
this work for additional information regarding copyright ownership. | |
The ASF licenses this file to You under the Apache License, Version 2.0 | |
(the "License"); you may not use this file except in compliance with | |
the License. You may obtain a copy of the License at | |
http://www.apache.org/licenses/LICENSE-2.0 | |
Unless required by applicable law or agreed to in writing, software | |
distributed under the License is distributed on an "AS IS" BASIS, | |
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
See the License for the specific language governing permissions and | |
limitations under the License. | |
--> | |
<mx:Application xmlns:mx="http://www.adobe.com/2006/mxml" creationComplete="creationCompleteHandler()"> | |
<!-- Main UI: consumer configuration form, live statistics labels and the test control buttons. -->
<mx:Panel id="mainPanel" height="100%" width="100%">
    <mx:Form id="inputForm" defaultButton="{btnStart}" >
        <mx:VBox>
            <mx:Label text="Consumer" fontWeight="bold"/>
            <!-- Destination / channel selection; changing the destination repopulates the channel list. -->
            <mx:FormItem label="Specify destination to send messages to:" >
                <mx:ComboBox id="destinationCB" change="setupChannelCB()" width="300" selectedIndex="0" />
            </mx:FormItem>
            <mx:FormItem label="Specify channel/end point to communicate over:" >
                <mx:ComboBox id="channelsCB" selectedIndex="0" width="150" />
            </mx:FormItem>
            <!-- Key word used by destinationsFilter() to narrow the destination list. -->
            <mx:FormItem label="Filter destination on key words:" >
                <mx:ComboBox id="destinationCriteria" dataProvider="['Error', 'Ignore', 'In', 'None', 'Out']"
                    change="applyDestinationCriteria()" width="150" selectedIndex="0"/>
            </mx:FormItem>
            <!-- 0 keeps the default wait; any other value overrides it (see _waitForPollInterval()). -->
            <mx:FormItem label="Override waitForPollInterval.">
                <mx:ComboBox width="50" id="increaseWaitForPollIntervalCB" dataProvider="[0,1,2,3,4,5,6,7,8,9]" />
            </mx:FormItem>
            <!-- Live statistics, driven by the bindable counters in the script block. -->
            <mx:FormItem label="Total Message Count:" >
                <mx:Label id="msgCounts" text="{globalMessages.toString()}" width="100" textAlign="right"/>
            </mx:FormItem>
            <mx:FormItem label="Current Message Rate:" visible="{con.subscribed}">
                <mx:Label id="msgCountPerPass" text="{Math.round(currentRate).toString()}" width="100" textAlign="right"/>
            </mx:FormItem>
            <mx:FormItem label="Global Average Rate:" >
                <mx:Label id="msgAverage" text="{Math.round(globalMessageAverage).toString()}" width="100" textAlign="right"/>
            </mx:FormItem>
            <mx:FormItem>
                <mx:Label id="elapsedTimeSinceFirstReceiptInSeconds" width="300" height="100" textAlign="left"/>
            </mx:FormItem>
            <!-- Test controls. btnPoll is enabled from script only while manual polling is in effect. -->
            <mx:HBox>
                <mx:Button id="btnStart" label="Start" click="startTest()" enabled="{!con.subscribed}" />
                <mx:Button id="btnStop" label="Stop" click="stopTest()" enabled="true" />
                <mx:Button id="btnPoll" label="Begin Manual Polling" click="poll()" enabled="false" />
                <mx:Button id="btnMsgArrayCountDisplay" label="Display Message Count 'Per Second' Increments" click="displayCountsPerSecondIncrements(msgEventCountArray);"/>
            </mx:HBox>
        </mx:VBox>
    </mx:Form>
</mx:Panel>
<mx:Script> | |
<![CDATA[ | |
import mx.messaging.channels.AMFChannel; | |
import mx.events.PropertyChangeEvent; | |
import flash.utils.getTimer; | |
import mx.controls.Alert; | |
import mx.collections.SortField; | |
import mx.collections.Sort; | |
import mx.collections.XMLListCollection; | |
import mx.messaging.messages.*; | |
import mx.messaging.config.*; | |
import mx.messaging.events.*; | |
import mx.messaging.channels.PollingChannel; | |
import mx.messaging.Channel; | |
import mx.messaging.ChannelSet; | |
import mx.collections.ArrayCollection; | |
import mx.messaging.Consumer; | |
import mx.utils.ObjectUtil; | |
import flash.utils.Timer; | |
// ---------------------------------------------------------------------
// Timers (both created in creationCompleteHandler)
// ---------------------------------------------------------------------
// Started by startTest(); its handler decides when a run is finished.
private var testRunTimer:Timer;
// Started when the first message of a run arrives; refreshes the elapsed-time display.
private var measurementTimer:Timer;

// ---------------------------------------------------------------------
// Fault / acknowledgement bookkeeping
// ---------------------------------------------------------------------
// Not referenced by any function in this script block.
private var receiveAck:Boolean = false;
// Cleared by conFault() and chanFault().
private var noConFault:Boolean = true;
// Last fault strings recorded by conFault() / chanFault().
private var conFaultString:String = "";
private var chanFaultString:String = "";
// Not referenced by any function in this script block
// (calcMessageReceiptAverage declares its own local of the same name).
private var result:String = "";

// ---------------------------------------------------------------------
// Messaging objects
// ---------------------------------------------------------------------
// The consumer under test; bindable so the UI can follow con.subscribed.
[Bindable]
private var con:Consumer;
// Rebuilt by setupChannel() for every run.
private var channelSet:ChannelSet = new ChannelSet();
// Not referenced by any function in this script block.
private var expected:ArrayCollection;
private var ac:ArrayCollection = new ArrayCollection();
// Incremented by the fault handlers, cleared by stopTest().
private var failureCount:int = 0;
// Server configuration used to enumerate the message-service destinations.
private var xml:XML = ServerConfig.serverConfigData;
private var sortDestinations:Sort = new Sort();
private var sortChannels:Sort = new Sort();
// Currently selected destination id and the channel used for the current run.
private var dest:String;
private var channel:Channel;

// ---------------------------------------------------------------------
// Run-state flags
// ---------------------------------------------------------------------
// True until the first message of a run has been received.
private var isFirstMessage:Boolean = true;
// Set on every testRunTimer tick and cleared by every received message.
private var isStopTest:Boolean = true;
// True until the first testRunTimer tick has been handled.
private var isNotStartTest:Boolean = true;

// ---------------------------------------------------------------------
// Measurement data
// ---------------------------------------------------------------------
// One entry per measurement window: the message rate observed in that window.
[Bindable]
private var msgEventCountArray:Array = [];
// Running total of received messages, one entry pushed per message.
private var finalMessageCountArray:Array = [];
private var elapsedTimeSinceFirstMessageReceivedSeconds:Number;
// Number of consecutive testRunTimer ticks without a message.
private var counter:int = 0;
// Seconds elapsed since the first message, counted by measurementTimerHandler().
private var stopWatch:int = 0;
// Countdown (in ticks) to the next manual poll.
private var pollWatch:int = 0;
// True when the selected channel does not poll by itself.
private var isManualPolling:Boolean = false;

// Used to keep track of the global message receive rate since subscription.
[Bindable]
public var globalRate:Number = 0;
private var globalStartTime:int;
[Bindable]
private var globalMessages:Number = 0;
[Bindable]
private var globalMessageAverage:Number = 0;

// Used to keep track of the message receive rate in the current measurement window.
[Bindable]
private var currentRate:Number = 0;
private var startTime:int = 0;
private var currentMessages:Number = 0;
// Idle ticks tolerated before a run is considered finished.
private var waitForPollInterval:int = 0;

// How often to measure the message rate.
public const MEASUREMENT_TIME_MILLIS:int = 1000;

/**
 * XMLListCollection to hold names of destinations that will be tested
 */
[Bindable]
private var destinations:XMLListCollection;

/**
 * ArrayCollection to hold names of channels that will be tested
 */
[Bindable]
public var channelNames:ArrayCollection = new ArrayCollection();
/**
 * Application start-up hook (wired to creationComplete on the Application tag).
 * Creates the two timers, loads the destination list from the server
 * configuration, creates the Consumer with all of its listeners, and
 * populates the destination and channel combo boxes.
 */
public function creationCompleteHandler():void
{
    // Timer whose handler decides when the test run is over.
    testRunTimer = new Timer(MEASUREMENT_TIME_MILLIS, 0);
    testRunTimer.addEventListener(TimerEvent.TIMER, testRunTimerHandler);

    // Timer that refreshes the elapsed-time read-out.
    measurementTimer = new Timer(MEASUREMENT_TIME_MILLIS, 0);
    measurementTimer.addEventListener(TimerEvent.TIMER, measurementTimerHandler);

    // Collect the ids of every destination declared under the message service.
    var destinationIds:XMLList = new XMLList(xml.service.(@id == "message-service").destination.attribute("id"));
    destinations = new XMLListCollection(destinationIds);
    destinationCB.dataProvider = destinations;

    // Filter by key word and sort; both take effect on the next refresh().
    destinations.filterFunction = destinationsFilter;
    sortDestinations.fields = [new SortField(null, true)];
    destinations.sort = sortDestinations;

    // Create the consumer and register its listeners in a fixed order.
    con = new Consumer();
    var listenerTable:Array = [
        [MessageEvent.MESSAGE, msMessageHandler],
        [MessageFaultEvent.FAULT, conFault],
        [MessageAckEvent.ACKNOWLEDGE, consumerAck],
        [PropertyChangeEvent.PROPERTY_CHANGE, consumerPropertyChangeEventHandler],
        [ChannelEvent.DISCONNECT, chanDisconnect],
        [ChannelFaultEvent.FAULT, chanFault]
    ];
    for each (var entry:Array in listenerTable)
    {
        con.addEventListener(entry[0], entry[1]);
    }

    // Force the first refresh of the destination combo, then finish the setup.
    applyDestinationCriteria();
    setupChannelCB();
}
/**
 * Synchronises the consumer and the channel combo box with the destination
 * currently selected in destinationCB.
 *
 * When the destination list is empty (for example because the key-word
 * filter matched nothing) there is no selected item; in that case the
 * channel list is emptied and the consumer destination is left untouched
 * instead of dereferencing null.
 */
private function setupChannelCB():void
{
    // The filter combo stays usable in every case so the user can widen the filter again.
    destinationCriteria.enabled = true;

    var selectedDestination:Object = destinationCB.selectedItem;
    if (selectedDestination == null)
    {
        channelNames = new ArrayCollection();
        channelsCB.dataProvider = channelNames;
        channelsCB.enabled = false;
        return;
    }

    dest = selectedDestination.toString();
    con.destination = dest;

    // Get the list of channel names for the destination; use it as the dataProvider for channelsCB.
    channelNames = new ArrayCollection((ServerConfig.getChannelSet(dest) as ChannelSet).channelIds);

    // Sort channels.
    sortChannels.fields = [new SortField(null, true)];
    channelNames.sort = sortChannels;
    channelNames.refresh();

    channelsCB.dataProvider = channelNames;
    channelsCB.enabled = true;
}
/**
 * Builds a fresh ChannelSet containing only the channel selected in
 * channelsCB. Any previous channelSet / channel references are simply
 * replaced.
 */
private function setupChannel():void
{
    var selectedChannelId:String = channelsCB.selectedItem.toString();

    // Start from an empty set so only the selected channel is used.
    channelSet = new ChannelSet();
    channel = ServerConfig.getChannel(selectedChannelId);
    channelSet.addChannel(channel);
}
/**
 * Works out how many idle seconds the test tolerates before it is stopped,
 * and decides whether manual polling is needed (isManualPolling).
 *
 * The 'waitForPollInterval' value controls how long, in seconds, the
 * Consumer stays in a run state once MessageEvent.MESSAGE stops firing.
 * This compensates for consumers connected over polling channels that may
 * be in between polls: with a polling interval of 3000 millis the test must
 * not be stopped before the last poll interval is reached. The run time is
 * tracked with the 'counter' variable, which testRunTimerHandler increments
 * on every tick (every 1000 millis) and msMessageHandler resets to 0 on
 * every message. For persistent connections a non-zero counter means no
 * more messages are coming, but that is not true for polling channels.
 *
 * Rules for an AMFChannel:
 *  - a non-zero selection in increaseWaitForPollIntervalCB always wins;
 *  - otherwise the default is 3 seconds;
 *  - when the channel has polling disabled, manual polling is switched on.
 * For every other channel type the result is 0.
 *
 * @param consumer the subscribed consumer whose current channel is inspected
 * @return idle seconds to wait before stopping the test
 */
private function _waitForPollInterval(consumer:Consumer):int
{
    var selectedItem:int = increaseWaitForPollIntervalCB.selectedItem as int;
    var waitSeconds:int = 0;

    // Recomputed on every subscription; otherwise a previous run over a
    // non-polling channel would leave manual polling switched on.
    isManualPolling = false;

    if (channelType(consumer) != "AMFChannel")
    {
        return waitSeconds;
    }

    var pollingEnabled:Boolean = channelProps(consumer, "pollingEnabled");
    var pollsFasterThanDefault:Boolean = pollingEnabled && channelProps(consumer, "pollingInterval", 3000);

    if (selectedItem > 0)
    {
        // The user override applies regardless of the channel's polling interval.
        waitSeconds = selectedItem;
        if (pollsFasterThanDefault)
        {
            trace("polling is enabled at channel level selectedItem >0 " + waitSeconds.toString());
        }
        else if (!pollingEnabled)
        {
            // Polling not enabled for this channel: enable manual polling.
            isManualPolling = true;
            trace("enabling manual polling: selectedItem >0" + waitSeconds.toString());
        }
    }
    else
    {
        waitSeconds = 3;
        if (pollsFasterThanDefault)
        {
            trace("channelType(consumer) == AMFChannel");
            trace("polling is enabled at channel level selectedItem = 0" + waitSeconds.toString());
        }
        else if (!pollingEnabled)
        {
            // Polling not enabled for this channel: enable manual polling.
            isManualPolling = true;
            trace("enabling manual polling: selectedItem = 0" + waitSeconds.toString());
        }
        else
        {
            // Polling enabled with an interval of 3000 millis or more.
            trace("FOOBAR");
        }
    }
    return waitSeconds;
}
/**
 * Answers a yes/no question about the consumer's current AMFChannel.
 *
 * @param consumer consumer whose channelSet.currentChannel is inspected
 * @param prop     "pollingEnabled"  - is polling enabled on the channel?
 *                 "pollingInterval" - is the polling interval below 'interval'?
 * @param interval threshold in milliseconds, only used for "pollingInterval"
 * @return the answer; false for an unknown 'prop' or when the current
 *         channel is missing or is not an AMFChannel
 */
private function channelProps(consumer:Consumer, prop:String, interval:int=0):Boolean
{
    if (prop != "pollingEnabled" && prop != "pollingInterval")
    {
        return false;
    }

    // 'as' yields null for a non-AMF (or absent) channel; guard instead of dereferencing it.
    var amfChannel:AMFChannel = null;
    if (consumer != null && consumer.channelSet != null)
    {
        amfChannel = consumer.channelSet.currentChannel as AMFChannel;
    }
    if (amfChannel == null)
    {
        return false;
    }

    if (prop == "pollingEnabled")
    {
        return amfChannel.pollingEnabled;
    }
    return amfChannel.pollingInterval < interval;
}
/**
 * Classifies the consumer's current channel.
 *
 * @param consumer consumer whose channelSet.currentChannel is inspected
 * @return "AMFChannel" when the current channel is an AMFChannel, otherwise "unknown"
 */
private function channelType(consumer:Consumer):String
{
    var isAmf:Boolean = consumer.channelSet.currentChannel is AMFChannel;
    return isAmf ? "AMFChannel" : "unknown";
}
/**
 * Subscribes the consumer over the prepared channelSet, then derives the
 * idle-wait value and the manual-polling state for this run.
 */
private function consumerSubscribe():void
{
    con.channelSet = channelSet;
    con.subscribe();

    // The manual-poll countdown starts from the same value as the idle wait.
    var waitSeconds:int = _waitForPollInterval(con);
    waitForPollInterval = waitSeconds;
    pollWatch = waitSeconds;

    // The poll button is only available while manual polling is in effect.
    btnPoll.enabled = isManualPolling;
}
/**
 * Ends the consumer's subscription and then disconnects it.
 */
private function consumerUnsubscribe():void
{
    // Unsubscribe first, then disconnect.
    con.unsubscribe();
    con.disconnect();
}
/**
 * Starts a test run: clears every per-run counter and flag, prepares the
 * channel, subscribes the consumer and starts the run timer.
 *
 * All per-run state is reset here rather than relying on stopTest() having
 * been called, because Start becomes available again whenever the consumer
 * is no longer subscribed (for example after a dropped subscription).
 */
private function startTest():void
{
    // Reset collections and the result display.
    msgEventCountArray = [];
    finalMessageCountArray = [];
    elapsedTimeSinceFirstReceiptInSeconds.text = "";

    // Reset counters and rates.
    globalMessages = 0;
    currentMessages = 0;
    startTime = 0;
    globalRate = 0;
    currentRate = 0;
    counter = 0;
    stopWatch = 0;

    // A new run has not seen a message yet; the measurement timer is
    // restarted by msMessageHandler when the first one arrives.
    isFirstMessage = true;
    isStopTest = false;
    measurementTimer.stop();

    globalStartTime = getTimer();

    setupChannel();
    consumerSubscribe();

    // Started last so a failure during setup does not leave the timer running.
    testRunTimer.start();
}
/**
 * Ends a test run: computes the final average, stops both timers,
 * unsubscribes, releases the channel and resets the run state and the
 * manual-poll button.
 */
private function stopTest():void
{
    trace("stop test called");

    // Final result for the run.
    globalMessageAverage = calcMessageReceiptAverage(msgEventCountArray);

    // Stop timers and clear the elapsed-time read-out.
    testRunTimer.stop();
    measurementTimer.stop();
    elapsedTimeSinceFirstReceiptInSeconds.text = "";

    // Reset rates and counters.
    stopWatch = 0;
    globalRate = 0;
    currentRate = 0;

    // Tear down the subscription and release the channel.
    consumerUnsubscribe();
    channelSet.removeChannel(channel);
    channel = null;

    counter = 0;
    pollWatch = 0;
    elapsedTimeSinceFirstMessageReceivedSeconds = 0;
    failureCount = 0;

    // Flags for the next run.
    isNotStartTest = false;
    isStopTest = false;
    isFirstMessage = true;

    // Manual-poll button back to its idle state.
    btnPoll.enabled = false;
    btnPoll.label = 'Begin Manual Polling';
}
/**
 * Handles every message received by the consumer: updates the global and
 * per-window rates, marks the run as still active, and starts the
 * measurement timer on the first message of a run.
 *
 * @param e the received message event (its content is not inspected)
 */
public function msMessageHandler(e:MessageEvent):void
{
    var now:int = getTimer();

    // Global rate since the start of the run, in messages per second.
    globalRate = 1000.0 * ++globalMessages / Number(now - globalStartTime);

    // Record the running total.
    finalMessageCountArray.push(globalMessages);

    // A startTime of 0 means no measurement window is open yet.
    if (startTime == 0)
    {
        startTime = now;
    }
    currentMessages++;

    var windowMillis:int = now - startTime;
    if (windowMillis > MEASUREMENT_TIME_MILLIS)
    {
        // Close the window: record its rate and open a new one.
        currentRate = 1000.0 * currentMessages / Number(windowMillis);
        msgEventCountArray.push(currentRate);
        currentMessages = 0;
        startTime = now;
    }

    // Still receiving messages, so the test is not over and the idle counter restarts.
    isStopTest = false;
    counter = 0;

    if (!isFirstMessage)
    {
        return;
    }

    // First message since subscribing: note it and start measuring elapsed time.
    isFirstMessage = false;
    measurementTimer.start();
}
/**
 * Tick handler of testRunTimer. The very first tick only records the start
 * time. Later ticks stop the test once messages have been received but none
 * arrived for more than waitForPollInterval ticks; otherwise the idle
 * counter is advanced.
 *
 * @param e the timer event (unused)
 */
public function testRunTimerHandler(e:TimerEvent):void
{
    if (isNotStartTest)
    {
        // First tick ever: clear the flag and capture the start time.
        isNotStartTest = false;
        globalStartTime = getTimer();
        return;
    }

    // No message since the previous tick, at least one message seen during
    // this run, and the idle period exceeded (the destination may still be
    // emptying a buffered queue until then).
    var runIsOver:Boolean = isStopTest && !isFirstMessage && counter > waitForPollInterval;
    if (runIsOver)
    {
        trace(" waitForP: " + waitForPollInterval.toString());
        stopTest();
        return;
    }

    // Assume the run is over until msMessageHandler clears this flag again,
    // and count one more idle tick.
    isStopTest = true;
    counter++;
}
/**
 * Tick handler of measurementTimer. Once the first message of a run has
 * arrived it advances the elapsed-seconds counter, refreshes the status
 * text, drives the manual-poll countdown when manual polling is in effect,
 * and recomputes the average rate.
 *
 * @param e the timer event (unused)
 */
public function measurementTimerHandler(e:TimerEvent):void
{
    // Nothing to measure until the first message has been received.
    if (isFirstMessage)
    {
        return;
    }

    ++stopWatch;

    // globalMessages equals the last value pushed to finalMessageCountArray,
    // and unlike indexing the array it is also defined when the array is empty.
    var summary:String = "\nReceived " + globalMessages.toString()
        + " in " + stopWatch.toString() + " seconds. \nUsing wait measure of " + waitForPollInterval.toString();

    if (!isManualPolling)
    {
        elapsedTimeSinceFirstReceiptInSeconds.text = summary;
    }
    else if (pollWatch > 0)
    {
        elapsedTimeSinceFirstReceiptInSeconds.text = summary
            + " poll again in " + (--pollWatch).toString() + " seconds";

        // '<= 1' instead of '== 1': with a wait of 1 the countdown goes
        // straight from 1 to 0 and would otherwise never poll again.
        if (pollWatch <= 1)
        {
            trace("measurementTimer************************* " + pollWatch.toString());
            poll();
            pollWatch = waitForPollInterval;
        }
    }

    globalMessageAverage = calcMessageReceiptAverage(msgEventCountArray);
}
/**
 * Issues one manual poll on the consumer's current channel (subscribing
 * first when necessary) and puts the poll button into its 'Polling' state.
 */
private function poll():void
{
    if (!con.subscribed)
    {
        consumerSubscribe();
    }

    // 'as' yields null for anything that is not a PollingChannel.
    var pollingChannel:PollingChannel = con.channelSet.currentChannel as PollingChannel;
    if (pollingChannel != null)
    {
        pollingChannel.poll();
    }

    btnPoll.label = 'Polling';
    btnPoll.enabled = false;
}
/**
 * Shows the given array, rendered with ObjectUtil.toString, in an Alert.
 *
 * @param a the array to display
 */
public function displayCountsPerSecondIncrements(a:Array):void
{
    var rendered:String = ObjectUtil.toString(a);
    Alert.show(rendered);
}
/**
 * Consumer fault listener: counts the failure, remembers the fault string
 * and traces it.
 *
 * @param e the message fault event
 */
public function conFault(e:MessageFaultEvent):void
{
    var faultText:String = e.faultString;

    failureCount += 1;
    noConFault = false;
    conFaultString = faultText;
    trace(faultText);
}
/**
 * Channel fault listener: counts the failure, remembers the fault string
 * and traces the fault string together with its detail.
 *
 * @param e the channel fault event
 */
public function chanFault(e:ChannelFaultEvent):void
{
    var faultText:String = e.faultString;

    failureCount += 1;
    noConFault = false;
    chanFaultString = faultText;
    trace(" : " + faultText + " : " + e.faultDetail);
}
/**
 * Acknowledge listener: reports a successful subscription, and for any
 * other acknowledged message traces the headers of the correlated message.
 *
 * @param event the acknowledge event
 */
private function consumerAck(event:MessageAckEvent):void
{
    // null unless the acknowledged message is a CommandMessage
    var command:CommandMessage = event.correlation as CommandMessage;
    var isSubscribeAck:Boolean = (command != null) && (command.operation == CommandMessage.SUBSCRIBE_OPERATION);

    if (isSubscribeAck)
    {
        trace("MessageAckEvent: successfully subscribed!");
        return;
    }
    trace("event.correlation : " + ObjectUtil.toString(event.correlation.headers));
}
/**
 * Channel listener (registered for ChannelEvent.DISCONNECT): traces whether
 * the event reports the channel as connected or disconnected.
 *
 * The third branch of the earlier if / else-if / else chain could never run
 * because 'connected' is a Boolean, so it has been removed.
 *
 * @param event the channel event
 */
private function chanDisconnect(event:ChannelEvent):void
{
    if (event.connected)
    {
        trace("ChannelEvent: connected");
    }
    else
    {
        trace("ChannelEvent: disconnect");
    }
}
/**
 * Property-change listener registered on the consumer. Intentionally does
 * nothing; it is a hook for tracing property changes while debugging.
 *
 * @param pe the property change event (unused)
 */
public function consumerPropertyChangeEventHandler(pe:PropertyChangeEvent):void
{
    // Debug aid, disabled:
    // trace("property changed: " + ObjectUtil.toString(pe.target));
}
/**
 * Filter function for the destinations collection: keeps a destination when
 * its string form contains the key word selected in destinationCriteria.
 *
 * String.search returns the position of the first match, or -1 when there
 * is none, so "not -1" means "matches". The result is returned as a real
 * Boolean rather than relying on an Object being coerced to the declared
 * return type.
 *
 * @param obj the collection item (a destination id)
 * @return true when the item matches the selected key word; every item is
 *         kept when no key word is selected
 */
public function destinationsFilter(obj:Object):Boolean
{
    var criteria:Object = destinationCriteria.selectedItem;
    if (criteria == null)
    {
        // No key word available: do not filter anything out.
        return true;
    }
    var text:String = String(ObjectUtil.toString(obj));
    return text.search(criteria.toString()) != -1;
}
/**
 * Re-applies the key-word filter and sort to the destination list, selects
 * the first remaining destination, and re-synchronises the consumer and the
 * channel list with that destination.
 *
 * Setting selectedIndex from code does not fire the combo box's 'change'
 * event, so setupChannelCB() is called explicitly; without it the consumer
 * would keep the previously selected destination.
 */
private function applyDestinationCriteria():void
{
    // Filter and sort destinations and channels.
    destinations.refresh();
    destinationCB.selectedIndex = 0;
    channelNames.refresh();

    // Only re-sync when there is a consumer and something is selected.
    if (con != null && destinationCB.selectedItem != null)
    {
        setupChannelCB();
    }
}
/**
 * Averages the non-zero entries of an array of per-window message rates.
 *
 * Zero entries add nothing to the sum and are left out of the divisor.
 * An empty array, or one containing only zeros, yields 0 rather than the
 * NaN produced by dividing 0 by 0.
 *
 * @param arg array of Numbers (may be null or empty)
 * @return the average of the non-zero entries, or 0 when there are none
 */
public function calcMessageReceiptAverage(arg:Array):Number
{
    var total:Number = 0;
    var nonZeroCount:Number = 0;

    // Add up all array elements, counting only the non-zero ones.
    for each (var item:Number in arg)
    {
        total += item;
        if (item != 0)
        {
            nonZeroCount++;
        }
    }

    var average:Number = (nonZeroCount > 0) ? total / nonZeroCount : 0;
    trace("result/iteration: " + average.toString());
    return average;
}
]]> | |
</mx:Script> | |
</mx:Application> |