blob: 699b7521f70aeb5cc2b4ee26ac1163355822e0fd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.ode.jbi;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.ode.bpel.iapi.Endpoint;
import org.apache.ode.bpel.iapi.Message;
import org.apache.ode.bpel.iapi.MessageExchange;
import org.apache.ode.bpel.iapi.MessageExchange.Status;
import org.apache.ode.bpel.iapi.Scheduler.Synchronizer;
import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
import org.apache.ode.jbi.msgmap.Mapper;
import org.apache.ode.jbi.msgmap.MessageTranslationException;
import org.w3c.dom.Element;
import javax.jbi.JBIException;
import javax.jbi.messaging.ExchangeStatus;
import javax.jbi.messaging.Fault;
import javax.jbi.messaging.InOnly;
import javax.jbi.messaging.InOut;
import javax.jbi.messaging.MessagingException;
import javax.jbi.messaging.NormalizedMessage;
import javax.jbi.servicedesc.ServiceEndpoint;
import javax.xml.namespace.QName;
import java.util.HashMap;
import java.util.Map;
/**
 * Bridge JBI (consumer) to ODE (provider).
 *
 * <p>
 * JBI message exchanges enter through
 * {@link #onJbiMessageExchange(javax.jbi.messaging.MessageExchange)}, are handed to the ODE
 * engine inside a transaction, and ODE replies are mapped back onto the originating JBI
 * exchange in {@link #onResponse(MyRoleMessageExchange)}. Exchanges awaiting a reply are kept
 * in a {@link JbiMexTracker} keyed by the JBI exchange id.
 */
public class OdeService extends ServiceBridge implements JbiMessageExchangeProcessor {

    private static final Logger __log = LoggerFactory.getLogger(OdeService.class);

    /** Utility for tracking outstanding JBI message exchanges. */
    private final JbiMexTracker _jbiMexTracker = new JbiMexTracker();

    /** JBI-generated endpoint; assigned by {@link #activate()}. */
    private ServiceEndpoint _internal;

    /** External endpoint; the first candidate found during {@link #activate()}, if any. */
    private ServiceEndpoint _external;

    private final OdeContext _ode;

    /** Never assigned anywhere in this class, so it is always {@code null} here. */
    private Element _serviceref;

    private final Endpoint _endpoint;

    /** Number of {@link #activate()} calls not yet balanced by a {@link #deactivate()} call. */
    private int count;

    /**
     * Creates a bridge for the given ODE endpoint. Nothing is activated until
     * {@link #activate()} is called.
     *
     * @param odeContext
     *            ODE/JBI context used for endpoint activation, messaging and transactions
     * @param endpoint
     *            the ODE endpoint (service name and port name) this bridge serves
     */
    public OdeService(OdeContext odeContext, Endpoint endpoint) throws Exception {
        _ode = odeContext;
        _endpoint = endpoint;
    }

    /**
     * @return the current activation reference count
     */
    public int getCount() {
        return count;
    }

    /**
     * Do the JBI endpoint activation. Only the first call (reference count going from 0 to 1)
     * actually activates the endpoint; later calls merely increase the count.
     *
     * @throws JBIException
     *             if the JBI component context fails to activate the endpoint
     */
    public void activate() throws JBIException {
        // NOTE(review): the count is raised before activation succeeds, so a failed
        // activation still leaves count == 1 -- confirm callers compensate with deactivate().
        count++;
        if (count != 1) {
            return;
        }

        if (_serviceref == null) {
            ServiceEndpoint[] candidates = _ode.getContext().getExternalEndpointsForService(_endpoint.serviceName);
            if (candidates.length != 0) {
                _external = candidates[0];
            }
        }

        _internal = _ode.getContext().activateEndpoint(_endpoint.serviceName, _endpoint.portName);
        if (__log.isDebugEnabled()) {
            __log.debug("Activated endpoint " + _endpoint);
        }
        // TODO: Is there a race situation here?
    }

    /**
     * Deactivate endpoints in JBI. Only the call that brings the reference count back to zero
     * actually deactivates the internal endpoint.
     *
     * @throws JBIException
     *             if the JBI component context fails to deactivate the endpoint
     */
    public void deactivate() throws JBIException {
        count--;
        if (count != 0) {
            return;
        }

        _ode.getContext().deactivateEndpoint(_internal);
        if (__log.isDebugEnabled()) {
            __log.debug("Deactivated endpoint " + _endpoint);
        }
    }

