blob: d3a5e30191afe3a41c829d77aebad9748da9b733 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.disttest.client;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.NavigableSet;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import org.apache.qpid.disttest.DistributedTestException;
import org.apache.qpid.disttest.jms.ClientJmsDelegate;
import org.apache.qpid.disttest.message.CreateConsumerCommand;
import org.apache.qpid.disttest.message.ParticipantResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * A {@link Participant} that consumes messages through a {@link ClientJmsDelegate} as directed
 * by a {@link CreateConsumerCommand}.
 * <p>
 * Messages are either pulled synchronously on the thread calling {@link #doIt(String)}, or pushed
 * to a {@link MessageListener} that this class registers with the delegate. In both modes the
 * number of messages, their payload sizes and (optionally) their latencies are accumulated and
 * reported in the {@link ParticipantResult} returned by {@link #doIt(String)}.
 * <p>
 * NOTE(review): in asynchronous mode the message quota and the maximum duration are only evaluated
 * when a message is delivered to the listener, and the wait on the completion latch has no
 * timeout. If no (further) message arrives, {@link #doIt(String)} therefore blocks indefinitely.
 */
public class ConsumerParticipant implements Participant
{
    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerParticipant.class);

    /** Running count of non-null messages handed to {@link #processMessage(Message)}. */
    private final AtomicInteger _totalNumberOfMessagesReceived = new AtomicInteger(0);

    /** Every distinct payload size seen so far; used to decide whether the size was constant. */
    private final NavigableSet<Integer> _allConsumedPayloadSizes = new ConcurrentSkipListSet<Integer>();

    /** Sum of the payload sizes of all messages received. */
    private final AtomicLong _totalPayloadSizeOfAllMessagesReceived = new AtomicLong(0);

    /** Released by the message listener once no more messages should be processed. */
    private final CountDownLatch _asyncRunHasFinished = new CountDownLatch(1);

    private final ClientJmsDelegate _jmsDelegate;
    private final CreateConsumerCommand _command;
    private final ParticipantResultFactory _resultFactory;

    /** Per-message latencies in milliseconds, or null when latency evaluation is switched off. */
    private final List<Long> _messageLatencies;

    /** Wall-clock time (ms) at which consumption started; zero until the run has begun. */
    private long _startTime;

    /** First exception raised while processing a message inside the listener, if any. */
    private volatile Exception _asyncMessageListenerException;

    /**
     * Creates a consumer participant.
     *
     * @param delegate the JMS delegate used to consume, acknowledge and close
     * @param command  the command describing how this consumer must behave
     */
    public ConsumerParticipant(final ClientJmsDelegate delegate, final CreateConsumerCommand command)
    {
        _jmsDelegate = delegate;
        _command = command;
        _resultFactory = new ParticipantResultFactory();
        // The latency list stays null unless latencies were requested; it is passed as-is to the
        // result factory.
        _messageLatencies = command.isEvaluateLatency() ? new ArrayList<Long>() : null;
    }

    /**
     * Consumes messages until the configured message quota or maximum duration is reached and
     * returns a summary of what was consumed.
     *
     * @param registeredClientName the client name to record in the result
     * @return the result describing this consumer's run
     * @throws DistributedTestException if both the number of messages and the maximum duration
     *                                  are zero, or if the asynchronous listener failed
     * @throws Exception                if waiting for the asynchronous listener is interrupted
     */
    @Override
    public ParticipantResult doIt(String registeredClientName) throws Exception
    {
        final Date start = new Date();
        final int acknowledgeMode = _jmsDelegate.getAcknowledgeMode(_command.getSessionName());

        if (_command.getMaximumDuration() == 0 && _command.getNumberOfMessages() == 0)
        {
            throw new DistributedTestException("number of messages and duration cannot both be zero");
        }

        if (_command.isSynchronous())
        {
            synchronousRun();
        }
        else
        {
            LOGGER.info("Consumer {} registering listener", getName());
            _jmsDelegate.registerListener(_command.getParticipantName(), new MessageListener()
            {
                @Override
                public void onMessage(Message message)
                {
                    processAsynchMessage(message);
                }
            });
            waitUntilMsgListenerHasFinished();
            rethrowAnyAsyncMessageListenerException();
        }

        final Date end = new Date();
        final int numberOfMessagesReceived = _totalNumberOfMessagesReceived.get();
        final long totalPayloadSize = _totalPayloadSizeOfAllMessagesReceived.get();
        final int payloadSize = getPayloadSizeForResultIfConstantOrZeroOtherwise(_allConsumedPayloadSizes);

        // Parameterized logging needs no explicit level guard.
        LOGGER.info("Consumer {} finished consuming. Number of messages consumed: {}",
                    getName(), numberOfMessagesReceived);

        return _resultFactory.createForConsumer(
                getName(),
                registeredClientName,
                _command,
                acknowledgeMode,
                numberOfMessagesReceived,
                payloadSize,
                totalPayloadSize,
                start, end, _messageLatencies);
    }

    /**
     * Pulls messages from the delegate on the calling thread until
     * {@link #processMessage(Message)} reports that consumption must stop.
     */
    private void synchronousRun()
    {
        LOGGER.info("Consumer {} about to consume messages", getName());
        _startTime = System.currentTimeMillis();

        Message message;
        do
        {
            message = _jmsDelegate.consumeMessage(_command.getParticipantName(),
                                                  _command.getReceiveTimeout());
        }
        while (processMessage(message));
    }

    /**
     * Accounts for one (possibly null) message, commits/acknowledges when required and decides
     * whether consumption should go on.
     *
     * @param message the message just received, or null if none was available
     * @return whether to continue running (ie returns false if the message quota or the maximum
     *         duration has been reached)
     */
    private boolean processMessage(Message message)
    {
        // A null message does not count towards the quota.
        final int messageCount = message == null
                ? _totalNumberOfMessagesReceived.get()
                : _totalNumberOfMessagesReceived.incrementAndGet();

        final int batchSize = _command.getBatchSize();
        final boolean batchEnabled = batchSize > 0;
        final boolean batchComplete = batchEnabled && messageCount % batchSize == 0;

        if (message != null)
        {
            if (LOGGER.isTraceEnabled())
            {
                LOGGER.trace("message {} received by {}", messageCount, this);
            }

            final int messagePayloadSize = _jmsDelegate.calculatePayloadSizeFrom(message);
            _allConsumedPayloadSizes.add(messagePayloadSize);
            _totalPayloadSizeOfAllMessagesReceived.addAndGet(messagePayloadSize);

            if (_command.isEvaluateLatency())
            {
                recordLatency(message);
            }

            // Without batching every message is committed/acknowledged individually; with
            // batching only the message that completes a batch triggers it.
            if (!batchEnabled || batchComplete)
            {
                commitOrAcknowledge(message, batchEnabled);
            }
        }

        final int expectedMessages = _command.getNumberOfMessages();
        final long maximumDuration = _command.getMaximumDuration();

        final boolean reachedExpectedNumberOfMessages = expectedMessages > 0 && messageCount >= expectedMessages;
        final boolean reachedMaximumDuration = maximumDuration > 0
                && System.currentTimeMillis() - _startTime >= maximumDuration;

        if (!(reachedExpectedNumberOfMessages || reachedMaximumDuration))
        {
            return true;
        }

        if (LOGGER.isDebugEnabled())
        {
            LOGGER.debug("message " + messageCount
                         + " reachedExpectedNumberOfMessages " + reachedExpectedNumberOfMessages
                         + " reachedMaximumDuration " + reachedMaximumDuration);
        }

        if (batchEnabled && !batchComplete)
        {
            // commit/acknowledge remaining messages of the incomplete final batch if necessary;
            // message may be null here when the run ended on a receive that returned nothing
            commitOrAcknowledge(message, true);
        }
        return false;
    }

    /**
     * Appends the latency of the given message (now minus its JMS timestamp, in milliseconds) to
     * the latency list.
     *
     * @throws DistributedTestException if the timestamp cannot be read from the message
     */
    private void recordLatency(Message message)
    {
        final long messageTimestamp;
        try
        {
            messageTimestamp = message.getJMSTimestamp();
        }
        catch (JMSException e)
        {
            throw new DistributedTestException("Cannot get message timestamp!", e);
        }
        _messageLatencies.add(System.currentTimeMillis() - messageTimestamp);
    }

    /**
     * Asks the delegate to commit or acknowledge on this consumer's session, optionally tracing
     * the batch size first.
     *
     * @param message       the message passed on to the delegate; may be null
     * @param traceBatching whether to emit the batch-size trace line
     */
    private void commitOrAcknowledge(Message message, boolean traceBatching)
    {
        if (traceBatching && LOGGER.isTraceEnabled())
        {
            LOGGER.trace("Committing: batch size {}", _command.getBatchSize());
        }
        _jmsDelegate.commitOrAcknowledgeMessageIfNecessary(_command.getSessionName(), message);
    }

    /**
     * Intended to be called from a {@link MessageListener}. Updates {@link #_asyncRunHasFinished} if
     * no more messages should be processed, causing {@link #doIt(String)} to exit. Any exception
     * raised while processing is logged, stored for {@link #doIt(String)} to rethrow, and also
     * ends the run.
     *
     * @param message the message delivered to the listener
     */
    public void processAsynchMessage(Message message)
    {
        boolean continueRunning;
        try
        {
            if (_startTime == 0)
            {
                // the asynchronous run is timed from the arrival of the first message
                _startTime = System.currentTimeMillis();
            }
            continueRunning = processMessage(message);
        }
        catch (Exception e)
        {
            LOGGER.error("Error occured consuming message " + _totalNumberOfMessagesReceived, e);
            // Publish the failure before releasing the latch so doIt() observes it.
            _asyncMessageListenerException = e;
            continueRunning = false;
        }

        if (!continueRunning)
        {
            _asyncRunHasFinished.countDown();
        }
    }

    /**
     * Closes the test consumer belonging to this participant.
     */
    @Override
    public void releaseResources()
    {
        _jmsDelegate.closeTestConsumer(_command.getParticipantName());
    }

    /**
     * @param allSizes the distinct payload sizes that were consumed
     * @return the single payload size if exactly one distinct size was seen, otherwise zero
     */
    private int getPayloadSizeForResultIfConstantOrZeroOtherwise(NavigableSet<Integer> allSizes)
    {
        // Read the value from the same set whose size was checked.
        return allSizes.size() == 1 ? allSizes.first() : 0;
    }

    /**
     * Rethrows, wrapped in a {@link DistributedTestException}, any exception captured by the
     * asynchronous message listener.
     */
    private void rethrowAnyAsyncMessageListenerException()
    {
        final Exception listenerFailure = _asyncMessageListenerException;
        if (listenerFailure != null)
        {
            throw new DistributedTestException(listenerFailure);
        }
    }

    /**
     * Blocks until the message listener signals completion via {@link #_asyncRunHasFinished}.
     *
     * @throws Exception if the waiting thread is interrupted
     */
    private void waitUntilMsgListenerHasFinished() throws Exception
    {
        // Parameterized so that toString() is only evaluated when debug is enabled.
        LOGGER.debug("waiting until message listener has finished for {}", this);
        _asyncRunHasFinished.await();
        LOGGER.debug("Message listener has finished for {}", this);
    }

    /**
     * @return the participant name taken from the command
     */
    @Override
    public String getName()
    {
        return _command.getParticipantName();
    }

    @Override
    public String toString()
    {
        return "ConsumerParticipant [_command=" + _command + ", _startTime=" + _startTime + "]";
    }
}