blob: 01411bc5ea81519edcbaefabe0ea54700a50c3b3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.karaf.jms.internal;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.advisory.DestinationSource;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.pool.PooledConnection;
import org.apache.karaf.jms.JmsMessage;
import org.apache.karaf.jms.JmsService;
import org.apache.karaf.util.TemplateUtils;
import org.osgi.framework.BundleContext;
import org.osgi.framework.Constants;
import org.osgi.framework.ServiceReference;
import javax.jms.*;
import java.io.*;
import java.lang.IllegalStateException;
import java.util.*;
/**
* Default implementation of the JMS Service.
*/
public class JmsServiceImpl implements JmsService {
private BundleContext bundleContext;
private File deployFolder;
public JmsServiceImpl() {
File karafBase = new File(System.getProperty("karaf.base"));
deployFolder = new File(karafBase, "deploy");
}
@Override
public void create(String name, String type, String url) throws Exception {
create(name, type, url, null, null);
}
@Override
public void create(String name, String type, String url, String username, String password) throws Exception {
if (!type.equalsIgnoreCase("activemq") && !type.equalsIgnoreCase("webspheremq")) {
throw new IllegalArgumentException("JMS connection factory type not known");
}
File outFile = getConnectionFactoryFile(name);
String template;
HashMap<String, String> properties = new HashMap<String, String>();
properties.put("name", name);
properties.put("username", username);
properties.put("password", password);
if (type.equalsIgnoreCase("activemq")) {
// activemq
properties.put("url", url);
template = "connectionfactory-activemq.xml";
} else {
// webspheremq
String[] splitted = url.split("/");
if (splitted.length != 4) {
throw new IllegalStateException("WebsphereMQ URI should be in the following format: host/port/queuemanager/channel");
}
properties.put("host", splitted[0]);
properties.put("port", splitted[1]);
properties.put("queuemanager", splitted[2]);
properties.put("channel", splitted[3]);
template = "connectionfactory-webspheremq.xml";
}
InputStream is = this.getClass().getResourceAsStream(template);
if (is == null) {
throw new IllegalArgumentException("Template resource " + template + " doesn't exist");
}
TemplateUtils.createFromTemplate(outFile, is, properties);
}
private File getConnectionFactoryFile(String name) {
return new File(deployFolder, "connectionfactory-" + name + ".xml");
}
@Override
public void delete(String name) throws Exception {
File connectionFactoryFile = getConnectionFactoryFile(name);
if (!connectionFactoryFile.exists()) {
throw new IllegalStateException("The JMS connection factory file " + connectionFactoryFile.getPath() + " doesn't exist");
}
connectionFactoryFile.delete();
}
@SuppressWarnings("rawtypes")
@Override
public List<String> connectionFactories() throws Exception {
List<String> connectionFactories = new ArrayList<String>();
ServiceReference[] references = bundleContext.getServiceReferences(ConnectionFactory.class.getName(), null);
if (references != null) {
for (ServiceReference reference : references) {
if (reference.getProperty("osgi.jndi.service.name") != null) {
connectionFactories.add((String) reference.getProperty("osgi.jndi.service.name"));
} else if (reference.getProperty("name") != null) {
connectionFactories.add((String) reference.getProperty("name"));
} else {
connectionFactories.add(reference.getProperty(Constants.SERVICE_ID).toString());
}
}
}
return connectionFactories;
}
@Override
public List<String> connectionFactoryFileNames() throws Exception {
String[] connectionFactoryFileNames = deployFolder.list(new FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
return name.startsWith("connectionfactory-") && name.endsWith(".xml");
}
});
return Arrays.asList(connectionFactoryFileNames);
}
@Override
public Map<String, String> info(String connectionFactory, String username, String password) throws IOException, JMSException {
JmsConnector connector = new JmsConnector(bundleContext, connectionFactory, username, password);
try {
ConnectionMetaData metaData = connector.connect().getMetaData();
Map<String, String> map = new HashMap<String, String>();
map.put("product", metaData.getJMSProviderName());
map.put("version", metaData.getProviderVersion());
return map;
} finally {
connector.close();
}
}
@SuppressWarnings("unchecked")
@Override
public int count(String connectionFactory, final String destination, String username, String password) throws IOException, JMSException {
JmsConnector connector = new JmsConnector(bundleContext, connectionFactory, username, password);
try {
Session session = connector.createSession();
QueueBrowser browser = session.createBrowser(session.createQueue(destination));
Enumeration<Message> enumeration = browser.getEnumeration();
int count = 0;
while (enumeration.hasMoreElements()) {
enumeration.nextElement();
count++;
}
browser.close();
return count;
} finally {
connector.close();
}
}
private DestinationSource getDestinationSource(Connection connection) throws JMSException {
if (connection instanceof PooledConnection) {
connection = ((PooledConnection) connection).getConnection();
}
if (connection instanceof ActiveMQConnection) {
return ((ActiveMQConnection) connection).getDestinationSource();
} else {
return null;
}
}
@Override
public List<String> queues(String connectionFactory, String username, String password) throws JMSException, IOException {
JmsConnector connector = new JmsConnector(bundleContext, connectionFactory, username, password);
try {
List<String> queues = new ArrayList<String>();
DestinationSource destinationSource = getDestinationSource(connector.connect());
if (destinationSource != null) {
Set<ActiveMQQueue> activeMQQueues = destinationSource.getQueues();
for (ActiveMQQueue activeMQQueue : activeMQQueues) {
queues.add(activeMQQueue.getQueueName());
}
}
return queues;
} finally {
connector.close();
}
}
@Override
public List<String> topics(String connectionFactory, String username, String password) throws IOException, JMSException {
JmsConnector connector = new JmsConnector(bundleContext, connectionFactory, username, password);
try {
DestinationSource destinationSource = getDestinationSource(connector.connect());
List<String> topics = new ArrayList<String>();
if (destinationSource != null) {
Set<ActiveMQTopic> activeMQTopics = destinationSource.getTopics();
for (ActiveMQTopic activeMQTopic : activeMQTopics) {
topics.add(activeMQTopic.getTopicName());
}
}
return topics;
} finally {
connector.close();
}
}
@SuppressWarnings("unchecked")
@Override
public List<JmsMessage> browse(String connectionFactory, final String queue, final String filter,
String username, String password) throws JMSException, IOException {
JmsConnector connector = new JmsConnector(bundleContext, connectionFactory, username, password);
try {
List<JmsMessage> messages = new ArrayList<JmsMessage>();
Session session = connector.createSession();
QueueBrowser browser = session.createBrowser(session.createQueue(queue), filter);
Enumeration<Message> enumeration = browser.getEnumeration();
while (enumeration.hasMoreElements()) {
Message message = enumeration.nextElement();
messages.add(new JmsMessage(message));
}
browser.close();
return messages;
} finally {
connector.close();
}
}
@Override
public void send(String connectionFactory, final String queue, final String body, final String replyTo,
String username, String password) throws IOException, JMSException {
JmsConnector connector = new JmsConnector(bundleContext, connectionFactory, username, password);
try {
Session session = connector.createSession();
Message message = session.createTextMessage(body);
if (replyTo != null) {
message.setJMSReplyTo(session.createQueue(replyTo));
}
MessageProducer producer = session.createProducer(session.createQueue(queue));
producer.send(message);
producer.close();
} finally {
connector.close();
}
}
@Override
public int consume(String connectionFactory, final String queue, final String selector, String username,
String password) throws Exception {
JmsConnector connector = new JmsConnector(bundleContext, connectionFactory, username, password);
try {
int count = 0;
Session session = connector.createSession();
MessageConsumer consumer = session.createConsumer(session.createQueue(queue), selector);
Message message;
do {
message = consumer.receive(5000L);
if (message != null) {
count++;
}
} while (message != null);
return count;
} finally {
connector.close();
}
}
@Override
public int move(String connectionFactory, final String sourceQueue, final String targetQueue,
final String selector, String username, String password) throws IOException, JMSException {
JmsConnector connector = new JmsConnector(bundleContext, connectionFactory, username, password);
try {
int count = 0;
Session session = connector.createSession(Session.SESSION_TRANSACTED);
MessageConsumer consumer = session.createConsumer(session.createQueue(sourceQueue), selector);
Message message;
do {
message = consumer.receive(5000L);
if (message != null) {
MessageProducer producer = session.createProducer(session.createQueue(targetQueue));
producer.send(message);
count++;
}
} while (message != null);
session.commit();
consumer.close();
return count;
} finally {
connector.close();
}
}
public void setBundleContext(BundleContext bundleContext) {
this.bundleContext = bundleContext;
}
}