    /**
     * @return the endpoint returned by JBI on activation, or {@code null} if not yet activated
     */
    public ServiceEndpoint getInternalServiceEndpoint() {
        return _internal;
    }

    /**
     * @return the external endpoint picked during activation, or {@code null} if none was found
     */
    public ServiceEndpoint getExternalServiceEndpoint() {
        return _external;
    }

    /**
     * Handles a JBI message exchange delivered to this provider.
     *
     * <ul>
     * <li>Exchanges that are no longer ACTIVE only cause their tracker entry to be dropped.</li>
     * <li>IN_ONLY exchanges are forwarded to ODE and always sent back, as DONE or ERROR.</li>
     * <li>IN_OUT exchanges are forwarded to ODE; this method sends the exchange back itself
     * only when the invocation failed and the exchange is still ACTIVE.</li>
     * <li>Any other pattern is marked ERROR and sent back.</li>
     * </ul>
     *
     * @param jbiMex
     *            the exchange, which must be in the PROVIDER role
     * @throws MessagingException
     *             if updating or sending the exchange fails
     * @throws IllegalArgumentException
     *             if the exchange is not in the PROVIDER role, or is ACTIVE without an operation
     */
    public void onJbiMessageExchange(javax.jbi.messaging.MessageExchange jbiMex) throws MessagingException {
        if (jbiMex.getRole() != javax.jbi.messaging.MessageExchange.Role.PROVIDER) {
            String errmsg = "Message exchange is not in PROVIDER role as expected: " + jbiMex.getExchangeId();
            __log.error(errmsg);
            throw new IllegalArgumentException(errmsg);
        }

        if (jbiMex.getStatus() != ExchangeStatus.ACTIVE) {
            // We can forget about the exchange.
            if (__log.isDebugEnabled()) {
                __log.debug("Consuming MEX tracker " + jbiMex.getExchangeId());
            }
            _jbiMexTracker.consume(jbiMex.getExchangeId());
            return;
        }

        if (jbiMex.getOperation() == null) {
            throw new IllegalArgumentException("Null operation in JBI message exchange id=" + jbiMex.getExchangeId()
                    + " endpoint=" + _endpoint);
        }

        if (jbiMex.getPattern().equals(org.apache.ode.jbi.MessageExchangePattern.IN_ONLY)) {
            boolean success = false;
            Exception err = null;
            try {
                invokeOde(jbiMex, ((InOnly) jbiMex).getInMessage());
                success = true;
            } catch (Exception ex) {
                __log.error("Error invoking ODE.", ex);
                err = ex;
            } finally {
                if (!success) {
                    jbiMex.setStatus(ExchangeStatus.ERROR);
                    if (err != null && jbiMex.getError() == null) {
                        jbiMex.setError(err);
                    }
                } else {
                    // Leave a status that was already changed during the invocation untouched.
                    if (jbiMex.getStatus() == ExchangeStatus.ACTIVE) {
                        jbiMex.setStatus(ExchangeStatus.DONE);
                    }
                }
                _ode.getChannel().send(jbiMex);
            }
        } else if (jbiMex.getPattern().equals(org.apache.ode.jbi.MessageExchangePattern.IN_OUT)) {
            boolean success = false;
            Exception err = null;
            try {
                invokeOde(jbiMex, ((InOut) jbiMex).getInMessage());
                success = true;
            } catch (Exception ex) {
                __log.error("Error invoking ODE.", ex);
                err = ex;
            } catch (Throwable t) {
                __log.error("Unexpected error invoking ODE.", t);
                err = new RuntimeException(t);
            } finally {
                // If we got an error that wasn't sent.
                if (jbiMex.getStatus() == ExchangeStatus.ACTIVE && !success) {
                    if (err != null && jbiMex.getError() == null) {
                        jbiMex.setError(err);
                    }
                    jbiMex.setStatus(ExchangeStatus.ERROR);
                    _ode.getChannel().send(jbiMex);
                }
            }
        } else {
            __log.error("JBI MessageExchange " + jbiMex.getExchangeId() + " is of an unsupported pattern "
                    + jbiMex.getPattern());
            jbiMex.setStatus(ExchangeStatus.ERROR);
            jbiMex.setError(new Exception("Unknown message exchange pattern: " + jbiMex.getPattern()));
            // Hand the failed exchange back through the channel, as the other error paths do;
            // otherwise the ERROR status set above is never delivered to the consumer.
            _ode.getChannel().send(jbiMex);
        }
    }

