blob: 5e02cd82d18425f57b32d9a45d7075dbae084074 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.qpid.jms;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
import org.apache.qpid.jms.jndi.JNDIStorable;
import org.apache.qpid.jms.meta.JmsConnectionInfo;
import org.apache.qpid.jms.provider.Provider;
import org.apache.qpid.jms.provider.ProviderFactory;
import org.apache.qpid.jms.util.IdGenerator;
import org.apache.qpid.jms.util.PropertyUtil;
import org.apache.qpid.jms.util.URISupport;
import org.apache.qpid.jms.util.URISupport.CompositeData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* JMS ConnectionFactory Implementation.
*/
public class JmsConnectionFactory extends JNDIStorable implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory {
private static final Logger LOG = LoggerFactory.getLogger(JmsConnectionFactory.class);
private URI brokerURI;
private URI localURI;
private String username;
private String password;
private boolean forceAsyncSend;
private boolean alwaysSyncSend;
private boolean sendAcksAsync;
private boolean omitHost;
private boolean messagePrioritySupported = true;
private String queuePrefix = null;
private String topicPrefix = null;
private long sendTimeout = JmsConnectionInfo.DEFAULT_SEND_TIMEOUT;
private long requestTimeout = JmsConnectionInfo.DEFAULT_REQUEST_TIMEOUT;
private long closeTimeout = JmsConnectionInfo.DEFAULT_CLOSE_TIMEOUT;
private long connectTimeout = JmsConnectionInfo.DEFAULT_CONNECT_TIMEOUT;
private boolean watchRemoteDestinations = true;
private IdGenerator clientIdGenerator;
private String clientIDPrefix;
private IdGenerator connectionIdGenerator;
private String connectionIDPrefix;
private ExceptionListener exceptionListener;
private JmsPrefetchPolicy prefetchPolicy = new JmsPrefetchPolicy();
public JmsConnectionFactory() {
}
public JmsConnectionFactory(String username, String password) {
setUsername(username);
setPassword(password);
}
public JmsConnectionFactory(String brokerURI) {
this(createURI(brokerURI));
}
public JmsConnectionFactory(URI brokerURI) {
setBrokerURI(brokerURI.toString());
}
public JmsConnectionFactory(String userName, String password, URI brokerURI) {
setUsername(userName);
setPassword(password);
setBrokerURI(brokerURI.toString());
}
public JmsConnectionFactory(String userName, String password, String brokerURI) {
setUsername(userName);
setPassword(password);
setBrokerURI(brokerURI);
}
/**
* Set properties
*
* @param props
*/
public void setProperties(Properties props) {
Map<String, String> map = new HashMap<String, String>();
for (Map.Entry<Object, Object> entry : props.entrySet()) {
map.put(entry.getKey().toString(), entry.getValue().toString());
}
setProperties(map);
}
@Override
public void setProperties(Map<String, String> map) {
buildFromProperties(map);
}
/**
* @param map
*/
@Override
protected void buildFromProperties(Map<String, String> map) {
PropertyUtil.setProperties(this, map);
}
/**
* @param map
*/
@Override
protected void populateProperties(Map<String, String> map) {
try {
Map<String, String> result = PropertyUtil.getProperties(this);
map.putAll(result);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* @return a TopicConnection
* @throws JMSException
* @see javax.jms.TopicConnectionFactory#createTopicConnection()
*/
@Override
public TopicConnection createTopicConnection() throws JMSException {
return createTopicConnection(getUsername(), getPassword());
}
/**
* @param userName
* @param password
* @return a TopicConnection
* @throws JMSException
* @see javax.jms.TopicConnectionFactory#createTopicConnection(java.lang.String,
* java.lang.String)
*/
@Override
public TopicConnection createTopicConnection(String username, String password) throws JMSException {
try {
String connectionId = getConnectionIdGenerator().generateId();
Provider provider = createProvider(brokerURI);
JmsTopicConnection result = new JmsTopicConnection(connectionId, provider, getClientIdGenerator());
return configureConnection(result, username, password);
} catch (Exception e) {
throw JmsExceptionSupport.create(e);
}
}
/**
* @return a Connection
* @throws JMSException
* @see javax.jms.ConnectionFactory#createConnection()
*/
@Override
public Connection createConnection() throws JMSException {
return createConnection(getUsername(), getPassword());
}
/**
* @param userName
* @param password
* @return Connection
* @throws JMSException
* @see javax.jms.ConnectionFactory#createConnection(java.lang.String, java.lang.String)
*/
@Override
public Connection createConnection(String username, String password) throws JMSException {
try {
String connectionId = getConnectionIdGenerator().generateId();
Provider provider = createProvider(brokerURI);
JmsConnection result = new JmsConnection(connectionId, provider, getClientIdGenerator());
return configureConnection(result, username, password);
} catch (Exception e) {
throw JmsExceptionSupport.create(e);
}
}
/**
* @return a QueueConnection
* @throws JMSException
* @see javax.jms.QueueConnectionFactory#createQueueConnection()
*/
@Override
public QueueConnection createQueueConnection() throws JMSException {
return createQueueConnection(getUsername(), getPassword());
}
/**
* @param userName
* @param password
* @return a QueueConnection
* @throws JMSException
* @see javax.jms.QueueConnectionFactory#createQueueConnection(java.lang.String,
* java.lang.String)
*/
@Override
public QueueConnection createQueueConnection(String username, String password) throws JMSException {
try {
String connectionId = getConnectionIdGenerator().generateId();
Provider provider = createProvider(brokerURI);
JmsQueueConnection result = new JmsQueueConnection(connectionId, provider, getClientIdGenerator());
return configureConnection(result, username, password);
} catch (Exception e) {
throw JmsExceptionSupport.create(e);
}
}
protected <T extends JmsConnection> T configureConnection(T connection, String username, String password) throws JMSException {
try {
PropertyUtil.setProperties(connection, PropertyUtil.getProperties(this));
connection.setExceptionListener(exceptionListener);
connection.setUsername(username);
connection.setPassword(password);
connection.setBrokerURI(brokerURI);
return connection;
} catch (Exception e) {
throw JmsExceptionSupport.create(e);
}
}
protected Provider createProvider(URI brokerURI) throws Exception {
Provider result = null;
try {
result = ProviderFactory.create(brokerURI);
} catch (Exception ex) {
LOG.error("Failed to create JMS Provider instance for: {}", brokerURI.getScheme());
LOG.trace("Error: ", ex);
throw ex;
}
return result;
}
protected static URI createURI(String name) {
if (name != null && name.trim().isEmpty() == false) {
try {
return new URI(name);
} catch (URISyntaxException e) {
throw (IllegalArgumentException) new IllegalArgumentException("Invalid broker URI: " + name).initCause(e);
}
}
return null;
}
protected synchronized IdGenerator getConnectionIdGenerator() {
if (connectionIdGenerator == null) {
if (connectionIDPrefix != null) {
connectionIdGenerator = new IdGenerator(connectionIDPrefix);
} else {
connectionIdGenerator = new IdGenerator();
}
}
return connectionIdGenerator;
}
protected synchronized void setConnectionIdGenerator(IdGenerator connectionIdGenerator) {
this.connectionIdGenerator = connectionIdGenerator;
}
//////////////////////////////////////////////////////////////////////////
// Property getters and setters
//////////////////////////////////////////////////////////////////////////
/**
* @return the brokerURI
*/
public String getBrokerURI() {
return this.brokerURI != null ? this.brokerURI.toString() : "";
}
/**
* @param brokerURI
* the brokerURI to set
*/
public void setBrokerURI(String brokerURI) {
if (brokerURI == null) {
throw new IllegalArgumentException("brokerURI cannot be null");
}
this.brokerURI = createURI(brokerURI);
try {
if (this.brokerURI.getQuery() != null) {
Map<String, String> map = PropertyUtil.parseQuery(this.brokerURI.getQuery());
Map<String, String> jmsOptionsMap = PropertyUtil.filterProperties(map, "jms.");
if (!PropertyUtil.setProperties(this, jmsOptionsMap)) {
String msg = ""
+ " Not all jms options could be set on the ConnectionFactory."
+ " Check the options are spelled correctly."
+ " Given parameters=[" + jmsOptionsMap + "]."
+ " This connection factory cannot be started.";
throw new IllegalArgumentException(msg);
} else {
this.brokerURI = PropertyUtil.replaceQuery(this.brokerURI, map);
}
} else if (URISupport.isCompositeURI(this.brokerURI)) {
CompositeData data = URISupport.parseComposite(this.brokerURI);
Map<String, String> jmsOptionsMap = PropertyUtil.filterProperties(data.getParameters(), "jms.");
if (!PropertyUtil.setProperties(this, jmsOptionsMap)) {
String msg = ""
+ " Not all jms options could be set on the ConnectionFactory."
+ " Check the options are spelled correctly."
+ " Given parameters=[" + jmsOptionsMap + "]."
+ " This connection factory cannot be started.";
throw new IllegalArgumentException(msg);
} else {
this.brokerURI = data.toURI();
}
}
} catch (Exception e) {
throw new IllegalArgumentException(e.getMessage());
}
}
/**
* @return the localURI
*/
public String getLocalURI() {
return this.localURI != null ? this.localURI.toString() : "";
}
/**
* @param localURI
* the localURI to set
*/
public void setLocalURI(String localURI) {
this.localURI = createURI(localURI);
}
/**
* @return the username
*/
public String getUsername() {
return this.username;
}
/**
* @param username
* the username to set
*/
public void setUsername(String username) {
this.username = username;
}
/**
* @return the password
*/
public String getPassword() {
return this.password;
}
/**
* @param password
* the password to set
*/
public void setPassword(String password) {
this.password = password;
}
public boolean isForceAsyncSend() {
return forceAsyncSend;
}
public void setForceAsyncSend(boolean forceAsyncSend) {
this.forceAsyncSend = forceAsyncSend;
}
public boolean isOmitHost() {
return omitHost;
}
public void setOmitHost(boolean omitHost) {
this.omitHost = omitHost;
}
/**
* @return the messagePrioritySupported configuration option.
*/
public boolean isMessagePrioritySupported() {
return this.messagePrioritySupported;
}
/**
* Enables message priority support in MessageConsumer instances. This results
* in all prefetched messages being dispatched in priority order.
*
* @param messagePrioritySupported the messagePrioritySupported to set
*/
public void setMessagePrioritySupported(boolean messagePrioritySupported) {
this.messagePrioritySupported = messagePrioritySupported;
}
/**
* Returns the prefix applied to Queues that are created by the client.
*
* @return the currently configured Queue prefix.
*/
public String getQueuePrefix() {
return queuePrefix;
}
public void setQueuePrefix(String queuePrefix) {
this.queuePrefix = queuePrefix;
}
/**
* Returns the prefix applied to Topics that are created by the client.
*
* @return the currently configured Topic prefix.
*/
public String getTopicPrefix() {
return topicPrefix;
}
public void setTopicPrefix(String topicPrefix) {
this.topicPrefix = topicPrefix;
}
/**
* Gets the currently set close timeout.
*
* @return the currently set close timeout.
*/
public long getCloseTimeout() {
return closeTimeout;
}
/**
* Sets the close timeout used to control how long a Connection close will wait for
* clean shutdown of the connection before giving up. A negative value means wait
* forever.
*
* Care should be taken in that a very short close timeout can cause the client to
* not cleanly shutdown the connection and it's resources.
*
* @param closeTimeout
* time in milliseconds to wait for a clean connection close.
*/
public void setCloseTimeout(long closeTimeout) {
this.closeTimeout = closeTimeout;
}
/**
* Returns the currently configured wire level connect timeout.
*
* @return the currently configured wire level connect timeout.
*/
public long getConnectTimeout() {
return this.connectTimeout;
}
/**
* Sets the timeout value used to control how long a client will wait for a successful
* connection to the remote peer to be established before considering the attempt to
* have failed. This value does not control socket level connection timeout but rather
* connection handshake at the wire level, to control the socket level timeouts use the
* standard socket options configuration values.
*
* @param connectTimeout
* the time in milliseconds to wait for the protocol connection handshake to complete.
*/
public void setConnectTimeout(long connectTimeout) {
this.connectTimeout = connectTimeout;
}
public long getSendTimeout() {
return sendTimeout;
}
public void setSendTimeout(long sendTimeout) {
this.sendTimeout = sendTimeout;
}
public long getRequestTimeout() {
return requestTimeout;
}
public void setRequestTimeout(long requestTimeout) {
this.requestTimeout = requestTimeout;
}
public JmsPrefetchPolicy getPrefetchPolicy() {
return prefetchPolicy;
}
public void setPrefetchPolicy(JmsPrefetchPolicy prefetchPolicy) {
this.prefetchPolicy = prefetchPolicy;
}
public synchronized String getClientIDPrefix() {
return clientIDPrefix;
}
/**
* Sets the prefix used by auto-generated JMS Client ID values which are used if the JMS
* client does not explicitly specify on.
*
* @param clientIDPrefix
*/
public synchronized void setClientIDPrefix(String clientIDPrefix) {
this.clientIDPrefix = clientIDPrefix;
}
protected synchronized IdGenerator getClientIdGenerator() {
if (clientIdGenerator == null) {
if (clientIDPrefix != null) {
clientIdGenerator = new IdGenerator(clientIDPrefix);
} else {
clientIdGenerator = new IdGenerator();
}
}
return clientIdGenerator;
}
protected synchronized void setClientIdGenerator(IdGenerator clientIdGenerator) {
this.clientIdGenerator = clientIdGenerator;
}
/**
* Sets the prefix used by connection id generator.
*
* @param connectionIDPrefix
* The string prefix used on all connection Id's created by this factory.
*/
public synchronized void setConnectionIDPrefix(String connectionIDPrefix) {
this.connectionIDPrefix = connectionIDPrefix;
}
/**
* Gets the currently configured JMS ExceptionListener that will be set on all
* new Connection objects created from this factory.
*
* @return the currently configured JMS ExceptionListener.
*/
public ExceptionListener getExceptionListener() {
return exceptionListener;
}
/**
* Sets the JMS ExceptionListener that will be set on all new Connection objects
* created from this factory.
*
* @param exceptionListener
* the JMS ExceptionListenenr to apply to new Connection's or null to clear.
*/
public void setExceptionListener(ExceptionListener exceptionListener) {
this.exceptionListener = exceptionListener;
}
/**
* Indicates if the Connection's created from this factory will watch for updates
* from the remote peer informing of temporary destination creation and destruction.
*
* @return true if destination monitoring is enabled.
*/
public boolean isWatchRemoteDestinations() {
return watchRemoteDestinations;
}
/**
* Enable or disable monitoring of remote temporary destination life-cycles.
*
* @param watchRemoteDestinations
* true if connection instances should monitor remote destination life-cycles.
*/
public void setWatchRemoteDestinations(boolean watchRemoteDestinations) {
this.watchRemoteDestinations = watchRemoteDestinations;
}
/**
* Returns true if the client should always send messages using a synchronous
* send operation regardless of persistence mode, or inside a transaction.
*
* @return true if sends should always be done synchronously.
*/
public boolean isAlwaysSyncSend() {
return alwaysSyncSend;
}
/**
* Configures whether or not the client will always send messages synchronously or not
* regardless of other factors that might result in an asynchronous send.
*
* @param alwaysSyncSend
* if true sends are always done synchronously.
*/
public void setAlwaysSyncSend(boolean alwaysSyncSend) {
this.alwaysSyncSend = alwaysSyncSend;
}
/**
* @return true if consumer acknowledgments are sent asynchronously or not.
*/
public boolean isSendAcksAsync() {
return sendAcksAsync;
}
/**
* Should the message acknowledgments from a consumer be sent synchronously or
* asynchronously. Sending the acknowledgments asynchronously can increase the
* performance of a consumer but opens up the possibility of a missed message
* acknowledge should the connection be unstable.
*
* @param sendAcksAsync
* true to have the client send all message acknowledgments asynchronously.
*/
public void setSendAcksAsync(boolean sendAcksAsync) {
this.sendAcksAsync = sendAcksAsync;
}
}