| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.cxf.dosgi.discovery.zookeeper.subscribe; |
| |
| import java.io.ByteArrayInputStream; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| |
| import org.apache.cxf.dosgi.discovery.zookeeper.util.Utils; |
| import org.apache.cxf.dosgi.endpointdesc.EndpointDescriptionParser; |
| import org.apache.cxf.dosgi.endpointdesc.PropertiesMapper; |
| import org.apache.zookeeper.AsyncCallback.StatCallback; |
| import org.apache.zookeeper.KeeperException; |
| import org.apache.zookeeper.KeeperException.Code; |
| import org.apache.zookeeper.WatchedEvent; |
| import org.apache.zookeeper.Watcher; |
| import org.apache.zookeeper.ZooKeeper; |
| import org.apache.zookeeper.data.Stat; |
| import org.osgi.service.remoteserviceadmin.EndpointDescription; |
| import org.osgi.service.remoteserviceadmin.EndpointListener; |
| import org.osgi.xmlns.rsa.v1_0.EndpointDescriptionType; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Monitors ZooKeeper for changes in published endpoints. |
| * <p> |
| * Specifically, it monitors the node path associated with a given interface class, |
| * whose data is a serialized version of an EndpointDescription, and notifies an |
| * EndpointListener when changes are detected (which can then propagate the |
| * notification to other EndpointListeners with a matching scope). |
| * <p> |
| * Note that the EndpointListener is used here as a decoupling interface for |
| * convenience, and is not necessarily used according to its documented contract. |
| */ |
| public class InterfaceMonitor implements Watcher, StatCallback { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(InterfaceMonitor.class); |
| |
| private final String znode; |
| private final ZooKeeper zk; |
| private final EndpointListener endpointListener; |
| private final boolean recursive; |
| private volatile boolean closed; |
| |
| // This map reference changes, so don't synchronize on it |
| private Map<String, EndpointDescription> nodes = new HashMap<String, EndpointDescription>(); |
| |
| private EndpointDescriptionParser parser; |
| |
| public InterfaceMonitor(ZooKeeper zk, String objClass, EndpointListener endpointListener, String scope) { |
| this.zk = zk; |
| this.znode = Utils.getZooKeeperPath(objClass); |
| this.recursive = objClass == null || objClass.isEmpty(); |
| this.endpointListener = endpointListener; |
| this.parser = new EndpointDescriptionParser(); |
| LOG.debug("Creating new InterfaceMonitor {} for scope [{}] and objectClass [{}]", |
| new Object[] {recursive ? "(recursive)" : "", scope, objClass}); |
| } |
| |
| /** |
| * Returns all endpoints that are currently known to this monitor. |
| * |
| * @return all endpoints that are currently known to this monitor |
| */ |
| public synchronized List<EndpointDescription> getEndpoints() { |
| return new ArrayList<EndpointDescription>(nodes.values()); |
| } |
| |
| public void start() { |
| watch(); |
| } |
| |
| private void watch() { |
| LOG.debug("registering a ZooKeeper.exists({}) callback", znode); |
| zk.exists(znode, this, this, null); |
| } |
| |
| /** |
| * Zookeeper Watcher interface callback. |
| */ |
| public void process(WatchedEvent event) { |
| LOG.debug("ZooKeeper watcher callback on node {} for event {}", znode, event); |
| processDelta(); |
| } |
| |
| /** |
| * Zookeeper StatCallback interface callback. |
| */ |
| @SuppressWarnings("deprecation") |
| public void processResult(int rc, String path, Object ctx, Stat stat) { |
| LOG.debug("ZooKeeper callback on node: {} code: {}", znode, rc); |
| |
| switch (rc) { |
| case Code.Ok: |
| case Code.NoNode: |
| processDelta(); |
| return; |
| |
| case Code.SessionExpired: |
| case Code.NoAuth: |
| case Code.ConnectionLoss: |
| return; |
| |
| default: |
| watch(); |
| } |
| } |
| |
| private void processDelta() { |
| if (closed) { |
| return; |
| } |
| |
| if (zk.getState() != ZooKeeper.States.CONNECTED) { |
| LOG.debug("ZooKeeper connection was already closed! Not processing changed event."); |
| return; |
| } |
| |
| try { |
| if (zk.exists(znode, false) != null) { |
| zk.getChildren(znode, this); |
| refreshNodes(); |
| } else { |
| LOG.debug("znode {} doesn't exist -> not processing any changes", znode); |
| } |
| } catch (Exception e) { |
| if (zk.getState() != ZooKeeper.States.CONNECTED) { |
| LOG.debug("Error getting Zookeeper data: " + e); // e.g. session expired, handled by ZooKeeperDiscovery |
| } else { |
| LOG.error("Error getting ZooKeeper data.", e); |
| } |
| } |
| } |
| |
| public synchronized void close() { |
| closed = true; |
| for (EndpointDescription endpoint : nodes.values()) { |
| endpointListener.endpointRemoved(endpoint, null); |
| } |
| nodes.clear(); |
| } |
| |
| private synchronized void refreshNodes() { |
| if (closed) { |
| return; |
| } |
| LOG.info("Processing change on node: {}", znode); |
| |
| Map<String, EndpointDescription> newNodes = new HashMap<String, EndpointDescription>(); |
| Map<String, EndpointDescription> prevNodes = new HashMap<String, EndpointDescription>(nodes); |
| processChildren(znode, newNodes, prevNodes); |
| |
| // whatever is left in prevNodes now has been removed from Discovery |
| LOG.debug("processChildren done. Nodes that are missing now and need to be removed: {}", prevNodes.values()); |
| for (EndpointDescription endpoint : prevNodes.values()) { |
| endpointListener.endpointRemoved(endpoint, null); |
| } |
| nodes = newNodes; |
| } |
| |
| /** |
| * Iterates through all child nodes of the given node and tries to find |
| * endpoints. If the recursive flag is set it also traverses into the child |
| * nodes. |
| * |
| * @return true if an endpoint was found and if the node therefore needs to |
| * be monitored for changes |
| */ |
| private boolean processChildren(String zn, Map<String, EndpointDescription> newNodes, |
| Map<String, EndpointDescription> prevNodes) { |
| List<String> children; |
| try { |
| LOG.debug("Processing the children of {}", zn); |
| children = zk.getChildren(zn, false); |
| |
| boolean foundANode = false; |
| for (String child : children) { |
| String childZNode = zn + '/' + child; |
| EndpointDescription endpoint = getEndpointDescriptionFromNode(childZNode); |
| if (endpoint != null) { |
| EndpointDescription prevEndpoint = prevNodes.get(child); |
| LOG.info("found new node " + zn + "/[" + child + "] ( []->child ) props: " |
| + endpoint.getProperties().values()); |
| newNodes.put(child, endpoint); |
| prevNodes.remove(child); |
| foundANode = true; |
| LOG.debug("Properties: {}", endpoint.getProperties()); |
| if (prevEndpoint == null) { |
| // This guy is new |
| endpointListener.endpointAdded(endpoint, null); |
| } else if (!prevEndpoint.getProperties().equals(endpoint.getProperties())) { |
| // TODO |
| } |
| } |
| if (recursive && processChildren(childZNode, newNodes, prevNodes)) { |
| zk.getChildren(childZNode, this); |
| } |
| } |
| |
| return foundANode; |
| } catch (KeeperException e) { |
| LOG.error("Problem processing ZooKeeper node", e); |
| } catch (InterruptedException e) { |
| LOG.error("Problem processing ZooKeeper node", e); |
| } |
| return false; |
| } |
| |
| /** |
| * Retrieves data from the given node and parses it into an EndpointDescription. |
| * |
| * @param node a node path |
| * @return endpoint found in the node or null if no endpoint was found |
| */ |
| private EndpointDescription getEndpointDescriptionFromNode(String node) { |
| try { |
| Stat stat = zk.exists(node, false); |
| if (stat == null || stat.getDataLength() <= 0) { |
| return null; |
| } |
| byte[] data = zk.getData(node, false, null); |
| LOG.debug("Got data for node: {}", node); |
| |
| EndpointDescription endpoint = getFirstEnpointDescription(data); |
| if (endpoint != null) { |
| return endpoint; |
| } |
| LOG.warn("No Discovery information found for node: {}", node); |
| } catch (Exception e) { |
| LOG.error("Problem getting EndpointDescription from node " + node, e); |
| } |
| return null; |
| } |
| |
| public EndpointDescription getFirstEnpointDescription(byte[] data) { |
| List<EndpointDescriptionType> elements = parser.getEndpointDescriptions(new ByteArrayInputStream(data)); |
| if (elements.isEmpty()) { |
| return null; |
| } |
| Map<String, Object> props = new PropertiesMapper().toProps(elements.get(0).getProperty()); |
| return new EndpointDescription(props); |
| } |
| } |