    /**
     * Called from
     * {@link MessageExchangeContextImpl#onAsyncReply(MyRoleMessageExchange)}
     * and, for invocations that complete immediately, from within this class.
     *
     * <p>
     * Copies the ODE outcome (response, fault or failure) onto the tracked JBI exchange, then
     * registers a scheduler synchronizer that sends the JBI exchange and drops its tracker
     * entry only when it is notified of successful completion.
     *
     * @param mex
     *            message exchange
     */
    public void onResponse(MyRoleMessageExchange mex) {
        final String clientId = mex.getClientId();
        final String mexId = mex.getMessageExchangeId();
        if (__log.isDebugEnabled()) {
            __log.debug("Processing MEX tracker mexId: " + mexId + " clientId: " + clientId);
        }

        // The entry is only peeked here; it is consumed once the reply was actually delivered.
        final javax.jbi.messaging.MessageExchange jbiMex = _jbiMexTracker.peek(clientId);
        if (jbiMex == null) {
            // NOTE(review): the ODE exchange is not released on this path -- confirm intended.
            __log.warn("Ignoring unknown async reply. mexId: " + mexId + " clientId: " + clientId);
            return;
        }

        try {
            switch (mex.getStatus()) {
            case FAULT:
                outResponseFault(mex, jbiMex);
                break;
            case RESPONSE:
                outResponse(mex, jbiMex);
                break;
            case FAILURE:
                outFailure(mex, jbiMex);
                break;
            default:
                __log.warn("Received ODE message exchange in unexpected state: " + mex.getStatus() + " mexId: " + mexId + " clientId: " + clientId);
            }

            mex.release(mex.getStatus() == MessageExchange.Status.RESPONSE);

            _ode._scheduler.registerSynchronizer(new Synchronizer() {
                public void afterCompletion(boolean success) {
                    if (success) {
                        // Deliver reply to external world only if ODE scheduler's job has completed successfully
                        try {
                            _ode.getChannel().send(jbiMex);
                            if (__log.isDebugEnabled()) {
                                __log.debug("Consuming MEX tracker mexId: " + mexId + " clientId: " + clientId);
                            }
                            _jbiMexTracker.consume(clientId);
                        } catch (MessagingException e) {
                            __log.error("Error delivering response from ODE to JBI mexId: " + mexId + " clientId: " + clientId, e);
                        }
                    }
                }

                public void beforeCompletion() {
                }
            });
        } catch (MessagingException e) {
            __log.error("Error processing response from ODE to JBI mexId: " + mexId + " clientId: " + clientId, e);
        }
    }

