blob: 95277d39b9f9949b4d38ab0c174f0806e0c6caf4 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cxf.dosgi.discovery.zookeeper.subscribe;
import java.io.ByteArrayInputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.cxf.dosgi.discovery.zookeeper.util.Utils;
import org.apache.cxf.dosgi.endpointdesc.EndpointDescriptionParser;
import org.apache.cxf.dosgi.endpointdesc.PropertiesMapper;
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.osgi.service.remoteserviceadmin.EndpointDescription;
import org.osgi.service.remoteserviceadmin.EndpointListener;
import org.osgi.xmlns.rsa.v1_0.EndpointDescriptionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Monitors ZooKeeper for changes in published endpoints.
* <p>
* Specifically, it monitors the node path associated with a given interface class,
* whose data is a serialized version of an EndpointDescription, and notifies an
* EndpointListener when changes are detected (which can then propagate the
* notification to other EndpointListeners with a matching scope).
* <p>
* Note that the EndpointListener is used here as a decoupling interface for
* convenience, and is not necessarily used according to its documented contract.
*/
public class InterfaceMonitor implements Watcher, StatCallback {
private static final Logger LOG = LoggerFactory.getLogger(InterfaceMonitor.class);
private final String znode;
private final ZooKeeper zk;
private final EndpointListener endpointListener;
private final boolean recursive;
private volatile boolean closed;
// This map reference changes, so don't synchronize on it
private Map<String, EndpointDescription> nodes = new HashMap<String, EndpointDescription>();
private EndpointDescriptionParser parser;
public InterfaceMonitor(ZooKeeper zk, String objClass, EndpointListener endpointListener, String scope) {
this.zk = zk;
this.znode = Utils.getZooKeeperPath(objClass);
this.recursive = objClass == null || objClass.isEmpty();
this.endpointListener = endpointListener;
this.parser = new EndpointDescriptionParser();
LOG.debug("Creating new InterfaceMonitor {} for scope [{}] and objectClass [{}]",
new Object[] {recursive ? "(recursive)" : "", scope, objClass});
}
/**
* Returns all endpoints that are currently known to this monitor.
*
* @return all endpoints that are currently known to this monitor
*/
public synchronized List<EndpointDescription> getEndpoints() {
return new ArrayList<EndpointDescription>(nodes.values());
}
public void start() {
watch();
}
private void watch() {
LOG.debug("registering a ZooKeeper.exists({}) callback", znode);
zk.exists(znode, this, this, null);
}
/**
* Zookeeper Watcher interface callback.
*/
public void process(WatchedEvent event) {
LOG.debug("ZooKeeper watcher callback on node {} for event {}", znode, event);
processDelta();
}
/**
* Zookeeper StatCallback interface callback.
*/
@SuppressWarnings("deprecation")
public void processResult(int rc, String path, Object ctx, Stat stat) {
LOG.debug("ZooKeeper callback on node: {} code: {}", znode, rc);
switch (rc) {
case Code.Ok:
case Code.NoNode:
processDelta();
return;
case Code.SessionExpired:
case Code.NoAuth:
case Code.ConnectionLoss:
return;
default:
watch();
}
}
private void processDelta() {
if (closed) {
return;
}
if (zk.getState() != ZooKeeper.States.CONNECTED) {
LOG.debug("ZooKeeper connection was already closed! Not processing changed event.");
return;
}
try {
if (zk.exists(znode, false) != null) {
zk.getChildren(znode, this);
refreshNodes();
} else {
LOG.debug("znode {} doesn't exist -> not processing any changes", znode);
}
} catch (Exception e) {
if (zk.getState() != ZooKeeper.States.CONNECTED) {
LOG.debug("Error getting Zookeeper data: " + e); // e.g. session expired, handled by ZooKeeperDiscovery
} else {
LOG.error("Error getting ZooKeeper data.", e);
}
}
}
public synchronized void close() {
closed = true;
for (EndpointDescription endpoint : nodes.values()) {
endpointListener.endpointRemoved(endpoint, null);
}
nodes.clear();
}
private synchronized void refreshNodes() {
if (closed) {
return;
}
LOG.info("Processing change on node: {}", znode);
Map<String, EndpointDescription> newNodes = new HashMap<String, EndpointDescription>();
Map<String, EndpointDescription> prevNodes = new HashMap<String, EndpointDescription>(nodes);
processChildren(znode, newNodes, prevNodes);
// whatever is left in prevNodes now has been removed from Discovery
LOG.debug("processChildren done. Nodes that are missing now and need to be removed: {}", prevNodes.values());
for (EndpointDescription endpoint : prevNodes.values()) {
endpointListener.endpointRemoved(endpoint, null);
}
nodes = newNodes;
}
/**
* Iterates through all child nodes of the given node and tries to find
* endpoints. If the recursive flag is set it also traverses into the child
* nodes.
*
* @return true if an endpoint was found and if the node therefore needs to
* be monitored for changes
*/
private boolean processChildren(String zn, Map<String, EndpointDescription> newNodes,
Map<String, EndpointDescription> prevNodes) {
List<String> children;
try {
LOG.debug("Processing the children of {}", zn);
children = zk.getChildren(zn, false);
boolean foundANode = false;
for (String child : children) {
String childZNode = zn + '/' + child;
EndpointDescription endpoint = getEndpointDescriptionFromNode(childZNode);
if (endpoint != null) {
EndpointDescription prevEndpoint = prevNodes.get(child);
LOG.info("found new node " + zn + "/[" + child + "] ( []->child ) props: "
+ endpoint.getProperties().values());
newNodes.put(child, endpoint);
prevNodes.remove(child);
foundANode = true;
LOG.debug("Properties: {}", endpoint.getProperties());
if (prevEndpoint == null) {
// This guy is new
endpointListener.endpointAdded(endpoint, null);
} else if (!prevEndpoint.getProperties().equals(endpoint.getProperties())) {
// TODO
}
}
if (recursive && processChildren(childZNode, newNodes, prevNodes)) {
zk.getChildren(childZNode, this);
}
}
return foundANode;
} catch (KeeperException e) {
LOG.error("Problem processing ZooKeeper node", e);
} catch (InterruptedException e) {
LOG.error("Problem processing ZooKeeper node", e);
}
return false;
}
/**
* Retrieves data from the given node and parses it into an EndpointDescription.
*
* @param node a node path
* @return endpoint found in the node or null if no endpoint was found
*/
private EndpointDescription getEndpointDescriptionFromNode(String node) {
try {
Stat stat = zk.exists(node, false);
if (stat == null || stat.getDataLength() <= 0) {
return null;
}
byte[] data = zk.getData(node, false, null);
LOG.debug("Got data for node: {}", node);
EndpointDescription endpoint = getFirstEnpointDescription(data);
if (endpoint != null) {
return endpoint;
}
LOG.warn("No Discovery information found for node: {}", node);
} catch (Exception e) {
LOG.error("Problem getting EndpointDescription from node " + node, e);
}
return null;
}
public EndpointDescription getFirstEnpointDescription(byte[] data) {
List<EndpointDescriptionType> elements = parser.getEndpointDescriptions(new ByteArrayInputStream(data));
if (elements.isEmpty()) {
return null;
}
Map<String, Object> props = new PropertiesMapper().toProps(elements.get(0).getProperty());
return new EndpointDescription(props);
}
}