blob: c767c9445271d981f779ca365fd3e243f08e8b4b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.tuscany.sca.node.impl;
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.xml.stream.XMLOutputFactory;
import org.apache.tuscany.sca.assembly.Component;
import org.apache.tuscany.sca.assembly.ComponentService;
import org.apache.tuscany.sca.assembly.Composite;
import org.apache.tuscany.sca.assembly.CompositeService;
import org.apache.tuscany.sca.assembly.Endpoint;
import org.apache.tuscany.sca.assembly.Service;
import org.apache.tuscany.sca.context.CompositeContext;
import org.apache.tuscany.sca.context.ThreadMessageContext;
import org.apache.tuscany.sca.contribution.Contribution;
import org.apache.tuscany.sca.contribution.processor.ProcessorContext;
import org.apache.tuscany.sca.contribution.processor.StAXArtifactProcessor;
import org.apache.tuscany.sca.contribution.processor.StAXArtifactProcessorExtensionPoint;
import org.apache.tuscany.sca.core.ExtensionPointRegistry;
import org.apache.tuscany.sca.core.FactoryExtensionPoint;
import org.apache.tuscany.sca.core.UtilityExtensionPoint;
import org.apache.tuscany.sca.core.invocation.ProxyFactory;
import org.apache.tuscany.sca.monitor.Monitor;
import org.apache.tuscany.sca.node.Node;
import org.apache.tuscany.sca.node.configuration.NodeConfiguration;
import org.apache.tuscany.sca.runtime.ActivationException;
import org.apache.tuscany.sca.runtime.CompositeActivator;
import org.apache.tuscany.sca.runtime.DomainRegistryFactory;
import org.apache.tuscany.sca.runtime.EndpointRegistry;
import org.apache.tuscany.sca.runtime.ExtensibleDomainRegistryFactory;
import org.apache.tuscany.sca.runtime.RuntimeComponent;
import org.apache.tuscany.sca.runtime.RuntimeComponentContext;
import org.apache.tuscany.sca.runtime.RuntimeComponentService;
import org.oasisopen.sca.ServiceReference;
import org.oasisopen.sca.ServiceRuntimeException;
import org.oasisopen.sca.ServiceUnavailableException;
/**
* An SCA Node that is managed by the NodeManager
*/
public class NodeImpl implements Node {
private static final Logger logger = Logger.getLogger(NodeImpl.class.getName());
private ProxyFactory proxyFactory;
private CompositeActivator compositeActivator;
private CompositeContext compositeContext;
private Composite domainComposite;
private NodeConfiguration configuration;
private NodeFactoryImpl nodeFactory;
private List<Contribution> contributions;
// private NodeManager mbean;
/**
* Create a node from the configuration
* @param manager
* @param configuration
*/
public NodeImpl(NodeFactoryImpl nodeFactory, NodeConfiguration configuration) {
super();
this.configuration = configuration;
this.nodeFactory = nodeFactory;
}
/**
* Create a node from the configuration and loaded contributions
* @param manager
* @param configuration
* @param contributions
*/
public NodeImpl(NodeFactoryImpl manager, NodeConfiguration configuration, List<Contribution> contributions) {
super();
this.configuration = configuration;
this.nodeFactory = manager;
this.contributions = new ArrayList<Contribution>(contributions);
}
public String getURI() {
return getConfiguration().getURI();
}
public Node start() {
logger.log(Level.INFO, "Starting node: " + configuration.getURI() + " domain: " + configuration.getDomainURI());
nodeFactory.init();
nodeFactory.addNode(configuration, this);
this.proxyFactory = nodeFactory.proxyFactory;
DomainRegistryFactory domainRegistryFactory = ExtensibleDomainRegistryFactory.getInstance(nodeFactory.registry);
EndpointRegistry endpointRegistry =
domainRegistryFactory.getEndpointRegistry(configuration.getDomainRegistryURI(), configuration
.getDomainURI());
UtilityExtensionPoint utilities = nodeFactory.registry.getExtensionPoint(UtilityExtensionPoint.class);
this.compositeActivator = utilities.getUtility(CompositeActivator.class);
try {
Monitor monitor = nodeFactory.monitorFactory.createMonitor();
ProcessorContext context = new ProcessorContext(monitor);
// Set up the thead context monitor
Monitor tcm = nodeFactory.monitorFactory.setContextMonitor(monitor);
try {
if (contributions == null) {
contributions = nodeFactory.loadContributions(configuration, context);
}
domainComposite = nodeFactory.configureNode(configuration, contributions, context);
this.compositeContext =
new CompositeContext(nodeFactory.registry,
endpointRegistry,
domainComposite,
configuration.getDomainURI(),
configuration.getURI(),
nodeFactory.getDeployer().getSystemDefinitions());
CompositeContext.setThreadCompositeContext(compositeContext);
} finally {
// Reset the thread context monitor
nodeFactory.monitorFactory.setContextMonitor(tcm);
}
// Activate the composite
compositeActivator.activate(compositeContext, domainComposite);
// Start the composite
compositeActivator.start(compositeContext, domainComposite);
// FIXME: [rfeng] We should turn the management capability into a system utility.
// In certain environment such as Google App Engine, the JMX API is not allowed
try {
/*
MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
mbean = new NodeManager(this);
mBeanServer.registerMBean(mbean, mbean.getName());
*/
/*
LocateRegistry.createRegistry(9999);
JMXServiceURL url =
new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:9999/server");
JMXConnectorServer connectorServer = JMXConnectorServerFactory.newJMXConnectorServer(url, null, mBeanServer);
connectorServer.start();
*/
} catch (Throwable e) {
// Ignore the error for now
// mbean = null;
logger.log(Level.SEVERE, e.getMessage(), e);
}
return this;
} catch (Throwable e) {
throw new IllegalStateException(e);
}
}
public void stop() {
logger.log(Level.INFO, "Stopping node: " + configuration.getURI());
try {
if (compositeActivator == null) {
return;
}
/*
if (mbean != null) {
try {
MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
mBeanServer.unregisterMBean(mbean.getName());
} catch (Throwable e) {
logger.log(Level.SEVERE, e.getMessage(), e);
} finally {
mbean = null;
}
}
*/
if( domainComposite != null ) {
// Stop the composite
compositeActivator.stop(compositeContext, domainComposite);
// Deactivate the composite
compositeActivator.deactivate(domainComposite);
} // end if
nodeFactory.removeNode(configuration);
this.compositeActivator = null;
this.proxyFactory = null;
this.domainComposite = null;
this.compositeContext = null;
ThreadMessageContext.removeMessageContext();
CompositeContext.removeCompositeContext();
} catch (ActivationException e) {
throw new IllegalStateException(e);
}
}
@SuppressWarnings("unchecked")
public <B, R extends ServiceReference<B>> R cast(B target) throws IllegalArgumentException {
return (R)proxyFactory.cast(target);
}
public <B> B getService(Class<B> businessInterface, String serviceName) {
ServiceReference<B> serviceReference = getServiceReference(businessInterface, serviceName);
if (serviceReference == null) {
throw new ServiceRuntimeException("Service not found: " + serviceName);
}
return serviceReference.getService();
}
public <B> ServiceReference<B> getServiceReference(Class<B> businessInterface, String name) {
// Extract the component name
String componentName;
String serviceName;
int i = name.indexOf('/');
if (i != -1) {
componentName = name.substring(0, i);
serviceName = name.substring(i + 1);
} else {
componentName = name;
serviceName = null;
}
// Lookup the component
Component component = null;
for (Component compositeComponent : domainComposite.getComponents()) {
if (compositeComponent.getName().equals(componentName)) {
component = compositeComponent;
break;
}
}
if (component == null) {
throw new ServiceUnavailableException("The service " + name + " has not been contributed to the domain");
}
RuntimeComponentContext componentContext = null;
// If the component is a composite, then we need to find the
// non-composite component that provides the requested service
if (component.getImplementation() instanceof Composite) {
for (ComponentService componentService : component.getServices()) {
String bindingName = null;
if (serviceName != null) {
int index = serviceName.indexOf('/');
if (index != -1) {
bindingName = serviceName.substring(index + 1);
serviceName = serviceName.substring(0, index);
}
}
if (serviceName == null || serviceName.equals(componentService.getName())) {
CompositeService compositeService = (CompositeService)componentService.getService();
if (compositeService != null) {
componentContext =
((RuntimeComponent)compositeService.getPromotedComponent()).getComponentContext();
serviceName = compositeService.getPromotedService().getName();
if (bindingName != null) {
serviceName = serviceName + "/" + bindingName;
}
return componentContext.createSelfReference(businessInterface, serviceName);
}
break;
}
}
// No matching service found
throw new ServiceRuntimeException("Composite service not found: " + name);
} else {
componentContext = ((RuntimeComponent)component).getComponentContext();
if (serviceName != null) {
return componentContext.createSelfReference(businessInterface, serviceName);
} else {
return componentContext.createSelfReference(businessInterface);
}
}
}
public NodeConfiguration getConfiguration() {
return configuration;
}
public ExtensionPointRegistry getExtensionPointRegistry() {
return nodeFactory.getExtensionPointRegistry();
}
/**
* Get the service endpoints in this Node
* TODO: needs review, works for the very simple testcase but i expect there are
* other endpoints to be included
*/
public List<Endpoint> getServiceEndpoints() {
List<Endpoint> endpoints = new ArrayList<Endpoint>();
if (compositeActivator != null) {
if (domainComposite != null) {
for (Component component : domainComposite.getComponents()) {
for (Service service : component.getServices()) {
// MJE 28/05/2009 - changed to RuntimeComponentService from RuntimeComponentServiceImpl
// - no need to access the Impl directly here
if (service instanceof RuntimeComponentService) {
endpoints.addAll(((RuntimeComponentService)service).getEndpoints());
}
}
}
}
}
return endpoints;
}
public Composite getDomainComposite() {
return domainComposite;
}
public String dumpDomainComposite() {
StAXArtifactProcessorExtensionPoint xmlProcessors =
getExtensionPointRegistry().getExtensionPoint(StAXArtifactProcessorExtensionPoint.class);
StAXArtifactProcessor<Composite> compositeProcessor =
xmlProcessors.getProcessor(Composite.class);
return writeComposite(getDomainComposite(), compositeProcessor);
}
private String writeComposite(Composite composite, StAXArtifactProcessor<Composite> compositeProcessor){
ByteArrayOutputStream bos = new ByteArrayOutputStream();
XMLOutputFactory outputFactory =
nodeFactory.getExtensionPointRegistry().getExtensionPoint(FactoryExtensionPoint.class)
.getFactory(XMLOutputFactory.class);
try {
compositeProcessor.write(composite, outputFactory.createXMLStreamWriter(bos), new ProcessorContext(nodeFactory.registry));
} catch(Exception ex) {
return ex.toString();
}
String result = bos.toString();
// write out and nested composites
for (Component component : composite.getComponents()) {
if (component.getImplementation() instanceof Composite) {
result +=
"\n<!-- XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX -->\n" + writeComposite((Composite)component
.getImplementation(),
compositeProcessor);
}
}
return result;
}
List<Contribution> getContributions() {
return contributions;
}
}