blob: 53d3a432b8374de703b1145664e1e72e782a3ae3 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cxf.endpoint;
import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.xml.namespace.QName;
import javax.xml.ws.handler.MessageContext;
import org.apache.cxf.Bus;
import org.apache.cxf.BusFactory;
import org.apache.cxf.binding.Binding;
import org.apache.cxf.common.classloader.ClassLoaderUtils;
import org.apache.cxf.common.classloader.ClassLoaderUtils.ClassLoaderHolder;
import org.apache.cxf.common.i18n.UncheckedException;
import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.common.util.PropertyUtils;
import org.apache.cxf.helpers.CastUtils;
import org.apache.cxf.interceptor.AbstractBasicInterceptorProvider;
import org.apache.cxf.interceptor.ClientOutFaultObserver;
import org.apache.cxf.interceptor.Fault;
import org.apache.cxf.interceptor.Interceptor;
import org.apache.cxf.interceptor.InterceptorChain;
import org.apache.cxf.interceptor.InterceptorProvider;
import org.apache.cxf.message.Exchange;
import org.apache.cxf.message.ExchangeImpl;
import org.apache.cxf.message.Message;
import org.apache.cxf.message.MessageContentsList;
import org.apache.cxf.message.MessageImpl;
import org.apache.cxf.phase.PhaseChainCache;
import org.apache.cxf.phase.PhaseInterceptorChain;
import org.apache.cxf.phase.PhaseManager;
import org.apache.cxf.service.Service;
import org.apache.cxf.service.model.BindingInfo;
import org.apache.cxf.service.model.BindingMessageInfo;
import org.apache.cxf.service.model.BindingOperationInfo;
import org.apache.cxf.service.model.EndpointInfo;
import org.apache.cxf.service.model.MessageInfo;
import org.apache.cxf.service.model.ServiceInfo;
import org.apache.cxf.transport.Conduit;
import org.apache.cxf.transport.MessageObserver;
import org.apache.cxf.workqueue.SynchronousExecutor;
public class ClientImpl
extends AbstractBasicInterceptorProvider
implements Client, Retryable, MessageObserver {
public static final String THREAD_LOCAL_REQUEST_CONTEXT = "thread.local.request.context";
/**
* When a synchronous request/response invoke is done using an asynchronous transport mechanism,
* this is the timeout used for waiting for the response. Default is 60 seconds.
*/
public static final String SYNC_TIMEOUT = "cxf.synchronous.timeout";
public static final String FINISHED = "exchange.finished";
private static final Logger LOG = LogUtils.getL7dLogger(ClientImpl.class);
protected Bus bus;
protected ConduitSelector conduitSelector;
protected ClientOutFaultObserver outFaultObserver;
protected int synchronousTimeout = 60000; // default 60 second timeout
protected PhaseChainCache outboundChainCache = new PhaseChainCache();
protected PhaseChainCache inboundChainCache = new PhaseChainCache();
protected Map<String, Object> currentRequestContext = new ConcurrentHashMap<>(8, 0.75f, 4);
protected Thread latestContextThread;
protected Map<Thread, EchoContext> requestContext
= Collections.synchronizedMap(new WeakHashMap<Thread, EchoContext>());
protected Map<Thread, ResponseContext> responseContext
= Collections.synchronizedMap(new WeakHashMap<Thread, ResponseContext>());
protected Executor executor;
public ClientImpl(Bus b, Endpoint e) {
this(b, e, (ConduitSelector)null);
}
public ClientImpl(Bus b, Endpoint e, Conduit c) {
this(b, e, new PreexistingConduitSelector(c));
}
public ClientImpl(Bus b, Endpoint e, ConduitSelector sc) {
bus = b;
outFaultObserver = new ClientOutFaultObserver(bus);
getConduitSelector(sc).setEndpoint(e);
notifyLifecycleManager();
}
/**
* Create a Client that uses a specific EndpointImpl.
* @param bus
* @param svc
* @param port
* @param endpointImplFactory
*/
public ClientImpl(Bus bus, Service svc, QName port,
EndpointImplFactory endpointImplFactory) {
this.bus = bus;
outFaultObserver = new ClientOutFaultObserver(bus);
EndpointInfo epfo = findEndpoint(svc, port);
try {
if (endpointImplFactory != null) {
getConduitSelector().setEndpoint(endpointImplFactory.newEndpointImpl(bus, svc, epfo));
} else {
getConduitSelector().setEndpoint(new EndpointImpl(bus, svc, epfo));
}
} catch (EndpointException epex) {
throw new IllegalStateException("Unable to create endpoint: " + epex.getMessage(), epex);
}
notifyLifecycleManager();
}
public Bus getBus() {
return bus;
}
public void destroy() {
if (bus == null) {
return;
}
if (getEndpoint() != null) {
for (Closeable c : getEndpoint().getCleanupHooks()) {
try {
c.close();
} catch (IOException e) {
//ignore
}
}
}
ClientLifeCycleManager mgr = bus.getExtension(ClientLifeCycleManager.class);
if (null != mgr) {
mgr.clientDestroyed(this);
}
if (conduitSelector != null) {
if (conduitSelector instanceof Closeable) {
try {
((Closeable)conduitSelector).close();
} catch (IOException e) {
//ignore, we're destroying anyway
}
} else {
getConduit().close();
}
}
bus = null;
conduitSelector = null;
outFaultObserver = null;
outboundChainCache = null;
inboundChainCache = null;
currentRequestContext = null;
requestContext.clear();
requestContext = null;
responseContext.clear();
responseContext = null;
executor = null;
}
private void notifyLifecycleManager() {
ClientLifeCycleManager mgr = bus.getExtension(ClientLifeCycleManager.class);
if (null != mgr) {
mgr.clientCreated(this);
}
}
private EndpointInfo findEndpoint(Service svc, QName port) {
if (port != null) {
EndpointInfo epfo = svc.getEndpointInfo(port);
if (epfo == null) {
throw new IllegalArgumentException("The service " + svc.getName()
+ " does not have an endpoint " + port + ".");
}
return epfo;
}
for (ServiceInfo svcfo : svc.getServiceInfos()) {
for (EndpointInfo e : svcfo.getEndpoints()) {
BindingInfo bfo = e.getBinding();
String bid = bfo.getBindingId();
if ("http://schemas.xmlsoap.org/wsdl/soap/".equals(bid)
|| "http://schemas.xmlsoap.org/wsdl/soap12/".equals(bid)) {
for (Object o : bfo.getExtensors().get()) {
try {
String s = (String)o.getClass().getMethod("getTransportURI").invoke(o);
if (s != null && s.endsWith("http")) {
return e;
}
} catch (Throwable t) {
//ignore
}
}
}
}
}
throw new UnsupportedOperationException(
"Only document-style SOAP 1.1 and 1.2 http are supported "
+ "for auto-selection of endpoint; none were found.");
}
public Endpoint getEndpoint() {
return getConduitSelector().getEndpoint();
}
public void releaseThreadContexts() {
final Thread t = Thread.currentThread();
requestContext.remove(t);
responseContext.remove(t);
}
@Override
public Contexts getContexts() {
return new Contexts() {
@Override
public void close() throws Exception {
releaseThreadContexts();
}
@Override
public Map<String, Object> getRequestContext() {
return ClientImpl.this.getRequestContext();
}
@Override
public Map<String, Object> getResponseContext() {
return ClientImpl.this.getResponseContext();
}
};
}
public Map<String, Object> getRequestContext() {
if (isThreadLocalRequestContext()) {
final Thread t = Thread.currentThread();
requestContext.computeIfAbsent(t, k -> new EchoContext(currentRequestContext));
latestContextThread = t;
return requestContext.get(t);
}
return currentRequestContext;
}
public Map<String, Object> getResponseContext() {
if (!responseContext.containsKey(Thread.currentThread())) {
final Thread t = Thread.currentThread();
responseContext.put(t, new ResponseContext(responseContext));
}
return responseContext.get(Thread.currentThread());
}
protected Map<String, Object> setResponseContext(Map<String, Object> ctx) {
if (ctx instanceof ResponseContext) {
ResponseContext c = (ResponseContext)ctx;
responseContext.put(Thread.currentThread(), c);
return c;
}
ResponseContext c = new ResponseContext(ctx, responseContext);
responseContext.put(Thread.currentThread(), c);
return c;
}
public boolean isThreadLocalRequestContext() {
Object o = currentRequestContext.get(THREAD_LOCAL_REQUEST_CONTEXT);
if (o != null) {
final boolean local;
if (o instanceof Boolean) {
local = ((Boolean)o).booleanValue();
} else {
local = Boolean.parseBoolean(o.toString());
}
return local;
}
return false;
}
public void setThreadLocalRequestContext(boolean b) {
currentRequestContext.put(THREAD_LOCAL_REQUEST_CONTEXT, b);
}
public Object[] invoke(BindingOperationInfo oi, Object... params) throws Exception {
return invoke(oi, params, null);
}
public Object[] invoke(String operationName, Object... params) throws Exception {
QName q = new QName(getEndpoint().getService().getName().getNamespaceURI(), operationName);
return invoke(q, params);
}
public Object[] invoke(QName operationName, Object... params) throws Exception {
BindingOperationInfo op = getEndpoint().getEndpointInfo().getBinding().getOperation(operationName);
if (op == null) {
throw new UncheckedException(
new org.apache.cxf.common.i18n.Message("NO_OPERATION", LOG, operationName));
}
if (op.isUnwrappedCapable()) {
op = op.getUnwrappedOperation();
}
return invoke(op, params);
}
public Object[] invokeWrapped(String operationName, Object... params) throws Exception {
QName q = new QName(getEndpoint().getService().getName().getNamespaceURI(), operationName);
return invokeWrapped(q, params);
}
public Object[] invokeWrapped(QName operationName, Object... params) throws Exception {
BindingOperationInfo op = getEndpoint().getEndpointInfo().getBinding().getOperation(operationName);
if (op == null) {
throw new UncheckedException(
new org.apache.cxf.common.i18n.Message("NO_OPERATION", LOG, operationName));
}
return invoke(op, params);
}
public Object[] invoke(BindingOperationInfo oi,
Object[] params,
Exchange exchange) throws Exception {
Map<String, Object> context = new HashMap<>();
return invoke(oi, params, context, exchange);
}
public Object[] invoke(BindingOperationInfo oi,
Object[] params,
Map<String, Object> context) throws Exception {
return invoke(oi, params, context, (Exchange)null);
}
public void invoke(ClientCallback callback,
String operationName,
Object... params) throws Exception {
QName q = new QName(getEndpoint().getService().getName().getNamespaceURI(), operationName);
invoke(callback, q, params);
}
public void invoke(ClientCallback callback,
QName operationName,
Object... params) throws Exception {
BindingOperationInfo op = getEndpoint().getEndpointInfo().getBinding().getOperation(operationName);
if (op == null) {
throw new UncheckedException(
new org.apache.cxf.common.i18n.Message("NO_OPERATION", LOG, operationName));
}
if (op.isUnwrappedCapable()) {
op = op.getUnwrappedOperation();
}
invoke(callback, op, params);
}
public void invokeWrapped(ClientCallback callback,
String operationName,
Object... params)
throws Exception {
QName q = new QName(getEndpoint().getService().getName().getNamespaceURI(), operationName);
invokeWrapped(callback, q, params);
}
public void invokeWrapped(ClientCallback callback,
QName operationName,
Object... params)
throws Exception {
BindingOperationInfo op = getEndpoint().getEndpointInfo().getBinding().getOperation(operationName);
if (op == null) {
throw new UncheckedException(
new org.apache.cxf.common.i18n.Message("NO_OPERATION", LOG, operationName));
}
invoke(callback, op, params);
}
public void invoke(ClientCallback callback,
BindingOperationInfo oi,
Object... params) throws Exception {
invoke(callback, oi, params, null, null);
}
public void invoke(ClientCallback callback,
BindingOperationInfo oi,
Object[] params,
Map<String, Object> context) throws Exception {
invoke(callback, oi, params, context, null);
}
public void invoke(ClientCallback callback,
BindingOperationInfo oi,
Object[] params,
Exchange exchange) throws Exception {
invoke(callback, oi, params, null, exchange);
}
public void invoke(ClientCallback callback,
BindingOperationInfo oi,
Object[] params,
Map<String, Object> context,
Exchange exchange) throws Exception {
doInvoke(callback, oi, params, context, exchange);
}
public Object[] invoke(BindingOperationInfo oi,
Object[] params,
Map<String, Object> context,
Exchange exchange) throws Exception {
return doInvoke(null, oi, params, context, exchange);
}
private Object[] doInvoke(final ClientCallback callback,
BindingOperationInfo oi,
Object[] params,
Map<String, Object> context,
Exchange exchange) throws Exception {
Bus origBus = BusFactory.getAndSetThreadDefaultBus(bus);
ClassLoaderHolder origLoader = null;
Map<String, Object> resContext = null;
try {
ClassLoader loader = bus.getExtension(ClassLoader.class);
if (loader != null) {
origLoader = ClassLoaderUtils.setThreadContextClassloader(loader);
}
if (exchange == null) {
exchange = new ExchangeImpl();
}
exchange.setSynchronous(callback == null);
Endpoint endpoint = getEndpoint();
if (LOG.isLoggable(Level.FINE)) {
LOG.fine("Invoke, operation info: " + oi + ", params: " + Arrays.toString(params));
}
Message message = endpoint.getBinding().createMessage();
// Make sure INVOCATION CONTEXT, REQUEST_CONTEXT and RESPONSE_CONTEXT are present
// on message
if (context == null) {
context = new HashMap<>();
}
Map<String, Object> reqContext = CastUtils.cast((Map<?, ?>)context.get(REQUEST_CONTEXT));
resContext = CastUtils.cast((Map<?, ?>)context.get(RESPONSE_CONTEXT));
if (reqContext == null) {
reqContext = new HashMap<>(getRequestContext());
context.put(REQUEST_CONTEXT, reqContext);
}
if (resContext == null) {
resContext = new ResponseContext(responseContext);
context.put(RESPONSE_CONTEXT, resContext);
}
message.put(Message.INVOCATION_CONTEXT, context);
setContext(reqContext, message);
exchange.putAll(reqContext);
setParameters(params, message);
if (null != oi) {
exchange.setOneWay(oi.getOutput() == null);
}
exchange.setOutMessage(message);
exchange.put(ClientCallback.class, callback);
setOutMessageProperties(message, oi);
setExchangeProperties(exchange, endpoint, oi);
PhaseInterceptorChain chain = setupInterceptorChain(endpoint);
message.setInterceptorChain(chain);
if (callback == null) {
chain.setFaultObserver(outFaultObserver);
} else {
// We need to wrap the outFaultObserver if the callback is not null
// calling the conduitSelector.complete to make sure the fail over feature works
chain.setFaultObserver(new MessageObserver() {
public void onMessage(Message message) {
Exception ex = message.getContent(Exception.class);
if (ex != null) {
completeExchange(message.getExchange());
if (message.getContent(Exception.class) == null) {
// handle the right response
Message inMsg = message.getExchange().getInMessage();
Map<String, Object> ctx = responseContext.get(Thread.currentThread());
List<Object> resList = CastUtils.cast(inMsg.getContent(List.class));
Object[] result = resList == null ? null : resList.toArray();
callback.handleResponse(ctx, result);
return;
}
}
outFaultObserver.onMessage(message);
}
});
}
prepareConduitSelector(message);
// add additional interceptors and such
modifyChain(chain, message, false);
try {
chain.doIntercept(message);
} catch (Fault fault) {
enrichFault(fault);
throw fault;
}
if (callback != null) {
return null;
}
return processResult(message, exchange, oi, resContext);
} finally {
//ensure ResponseContext has HTTP RESPONSE CODE
if (null != exchange) {
Integer responseCode = (Integer)exchange.get(Message.RESPONSE_CODE);
resContext.put(MessageContext.HTTP_RESPONSE_CODE, responseCode);
resContext.put(org.apache.cxf.message.Message.RESPONSE_CODE, responseCode);
setResponseContext(resContext);
}
if (origLoader != null) {
origLoader.reset();
}
if (origBus != bus) {
BusFactory.setThreadDefaultBus(origBus);
}
}
}
private void completeExchange(Exchange exchange) {
getConduitSelector().complete(exchange);
}
/**
* TODO This is SOAP specific code and should not be in cxf core
* @param fault
*/
private void enrichFault(Fault fault) {
if (fault.getCause().getCause() instanceof IOException
|| fault.getCause() instanceof IOException) {
String soap11NS = "http://schemas.xmlsoap.org/soap/envelope/";
String soap12NS = "http://www.w3.org/2003/05/soap-envelope";
QName faultCode = fault.getFaultCode();
//for SoapFault, if it's underlying cause is IOException,
//it means something like server is down or can't create
//connection, according to soap spec we should set fault as
//Server Fault
if (faultCode.getNamespaceURI().equals(
soap11NS)
&& "Client".equals(faultCode.getLocalPart())) {
faultCode = new QName(soap11NS, "Server");
fault.setFaultCode(faultCode);
}
if (faultCode.getNamespaceURI().equals(
soap12NS)
&& "Sender".equals(faultCode.getLocalPart())) {
faultCode = new QName(soap12NS, "Receiver");
fault.setFaultCode(faultCode);
}
}
}
protected Object[] processResult(Message message,
Exchange exchange,
BindingOperationInfo oi,
Map<String, Object> resContext) throws Exception {
Exception ex = null;
// Check to see if there is a Fault from the outgoing chain if it's an out Message
if (!message.get(Message.INBOUND_MESSAGE).equals(Boolean.TRUE)) {
ex = message.getContent(Exception.class);
}
boolean mepCompleteCalled = false;
if (ex != null) {
completeExchange(exchange);
mepCompleteCalled = true;
if (message.getContent(Exception.class) != null) {
throw ex;
}
}
ex = message.getExchange().get(Exception.class);
if (ex != null) {
if (!mepCompleteCalled) {
completeExchange(exchange);
}
throw ex;
}
//REVISIT
// - use a protocol neutral no-content marker instead of 202?
// - move the decoupled destination property name into api
Integer responseCode = (Integer)exchange.get(Message.RESPONSE_CODE);
if (null != responseCode && 202 == responseCode) {
Endpoint ep = exchange.getEndpoint();
if (null != ep && null != ep.getEndpointInfo() && null == ep.getEndpointInfo().
getProperty("org.apache.cxf.ws.addressing.MAPAggregator.decoupledDestination")) {
return null;
}
}
// Wait for a response if we need to
if (oi != null && !oi.getOperationInfo().isOneWay()) {
waitResponse(exchange);
}
// leave the input stream open for the caller
Boolean keepConduitAlive = (Boolean)exchange.get(Client.KEEP_CONDUIT_ALIVE);
if (keepConduitAlive == null || !keepConduitAlive) {
completeExchange(exchange);
}
// Grab the response objects if there are any
List<Object> resList = null;
Message inMsg = exchange.getInMessage();
if (inMsg != null) {
if (null != resContext) {
resContext.putAll(inMsg);
// remove the recursive reference if present
resContext.remove(Message.INVOCATION_CONTEXT);
setResponseContext(resContext);
}
resList = CastUtils.cast(inMsg.getContent(List.class));
}
// check for an incoming fault
ex = getException(exchange);
if (ex != null) {
throw ex;
}
if (resList == null
&& oi != null && !oi.getOperationInfo().isOneWay()) {
BindingOperationInfo boi = oi;
if (boi.isUnwrapped()) {
boi = boi.getWrappedOperation();
}
if (!boi.getOutput().getMessageParts().isEmpty()) {
//we were supposed to get some output, but didn't.
throw new IllegalEmptyResponseException("Response message did not contain proper response data."
+ " Expected: " + boi.getOutput().getMessageParts().get(0).getConcreteName());
}
}
if (resList != null) {
return resList.toArray();
}
return null;
}
protected Exception getException(Exchange exchange) {
if (exchange.getInFaultMessage() != null) {
return exchange.getInFaultMessage().getContent(Exception.class);
} else if (exchange.getOutFaultMessage() != null) {
return exchange.getOutFaultMessage().getContent(Exception.class);
} else if (exchange.getInMessage() != null) {
return exchange.getInMessage().getContent(Exception.class);
}
return null;
}
protected void setContext(Map<String, Object> ctx, Message message) {
if (ctx != null) {
message.putAll(ctx);
if (LOG.isLoggable(Level.FINE)) {
LOG.fine("set requestContext to message be" + ctx);
}
}
}
protected void waitResponse(Exchange exchange) throws IOException {
synchronized (exchange) {
long remaining = synchronousTimeout;
Long o = PropertyUtils.getLong(exchange.getOutMessage(), SYNC_TIMEOUT);
if (o != null) {
remaining = o;
}
while (!Boolean.TRUE.equals(exchange.get(FINISHED)) && remaining > 0) {
long start = System.currentTimeMillis();
try {
exchange.wait(remaining);
} catch (InterruptedException ex) {
// ignore
}
long end = System.currentTimeMillis();
remaining -= (int)(end - start);
}
if (!Boolean.TRUE.equals(exchange.get(FINISHED))) {
LogUtils.log(LOG, Level.WARNING, "RESPONSE_TIMEOUT",
exchange.getBindingOperationInfo().getOperationInfo().getName().toString());
String msg = new org.apache.cxf.common.i18n.Message("RESPONSE_TIMEOUT", LOG, exchange
.getBindingOperationInfo().getOperationInfo().getName().toString()).toString();
throw new IOException(msg);
}
}
}
protected void setParameters(Object[] params, Message message) {
MessageContentsList contents = new MessageContentsList(params);
message.setContent(List.class, contents);
}
public void onMessage(Message message) {
if (bus == null) {
throw new IllegalStateException("Message received on a Client that has been closed or destroyed.");
}
Endpoint endpoint = message.getExchange().getEndpoint();
if (endpoint == null) {
// in this case correlation will occur outside the transport,
// however there's a possibility that the endpoint may have been
// rebased in the meantime, so that the response will be mediated
// via a set of in interceptors provided by a *different* endpoint
//
endpoint = getConduitSelector().getEndpoint();
message.getExchange().put(Endpoint.class, endpoint);
}
message = endpoint.getBinding().createMessage(message);
message.getExchange().setInMessage(message);
message.put(Message.REQUESTOR_ROLE, Boolean.TRUE);
message.put(Message.INBOUND_MESSAGE, Boolean.TRUE);
PhaseManager pm = bus.getExtension(PhaseManager.class);
List<Interceptor<? extends Message>> i1 = bus.getInInterceptors();
if (LOG.isLoggable(Level.FINE)) {
LOG.fine("Interceptors contributed by bus: " + i1);
}
List<Interceptor<? extends Message>> i2 = getInInterceptors();
if (LOG.isLoggable(Level.FINE)) {
LOG.fine("Interceptors contributed by client: " + i2);
}
List<Interceptor<? extends Message>> i3 = endpoint.getInInterceptors();
if (LOG.isLoggable(Level.FINE)) {
LOG.fine("Interceptors contributed by endpoint: " + i3);
}
List<Interceptor<? extends Message>> i4 = endpoint.getBinding().getInInterceptors();
if (LOG.isLoggable(Level.FINE)) {
LOG.fine("Interceptors contributed by binding: " + i4);
}
PhaseInterceptorChain chain;
if (endpoint.getService().getDataBinding() instanceof InterceptorProvider) {
InterceptorProvider p = (InterceptorProvider)endpoint.getService().getDataBinding();
if (LOG.isLoggable(Level.FINE)) {
LOG.fine("Interceptors contributed by databinding: " + p.getInInterceptors());
}
chain = inboundChainCache.get(pm.getInPhases(), i1, i2, i3, i4,
p.getInInterceptors());
} else {
chain = inboundChainCache.get(pm.getInPhases(), i1, i2, i3, i4);
}
message.setInterceptorChain(chain);
chain.setFaultObserver(outFaultObserver);
modifyChain(chain, message, true);
modifyChain(chain, message.getExchange().getOutMessage(), true);
Bus origBus = BusFactory.getAndSetThreadDefaultBus(bus);
// execute chain
ClientCallback callback = message.getExchange().get(ClientCallback.class);
try {
if (callback != null) {
if (callback.isCancelled()) {
completeExchange(message.getExchange());
return;
}
callback.start(message);
}
String startingAfterInterceptorID = (String) message.get(
InterceptorChain.STARTING_AFTER_INTERCEPTOR_ID);
String startingInterceptorID = (String) message.get(
InterceptorChain.STARTING_AT_INTERCEPTOR_ID);
if (startingAfterInterceptorID != null) {
chain.doInterceptStartingAfter(message, startingAfterInterceptorID);
} else if (startingInterceptorID != null) {
chain.doInterceptStartingAt(message, startingInterceptorID);
} else if (message.getContent(Exception.class) != null) {
outFaultObserver.onMessage(message);
} else {
callback = message.getExchange().get(ClientCallback.class);
if (callback != null && !isPartialResponse(message)) {
try {
chain.doIntercept(message);
} catch (Throwable error) {
//so that asyn callback handler get chance to
//handle non-runtime exceptions
message.getExchange().setInMessage(message);
Map<String, Object> resCtx = CastUtils
.cast((Map<?, ?>) message.getExchange()
.getOutMessage().get(
Message.INVOCATION_CONTEXT));
resCtx = CastUtils.cast((Map<?, ?>) resCtx
.get(RESPONSE_CONTEXT));
if (resCtx != null) {
setResponseContext(resCtx);
}
// remove callback so that it won't be invoked twice
callback = message.getExchange().remove(ClientCallback.class);
if (callback != null) {
callback.handleException(resCtx, error);
}
}
} else {
chain.doIntercept(message);
}
}
callback = message.getExchange().get(ClientCallback.class);
if (callback == null || isPartialResponse(message)) {
return;
}
// remove callback so that it won't be invoked twice
callback = message.getExchange().remove(ClientCallback.class);
if (callback != null) {
message.getExchange().setInMessage(message);
Map<String, Object> resCtx = CastUtils.cast((Map<?, ?>)message
.getExchange()
.getOutMessage()
.get(Message.INVOCATION_CONTEXT));
resCtx = CastUtils.cast((Map<?, ?>)resCtx.get(RESPONSE_CONTEXT));
if (resCtx != null && responseContext != null) {
setResponseContext(resCtx);
}
try {
Object[] obj = processResult(message, message.getExchange(),
null, resCtx);
callback.handleResponse(resCtx, obj);
} catch (Throwable ex) {
callback.handleException(resCtx, ex);
}
}
} finally {
if (origBus != bus) {
BusFactory.setThreadDefaultBus(origBus);
}
synchronized (message.getExchange()) {
if (!isPartialResponse(message)
|| message.getContent(Exception.class) != null) {
message.getExchange().put(FINISHED, Boolean.TRUE);
message.getExchange().setInMessage(message);
message.getExchange().notifyAll();
}
}
}
}
public Conduit getConduit() {
Message message = new MessageImpl();
Exchange exchange = new ExchangeImpl();
message.setExchange(exchange);
message.putAll(getRequestContext());
setExchangeProperties(exchange, getEndpoint(), null);
return getConduitSelector().selectConduit(message);
}
protected void prepareConduitSelector(Message message) {
getConduitSelector().prepare(message);
message.getExchange().put(ConduitSelector.class, getConduitSelector());
}
protected void setOutMessageProperties(Message message, BindingOperationInfo boi) {
message.put(Message.REQUESTOR_ROLE, Boolean.TRUE);
message.put(Message.INBOUND_MESSAGE, Boolean.FALSE);
if (null != boi) {
message.put(BindingMessageInfo.class, boi.getInput());
message.put(MessageInfo.class, boi.getOperationInfo().getInput());
}
}
protected void setExchangeProperties(Exchange exchange,
Endpoint endpoint,
BindingOperationInfo boi) {
if (endpoint != null) {
exchange.put(Endpoint.class, endpoint);
exchange.put(Service.class, endpoint.getService());
exchange.put(Binding.class, endpoint.getBinding());
}
if (boi != null) {
exchange.put(BindingOperationInfo.class, boi);
}
if (exchange.isSynchronous() || executor == null) {
exchange.put(MessageObserver.class, this);
} else {
exchange.put(Executor.class, executor);
exchange.put(MessageObserver.class, new MessageObserver() {
public void onMessage(final Message message) {
if (!message.getExchange()
.containsKey(Executor.class.getName() + ".USING_SPECIFIED")) {
executor.execute(new Runnable() {
public void run() {
ClientImpl.this.onMessage(message);
}
});
} else {
ClientImpl.this.onMessage(message);
}
}
});
}
exchange.put(Retryable.class, this);
exchange.put(Client.class, this);
exchange.put(Bus.class, bus);
if (endpoint != null) {
EndpointInfo endpointInfo = endpoint.getEndpointInfo();
if (boi != null) {
exchange.put(Message.WSDL_OPERATION, boi.getName());
}
QName serviceQName = endpointInfo.getService().getName();
exchange.put(Message.WSDL_SERVICE, serviceQName);
QName interfaceQName = endpointInfo.getService().getInterface().getName();
exchange.put(Message.WSDL_INTERFACE, interfaceQName);
QName portQName = endpointInfo.getName();
exchange.put(Message.WSDL_PORT, portQName);
URI wsdlDescription = endpointInfo.getProperty("URI", URI.class);
if (wsdlDescription == null) {
String address = endpointInfo.getAddress();
try {
wsdlDescription = new URI(address + "?wsdl");
} catch (URISyntaxException e) {
// do nothing
}
endpointInfo.setProperty("URI", wsdlDescription);
}
exchange.put(Message.WSDL_DESCRIPTION, wsdlDescription);
}
}
protected PhaseInterceptorChain setupInterceptorChain(Endpoint endpoint) {
PhaseManager pm = bus.getExtension(PhaseManager.class);
List<Interceptor<? extends Message>> i1 = bus.getOutInterceptors();
if (LOG.isLoggable(Level.FINE)) {
LOG.fine("Interceptors contributed by bus: " + i1);
}
List<Interceptor<? extends Message>> i2 = getOutInterceptors();
if (LOG.isLoggable(Level.FINE)) {
LOG.fine("Interceptors contributed by client: " + i2);
}
List<Interceptor<? extends Message>> i3 = endpoint.getOutInterceptors();
if (LOG.isLoggable(Level.FINE)) {
LOG.fine("Interceptors contributed by endpoint: " + i3);
}
List<Interceptor<? extends Message>> i4 = endpoint.getBinding().getOutInterceptors();
if (LOG.isLoggable(Level.FINE)) {
LOG.fine("Interceptors contributed by binding: " + i4);
}
List<Interceptor<? extends Message>> i5 = null;
if (endpoint.getService().getDataBinding() instanceof InterceptorProvider) {
i5 = ((InterceptorProvider)endpoint.getService().getDataBinding()).getOutInterceptors();
if (LOG.isLoggable(Level.FINE)) {
LOG.fine("Interceptors contributed by databinding: " + i5);
}
}
if (i5 != null) {
return outboundChainCache.get(pm.getOutPhases(), i1, i2, i3, i4, i5);
}
return outboundChainCache.get(pm.getOutPhases(), i1, i2, i3, i4);
}
protected void modifyChain(InterceptorChain chain, Message ctx, boolean in) {
if (ctx == null) {
return;
}
Collection<InterceptorProvider> providers
= CastUtils.cast((Collection<?>)ctx.get(Message.INTERCEPTOR_PROVIDERS));
if (providers != null) {
for (InterceptorProvider p : providers) {
if (in) {
chain.add(p.getInInterceptors());
} else {
chain.add(p.getOutInterceptors());
}
}
}
String key = in ? Message.IN_INTERCEPTORS : Message.OUT_INTERCEPTORS;
Collection<Interceptor<? extends Message>> is
= CastUtils.cast((Collection<?>)ctx.get(key));
if (is != null) {
chain.add(is);
}
}
protected void setEndpoint(Endpoint e) {
getConduitSelector().setEndpoint(e);
}
public int getSynchronousTimeout() {
return synchronousTimeout;
}
public void setSynchronousTimeout(int synchronousTimeout) {
this.synchronousTimeout = synchronousTimeout;
}
public final ConduitSelector getConduitSelector() {
return getConduitSelector(null);
}
protected final ConduitSelector getConduitSelector(
ConduitSelector override
) {
if (null == conduitSelector) {
setConduitSelector(override);
}
return conduitSelector;
}
public final synchronized void setConduitSelector(ConduitSelector selector) {
conduitSelector = selector == null ? new UpfrontConduitSelector() : selector;
}
private boolean isPartialResponse(Message in) {
return Boolean.TRUE.equals(in.get(Message.PARTIAL_RESPONSE_MESSAGE));
}
@Override
public void close() throws Exception {
destroy();
}
public class EchoContext extends ConcurrentHashMap<String, Object> {
private static final long serialVersionUID = 1L;
public EchoContext(Map<String, Object> sharedMap) {
super(8, 0.75f, 4);
putAll(sharedMap);
}
public void reload() {
super.clear();
super.putAll(requestContext.get(latestContextThread));
}
@Override
public void clear() {
super.clear();
try {
for (Map.Entry<Thread, EchoContext> ent : requestContext.entrySet()) {
if (ent.getValue() == this) {
requestContext.remove(ent.getKey());
return;
}
}
} catch (Throwable t) {
//ignore
}
}
}
/**
* Class to handle the response contexts. The clear is overloaded to remove
* this context from the threadLocal caches in the ClientImpl
*/
static class ResponseContext implements Map<String, Object>, Serializable {
private static final long serialVersionUID = 2L;
final Map<String, Object> wrapped;
final Map<Thread, ResponseContext> responseContext;
ResponseContext(Map<String, Object> origMap, Map<Thread, ResponseContext> rc) {
wrapped = origMap;
responseContext = rc;
}
ResponseContext(Map<Thread, ResponseContext> rc) {
wrapped = new HashMap<>();
responseContext = rc;
}
@Override
public void clear() {
wrapped.clear();
try {
for (Map.Entry<Thread, ResponseContext> ent : responseContext.entrySet()) {
if (ent.getValue() == this) {
responseContext.remove(ent.getKey());
return;
}
}
} catch (Throwable t) {
//ignore
}
}
@Override
public int size() {
return wrapped.size();
}
@Override
public boolean isEmpty() {
return wrapped.isEmpty();
}
@Override
public boolean containsKey(Object key) {
return wrapped.containsKey(key);
}
@Override
public boolean containsValue(Object value) {
return wrapped.containsKey(value);
}
@Override
public Object get(Object key) {
return wrapped.get(key);
}
@Override
public Object put(String key, Object value) {
return wrapped.put(key, value);
}
@Override
public Object remove(Object key) {
return wrapped.remove(key);
}
@Override
public void putAll(Map<? extends String, ? extends Object> m) {
wrapped.putAll(m);
}
@Override
public Set<String> keySet() {
return wrapped.keySet();
}
@Override
public Collection<Object> values() {
return wrapped.values();
}
@Override
public Set<Entry<String, Object>> entrySet() {
return wrapped.entrySet();
}
}
public void setExecutor(Executor executor) {
if (!SynchronousExecutor.isA(executor)) {
this.executor = executor;
}
}
public class IllegalEmptyResponseException extends IllegalStateException {
private static final long serialVersionUID = 1L;
public IllegalEmptyResponseException() {
super();
}
public IllegalEmptyResponseException(String message) {
super(message);
}
public IllegalEmptyResponseException(String message, Throwable cause) {
super(message, cause);
}
public IllegalEmptyResponseException(Throwable cause) {
super(cause);
}
}
}