// blob: 8f093d62edca73988695dcd5e5543d2fa3569fb9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.tuscany.sca.core.invocation;
import java.lang.reflect.InvocationTargetException;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import org.apache.tuscany.sca.assembly.Endpoint;
import org.apache.tuscany.sca.assembly.EndpointReference;
import org.apache.tuscany.sca.context.ThreadMessageContext;
import org.apache.tuscany.sca.core.ExtensionPointRegistry;
import org.apache.tuscany.sca.core.FactoryExtensionPoint;
import org.apache.tuscany.sca.core.UtilityExtensionPoint;
import org.apache.tuscany.sca.interfacedef.Operation;
import org.apache.tuscany.sca.invocation.InvocationChain;
import org.apache.tuscany.sca.invocation.Invoker;
import org.apache.tuscany.sca.invocation.InvokerAsyncRequest;
import org.apache.tuscany.sca.invocation.InvokerAsyncResponse;
import org.apache.tuscany.sca.invocation.Message;
import org.apache.tuscany.sca.invocation.MessageFactory;
import org.apache.tuscany.sca.runtime.Invocable;
import org.apache.tuscany.sca.runtime.RuntimeEndpointReference;
import org.apache.tuscany.sca.work.WorkScheduler;
import org.oasisopen.sca.ServiceRuntimeException;
/**
 * Invoker for an endpoint or endpoint reference.
 * <p>
 * Wraps an {@link Invocable} and pushes messages through its invocation
 * chains, both synchronously ({@code invoke*}) and asynchronously
 * ({@code invokeAsync*}). Around each synchronous or forward-async dispatch
 * the message is installed via {@link ThreadMessageContext#setMessageContext}
 * and the value that call returned is put back afterwards.
 *
 * @version $Rev$ $Date$
 */
public class RuntimeInvoker implements Invoker, InvokerAsyncRequest {
    protected ExtensionPointRegistry registry;
    protected MessageFactory messageFactory;
    protected Invocable invocable;

    // Executor service obtained from the WorkScheduler utility in the
    // constructor. No method of this class currently reads it.
    private final ExecutorService theExecutor;

    /**
     * Creates an invoker for the given invocable.
     *
     * @param registry  extension point registry used to look up the
     *                  {@link MessageFactory} and the {@link WorkScheduler}
     * @param invocable the endpoint or endpoint reference whose invocation
     *                  chains are driven by this invoker
     */
    public RuntimeInvoker(ExtensionPointRegistry registry, Invocable invocable) {
        this.registry = registry;
        this.messageFactory = registry.getExtensionPoint(FactoryExtensionPoint.class).getFactory(MessageFactory.class);
        this.invocable = invocable;

        UtilityExtensionPoint utilities = registry.getExtensionPoint(UtilityExtensionPoint.class);
        WorkScheduler scheduler = utilities.getUtility(WorkScheduler.class);
        this.theExecutor = scheduler.getExecutorService();
    }

    /**
     * Synchronously invokes the head invoker of the binding invocation chain.
     *
     * @param msg the message to use in the invocation
     * @return the message returned by the head invoker of the binding chain
     */
    public Message invokeBinding(Message msg) {
        Message context = ThreadMessageContext.setMessageContext(msg);
        try {
            return invocable.getBindingInvocationChain().getHeadInvoker().invoke(msg);
        } finally {
            // put back whatever context setMessageContext handed us
            ThreadMessageContext.setMessageContext(context);
        }
    } // end method invokeBinding

    /**
     * Async Invoke of the Binding Chain.
     * <p>
     * Any {@link Throwable} raised by the head invoker (including a failed
     * cast to {@link InvokerAsyncRequest}) is printed to standard error and
     * NOT propagated to the caller.
     *
     * @param msg - the message to use in the invocation
     */
    public void invokeBindingAsync(Message msg) {
        Message context = ThreadMessageContext.setMessageContext(msg);
        try {
            ((InvokerAsyncRequest)invocable.getBindingInvocationChain().getHeadInvoker()).invokeAsyncRequest(msg);
        } catch (Throwable t) {
            // TODO - consider what best to do with exception
            // NOTE(review): deliberately best-effort for now; the failure is
            // only reported on stderr and the caller cannot observe it.
            t.printStackTrace();
        } finally {
            ThreadMessageContext.setMessageContext(context);
        } // end try
    } // end method invokeBindingAsync

    /**
     * Invokes the chain that matches the operation carried by the message.
     *
     * @param msg the request message; its operation selects the chain
     * @return the response message
     * @throws IllegalArgumentException if no invocation chain matches the
     *         message's operation
     */
    public Message invoke(Message msg) {
        return invoke(msg.getOperation(), msg);
    }

    /**
     * Invokes an operation with raw arguments, wrapping them in a new message.
     *
     * @param operation the operation to invoke
     * @param args      the arguments, stored as the body of the request message
     * @return the body of the response message when it is not a fault
     * @throws InvocationTargetException if the response is a fault; the fault
     *         body is cast to {@link Throwable} and used as the cause
     * @throws IllegalArgumentException if no invocation chain matches the
     *         operation
     */
    public Object invoke(Operation operation, Object[] args) throws InvocationTargetException {
        Message msg = messageFactory.createMessage();
        msg.setBody(args);

        Message resp = invoke(operation, msg);
        Object body = resp.getBody();
        if (resp.isFault()) {
            throw new InvocationTargetException((Throwable)body);
        }
        return body;
    }

