// blob: c21a78d359861de435c256661b9f1d05bb267580 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.disttest.results.aggregation;
import java.util.Date;
import java.util.NavigableSet;
import java.util.TreeSet;
import org.apache.qpid.disttest.message.ConsumerParticipantResult;
import org.apache.qpid.disttest.message.ParticipantResult;
import org.apache.qpid.disttest.message.ProducerParticipantResult;
/**
 * Folds a stream of {@link ParticipantResult}s into one summary result.
 * <p>
 * Only results that are instances of the target class given at construction are
 * considered; everything else passed to {@link #aggregate(ParticipantResult)} is ignored.
 * Two kinds of attributes are tracked:
 * <ul>
 * <li><em>constant</em> attributes (test name, payload size, iteration number, batch size,
 * acknowledge mode, delivery mode, durable subscription, topic) are copied to the summary
 * only if every aggregated result carried the same value;</li>
 * <li><em>variable</em> attributes (message/payload counts, participant counts, start/end
 * times) are summed or min/max-ed, and throughput figures are derived from them.</li>
 * </ul>
 * Message latencies of {@link ConsumerParticipantResult}s are additionally fed into a
 * {@link SeriesStatistics}.
 */
public class ParticipantResultAggregator
{
    private final String _aggregatedResultName;
    private final Class<? extends ParticipantResult> _targetClass;

    /** Earliest start time seen so far; stays at {@link Long#MAX_VALUE} until a result is aggregated. */
    private long _minStartDate = Long.MAX_VALUE;
    /** Latest end time seen so far; stays at 0 until a result is aggregated. */
    private long _maxEndDate = 0;
    private long _numberOfMessagesProcessed = 0;
    private long _totalPayloadProcessed = 0;
    private int _totalNumberOfConsumers = 0;
    private int _totalNumberOfProducers = 0;

    // Distinct values seen per "constant" attribute. A set of size one means that all
    // aggregated results agreed on the value, so it can be copied to the summary.
    private final NavigableSet<Integer> _encounteredPayloadSizes = new TreeSet<Integer>();
    private final NavigableSet<Integer> _encounteredIterationNumbers = new TreeSet<Integer>();
    private final NavigableSet<Integer> _encounteredBatchSizes = new TreeSet<Integer>();
    private final NavigableSet<Integer> _encounteredAcknowledgeMode = new TreeSet<Integer>();
    private final NavigableSet<Integer> _encounteredDeliveryModes = new TreeSet<Integer>();
    private final NavigableSet<Boolean> _encounteredDurableSubscriptions = new TreeSet<Boolean>();
    private final NavigableSet<Boolean> _encounteredTopics = new TreeSet<Boolean>();
    private final NavigableSet<String> _encounteredTestNames = new TreeSet<String>();

    private final SeriesStatistics _latencyStatistics = new SeriesStatistics();

    /**
     * @param targetClass only results that are instances of this class are aggregated
     * @param aggregateResultName name given to the result returned by {@link #getAggregatedResult()}
     */
    public ParticipantResultAggregator(Class<? extends ParticipantResult> targetClass, String aggregateResultName)
    {
        _aggregatedResultName = aggregateResultName;
        _targetClass = targetClass;
    }

    /**
     * Adds one result to the running aggregation.
     * Results that are not instances of the target class (including {@code null}) are ignored.
     *
     * @param result the participant result to fold in
     */
    public void aggregate(ParticipantResult result)
    {
        if (!isAggregatable(result))
        {
            return;
        }

        rollupConstantAttributes(result);
        computeVariableAttributes(result);

        if (result instanceof ConsumerParticipantResult)
        {
            ConsumerParticipantResult consumerResult = (ConsumerParticipantResult) result;
            _latencyStatistics.addMessageLatencies(consumerResult.getMessageLatencies());
            _latencyStatistics.aggregate();
        }
    }

    /**
     * Builds a new summary result from everything aggregated so far.
     * <p>
     * A {@link ConsumerParticipantResult} (with latency statistics filled in) is created when
     * the target class is exactly {@code ConsumerParticipantResult}; for any other target class
     * a plain {@link ParticipantResult} is created.
     * <p>
     * If nothing has been aggregated, the start date is {@code new Date(Long.MAX_VALUE)}, the
     * end date is {@code new Date(0)} and both throughput figures are 0.
     *
     * @return a freshly created result named with the aggregate result name
     */
    public ParticipantResult getAggregatedResult()
    {
        ParticipantResult aggregatedResult;
        if (_targetClass == ConsumerParticipantResult.class)
        {
            ConsumerParticipantResult consumerResult = new ConsumerParticipantResult(_aggregatedResultName);
            consumerResult.setAverageLatency(_latencyStatistics.getAverage());
            consumerResult.setMinLatency(_latencyStatistics.getMinimum());
            consumerResult.setMaxLatency(_latencyStatistics.getMaximum());
            consumerResult.setLatencyStandardDeviation(_latencyStatistics.getStandardDeviation());
            aggregatedResult = consumerResult;
        }
        else
        {
            // NOTE(review): a ProducerParticipantResult is never instantiated here, so the
            // producer-specific roll-up (delivery mode) does not appear to reach the summary
            // even when the target class is ProducerParticipantResult - confirm this is intended.
            aggregatedResult = new ParticipantResult(_aggregatedResultName);
        }

        setRolledUpConstantAttributes(aggregatedResult);
        setComputedVariableAttributes(aggregatedResult);
        return aggregatedResult;
    }

    /**
     * @return true if the result is a non-null instance of the target class
     */
    private boolean isAggregatable(ParticipantResult result)
    {
        // Class.isInstance is the null-safe equivalent of isAssignableFrom(result.getClass()).
        return _targetClass.isInstance(result);
    }

