blob: dbe211e5641cdcff8f5f734d4fb198ad586daf29 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.camel;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.util.JMSExceptionSupport;
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.PollingConsumer;
import org.apache.camel.Processor;
/**
* A JMS {@link javax.jms.MessageConsumer} which consumes message exchanges from
* a Camel {@link Endpoint}
*
*
*/
public class CamelMessageConsumer implements MessageConsumer {
private final CamelDestination destination;
private final Endpoint endpoint;
private final ActiveMQSession session;
private final String messageSelector;
private final boolean noLocal;
private MessageListener messageListener;
private Consumer consumer;
private PollingConsumer pollingConsumer;
private boolean closed;
public CamelMessageConsumer(CamelDestination destination, Endpoint endpoint, ActiveMQSession session, String messageSelector, boolean noLocal) {
this.destination = destination;
this.endpoint = endpoint;
this.session = session;
this.messageSelector = messageSelector;
this.noLocal = noLocal;
}
public void close() throws JMSException {
if (!closed) {
closed = true;
try {
if (consumer != null) {
consumer.stop();
}
if (pollingConsumer != null) {
pollingConsumer.stop();
}
} catch (JMSException e) {
throw e;
} catch (Exception e) {
throw JMSExceptionSupport.create(e);
}
}
}
public MessageListener getMessageListener() throws JMSException {
return messageListener;
}
public void setMessageListener(MessageListener messageListener) throws JMSException {
this.messageListener = messageListener;
if (messageListener != null && consumer == null) {
consumer = createConsumer();
}
}
public Message receive() throws JMSException {
Exchange exchange = getPollingConsumer().receive();
return createMessage(exchange);
}
public Message receive(long timeoutMillis) throws JMSException {
Exchange exchange = getPollingConsumer().receive(timeoutMillis);
return createMessage(exchange);
}
public Message receiveNoWait() throws JMSException {
Exchange exchange = getPollingConsumer().receiveNoWait();
return createMessage(exchange);
}
// Properties
// -----------------------------------------------------------------------
public CamelDestination getDestination() {
return destination;
}
public Endpoint getEndpoint() {
return endpoint;
}
public String getMessageSelector() {
return messageSelector;
}
public boolean isNoLocal() {
return noLocal;
}
public ActiveMQSession getSession() {
return session;
}
// Implementation methods
// -----------------------------------------------------------------------
protected PollingConsumer getPollingConsumer() throws JMSException {
try {
if (pollingConsumer == null) {
pollingConsumer = endpoint.createPollingConsumer();
pollingConsumer.start();
}
return pollingConsumer;
} catch (JMSException e) {
throw e;
} catch (Exception e) {
throw JMSExceptionSupport.create(e);
}
}
protected Message createMessage(Exchange exchange) throws JMSException {
if (exchange != null) {
Message message = destination.getBinding().makeJmsMessage(exchange, session);
return message;
} else {
return null;
}
}
protected Consumer createConsumer() throws JMSException {
try {
Consumer answer = endpoint.createConsumer(new Processor() {
public void process(Exchange exchange) throws Exception {
Message message = createMessage(exchange);
getMessageListener().onMessage(message);
}
});
answer.start();
return answer;
} catch (JMSException e) {
throw e;
} catch (Exception e) {
throw JMSExceptionSupport.create(e);
}
}
protected void checkClosed() throws javax.jms.IllegalStateException {
if (closed) {
throw new IllegalStateException("The producer is closed");
}
}
}