blob: 52856834474b0d9b0c1da85a0b723536af1d8907 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usecases;
import java.net.URI;
import java.util.Arrays;
import java.util.Enumeration;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.QueueBrowser;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.region.QueueSubscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.MessageIdList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.ClassPathResource;
public class BrowseOverNetworkTest extends JmsMultipleBrokersTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(QueueSubscription.class);
protected static final int MESSAGE_COUNT = 10;
public void testBrowse() throws Exception {
createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB?persistent=false&useJmx=false"));
createBroker(new URI("broker:(tcp://localhost:61616)/BrokerA?persistent=false&useJmx=false"));
bridgeBrokers("BrokerA", "BrokerB");
startAllBrokers();
Destination dest = createDestination("TEST.FOO", false);
sendMessages("BrokerA", dest, MESSAGE_COUNT);
Thread.sleep(1000);
int browsed = browseMessages("BrokerB", dest);
Thread.sleep(1000);
MessageConsumer clientA = createConsumer("BrokerA", dest);
MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
msgsA.waitForMessagesToArrive(MESSAGE_COUNT);
Thread.sleep(1000);
MessageConsumer clientB = createConsumer("BrokerB", dest);
MessageIdList msgsB = getConsumerMessages("BrokerB", clientB);
msgsB.waitForMessagesToArrive(MESSAGE_COUNT);
LOG.info("A+B: " + msgsA.getMessageCount() + "+"
+ msgsB.getMessageCount());
assertEquals("Consumer on Broker A, should've consumed all messages", MESSAGE_COUNT, msgsA.getMessageCount());
assertEquals("Broker B shouldn't get any messages", 0, browsed);
}
public void testConsumerInfo() throws Exception {
createBroker(new ClassPathResource("org/apache/activemq/usecases/browse-broker1.xml"));
createBroker(new ClassPathResource("org/apache/activemq/usecases/browse-broker2.xml"));
startAllBrokers();
brokers.get("broker1").broker.waitUntilStarted();
Destination dest = createDestination("QUEUE.A,QUEUE.B", false);
int broker1 = browseMessages("broker1", dest);
assertEquals("Browsed a message on an empty queue", 0, broker1);
Thread.sleep(1000);
int broker2 = browseMessages("broker2", dest);
assertEquals("Browsed a message on an empty queue", 0, broker2);
}
public class Browser extends Thread {
String broker;
Destination dest;
int totalCount;
QueueBrowser browser = null;
MessageConsumer consumer = null;
boolean consume = false;
public Browser(String broker, Destination dest) {
this.broker = broker;
this.dest = dest;
}
public void run() {
int retries = 0;
while (retries++ < 5) {
try {
QueueBrowser browser = createBrowser(broker, dest);
int count = browseMessages(browser, broker);
if (consume) {
if (count != 0) {
MessageConsumer consumer = createSyncConsumer(broker, dest);
totalCount += count;
for (int i = 0; i < count; i++) {
ActiveMQTextMessage message = (ActiveMQTextMessage)consumer.receive(1000);
LOG.info(broker + " consumer: " + message.getText() + " " + message.getDestination() + " " + message.getMessageId() + " " + Arrays.toString(message.getBrokerPath()));
if (message == null) break;
}
}
} else {
totalCount = count;
}
LOG.info("browser '" + broker + "' browsed " + totalCount);
Thread.sleep(1000);
} catch (Exception e) {
LOG.info("Exception browsing " + e, e);
} finally {
try {
if (browser != null) {
browser.close();
}
if (consumer != null) {
consumer.close();
}
} catch (Exception e) {
LOG.info("Exception closing browser " + e, e);
}
}
}
}
public int getTotalCount() {
return totalCount;
}
}
protected NetworkConnector bridgeBrokersWithIncludedDestination(String localBrokerName, String remoteBrokerName, ActiveMQDestination included, ActiveMQDestination excluded) throws Exception {
NetworkConnector nc = bridgeBrokers(localBrokerName, remoteBrokerName, false, 4, true);
nc.addStaticallyIncludedDestination(included);
if (excluded != null) {
nc.addExcludedDestination(excluded);
}
nc.setPrefetchSize(1);
return nc;
}
public void testAMQ3020() throws Exception {
createBroker(new ClassPathResource("org/apache/activemq/usecases/browse-broker1A.xml"));
createBroker(new ClassPathResource("org/apache/activemq/usecases/browse-broker1B.xml"));
createBroker(new ClassPathResource("org/apache/activemq/usecases/browse-broker2A.xml"));
createBroker(new ClassPathResource("org/apache/activemq/usecases/browse-broker2B.xml"));
createBroker(new ClassPathResource("org/apache/activemq/usecases/browse-broker3A.xml"));
createBroker(new ClassPathResource("org/apache/activemq/usecases/browse-broker3B.xml"));
brokers.get("broker-1A").broker.waitUntilStarted();
brokers.get("broker-2A").broker.waitUntilStarted();
brokers.get("broker-3A").broker.waitUntilStarted();
Destination composite = createDestination("PROD.FUSESOURCE.3.A,PROD.FUSESOURCE.3.B", false);
Browser browser1 = new Browser("broker-3A", composite);
browser1.start();
Thread.sleep(1000);
Browser browser2 = new Browser("broker-3B", composite);
browser2.start();
Thread.sleep(1000);
sendMessages("broker-1A", composite, MESSAGE_COUNT);
browser1.join();
browser2.join();
LOG.info("broker-3A browsed " + browser1.getTotalCount());
LOG.info("broker-3B browsed " + browser2.getTotalCount());
assertEquals(MESSAGE_COUNT * 2, browser1.getTotalCount() + browser2.getTotalCount() );
}
protected int browseMessages(QueueBrowser browser, String name) throws Exception {
Enumeration msgs = browser.getEnumeration();
int browsedMessage = 0;
while (msgs.hasMoreElements()) {
browsedMessage++;
ActiveMQTextMessage message = (ActiveMQTextMessage)msgs.nextElement();
LOG.info(name + " browsed: " + message.getText() + " " + message.getDestination() + " " + message.getMessageId() + " " + Arrays.toString(message.getBrokerPath()));
}
return browsedMessage;
}
protected int browseMessages(String broker, Destination dest) throws Exception {
QueueBrowser browser = createBrowser(broker, dest);
int browsedMessage = browseMessages(browser, "browser");
browser.close();
return browsedMessage;
}
public void setUp() throws Exception {
super.setAutoFail(true);
super.setUp();
}
}