blob: a9da837deacfb12144ec4e051975f3444e0653a2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.disttest.client;
import java.util.Date;
import java.util.NavigableSet;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import javax.jms.Message;
import org.apache.qpid.disttest.DistributedTestException;
import org.apache.qpid.disttest.client.utils.ExecutorWithLimits;
import org.apache.qpid.disttest.client.utils.ExecutorWithLimitsFactory;
import org.apache.qpid.disttest.jms.ClientJmsDelegate;
import org.apache.qpid.disttest.message.CreateProducerCommand;
import org.apache.qpid.disttest.message.ParticipantResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ProducerParticipant implements Participant
{
private static final Logger LOGGER = LoggerFactory.getLogger(ProducerParticipant.class);
private final ClientJmsDelegate _jmsDelegate;
private final CreateProducerCommand _command;
private final ParticipantResultFactory _resultFactory;
private ExecutorWithLimits _limiter;
public ProducerParticipant(final ClientJmsDelegate jmsDelegate, final CreateProducerCommand command)
{
_jmsDelegate = jmsDelegate;
_command = command;
_resultFactory = new ParticipantResultFactory();
}
@Override
public ParticipantResult doIt(String registeredClientName) throws Exception
{
long numberOfMessages = _command.getNumberOfMessages();
long maximumDuration = _command.getMaximumDuration();
if (maximumDuration == 0 && numberOfMessages == 0)
{
throw new DistributedTestException("number of messages and duration cannot both be zero");
}
long duration = maximumDuration - _command.getStartDelay();
if (maximumDuration > 0 && duration <= 0)
{
throw new DistributedTestException("Start delay must be less than maximum test duration");
}
final long requiredDuration = duration > 0 ? duration : 0;
doSleepForStartDelay();
final int batchSize = _command.getBatchSize();
final int acknowledgeMode = _jmsDelegate.getAcknowledgeMode(_command.getSessionName());
final long startTime = System.currentTimeMillis();
Message lastPublishedMessage = null;
int numberOfMessagesSent = 0;
long totalPayloadSizeOfAllMessagesSent = 0;
NavigableSet<Integer> allProducedPayloadSizes = new TreeSet<Integer>();
_limiter = ExecutorWithLimitsFactory.createExecutorWithLimit(startTime, requiredDuration);
if (LOGGER.isInfoEnabled())
{
LOGGER.info("Producer {} about to send messages. Duration limit: {} ms, Message limit: {}",
new Object[]{getName(), requiredDuration, numberOfMessages});
}
while (true)
{
if (numberOfMessages > 0 && numberOfMessagesSent >= numberOfMessages
|| requiredDuration > 0 && System.currentTimeMillis() - startTime >= requiredDuration)
{
break;
}
try
{
lastPublishedMessage = _limiter.execute(new Callable<Message>()
{
@Override
public Message call() throws Exception
{
return _jmsDelegate.sendNextMessage(_command);
}
});
}
catch (CancellationException ce)
{
LOGGER.debug("Producer send was cancelled due to maximum duration {} ms", requiredDuration);
break;
}
numberOfMessagesSent++;
int lastPayloadSize = _jmsDelegate.calculatePayloadSizeFrom(lastPublishedMessage);
totalPayloadSizeOfAllMessagesSent += lastPayloadSize;
allProducedPayloadSizes.add(lastPayloadSize);
if (LOGGER.isTraceEnabled())
{
LOGGER.trace("message " + numberOfMessagesSent + " sent by " + this);
}
final boolean batchLimitReached = batchSize <= 0
|| numberOfMessagesSent % batchSize == 0;
if (batchLimitReached)
{
if (LOGGER.isTraceEnabled() && batchSize > 0)
{
LOGGER.trace("Committing: batch size " + batchSize );
}
_jmsDelegate.commitIfNecessary(_command.getSessionName());
doSleepForInterval();
}
}
// commit the remaining batch messages
if (batchSize > 0 && numberOfMessagesSent % batchSize != 0)
{
if (LOGGER.isTraceEnabled())
{
LOGGER.trace("Committing: batch size " + batchSize );
}
_jmsDelegate.commitIfNecessary(_command.getSessionName());
}
if (LOGGER.isInfoEnabled())
{
LOGGER.info("Producer {} finished publishing. Number of messages published: {}",
getName(), numberOfMessagesSent);
}
Date start = new Date(startTime);
Date end = new Date();
int payloadSize = getPayloadSizeForResultIfConstantOrZeroOtherwise(allProducedPayloadSizes);
return _resultFactory.createForProducer(
getName(),
registeredClientName,
_command,
acknowledgeMode,
numberOfMessagesSent,
payloadSize, totalPayloadSizeOfAllMessagesSent, start, end);
}
private int getPayloadSizeForResultIfConstantOrZeroOtherwise(NavigableSet<Integer> allPayloadSizes)
{
return allPayloadSizes.size() == 1 ? allPayloadSizes.first() : 0;
}
private void doSleepForStartDelay()
{
long sleepTime = _command.getStartDelay();
if (sleepTime > 0)
{
LOGGER.debug("{} sleeping for {} milliseconds before starting", getName(), sleepTime);
// start delay is specified. Sleeping...
doSleep(sleepTime);
}
}
private void doSleepForInterval() throws InterruptedException
{
long sleepTime = _command.getInterval();
if (sleepTime > 0)
{
doSleep(sleepTime);
}
}
private void doSleep(long sleepTime)
{
try
{
Thread.sleep(sleepTime);
}
catch (final InterruptedException e)
{
Thread.currentThread().interrupt();
}
}
@Override
public void releaseResources()
{
if (_limiter != null)
{
_limiter.shutdown();
}
_jmsDelegate.closeTestProducer(_command.getParticipantName());
}
@Override
public String getName()
{
return _command.getParticipantName();
}
@Override
public String toString()
{
return "ProducerParticipant [command=" + _command + "]";
}
}