blob: 7209b0c2a9cfeb3c2c4506392aa1ea200ef98c45 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.ode.store;
import java.io.File;
import java.io.FileFilter;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.wsdl.Definition;
import javax.xml.namespace.QName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.ode.activityRecovery.FailureHandlingDocument.FailureHandling;
import org.apache.ode.bpel.dd.TCleanup;
import org.apache.ode.bpel.dd.TDeployment;
import org.apache.ode.bpel.dd.TInvoke;
import org.apache.ode.bpel.dd.TMexInterceptor;
import org.apache.ode.bpel.dd.TProcessEvents;
import org.apache.ode.bpel.dd.TProvide;
import org.apache.ode.bpel.dd.TSchedule;
import org.apache.ode.bpel.dd.TScopeEvents;
import org.apache.ode.bpel.dd.TService;
import org.apache.ode.bpel.evt.BpelEvent;
import org.apache.ode.bpel.iapi.ContextException;
import org.apache.ode.bpel.iapi.Endpoint;
import org.apache.ode.bpel.iapi.EndpointReference;
import org.apache.ode.bpel.iapi.EndpointReferenceContext;
import org.apache.ode.bpel.iapi.ProcessConf;
import org.apache.ode.bpel.iapi.ProcessState;
import org.apache.ode.bpel.iapi.Scheduler.JobDetails;
import org.apache.ode.bpel.obj.OFailureHandling;
import org.apache.ode.store.DeploymentUnitDir.CBPInfo;
import org.apache.ode.utils.CollectionUtils;
import org.apache.ode.utils.CronExpression;
import org.apache.ode.utils.DOMUtils;
import org.apache.ode.utils.HierarchicalProperties;
import org.apache.ode.utils.WatchDog;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
/**
* Implementation of the {@link org.apache.ode.bpel.iapi.ProcessConf} interface.
* Provides configuration information for a process. Note that this class should
* be immutable, that is the engine expects it to return consistent results!
*
* @author mriou <mriou at apache dot org>
*/
public class ProcessConfImpl implements ProcessConf {
private static final Logger __log = LoggerFactory.getLogger(ProcessConfImpl.class);
private final Date _deployDate;
private File _configDir;
private final Map<QName, Node> _props;
private final HashMap<String, Endpoint> _partnerRoleInitialValues = new HashMap<String, Endpoint>();
private final HashMap<String, PartnerRoleConfig> _partnerRoleConfig = new HashMap<String, PartnerRoleConfig>();
private final HashMap<String, Endpoint> _myRoleEndpoints = new HashMap<String, Endpoint>();
private final ArrayList<QName> _sharedServices = new ArrayList<QName>();
private final Map<String, Set<BpelEvent.TYPE>> _events = new HashMap<String, Set<BpelEvent.TYPE>>();
private final ArrayList<String> _mexi = new ArrayList<String>();
ProcessState _state;
final TDeployment.Process _pinfo;
final DeploymentUnitDir _du;
private long _version = 0;
private QName _pid;
private QName _type;
// cache the inMemory flag because XMLBeans objects are heavily synchronized (guarded by a coarse-grained lock)
private volatile boolean _inMemory = false;
// monitor the IL property file and reload it if necessary
private WatchDog<Map<File, Long>, PropertiesObserver> propertiesWatchDog;
private EndpointReferenceContext eprContext;
private final ProcessCleanupConfImpl processCleanupConfImpl;
private final boolean generateProcessEventsAll;
ProcessConfImpl(QName pid, QName type, long version, DeploymentUnitDir du, TDeployment.Process pinfo, Date deployDate,
Map<QName, Node> props, ProcessState pstate, EndpointReferenceContext eprContext, File configDir, boolean generateProcessEventsAll) {
_pid = pid;
_version = version;
_du = du;
_pinfo = pinfo;
_deployDate = deployDate;
_configDir = configDir;
_props = Collections.unmodifiableMap(props);
_state = pstate;
_type = type;
_inMemory = _pinfo.isSetInMemory() && _pinfo.getInMemory();
this.generateProcessEventsAll = generateProcessEventsAll;
this.eprContext = eprContext;
propertiesWatchDog = new WatchDog<Map<File, Long>, PropertiesObserver>(new PropertiesMutable(), new PropertiesObserver());
initLinks();
initMexInterceptors();
initEventList();
processCleanupConfImpl = new ProcessCleanupConfImpl(pinfo);
initSchedules();
}
private List<File> collectEndpointConfigFiles() {
// please mind the order: process-level files must be before system-level files
List<File> propFiles = new ArrayList<File>();
propFiles.addAll(_du.getEndpointConfigFiles());
if (_configDir == null) {
if (__log.isDebugEnabled()) __log.debug("No config directory set up.");
} else if (_configDir.isDirectory()) {
// list and sort endpoint config files
File[] files = _configDir.listFiles(new FileFilter() {
public boolean accept(File path) {
return path.getName().endsWith(".endpoint") && path.isFile();
}
});
if (files != null) {
Arrays.sort(files);
propFiles.addAll(Arrays.asList(files));
} else {
if (__log.isErrorEnabled()) __log.error(_configDir + " does not exist or is not a directory");
}
} else {
if (__log.isErrorEnabled()) __log.error(_configDir + " does not exist or is not a directory");
}
return propFiles;
}
private void initMexInterceptors() {
if (_pinfo.getMexInterceptors() != null) {
for (TMexInterceptor mexInterceptor : _pinfo.getMexInterceptors().getMexInterceptorArray()) {
_mexi.add(mexInterceptor.getClassName());
}
}
}
private void initLinks() {
if (_pinfo.getInvokeArray() != null) {
for (TInvoke invoke : _pinfo.getInvokeArray()) {
String plinkName = invoke.getPartnerLink();
TService service = invoke.getService();
// NOTE: service can be null for partner links
if (service == null)
continue;
if (__log.isDebugEnabled()) {
__log.debug("Processing <invoke> element for process " + _pinfo.getName() + ": partnerlink " + plinkName + " --> "
+ service);
}
_partnerRoleInitialValues.put(plinkName, new Endpoint(service.getName(), service.getPort()));
{
OFailureHandling g = null;
if (invoke.isSetFailureHandling()) {
FailureHandling f = invoke.getFailureHandling();
g = new OFailureHandling();
if (f.isSetFaultOnFailure()) g.setFaultOnFailure(f.getFaultOnFailure());
if (f.isSetRetryDelay()) g.setRetryDelay(f.getRetryDelay());
if (f.isSetRetryFor()) g.setRetryFor(f.getRetryFor());
}
PartnerRoleConfig c = new PartnerRoleConfig(g, invoke.getUsePeer2Peer());
if (__log.isDebugEnabled()) {
__log.debug("PartnerRoleConfig for " + plinkName + " " + c.failureHandling + " usePeer2Peer: " + c.usePeer2Peer);
}
_partnerRoleConfig.put(plinkName, c);
}
}
}
if (_pinfo.getProvideArray() != null) {
for (TProvide provide : _pinfo.getProvideArray()) {
String plinkName = provide.getPartnerLink();
TService service = provide.getService();
if (service == null) {
String errmsg = "Error in <provide> element for process " + _pinfo.getName() + "; partnerlink " + plinkName
+ "did not identify an endpoint";
__log.error(errmsg);
throw new ContextException(errmsg);
}
if (__log.isDebugEnabled()) {
__log.debug("Processing <provide> element for process " + _pinfo.getName() + ": partnerlink " + plinkName + " --> "
+ service.getName() + " : " + service.getPort());
}
_myRoleEndpoints.put(plinkName, new Endpoint(service.getName(), service.getPort()));
if (provide.isSetEnableSharing()) {
_sharedServices.add(service.getName());
}
}
}
}
public Date getDeployDate() {
return _deployDate;
}
public String getDeployer() {
return "";
}
public List<File> getFiles() {
return _du.allFiles();
}
public QName getProcessId() {
return _pid;
}
public QName getType() {
return _pinfo.getType() == null ? _type : _pinfo.getType();
}
public String getPackage() {
return _du.getName();
}
public Map<QName, Node> getProcessProperties() {
return _props;
}
public long getVersion() {
return _version;
}
public InputStream getCBPInputStream() {
CBPInfo cbpInfo = _du.getCBPInfo(getType());
if (cbpInfo == null)
throw new ContextException("CBP record not found for type " + getType());
try {
return new FileInputStream(cbpInfo.cbp);
} catch (FileNotFoundException e) {
throw new ContextException("File Not Found: " + cbpInfo.cbp, e);
}
}
@Override
public File getCBPFile(){
CBPInfo cbpInfo = _du.getCBPInfo(getType());
if (cbpInfo == null)
throw new ContextException("CBP record not found for type " + getType());
return cbpInfo.cbp;
}
public long getCBPFileSize() {
CBPInfo cbpInfo = _du.getCBPInfo(getType());
if (cbpInfo == null)
throw new ContextException("CBP record not found for type " + getType());
return cbpInfo.cbp.length();
}
public String getBpelDocument() {
CBPInfo cbpInfo = _du.getCBPInfo(getType());
if (cbpInfo == null)
throw new ContextException("CBP record not found for type " + getType());
try {
String relative = getRelativePath(_du.getDeployDir(), cbpInfo.cbp).replaceAll("\\\\", "/");
if (!relative.endsWith(".cbp"))
throw new ContextException("CBP file must end with .cbp suffix: " + cbpInfo.cbp);
relative = relative.replace(".cbp", ".bpel");
File bpelFile = new File(_du.getDeployDir(), relative);
if (!bpelFile.exists()) __log.warn("BPEL file does not exist: " + bpelFile);
return relative;
} catch (IOException e) {
throw new ContextException("IOException in getBpelRelativePath: " + cbpInfo.cbp, e);
}
}
public URI getBaseURI() {
return _du.getDeployDir().toURI();
}
public ProcessState getState() {
return _state;
}
void setState(ProcessState state) {
_state = state;
}
public List<String> getMexInterceptors(QName processId) {
return Collections.unmodifiableList(_mexi);
}
public Definition getDefinitionForService(QName serviceName) {
return _du.getDefinitionForService(serviceName);
}
public Definition getDefinitionForPortType(QName portTypeName) {
return _du.getDefinitionForPortType(portTypeName);
}
public Map<String, Endpoint> getInvokeEndpoints() {
return Collections.unmodifiableMap(_partnerRoleInitialValues);
}
public Map<String, PartnerRoleConfig> getPartnerRoleConfig() {
return Collections.unmodifiableMap(_partnerRoleConfig);
}
public Map<String, Endpoint> getProvideEndpoints() {
return Collections.unmodifiableMap(_myRoleEndpoints);
}
public boolean isSharedService(QName serviceName) {
return _sharedServices.contains(serviceName);
}
@SuppressWarnings("unused")
private void handleEndpoints() {
// for (TProvide provide : _pinfo.getProvideList()) {
// OPartnerLink pLink = _oprocess.getPartnerLink(provide.getPartnerLink());
// if (pLink == null) {
// String msg = __msgs.msgDDPartnerLinkNotFound(provide.getPartnerLink());
// __log.error(msg);
// throw new BpelEngineException(msg);
// }
// if (!pLink.hasMyRole()) {
// String msg = __msgs.msgDDMyRoleNotFound(provide.getPartnerLink());
// __log.error(msg);
// throw new BpelEngineException(msg);
// }
// }
// for (TInvoke invoke : _pinfo.getInvokeList()) {
// OPartnerLink pLink = _oprocess.getPartnerLink(invoke.getPartnerLink());
// if (pLink == null) {
// String msg = __msgs.msgDDPartnerLinkNotFound(invoke.getPartnerLink());
// __log.error(msg);
// throw new BpelEngineException(msg);
// }
// if (!pLink.hasPartnerRole()) {
// String msg = __msgs.msgDDPartnerRoleNotFound(invoke.getPartnerLink());
// __log.error(msg);
// throw new BpelEngineException(msg);
// }
// TODO Handle non initialize partner roles that just provide a binding
// if (!pLink.initializePartnerRole && _oprocess.version.equals(Namespaces.WS_BPEL_20_NS)) {
// String msg = ProcessDDInitializer.__msgs.msgDDNoInitiliazePartnerRole(invoke.getPartnerLink());
// ProcessDDInitializer.__log.error(msg);
// throw new BpelEngineException(msg);
// }
// }
}
DeploymentUnitDir getDeploymentUnit() {
return _du;
}
public boolean isTransient() {
return _inMemory;
}
public void setTransient(boolean t) {
_pinfo.setInMemory(t);
_inMemory = t;
}
public boolean isEventEnabled(List<String> scopeNames, BpelEvent.TYPE type) {
if (scopeNames != null) {
for (String scopeName : scopeNames) {
Set<BpelEvent.TYPE> evtSet = _events.get(scopeName);
if (evtSet != null) {
if (evtSet.contains(type)) return true;
}
}
}
Set<BpelEvent.TYPE> evtSet = _events.get(null);
if (evtSet != null) {
// Default filtering at the process level for some event types
if (evtSet.contains(type)) return true;
}
return false;
}
private void initEventList() {
TProcessEvents processEvents = _pinfo.getProcessEvents();
// No filtering, using defaults
if (processEvents == null) {
if (generateProcessEventsAll) {
HashSet<BpelEvent.TYPE> all = new HashSet<BpelEvent.TYPE>();
for (BpelEvent.TYPE t : BpelEvent.TYPE.values()) {
if (!t.equals(BpelEvent.TYPE.scopeHandling)) all.add(t);
}
_events.put(null, all);
}
return;
}
// Adding all events
if (processEvents.getGenerate() != null && processEvents.getGenerate().equals(TProcessEvents.Generate.ALL)) {
HashSet<BpelEvent.TYPE> all = new HashSet<BpelEvent.TYPE>();
for (BpelEvent.TYPE t : BpelEvent.TYPE.values())
all.add(t);
_events.put(null, all);
return;
}
// Events filtered at the process level
if (processEvents.getEnableEventArray() != null && processEvents.getEnableEventArray().length > 0) {
HashSet<BpelEvent.TYPE> evtSet = new HashSet<BpelEvent.TYPE>();
for (String enEvt : processEvents.getEnableEventArray()) {
evtSet.add(BpelEvent.TYPE.valueOf(enEvt));
}
_events.put(null, evtSet);
}
// Events filtered at the scope level
if (processEvents.getScopeEventsArray() != null) {
for (TScopeEvents tScopeEvents : processEvents.getScopeEventsArray()) {
HashSet<BpelEvent.TYPE> evtSet = new HashSet<BpelEvent.TYPE>();
for (String enEvt : tScopeEvents.getEnableEventArray()) {
evtSet.add(BpelEvent.TYPE.valueOf(enEvt));
}
_events.put(tScopeEvents.getName(), evtSet);
}
}
}
private String getRelativePath(File base, File path) throws IOException {
String basePath = base.getCanonicalPath();
String cbpPath = path.getCanonicalPath();
if (!cbpPath.startsWith(basePath))
throw new IOException("Invalid relative path: base=" + base + " path=" + path);
String relative = cbpPath.substring(basePath.length());
if (relative.startsWith(File.separator)) relative = relative.substring(1);
return relative;
}
public List<Element> getExtensionElement(QName qname) {
try {
return DOMUtils.findChildrenByName(DOMUtils.stringToDOM(_pinfo.toString()), qname);
} catch (Exception e) {
return Collections.emptyList();
}
}
@SuppressWarnings("unchecked")
public Map<String, String> getEndpointProperties(EndpointReference epr) {
final Map map = eprContext.getConfigLookup(epr);
final QName service = (QName) map.get("service");
final String port = (String) map.get("port");
// update properties if necessary
// do it manually to save resources (instead of using a thread)
propertiesWatchDog.check();
final Map prop = propertiesWatchDog.getObserver().get().getProperties(service, port);
if(!prop.isEmpty() && __log.isDebugEnabled()) {
StringBuilder msg = new StringBuilder("Properties for ");
if(service!=null) msg.append("service ").append(service);
if(port!=null) msg.append(", port ").append(port);
msg.append(": {");
for (Iterator it = prop.entrySet().iterator(); it.hasNext();) {
Map.Entry e = (Map.Entry) it.next();
msg.append(e.getKey()).append("=>").append(e.getValue());
if(it.hasNext()) msg.append(", ");
}
msg.append("}");
__log.debug(msg.toString());
}
return prop;
}
private class PropertiesMutable implements WatchDog.Mutable<Map<File, Long>> {
public boolean exists() {
return true;
}
public boolean hasChangedSince(Map<File, Long> since) {
return !CollectionUtils.equals(lastModified(), since);
}
public Map<File, Long> lastModified() {
List<File> files = collectEndpointConfigFiles();
Map<File, Long> m = new HashMap<File, Long>(files.size() * 15 / 10);
for (File f : files) m.put(f, Long.valueOf(f.lastModified()));
return m;
}
@Override
public String toString() {
return "Endpoint files for "+_du.toString();
}
}
private class PropertiesObserver extends WatchDog.DefaultObserver<HierarchicalProperties> {
public void init() {
try {
// do not hold a reference on the file list, so that changes are handled
// and always create a new instance of the HierarchicalProperties
object = new HierarchicalProperties(collectEndpointConfigFiles());
} catch (IOException e) {
throw new ContextException("Integration-Layer Properties cannot be loaded!", e);
}
}
}
public boolean isCleanupCategoryEnabled(boolean instanceSucceeded, CLEANUP_CATEGORY category) {
return processCleanupConfImpl.isCleanupCategoryEnabled(instanceSucceeded, category);
}
public Set<CLEANUP_CATEGORY> getCleanupCategories(boolean instanceSucceeded) {
return processCleanupConfImpl.getCleanupCategories(instanceSucceeded);
}
private void initSchedules() {
for(TSchedule schedule : _pinfo.getScheduleArray()) {
for(TCleanup cleanup : schedule.getCleanupArray()) {
assert cleanup.getFilterArray().length > 0;
}
}
}
public List<CronJob> getCronJobs() {
List<CronJob> jobs = new ArrayList<CronJob>();
for(TSchedule schedule : _pinfo.getScheduleArray()) {
CronJob job = new CronJob();
try {
job.setCronExpression(new CronExpression(schedule.getWhen()));
for(TCleanup aCleanup : schedule.getCleanupArray()) {
CleanupInfo cleanupInfo = new CleanupInfo();
assert aCleanup.getFilterArray().length > 0;
cleanupInfo.setFilters(Arrays.asList(aCleanup.getFilterArray()));
ProcessCleanupConfImpl.processACleanup(cleanupInfo.getCategories(), Arrays.asList(aCleanup.getCategoryArray()));
JobDetails runnableDetails = new JobDetails();
runnableDetails.getDetailsExt().put("cleanupInfo", cleanupInfo);
runnableDetails.setProcessId(_pid);
runnableDetails.getDetailsExt().put("transactionSize", 10);
job.getRunnableDetailList().add(runnableDetails);
}
jobs.add(job);
} catch( ParseException pe ) {
__log.error("Exception during parsing the schedule cron expression: " + schedule.getWhen() + ", skipped the scheduled job.", pe);
}
}
return jobs;
}
}