blob: af4bbe6595ed60984f3dc7af9febf5dd7f72c7b9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.test.utils;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Map;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.Topic;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import org.apache.qpid.client.AMQConnectionFactory;
import org.apache.qpid.client.AMQConnectionURL;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQTopic;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.jms.ConnectionURL;
/**
 * {@link JmsProvider} implementation built on the Qpid client classes from
 * {@code org.apache.qpid.client}.
 * <p>
 * Connection factories are resolved by name through JNDI, while connections, queues and topics
 * are created directly with the client's {@code AMQ*} classes.
 */
public class QpidJmsClient0xProvider implements JmsProvider
{
    private static final String DEFAULT_INITIAL_CONTEXT = "org.apache.qpid.jndi.PropertiesFileInitialContextFactory";

    static
    {
        // Only install the default initial context factory when none has been configured already.
        String initialContext = System.getProperty(Context.INITIAL_CONTEXT_FACTORY);
        if (initialContext == null || initialContext.length() == 0)
        {
            System.setProperty(Context.INITIAL_CONTEXT_FACTORY, DEFAULT_INITIAL_CONTEXT);
        }
    }

    /** Environment handed to every {@link InitialContext} created by this provider (left empty here). */
    private final Hashtable<Object, Object> _initialContextEnvironment = new Hashtable<>();

    /** Stored at construction time; not read by any method of this class. */
    private final AmqpManagementFacade _managementFacade;

    /**
     * Creates the provider.
     *
     * @param managementFacade management facade kept by this provider
     */
    public QpidJmsClient0xProvider(AmqpManagementFacade managementFacade)
    {
        _managementFacade = managementFacade;
    }

    /**
     * Looks up the default connection factory: {@code "default.ssl"} when the boolean system
     * property named by {@code QpidBrokerTestCase.PROFILE_USE_SSL} is set, {@code "default"} otherwise.
     *
     * @return the connection factory bound under the selected name
     * @throws NamingException if the JNDI lookup fails
     */
    @Override
    public ConnectionFactory getConnectionFactory() throws NamingException
    {
        if (Boolean.getBoolean(QpidBrokerTestCase.PROFILE_USE_SSL))
        {
            return getConnectionFactory("default.ssl");
        }
        else
        {
            return getConnectionFactory("default");
        }
    }

    /**
     * Not supported by this provider.
     *
     * @param options ignored
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    @Override
    public ConnectionFactory getConnectionFactory(final Map<String, String> options) throws NamingException
    {
        throw new UnsupportedOperationException();
    }

    /**
     * Looks up a connection factory by name, delegating with virtual host {@code "test"} and
     * client id {@code "clientid"}.
     *
     * @param factoryName JNDI name of the factory
     * @return the connection factory bound under {@code factoryName}
     * @throws NamingException if the JNDI lookup fails
     */
    @Override
    public ConnectionFactory getConnectionFactory(String factoryName)
            throws NamingException
    {
        return getConnectionFactory(factoryName, "test", "clientid");
    }

    /**
     * Looks up a connection factory by name, delegating with an empty option map.
     *
     * @param factoryName JNDI name of the factory
     * @param vhost       passed through to the four-argument overload
     * @param clientId    passed through to the four-argument overload
     * @return the connection factory bound under {@code factoryName}
     * @throws NamingException if the JNDI lookup fails
     */
    @Override
    public ConnectionFactory getConnectionFactory(String factoryName, String vhost, String clientId)
            throws NamingException
    {
        return getConnectionFactory(factoryName, vhost, clientId, Collections.<String, String>emptyMap());
    }

    /**
     * Looks up a connection factory by name in a freshly created {@link InitialContext}.
     * <p>
     * Only {@code factoryName} influences the result; {@code vhost}, {@code clientId} and
     * {@code options} are not used by this implementation.
     *
     * @param factoryName JNDI name of the factory
     * @param vhost       unused
     * @param clientId    unused
     * @param options     unused
     * @return the object bound under {@code factoryName}, cast to {@link ConnectionFactory}
     * @throws NamingException if the context cannot be created, the lookup fails, or closing the context fails
     */
    @Override
    public ConnectionFactory getConnectionFactory(String factoryName,
                                                  String vhost,
                                                  String clientId,
                                                  Map<String, String> options)
            throws NamingException
    {
        Context context = new InitialContext(_initialContextEnvironment);
        try
        {
            return (ConnectionFactory) context.lookup(factoryName);
        }
        finally
        {
            // A new context is created on every call, so release it once the lookup is done.
            context.close();
        }
    }

