blob: 7f52a5b49e0081958a667c49a32b8d48ba7e2cd4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.ode.jbi;
import org.apache.ode.bpel.iapi.ContextException;
import org.apache.ode.bpel.iapi.Message;
import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
import org.apache.ode.bpel.iapi.Scheduler;
import org.apache.ode.bpel.iapi.MessageExchange.FailureType;
import org.apache.ode.jbi.msgmap.Mapper;
import org.apache.ode.jbi.msgmap.MessageTranslationException;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import javax.jbi.messaging.*;
import javax.jbi.servicedesc.ServiceEndpoint;
import javax.xml.namespace.QName;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Bridge between ODE (consumers) and JBI (providers). An single object of this type handles all communications initiated by ODE
* that is destined for other JBI providers.
*/
abstract class OdeConsumer extends ServiceBridge implements JbiMessageExchangeProcessor {
private static final Log __log = LogFactory.getLog(OdeConsumer.class);
private static final long DEFAULT_RESPONSE_TIMEOUT = Long.getLong("org.apache.ode.jbi.timeout", 2 * 60 * 1000L);
protected OdeContext _ode;
protected long _responseTimeout = DEFAULT_RESPONSE_TIMEOUT;
protected Map<String, PartnerRoleMessageExchange> _outstandingExchanges = new ConcurrentHashMap<String, PartnerRoleMessageExchange>();
OdeConsumer(OdeContext ode) {
_ode = ode;
}
/**
* This is where we handle invocation where the ODE BPEL engine is the <em>client</em> and some other JBI service is the
* <em>provider</em>.
*/
public void invokePartner(final PartnerRoleMessageExchange odeMex) throws ContextException {
// Cast the EndpointReference to a JbiEndpointReference. This is the
// only type it can be (since we control the creation of these things).
JbiEndpointReference targetEndpoint = (JbiEndpointReference) odeMex.getEndpointReference();
if (targetEndpoint == null) {
String errmsg = "No endpoint for mex: " + odeMex;
__log.error(errmsg);
odeMex.replyWithFailure(FailureType.INVALID_ENDPOINT, errmsg, null);
return;
}
ServiceEndpoint se = targetEndpoint.getServiceEndpoint();
boolean isTwoWay = odeMex.getMessageExchangePattern() == org.apache.ode.bpel.iapi.MessageExchange.MessageExchangePattern.REQUEST_RESPONSE;
QName opname = new QName(se.getServiceName().getNamespaceURI(), odeMex.getOperation().getName());
MessageExchangeFactory mexf = _ode.getChannel().createExchangeFactory(se);
final MessageExchange jbiMex;
try {
jbiMex = mexf.createExchange(isTwoWay ? MessageExchangePattern.IN_OUT : MessageExchangePattern.IN_ONLY);
jbiMex.setEndpoint(se);
jbiMex.setService(se.getServiceName());
jbiMex.setOperation(opname);
} catch (MessagingException e) {
String errmsg = "Unable to create JBI message exchange for ODE message exchange " + odeMex;
__log.error(errmsg, e);
odeMex.replyWithFailure(FailureType.COMMUNICATION_ERROR, errmsg, null);
return;
}
Mapper mapper = _ode.getDefaultMapper();
odeMex.setProperty(Mapper.class.getName(), mapper.getClass().getName());
try {
if (!isTwoWay) {
final InOnly inonly = ((InOnly) jbiMex);
NormalizedMessage nmsg = inonly.createMessage();
mapper.toNMS(nmsg, odeMex.getRequest(), odeMex.getOperation().getInput().getMessage(), null);
inonly.setInMessage(nmsg);
_ode._scheduler.registerSynchronizer(new Scheduler.Synchronizer() {
public void afterCompletion(boolean success) {
if (success) {
doSendOneWay(odeMex, inonly);
}
}
public void beforeCompletion() {
}
});
odeMex.replyOneWayOk();
} else {
final InOut inout = (InOut) jbiMex;
NormalizedMessage nmsg = inout.createMessage();
mapper.toNMS(nmsg, odeMex.getRequest(), odeMex.getOperation().getInput().getMessage(), null);
inout.setInMessage(nmsg);
_ode._scheduler.registerSynchronizer(new Scheduler.Synchronizer() {
public void afterCompletion(boolean success) {
if (success) {
doSendTwoWay(odeMex, inout);
}
}
public void beforeCompletion() {
}
});
odeMex.replyAsync();
}
} catch (MessagingException me) {
String errmsg = "JBI messaging error for ODE MEX " + odeMex;
__log.error(errmsg, me);
odeMex.replyWithFailure(FailureType.COMMUNICATION_ERROR, errmsg, null);
} catch (MessageTranslationException e) {
String errmsg = "Error converting ODE message to JBI format for mex " + odeMex;
__log.error(errmsg, e);
odeMex.replyWithFailure(FailureType.FORMAT_ERROR, errmsg, null);
}
}
protected abstract void doSendOneWay(PartnerRoleMessageExchange odeMex, InOnly inonly);
protected abstract void doSendTwoWay(PartnerRoleMessageExchange odeMex, InOut inout);
protected abstract void inOutDone(InOut inout);
public void onJbiMessageExchange(MessageExchange jbiMex) throws MessagingException {
if (!jbiMex.getPattern().equals(MessageExchangePattern.IN_ONLY) &&
!jbiMex.getPattern().equals(MessageExchangePattern.IN_OUT)) {
__log.error("JBI MessageExchange " + jbiMex.getExchangeId() + " is of an unsupported pattern " + jbiMex.getPattern());
return;
}
if (jbiMex.getStatus() == ExchangeStatus.ACTIVE) {
if (jbiMex.getPattern().equals(MessageExchangePattern.IN_OUT)) {
inOutDone((InOut) jbiMex);
outResponse((InOut) jbiMex);
}
jbiMex.setStatus(ExchangeStatus.DONE);
_ode.getChannel().send(jbiMex);
} else if (jbiMex.getStatus() == ExchangeStatus.ERROR) {
inOutDone((InOut) jbiMex);
outFailure((InOut) jbiMex);
} else if (jbiMex.getStatus() == ExchangeStatus.DONE) {
_outstandingExchanges.remove(jbiMex.getExchangeId());
} else {
__log.error("Unexpected status " + jbiMex.getStatus() + " for JBI message exchange: " + jbiMex.getExchangeId());
}
}
private void outFailure(final InOut jbiMex) {
final PartnerRoleMessageExchange pmex = _outstandingExchanges.remove(jbiMex.getExchangeId());
if (pmex == null) {
__log.warn("Received a response for unknown JBI message exchange " + jbiMex.getExchangeId());
return;
}
try {
_ode._scheduler.execTransaction(new Callable<Boolean>() {
public Boolean call() throws Exception {
pmex.replyWithFailure(FailureType.OTHER, "Error: " + jbiMex.getError(), null);
return null;
}
});
} catch (Exception ex) {
__log.error("error delivering failure: ", ex);
}
}
private void outResponse(final InOut jbiMex) {
final PartnerRoleMessageExchange outstanding = _outstandingExchanges.remove(jbiMex.getExchangeId());
if (outstanding == null) {
__log.warn("Received a response for unknown JBI message exchange " + jbiMex.getExchangeId());
return;
}
try {
_ode._scheduler.execTransaction(new Callable<Boolean>() {
@SuppressWarnings("unchecked")
public Boolean call() throws Exception {
// need to reload mex since we're in a different transaction
PartnerRoleMessageExchange pmex = (PartnerRoleMessageExchange) _ode._server.getEngine().getMessageExchange(outstanding.getMessageExchangeId());
if (pmex == null) {
__log.warn("Received a response for unknown partner role message exchange " + pmex.getMessageExchangeId());
return Boolean.FALSE;
}
String mapperName = pmex.getProperty(Mapper.class.getName());
Mapper mapper = mapperName == null ? _ode.getDefaultMapper() : _ode.getMapper(mapperName);
if (mapper == null) {
String errmsg = "Mapper not found.";
__log.error(errmsg);
pmex.replyWithFailure(FailureType.FORMAT_ERROR, errmsg, null);
} else {
try {
Fault jbiFlt = jbiMex.getFault();
if (jbiFlt != null) {
javax.wsdl.Fault wsdlFlt = mapper.toFaultType(jbiFlt, (Collection<javax.wsdl.Fault>) pmex.getOperation().getFaults().values());
if (wsdlFlt == null) {
pmex.replyWithFailure(FailureType.FORMAT_ERROR, "Unrecognized fault message.", null);
} else {
if (wsdlFlt.getMessage() != null) {
Message faultResponse = pmex.createMessage(wsdlFlt.getMessage().getQName());
mapper.toODE(faultResponse, jbiFlt, wsdlFlt.getMessage());
pmex.replyWithFault(new QName(pmex.getPortType().getQName().getNamespaceURI(), wsdlFlt
.getName()), faultResponse);
} else {
// Can this even happen?
__log.fatal("Internal Error: fault found without a message type: " + wsdlFlt);
pmex.replyWithFailure(FailureType.FORMAT_ERROR, "Fault has no message: "
+ wsdlFlt.getName(), null);
}
}
} else {
Message response = pmex.createMessage(pmex.getOperation().getOutput().getMessage().getQName());
mapper.toODE(response, jbiMex.getOutMessage(), pmex.getOperation().getOutput().getMessage());
pmex.reply(response);
}
} catch (MessageTranslationException mte) {
__log.error("Error translating message.", mte);
pmex.replyWithFailure(FailureType.FORMAT_ERROR, mte.getMessage(), null);
}
}
return null;
}
});
} catch (Exception ex) {
__log.error("error delivering RESPONSE: ", ex);
}
}
public void setResponseTimeout(long timeout) {
_responseTimeout = timeout;
}
public long getResponseTimeout() {
return _responseTimeout;
}
}