SMX4-1509 CXF NMR transport should support JAXWS Async API out of box

git-svn-id: https://svn.apache.org/repos/asf/servicemix/smx4/features/trunk@1509615 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/cxf/cxf-transport-nmr/src/main/java/org/apache/servicemix/cxf/transport/nmr/NMRConduitOutputStream.java b/cxf/cxf-transport-nmr/src/main/java/org/apache/servicemix/cxf/transport/nmr/NMRConduitOutputStream.java
index 46b2aad..3a9dbc2 100644
--- a/cxf/cxf-transport-nmr/src/main/java/org/apache/servicemix/cxf/transport/nmr/NMRConduitOutputStream.java
+++ b/cxf/cxf-transport-nmr/src/main/java/org/apache/servicemix/cxf/transport/nmr/NMRConduitOutputStream.java
@@ -29,6 +29,8 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.logging.Logger;
 import javax.activation.DataHandler;
 import javax.jws.WebService;
@@ -36,6 +38,7 @@
 import javax.xml.namespace.QName;
 import javax.xml.transform.Source;
 import javax.xml.transform.stream.StreamSource;
+import org.apache.cxf.Bus;
 import org.apache.cxf.attachment.AttachmentImpl;
 import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.io.CachedOutputStream;
@@ -43,7 +46,11 @@
 import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.message.MessageImpl;
+import org.apache.cxf.phase.PhaseInterceptorChain;
 import org.apache.cxf.service.model.BindingOperationInfo;
+import org.apache.cxf.transport.MessageObserver;
+import org.apache.cxf.workqueue.AutomaticWorkQueue;
+import org.apache.cxf.workqueue.WorkQueueManager;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
 import org.apache.cxf.wsdl.EndpointReferenceUtils;
 import org.apache.servicemix.nmr.api.Channel;
@@ -62,6 +69,7 @@
     private Channel channel;
     private NMRConduit conduit;
     private EndpointReferenceType target;