    /**
     * Opens a connection from the default factory using the guest credentials defined on
     * {@code QpidBrokerTestCase}.
     *
     * @return a new connection
     * @throws JMSException    if the connection cannot be created
     * @throws NamingException if the default factory lookup fails
     */
    @Override
    public Connection getConnection() throws JMSException, NamingException
    {
        return getConnection(QpidBrokerTestCase.GUEST_USERNAME, QpidBrokerTestCase.GUEST_PASSWORD);
    }

    /**
     * Opens a connection from the default factory with the given credentials.
     *
     * @param username user name passed to the factory
     * @param password password passed to the factory
     * @return a new connection
     * @throws JMSException    if the connection cannot be created
     * @throws NamingException if the default factory lookup fails
     */
    @Override
    public Connection getConnection(String username, String password) throws JMSException, NamingException
    {
        return getConnectionFactory().createConnection(username, password);
    }

    /**
     * Opens a connection from the default factory, which must be an {@code AMQConnectionFactory},
     * passing the given identifier along with the credentials.
     *
     * @param username user name passed to the factory
     * @param password password passed to the factory
     * @param id       third argument to {@code AMQConnectionFactory.createConnection}
     * @return a new connection
     * @throws Exception if the lookup or connection creation fails
     */
    @Override
    public Connection getClientConnection(String username, String password, String id)
            throws Exception
    {
        AMQConnectionFactory factory = (AMQConnectionFactory) getConnectionFactory();
        return factory.createConnection(username, password, id);
    }

    /**
     * Opens a connection with the {@code "maxprefetch"} URL option set to {@code prefetch}.
     *
     * @param prefetch value for the {@code "maxprefetch"} option
     * @return a new connection
     * @throws Exception if the connection cannot be created
     */
    @Override
    public Connection getConnectionWithPrefetch(int prefetch) throws Exception
    {
        return getConnectionWithOptions(Collections.singletonMap("maxprefetch", String.valueOf(prefetch)));
    }

    /**
     * Opens a connection to virtual host {@code "test"} with the given URL options.
     *
     * @param options connection URL options to apply
     * @return a new connection
     * @throws Exception if the connection cannot be created
     */
    @Override
    public Connection getConnectionWithOptions(Map<String, String> options) throws Exception
    {
        return getConnectionWithOptions("test", options);
    }

    /**
     * Opens a guest connection whose URL is the default factory's URL with {@code options}
     * applied and the virtual host replaced by {@code vhost}.
     *
     * @param vhost   virtual host set on the connection URL (used exactly as given)
     * @param options connection URL options to apply
     * @return a new connection
     * @throws Exception if the URL cannot be parsed or the connection cannot be created
     */
    @Override
    public Connection getConnectionWithOptions(String vhost, Map<String, String> options) throws Exception
    {
        AMQConnectionFactory defaultFactory = (AMQConnectionFactory) getConnectionFactory();
        ConnectionURL curl = new AMQConnectionURL(defaultFactory.getConnectionURLString());
        for (Map.Entry<String, String> entry : options.entrySet())
        {
            curl.setOption(entry.getKey(), entry.getValue());
        }

        // Re-parse the URL from its string form before applying credentials and virtual host.
        // NOTE(review): presumably this makes the parser pick up the options set above - confirm.
        curl = new AMQConnectionURL(curl.toString());
        curl.setUsername(QpidBrokerTestCase.GUEST_USERNAME);
        curl.setPassword(QpidBrokerTestCase.GUEST_PASSWORD);
        curl.setVirtualHost(vhost);

        return new AMQConnectionFactory(curl).createConnection(curl.getUsername(), curl.getPassword());
    }

