blob: 4abd59c8becdca284668d0770fea83fa808f24bc [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.cxf.transport;
import java.io.IOException;
import java.io.OutputStream;
import org.apache.camel.CamelContext;
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.FailedToCreateConsumerException;
import org.apache.camel.NoSuchEndpointException;
import org.apache.camel.Processor;
import org.apache.camel.component.cxf.common.header.CxfHeaderHelper;
import org.apache.camel.component.cxf.common.message.DefaultCxfMesssageMapper;
import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.ServiceHelper;
import org.apache.cxf.Bus;
import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.configuration.Configurable;
import org.apache.cxf.configuration.Configurer;
import org.apache.cxf.io.CachedOutputStream;
import org.apache.cxf.message.Message;
import org.apache.cxf.message.MessageImpl;
import org.apache.cxf.service.model.EndpointInfo;
import org.apache.cxf.transport.AbstractConduit;
import org.apache.cxf.transport.AbstractDestination;
import org.apache.cxf.transport.Conduit;
import org.apache.cxf.transport.ConduitInitiator;
import org.apache.cxf.transport.MessageObserver;
import org.apache.cxf.ws.addressing.EndpointReferenceType;
import org.apache.cxf.wsdl.EndpointReferenceUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @version
*
* Forwards messages from Camel to CXF and the CXF response back to Camel
*/
public class CamelDestination extends AbstractDestination implements Configurable {
protected static final String BASE_BEAN_NAME_SUFFIX = ".camel-destination";
private static final Logger LOG = LoggerFactory.getLogger(CamelDestination.class);
// used for places where CXF requires JUL
private static final java.util.logging.Logger JUL_LOG = LogUtils.getL7dLogger(CamelDestination.class);
final ConduitInitiator conduitInitiator;
CamelContext camelContext;
Consumer consumer;
String camelDestinationUri;
private Endpoint destinationEndpoint;
private HeaderFilterStrategy headerFilterStrategy;
private boolean checkException;
public CamelDestination(CamelContext camelContext, Bus bus, ConduitInitiator ci, EndpointInfo info) throws IOException {
this(camelContext, bus, ci, info, null, false);
}
public CamelDestination(CamelContext camelContext, Bus bus, ConduitInitiator ci, EndpointInfo info,
HeaderFilterStrategy headerFilterStrategy, boolean checkException) throws IOException {
super(bus, getTargetReference(info, bus), info);
this.camelContext = camelContext;
conduitInitiator = ci;
camelDestinationUri = endpointInfo.getAddress().substring(CamelTransportConstants.CAMEL_TRANSPORT_PREFIX.length());
if (camelDestinationUri.startsWith("//")) {
camelDestinationUri = camelDestinationUri.substring(2);
}
initConfig();
this.headerFilterStrategy = headerFilterStrategy;
this.checkException = checkException;
}
protected java.util.logging.Logger getLogger() {
return JUL_LOG;
}
public void setCheckException(boolean exception) {
checkException = exception;
}
public boolean isCheckException() {
return checkException;
}
/**
* @param inMessage the incoming message
* @return the inbuilt backchannel
*/
protected Conduit getInbuiltBackChannel(Message inMessage) {
//we can pass the message back by looking up the camelExchange from inMessage
return new BackChannelConduit(inMessage);
}
public void activate() {
LOG.debug("CamelDestination activate().... ");
ObjectHelper.notNull(camelContext, "CamelContext", this);
try {
LOG.debug("establishing Camel connection");
destinationEndpoint = getCamelContext().getEndpoint(camelDestinationUri);
if (destinationEndpoint == null) {
throw new NoSuchEndpointException(camelDestinationUri);
}
consumer = destinationEndpoint.createConsumer(new ConsumerProcessor());
ServiceHelper.startService(consumer);
} catch (NoSuchEndpointException nex) {
throw nex;
} catch (Exception ex) {
if (destinationEndpoint == null) {
throw new FailedToCreateConsumerException(camelDestinationUri, ex);
}
throw new FailedToCreateConsumerException(destinationEndpoint, ex);
}
}
public void deactivate() {
try {
ServiceHelper.stopService(consumer);
} catch (Exception e) {
LOG.warn("Error stopping consumer", e);
}
}
public void shutdown() {
LOG.debug("CamelDestination shutdown()");
this.deactivate();
}
public CamelContext getCamelContext() {
return camelContext;
}
public void setCamelContext(CamelContext camelContext) {
this.camelContext = camelContext;
}
protected void incoming(org.apache.camel.Exchange camelExchange) {
LOG.debug("server received request: ", camelExchange);
DefaultCxfMesssageMapper beanBinding = new DefaultCxfMesssageMapper();
org.apache.cxf.message.Message inMessage =
beanBinding.createCxfMessageFromCamelExchange(camelExchange, headerFilterStrategy);
inMessage.put(CamelTransportConstants.CAMEL_EXCHANGE, camelExchange);
((MessageImpl) inMessage).setDestination(this);
// Handling the incoming message
// The response message will be send back by the outgoing chain
incomingObserver.onMessage(inMessage);
}
public String getBeanName() {
if (endpointInfo == null || endpointInfo.getName() == null) {
return "default" + BASE_BEAN_NAME_SUFFIX;
}
return endpointInfo.getName().toString() + BASE_BEAN_NAME_SUFFIX;
}
public String getCamelDestinationUri() {
return camelDestinationUri;
}
private void initConfig() {
//we could configure the camel context here
if (bus != null) {
Configurer configurer = bus.getExtension(Configurer.class);
if (null != configurer) {
configurer.configureBean(this);
}
}
}
protected class ConsumerProcessor implements Processor {
public void process(Exchange exchange) {
try {
incoming(exchange);
} catch (Throwable ex) {
exchange.setException(ex);
}
}
}
// this should deal with the cxf message
protected class BackChannelConduit extends AbstractConduit {
protected Message inMessage;
Exchange camelExchange;
org.apache.cxf.message.Exchange cxfExchange;
BackChannelConduit(Message message) {
super(EndpointReferenceUtils.getAnonymousEndpointReference());
inMessage = message;
cxfExchange = inMessage.getExchange();
camelExchange = cxfExchange.get(Exchange.class);
}
/**
* Register a message observer for incoming messages.
*
* @param observer the observer to notify on receipt of incoming
*/
public void setMessageObserver(MessageObserver observer) {
// shouldn't be called for a back channel conduit
}
/**
* Send an outbound message, assumed to contain all the name-value
* mappings of the corresponding input message (if any).
*
* @param message the message to be sent.
*/
public void prepare(Message message) throws IOException {
message.put(CamelTransportConstants.CAMEL_EXCHANGE, inMessage.get(CamelTransportConstants.CAMEL_EXCHANGE));
message.setContent(OutputStream.class, new CamelOutputStream(message));
}
protected java.util.logging.Logger getLogger() {
return JUL_LOG;
}
}
/**
* Mark message as a partial message.
*
* @param partialResponse the partial response message
* @param decoupledTarget the decoupled target
* @return <tt>true</tt> if partial responses is supported
*/
protected boolean markPartialResponse(Message partialResponse,
EndpointReferenceType decoupledTarget) {
return true;
}
/**
* @return the associated conduit initiator
*/
protected ConduitInitiator getConduitInitiator() {
return conduitInitiator;
}
protected void propagateResponseHeadersToCamel(Message outMessage, Exchange camelExchange) {
CxfHeaderHelper.propagateCxfToCamel(headerFilterStrategy, outMessage,
camelExchange.getOut().getHeaders(), camelExchange);
}
/**
* Receives a response from CXF and forwards it to the camel route the request came in from
*/
private class CamelOutputStream extends CachedOutputStream {
private Message outMessage;
public CamelOutputStream(Message m) {
super();
outMessage = m;
}
// Prepare the message and get the send out message
private void commitOutputMessage() throws IOException {
Exchange camelExchange = (Exchange)outMessage.get(CamelTransportConstants.CAMEL_EXCHANGE);
propagateResponseHeadersToCamel(outMessage, camelExchange);
// check if the outMessage has the exception
Exception exception = outMessage.getContent(Exception.class);
if (checkException && exception != null) {
camelExchange.setException(exception);
}
CachedOutputStream outputStream = (CachedOutputStream)outMessage.getContent(OutputStream.class);
camelExchange.getOut().setBody(outputStream.getInputStream());
LOG.debug("send the response message: {}", outputStream);
}
@Override
protected void doFlush() throws IOException {
// Do nothing here
}
@Override
protected void doClose() throws IOException {
commitOutputMessage();
}
@Override
protected void onWrite() throws IOException {
// Do nothing here
}
}
}