blob: ffa3636f78d2b5da1575894961dfc8af6f93e25b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.ode.jbi;
import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import javax.jbi.JBIException;
import javax.jbi.component.ComponentContext;
import javax.jbi.messaging.DeliveryChannel;
import javax.jbi.messaging.NormalizedMessage;
import javax.jbi.servicedesc.ServiceEndpoint;
import javax.sql.DataSource;
import javax.transaction.TransactionManager;
import javax.wsdl.Definition;
import javax.wsdl.Operation;
import javax.wsdl.factory.WSDLFactory;
import javax.xml.namespace.QName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.ode.agents.memory.SizingAgent;
import org.apache.ode.bpel.dao.BpelDAOConnectionFactory;
import org.apache.ode.bpel.engine.BpelServerImpl;
import org.apache.ode.bpel.engine.ProcessAndInstanceManagementImpl;
import org.apache.ode.bpel.iapi.Endpoint;
import org.apache.ode.bpel.iapi.EndpointReference;
import org.apache.ode.bpel.iapi.ProcessConf;
import org.apache.ode.bpel.obj.OPartnerLink;
import org.apache.ode.bpel.obj.OProcess;
import org.apache.ode.bpel.obj.serde.DeSerializer;
import org.apache.ode.bpel.pmapi.InstanceManagement;
import org.apache.ode.bpel.pmapi.ProcessManagement;
import org.apache.ode.jbi.msgmap.Mapper;
import org.apache.ode.jbi.util.WSDLFlattener;
import org.apache.ode.scheduler.simple.SimpleScheduler;
import org.apache.ode.store.ProcessStoreImpl;
import org.w3c.dom.Document;
/**
* Encapsulation of all the junk needed to get the BPEL engine running.
*
* @author mszefler
*/
final public class OdeContext {
private static final Logger __log = LoggerFactory.getLogger(OdeContext.class);
public static final QName PM_SERVICE_NAME = new QName("http://www.apache.org/ode/pmapi", "ProcessManagementService");
public static final String PM_PORT_NAME = "ProcessManagementPort";
public static final QName IM_SERVICE_NAME = new QName("http://www.apache.org/ode/pmapi", "InstanceManagementService");
public static final String IM_PORT_NAME = "InstanceManagementPort";
/** static singleton */
private static OdeContext __self;
private ComponentContext _context;
private TransactionManager _txm;
private Map<QName, Document> _descriptorCache = new ConcurrentHashMap<QName, Document>();
/** Ordered list of message mappers */
private ArrayList<Mapper> _mappers = new ArrayList<Mapper>();
/** Mapper by class name. */
private Map<String, Mapper> _mappersByClassName = new HashMap<String, Mapper>();
OdeConsumer _consumer;
JbiMessageExchangeProcessor _jbiMessageExchangeProcessor = new JbiMessageExchangeEventRouter(this);
BpelServerImpl _server;
EndpointReferenceContextImpl _eprContext;
MessageExchangeContextImpl _mexContext;
SimpleScheduler _scheduler;
ExecutorService _executorService;
BpelDAOConnectionFactory _daocf;
OdeConfigProperties _config;
DataSource _dataSource;
ProcessStoreImpl _store;
ServiceEndpoint _processManagementEndpoint;
ServiceEndpoint _instanceManagementEndpoint;
JbiMessageExchangeProcessor _processManagementProcessor;
JbiMessageExchangeProcessor _instanceManagementProcessor;
ProcessManagement _processManagement;
InstanceManagement _instanceManagement;
/** Mapping of Endpoint to OdeService */
private Map<Endpoint, OdeService> _activeOdeServices = new ConcurrentHashMap<Endpoint, OdeService>();
private Map<OdeService, EndpointReference> _serviceEprMap = new HashMap<OdeService, EndpointReference>();
/**
* Gets the delivery channel.
*
* @return delivery channel
*/
public DeliveryChannel getChannel() {
DeliveryChannel chnl = null;
if (_context != null) {
try {
chnl = _context.getDeliveryChannel();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
return chnl;
}
/**
* Sets the Component context.
*
* @param ctx
* component context.
*/
public void setContext(ComponentContext ctx) {
_context = ctx;
}
public ComponentContext getContext() {
return _context;
}
public static OdeContext getInstance() {
synchronized (OdeContext.class) {
if (__self == null) {
__self = new OdeContext();
}
}
return __self;
}
public void addEndpointDoc(QName svcname, Document df) {
_descriptorCache.put(svcname, df);
}
public Document getServiceDescription(QName svcName) {
return _descriptorCache.get(svcName);
}
public TransactionManager getTransactionManager() {
if (_txm == null) {
return (TransactionManager) getContext().getTransactionManager();
}
return _txm;
}
public void setTransactionManager(TransactionManager txm) {
_txm = txm;
}
public synchronized MyEndpointReference activateEndpoint(QName pid, Endpoint endpoint) throws Exception {
if (__log.isDebugEnabled()) {
__log.debug("Activate endpoint: " + endpoint);
}
OdeService service=_activeOdeServices.get(endpoint);
if(service == null)
service = new OdeService(this, endpoint);
try {
ProcessConf pc = _store.getProcessConfiguration(pid);
File cbpFile = pc.getCBPFile();
OProcess compiledProcess = null;
DeSerializer deserializer = new DeSerializer(cbpFile);
compiledProcess = deserializer.deserialize();
QName portType = null;
for (Map.Entry<String, Endpoint> provide : pc.getProvideEndpoints().entrySet()) {
if (provide.getValue().equals(endpoint)) {
OPartnerLink plink = compiledProcess.getPartnerLink(provide.getKey());
portType = plink.getMyRolePortType().getQName();
break;
}
}
if (portType == null) {
if (__log.isDebugEnabled()) {
__log.debug("Could not find PortType for endpoint");
}
} else {
Definition def = pc.getDefinitionForService(endpoint.serviceName);
if (def == null) {
__log.debug("Could not find definition for service: " + endpoint.serviceName);
} else {
def = new WSDLFlattener(def).getDefinition(portType);
Document doc = WSDLFactory.newInstance().newWSDLWriter().getDocument(def);
addEndpointDoc(endpoint.serviceName, doc);
}
}
} catch (Exception e) {
__log.warn("Exception during endpoint activation", e);
}
MyEndpointReference myepr = new MyEndpointReference(service);
service.activate();
_activeOdeServices.put(endpoint, service);
_serviceEprMap.put(service, myepr);
return myepr;
}
public synchronized void deactivateEndpoint(Endpoint endpoint) throws Exception {
OdeService svc = _activeOdeServices.get(endpoint);
if (svc != null) {
_serviceEprMap.remove(svc);
svc.deactivate();
if(svc.getCount() < 1 ) {
_activeOdeServices.remove(endpoint);
}
}
}
public OdeService getService(Endpoint endpoint) {
return _activeOdeServices.get(endpoint);
}
public OdeService getService(QName serviceName) {
for (Map.Entry<Endpoint,OdeService> e : _activeOdeServices.entrySet()){
if (e.getKey().serviceName.equals(serviceName))
return e.getValue();
}
return null;
}
public Mapper findMapper(NormalizedMessage nmsMsg, Operation op) {
ArrayList<Mapper> maybe = new ArrayList<Mapper>();
for (Mapper m : _mappers) {
Mapper.Recognized result = m.isRecognized(nmsMsg, op);
switch (result) {
case TRUE:
return m;
case FALSE:
continue;
case UNSURE:
maybe.add(m);
break;
}
}
if (maybe.size() == 0)
return null;
if (maybe.size() == 1)
return maybe.get(0);
__log.warn("Multiple mappers may match input message for operation " + op.getName());
// Get the first match.
return maybe.get(0);
}
public Mapper getMapper(String name) {
return _mappersByClassName.get(name);
}
public void registerMapper(Mapper mapper) {
_mappers.add(mapper);
_mappersByClassName.put(mapper.getClass().getName(), mapper);
}
public Mapper getDefaultMapper() {
return _mappers.get(0);
}
void activatePMAPIs() throws JBIException {
ProcessAndInstanceManagementImpl pm = new ProcessAndInstanceManagementImpl(_server, _store);
_processManagement = pm;
_instanceManagement = pm;
_processManagementEndpoint = getContext().activateEndpoint(PM_SERVICE_NAME, PM_PORT_NAME);
_instanceManagementEndpoint = getContext().activateEndpoint(IM_SERVICE_NAME, IM_PORT_NAME);
_processManagementProcessor = new DynamicMessageExchangeProcessor<ProcessManagement>(pm, getChannel());
_instanceManagementProcessor = new DynamicMessageExchangeProcessor<InstanceManagement>(pm, getChannel());
}
void deactivatePMAPIs() throws JBIException {
if (_processManagementEndpoint != null) {
try {
getContext().deactivateEndpoint(_processManagementEndpoint);
} catch (Exception e) {
__log.error("Error deactivating ProcessManagement service", e);
}
}
if (_instanceManagementEndpoint != null) {
try {
getContext().deactivateEndpoint(_instanceManagementEndpoint);
} catch (Exception e) {
__log.error("Error deactivating InstanceManagement service", e);
}
}
}
public long calculateSizeOfService(EndpointReference epr) {
if (epr != null) {
for (OdeService odeService : _serviceEprMap.keySet()) {
EndpointReference serviceEpr = _serviceEprMap.get(odeService);
if (serviceEpr != null && epr.equals(serviceEpr)) {
return SizingAgent.deepSizeOf(odeService);
}
}
}
return 0;
}
}