    /**
     * Forward a JBI input message to ODE.
     *
     * <p>
     * The exchange is first registered with the tracker; an exchange id that is already
     * tracked is skipped. The invocation runs in its own transaction, which is committed when
     * the body completes normally and rolled back (dropping the tracker entry) otherwise.
     *
     * @param jbiMex
     *            the JBI exchange being served
     * @param request
     *            the normalized input message of that exchange
     * @throws Exception
     *             if the transaction cannot be started or finished, no mapper is found, or the
     *             ODE invocation fails
     */
    private void invokeOde(javax.jbi.messaging.MessageExchange jbiMex, NormalizedMessage request) throws Exception {
        // If this has already been tracked, we will not invoke!
        if (_jbiMexTracker.track(jbiMex)) {
            if (__log.isDebugEnabled()) {
                __log.debug("Skipping JBI MEX " + jbiMex.getExchangeId() + ", already received!");
            }
            return;
        }

        // The exchange is already tracked at this point: if the transaction cannot even be
        // started, drop the tracker entry again so it does not stay registered forever.
        boolean txStarted = false;
        try {
            _ode.getTransactionManager().begin();
            txStarted = true;
        } finally {
            if (!txStarted) {
                _jbiMexTracker.consume(jbiMex.getExchangeId());
            }
        }

        boolean success = false;
        MyRoleMessageExchange odeMex = null;
        try {
            if (__log.isDebugEnabled()) {
                __log.debug("invokeOde() JBI exchangeId=" + jbiMex.getExchangeId() + " endpoint=" + _endpoint
                        + " operation=" + jbiMex.getOperation());
            }

            // The JBI exchange id is passed as the first argument; onResponse() later looks the
            // JBI exchange up by the ODE exchange's client id.
            odeMex = _ode._server.getEngine().createMessageExchange(jbiMex.getExchangeId(), _endpoint.serviceName,
                    jbiMex.getOperation().getLocalPart());

            if (odeMex.getOperation() != null) {
                copyMexProperties(odeMex, jbiMex);
                javax.wsdl.Message msgdef = odeMex.getOperation().getInput().getMessage();
                Message odeRequest = odeMex.createMessage(msgdef.getQName());
                Mapper mapper = _ode.findMapper(request, odeMex.getOperation());
                if (mapper == null) {
                    String errmsg = "Could not find a mapper for request message for JBI MEX " + jbiMex.getExchangeId()
                            + "; ODE MEX " + odeMex.getMessageExchangeId() + " is failed. ";
                    __log.error(errmsg);
                    throw new MessageTranslationException(errmsg);
                }

                // Remember which mapper translated the request so the reply uses the same one.
                odeMex.setProperty(Mapper.class.getName(), mapper.getClass().getName());
                mapper.toODE(odeRequest, request, msgdef);
                odeMex.invoke(odeRequest);

                // Handle the response if it is immediately available.
                if (odeMex.getStatus() != Status.ASYNC) {
                    if (__log.isDebugEnabled()) {
                        __log.debug("ODE MEX " + odeMex + " completed SYNCHRONOUSLY.");
                    }
                    onResponse(odeMex);
                    _jbiMexTracker.consume(jbiMex.getExchangeId());
                } else {
                    if (__log.isDebugEnabled()) {
                        __log.debug("ODE MEX " + odeMex + " completed ASYNCHRONOUSLY.");
                    }
                }
            } else {
                // NOTE(review): for a request/response exchange this path commits with the JBI
                // exchange in ERROR status, but nothing visible in this class sends it back or
                // removes its tracker entry -- confirm whether that is intended.
                __log.error("ODE MEX " + odeMex + " was unroutable.");
                setError(jbiMex, new IllegalArgumentException("Unroutable invocation."));
            }

            success = true;

            // For one-way invocation we do not need to maintain the association
            if (jbiMex.getPattern().equals(org.apache.ode.jbi.MessageExchangePattern.IN_ONLY)) {
                if (__log.isDebugEnabled()) {
                    __log.debug("Consuming non Req/Res MEX tracker " + jbiMex.getExchangeId() + " with pattern " + jbiMex.getPattern());
                }
                _jbiMexTracker.consume(jbiMex.getExchangeId());
            }
        } finally {
            if (success) {
                if (__log.isDebugEnabled()) {
                    __log.debug("Committing ODE MEX " + odeMex);
                }
                _ode.getTransactionManager().commit();
            } else {
                if (__log.isDebugEnabled()) {
                    __log.debug("Rolling back ODE MEX " + odeMex);
                }
                _jbiMexTracker.consume(jbiMex.getExchangeId());
                _ode.getTransactionManager().rollback();
            }
        }
    }

    /**
     * Marks the JBI exchange as failed with a generic "MEXFailure" error.
     *
     * @param odeMex
     *            the failed ODE exchange (currently unused)
     * @param jbiMex
     *            the JBI exchange to mark as ERROR
     */
    private void outFailure(MyRoleMessageExchange odeMex, javax.jbi.messaging.MessageExchange jbiMex) throws MessagingException {
        jbiMex.setError(new Exception("MEXFailure"));
        jbiMex.setStatus(ExchangeStatus.ERROR);
        // TODO: get failure codes out of the message.
    }

    /**
     * Translates the ODE response into a normalized message and sets it as the out message of
     * the JBI exchange. A translation problem turns the exchange into an ERROR instead.
     *
     * @param mex
     *            ODE exchange holding the response
     * @param jbiMex
     *            JBI exchange to answer; cast to {@link InOut} without a check
     */
    private void outResponse(MyRoleMessageExchange mex, javax.jbi.messaging.MessageExchange jbiMex) throws MessagingException {
        InOut inout = (InOut) jbiMex;

        try {
            NormalizedMessage nmsg = inout.createMessage();
            Mapper mapper = findReplyMapper(mex);
            mapper.toNMS(nmsg, mex.getResponse(), mex.getOperation().getOutput().getMessage(), null);
            inout.setOutMessage(nmsg);
        } catch (MessageTranslationException e) {
            __log.error("Error translating ODE message " + mex.getResponse() + " to NMS format!", e);
            setError(jbiMex, e);
        }
    }

