blob: c703b9f8f10738b19c6b89c351878b1d22646ff4 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cxf.dosgi.discovery.zookeeper.publish;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.cxf.dosgi.discovery.zookeeper.util.Utils;
import org.apache.cxf.dosgi.endpointdesc.EndpointDescriptionParser;
import org.apache.cxf.dosgi.endpointdesc.PropertiesMapper;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.osgi.framework.BundleContext;
import org.osgi.service.remoteserviceadmin.EndpointDescription;
import org.osgi.service.remoteserviceadmin.EndpointListener;
import org.osgi.util.tracker.ServiceTracker;
import org.osgi.xmlns.rsa.v1_0.EndpointDescriptionType;
import org.osgi.xmlns.rsa.v1_0.PropertyType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Listens for local Endpoints and publishes them to ZooKeeper.
*/
public class PublishingEndpointListener implements EndpointListener {
private static final Logger LOG = LoggerFactory.getLogger(PublishingEndpointListener.class);
private final ZooKeeper zk;
private final ServiceTracker<DiscoveryPlugin, DiscoveryPlugin> discoveryPluginTracker;
private final List<EndpointDescription> endpoints = new ArrayList<EndpointDescription>();
private boolean closed;
private final EndpointDescriptionParser endpointDescriptionParser;
public PublishingEndpointListener(ZooKeeper zk, BundleContext bctx) {
this.zk = zk;
discoveryPluginTracker = new ServiceTracker<DiscoveryPlugin, DiscoveryPlugin>(bctx,
DiscoveryPlugin.class, null);
discoveryPluginTracker.open();
endpointDescriptionParser = new EndpointDescriptionParser();
}
public void endpointAdded(EndpointDescription endpoint, String matchedFilter) {
LOG.info("Local EndpointDescription added: {}", endpoint);
synchronized (endpoints) {
if (closed) {
return;
}
if (endpoints.contains(endpoint)) {
// TODO -> Should the published endpoint be updated here?
return;
}
try {
addEndpoint(endpoint);
endpoints.add(endpoint);
} catch (Exception ex) {
LOG.error("Exception while processing the addition of an endpoint.", ex);
}
}
}
private void addEndpoint(EndpointDescription endpoint) throws URISyntaxException, KeeperException,
InterruptedException, IOException {
Collection<String> interfaces = endpoint.getInterfaces();
String endpointKey = getKey(endpoint.getId());
Map<String, Object> props = new HashMap<String, Object>(endpoint.getProperties());
// process plugins
Object[] plugins = discoveryPluginTracker.getServices();
if (plugins != null) {
for (Object plugin : plugins) {
if (plugin instanceof DiscoveryPlugin) {
endpointKey = ((DiscoveryPlugin)plugin).process(props, endpointKey);
}
}
}
for (String name : interfaces) {
String path = Utils.getZooKeeperPath(name);
String fullPath = path + '/' + endpointKey;
LOG.debug("Creating ZooKeeper node: {}", fullPath);
ensurePath(path, zk);
List<PropertyType> propsOut = new PropertiesMapper().fromProps(props);
EndpointDescriptionType epd = new EndpointDescriptionType();
epd.getProperty().addAll(propsOut);
byte[] epData = endpointDescriptionParser.getData(epd);
createEphemeralNode(fullPath, epData);
}
}
private void createEphemeralNode(String fullPath, byte[] data) throws KeeperException, InterruptedException {
try {
zk.create(fullPath, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
} catch (NodeExistsException nee) {
// this sometimes happens after a ZooKeeper node dies and the ephemeral node
// that belonged to the old session was not yet deleted. We need to make our
// session the owner of the node so it won't get deleted automatically -
// we do this by deleting and recreating it ourselves.
LOG.info("node for endpoint already exists, recreating: {}", fullPath);
try {
zk.delete(fullPath, -1);
} catch (NoNodeException nne) {
// it's a race condition, but as long as it got deleted - it's ok
}
zk.create(fullPath, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
}
}
public void endpointRemoved(EndpointDescription endpoint, String matchedFilter) {
LOG.info("Local EndpointDescription removed: {}", endpoint);
synchronized (endpoints) {
if (closed) {
return;
}
if (!endpoints.contains(endpoint)) {
return;
}
try {
removeEndpoint(endpoint);
endpoints.remove(endpoint);
} catch (Exception ex) {
LOG.error("Exception while processing the removal of an endpoint", ex);
}
}
}
private void removeEndpoint(EndpointDescription endpoint) throws UnknownHostException, URISyntaxException {
Collection<String> interfaces = endpoint.getInterfaces();
String endpointKey = getKey(endpoint.getId());
for (String name : interfaces) {
String path = Utils.getZooKeeperPath(name);
String fullPath = path + '/' + endpointKey;
LOG.debug("Removing ZooKeeper node: {}", fullPath);
try {
zk.delete(fullPath, -1);
} catch (Exception ex) {
LOG.debug("Error while removing endpoint: {}", ex); // e.g. session expired
}
}
}
private static void ensurePath(String path, ZooKeeper zk) throws KeeperException, InterruptedException {
StringBuilder current = new StringBuilder();
String[] parts = Utils.removeEmpty(path.split("/"));
for (String part : parts) {
current.append('/');
current.append(part);
try {
zk.create(current.toString(), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} catch (NodeExistsException nee) {
// it's not the first node with this path to ever exist - that's normal
}
}
}
static String getKey(String endpoint) throws URISyntaxException {
URI uri = new URI(endpoint);
StringBuilder sb = new StringBuilder();
sb.append(uri.getHost());
sb.append("#");
sb.append(uri.getPort());
sb.append("#");
sb.append(uri.getPath().replace('/', '#'));
return sb.toString();
}
public void close() {
LOG.debug("closing - removing all endpoints");
synchronized (endpoints) {
closed = true;
for (EndpointDescription endpoint : endpoints) {
try {
removeEndpoint(endpoint);
} catch (Exception ex) {
LOG.error("Exception while removing endpoint during close", ex);
}
}
endpoints.clear();
}
discoveryPluginTracker.close();
}
}