blob: eccbf1bf10b2e57070df17d1b5539caedbb5fae6 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.vm;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.net.URI;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.command.BaseCommand;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.state.CommandVisitor;
import org.apache.activemq.transport.FutureResponse;
import org.apache.activemq.transport.MutexTransport;
import org.apache.activemq.transport.ResponseCallback;
import org.apache.activemq.transport.ResponseCorrelator;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportDisposedIOException;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class VMTransportThreadSafeTest {
private static final Logger LOG = LoggerFactory.getLogger(VMTransportThreadSafeTest.class);
private final static String location1 = "vm://transport1";
private final static String location2 = "vm://transport2";
private final ConcurrentLinkedQueue<DummyCommand> localReceived = new ConcurrentLinkedQueue<DummyCommand>();
private final ConcurrentLinkedQueue<DummyCommand> remoteReceived = new ConcurrentLinkedQueue<DummyCommand>();
private class DummyCommand extends BaseCommand {
public final int sequenceId;
public DummyCommand() {
this.sequenceId = 0;
}
public DummyCommand(int id) {
this.sequenceId = id;
}
@Override
public Response visit(CommandVisitor visitor) throws Exception {
return null;
}
@Override
public byte getDataStructureType() {
return 42;
}
}
private class VMTestTransportListener implements TransportListener {
protected final Queue<DummyCommand> received;
public boolean shutdownReceived = false;
public VMTestTransportListener(Queue<DummyCommand> receiveQueue) {
this.received = receiveQueue;
}
@Override
public void onCommand(Object command) {
if (command instanceof ShutdownInfo) {
shutdownReceived = true;
} else {
received.add((DummyCommand) command);
}
}
@Override
public void onException(IOException error) {
}
@Override
public void transportInterupted() {
}
@Override
public void transportResumed() {
}
}
private class VMResponderTransportListener implements TransportListener {
protected final Queue<DummyCommand> received;
private final Transport peer;
public VMResponderTransportListener(Queue<DummyCommand> receiveQueue, Transport peer) {
this.received = receiveQueue;
this.peer = peer;
}
@Override
public void onCommand(Object command) {
if (command instanceof ShutdownInfo) {
return;
} else {
received.add((DummyCommand) command);
if (peer != null) {
try {
peer.oneway(command);
} catch (IOException e) {
}
}
}
}
@Override
public void onException(IOException error) {
}
@Override
public void transportInterupted() {
}
@Override
public void transportResumed() {
}
}
private class SlowVMTestTransportListener extends VMTestTransportListener {
private final TimeUnit delayUnit;
private final long delay;
public SlowVMTestTransportListener(Queue<DummyCommand> receiveQueue) {
this(receiveQueue, 10, TimeUnit.MILLISECONDS);
}
public SlowVMTestTransportListener(Queue<DummyCommand> receiveQueue, long delay, TimeUnit delayUnit) {
super(receiveQueue);
this.delay = delay;
this.delayUnit = delayUnit;
}
@Override
public void onCommand(Object command) {
super.onCommand(command);
try {
delayUnit.sleep(delay);
} catch (InterruptedException e) {
}
}
}
private class GatedVMTestTransportListener extends VMTestTransportListener {
private final CountDownLatch gate;
public GatedVMTestTransportListener(Queue<DummyCommand> receiveQueue) {
this(receiveQueue, new CountDownLatch(1));
}
public GatedVMTestTransportListener(Queue<DummyCommand> receiveQueue, CountDownLatch gate) {
super(receiveQueue);
this.gate = gate;
}
@Override
public void onCommand(Object command) {
super.onCommand(command);
try {
gate.await();
} catch (InterruptedException e) {
}
}
}
private void assertMessageAreOrdered(ConcurrentLinkedQueue<DummyCommand> queue) {
int lastSequenceId = 0;
for(DummyCommand command : queue) {
int id = command.sequenceId;
assertTrue("Last id: " + lastSequenceId + " should be less than current id: " + id, id > lastSequenceId);
}
}
@Before
public void setUp() throws Exception {
localReceived.clear();
remoteReceived.clear();
}
@After
public void tearDown() throws Exception {
}
@Test(timeout=60000)
public void testStartWthoutListenerIOE() throws Exception {
final VMTransport local = new VMTransport(new URI(location1));
final VMTransport remote = new VMTransport(new URI(location2));
local.setPeer(remote);
remote.setPeer(local);
remote.setTransportListener(new VMTestTransportListener(localReceived));
try {
local.start();
fail("Should have thrown an IOExcoption");
} catch (IOException e) {
}
}
@Test(timeout=60000)
public void testOnewayOnStoppedTransportTDE() throws Exception {
final VMTransport local = new VMTransport(new URI(location1));
final VMTransport remote = new VMTransport(new URI(location2));
local.setPeer(remote);
remote.setPeer(local);
local.setTransportListener(new VMTestTransportListener(localReceived));
remote.setTransportListener(new VMTestTransportListener(remoteReceived));
local.start();
local.stop();
try {
local.oneway(new DummyCommand());
fail("Should have thrown a TransportDisposedException");
} catch(TransportDisposedIOException e) {
}
}
@Test(timeout=60000)
public void testStopSendsShutdownToPeer() throws Exception {
final VMTransport local = new VMTransport(new URI(location1));
final VMTransport remote = new VMTransport(new URI(location2));
local.setPeer(remote);
remote.setPeer(local);
final VMTestTransportListener remoteListener = new VMTestTransportListener(remoteReceived);
local.setTransportListener(new VMTestTransportListener(localReceived));
remote.setTransportListener(remoteListener);
local.start();
local.stop();
assertTrue(Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return remoteListener.shutdownReceived;
}
}));
}
@Test(timeout=60000)
public void testRemoteStopSendsExceptionToPendingRequests() throws Exception {
final VMTransport local = new VMTransport(new URI(location1));
final VMTransport remote = new VMTransport(new URI(location2));
local.setPeer(remote);
remote.setPeer(local);
final VMTestTransportListener remoteListener = new VMTestTransportListener(remoteReceived);
remote.setTransportListener(remoteListener);
remote.start();
final Response[] answer = new Response[1];
ResponseCorrelator responseCorrelator = new ResponseCorrelator(local);
responseCorrelator.setTransportListener(new VMTestTransportListener(localReceived));
responseCorrelator.start();
responseCorrelator.asyncRequest(new DummyCommand(), new ResponseCallback() {
@Override
public void onCompletion(FutureResponse resp) {
try {
answer[0] = resp.getResult();
} catch (IOException e) {
e.printStackTrace();
}
}
});
// simulate broker stop
remote.stop();
assertTrue("got expected exception response", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
LOG.info("answer: " + answer[0]);
return answer[0] instanceof ExceptionResponse && ((ExceptionResponse)answer[0]).getException() instanceof TransportDisposedIOException;
}
}));
local.stop();
}
@Test(timeout=60000)
public void testMultipleStartsAndStops() throws Exception {
final VMTransport local = new VMTransport(new URI(location1));
final VMTransport remote = new VMTransport(new URI(location2));
local.setPeer(remote);
remote.setPeer(local);
local.setTransportListener(new VMTestTransportListener(localReceived));
remote.setTransportListener(new VMTestTransportListener(remoteReceived));
local.start();
remote.start();
local.start();
remote.start();
for(int i = 0; i < 100; ++i) {
local.oneway(new DummyCommand());
}
for(int i = 0; i < 100; ++i) {
remote.oneway(new DummyCommand());
}
local.start();
remote.start();
assertTrue(Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return remoteReceived.size() == 100;
}
}));
assertTrue(Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return localReceived.size() == 100;
}
}));
local.stop();
local.stop();
remote.stop();
remote.stop();
}
@Test(timeout=60000)
public void testStartWithPeerNotStartedEnqueusCommandsNonAsync() throws Exception {
doTestStartWithPeerNotStartedEnqueusCommands(false);
}
private void doTestStartWithPeerNotStartedEnqueusCommands(boolean async) throws Exception {
final VMTransport local = new VMTransport(new URI(location1));
final VMTransport remote = new VMTransport(new URI(location2));
remote.setAsync(async);
local.setPeer(remote);
remote.setPeer(local);
local.setTransportListener(new VMTestTransportListener(localReceived));
remote.setTransportListener(new VMTestTransportListener(remoteReceived));
local.start();
for(int i = 0; i < 100; ++i) {
local.oneway(new DummyCommand());
}
assertEquals(100, remote.getMessageQueue().size());
remote.start();
assertTrue(Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return remoteReceived.size() == 100;
}
}));
local.stop();
remote.stop();
}
@Test(timeout=60000)
public void testBlockedOnewayEnqeueAandStopTransportAsync() throws Exception {
doTestBlockedOnewayEnqeueAandStopTransport(true);
}
@Test(timeout=60000)
public void testBlockedOnewayEnqeueAandStopTransportNonAsync() throws Exception {
doTestBlockedOnewayEnqeueAandStopTransport(false);
}
private void doTestBlockedOnewayEnqeueAandStopTransport(boolean async) throws Exception {
final VMTransport local = new VMTransport(new URI(location1));
final VMTransport remote = new VMTransport(new URI(location2));
final AtomicInteger sequenceId = new AtomicInteger();
remote.setAsync(async);
remote.setAsyncQueueDepth(99);
local.setPeer(remote);
remote.setPeer(local);
local.setTransportListener(new VMTestTransportListener(localReceived));
remote.setTransportListener(new VMTestTransportListener(remoteReceived));
local.start();
Thread t = new Thread(new Runnable() {
@Override
public void run() {
for(int i = 0; i < 100; ++i) {
try {
local.oneway(new DummyCommand(sequenceId.incrementAndGet()));
} catch (Exception e) {
}
}
}
});
t.start();
LOG.debug("Started async delivery, wait for remote's queue to fill up");
assertTrue(Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return remote.getMessageQueue().remainingCapacity() == 0;
}
}));
LOG.debug("Remote messageQ is full, start it and stop all");
remote.start();
local.stop();
remote.stop();
}
@Test(timeout=60000)
public void testBlockedOnewayEnqeueWhileStartedDetectsStop() throws Exception {
final VMTransport local = new VMTransport(new URI(location1));
final VMTransport remote = new VMTransport(new URI(location2));
final AtomicInteger sequenceId = new AtomicInteger();
remote.setAsync(true);
remote.setAsyncQueueDepth(2);
local.setPeer(remote);
remote.setPeer(local);
local.setTransportListener(new VMTestTransportListener(localReceived));
remote.setTransportListener(new GatedVMTestTransportListener(remoteReceived));
local.start();
remote.start();
Thread t = new Thread(new Runnable() {
@Override
public void run() {
for(int i = 0; i < 3; ++i) {
try {
local.oneway(new DummyCommand(sequenceId.incrementAndGet()));
} catch (Exception e) {
}
}
}
});
t.start();
LOG.debug("Started async delivery, wait for remote's queue to fill up");
assertTrue(Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return remote.getMessageQueue().remainingCapacity() == 0;
}
}));
LOG.debug("Starting async gate open.");
Thread gateman = new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
}
((GatedVMTestTransportListener) remote.getTransportListener()).gate.countDown();
}
});
assertTrue(Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return remoteReceived.size() == 1;
}
}));
gateman.start();
remote.stop();
local.stop();
assertTrue(Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return remoteReceived.size() == 1;
}
}));
assertMessageAreOrdered(remoteReceived);
}
@Test(timeout=60000)
public void testStopWhileStartingAsyncWithNoAsyncLimit() throws Exception {
// In the async case the iterate method should see that we are stopping and
// drop out before we dispatch all the messages but it should get at least 49 since
// the stop thread waits 500 mills and the listener is waiting 10 mills on each receive.
doTestStopWhileStartingWithNoAsyncLimit(true, 49);
}
@Test(timeout=60000)
public void testStopWhileStartingNonAsyncWithNoAsyncLimit() throws Exception {
// In the non-async case the start dispatches all messages up front and then continues on
doTestStopWhileStartingWithNoAsyncLimit(false, 100);
}
private void doTestStopWhileStartingWithNoAsyncLimit(boolean async, final int expect) throws Exception {
final VMTransport local = new VMTransport(new URI(location1));
final VMTransport remote = new VMTransport(new URI(location2));
remote.setAsync(async);
local.setPeer(remote);
remote.setPeer(local);
local.setTransportListener(new VMTestTransportListener(localReceived));
remote.setTransportListener(new SlowVMTestTransportListener(remoteReceived));
local.start();
for(int i = 0; i < 100; ++i) {
local.oneway(new DummyCommand(i));
}
Thread t = new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
remote.stop();
} catch (Exception e) {
}
}
});
remote.start();
t.start();
assertTrue("Remote should receive: " + expect + ", commands but got: " + remoteReceived.size(), Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return remoteReceived.size() >= expect;
}
}));
LOG.debug("Remote listener received " + remoteReceived.size() + " messages");
local.stop();
assertTrue("Remote transport never was disposed.", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return remote.isDisposed();
}
}));
}
@Test(timeout=120000)
public void TestTwoWayMessageThroughPutSync() throws Exception {
long totalTimes = 0;
final long executions = 20;
for (int i = 0; i < 20; ++i) {
totalTimes += doTestTwoWayMessageThroughPut(false);
}
LOG.info("Total time of one way sync send throughput test: " + (totalTimes/executions) + "ms");
}
@Test(timeout=120000)
public void TestTwoWayMessageThroughPutAsnyc() throws Exception {
long totalTimes = 0;
final long executions = 50;
for (int i = 0; i < executions; ++i) {
totalTimes += doTestTwoWayMessageThroughPut(false);
}
LOG.info("Total time of one way async send throughput test: " + (totalTimes/executions) + "ms");
}
private long doTestTwoWayMessageThroughPut(boolean async) throws Exception {
final VMTransport local = new VMTransport(new URI(location1));
final VMTransport remote = new VMTransport(new URI(location2));
final AtomicInteger sequenceId = new AtomicInteger();
remote.setAsync(async);
local.setPeer(remote);
remote.setPeer(local);
local.setTransportListener(new VMTestTransportListener(localReceived));
remote.setTransportListener(new VMTestTransportListener(remoteReceived));
final int messageCount = 200000;
local.start();
remote.start();
long startTime = System.currentTimeMillis();
Thread localSend = new Thread(new Runnable() {
@Override
public void run() {
for(int i = 0; i < messageCount; ++i) {
try {
local.oneway(new DummyCommand(sequenceId.incrementAndGet()));
} catch (Exception e) {
}
}
}
});
Thread remoteSend = new Thread(new Runnable() {
@Override
public void run() {
for(int i = 0; i < messageCount; ++i) {
try {
remote.oneway(new DummyCommand(sequenceId.incrementAndGet()));
} catch (Exception e) {
}
}
}
});
localSend.start();
remoteSend.start();
// Wait for both to finish and then check that each side go the correct amount
localSend.join();
remoteSend.join();
long endTime = System.currentTimeMillis();
assertTrue(Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return remoteReceived.size() == messageCount;
}
}));
assertTrue(Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return localReceived.size() == messageCount;
}
}));
LOG.debug("All messages sent,stop all");
local.stop();
remote.stop();
localReceived.clear();
remoteReceived.clear();
return endTime - startTime;
}
@Test(timeout=120000)
public void TestOneWayMessageThroughPutSync() throws Exception {
long totalTimes = 0;
final long executions = 30;
for (int i = 0; i < executions; ++i) {
totalTimes += doTestOneWayMessageThroughPut(false);
}
LOG.info("Total time of one way sync send throughput test: " + (totalTimes/executions) + "ms");
}
@Test(timeout=120000)
public void TestOneWayMessageThroughPutAsnyc() throws Exception {
long totalTimes = 0;
final long executions = 20;
for (int i = 0; i < 20; ++i) {
totalTimes += doTestOneWayMessageThroughPut(true);
}
LOG.info("Total time of one way async send throughput test: " + (totalTimes/executions) + "ms");
}
private long doTestOneWayMessageThroughPut(boolean async) throws Exception {
final VMTransport local = new VMTransport(new URI(location1));
final VMTransport remote = new VMTransport(new URI(location2));
final AtomicInteger sequenceId = new AtomicInteger();
remote.setAsync(async);
local.setPeer(remote);
remote.setPeer(local);
local.setTransportListener(new VMTestTransportListener(localReceived));
remote.setTransportListener(new VMTestTransportListener(remoteReceived));
final int messageCount = 100000;
local.start();
remote.start();
long startTime = System.currentTimeMillis();
Thread localSend = new Thread(new Runnable() {
@Override
public void run() {
for(int i = 0; i < messageCount; ++i) {
try {
local.oneway(new DummyCommand(sequenceId.incrementAndGet()));
} catch (Exception e) {
}
}
}
});
localSend.start();
// Wait for both to finish and then check that each side go the correct amount
localSend.join();
long endTime = System.currentTimeMillis();
assertTrue(Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return remoteReceived.size() == messageCount;
}
}));
LOG.debug("All messages sent,stop all");
local.stop();
remote.stop();
localReceived.clear();
remoteReceived.clear();
return endTime - startTime;
}
@Test(timeout=120000)
public void testTwoWayTrafficWithMutexTransportSync1() throws Exception {
for (int i = 0; i < 20; ++i) {
doTestTwoWayTrafficWithMutexTransport(false, false);
}
}
@Test(timeout=120000)
public void testTwoWayTrafficWithMutexTransportSync2() throws Exception {
for (int i = 0; i < 20; ++i) {
doTestTwoWayTrafficWithMutexTransport(true, false);
}
}
@Test(timeout=120000)
public void testTwoWayTrafficWithMutexTransportSync3() throws Exception {
for (int i = 0; i < 20; ++i) {
doTestTwoWayTrafficWithMutexTransport(false, true);
}
}
@Test(timeout=120000)
public void testTwoWayTrafficWithMutexTransportSync4() throws Exception {
for (int i = 0; i < 20; ++i) {
doTestTwoWayTrafficWithMutexTransport(false, false);
}
}
public void doTestTwoWayTrafficWithMutexTransport(boolean localAsync, boolean remoteAsync) throws Exception {
final VMTransport vmlocal = new VMTransport(new URI(location1));
final VMTransport vmremote = new VMTransport(new URI(location2));
final MutexTransport local = new MutexTransport(vmlocal);
final MutexTransport remote = new MutexTransport(vmremote);
final AtomicInteger sequenceId = new AtomicInteger();
vmlocal.setAsync(localAsync);
vmremote.setAsync(remoteAsync);
vmlocal.setPeer(vmremote);
vmremote.setPeer(vmlocal);
local.setTransportListener(new VMTestTransportListener(localReceived));
remote.setTransportListener(new VMResponderTransportListener(remoteReceived, remote));
final int messageCount = 200000;
Thread localSend = new Thread(new Runnable() {
@Override
public void run() {
for(int i = 0; i < messageCount; ++i) {
try {
local.oneway(new DummyCommand(sequenceId.incrementAndGet()));
} catch (Exception e) {
}
}
}
});
Thread remoteSend = new Thread(new Runnable() {
@Override
public void run() {
for(int i = 0; i < messageCount; ++i) {
try {
remote.oneway(new DummyCommand(sequenceId.incrementAndGet()));
} catch (Exception e) {
}
}
}
});
localSend.start();
remoteSend.start();
Thread.sleep(10);
local.start();
remote.start();
// Wait for both to finish and then check that each side go the correct amount
localSend.join();
remoteSend.join();
assertTrue("Remote should have received ("+messageCount+") but got ()" + remoteReceived.size(), Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return remoteReceived.size() == messageCount;
}
}));
assertTrue("Local should have received ("+messageCount*2+") but got ()" + localReceived.size(), Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return localReceived.size() == messageCount*2;
}
}));
LOG.debug("All messages sent,stop all");
local.stop();
remote.stop();
localReceived.clear();
remoteReceived.clear();
}
}