    /**
     * Translates an ODE fault into a JBI fault on the exchange. A fault that the WSDL
     * operation does not declare, or a translation problem, turns the exchange into an ERROR.
     *
     * @param mex
     *            ODE exchange holding the fault
     * @param jbiMex
     *            JBI exchange to answer; cast to {@link InOut} without a check
     */
    private void outResponseFault(MyRoleMessageExchange mex, javax.jbi.messaging.MessageExchange jbiMex) throws MessagingException {
        InOut inout = (InOut) jbiMex;

        try {
            Fault flt = inout.createFault();
            Mapper mapper = findReplyMapper(mex);

            QName fault = mex.getFault();
            javax.wsdl.Fault wsdlFault = mex.getOperation().getFault(fault.getLocalPart());
            if (wsdlFault == null) {
                setError(jbiMex, new MessageTranslationException("Unmapped Fault : " + fault + ": " + mex.getFaultExplanation()));
            } else {
                mapper.toNMS(flt, mex.getFaultResponse(), wsdlFault.getMessage(), fault);
                inout.setFault(flt);
            }
        } catch (MessageTranslationException mte) {
            __log.error("Error translating ODE fault message " + mex.getFaultResponse() + " to NMS format!", mte);
            setError(jbiMex, mte);
        }
    }

    /**
     * Looks up the mapper whose class name was stored on the ODE exchange when the request
     * was translated.
     *
     * @param mex
     *            ODE exchange carrying the mapper class name as a property
     * @return the mapper, never {@code null}
     * @throws MessageTranslationException
     *             if the context no longer knows a mapper of that name
     */
    private Mapper findReplyMapper(MyRoleMessageExchange mex) throws MessageTranslationException {
        String mapperName = mex.getProperty(Mapper.class.getName());
        Mapper mapper = _ode.getMapper(mapperName);
        if (mapper == null) {
            String errmsg = "Message-mapper " + mapperName + " used in ODE MEX " + mex.getMessageExchangeId()
                    + " is no longer available.";
            __log.error(errmsg);
            throw new MessageTranslationException(errmsg);
        }
        return mapper;
    }

    /**
     * Records the error on the JBI exchange and switches it to ERROR status.
     */
    private void setError(javax.jbi.messaging.MessageExchange jbiMex, Exception error) throws MessagingException {
        jbiMex.setError(error);
        jbiMex.setStatus(ExchangeStatus.ERROR);
    }

    /**
     * @return the ODE endpoint this bridge was created for
     */
    public Endpoint getEndpoint() {
        return _endpoint;
    }

    /**
     * Class for tracking outstanding message exchanges from JBI. All accessors are
     * synchronized on the tracker instance.
     */
    private static class JbiMexTracker {
        /**
         * Outstanding JBI-initiated exchanges: mapping for JBI MEX ID to JBI
         * MEX
         */
        private final Map<String, javax.jbi.messaging.MessageExchange> _outstandingJbiExchanges = new HashMap<String, javax.jbi.messaging.MessageExchange>();

        /**
         * Registers the exchange under its exchange id, replacing any entry already stored
         * under that id.
         *
         * @return {@code true} if an entry for this exchange id was already present
         */
        synchronized boolean track(javax.jbi.messaging.MessageExchange jbiMex) {
            String exchangeId = jbiMex.getExchangeId();
            boolean found = _outstandingJbiExchanges.containsKey(exchangeId);
            _outstandingJbiExchanges.put(exchangeId, jbiMex);
            return found;
        }

        /**
         * @return the exchange stored under the given id without removing it, or {@code null}
         */
        synchronized javax.jbi.messaging.MessageExchange peek(String clientId) {
            return _outstandingJbiExchanges.get(clientId);
        }

        /**
         * @return the exchange removed from under the given id, or {@code null} if none
         */
        synchronized javax.jbi.messaging.MessageExchange consume(String clientId) {
            return _outstandingJbiExchanges.remove(clientId);
        }
    }
}