blob: 0b8de2905a08f0923d5ad3dbe7c1884c92c823db [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usecases;
import java.net.URI;
import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.TextMessage;
import junit.framework.Test;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.transport.vm.VMTransportFactory;
import org.apache.activemq.util.MessageIdList;
import org.apache.activemq.util.SocketProxy;
import org.apache.activemq.util.Wait;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
 * Exercises a network bridge from {@code SpokeBroker} to {@code HubBroker} while the
 * link between them is disturbed.
 *
 * <p>When {@code useSocketProxy} is set, the bridge is routed through a {@link SocketProxy}
 * that the tests pause, close or half-close to simulate a stalled or broken network.
 *
 * <p>The public boolean fields are registered with {@code addCombinationValues}; they are
 * presumably populated by the combination test support before each run, which is why they
 * are not reset in {@link #setUp()}.
 */
public class BrokerQueueNetworkWithDisconnectTest extends JmsMultipleBrokersTestSupport {
    private static final Log LOG = LogFactory.getLog(BrokerQueueNetworkWithDisconnectTest.class);

    /** How long, in milliseconds, the proxied link stays down during {@link #onSend}. */
    private static final int NETWORK_DOWN_TIME = 5000;
    protected static final int MESSAGE_COUNT = 200;
    private static final String HUB = "HubBroker";
    private static final String SPOKE = "SpokeBroker";

    /** Proxy created by the most recent {@code bridgeBrokers} call; {@code null} when unused. */
    private SocketProxy socketProxy;

    /** Wall-clock time (ms) at which the link was last taken down; 0 while the link is up. */
    private long networkDownTimeStart;

    public boolean useDuplexNetworkBridge = true;
    public boolean simulateStalledNetwork;

    /** Value (ms) used for both wire-format inactivity options on the bridge URI. */
    private long inactiveDuration = 1000;
    private boolean useSocketProxy = true;

    /**
     * Registers the field combinations for
     * {@link #testSendOnAReceiveOnBWithTransportDisconnect()}: duplex on and off, always
     * with a stalled (paused) network rather than a closed one.
     */
    public void initCombosForTestSendOnAReceiveOnBWithTransportDisconnect() {
        addCombinationValues("useDuplexNetworkBridge", new Object[] {Boolean.TRUE, Boolean.FALSE});
        addCombinationValues("simulateStalledNetwork", new Object[] {Boolean.TRUE});
    }

    /**
     * Sends {@link #MESSAGE_COUNT} messages on the spoke and verifies that at least that
     * many reach a consumer on the hub. The link is disturbed from {@link #onSend}.
     *
     * @throws Exception if broker setup, sending or receiving fails
     */
    public void testSendOnAReceiveOnBWithTransportDisconnect() throws Exception {
        bridgeBrokers(SPOKE, HUB);
        startAllBrokers();

        // Setup destination
        Destination dest = createDestination("TEST.FOO", false);

        // Setup consumers
        MessageConsumer client = createConsumer(HUB, dest);

        // allow subscription information to flow back to Spoke
        sleep(600);

        // Send messages
        sendMessages(SPOKE, dest, MESSAGE_COUNT);

        MessageIdList msgs = getConsumerMessages(HUB, client);
        msgs.waitForMessagesToArrive(MESSAGE_COUNT);

        assertTrue("At least message " + MESSAGE_COUNT
                + " must be recieved, duplicates are expected, count=" + msgs.getMessageCount(),
                MESSAGE_COUNT <= msgs.getMessageCount());
    }

    /**
     * Half-closes the proxied duplex bridge several times while the hub is slow to remove
     * connections, then verifies the hub ends up with exactly one transport connection
     * and two VM connections.
     *
     * @throws Exception if broker setup fails or a wait condition throws
     */
    @SuppressWarnings("unchecked")
    public void testNoStuckConnectionsWithTransportDisconnect() throws Exception {
        inactiveDuration = 60000L;
        useDuplexNetworkBridge = true;

        bridgeBrokers(SPOKE, HUB);

        final BrokerItem hub = brokers.get(HUB);
        hub.broker.setPlugins(new BrokerPlugin[] {
            new BrokerPluginSupport() {
                // Number of removeConnection calls that are still to be delayed.
                int sleepCount = 2;

                @Override
                public void removeConnection(ConnectionContext context,
                        ConnectionInfo info, Throwable error)
                        throws Exception {
                    try {
                        while (--sleepCount >= 0) {
                            LOG.info("sleeping for a bit in close impl to simulate load where reconnect fails due to a pending close");
                            TimeUnit.SECONDS.sleep(2);
                        }
                    } catch (InterruptedException interrupted) {
                        // Stop delaying, but keep the interrupt visible to the caller.
                        Thread.currentThread().interrupt();
                    }
                    super.removeConnection(context, info, error);
                }
            }
        });

        startAllBrokers();
        waitForBridgeFormation();

        // kill the initiator side, leaving remote end intact
        // simulate async network breakage
        // remote side will need to spot duplicate network and stop/kill the original
        for (int i = 0; i < 3; i++) {
            socketProxy.halfClose();
            sleep(10000);
        }

        // wait for full reformation of bridge
        // verify no extra connections
        boolean allGood = Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                long numConnections = hub.broker.getTransportConnectors().get(0).getConnections().size();
                LOG.info("Num connetions:" + numConnections);
                return numConnections == 1;
            }
        });
        if (!allGood) {
            dumpAllThreads("ExtraHubConnection");
        }
        assertTrue("should be only one transport connection for the single duplex network connector", allGood);

        allGood = Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                long numVmConnections = VMTransportFactory.SERVERS.get(HUB).getConnectionCount();
                LOG.info("Num VM connetions:" + numVmConnections);
                return numVmConnections == 2;
            }
        });
        if (!allGood) {
            dumpAllThreads("ExtraHubVMConnection");
        }
        assertTrue("should be only 2 vm connections for the single network duplex network connector", allGood);
    }

    /**
     * Configures two differently named duplex connectors from spoke to hub (without the
     * socket proxy) and verifies the hub's first transport connector holds two connections.
     *
     * @throws Exception if broker setup fails
     */
    public void testTwoDuplexNCsAreAllowed() throws Exception {
        useDuplexNetworkBridge = true;
        useSocketProxy = false;

        NetworkConnector connector = bridgeBrokers(SPOKE, HUB);
        connector.setName("FirstDuplex");
        connector = bridgeBrokers(SPOKE, HUB);
        connector.setName("SecondDuplex");

        startAllBrokers();
        waitForBridgeFormation();

        BrokerItem hub = brokers.get(HUB);
        assertEquals("Has two transport Connectors", 2, hub.broker.getTransportConnectors().get(0).getConnections().size());
    }

    /**
     * Starts the hub before the spoke, then pauses briefly.
     *
     * @throws Exception if either broker fails to start
     */
    @Override
    protected void startAllBrokers() throws Exception {
        // Ensure HUB is started first so bridge will be active from the get go
        BrokerItem brokerItem = brokers.get(HUB);
        brokerItem.broker.start();
        brokerItem = brokers.get(SPOKE);
        brokerItem.broker.start();
        sleep(600);
    }

    /**
     * Resets the per-test state and creates the hub (port 61617) and spoke (port 61616)
     * brokers.
     *
     * @throws Exception if the superclass setup or broker creation fails
     */
    @Override
    public void setUp() throws Exception {
        networkDownTimeStart = 0;
        inactiveDuration = 1000;
        useSocketProxy = true;
        super.setAutoFail(true);
        super.setUp();
        final String options = "?persistent=true&useJmx=false&deleteAllMessagesOnStartup=true";
        createBroker(new URI("broker:(tcp://localhost:61617)/" + HUB + options));
        createBroker(new URI("broker:(tcp://localhost:61616)/" + SPOKE + options));
    }

    /**
     * Runs the superclass teardown and then closes the socket proxy, if one was created.
     * The proxy is closed even when the superclass teardown throws.
     *
     * @throws Exception if the superclass teardown fails
     */
    @Override
    public void tearDown() throws Exception {
        try {
            super.tearDown();
        } finally {
            if (socketProxy != null) {
                socketProxy.close();
                socketProxy = null;
            }
        }
    }

    public static Test suite() {
        return suite(BrokerQueueNetworkWithDisconnectTest.class);
    }

    /**
     * Per-message hook that throttles sending and disturbs the proxied link.
     *
     * <p>Every call sleeps 50 ms. At indices 50 and 150 the proxy is paused (stalled
     * network) or closed. On later calls the proxy is resumed or reopened once
     * {@link #NETWORK_DOWN_TIME} milliseconds have passed; until then sending is slowed
     * by a further 500 ms per message.
     *
     * @param i index of the message being sent
     * @param msg the message being sent
     */
    @Override
    protected void onSend(int i, TextMessage msg) {
        sleep(50);
        if (i == 50 || i == 150) {
            if (simulateStalledNetwork) {
                socketProxy.pause();
            } else {
                socketProxy.close();
            }
            networkDownTimeStart = System.currentTimeMillis();
        } else if (networkDownTimeStart > 0) {
            // restart after NETWORK_DOWN_TIME milliseconds
            if (networkDownTimeStart + NETWORK_DOWN_TIME < System.currentTimeMillis()) {
                if (simulateStalledNetwork) {
                    socketProxy.goOn();
                } else {
                    socketProxy.reopen();
                }
                networkDownTimeStart = 0;
            } else {
                // slow message production to allow bridge to recover and limit message duplication
                sleep(500);
            }
        }
        super.onSend(i, msg);
    }

    /**
     * Sleeps for the given time. If the thread is interrupted the sleep ends early and
     * the interrupt flag is restored so callers can still observe it.
     *
     * @param milliSecondTime time to sleep, in milliseconds
     */
    private void sleep(int milliSecondTime) {
        try {
            Thread.sleep(milliSecondTime);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Adds a static-discovery network connector on {@code localBroker} that targets the
     * first transport connector of {@code remoteBroker}, optionally through a new
     * {@link SocketProxy}.
     *
     * <p>Only the most recently created proxy is retained in {@code socketProxy}.
     * The {@code conduit} and {@code failover} arguments are not used by this override.
     *
     * @param localBroker broker that receives the network connector
     * @param remoteBroker broker whose first transport connector is the bridge target
     * @param dynamicOnly passed to {@code setDynamicOnly}
     * @param networkTTL passed to {@code setNetworkTTL}
     * @param conduit unused here
     * @param failover unused here
     * @return the connector that was added to {@code localBroker}
     * @throws Exception if the remote broker has no transport connectors, or if building
     *         the URI or adding the connector fails
     */
    @Override
    protected NetworkConnector bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker, boolean dynamicOnly, int networkTTL, boolean conduit, boolean failover) throws Exception {
        List<TransportConnector> transportConnectors = remoteBroker.getTransportConnectors();
        if (transportConnectors.isEmpty()) {
            throw new Exception("Remote broker has no registered connectors.");
        }

        URI remoteURI = transportConnectors.get(0).getConnectUri();
        if (useSocketProxy) {
            socketProxy = new SocketProxy(remoteURI);
            remoteURI = socketProxy.getUrl();
        }

        DiscoveryNetworkConnector connector = new DiscoveryNetworkConnector(new URI("static:(" + remoteURI
                + "?wireFormat.maxInactivityDuration=" + inactiveDuration + "&wireFormat.maxInactivityDurationInitalDelay=" + inactiveDuration + ")?useExponentialBackOff=false"));
        connector.setDynamicOnly(dynamicOnly);
        connector.setNetworkTTL(networkTTL);
        localBroker.addNetworkConnector(connector);
        maxSetupTime = 2000;
        if (useDuplexNetworkBridge) {
            connector.setDuplex(true);
        }
        return connector;
    }
}