blob: c5435be8e457fbdb2a58b865190bfe77694aa4fd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.qpid.jms.support;
import java.net.URI;
import java.util.Map;
import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AmqpTestSupport extends QpidJmsTestSupport {
protected static final Logger LOG = LoggerFactory.getLogger(AmqpTestSupport.class);
private static final int PORT = Integer.getInteger("activemq.test.amqp.port", 0);
protected boolean isAmqpDiscovery() {
return false;
}
protected String getDiscoveryNetworkInterface() {
return null;
}
protected String getAmqpTransformer() {
return "jms";
}
protected int getSocketBufferSize() {
return 64 * 1024;
}
protected int getIOBufferSize() {
return 8 * 1024;
}
protected boolean isAddOpenWireConnector() {
return false;
}
protected boolean isFrameTracingEnabled() {
return false;
}
@Override
public URI getBrokerActiveMQClientConnectionURI() {
if (isAddOpenWireConnector()) {
try {
return brokerService.getTransportConnectorByName("openwire").getPublishableConnectURI();
} catch (Exception e) {
throw new RuntimeException();
}
} else {
return super.getBrokerActiveMQClientConnectionURI();
}
}
@Override
protected void addAdditionalConnectors(BrokerService brokerService, Map<String, Integer> portMap) throws Exception {
int port = PORT;
if (portMap.containsKey("amqp")) {
port = portMap.get("amqp");
}
TransportConnector connector = brokerService.addConnector(
"amqp://127.0.0.1:" + port +
"?transport.transformer=" + getAmqpTransformer() +
"&transport.socketBufferSize=" + getSocketBufferSize() +
"&transport.tcpNoDelay=true" +
"&ioBufferSize=" + getIOBufferSize());
connector.setName("amqp");
if (isAmqpDiscovery()) {
String uriString = "multicast://default";
if(getDiscoveryNetworkInterface() != null) {
uriString += "?networkInterface=" + getDiscoveryNetworkInterface();
}
connector.setDiscoveryUri(new URI(uriString));
}
port = connector.getPublishableConnectURI().getPort();
LOG.debug("Using amqp port: {}", port);
if (isAddOpenWireConnector()) {
if (portMap.containsKey("openwire")) {
port = portMap.get("openwire");
} else {
port = 0;
}
connector = brokerService.addConnector("tcp://0.0.0.0:" + port);
connector.setName("openwire");
LOG.debug("Using openwire port: {}", port);
}
}
public String getAmqpConnectionURIOptions() {
return "";
}
public URI getBrokerAmqpConnectionURI() {
try {
String uri = "amqp://127.0.0.1:" +
brokerService.getTransportConnectorByName("amqp").getPublishableConnectURI().getPort();
if (!getAmqpConnectionURIOptions().isEmpty()) {
uri = uri + "?amqp.traceFrames=" + isFrameTracingEnabled() + "&" + getAmqpConnectionURIOptions();
} else {
uri = uri + "?amqp.traceFrames=" + isFrameTracingEnabled();
}
return new URI(uri);
} catch (Exception e) {
throw new RuntimeException();
}
}
public String getAmqpFailoverURI() throws Exception {
StringBuilder uri = new StringBuilder();
uri.append("failover://(");
uri.append(brokerService.getTransportConnectorByName("amqp").getPublishableConnectString());
for (BrokerService broker : brokers) {
uri.append(",");
uri.append(broker.getTransportConnectorByName("amqp").getPublishableConnectString());
}
uri.append(")");
return uri.toString();
}
public Connection createAmqpConnection() throws Exception {
return createAmqpConnection(getBrokerAmqpConnectionURI());
}
public Connection createAmqpConnection(String username, String password) throws Exception {
return createAmqpConnection(getBrokerAmqpConnectionURI(), username, password);
}
public Connection createAmqpConnection(URI brokerURI) throws Exception {
return createAmqpConnection(brokerURI, null, null);
}
public Connection createAmqpConnection(URI brokerURI, String username, String password) throws Exception {
ConnectionFactory factory = createAmqpConnectionFactory(brokerURI, username, password);
return factory.createConnection();
}
public ConnectionFactory createAmqpConnectionFactory() throws Exception {
return createAmqpConnectionFactory(getBrokerAmqpConnectionURI(), null, null);
}
public ConnectionFactory createAmqpConnectionFactory(URI brokerURI) throws Exception {
return createAmqpConnectionFactory(brokerURI, null, null);
}
public ConnectionFactory createAmqpConnectionFactory(String username, String password) throws Exception {
return createAmqpConnectionFactory(getBrokerAmqpConnectionURI(), username, password);
}
public ConnectionFactory createAmqpConnectionFactory(URI brokerURI, String username, String password) throws Exception {
JmsConnectionFactory factory = new JmsConnectionFactory(brokerURI);
factory.setForceAsyncSend(isForceAsyncSends());
factory.setForceSyncSend(isForceSyncSends());
factory.setForceAsyncAcks(isForceAsyncAcks());
if (username != null) {
factory.setUsername(username);
}
if (password != null) {
factory.setPassword(password);
}
return factory;
}
public abstract class Testable implements Runnable {
protected String failure;
@Override
public String toString() {
return failure;
}
public synchronized boolean passed() {
return failure == null;
}
}
}