blob: 3447fa0e2506a4e97e7acb4cd601d10c715edcf4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.ode.bpel.engine.replayer;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import javax.xml.namespace.QName;
import javax.xml.xquery.XQConnection;
import javax.xml.xquery.XQConstants;
import javax.xml.xquery.XQDataSource;
import javax.xml.xquery.XQPreparedExpression;
import javax.xml.xquery.XQResultSequence;
import net.sf.saxon.xqj.SaxonXQConnection;
import net.sf.saxon.xqj.SaxonXQDataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.ode.bpel.dao.ProcessDAO;
import org.apache.ode.bpel.dao.ProcessInstanceDAO;
import org.apache.ode.bpel.engine.BpelEngineImpl;
import org.apache.ode.bpel.engine.BpelProcess;
import org.apache.ode.bpel.engine.MyRoleMessageExchangeImpl;
import org.apache.ode.bpel.engine.PartnerLinkMyRoleImpl;
import org.apache.ode.bpel.engine.PartnerLinkMyRoleImpl.RoutingInfo;
import org.apache.ode.bpel.iapi.MessageExchange.Status;
import org.apache.ode.bpel.pmapi.CommunicationType;
import org.apache.ode.bpel.pmapi.ExchangeType;
import org.apache.ode.bpel.pmapi.MockQueryRequestDocument;
import org.apache.ode.bpel.pmapi.MockQueryResponseDocument;
import org.apache.ode.bpel.pmapi.ResponseType;
import org.apache.ode.bpel.pmapi.CommunicationType.Exchange;
import org.apache.ode.bpel.pmapi.CommunicationType.ServiceConfig;
import org.apache.ode.bpel.runtime.PROCESS;
import org.apache.ode.utils.DOMUtils;
import org.apache.xmlbeans.XmlAnySimpleType;
import org.apache.xmlbeans.XmlObject;
import org.apache.xmlbeans.XmlOptions;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
/**
* Context holding replayer state (eg. invoke answers) for single instance during replaying.
*
* @author Rafal Rusin
*
*/
public class ReplayerContext {
private static final Logger __log = LoggerFactory.getLogger(ReplayerContext.class);
public ReplayerScheduler scheduler;
public BpelEngineImpl bpelEngine;
public ReplayerBpelRuntimeContextImpl runtimeContext;
public Map<QName, ServiceConfig> servicesConfig = new HashMap<QName, ServiceConfig>();
public CommunicationType replayerConfig;
public final Date replayStartDate;
public Answers answers = new Answers();
public class Answers {
public Map<String, AnswersForKey> answersMap = new HashMap<String, AnswersForKey>();
public String getAnswersKey(QName service, String operation) {
return service.toString() + ";" + operation;
}
public void add(Exchange e) {
ServiceConfig cfg = getServiceConfig(e.getService());
if (cfg.getReplayType().isSetMock()) {
String key = getAnswersKey(e.getService(), e.getOperation());
AnswersForKey v = answersMap.get(key);
if (v == null) {
v = new AnswersForKey();
answersMap.put(key, v);
}
v.answers.add(e);
}
}
public AnswerResult fetchAnswer(QName service, String operation, Element outgoingMessage, Date currentEventDateTime) {
__log.debug("fetching answer for " + service + " " + operation);
ServiceConfig cfg = getServiceConfig(service);
if (cfg.getReplayType().isSetMock()) {
String key = getAnswersKey(service, operation);
AnswersForKey v = answersMap.get(key);
Exchange e = v == null ? null : v.answerPos < v.answers.size() ? v.answers.get(v.answerPos) : null;
if (e == null) {
throw new IllegalStateException("answer for " + service + " " + operation + " at time " + currentEventDateTime + " not found, outgoing message was " + DOMUtils.domToString(outgoingMessage));
}
v.answerPos++;
__log.debug("fetched " + e);
return new AnswerResult(false, e);
} else if (cfg.getReplayType().isSetMockQuery()) {
return new AnswerResult(false, fetchMockQuery(service, operation, outgoingMessage, cfg));
} else if (cfg.getReplayType().isSetLive()) {
return new AnswerResult(true, null);
} else assert(false);
return null;
}
public void remainingExchanges(List<Exchange> e) {
for (AnswersForKey v : answersMap.values()) {
v.remainingExchanges(e);
}
}
private Exchange fetchMockQuery(QName service, String operation, Element outgoingMessage, org.apache.ode.bpel.pmapi.CommunicationType.ServiceConfig serviceConfig) {
try {
MockQueryRequestDocument request = MockQueryRequestDocument.Factory.newInstance();
request.addNewMockQueryRequest().addNewIn().set(XmlObject.Factory.parse(outgoingMessage));
String xquery = serviceConfig.getReplayType().getMockQuery();
XQDataSource xqds = new SaxonXQDataSource();
XQConnection xqconn = xqds.getConnection();
net.sf.saxon.Configuration configuration = ((SaxonXQConnection) xqconn).getConfiguration();
configuration.setHostLanguage(net.sf.saxon.Configuration.XQUERY);
// XQStaticContext staticEnv = xqconn.getStaticContext();
XQPreparedExpression exp = xqconn.prepareExpression(xquery);
Node requestNode = DOMUtils.parse(request.newXMLStreamReader());
if (__log.isDebugEnabled()) {
__log.debug("request " + request.toString());
}
exp.bindItem(XQConstants.CONTEXT_ITEM, xqconn.createItemFromNode(requestNode, xqconn.createNodeType()));
XQResultSequence result = exp.executeQuery();
MockQueryResponseDocument response = MockQueryResponseDocument.Factory.parse(result.getSequenceAsStream());
{
XmlOptions opts = new XmlOptions();
List<Object> errors = new ArrayList<Object>();
opts.setErrorListener(errors);
if (!response.validate(opts)) {
__log.error("MockQuery response doesn't validate. Errors: " + errors + " Request: " + request.toString() + " Response: " + response.toString(), new Exception());
throw new IllegalStateException("MockQuery response doesn't validate.");
}
}
ResponseType response2 = response.getMockQueryResponse();
if (__log.isDebugEnabled()) {
__log.debug("mockQuery result " + response);
}
Exchange answer = Exchange.Factory.newInstance();
{
if (response2.isSetOut()) {
answer.setOut(response2.getOut());
}
if (response2.isSetFault()) {
answer.setFault(response2.getFault());
}
if (response2.isSetFailure()) {
answer.setFailure(response2.getFailure());
}
}
return answer;
} catch (Exception e) {
__log.error("", e);
throw new IllegalStateException(e.getMessage());
}
}
}
public static class AnswersForKey {
List<Exchange> answers = new ArrayList<Exchange>();
int answerPos = 0;
public boolean isCompleted() {
return !(answerPos < answers.size());
}
public void remainingExchanges(List<Exchange> e) {
for (int i = answerPos; i < answers.size(); i++) {
e.add(answers.get(i));
}
}
@Override
public String toString() {
return new Integer(answerPos).toString() + " / " + answers.size();
}
}
private void scheduleInvoke(final Exchange e, final MyRoleMessageExchangeImpl mex) {
final Date time = e.getCreateTime().getTime();
scheduler.scheduleReplayerJob(new Callable<Void>() {
public Void call() throws Exception {
__log.debug("call " + e);
mex.getDAO().setStatus(Status.ASYNC.toString());
runtimeContext.handleIncomingRequest(mex, time);
return null;
}
}, time, runtimeContext);
}
public void init(final CommunicationType r, ReplayerScheduler scheduler) throws Exception {
this.scheduler = scheduler;
replayerConfig = r;
for (ServiceConfig s : r.getServiceConfigArray()) {
servicesConfig.put(s.getService(), s);
}
final Exchange[] exchanges = r.getExchangeArray();
for (Exchange e : exchanges) {
// We skip failures, because INVOKE_CHECK job is not handled by replayer
if (e.getType() == ExchangeType.P && !e.isSetFailure()) {
answers.add(e);
}
}
{
final Exchange e = exchanges[0];
final Date time = e.getCreateTime().getTime();
scheduler.scheduleReplayerJob(new Callable<Void>() {
public Void call() throws Exception {
__log.debug("initial call " + e);
final BpelProcess p = bpelEngine.getNewestProcessByType(r.getProcessType());
final ProcessDAO processDAO = p.getProcessDAO();
final MyRoleMessageExchangeImpl mex = ReplayerBpelRuntimeContextImpl.createMyRoleMex(e, bpelEngine);
p.invokeProcess(mex,
// time,
new BpelProcess.InvokeHandler() {
public boolean invoke(PartnerLinkMyRoleImpl target, RoutingInfo routing, boolean createInstance) {
if (routing.messageRoute == null && createInstance) {
ProcessInstanceDAO newInstance = processDAO.createInstance(routing.correlator);
runtimeContext = new ReplayerBpelRuntimeContextImpl(p, newInstance, new PROCESS(p.getOProcess()), mex,
// time,
ReplayerContext.this);
runtimeContext.setCurrentEventDateTime(time);
runtimeContext.updateMyRoleMex(mex);
// first receive is matched to provided
// mex
runtimeContext.execute();
return true;
} else if (routing.messageRoute != null) {
throw new IllegalStateException("Instantiating mex causes invocation of existing instance " + mex);
}
return false;
}
}, true);
for (Exchange e : exchanges) {
if (e.getType() == ExchangeType.M) {
MyRoleMessageExchangeImpl mex2 = ReplayerBpelRuntimeContextImpl.createMyRoleMex(e, bpelEngine);
runtimeContext.updateMyRoleMex(mex2);
scheduleInvoke(e, mex2);
}
}
return null;
}
}, time, null);
}
}
public ReplayerContext(Date replayStartDate) {
super();
this.replayStartDate = replayStartDate;
}
public ServiceConfig getServiceConfig(QName service) {
ServiceConfig c = servicesConfig.get(service);
if (c == null) {
c = ServiceConfig.Factory.newInstance();
c.setService(service);
c.addNewReplayType().setMock(XmlAnySimpleType.Factory.newInstance());
return c;
} else return c;
}
public void checkRollbackOnFault() {
if (replayerConfig.getRollbackOnFault()) {
RuntimeException e = new RuntimeException("Process instance run into fault.");
if (__log.isDebugEnabled()) {
__log.debug("", e);
}
throw e;
}
}
public static class AnswerResult {
public final boolean isLive;
public final Exchange e;
public AnswerResult(boolean isLive, Exchange e) {
super();
this.isLive = isLive;
this.e = e;
}
}
}