blob: 424a1fdafbe4b53d3b1f9fc2c938521f4039a5f6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.ode.bpel.engine.replayer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.ode.bpel.dao.BpelDAOConnection;
import org.apache.ode.bpel.dao.MessageExchangeDAO;
import org.apache.ode.bpel.dao.ProcessDAO;
import org.apache.ode.bpel.dao.ProcessInstanceDAO;
import org.apache.ode.bpel.engine.BpelEngineImpl;
import org.apache.ode.bpel.engine.BpelProcess;
import org.apache.ode.bpel.engine.MyRoleMessageExchangeImpl;
import org.apache.ode.bpel.engine.PartnerLinkMyRoleImpl;
import org.apache.ode.bpel.engine.PartnerLinkMyRoleImpl.RoutingInfo;
import org.apache.ode.bpel.iapi.BpelEngine;
import org.apache.ode.bpel.iapi.MessageExchange.Status;
import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
import org.apache.ode.bpel.iapi.ProcessConf.CLEANUP_CATEGORY;
import org.apache.ode.bpel.iapi.Scheduler.JobDetails;
import org.apache.ode.bpel.iapi.Scheduler.JobType;
import org.apache.ode.bpel.pmapi.CommunicationType;
import org.apache.ode.bpel.pmapi.CommunicationType.Exchange;
import org.apache.ode.bpel.pmapi.ExchangeType;
import org.apache.ode.bpel.pmapi.FaultType;
import org.apache.ode.bpel.pmapi.GetCommunication;
import org.apache.ode.bpel.pmapi.GetCommunicationResponse;
import org.apache.ode.bpel.pmapi.Replay;
import org.apache.ode.bpel.runtime.PROCESS;
import org.apache.xmlbeans.XmlCalendar;
import org.apache.xmlbeans.XmlException;
import org.apache.xmlbeans.XmlObject;
/**
* Main class used for replaying. It's invoked from InstanceManagement API.
* Receives request and sets up replaying contexts for each instance to replay.
*
* @author Rafal Rusin
*
*/
public class Replayer {
private static final Logger __log = LoggerFactory.getLogger(Replayer.class);
public static ThreadLocal<Replayer> replayer = new ThreadLocal<Replayer>();
public ReplayerScheduler scheduler = new ReplayerScheduler();
public BpelEngineImpl engine = null;
public List<ReplayerContext> contexts = null;
public BpelDAOConnection conn = null;
public List<Long> replayInstances(Replay request, BpelEngine engine, BpelDAOConnection conn) throws Exception {
try {
replayer.set(this);
this.engine = (BpelEngineImpl) engine;
this.conn = conn;
Date startDate = Calendar.getInstance().getTime();
contexts = new ArrayList<ReplayerContext>();
{
List<Long> toDelete = new ArrayList<Long>();
List<CommunicationType> toRestore = new ArrayList<CommunicationType>();
for (Long iid : request.getReplaceInstanceArray()) {
toDelete.add(iid);
}
for (Long iid : request.getUpgradeInstanceArray()) {
toDelete.add(iid);
toRestore.add(CommunicationType.Factory.parse(getCommunication(iid, conn).toString()));
}
toRestore.addAll(Arrays.asList(request.getRestoreInstanceArray()));
{
Set<CLEANUP_CATEGORY> cleanupCategory = new HashSet<CLEANUP_CATEGORY>();
cleanupCategory.add(CLEANUP_CATEGORY.INSTANCE);
cleanupCategory.add(CLEANUP_CATEGORY.MESSAGES);
cleanupCategory.add(CLEANUP_CATEGORY.VARIABLES);
cleanupCategory.add(CLEANUP_CATEGORY.CORRELATIONS);
cleanupCategory.add(CLEANUP_CATEGORY.EVENTS);
for (Long l : toDelete) {
conn.getInstance(l).delete(cleanupCategory);
}
}
for (CommunicationType r : toRestore) {
ReplayerContext context = new ReplayerContext(startDate);
context.bpelEngine = (BpelEngineImpl) engine;
context.init(r, scheduler);
contexts.add(context);
}
}
scheduler.startReplaying(this);
{
List<Exchange> remainingExchanges = new ArrayList<Exchange>();
for (ReplayerContext c : contexts) {
c.answers.remainingExchanges(remainingExchanges);
}
if (remainingExchanges.size() > 0) {
throw new RemainingExchangesException(remainingExchanges);
}
}
List<Long> r = new ArrayList<Long>();
for (ReplayerContext c : contexts) {
r.add(c.runtimeContext.getPid());
}
return r;
} finally {
replayer.set(null);
}
}
public GetCommunicationResponse getCommunication(GetCommunication request, BpelDAOConnection conn) throws Exception {
GetCommunicationResponse response = GetCommunicationResponse.Factory.newInstance();
for (Long iid : request.getIidArray()) {
response.addNewRestoreInstance().set(getCommunication(iid, conn));
}
return response;
}
private CommunicationType getCommunication(Long iid, BpelDAOConnection conn) {
CommunicationType result = CommunicationType.Factory.newInstance();
List<Exchange> list = new ArrayList<Exchange>();
ProcessInstanceDAO instance = conn.getInstance(iid);
if (instance == null)
return result;
result.setProcessType(instance.getProcess().getType());
for (String mexId : instance.getMessageExchangeIds()) {
MessageExchangeDAO mexDao = conn.getMessageExchange(mexId);
Exchange e = Exchange.Factory.newInstance();
list.add(e);
e.setCreateTime(new XmlCalendar(mexDao.getCreateTime()));
e.setOperation(mexDao.getOperation());
try {
e.setIn(XmlObject.Factory.parse(mexDao.getRequest().getData()));
} catch (XmlException e1) {
__log.error("", e1);
}
try {
Status status = Status.valueOf(mexDao.getStatus());
if (status == Status.FAULT) {
FaultType f = e.addNewFault();
f.setType(mexDao.getFault());
f.setExplanation(mexDao.getFaultExplanation());
if (mexDao.getResponse() != null) {
f.setMessage(XmlObject.Factory.parse(mexDao.getResponse().getData()));
}
} else if (status == Status.FAILURE) {
e.addNewFailure().setExplanation(mexDao.getFaultExplanation());
} else {
if (mexDao.getResponse() != null) {
e.setOut(XmlObject.Factory.parse(mexDao.getResponse().getData()));
}
}
} catch (XmlException e1) {
__log.error("", e1);
}
e.setType(ExchangeType.Enum.forString("" + mexDao.getDirection()));
if (__log.isDebugEnabled()) {
__log.debug("---");
__log.debug("" + mexDao.getCallee());
__log.debug("" + mexDao.getChannel());
__log.debug("" + mexDao.getCreateTime());
__log.debug("" + mexDao.getEPR());
__log.debug("" + mexDao.getPortType());
}
if (e.getType() == ExchangeType.P) {
e.setService(mexDao.getPortType());
} else {
e.setService(mexDao.getCallee());
}
}
Collections.sort(list, new Comparator<Exchange>() {
public int compare(Exchange arg0, Exchange arg1) {
return arg0.getCreateTime().compareTo(arg1.getCreateTime());
}
});
for (Exchange e : list) {
result.addNewExchange().set(e);
}
return result;
}
public ReplayerContext findReplayedInstance(long iid) {
for (ReplayerContext r : contexts) {
if (r.runtimeContext.getPid() == iid) {
return r;
}
}
return null;
}
public void handleJobDetails(JobDetails jobDetail, final Date when) {
JobDetails we = jobDetail;
__log.debug("handleJobDetails " + jobDetail + " " + when);
if (we.getType() == JobType.INVOKE_INTERNAL) {
final BpelProcess p = engine._activeProcesses.get(we.getProcessId());
final ProcessDAO processDAO = p.getProcessDAO();
final MyRoleMessageExchangeImpl mex = (MyRoleMessageExchangeImpl) engine.getMessageExchange(we.getMexId());
p.invokeProcess(mex,
// time,
new BpelProcess.InvokeHandler() {
public boolean invoke(PartnerLinkMyRoleImpl target, RoutingInfo routing, boolean createInstance) {
if (routing.messageRoute == null && createInstance) {
__log.debug("creating new instance via live communication mex:" + mex);
ProcessInstanceDAO newInstance = processDAO.createInstance(routing.correlator);
ReplayerContext context = new ReplayerContext(null);
context.bpelEngine = (BpelEngineImpl) engine;
contexts.add(context);
ReplayerBpelRuntimeContextImpl runtimeContext = new ReplayerBpelRuntimeContextImpl(p, newInstance, new PROCESS(p.getOProcess()), mex,
// time,
context);
context.runtimeContext = runtimeContext;
runtimeContext.setCurrentEventDateTime(when);
runtimeContext.updateMyRoleMex(mex);
// first receive is matched to provided
// mex
runtimeContext.execute();
return true;
} else if (routing.messageRoute != null) {
long iid = routing.messageRoute.getTargetInstance().getInstanceId();
ReplayerContext ctx = findReplayedInstance(iid);
if (ctx == null) {
throw new IllegalStateException("Trying to hit existing instance via live communication, but there's no such instance mex:" + mex + " iid:" + iid);
}
__log.debug("hitting existing instance via live communication mex:" + mex + " iid:" + iid);
ctx.runtimeContext.inputMsgMatch(routing.messageRoute.getGroupId(), routing.messageRoute.getIndex(), mex);
routing.correlator.removeRoutes(routing.messageRoute.getGroupId(), ctx.runtimeContext.getDAO());
mex.setCorrelationStatus(MyRoleMessageExchange.CorrelationStatus.MATCHED);
mex.getDAO().setInstance(routing.messageRoute.getTargetInstance());
ctx.runtimeContext.execute();
}
return false;
}
}, true);
} else if (we.getType() == JobType.INVOKE_RESPONSE) {
__log.debug("reply for live communication");
ReplayerContext ctx = findReplayedInstance(we.getInstanceId());
assert ctx != null;
ctx.runtimeContext.invocationResponse(we.getMexId(), we.getChannel());
ctx.runtimeContext.execute();
}
}
}