// blob: 9196311a58cd4c3a08f21531ddbebde9b9ee9914 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usecases;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.network.DemandForwardingBridge;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkBridge;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.TestUtils;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import java.net.InetAddress;
import java.net.Socket;
import java.net.URI;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static org.junit.Assert.assertTrue;
// https://issues.apache.org/jira/browse/AMQ-6640
/**
 * Exercises a duplex network bridge between two in-memory brokers (AMQ-6640).
 *
 * <p>Broker A owns a duplex network connector that points at a port on which broker B only
 * starts listening after both brokers have been loaded with messages and consumers. Broker A
 * additionally delays the return of {@code addConsumer} for network connections. The test
 * passes when every produced message is eventually received by a consumer on the opposite
 * broker.
 */
public class DuplexAdvisoryRaceTest {

    private static final Logger LOG = LoggerFactory.getLogger(DuplexAdvisoryRaceTest.class);

    /** Upper bound on how long {@link #testHang()} waits for all messages to arrive. */
    private static final long RECEIVE_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(30);

    /** Address used for all transport URLs; resolved once in {@link #initIp()}. */
    private static String hostName;

    /** Number of messages seen by all listeners registered through {@link #demand}. */
    final AtomicLong responseReceived = new AtomicLong(0);
    BrokerService brokerA;
    BrokerService brokerB;
    /** URL the duplex connector of broker A dials and on which broker B later listens. */
    String networkConnectorUrlString;

    /**
     * Resolves the local host address once for the whole class.
     *
     * @throws Exception if the local host address cannot be resolved
     */
    @BeforeClass
    public static void initIp() throws Exception {
        // attempt to bypass loopback - not vital but it helps to reproduce
        hostName = InetAddress.getLocalHost().getHostAddress();
    }

    /**
     * Picks a fresh port for the bridge, builds (but does not start) both brokers and resets
     * the received-message counter.
     *
     * @throws Exception if a free port cannot be found or a broker cannot be configured
     */
    @Before
    public void createBrokers() throws Exception {
        networkConnectorUrlString = "tcp://" + hostName + ":" + TestUtils.findOpenPort();
        brokerA = newBroker("A");
        brokerB = newBroker("B");
        responseReceived.set(0);
    }

    /**
     * Stops both brokers. Each broker is skipped when it was never created, and broker B is
     * stopped even when stopping broker A fails, so a failed set-up or a failed stop does not
     * leave the other broker running.
     *
     * @throws Exception if stopping a broker fails
     */
    @After
    public void stopBrokers() throws Exception {
        try {
            if (brokerA != null) {
                brokerA.stop();
            }
        } finally {
            if (brokerB != null) {
                brokerB.stop();
            }
        }
    }

    /**
     * Runs {@link #testHang()} ten times, recreating the brokers after every run.
     *
     * <p>This method carries no {@code @Test} annotation, so JUnit does not pick it up.
     *
     * @throws Exception if any iteration fails
     */
    // to be sure to be sure
    public void repeatTestHang() throws Exception {
        for (int i = 0; i < 10; i++) {
            testHang();
            stopBrokers();
            createBrokers();
        }
    }

    /**
     * Loads both brokers with messages and cross-broker consumers, then lets the duplex bridge
     * connect and waits for every message to be consumed.
     *
     * <p>If the wait times out, all threads are dumped and the bridge sockets are closed
     * forcibly before the normal shutdown is attempted.
     *
     * @throws Exception on broker, JMS or socket failure
     */
    @Test
    public void testHang() throws Exception {
        brokerA.setPlugins(new BrokerPlugin[] { new BrokerPluginSupport() {
            @Override
            public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
                Subscription subscription = super.addConsumer(context, info);
                // delay return to allow dispatch to interleave
                if (context.isNetworkConnection()) {
                    TimeUnit.MILLISECONDS.sleep(100);
                }
                return subscription;
            }
        }});

        // bridge
        NetworkConnector networkConnector = bridgeBrokers(brokerA, brokerB);
        brokerA.start();
        brokerB.start();