    /**
     * Opens a guest connection to the given virtual host.
     *
     * @param vhost virtual host name, without a leading slash
     * @return a new connection
     * @throws Exception if the connection cannot be created
     */
    @Override
    public Connection getConnectionForVHost(String vhost)
            throws Exception
    {
        return getConnectionForVHost(vhost, QpidBrokerTestCase.GUEST_USERNAME, QpidBrokerTestCase.GUEST_PASSWORD);
    }

    /**
     * Opens a connection whose URL is the default factory's URL with the virtual host replaced
     * by {@code "/" + vhost} and the given credentials.
     *
     * @param vhost    virtual host name; a leading {@code "/"} is prepended here
     * @param username user name set on the URL
     * @param password password set on the URL
     * @return a new connection
     * @throws Exception if the URL cannot be parsed or the connection cannot be created
     */
    @Override
    public Connection getConnectionForVHost(String vhost, String username, String password)
            throws Exception
    {
        AMQConnectionFactory defaultFactory = (AMQConnectionFactory) getConnectionFactory();
        ConnectionURL curl = new AMQConnectionURL(defaultFactory.getConnectionURLString());
        curl.setVirtualHost("/" + vhost);

        // Re-parse the URL from its string form before applying the credentials.
        curl = new AMQConnectionURL(curl.toString());
        curl.setUsername(username);
        curl.setPassword(password);

        return new AMQConnectionFactory(curl).createConnection(curl.getUsername(), curl.getPassword());
    }

    /**
     * Opens a connection from a connection URL string, using the credentials contained in it.
     *
     * @param urlString connection URL understood by {@code AMQConnectionURL}
     * @return a new connection
     * @throws Exception if the URL cannot be parsed or the connection cannot be created
     */
    @Override
    public Connection getConnection(String urlString) throws Exception
    {
        ConnectionURL url = new AMQConnectionURL(urlString);
        return new AMQConnectionFactory(url).createConnection(url.getUsername(), url.getPassword());
    }

    /**
     * Opens a connection with the {@code ConnectionURL.OPTIONS_SYNC_PUBLISH} option set to {@code "all"}.
     *
     * @return a new connection
     * @throws Exception if the connection cannot be created
     */
    @Override
    public Connection getConnectionWithSyncPublishing() throws Exception
    {
        Map<String, String> options = new HashMap<>();
        options.put(ConnectionURL.OPTIONS_SYNC_PUBLISH, "all");
        return getConnectionWithOptions(options);
    }