    /** Accumulates the counters and widens the [min start, max end] interval with the given result. */
    private void computeVariableAttributes(ParticipantResult result)
    {
        _numberOfMessagesProcessed += result.getNumberOfMessagesProcessed();
        _totalPayloadProcessed += result.getTotalPayloadProcessed();
        _totalNumberOfConsumers += result.getTotalNumberOfConsumers();
        _totalNumberOfProducers += result.getTotalNumberOfProducers();
        _minStartDate = Math.min(_minStartDate, result.getStartInMillis());
        _maxEndDate = Math.max(_maxEndDate, result.getEndInMillis());
    }

    /** Records the given result's "constant" attribute values in the sets of encountered values. */
    private void rollupConstantAttributes(ParticipantResult result)
    {
        // TreeSet rejects null elements, hence the guard on the only reference-typed getter used here.
        if (result.getTestName() != null)
        {
            _encounteredTestNames.add(result.getTestName());
        }
        _encounteredPayloadSizes.add(result.getPayloadSize());
        _encounteredIterationNumbers.add(result.getIterationNumber());
        _encounteredBatchSizes.add(result.getBatchSize());
        _encounteredAcknowledgeMode.add(result.getAcknowledgeMode());

        if (result instanceof ProducerParticipantResult)
        {
            ProducerParticipantResult producerResult = (ProducerParticipantResult) result;
            _encounteredDeliveryModes.add(producerResult.getDeliveryMode());
        }
        else if (result instanceof ConsumerParticipantResult)
        {
            ConsumerParticipantResult consumerResult = (ConsumerParticipantResult) result;
            _encounteredDurableSubscriptions.add(consumerResult.isDurableSubscription());
            _encounteredTopics.add(consumerResult.isTopic());
        }
    }

    /** Copies the accumulated counters, the overall time interval and the derived throughputs to the summary. */
    private void setComputedVariableAttributes(ParticipantResult aggregatedResult)
    {
        aggregatedResult.setNumberOfMessagesProcessed(_numberOfMessagesProcessed);
        aggregatedResult.setTotalPayloadProcessed(_totalPayloadProcessed);
        aggregatedResult.setTotalNumberOfConsumers(_totalNumberOfConsumers);
        aggregatedResult.setTotalNumberOfProducers(_totalNumberOfProducers);
        aggregatedResult.setStartDate(new Date(_minStartDate));
        aggregatedResult.setEndDate(new Date(_maxEndDate));
        aggregatedResult.setThroughput(calculateThroughputInKiloBytesPerSecond());
        aggregatedResult.setMessageThroughput(calculateThroughputInMessagesPerSecond());
    }

    /**
     * Copies each "constant" attribute to the summary, but only when exactly one distinct value
     * was encountered for it; otherwise the summary keeps whatever value it was constructed with.
     */
    private void setRolledUpConstantAttributes(ParticipantResult aggregatedResult)
    {
        if (_encounteredIterationNumbers.size() == 1)
        {
            aggregatedResult.setIterationNumber(_encounteredIterationNumbers.first());
        }
        if (_encounteredPayloadSizes.size() == 1)
        {
            aggregatedResult.setPayloadSize(_encounteredPayloadSizes.first());
        }
        if (_encounteredTestNames.size() == 1)
        {
            aggregatedResult.setTestName(_encounteredTestNames.first());
        }
        if (_encounteredBatchSizes.size() == 1)
        {
            aggregatedResult.setBatchSize(_encounteredBatchSizes.first());
        }
        if (_encounteredAcknowledgeMode.size() == 1)
        {
            aggregatedResult.setAcknowledgeMode(_encounteredAcknowledgeMode.first());
        }

        if (aggregatedResult instanceof ProducerParticipantResult)
        {
            ProducerParticipantResult producerResult = (ProducerParticipantResult) aggregatedResult;
            if (_encounteredDeliveryModes.size() == 1)
            {
                producerResult.setDeliveryMode(_encounteredDeliveryModes.first());
            }
        }

        if (aggregatedResult instanceof ConsumerParticipantResult)
        {
            ConsumerParticipantResult consumerResult = (ConsumerParticipantResult) aggregatedResult;
            if (_encounteredDurableSubscriptions.size() == 1)
            {
                consumerResult.setDurableSubscription(_encounteredDurableSubscriptions.first());
            }
            if (_encounteredTopics.size() == 1)
            {
                consumerResult.setTopic(_encounteredTopics.first());
            }
        }
    }

    /**
     * @return the length of the interval between the earliest start and the latest end seen so
     * far, in milliseconds; negative when nothing has been aggregated yet
     */
    private long getDurationInMillis()
    {
        return _maxEndDate - _minStartDate;
    }

    /**
     * @return total payload divided by 1024, per second of the overall interval; 0 when the
     * interval is empty (nothing aggregated, or end not after start) so that callers never
     * see Infinity or NaN from a division by zero
     */
    private double calculateThroughputInKiloBytesPerSecond()
    {
        long durationInMillis = getDurationInMillis();
        if (durationInMillis <= 0)
        {
            return 0;
        }

        double durationInSeconds = durationInMillis / 1000.0d;
        double totalPayloadProcessedInKiloBytes = ((double) _totalPayloadProcessed) / 1024;
        return totalPayloadProcessedInKiloBytes / durationInSeconds;
    }

    /**
     * @return number of messages processed per second of the overall interval, rounded to the
     * nearest integer; 0 when the interval is empty (nothing aggregated, or end not after start)
     */
    private int calculateThroughputInMessagesPerSecond()
    {
        long durationInMillis = getDurationInMillis();
        if (durationInMillis <= 0)
        {
            return 0;
        }

        return (int) Math.round((_numberOfMessagesProcessed * 1000.0d) / durationInMillis);
    }
}