        ActiveMQConnectionFactory brokerAFactory = new ActiveMQConnectionFactory(
                brokerA.getTransportConnectorByScheme("tcp").getPublishableConnectString()
                        + "?jms.watchTopicAdvisories=false");
        ActiveMQConnectionFactory brokerBFactory = new ActiveMQConnectionFactory(
                brokerB.getTransportConnectorByScheme("tcp").getPublishableConnectString()
                        + "?jms.watchTopicAdvisories=false");

        // populate dests
        final int numDests = 400;
        final int numMessagesPerDest = 50;
        final int numConsumersPerDest = 5;
        // computed once, in long arithmetic, and shared by the wait condition and the assertion
        final long expectedMessages = (long) numMessagesPerDest * numDests;

        // first half of the destinations lives on A, second half on B
        populate(brokerAFactory, 0, numDests / 2, numMessagesPerDest);
        populate(brokerBFactory, numDests / 2, numDests, numMessagesPerDest);

        List<Connection> connections = new LinkedList<>();
        try {
            // demand: consumers sit on the broker that does NOT hold the messages
            connections.add(demand(brokerBFactory, 0, numDests / 2, numConsumersPerDest));
            connections.add(demand(brokerAFactory, numDests / 2, numDests, numConsumersPerDest));

            LOG.info("Allow duplex bridge to connect....");
            // allow bridge to start
            brokerB.startTransportConnector(
                    brokerB.addConnector(networkConnectorUrlString + "?transport.socketBufferSize=1024"));

            boolean receivedAll = Wait.waitFor(new Wait.Condition() {
                @Override
                public boolean isSatisified() throws Exception {
                    LOG.info("received: {}", responseReceived.get());
                    return responseReceived.get() >= expectedMessages;
                }
            }, RECEIVE_TIMEOUT_MILLIS);

            if (!receivedAll) {
                org.apache.activemq.TestSupport.dumpAllThreads("DD");
                // when hung close will also hang!
                forceCloseBridgeSockets(networkConnector);
            }
        } finally {
            // release the bridge and the client connections even when the wait itself failed
            try {
                networkConnector.stop();
            } finally {
                for (Connection connection : connections) {
                    try {
                        connection.close();
                    } catch (Exception ignored) {
                        // best effort: the broker side may already be gone
                    }
                }
            }
        }