+    private boolean hasLoggedAsyncWarning;
 
     public NMRConduitOutputStream(Message m, NMR nmr, EndpointReferenceType target,
                                   NMRConduit conduit) {
@@ -84,7 +92,101 @@
         if (target != null) {
             target.getClass();
         }
-        channel.close();
+    }
+
+    private void syncInvoke(org.apache.servicemix.nmr.api.Exchange xchng) throws Exception {
+        try {
+            if (!isOneWay) {
+                channel.sendSync(xchng);
+                Source content = null;
+                org.apache.servicemix.nmr.api.Message nm = null;
+                if (xchng.getFault(false) != null) {
+                    content = xchng.getFault().getBody(Source.class);
+                    nm = xchng.getFault();
+                } else {
+                    content = xchng.getOut().getBody(Source.class);
+                    nm = xchng.getOut();
+                }
+                Message inMessage = new MessageImpl();
+                message.getExchange().setInMessage(inMessage);
+                InputStream ins = NMRMessageHelper.convertMessageToInputStream(content);
+                if (ins == null) {
+                    throw new IOException(new org.apache.cxf.common.i18n.Message("UNABLE.RETRIEVE.MESSAGE", LOG).toString());
+                }
+                inMessage.setContent(InputStream.class, ins);
+                //copy attachments
+                Collection<Attachment> cxfAttachmentList = new ArrayList<Attachment>();
+                for (Map.Entry<String, Object> ent : nm.getAttachments().entrySet()) {
+                    cxfAttachmentList.add(new AttachmentImpl(ent.getKey(), (DataHandler) ent.getValue()));
+                }
+                inMessage.setAttachments(cxfAttachmentList);
+
+                //copy properties
+                for (Map.Entry<String, Object> ent : nm.getHeaders().entrySet()) {
+                    if (!ent.getKey().equals(Message.REQUESTOR_ROLE)) {
+                        inMessage.put(ent.getKey(), ent.getValue());
+                    }
+                }
+
+                //copy securitySubject
+                inMessage.put(NMRTransportFactory.NMR_SECURITY_SUBJECT, nm.getSecuritySubject());
+
+                conduit.getMessageObserver().onMessage(inMessage);
+
+                xchng.setStatus(Status.Done);
+                channel.send(xchng);
+            } else {
+                channel.sendSync(xchng);
+            }
+        } finally {
+            channel.close();
+        }
+
+    }
+
+    private void asynInvokeWithWorkQueue(final org.apache.servicemix.nmr.api.Exchange exchange) throws Exception {
+        Runnable runnable = new Runnable() {
+            public void run() {
+                try {
+                    syncInvoke(exchange);
+                } catch (Throwable e) {
+                    ((PhaseInterceptorChain)message.getInterceptorChain()).abort();
+                    message.setContent(Exception.class, e);
+                    ((PhaseInterceptorChain)message.getInterceptorChain()).unwind(message);
+                    MessageObserver mo = message.getInterceptorChain().getFaultObserver();
+                    if (mo == null) {
+                        mo = message.getExchange().get(MessageObserver.class);
+                    }
+                    mo.onMessage(message);
+                }
+            }
+        };
+
+        try {
+            Executor ex = message.getExchange().get(Executor.class);
+            if (ex != null) {
+                message.getExchange().put(Executor.class.getName()
+                                + ".USING_SPECIFIED", Boolean.TRUE);
+                ex.execute(runnable);
+            } else {
+                WorkQueueManager mgr = message.getExchange().get(Bus.class)
+                        .getExtension(WorkQueueManager.class);
+                AutomaticWorkQueue qu = mgr.getNamedWorkQueue("nmr-conduit");
+                if (qu == null) {
+                    qu = mgr.getAutomaticWorkQueue();
+                }
+                // need to set the time out somewhere
+                qu.execute(runnable);
+            }
+        } catch (RejectedExecutionException rex) {
+            if (!hasLoggedAsyncWarning) {
+                LOG.warning("Executor rejected background task to retrieve the response.  Suggest increasing the workqueue settings.");
+                hasLoggedAsyncWarning = true;
+            }
+            LOG.info("Executor rejected background task to retrieve the response, running on current thread.");
+            syncInvoke(exchange);
+        }
+
     }
 
     private void commitOutputMessage() throws IOException {
@@ -182,54 +284,19 @@
             xchng.setTarget(ref);
             xchng.setOperation(bop.getName());
             LOG.fine("sending message");
-            if (!isOneWay) {
-                channel.sendSync(xchng);
-                Source content = null;
-                org.apache.servicemix.nmr.api.Message nm = null;
-                if (xchng.getFault(false) != null) {
-                    content = xchng.getFault().getBody(Source.class);
-                    nm = xchng.getFault();
-                } else {
-                    content = xchng.getOut().getBody(Source.class);
-                    nm = xchng.getOut();
-                }
-                Message inMessage = new MessageImpl();
-                message.getExchange().setInMessage(inMessage);
-                InputStream ins = NMRMessageHelper.convertMessageToInputStream(content);
-                if (ins == null) {
-                    throw new IOException(new org.apache.cxf.common.i18n.Message("UNABLE.RETRIEVE.MESSAGE", LOG).toString());
-                }
-                inMessage.setContent(InputStream.class, ins);
-                //copy attachments
-                Collection<Attachment> cxfAttachmentList = new ArrayList<Attachment>();
-                for (Map.Entry<String, Object> ent : nm.getAttachments().entrySet()) {
-                    cxfAttachmentList.add(new AttachmentImpl(ent.getKey(), (DataHandler) ent.getValue()));
-                }
-                inMessage.setAttachments(cxfAttachmentList);
-                
-                //copy properties
-                for (Map.Entry<String, Object> ent : nm.getHeaders().entrySet()) {
-                    if (!ent.getKey().equals(Message.REQUESTOR_ROLE)) {
-                        inMessage.put(ent.getKey(), ent.getValue());
-                    }
-                }
-                
-                //copy securitySubject
-                inMessage.put(NMRTransportFactory.NMR_SECURITY_SUBJECT, nm.getSecuritySubject());
-                
-                conduit.getMessageObserver().onMessage(inMessage);
-
-                xchng.setStatus(Status.Done);
-                channel.send(xchng);
+            if (message.getExchange().isSynchronous()) {
+                syncInvoke(xchng);
             } else {
-                channel.sendSync(xchng);
+                asynInvokeWithWorkQueue(xchng);
             }
+
         } catch (IOException e) {
             throw e;
         } catch (Exception e) {
             e.printStackTrace();
             new IOException(e.toString());
         }
+
     }
 
     private Source getMessageContent(Message message2) throws IOException {