// blob: 1ccdb0a2dcf7c1d3b9bccc0092ac988a2a447cd4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.servicemix.camel;
import java.net.URISyntaxException;
import java.util.Map;
import java.util.concurrent.Callable;
import javax.xml.namespace.QName;
import org.apache.camel.*;
import org.apache.camel.impl.DefaultConsumer;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.impl.DefaultProducer;
import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.spi.HeaderFilterStrategyAware;
import org.apache.camel.spi.Registry;
import org.apache.camel.util.URISupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
/**
 * Represents an {@link org.apache.camel.Endpoint} for interacting with JBI.
 * <p>
 * The URI handed to the constructor may carry a query string. The options below are
 * consumed while the URI is parsed and are stripped from the URI that is finally
 * registered through {@code setEndpointUri}; any other option is kept:
 * <ul>
 * <li>{@code mep} - message exchange pattern; a short name is expanded by prefixing
 *     {@code http://www.w3.org/ns/wsdl/}</li>
 * <li>{@code operation} - operation name, parsed with {@link QName#valueOf(String)}</li>
 * <li>{@code headerFilterStrategy} - registry name (optionally prefixed with {@code #})
 *     of a {@link HeaderFilterStrategy}</li>
 * <li>{@code convertExceptions} - boolean flag passed on to the {@link JbiBinding}</li>
 * <li>{@code serialization} - value passed on to the {@link JbiBinding} constructor</li>
 * </ul>
 *
 * @version $Revision: 563665 $
 */
public class JbiEndpoint extends DefaultEndpoint implements HeaderFilterStrategyAware {

    /** Namespace prefix used to expand a short MEP name into a full MEP URI. */
    private static final String MEP_NAMESPACE = "http://www.w3.org/ns/wsdl/";

    private String destinationUri;
    private String mep;
    private QName operation;
    private boolean convertExceptions;
    private String serialization;
    private HeaderFilterStrategy headerFilterStrategy;
    private final JbiComponent jbiComponent;
    private final JbiBinding binding;

    /**
     * Creates the endpoint, parses the options out of the URI and builds the binding.
     *
     * @param jbiComponent the owning component
     * @param uri          the endpoint URI, optionally followed by {@code ?option=value&...}
     * @throws JbiException if the query string of the URI cannot be parsed
     */
    public JbiEndpoint(JbiComponent jbiComponent, String uri) {
        super(uri, jbiComponent);
        this.jbiComponent = jbiComponent;
        parseUri(uri);
        // now create the binding based on the information read from the URI
        // NOTE(review): createBinding() is public and overridable, so a subclass override
        // runs here before the subclass constructor body has executed.
        this.binding = createBinding();
    }

    /**
     * Builds a new {@link JbiBinding} from the current endpoint settings
     * (serialization, convertExceptions and header filter strategy).
     *
     * @return a freshly created binding; the caller owns it
     */
    public JbiBinding createBinding() {
        JbiBinding result = new JbiBinding(this.getCamelContext(), serialization);
        result.setConvertExceptions(convertExceptions);
        // headerFilterStrategy is null when the URI did not name one
        result.addHeaderFilterStrategy(headerFilterStrategy);
        return result;
    }

    /**
     * Creates a producer that forwards Camel exchanges to a JBI consumer endpoint.
     *
     * @return a new {@link JbiProducer} bound to this endpoint
     */
    public Producer createProducer() throws Exception {
        return new JbiProducer(this);
    }

    /**
     * Producer that registers a {@link CamelConsumerEndpoint} with the JBI component
     * when started and delegates every exchange to it.
     */
    protected class JbiProducer extends DefaultProducer implements AsyncProcessor {

        private final Logger logger = LoggerFactory.getLogger(JbiProducer.class);

        /** Created in {@link #start()}; stays {@code null} until then. */
        private CamelConsumerEndpoint consumer;

        public JbiProducer(Endpoint exchangeEndpoint) {
            super(exchangeEndpoint);
        }

        /**
         * Creates the JBI consumer endpoint, adds it to the JBI component and then
         * starts the producer itself.
         */
        @Override
        public void start() throws Exception {
            consumer = new CamelConsumerEndpoint(binding, JbiEndpoint.this);
            jbiComponent.getCamelJbiComponent().addEndpoint(consumer);
            super.start();
        }

        /**
         * Removes the JBI consumer endpoint from the JBI component and stops the
         * producer. Does nothing but log when the producer is already stopped.
         */
        @Override
        public void stop() throws Exception {
            if (isStopped()) {
                logger.debug("Camel producer for {} has already been stopped", super.getEndpoint());
            } else {
                logger.debug("Stopping Camel producer for {}", super.getEndpoint());
                // consumer is only assigned in start(); guard against stop() without start()
                if (consumer != null) {
                    jbiComponent.getCamelJbiComponent().removeEndpoint(consumer);
                }
                super.stop();
            }
        }

        /**
         * Hands the exchange to the JBI consumer endpoint through
         * {@code binding.runWithCamelContextClassLoader}.
         *
         * @param exchange the exchange to send
         * @throws Exception whatever the binding or the consumer endpoint throws
         */
        public void process(final Exchange exchange) throws Exception {
            binding.runWithCamelContextClassLoader(new Callable<Object>() {
                public Object call() throws Exception {
                    consumer.process(exchange);
                    return null;
                }
            });
        }

        /*
         * Access the underlying JBI Consumer endpoint
         */
        protected CamelConsumerEndpoint getCamelConsumerEndpoint() {
            return consumer;
        }

