blob: 547902289154563332681e08043e38d3b90cbb9d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.camel.component.broker;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.inteceptor.MessageInterceptor;
import org.apache.activemq.broker.inteceptor.MessageInterceptorRegistry;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.camel.Consumer;
import org.apache.camel.MultipleConsumersSupport;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.Service;
import org.apache.camel.api.management.ManagedResource;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriParam;
import org.apache.camel.spi.UriPath;
import org.apache.camel.util.UnsafeUriCharactersEncoder;
@ManagedResource(description = "Managed Camel Broker Endpoint")
@UriEndpoint(scheme = "broker", syntax = "broker:destination", consumerClass = BrokerConsumer.class, title = "Broker", label = "messaging")
public class BrokerEndpoint extends DefaultEndpoint implements MultipleConsumersSupport, Service {
static final String PRODUCER_BROKER_EXCHANGE = "producerBrokerExchange";
private MessageInterceptorRegistry messageInterceptorRegistry;
private List<MessageInterceptor> messageInterceptorList = new CopyOnWriteArrayList<MessageInterceptor>();
@UriPath(name = "destination") @Metadata(required = "true")
private String destinationName;
private final ActiveMQDestination destination;
@UriParam
private final BrokerConfiguration configuration;
public BrokerEndpoint(String uri, BrokerComponent component, String destinationName, ActiveMQDestination destination, BrokerConfiguration configuration) {
super(UnsafeUriCharactersEncoder.encode(uri), component);
this.destinationName = destinationName;
this.destination = destination;
this.configuration = configuration;
}
@Override
public Producer createProducer() throws Exception {
BrokerProducer producer = new BrokerProducer(this);
return producer;
}
@Override
public Consumer createConsumer(Processor processor) throws Exception {
BrokerConsumer consumer = new BrokerConsumer(this, processor);
configureConsumer(consumer);
return consumer;
}
@Override
public boolean isSingleton() {
return false;
}
@Override
public boolean isMultipleConsumersSupported() {
return true;
}
public ActiveMQDestination getDestination() {
return destination;
}
/**
* The name of the JMS destination
*/
public String getDestinationName() {
return destinationName;
}
@Override
protected void doStart() throws Exception {
super.doStart();
messageInterceptorRegistry = MessageInterceptorRegistry.getInstance().get(configuration.getBrokerName());
for (MessageInterceptor messageInterceptor : messageInterceptorList) {
addMessageInterceptor(messageInterceptor);
}
messageInterceptorList.clear();
}
@Override
protected void doStop() throws Exception {
super.doStop();
}
protected void addMessageInterceptor(MessageInterceptor messageInterceptor) {
if (isStarted()) {
messageInterceptorRegistry.addMessageInterceptor(destination, messageInterceptor);
} else {
messageInterceptorList.add(messageInterceptor);
}
}
protected void removeMessageInterceptor(MessageInterceptor messageInterceptor) {
messageInterceptorRegistry.removeMessageInterceptor(destination, messageInterceptor);
}
protected void inject(ProducerBrokerExchange producerBrokerExchange, Message message) throws Exception {
ProducerBrokerExchange pbe = producerBrokerExchange;
if (message != null) {
message.setDestination(destination);
if (producerBrokerExchange != null && producerBrokerExchange.getRegionDestination() != null){
if (!producerBrokerExchange.getRegionDestination().getActiveMQDestination().equals(destination)){
//The message broker will create a new ProducerBrokerExchange with the
//correct region broker set
pbe = null;
}
}
messageInterceptorRegistry.injectMessage(pbe, message);
}
}
}