| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.ode.bpel.engine.replayer; |
| |
| import java.util.ArrayList; |
| import java.util.Date; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.Callable; |
| |
| import javax.xml.namespace.QName; |
| import javax.xml.xquery.XQConnection; |
| import javax.xml.xquery.XQConstants; |
| import javax.xml.xquery.XQDataSource; |
| import javax.xml.xquery.XQPreparedExpression; |
| import javax.xml.xquery.XQResultSequence; |
| |
| import net.sf.saxon.xqj.SaxonXQConnection; |
| import net.sf.saxon.xqj.SaxonXQDataSource; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.ode.bpel.dao.ProcessDAO; |
| import org.apache.ode.bpel.dao.ProcessInstanceDAO; |
| import org.apache.ode.bpel.engine.BpelEngineImpl; |
| import org.apache.ode.bpel.engine.BpelProcess; |
| import org.apache.ode.bpel.engine.MyRoleMessageExchangeImpl; |
| import org.apache.ode.bpel.engine.PartnerLinkMyRoleImpl; |
| import org.apache.ode.bpel.engine.PartnerLinkMyRoleImpl.RoutingInfo; |
| import org.apache.ode.bpel.iapi.MessageExchange.Status; |
| import org.apache.ode.bpel.pmapi.CommunicationType; |
| import org.apache.ode.bpel.pmapi.ExchangeType; |
| import org.apache.ode.bpel.pmapi.MockQueryRequestDocument; |
| import org.apache.ode.bpel.pmapi.MockQueryResponseDocument; |
| import org.apache.ode.bpel.pmapi.ResponseType; |
| import org.apache.ode.bpel.pmapi.CommunicationType.Exchange; |
| import org.apache.ode.bpel.pmapi.CommunicationType.ServiceConfig; |
| import org.apache.ode.bpel.runtime.PROCESS; |
| import org.apache.ode.utils.DOMUtils; |
| import org.apache.xmlbeans.XmlAnySimpleType; |
| import org.apache.xmlbeans.XmlObject; |
| import org.apache.xmlbeans.XmlOptions; |
| import org.w3c.dom.Element; |
| import org.w3c.dom.Node; |
| |
| /** |
| * Context holding replayer state (eg. invoke answers) for single instance during replaying. |
| * |
| * @author Rafal Rusin |
| * |
| */ |
| public class ReplayerContext { |
| private static final Log __log = LogFactory.getLog(ReplayerContext.class); |
| |
| public ReplayerScheduler scheduler; |
| |
| public BpelEngineImpl bpelEngine; |
| public ReplayerBpelRuntimeContextImpl runtimeContext; |
| |
| public Map<QName, ServiceConfig> servicesConfig = new HashMap<QName, ServiceConfig>(); |
| public CommunicationType replayerConfig; |
| |
| public final Date replayStartDate; |
| |
| public Answers answers = new Answers(); |
| |
| public class Answers { |
| public Map<String, AnswersForKey> answersMap = new HashMap<String, AnswersForKey>(); |
| |
| public String getAnswersKey(QName service, String operation) { |
| return service.toString() + ";" + operation; |
| } |
| |
| public void add(Exchange e) { |
| ServiceConfig cfg = getServiceConfig(e.getService()); |
| if (cfg.getReplayType().isSetMock()) { |
| String key = getAnswersKey(e.getService(), e.getOperation()); |
| AnswersForKey v = answersMap.get(key); |
| if (v == null) { |
| v = new AnswersForKey(); |
| answersMap.put(key, v); |
| } |
| v.answers.add(e); |
| } |
| } |
| |
| public AnswerResult fetchAnswer(QName service, String operation, Element outgoingMessage, Date currentEventDateTime) { |
| __log.debug("fetching answer for " + service + " " + operation); |
| |
| ServiceConfig cfg = getServiceConfig(service); |
| |
| if (cfg.getReplayType().isSetMock()) { |
| String key = getAnswersKey(service, operation); |
| AnswersForKey v = answersMap.get(key); |
| Exchange e = v == null ? null : v.answerPos < v.answers.size() ? v.answers.get(v.answerPos) : null; |
| if (e == null) { |
| throw new IllegalStateException("answer for " + service + " " + operation + " at time " + currentEventDateTime + " not found, outgoing message was " + DOMUtils.domToString(outgoingMessage)); |
| } |
| v.answerPos++; |
| __log.debug("fetched " + e); |
| return new AnswerResult(false, e); |
| } else if (cfg.getReplayType().isSetMockQuery()) { |
| return new AnswerResult(false, fetchMockQuery(service, operation, outgoingMessage, cfg)); |
| } else if (cfg.getReplayType().isSetLive()) { |
| return new AnswerResult(true, null); |
| } else assert(false); |
| return null; |
| } |
| |
| public void remainingExchanges(List<Exchange> e) { |
| for (AnswersForKey v : answersMap.values()) { |
| v.remainingExchanges(e); |
| } |
| } |
| |
| private Exchange fetchMockQuery(QName service, String operation, Element outgoingMessage, org.apache.ode.bpel.pmapi.CommunicationType.ServiceConfig serviceConfig) { |
| try { |
| MockQueryRequestDocument request = MockQueryRequestDocument.Factory.newInstance(); |
| request.addNewMockQueryRequest().addNewIn().set(XmlObject.Factory.parse(outgoingMessage)); |
| String xquery = serviceConfig.getReplayType().getMockQuery(); |
| |
| XQDataSource xqds = new SaxonXQDataSource(); |
| XQConnection xqconn = xqds.getConnection(); |
| |
| net.sf.saxon.Configuration configuration = ((SaxonXQConnection) xqconn).getConfiguration(); |
| configuration.setHostLanguage(net.sf.saxon.Configuration.XQUERY); |
| // XQStaticContext staticEnv = xqconn.getStaticContext(); |
| XQPreparedExpression exp = xqconn.prepareExpression(xquery); |
| Node requestNode = DOMUtils.parse(request.newXMLStreamReader()); |
| if (__log.isDebugEnabled()) { |
| __log.debug("request " + request.toString()); |
| } |
| exp.bindItem(XQConstants.CONTEXT_ITEM, xqconn.createItemFromNode(requestNode, xqconn.createNodeType())); |
| XQResultSequence result = exp.executeQuery(); |
| MockQueryResponseDocument response = MockQueryResponseDocument.Factory.parse(result.getSequenceAsStream()); |
| { |
| XmlOptions opts = new XmlOptions(); |
| List<Object> errors = new ArrayList<Object>(); |
| opts.setErrorListener(errors); |
| if (!response.validate(opts)) { |
| __log.error("MockQuery response doesn't validate. Errors: " + errors + " Request: " + request.toString() + " Response: " + response.toString(), new Exception()); |
| throw new IllegalStateException("MockQuery response doesn't validate."); |
| } |
| } |
| ResponseType response2 = response.getMockQueryResponse(); |
| |
| if (__log.isDebugEnabled()) { |
| __log.debug("mockQuery result " + response); |
| } |
| |
| |
| Exchange answer = Exchange.Factory.newInstance(); |
| { |
| if (response2.isSetOut()) { |
| answer.setOut(response2.getOut()); |
| } |
| if (response2.isSetFault()) { |
| answer.setFault(response2.getFault()); |
| } |
| if (response2.isSetFailure()) { |
| answer.setFailure(response2.getFailure()); |
| } |
| } |
| |
| return answer; |
| } catch (Exception e) { |
| __log.error("", e); |
| __log.error(e.getCause()); |
| throw new IllegalStateException(e.getMessage()); |
| } |
| } |
| } |
| |
| public static class AnswersForKey { |
| List<Exchange> answers = new ArrayList<Exchange>(); |
| int answerPos = 0; |
| |
| public boolean isCompleted() { |
| return !(answerPos < answers.size()); |
| } |
| |
| public void remainingExchanges(List<Exchange> e) { |
| for (int i = answerPos; i < answers.size(); i++) { |
| e.add(answers.get(i)); |
| } |
| } |
| |
| @Override |
| public String toString() { |
| return new Integer(answerPos).toString() + " / " + answers.size(); |
| } |
| } |
| |
| private void scheduleInvoke(final Exchange e, final MyRoleMessageExchangeImpl mex) { |
| final Date time = e.getCreateTime().getTime(); |
| scheduler.scheduleReplayerJob(new Callable<Void>() { |
| public Void call() throws Exception { |
| __log.debug("call " + e); |
| mex.getDAO().setStatus(Status.ASYNC.toString()); |
| runtimeContext.handleIncomingRequest(mex, time); |
| return null; |
| } |
| }, time, runtimeContext); |
| } |
| |
| public void init(final CommunicationType r, ReplayerScheduler scheduler) throws Exception { |
| this.scheduler = scheduler; |
| |
| replayerConfig = r; |
| |
| for (ServiceConfig s : r.getServiceConfigArray()) { |
| servicesConfig.put(s.getService(), s); |
| } |
| |
| final Exchange[] exchanges = r.getExchangeArray(); |
| |
| for (Exchange e : exchanges) { |
| // We skip failures, because INVOKE_CHECK job is not handled by replayer |
| if (e.getType() == ExchangeType.P && !e.isSetFailure()) { |
| answers.add(e); |
| } |
| } |
| |
| { |
| final Exchange e = exchanges[0]; |
| |
| final Date time = e.getCreateTime().getTime(); |
| scheduler.scheduleReplayerJob(new Callable<Void>() { |
| public Void call() throws Exception { |
| __log.debug("initial call " + e); |
| |
| final BpelProcess p = bpelEngine.getNewestProcessByType(r.getProcessType()); |
| final ProcessDAO processDAO = p.getProcessDAO(); |
| final MyRoleMessageExchangeImpl mex = ReplayerBpelRuntimeContextImpl.createMyRoleMex(e, bpelEngine); |
| |
| p.invokeProcess(mex, |
| // time, |
| new BpelProcess.InvokeHandler() { |
| public boolean invoke(PartnerLinkMyRoleImpl target, RoutingInfo routing, boolean createInstance) { |
| if (routing.messageRoute == null && createInstance) { |
| ProcessInstanceDAO newInstance = processDAO.createInstance(routing.correlator); |
| |
| runtimeContext = new ReplayerBpelRuntimeContextImpl(p, newInstance, new PROCESS(p.getOProcess()), mex, |
| // time, |
| ReplayerContext.this); |
| runtimeContext.setCurrentEventDateTime(time); |
| runtimeContext.updateMyRoleMex(mex); |
| // first receive is matched to provided |
| // mex |
| runtimeContext.execute(); |
| return true; |
| } else if (routing.messageRoute != null) { |
| throw new IllegalStateException("Instantiating mex causes invocation of existing instance " + mex); |
| } |
| return false; |
| } |
| }, true); |
| |
| for (Exchange e : exchanges) { |
| if (e.getType() == ExchangeType.M) { |
| MyRoleMessageExchangeImpl mex2 = ReplayerBpelRuntimeContextImpl.createMyRoleMex(e, bpelEngine); |
| runtimeContext.updateMyRoleMex(mex2); |
| scheduleInvoke(e, mex2); |
| } |
| } |
| return null; |
| } |
| }, time, null); |
| } |
| |
| } |
| |
| public ReplayerContext(Date replayStartDate) { |
| super(); |
| this.replayStartDate = replayStartDate; |
| } |
| |
| public ServiceConfig getServiceConfig(QName service) { |
| ServiceConfig c = servicesConfig.get(service); |
| if (c == null) { |
| c = ServiceConfig.Factory.newInstance(); |
| c.setService(service); |
| c.addNewReplayType().setMock(XmlAnySimpleType.Factory.newInstance()); |
| return c; |
| } else return c; |
| } |
| |
| public void checkRollbackOnFault() { |
| if (replayerConfig.getRollbackOnFault()) { |
| RuntimeException e = new RuntimeException("Process instance run into fault."); |
| if (__log.isDebugEnabled()) { |
| __log.debug("", e); |
| } |
| throw e; |
| } |
| } |
| |
| public static class AnswerResult { |
| public final boolean isLive; |
| public final Exchange e; |
| public AnswerResult(boolean isLive, Exchange e) { |
| super(); |
| this.isLive = isLive; |
| this.e = e; |
| } |
| } |
| } |