    /**
     * Builds a queue destination on the direct exchange. Nothing is sent to the broker.
     *
     * @param testQueueName queue name
     * @return an {@code AMQQueue} for {@code ExchangeDefaults.DIRECT_EXCHANGE_NAME} and the given name
     */
    @Override
    public Queue getTestQueue(final String testQueueName)
    {
        return new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, testQueueName);
    }

    /**
     * Builds a queue destination with an empty exchange name.
     *
     * @param session unused
     * @param name    queue name
     * @return an {@code AMQQueue} constructed with {@code ""} and the given name
     * @throws JMSException declared by the interface; not thrown by the visible code
     */
    @Override
    public Queue getQueueFromName(Session session, String name) throws JMSException
    {
        return new AMQQueue("", name);
    }

    /**
     * Builds the test queue destination, then creates and immediately closes a consumer on it.
     *
     * @param session   session used to create the short-lived consumer
     * @param queueName queue name
     * @return the queue destination
     * @throws JMSException if creating or closing the consumer fails
     */
    @Override
    public Queue createTestQueue(Session session, String queueName) throws JMSException
    {
        Queue amqQueue = getTestQueue(queueName);
        // NOTE(review): the throwaway consumer presumably makes the client declare the queue - confirm.
        session.createConsumer(amqQueue).close();
        return amqQueue;
    }

    /**
     * Builds a topic destination on the topic exchange. Nothing is sent to the broker.
     *
     * @param testQueueName topic name
     * @return an {@code AMQTopic} for {@code ExchangeDefaults.TOPIC_EXCHANGE_NAME} and the given name
     */
    @Override
    public Topic getTestTopic(final String testQueueName)
    {
        return new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, testQueueName);
    }

    /**
     * Builds a topic destination; equivalent to {@link #getTestTopic(String)}.
     *
     * @param con       unused
     * @param topicName topic name
     * @return the topic destination
     * @throws JMSException declared by the interface; not thrown by the visible code
     */
    @Override
    public Topic createTopic(final Connection con, final String topicName) throws JMSException
    {
        return getTestTopic(topicName);
    }

    /**
     * Builds a topic from a {@code direct://amq.direct/...} address in which {@code topicName}
     * is used for both path segments and the routing key, marked exclusive and auto-delete.
     *
     * @param con       unused
     * @param topicName name inserted into the address
     * @return the topic destination
     * @throws JMSException       declared by the interface
     * @throws URISyntaxException if the resulting address cannot be parsed
     */
    @Override
    public Topic createTopicOnDirect(final Connection con, String topicName) throws JMSException, URISyntaxException
    {
        return new AMQTopic(
                "direct://amq.direct/"
                + topicName
                + "/"
                + topicName
                + "?routingkey='"
                + topicName
                + "',exclusive='true',autodelete='true'");
    }

    /**
     * Builds a topic from a {@code fanout://amq.fanout/...} address in which {@code topicName}
     * is used for both path segments and the routing key, marked exclusive and auto-delete.
     *
     * @param con       unused
     * @param topicName name inserted into the address
     * @return the topic destination
     * @throws JMSException       declared by the interface
     * @throws URISyntaxException if the resulting address cannot be parsed
     */
    @Override
    public Topic createTopicOnFanout(final Connection con, String topicName) throws JMSException, URISyntaxException
    {
        return new AMQTopic(
                "fanout://amq.fanout/"
                + topicName
                + "/"
                + topicName
                + "?routingkey='"
                + topicName
                + "',exclusive='true',autodelete='true'");
    }

    /**
     * Returns the depth of a queue as reported by {@code AMQSession.getQueueDepth}, using a
     * temporary non-transacted session that is closed before returning.
     *
     * @param con         connection on which the temporary session is created
     * @param destination queue to inspect; must be an {@code AMQDestination}
     * @return the queue depth
     * @throws Exception if the session cannot be created, queried or closed
     */
    @Override
    public long getQueueDepth(final Connection con, final Queue destination) throws Exception
    {
        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        try
        {
            return ((AMQSession<?, ?>) session).getQueueDepth((AMQDestination) destination);
        }
        finally
        {
            session.close();
        }
    }

    /**
     * Reports whether a queue is bound according to {@code AMQSession.isQueueBound}, checked
     * against a destination built from an empty exchange name and the queue's name. A temporary
     * non-transacted session is used and closed before returning.
     *
     * @param con         connection on which the temporary session is created
     * @param destination queue whose name is checked
     * @return the result of {@code isQueueBound}
     * @throws Exception if the session cannot be created, queried or closed
     */
    @Override
    public boolean isQueueExist(final Connection con, final Queue destination) throws Exception
    {
        Queue queue = new AMQQueue("", destination.getQueueName());
        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        try
        {
            return ((AMQSession<?, ?>) session).isQueueBound((AMQDestination) queue);
        }
        finally
        {
            session.close();
        }
    }

    /**
     * Returns the string form of the first broker entry in the default factory's connection URL.
     *
     * @return {@code toString()} of the broker details at index 0
     * @throws RuntimeException if the URL lists no brokers or the default factory lookup fails
     */
    public String getBrokerDetailsFromDefaultConnectionUrl()
    {
        try
        {
            AMQConnectionFactory factory = (AMQConnectionFactory) getConnectionFactory();
            ConnectionURL connectionURL = factory.getConnectionURL();
            if (connectionURL.getBrokerCount() > 0)
            {
                return connectionURL
                        .getBrokerDetails(0)
                        .toString();
            }
            else
            {
                throw new RuntimeException("No broker details are available.");
            }
        }
        catch (NamingException e)
        {
            throw new RuntimeException("No broker details are available.", e);
        }
    }
}