    /**
     * Invokes the chain registered for the given operation.
     *
     * @param operation the operation used to select the invocation chain
     * @param msg       the request message
     * @return the response message
     * @throws IllegalArgumentException if no invocation chain matches the
     *         operation (previously this surfaced as a NullPointerException)
     */
    public Message invoke(Operation operation, Message msg) {
        return invoke(requireChain(operation), msg);
    }

    /**
     * Invokes the head invoker of the given chain.
     * <p>
     * Before dispatch the message's {@code to} (for an {@link Endpoint}) or
     * {@code from} (for an {@link EndpointReference}) is set to the wrapped
     * invocable and its operation is replaced by the chain's target operation.
     *
     * @param chain the invocation chain to run
     * @param msg   the request message
     * @return the message returned by the chain's head invoker
     */
    public Message invoke(InvocationChain chain, Message msg) {
        if (invocable instanceof Endpoint) {
            msg.setTo((Endpoint)invocable);
        } else if (invocable instanceof EndpointReference) {
            msg.setFrom((EndpointReference)invocable);
        }

        Invoker headInvoker = chain.getHeadInvoker();
        Operation operation = chain.getTargetOperation();
        msg.setOperation(operation);

        Message msgContext = ThreadMessageContext.setMessageContext(msg);
        try {
            return headInvoker.invoke(msg);
        } finally {
            ThreadMessageContext.setMessageContext(msgContext);
        }
    }

    /**
     * Initiate the sending of the forward part of an asynchronous
     * exchange along the request part of the wire.
     * <p>
     * A message ID header ({@code Constants.MESSAGE_ID}) holding a random UUID
     * string is added when the message does not already carry one.
     *
     * @param msg the request message
     * @throws ServiceRuntimeException if the wrapped endpoint or endpoint
     *         reference is not marked for async invocation, or if the head
     *         invoker fails with anything other than an
     *         {@code AsyncResponseException} (non-ServiceRuntimeException
     *         failures are wrapped)
     * @throws IllegalArgumentException if no invocation chain matches the
     *         message's operation
     */
    public void invokeAsync(Message msg) {
        if (invocable instanceof Endpoint) {
            Endpoint ep = (Endpoint)invocable;
            msg.setTo(ep);
            if (!ep.isAsyncInvocation()) {
                throw new ServiceRuntimeException("Calling invokeAsync on a non-async endpoint - " +
                                                  ep);
            }
        } else if (invocable instanceof EndpointReference) {
            // NOTE(review): assumes every EndpointReference handed to this
            // class is a RuntimeEndpointReference; otherwise this cast fails.
            RuntimeEndpointReference epr = (RuntimeEndpointReference)invocable;
            if (!epr.isAsyncInvocation()) {
                throw new ServiceRuntimeException("Calling invokeAsync on a non-async endpoint reference - " +
                                                  epr);
            }
            if (epr.isOutOfDate()) {
                epr.rebuild();
            }
            msg.setFrom(epr);
            msg.setTo(epr.getTargetEndpoint());
        }

        InvocationChain chain = requireChain(msg.getOperation());

        // create an async message ID if there isn't one there already
        if (!msg.getHeaders().containsKey(Constants.MESSAGE_ID)) {
            msg.getHeaders().put(Constants.MESSAGE_ID, UUID.randomUUID().toString());
        }

        // Perform the async invocation
        Invoker headInvoker = chain.getHeadInvoker();
        Message msgContext = ThreadMessageContext.setMessageContext(msg);
        try {
            ((InvokerAsyncRequest)headInvoker).invokeAsyncRequest(msg);
        } catch (ServiceRuntimeException ex) {
            throw ex;
        } catch (Throwable ex) {
            // temporary fix to swallow the dummy exception that's
            // thrown back to get past the response chain processing.
            if (!(ex instanceof AsyncResponseException)) {
                throw new ServiceRuntimeException(ex);
            }
        } finally {
            ThreadMessageContext.setMessageContext(msgContext);
        }
    }

    /**
     * Initiate the sending of the response part of an asynchronous
     * exchange along the response part of the wire.
     * <p>
     * The response is handed to the tail invoker of the chain selected by the
     * message's operation; that invoker is cast to {@link InvokerAsyncResponse}.
     *
     * @param msg the response message
     * @throws IllegalArgumentException if no invocation chain matches the
     *         message's operation (previously this surfaced as a
     *         NullPointerException)
     */
    public void invokeAsyncResponse(Message msg) {
        InvocationChain chain = requireChain(msg.getOperation());
        Invoker tailInvoker = chain.getTailInvoker();
        ((InvokerAsyncResponse)tailInvoker).invokeAsyncResponse(msg);
    } // end method invokeAsyncResponse

    /**
     * {@link InvokerAsyncRequest} entry point; delegates to
     * {@link #invokeAsync(Message)}.
     *
     * @param msg the request message
     */
    @Override
    public void invokeAsyncRequest(Message msg) throws Throwable {
        invokeAsync(msg);
    } // end method invokeAsyncRequest

    /**
     * Looks up the invocation chain for an operation, failing with a
     * descriptive exception instead of returning {@code null}.
     *
     * @param operation the operation to look up; may be {@code null}, in which
     *                  case the lookup is still attempted
     * @return the matching invocation chain, never {@code null}
     * @throws IllegalArgumentException if the invocable has no chain for the
     *         operation
     */
    private InvocationChain requireChain(Operation operation) {
        InvocationChain chain = invocable.getInvocationChain(operation);
        if (chain == null) {
            // guard the message itself so a missing operation does not turn
            // the intended error into a NullPointerException
            String name = (operation == null) ? "null" : operation.getName();
            throw new IllegalArgumentException("No matching operation is found: " + name);
        }
        return chain;
    }
}