        /**
         * Asynchronous variant of {@link #process(Exchange)}.
         *
         * @param exchange      the exchange to send
         * @param asyncCallback callback passed on to the JBI consumer endpoint
         * @return the value returned by the consumer endpoint, or {@code true} when an
         *         exception was thrown; in that case the exception is set on the exchange
         */
        public boolean process(final Exchange exchange, final AsyncCallback asyncCallback) {
            try {
                return binding.runWithCamelContextClassLoader(new Callable<Boolean>() {
                    public Boolean call() throws Exception {
                        return consumer.process(exchange, asyncCallback);
                    }
                });
            } catch (Exception e) {
                // NOTE(review): the callback is not invoked on this path - confirm whether
                // callers expect asyncCallback to be signalled when an exception is recorded.
                exchange.setException(e);
                return true;
            }
        }
    }

    /**
     * Splits the URI into the destination part (everything before {@code ?}) and its
     * options, applies the options this endpoint understands and re-registers the
     * endpoint URI without them.
     * <p>
     * A URI without {@code ?} (or starting with it) is stored unchanged as the
     * destination URI and no option is processed.
     *
     * @param uri the raw endpoint URI
     * @throws JbiException if the query string cannot be parsed or rebuilt
     */
    @SuppressWarnings("unchecked")
    private void parseUri(String uri) {
        destinationUri = uri;
        int idx = destinationUri.indexOf('?');
        if (idx <= 0) {
            return;
        }
        try {
            // raw Map on purpose: the generic signature of URISupport differs between Camel versions
            Map params = URISupport.parseQuery(destinationUri.substring(idx + 1));
            this.destinationUri = destinationUri.substring(0, idx);

            // always consume "mep", also when it is already a fully qualified MEP URI
            mep = (String) params.remove("mep");
            if (mep != null && !mep.startsWith(MEP_NAMESPACE)) {
                mep = MEP_NAMESPACE + mep;
            }

            String oper = (String) params.get("operation");
            if (StringUtils.hasLength(oper)) {
                operation = QName.valueOf(oper);
                params.remove("operation");
            }

            String filter = (String) params.get("headerFilterStrategy");
            if (StringUtils.hasLength(filter)) {
                Registry registry = jbiComponent.getCamelContext().getRegistry();
                // only a leading '#' marks a registry reference; keep any other '#' intact
                if (filter.startsWith("#")) {
                    filter = filter.substring(1);
                }
                Object object = registry.lookup(filter);
                if (object instanceof HeaderFilterStrategy) {
                    headerFilterStrategy = (HeaderFilterStrategy) object;
                }
                params.remove("headerFilterStrategy");
            }

            String convert = (String) params.get("convertExceptions");
            if (StringUtils.hasLength(convert)) {
                this.setConvertExceptions(Boolean.parseBoolean(convert));
                params.remove("convertExceptions");
            }

            String serializationOption = (String) params.get("serialization");
            if (StringUtils.hasLength(serializationOption)) {
                this.setSerialization(serializationOption);
                params.remove("serialization");
            }

            // rebuild the endpoint URI from the options that were not consumed above
            String query = URISupport.createQueryString(params);
            String endpointUri = this.destinationUri;
            if (query != null && query.length() > 0) {
                // the query string needs a '?' separator; do not double it if already present
                endpointUri += query.startsWith("?") ? query : "?" + query;
            }
            this.setEndpointUri(endpointUri);
        } catch (URISyntaxException e) {
            throw new JbiException(e);
        }
    }

    public void setMep(String str) {
        mep = str;
    }

    public void setOperation(QName operation) {
        this.operation = operation;
    }

    /**
     * Sets the destination URI (the endpoint URI without its query string).
     *
     * @param str the new destination URI
     */
    public void setDestinationUri(String str) {
        destinationUri = str;
    }

    /**
     * Misspelled variant kept for backward compatibility.
     *
     * @param str the new destination URI
     * @deprecated use {@link #setDestinationUri(String)} instead
     */
    @Deprecated
    public void setDestionationUri(String str) {
        setDestinationUri(str);
    }

    public String getMep() {
        return mep;
    }

    public QName getOperation() {
        return operation;
    }

    public String getDestinationUri() {
        return destinationUri;
    }

    /**
     * Creates a consumer that exposes the given processor as a JBI endpoint.
     * The JBI endpoint is created and activated when the consumer starts and
     * deactivated when it stops.
     *
     * @param processor the processor that receives the incoming exchanges
     * @return a new consumer for this endpoint
     */
    public Consumer createConsumer(final Processor processor) throws Exception {
        return new DefaultConsumer(this, processor) {
            private CamelProviderEndpoint jbiEndpoint;

            @Override
            protected void doStart() throws Exception {
                super.doStart();
                jbiEndpoint = jbiComponent.createJbiEndpointFromCamel(JbiEndpoint.this,
                        AsyncProcessorTypeConverter.convert(processor));
                jbiComponent.getCamelJbiComponent().activateJbiEndpoint(jbiEndpoint);
            }

            @Override
            protected void doStop() throws Exception {
                // jbiEndpoint is null when doStart() never got as far as creating it
                if (jbiEndpoint != null) {
                    jbiComponent.getCamelJbiComponent().deactivateJbiEndpoint(jbiEndpoint);
                }
                super.doStop();
            }
        };
    }

    /**
     * @return always {@code true}
     */
    public boolean isSingleton() {
        return true;
    }

    public HeaderFilterStrategy getHeaderFilterStrategy() {
        return headerFilterStrategy;
    }

    public void setHeaderFilterStrategy(HeaderFilterStrategy strategy) {
        this.headerFilterStrategy = strategy;
    }

    public void setConvertExceptions(boolean convertExceptions) {
        this.convertExceptions = convertExceptions;
    }

    public boolean isConvertExceptions() {
        return convertExceptions;
    }

    public void setSerialization(String serialization) {
        this.serialization = serialization;
    }

    public String getSerialization() {
        return serialization;
    }
}