SMX4-1509 CXF NMR transport should support JAXWS Async API out of box
git-svn-id: https://svn.apache.org/repos/asf/servicemix/smx4/features/trunk@1509615 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/cxf/cxf-transport-nmr/src/main/java/org/apache/servicemix/cxf/transport/nmr/NMRConduitOutputStream.java b/cxf/cxf-transport-nmr/src/main/java/org/apache/servicemix/cxf/transport/nmr/NMRConduitOutputStream.java
index 46b2aad..3a9dbc2 100644
--- a/cxf/cxf-transport-nmr/src/main/java/org/apache/servicemix/cxf/transport/nmr/NMRConduitOutputStream.java
+++ b/cxf/cxf-transport-nmr/src/main/java/org/apache/servicemix/cxf/transport/nmr/NMRConduitOutputStream.java
@@ -29,6 +29,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
import java.util.logging.Logger;
import javax.activation.DataHandler;
import javax.jws.WebService;
@@ -36,6 +38,7 @@
import javax.xml.namespace.QName;
import javax.xml.transform.Source;
import javax.xml.transform.stream.StreamSource;
+import org.apache.cxf.Bus;
import org.apache.cxf.attachment.AttachmentImpl;
import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.io.CachedOutputStream;
@@ -43,7 +46,11 @@
import org.apache.cxf.message.Exchange;
import org.apache.cxf.message.Message;
import org.apache.cxf.message.MessageImpl;
+import org.apache.cxf.phase.PhaseInterceptorChain;
import org.apache.cxf.service.model.BindingOperationInfo;
+import org.apache.cxf.transport.MessageObserver;
+import org.apache.cxf.workqueue.AutomaticWorkQueue;
+import org.apache.cxf.workqueue.WorkQueueManager;
import org.apache.cxf.ws.addressing.EndpointReferenceType;
import org.apache.cxf.wsdl.EndpointReferenceUtils;
import org.apache.servicemix.nmr.api.Channel;
@@ -62,6 +69,7 @@
private Channel channel;
private NMRConduit conduit;
private EndpointReferenceType target;
+ private boolean hasLoggedAsyncWarning;
public NMRConduitOutputStream(Message m, NMR nmr, EndpointReferenceType target,
NMRConduit conduit) {
@@ -84,7 +92,101 @@
if (target != null) {
target.getClass();
}
- channel.close();
+ }
+
+ private void syncInvoke(org.apache.servicemix.nmr.api.Exchange xchng) throws Exception {
+ try {
+ if (!isOneWay) {
+ channel.sendSync(xchng);
+ Source content = null;
+ org.apache.servicemix.nmr.api.Message nm = null;
+ if (xchng.getFault(false) != null) {
+ content = xchng.getFault().getBody(Source.class);
+ nm = xchng.getFault();
+ } else {
+ content = xchng.getOut().getBody(Source.class);
+ nm = xchng.getOut();
+ }
+ Message inMessage = new MessageImpl();
+ message.getExchange().setInMessage(inMessage);
+ InputStream ins = NMRMessageHelper.convertMessageToInputStream(content);
+ if (ins == null) {
+ throw new IOException(new org.apache.cxf.common.i18n.Message("UNABLE.RETRIEVE.MESSAGE", LOG).toString());
+ }
+ inMessage.setContent(InputStream.class, ins);
+ //copy attachments
+ Collection<Attachment> cxfAttachmentList = new ArrayList<Attachment>();
+ for (Map.Entry<String, Object> ent : nm.getAttachments().entrySet()) {
+ cxfAttachmentList.add(new AttachmentImpl(ent.getKey(), (DataHandler) ent.getValue()));
+ }
+ inMessage.setAttachments(cxfAttachmentList);
+
+ //copy properties
+ for (Map.Entry<String, Object> ent : nm.getHeaders().entrySet()) {
+ if (!ent.getKey().equals(Message.REQUESTOR_ROLE)) {
+ inMessage.put(ent.getKey(), ent.getValue());
+ }
+ }
+
+ //copy securitySubject
+ inMessage.put(NMRTransportFactory.NMR_SECURITY_SUBJECT, nm.getSecuritySubject());
+
+ conduit.getMessageObserver().onMessage(inMessage);
+
+ xchng.setStatus(Status.Done);
+ channel.send(xchng);
+ } else {
+ channel.sendSync(xchng);
+ }
+ } finally {
+ channel.close();
+ }
+
+ }
+
+ private void asynInvokeWithWorkQueue(final org.apache.servicemix.nmr.api.Exchange exchange) throws Exception {
+ Runnable runnable = new Runnable() {
+ public void run() {
+ try {
+ syncInvoke(exchange);
+ } catch (Throwable e) {
+ ((PhaseInterceptorChain)message.getInterceptorChain()).abort();
+ message.setContent(Exception.class, e);
+ ((PhaseInterceptorChain)message.getInterceptorChain()).unwind(message);
+ MessageObserver mo = message.getInterceptorChain().getFaultObserver();
+ if (mo == null) {
+ mo = message.getExchange().get(MessageObserver.class);
+ }
+ mo.onMessage(message);
+ }
+ }
+ };
+
+ try {
+ Executor ex = message.getExchange().get(Executor.class);
+ if (ex != null) {
+ message.getExchange().put(Executor.class.getName()
+ + ".USING_SPECIFIED", Boolean.TRUE);
+ ex.execute(runnable);
+ } else {
+ WorkQueueManager mgr = message.getExchange().get(Bus.class)
+ .getExtension(WorkQueueManager.class);
+ AutomaticWorkQueue qu = mgr.getNamedWorkQueue("nmr-conduit");
+ if (qu == null) {
+ qu = mgr.getAutomaticWorkQueue();
+ }
+ // need to set the time out somewhere
+ qu.execute(runnable);
+ }
+ } catch (RejectedExecutionException rex) {
+ if (!hasLoggedAsyncWarning) {
+ LOG.warning("Executor rejected background task to retrieve the response. Suggest increasing the workqueue settings.");
+ hasLoggedAsyncWarning = true;
+ }
+ LOG.info("Executor rejected background task to retrieve the response, running on current thread.");
+ syncInvoke(exchange);
+ }
+
}
private void commitOutputMessage() throws IOException {
@@ -182,54 +284,19 @@
xchng.setTarget(ref);
xchng.setOperation(bop.getName());
LOG.fine("sending message");
- if (!isOneWay) {
- channel.sendSync(xchng);
- Source content = null;
- org.apache.servicemix.nmr.api.Message nm = null;
- if (xchng.getFault(false) != null) {
- content = xchng.getFault().getBody(Source.class);
- nm = xchng.getFault();
- } else {
- content = xchng.getOut().getBody(Source.class);
- nm = xchng.getOut();
- }
- Message inMessage = new MessageImpl();
- message.getExchange().setInMessage(inMessage);
- InputStream ins = NMRMessageHelper.convertMessageToInputStream(content);
- if (ins == null) {
- throw new IOException(new org.apache.cxf.common.i18n.Message("UNABLE.RETRIEVE.MESSAGE", LOG).toString());
- }
- inMessage.setContent(InputStream.class, ins);
- //copy attachments
- Collection<Attachment> cxfAttachmentList = new ArrayList<Attachment>();
- for (Map.Entry<String, Object> ent : nm.getAttachments().entrySet()) {
- cxfAttachmentList.add(new AttachmentImpl(ent.getKey(), (DataHandler) ent.getValue()));
- }
- inMessage.setAttachments(cxfAttachmentList);
-
- //copy properties
- for (Map.Entry<String, Object> ent : nm.getHeaders().entrySet()) {
- if (!ent.getKey().equals(Message.REQUESTOR_ROLE)) {
- inMessage.put(ent.getKey(), ent.getValue());
- }
- }
-
- //copy securitySubject
- inMessage.put(NMRTransportFactory.NMR_SECURITY_SUBJECT, nm.getSecuritySubject());
-
- conduit.getMessageObserver().onMessage(inMessage);
-
- xchng.setStatus(Status.Done);
- channel.send(xchng);
+ if (message.getExchange().isSynchronous()) {
+ syncInvoke(xchng);
} else {
- channel.sendSync(xchng);
+ asynInvokeWithWorkQueue(xchng);
}
+
} catch (IOException e) {
throw e;
} catch (Exception e) {
e.printStackTrace();
new IOException(e.toString());
}
+
}
private Source getMessageContent(Message message2) throws IOException {