blob: 55ea65dbeba5dbcf6aee81e9154f7f8946ee09b0 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.stomp;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.util.Wait;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class StompLoadTest extends StompTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(StompLoadTest.class);
private static final int TASK_COUNT = 100;
private static final int MSG_COUNT = 250; // AMQ-3819: Above 250 or so and the CPU goes bonkers with NOI+SSL.
private ExecutorService executor;
private CountDownLatch started;
private CountDownLatch ready;
private AtomicInteger receiveCount;
@Override
public void setUp() throws Exception {
super.setUp();
stompConnect();
stompConnection.connect("system", "manager");
executor = Executors.newFixedThreadPool(TASK_COUNT, new ThreadFactory() {
private long i = 0;
@Override
public Thread newThread(Runnable runnable) {
this.i++;
final Thread t = new Thread(runnable, "Test Worker " + this.i);
return t;
}
});
started = new CountDownLatch(TASK_COUNT);
ready = new CountDownLatch(1);
receiveCount = new AtomicInteger(0);
}
@Override
public void tearDown() throws Exception {
try {
executor.shutdownNow();
} catch (Exception e) {
} finally {
super.tearDown();
}
}
@Test(timeout=5*60*1000)
public void testStompUnloadLoad() throws Exception {
final List<StompConnection> taskConnections = new ArrayList<>();
for (int i = 0; i < TASK_COUNT; ++i) {
executor.execute(new Runnable() {
@Override
public void run() {
LOG.debug("Receive Thread Connecting to Broker.");
int numReceived = 0;
StompConnection connection = new StompConnection();
try {
stompConnect(connection);
connection.connect("system", "manager");
} catch (Exception e) {
LOG.error("Caught Exception while connecting: " + e.getMessage());
}
taskConnections.add(connection);
try {
for (int i = 0; i < 10; i++) {
connection.subscribe("/queue/test-" + i, "auto");
connection.subscribe("/topic/test-" + i, "auto");
}
HashMap<String, String> headers = new HashMap<String, String>();
headers.put("activemq.prefetchSize", "1");
connection.subscribe("/topic/" + getTopicName(), "auto", headers);
ready.await();
// Now that the main test thread is ready we wait a bit to let the tasks
// all subscribe and the CPU to settle a bit.
TimeUnit.SECONDS.sleep(3);
started.countDown();
while (receiveCount.get() != TASK_COUNT * MSG_COUNT) {
// Read Timeout ends this task, we override the default here since there
// are so many threads running and we don't know how slow the test box is.
StompFrame frame = connection.receive(TimeUnit.SECONDS.toMillis(60));
assertNotNull(frame);
numReceived++;
if (LOG.isDebugEnabled() && (numReceived % 50) == 0 || numReceived == MSG_COUNT) {
LOG.debug("Receiver thread got message: " + frame.getHeaders().get("message-id"));
}
receiveCount.incrementAndGet();
}
} catch (Exception e) {
if (numReceived != MSG_COUNT) {
LOG.warn("Receive task caught exception after receipt of ["+numReceived+
"] messages: " + e.getMessage());
}
}
}
});
}
ready.countDown();
assertTrue("Timed out waiting for receivers to start.", started.await(5, TimeUnit.MINUTES));
String frame;
// Lets still wait a bit to make sure all subscribers get a fair shake at
// getting online before we send. Account for slow Hudson machines
TimeUnit.SECONDS.sleep(5);
for( int ix = 0; ix < MSG_COUNT; ix++) {
frame = "SEND\n" +
"destination:/topic/" + getTopicName() +
"\nid:" + ix +
"\ncontent-length:5" + " \n\n" +
"\u0001\u0002\u0000\u0004\u0005" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
LOG.info("All " + MSG_COUNT + " message have been sent, awaiting receipt.");
assertTrue("Should get [" + TASK_COUNT * MSG_COUNT + "] message but was: " + receiveCount.get(), Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return receiveCount.get() == TASK_COUNT * MSG_COUNT;
}
}, TimeUnit.MINUTES.toMillis(10)));
LOG.info("Test Completed and all messages received, shutting down.");
for (StompConnection taskConnection : taskConnections) {
try {
taskConnection.disconnect();
taskConnection.close();
} catch (Exception ex) {
}
}
executor.shutdown();
executor.awaitTermination(2, TimeUnit.MINUTES);
stompDisconnect();
}
}