blob: adf282b7385e75bc4108e1917b4d5e177edda6df [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.bugs;
import java.util.ArrayList;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
* Test the loss of messages detected during testing with ActiveMQ 5.4.1 and 5.4.2.
* <p/>
* Symptoms:
* - 1 record is lost "early" in the stream.
* - no more records lost.
* <p/>
* Test Configuration:
* - Broker Settings:
* - Destination Policy
* - Occurs with "Destination Policy" using Store Cursor and a memory limit
* - Not reproduced without "Destination Policy" defined
* - Persistence Adapter
* - Memory: Does not occur.
* - KahaDB: Occurs.
* - Messages
* - Occurs with TextMessage and BinaryMessage
* - Persistent messages.
* <p/>
* Notes:
* - Lower memory limits increase the rate of occurrence.
* - Higher memory limits may prevent the problem (probably because memory limits not reached).
* - Producers sending a number of messages before consumers come online increases rate of occurrence.
*/
public class AMQ3167Test {
protected BrokerService embeddedBroker;
protected static final int MEMORY_LIMIT = 16 * 1024;
protected static boolean Debug_f = false;
protected long Producer_stop_time = 0;
protected long Consumer_stop_time = 0;
protected long Consumer_startup_delay_ms = 2000;
protected boolean Stop_after_error = true;
protected Connection JMS_conn;
protected long Num_error = 0;
//// ////
//// UTILITIES ////
//// ////
/**
* Create a new, unsecured, client connection to the test broker using the given username and password. This
* connection bypasses all security.
* <p/>
* Don't forget to start the connection or no messages will be received by consumers even though producers
* will work fine.
*
* @username name of the JMS user for the connection; may be null.
* @password Password for the JMS user; may be null.
*/
protected Connection createUnsecuredConnection(String username, String password)
throws javax.jms.JMSException {
ActiveMQConnectionFactory conn_fact;
conn_fact = new ActiveMQConnectionFactory(embeddedBroker.getVmConnectorURI());
return conn_fact.createConnection(username, password);
}
//// ////
//// TEST FUNCTIONALITY ////
//// ////
@Before
public void testPrep()
throws Exception {
embeddedBroker = new BrokerService();
configureBroker(embeddedBroker);
embeddedBroker.start();
embeddedBroker.waitUntilStarted();
// Prepare the connection
JMS_conn = createUnsecuredConnection(null, null);
JMS_conn.start();
}
@After
public void testCleanup()
throws java.lang.Exception {
JMS_conn.stop();
embeddedBroker.stop();
}
protected void configureBroker(BrokerService broker_svc)
throws Exception {
TransportConnector conn;
broker_svc.setBrokerName("testbroker1");
broker_svc.setUseJmx(false);
broker_svc.setPersistent(true);
broker_svc.setDataDirectory("target/AMQ3167Test");
configureDestinationPolicy(broker_svc);
}
/**
* NOTE: overrides any prior policy map defined for the broker service.
*/
protected void configureDestinationPolicy(BrokerService broker_svc) {
PolicyMap pol_map;
PolicyEntry pol_ent;
ArrayList<PolicyEntry> ent_list;
ent_list = new ArrayList<PolicyEntry>();
//
// QUEUES
//
pol_ent = new PolicyEntry();
pol_ent.setQueue(">");
pol_ent.setMemoryLimit(MEMORY_LIMIT);
pol_ent.setProducerFlowControl(false);
ent_list.add(pol_ent);
//
// COMPLETE POLICY MAP
//
pol_map = new PolicyMap();
pol_map.setPolicyEntries(ent_list);
broker_svc.setDestinationPolicy(pol_map);
}
//// ////
//// TEST ////
//// ////
@Test
public void testQueueLostMessage()
throws Exception {
Destination dest;
dest = ActiveMQDestination.createDestination("lostmsgtest.queue", ActiveMQDestination.QUEUE_TYPE);
// 10 seconds from now
Producer_stop_time = java.lang.System.nanoTime() + (10L * 1000000000L);
// 15 seconds from now
Consumer_stop_time = Producer_stop_time + (5L * 1000000000L);
runLostMsgTest(dest, 1000000, 1, 1, false);
// Make sure failures in the threads are thoroughly reported in the JUnit framework.
assertTrue(Num_error == 0);
}
/**
*
*/
protected static void log(String msg) {
if (Debug_f)
java.lang.System.err.println(msg);
}
/**
* Main body of the lost-message test.
*/
protected void runLostMsgTest(Destination dest, int num_msg, int num_send_per_sess, int num_recv_per_sess,
boolean topic_f)
throws Exception {
Thread prod_thread;
Thread cons_thread;
String tag;
Session sess;
MessageProducer prod;
MessageConsumer cons;
int ack_mode;
//
// Start the producer
//
tag = "prod";
log(">> Starting producer " + tag);
sess = JMS_conn.createSession((num_send_per_sess > 1), Session.AUTO_ACKNOWLEDGE);
prod = sess.createProducer(dest);
prod_thread = new producerThread(sess, prod, tag, num_msg, num_send_per_sess);
prod_thread.start();
log("Started producer " + tag);
//
// Delay before starting consumers
//
log("Waiting before starting consumers");
java.lang.Thread.sleep(Consumer_startup_delay_ms);
//
// Now create and start the consumer
//
tag = "cons";
log(">> Starting consumer");
if (num_recv_per_sess > 1)
ack_mode = Session.CLIENT_ACKNOWLEDGE;
else
ack_mode = Session.AUTO_ACKNOWLEDGE;
sess = JMS_conn.createSession(false, ack_mode);
cons = sess.createConsumer(dest);
cons_thread = new consumerThread(sess, cons, tag, num_msg, num_recv_per_sess);
cons_thread.start();
log("Started consumer " + tag);
//
// Wait for the producer and consumer to finish.
//
log("< waiting for producer.");
prod_thread.join();
log("< waiting for consumer.");
cons_thread.join();
log("Shutting down");
}
//// ////
//// INTERNAL CLASSES ////
//// ////
/**
* Producer thread - runs a single producer until the maximum number of messages is sent, the producer stop
* time is reached, or a test error is detected.
*/
protected class producerThread extends Thread {
protected Session msgSess;
protected MessageProducer msgProd;
protected String producerTag;
protected int numMsg;
protected int numPerSess;
protected long producer_stop_time;
producerThread(Session sess, MessageProducer prod, String tag, int num_msg, int sess_size) {
super();
producer_stop_time = 0;
msgSess = sess;
msgProd = prod;
producerTag = tag;
numMsg = num_msg;
numPerSess = sess_size;
}
public void execTest()
throws Exception {
Message msg;
int sess_start;
int cur;
sess_start = 0;
cur = 0;
while ((cur < numMsg) && (!didTimeOut()) &&
((!Stop_after_error) || (Num_error == 0))) {
msg = msgSess.createTextMessage("test message from " + producerTag);
msg.setStringProperty("testprodtag", producerTag);
msg.setIntProperty("seq", cur);
if (msg instanceof ActiveMQMessage) {
((ActiveMQMessage) msg).setResponseRequired(true);
}
//
// Send the message.
//
msgProd.send(msg);
cur++;
//
// Commit if the number of messages per session has been reached, and
// transactions are being used (only when > 1 msg per sess).
//
if ((numPerSess > 1) && ((cur - sess_start) >= numPerSess)) {
msgSess.commit();
sess_start = cur;
}
}
// Make sure to send the final commit, if there were sends since the last commit.
if ((numPerSess > 1) && ((cur - sess_start) > 0))
msgSess.commit();
if (cur < numMsg)
log("* Producer " + producerTag + " timed out at " + java.lang.System.nanoTime() +
" (stop time " + producer_stop_time + ")");
}
/**
* Check whether it is time for the producer to terminate.
*/
protected boolean didTimeOut() {
if ((Producer_stop_time > 0) && (java.lang.System.nanoTime() >= Producer_stop_time))
return true;
return false;
}
/**
* Run the producer.
*/
@Override
public void run() {
try {
log("- running producer " + producerTag);
execTest();
log("- finished running producer " + producerTag);
} catch (Throwable thrown) {
Num_error++;
fail("producer " + producerTag + " failed: " + thrown.getMessage());
throw new Error("producer " + producerTag + " failed", thrown);
}
}
@Override
public String toString() {
return producerTag;
}
}
/**
* Producer thread - runs a single consumer until the maximum number of messages is received, the consumer stop
* time is reached, or a test error is detected.
*/
protected class consumerThread extends Thread {
protected Session msgSess;
protected MessageConsumer msgCons;
protected String consumerTag;
protected int numMsg;
protected int numPerSess;
consumerThread(Session sess, MessageConsumer cons, String tag, int num_msg, int sess_size) {
super();
msgSess = sess;
msgCons = cons;
consumerTag = tag;
numMsg = num_msg;
numPerSess = sess_size;
}
public void execTest()
throws Exception {
Message msg;
int sess_start;
int cur;
msg = null;
sess_start = 0;
cur = 0;
while ((cur < numMsg) && (!didTimeOut()) &&
((!Stop_after_error) || (Num_error == 0))) {
//
// Use a timeout of 1 second to periodically check the consumer timeout.
//
msg = msgCons.receive(1000);
if (msg != null) {
checkMessage(msg, cur);
cur++;
if ((numPerSess > 1) && ((cur - sess_start) >= numPerSess)) {
msg.acknowledge();
sess_start = cur;
}
}
}
// Acknowledge the last messages, if they were not yet acknowledged.
if ((numPerSess > 1) && ((cur - sess_start) > 0))
msg.acknowledge();
if (cur < numMsg)
log("* Consumer " + consumerTag + " timed out");
}
/**
* Check whether it is time for the consumer to terminate.
*/
protected boolean didTimeOut() {
if ((Consumer_stop_time > 0) && (java.lang.System.nanoTime() >= Consumer_stop_time))
return true;
return false;
}
/**
* Verify the message received. Sequence numbers are checked and are expected to exactly match the
* message number (starting at 0).
*/
protected void checkMessage(Message msg, int exp_seq)
throws javax.jms.JMSException {
int seq;
seq = msg.getIntProperty("seq");
if (exp_seq != seq) {
Num_error++;
fail("*** Consumer " + consumerTag + " expected seq " + exp_seq + "; received " + seq);
}
}
/**
* Run the consumer.
*/
@Override
public void run() {
try {
log("- running consumer " + consumerTag);
execTest();
log("- running consumer " + consumerTag);
} catch (Throwable thrown) {
Num_error++;
fail("consumer " + consumerTag + " failed: " + thrown.getMessage());
throw new Error("consumer " + consumerTag + " failed", thrown);
}
}
@Override
public String toString() {
return consumerTag;
}
}
}