blob: 0d941f57d2b76ab51d114aaa5526d70965989238 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.spring;
import java.util.ArrayList;
import java.util.List;
import javax.jms.Message;
import javax.jms.MessageListener;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ConsumerBean extends Assert implements MessageListener {
private static final Logger LOG = LoggerFactory.getLogger(ConsumerBean.class);
private final List<Message> messages = new ArrayList<Message>();
private boolean verbose;
private String id = null;
/**
* Constructor.
*/
public ConsumerBean() {
}
public ConsumerBean(String id) {
this.id = id;
}
/**
* @return all the messages on the list so far, clearing the buffer
*/
public List<Message> flushMessages() {
List<Message> answer = null;
synchronized(messages) {
answer = new ArrayList<Message>(messages);
messages.clear();
}
return answer;
}
/**
* Method implemented from MessageListener interface.
*
* @param message
*/
@Override
public void onMessage(Message message) {
synchronized (messages) {
messages.add(message);
if (verbose) {
LOG.info("" + id + "Received: " + message);
}
messages.notifyAll();
}
}
/**
* Use to wait for a single message to arrive.
*/
public void waitForMessageToArrive() {
LOG.info("Waiting for message to arrive");
long start = System.currentTimeMillis();
try {
if (hasReceivedMessage()) {
synchronized (messages) {
messages.wait(4000);
}
}
} catch (InterruptedException e) {
LOG.info("Caught: " + e);
}
long end = System.currentTimeMillis() - start;
LOG.info("End of wait for " + end + " millis");
}
/**
* Used to wait for a message to arrive given a particular message count.
*
* @param messageCount
*/
public void waitForMessagesToArrive(int messageCount){
waitForMessagesToArrive(messageCount,120 * 1000);
}
public void waitForMessagesToArrive(int messageCount,long maxWaitTime) {
long maxRemainingMessageCount = Math.max(0, messageCount - messages.size());
LOG.info("Waiting for (" + maxRemainingMessageCount + ") message(s) to arrive");
long start = System.currentTimeMillis();
long endTime = start + maxWaitTime;
while (maxRemainingMessageCount > 0) {
try {
synchronized (messages) {
messages.wait(1000);
}
if (hasReceivedMessages(messageCount) || System.currentTimeMillis() > endTime) {
break;
}
} catch (InterruptedException e) {
LOG.info("Caught: " + e);
}
maxRemainingMessageCount = Math.max(0, messageCount - messages.size());
}
long end = System.currentTimeMillis() - start;
LOG.info("End of wait for " + end + " millis");
}
public void assertMessagesArrived(int total) {
waitForMessagesToArrive(total);
synchronized (messages) {
int count = messages.size();
assertEquals("Messages received", total, count);
}
}
public void assertMessagesArrived(int total, long maxWaitTime) {
waitForMessagesToArrive(total,maxWaitTime);
synchronized (messages) {
int count = messages.size();
assertEquals("Messages received", total, count);
}
}
public boolean isVerbose() {
return verbose;
}
public void setVerbose(boolean verbose) {
this.verbose = verbose;
}
public List<Message> getMessages() {
return messages;
}
/**
* Identifies if the message is empty.
*
* @return
*/
protected boolean hasReceivedMessage() {
return messages.isEmpty();
}
/**
* Identifies if the message count has reached the total size of message.
*
* @param messageCount
* @return
*/
protected boolean hasReceivedMessages(int messageCount) {
synchronized (messages) {
return messages.size() >= messageCount;
}
}
}