        assertTrue("received all sent: " + responseReceived.get(), responseReceived.get() >= expectedMessages);
    }

    /**
     * Closes the socket underneath every active {@link DemandForwardingBridge} of the given
     * connector. Bridges whose remote transport does not expose a {@link Socket} are skipped.
     *
     * @param networkConnector connector whose active bridges are torn down
     * @throws Exception if closing a socket fails
     */
    private void forceCloseBridgeSockets(NetworkConnector networkConnector) throws Exception {
        for (NetworkBridge networkBridge : networkConnector.activeBridges()) {
            if (!(networkBridge instanceof DemandForwardingBridge)) {
                continue;
            }
            DemandForwardingBridge demandForwardingBridge = (DemandForwardingBridge) networkBridge;
            Socket socket = demandForwardingBridge.getRemoteBroker().narrow(Socket.class);
            if (socket != null) {
                socket.close();
            }
        }
    }

    /**
     * Sends {@code numMessages} empty bytes messages to each queue with an index in
     * {@code [minDest, maxDest)}. The connection is closed again before returning, also when
     * sending fails.
     *
     * @param factory     factory for the broker that receives the messages
     * @param minDest     first destination index, inclusive
     * @param maxDest     last destination index, exclusive
     * @param numMessages messages to send per destination
     * @throws JMSException if connecting or sending fails
     */
    private void populate(ActiveMQConnectionFactory factory, int minDest, int maxDest, int numMessages) throws JMSException {
        Connection connection = factory.createConnection();
        try {
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            final BytesMessage message = session.createBytesMessage();
            // anonymous producer: the destination is supplied on every send
            MessageProducer producer = session.createProducer(null);
            for (int i = minDest; i < maxDest; i++) {
                Destination destination = qFromInt(i);
                for (int j = 0; j < numMessages; j++) {
                    producer.send(destination, message);
                }
            }
        } finally {
            connection.close();
        }
    }

    /**
     * Registers {@code numConsumers} counting listeners on each queue with an index in
     * {@code [minDest, maxDest)} and starts delivery.
     *
     * @param factory      factory for the broker the consumers attach to
     * @param minDest      first destination index, inclusive
     * @param maxDest      last destination index, exclusive
     * @param numConsumers consumers to create per destination
     * @return the started connection; the caller is responsible for closing it
     * @throws Exception if the connection, session or a consumer cannot be created; the
     *                   connection is closed before the exception propagates
     */
    private Connection demand(ActiveMQConnectionFactory factory, int minDest, int maxDest, int numConsumers) throws Exception {
        Connection connection = factory.createConnection();
        boolean started = false;
        try {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            for (int i = minDest; i < maxDest; i++) {
                Destination destination = qFromInt(i);
                for (int j = 0; j < numConsumers; j++) {
                    session.createConsumer(destination).setMessageListener(new MessageListener() {
                        @Override
                        public void onMessage(Message message) {
                            responseReceived.incrementAndGet();
                        }
                    });
                }
            }
            connection.start();
            started = true;
            return connection;
        } finally {
            if (!started) {
                try {
                    connection.close();
                } catch (JMSException ignored) {
                    // keep the original failure; closing is only best effort here
                }
            }
        }
    }

    /**
     * Maps an index to a queue named {@code Test.d.d.d}, where the {@code d}s are the digits of
     * the index zero-padded to at least three places (for example {@code 7 -> Test.0.0.7}).
     * Indexes with more than three digits keep all their digits, so distinct indexes always
     * yield distinct queue names.
     *
     * @param val destination index
     * @return the queue for that index
     */
    private Destination qFromInt(int val) {
        String digits = String.format("%03d", val);
        StringBuilder builder = new StringBuilder();
        for (int i = 0; i < digits.length(); i++) {
            if (i > 0) {
                builder.append('.');
            }
            builder.append(digits.charAt(i));
        }
        return new ActiveMQQueue("Test." + builder.toString());
    }

    /**
     * Builds a non-persistent broker without JMX that listens on an ephemeral TCP port with a
     * 1024 byte socket buffer and has the message expiry period set to zero in its default
     * destination policy. The broker is not started.
     *
     * @param name broker name
     * @return the configured broker
     * @throws Exception if the transport connector cannot be added
     */
    private BrokerService newBroker(String name) throws Exception {
        BrokerService brokerService = new BrokerService();
        brokerService.setPersistent(false);
        brokerService.setUseJmx(false);
        brokerService.setBrokerName(name);
        brokerService.addConnector("tcp://" + hostName + ":0?transport.socketBufferSize=1024");

        PolicyMap map = new PolicyMap();
        PolicyEntry defaultEntry = new PolicyEntry();
        defaultEntry.setExpireMessagesPeriod(0);
        map.setDefaultEntry(defaultEntry);
        brokerService.setDestinationPolicy(map);
        return brokerService;
    }

    /**
     * Adds a duplex network connector to {@code localBroker} that dials
     * {@link #networkConnectorUrlString}. {@code remoteBroker} only contributes to the
     * connector name.
     *
     * @param localBroker  broker that owns the connector
     * @param remoteBroker broker whose name is used in the connector name
     * @return the connector that was added to {@code localBroker}
     * @throws Exception if the discovery URI is invalid or the connector cannot be added
     */
    protected NetworkConnector bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker) throws Exception {
        String uri = "static:(failover:(" + networkConnectorUrlString + "?socketBufferSize=1024&trace=false)?maxReconnectAttempts=0)";
        NetworkConnector connector = new DiscoveryNetworkConnector(new URI(uri));
        connector.setName(localBroker.getBrokerName() + "-to-" + remoteBroker.getBrokerName());
        connector.setDuplex(true);
        localBroker.addNetworkConnector(connector);
        return connector;
    }
}