blob: 62fcb94de0bfa760a99c647ad8f473ad911d4af1 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.network;
import java.net.URI;
import java.util.HashMap;
import java.util.Vector;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static org.junit.Assert.assertTrue;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.net.ssl.KeyManager;
import javax.net.ssl.TrustManager;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.SslContext;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.transport.tcp.SslBrokerServiceTest;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class FailoverStaticNetworkTest {
protected static final Logger LOG = LoggerFactory.getLogger(FailoverStaticNetworkTest.class);
private final static String DESTINATION_NAME = "testQ";
protected BrokerService brokerA;
protected BrokerService brokerA1;
protected BrokerService brokerB;
protected BrokerService brokerC;
private SslContext sslContext;
protected BrokerService createBroker(String scheme, String listenPort, String[] networkToPorts) throws Exception {
return createBroker(scheme, listenPort, networkToPorts, null);
}
protected BrokerService createBroker(String scheme, String listenPort, String[] networkToPorts,
HashMap<String, String> networkProps) throws Exception {
BrokerService broker = new BrokerService();
broker.setUseJmx(false);
broker.getManagementContext().setCreateConnector(false);
broker.setSslContext(sslContext);
broker.setDeleteAllMessagesOnStartup(true);
broker.setBrokerName("Broker_" + listenPort);
broker.addConnector(scheme + "://localhost:" + listenPort);
if (networkToPorts != null && networkToPorts.length > 0) {
StringBuilder builder = new StringBuilder("static:(failover:(" + scheme + "://localhost:");
builder.append(networkToPorts[0]);
for (int i=1;i<networkToPorts.length; i++) {
builder.append("," + scheme + "://localhost:" + networkToPorts[i]);
}
// limit the reconnects in case of initial random connection to slave
// leaving randomize on verifies that this config is picked up
builder.append(")?maxReconnectAttempts=1)");
NetworkConnector nc = broker.addNetworkConnector(builder.toString());
if (networkProps != null) {
IntrospectionSupport.setProperties(nc, networkProps);
}
}
return broker;
}
private BrokerService createBroker(String listenPort, String dataDir) throws Exception {
BrokerService broker = new BrokerService();
broker.setUseJmx(false);
broker.getManagementContext().setCreateConnector(false);
broker.setBrokerName("Broker_Shared");
// lazy create transport connector on start completion
TransportConnector connector = new TransportConnector();
connector.setUri(new URI("tcp://localhost:" + listenPort));
broker.addConnector(connector);
broker.setDataDirectory(dataDir);
return broker;
}
@Before
public void setUp() throws Exception {
KeyManager[] km = SslBrokerServiceTest.getKeyManager();
TrustManager[] tm = SslBrokerServiceTest.getTrustManager();
sslContext = new SslContext(km, tm, null);
}
@After
public void tearDown() throws Exception {
brokerB.stop();
brokerB.waitUntilStopped();
brokerA.stop();
brokerA.waitUntilStopped();
if (brokerA1 != null) {
brokerA1.stop();
brokerA1.waitUntilStopped();
}
if (brokerC != null) {
brokerC.stop();
brokerC.waitUntilStopped();
}
}
@Test
public void testSendReceiveAfterReconnect() throws Exception {
brokerA = createBroker("tcp", "61617", null);
brokerA.start();
brokerB = createBroker("tcp", "62617", new String[]{"61617"});
brokerB.start();
doTestNetworkSendReceive();
LOG.info("stopping brokerA");
brokerA.stop();
brokerA.waitUntilStopped();
LOG.info("restarting brokerA");
brokerA = createBroker("tcp", "61617", null);
brokerA.start();
doTestNetworkSendReceive();
}
@Test
public void testSendReceiveFailover() throws Exception {
brokerA = createBroker("tcp", "61617", null);
brokerA.start();
brokerB = createBroker("tcp", "62617", new String[]{"61617", "63617"});
brokerB.start();
doTestNetworkSendReceive();
LOG.info("stopping brokerA");
brokerA.stop();
brokerA.waitUntilStopped();
LOG.info("restarting brokerA");
brokerA = createBroker("tcp", "63617", null);
brokerA.start();
doTestNetworkSendReceive();
}
@Test
public void testSendReceiveFailoverDuplex() throws Exception {
final Vector<Throwable> errors = new Vector<Throwable>();
final String dataDir = "target/data/shared";
brokerA = createBroker("61617", dataDir);
brokerA.start();
final BrokerService slave = createBroker("63617", dataDir);
brokerA1 = slave;
ExecutorService executor = Executors.newCachedThreadPool();
executor.execute(new Runnable() {
public void run() {
try {
slave.start();
} catch (Exception e) {
e.printStackTrace();
errors.add(e);
}
}
});
executor.shutdown();
HashMap<String, String> networkConnectorProps = new HashMap<String, String>();
networkConnectorProps.put("duplex", "true");
brokerB = createBroker("tcp", "62617", new String[]{"61617", "63617"}, networkConnectorProps);
brokerB.start();
doTestNetworkSendReceive(brokerA, brokerB);
doTestNetworkSendReceive(brokerB, brokerA);
LOG.info("stopping brokerA (master shared_broker)");
brokerA.stop();
brokerA.waitUntilStopped();
// wait for slave to start
brokerA1.waitUntilStarted();
doTestNetworkSendReceive(brokerA1, brokerB);
doTestNetworkSendReceive(brokerB, brokerA1);
assertTrue("No unexpected exceptions " + errors, errors.isEmpty());
}
@Test
// master slave piggy in the middle setup
public void testSendReceiveFailoverDuplexWithPIM() throws Exception {
final String dataDir = "target/data/shared/pim";
brokerA = createBroker("61617", dataDir);
brokerA.start();
final BrokerService slave = createBroker("63617", dataDir);
brokerA1 = slave;
ExecutorService executor = Executors.newCachedThreadPool();
executor.execute(new Runnable() {
public void run() {
try {
slave.start();
} catch (Exception e) {
e.printStackTrace();
}
}
});
executor.shutdown();
HashMap<String, String> networkConnectorProps = new HashMap<String, String>();
networkConnectorProps.put("duplex", "true");
networkConnectorProps.put("networkTTL", "2");
brokerB = createBroker("tcp", "62617", new String[]{"61617", "63617"}, networkConnectorProps);
brokerB.start();
assertTrue("all props applied", networkConnectorProps.isEmpty());
networkConnectorProps.put("duplex", "true");
networkConnectorProps.put("networkTTL", "2");
brokerC = createBroker("tcp", "64617", new String[]{"61617", "63617"}, networkConnectorProps);
brokerC.start();
assertTrue("all props applied a second time", networkConnectorProps.isEmpty());
//Thread.sleep(4000);
doTestNetworkSendReceive(brokerC, brokerB);
doTestNetworkSendReceive(brokerB, brokerC);
LOG.info("stopping brokerA (master shared_broker)");
brokerA.stop();
brokerA.waitUntilStopped();
doTestNetworkSendReceive(brokerC, brokerB);
doTestNetworkSendReceive(brokerB, brokerC);
brokerC.stop();
brokerC.waitUntilStopped();
}
/**
* networked broker started after target so first connect attempt succeeds
* start order is important
*/
@Test
public void testSendReceive() throws Exception {
brokerA = createBroker("tcp", "61617", null);
brokerA.start();
brokerB = createBroker("tcp", "62617", new String[]{"61617","1111"});
brokerB.start();
doTestNetworkSendReceive();
}
@Test
public void testSendReceiveSsl() throws Exception {
brokerA = createBroker("ssl", "61617", null);
brokerA.start();
brokerB = createBroker("ssl", "62617", new String[]{"61617", "1111"});
brokerB.start();
doTestNetworkSendReceive();
}
private void doTestNetworkSendReceive() throws Exception, JMSException {
doTestNetworkSendReceive(brokerB, brokerA);
}
private void doTestNetworkSendReceive(BrokerService to, BrokerService from) throws Exception, JMSException {
LOG.info("Creating Consumer on the networked broker ..." + from);
SslContext.setCurrentSslContext(sslContext);
// Create a consumer on brokerA
ConnectionFactory consFactory = createConnectionFactory(from);
Connection consConn = consFactory.createConnection();
consConn.start();
Session consSession = consConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQDestination destination = (ActiveMQDestination) consSession.createQueue(DESTINATION_NAME);
final MessageConsumer consumer = consSession.createConsumer(destination);
LOG.info("publishing to " + to);
sendMessageTo(destination, to);
boolean gotMessage = Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
return consumer.receive(1000) != null;
}
});
try {
consConn.close();
} catch (JMSException ignored) {
}
assertTrue("consumer on A got message", gotMessage);
}
private void sendMessageTo(ActiveMQDestination destination, BrokerService brokerService) throws Exception {
ConnectionFactory factory = createConnectionFactory(brokerService);
Connection conn = factory.createConnection();
conn.start();
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createProducer(destination).send(session.createTextMessage("Hi"));
conn.close();
}
protected ConnectionFactory createConnectionFactory(final BrokerService broker) throws Exception {
String url = ((TransportConnector) broker.getTransportConnectors().get(0)).getServer().getConnectURI().toString();
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
connectionFactory.setOptimizedMessageDispatch(true);
connectionFactory.setDispatchAsync(false);
connectionFactory.setUseAsyncSend(false);
connectionFactory.setOptimizeAcknowledge(false);
connectionFactory.setAlwaysSyncSend(true);
return connectionFactory;
}
}