blob: ae397a78003e77de0fccd64569b8a84b17dc21d7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.test.utils;
import java.io.UnsupportedEncodingException;
import java.net.URISyntaxException;
import java.net.URLEncoder;
import java.util.Collections;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.Topic;
import javax.naming.InitialContext;
import javax.naming.NamingException;
public class QpidJmsClientProvider implements JmsProvider
{
private static final String CLIENTID = "clientid";
private final AmqpManagementFacade _managementFacade;
private final Hashtable<Object, Object> _initialContextEnvironment = new Hashtable<>();
public QpidJmsClientProvider(AmqpManagementFacade managementFacade)
{
_managementFacade = managementFacade;
}
@Override
public ConnectionFactory getConnectionFactory() throws NamingException
{
return getConnectionFactory(Collections.<String, String>emptyMap());
}
@Override
public ConnectionFactory getConnectionFactory(String factoryName) throws NamingException
{
return getConnectionFactory(factoryName, Collections.<String, String>emptyMap());
}
@Override
public ConnectionFactory getConnectionFactory(Map<String, String> options) throws NamingException
{
if (Boolean.getBoolean(QpidBrokerTestCase.PROFILE_USE_SSL))
{
return getConnectionFactory("default.ssl", options);
}
else
{
return getConnectionFactory("default", options);
}
}
@Override
public ConnectionFactory getConnectionFactory(String factoryName, String vhost, String clientId) throws NamingException
{
return getConnectionFactory(factoryName, vhost, clientId, Collections.<String, String>emptyMap());
}
@Override
public ConnectionFactory getConnectionFactory(String factoryName,
String vhost,
String clientId,
Map<String, String> options)
throws NamingException
{
Map<String, String> actualOptions = new LinkedHashMap<>();
actualOptions.put("amqp.vhost", vhost);
actualOptions.put("jms.clientID", clientId);
actualOptions.putAll(options);
return getConnectionFactory(factoryName, actualOptions);
}
private ConnectionFactory getConnectionFactory(final String factoryName, Map<String, String> options)
throws NamingException
{
if (!options.containsKey("amqp.vhost"))
{
options = new HashMap<>(options);
options.put("amqp.vhost", "test");
}
if (!options.containsKey("jms.clientID"))
{
options = new HashMap<>(options);
options.put("jms.clientID", CLIENTID);
}
else if (options.get("jms.clientID") == null)
{
options.remove("jms.clientID");
}
if (!options.containsKey("amqp.forceSyncSend"))
{
options = new HashMap<>(options);
options.put("jms.forceSyncSend", "true");
}
if (!options.containsKey("amqp.populateJMSXUserID"))
{
options = new HashMap<>(options);
options.put("jms.populateJMSXUserID", "true");
}
if ("failover".equals(factoryName))
{
if (!options.containsKey("failover.maxReconnectAttempts"))
{
options.put("failover.maxReconnectAttempts", "2");
}
final StringBuilder stem = new StringBuilder("failover:(amqp://localhost:")
.append(System.getProperty("test.port"))
.append(",amqp://localhost:")
.append(System.getProperty("test.port.alt"))
.append(")");
appendOptions(options, stem);
_initialContextEnvironment.put("property.connectionfactory.failover.remoteURI",
stem.toString());
}
else if ("default".equals(factoryName))
{
final StringBuilder stem =
new StringBuilder("amqp://localhost:").append(System.getProperty("test.port"));
appendOptions(options, stem);
_initialContextEnvironment.put("property.connectionfactory.default.remoteURI", stem.toString());
}
else if ("default.ssl".equals(factoryName))
{
final StringBuilder stem = new StringBuilder("amqps://localhost:").append(String.valueOf(System.getProperty("test.port.ssl")));
appendOptions(options, stem);
_initialContextEnvironment.put("connectionfactory.default.ssl", stem.toString());
}
return (ConnectionFactory) new InitialContext(_initialContextEnvironment).lookup(factoryName);
}
@Override
public Connection getConnection() throws JMSException, NamingException
{
return getConnection(QpidBrokerTestCase.GUEST_USERNAME, QpidBrokerTestCase.GUEST_PASSWORD);
}
@Override
public Connection getConnection(String username, String password) throws JMSException, NamingException
{
Connection con = getConnectionFactory().createConnection(username, password);
return con;
}
@Override
public Connection getConnectionWithPrefetch(int prefetch) throws Exception
{
String factoryName = Boolean.getBoolean(QpidBrokerTestCase.PROFILE_USE_SSL) ? "default.ssl" : "default";
final Map<String, String> options =
Collections.singletonMap("jms.prefetchPolicy.all", String.valueOf(prefetch));
final ConnectionFactory connectionFactory = getConnectionFactory(factoryName, "test", "clientid", options);
return connectionFactory.createConnection(QpidBrokerTestCase.GUEST_USERNAME,
QpidBrokerTestCase.GUEST_PASSWORD);
}
@Override
public Connection getConnectionWithOptions(Map<String, String> options) throws Exception
{
return getConnectionWithOptions("test", options);
}
@Override
public Connection getConnectionWithOptions(String vhost, Map<String, String> options) throws Exception
{
return getConnectionFactory(Boolean.getBoolean(QpidBrokerTestCase.PROFILE_USE_SSL)
? "default.ssl"
: "default",
vhost,
"clientId",
options).createConnection(QpidBrokerTestCase.GUEST_USERNAME,
QpidBrokerTestCase.GUEST_PASSWORD);
}
@Override
public Connection getConnectionForVHost(String vhost)
throws Exception
{
return getConnectionForVHost(vhost, QpidBrokerTestCase.GUEST_USERNAME, QpidBrokerTestCase.GUEST_PASSWORD);
}
@Override
public Connection getConnectionForVHost(String vhost, String username, String password)
throws Exception
{
return getConnectionFactory(Boolean.getBoolean(QpidBrokerTestCase.PROFILE_USE_SSL)
? "default.ssl"
: "default", vhost, "clientId").createConnection(username, password);
}
@Override
public Connection getConnection(String urlString) throws Exception
{
throw new UnsupportedOperationException();
}
@Override
public Queue getTestQueue(final String testQueueName)
{
Connection con = null;
try
{
con = getConnection();
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
return session.createQueue(testQueueName);
}
catch (JMSException | NamingException e)
{
throw new RuntimeException("Failed to create a test queue name : " + testQueueName, e);
}
finally
{
if (con != null)
{
try
{
con.close();
}
catch (JMSException e)
{
}
}
}
}
@Override
public Queue getQueueFromName(Session session, String name) throws JMSException
{
return session.createQueue(name);
}
@Override
public Queue createTestQueue(Session session, String queueName) throws JMSException
{
_managementFacade.createEntityUsingAmqpManagement(queueName, session, "org.apache.qpid.Queue");
return session.createQueue(queueName);
}
@Override
public Topic getTestTopic(final String testTopicName)
{
Connection con = null;
try
{
con = getConnection();
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
return session.createTopic(testTopicName);
}
catch (JMSException | NamingException e)
{
throw new RuntimeException("Failed to create a test topic name : " + testTopicName, e);
}
finally
{
if (con != null)
{
try
{
con.close();
}
catch (JMSException e)
{
}
}
}
}
@Override
public Topic createTopic(final Connection con, final String topicName) throws JMSException
{
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
_managementFacade.createEntityUsingAmqpManagement(topicName, session, "org.apache.qpid.TopicExchange");
return session.createTopic(topicName);
}
@Override
public Topic createTopicOnDirect(final Connection con, String topicName) throws JMSException, URISyntaxException
{
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
return session.createTopic("amq.direct/" + topicName);
}
@Override
public Topic createTopicOnFanout(final Connection con, String topicName) throws JMSException, URISyntaxException
{
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
return session.createTopic("amq.fanout/" + topicName);
}
@Override
public long getQueueDepth(final Connection con, final Queue destination) throws Exception
{
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
try
{
MessageProducer producer = session.createProducer(session.createQueue("$management"));
final TemporaryQueue responseQ = session.createTemporaryQueue();
MessageConsumer consumer = session.createConsumer(responseQ);
MapMessage message = session.createMapMessage();
message.setStringProperty("index", "object-path");
final String escapedName = destination.getQueueName().replaceAll("([/\\\\])", "\\\\$1");
message.setStringProperty("key", escapedName);
message.setStringProperty("type", "org.apache.qpid.Queue");
message.setStringProperty("operation", "getStatistics");
message.setStringProperty("statistics", "[\"queueDepthMessages\"]");
message.setJMSReplyTo(responseQ);
producer.send(message);
Message response = consumer.receive();
try
{
if (response instanceof MapMessage)
{
return ((MapMessage) response).getLong("queueDepthMessages");
}
else if (response instanceof ObjectMessage)
{
Object body = ((ObjectMessage) response).getObject();
if (body instanceof Map)
{
return Long.valueOf(((Map) body).get("queueDepthMessages").toString());
}
else
{
throw new IllegalArgumentException("Cannot parse the results from a management operation."
+ " Unexpected message object type : " + body);
}
}
else
{
throw new IllegalArgumentException("Cannot parse the results from a management operation."
+ " Unexpected response message type : " + response.getClass());
}
}
finally
{
consumer.close();
responseQ.delete();
}
}
finally
{
session.close();
}
}
@Override
public boolean isQueueExist(final Connection con, final Queue destination) throws Exception
{
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
try
{
MessageProducer producer = session.createProducer(session.createQueue("$management"));
final TemporaryQueue responseQ = session.createTemporaryQueue();
MessageConsumer consumer = session.createConsumer(responseQ);
MapMessage message = session.createMapMessage();
message.setStringProperty("index", "object-path");
final String escapedName = destination.getQueueName().replaceAll("([/\\\\])", "\\\\$1");
message.setStringProperty("key", escapedName);
message.setStringProperty("type", "org.apache.qpid.Queue");
message.setStringProperty("operation", "READ");
message.setJMSReplyTo(responseQ);
producer.send(message);
Message response = consumer.receive();
try
{
int statusCode = response.getIntProperty("statusCode");
switch(statusCode)
{
case 200:
return true;
case 404:
return false;
default:
throw new RuntimeException(String.format("Unexpected response for queue query '%s' : %d", destination.getQueueName(), statusCode));
}
}
finally
{
consumer.close();
responseQ.delete();
}
}
finally
{
session.close();
}
}
@Override
public Connection getConnectionWithSyncPublishing() throws Exception
{
return getConnection();
}
@Override
public Connection getClientConnection(String username, String password, String id)
throws Exception
{
return getConnectionFactory("default", "test", id).createConnection(username, password);
}
@Override
public String getBrokerDetailsFromDefaultConnectionUrl()
{
throw new UnsupportedOperationException();
}
private void appendOptions(final Map<String, String> actualOptions, final StringBuilder stem)
{
boolean first = true;
for(Map.Entry<String, String> option : actualOptions.entrySet())
{
if(first)
{
stem.append('?');
first = false;
}
else
{
stem.append('&');
}
try
{
stem.append(option.getKey()).append('=').append(URLEncoder.encode(option.getValue(), "UTF-8"));
}
catch (UnsupportedEncodingException e)
{
throw new RuntimeException(e);
}
}
}
}