Merge pull request #1 from stefan-egli/issue/SLING-9589

SLING-9589 : expose discovery metrics
diff --git a/pom.xml b/pom.xml
index 418a24b..df1242f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -60,6 +60,7 @@
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
+            <version>1.6.1</version>
         </dependency>
         <dependency>
             <groupId>org.osgi</groupId>
@@ -81,5 +82,23 @@
             <version>1.3.0</version>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.sling</groupId>
+            <artifactId>org.apache.sling.discovery.commons</artifactId>
+            <version>1.0.0</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.sling</groupId>
+            <artifactId>org.apache.sling.commons.metrics</artifactId>
+            <version>1.0.0</version>
+            <scope>provided</scope>
+        </dependency>
+        <!-- Testing -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
diff --git a/src/main/java/org/apache/sling/discovery/impl/support/MetricReporter.java b/src/main/java/org/apache/sling/discovery/impl/support/MetricReporter.java
new file mode 100644
index 0000000..48db238
--- /dev/null
+++ b/src/main/java/org/apache/sling/discovery/impl/support/MetricReporter.java
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.impl.support;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.discovery.ClusterView;
+import org.apache.sling.discovery.InstanceDescription;
+import org.apache.sling.discovery.TopologyEvent;
+import org.apache.sling.discovery.TopologyEventListener;
+import org.apache.sling.discovery.TopologyView;
+import org.apache.sling.discovery.commons.InstancesDiff;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.MetricRegistry;
+
+/**
+ * MetricReporter is in charge of listening to TopologyEvents
+ * (by virtue of being a TopologyEventListener) and exposing
+ * metrics (via dropwizard).
+ */
+@Service(value = { TopologyEventListener.class })
+@Component(immediate = true)
+public class MetricReporter implements TopologyEventListener {
+
+    // event counters - NOTE(review): every name below carries a "discovery.oak" prefix while the package is discovery.impl; confirm intended
+    static final String METRICS_NAME_TOPOLOGY_CHANGING_EVENTS = "discovery.oak.topology.changing.events";
+    static final String METRICS_NAME_TOPOLOGY_INIT_EVENTS = "discovery.oak.topology.init.events";
+    static final String METRICS_NAME_TOPOLOGY_CHANGED_EVENTS = "discovery.oak.topology.changed.events";
+    static final String METRICS_NAME_PROPERTY_CHANGED_EVENTS = "discovery.oak.property.changed.events";
+
+    static final String METRICS_NAME_TOPOLOGY_IS_UNDEFINED = "discovery.oak.topology.is.undefined";
+
+    static final String METRICS_NAME_LOCAL_CLUSTER_INSTANCES = "discovery.oak.local.cluster.instances";
+    static final String METRICS_NAME_LOCAL_CLUSTER_JOINS = "discovery.oak.local.cluster.joins";
+    static final String METRICS_NAME_LOCAL_CLUSTER_LEAVES = "discovery.oak.local.cluster.leaves";
+    static final String METRICS_NAME_LOCAL_CLUSTER_LEADER_SWITCHES = "discovery.oak.local.cluster.leader.switches";
+    static final String METRICS_NAME_LOCAL_CLUSTER_PROPERTIES = "discovery.oak.local.cluster.properties";
+
+    static final String METRICS_NAME_OWN_IS_LEADER = "discovery.oak.own.is.leader";
+    static final String METRICS_NAME_OWN_PROPERTIES = "discovery.oak.own.properties";
+
+    static final String METRICS_NAME_REMOTE_CLUSTERS = "discovery.oak.remote.cluster";
+    static final String METRICS_NAME_REMOTE_INSTANCES = "discovery.oak.remote.instances";
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    @Reference(target = "(name=sling)")
+    MetricRegistry metricRegistry; // not private: the unit test assigns a registry directly
+
+    private final List<String> registeredGaugeNameList = new LinkedList<String>(); // names added by createGauge(), read by unregisterGauges()
+
+    /**
+     * for init there would really only be 2 values needed: 0 and 1
+     * as there should only ever be 1 TOPOLOGY_INIT event.
+     * But for monitoring reasons it might be interesting to use a
+     * counter here nevertheless and ensure that it never goes above 1.
+     */
+    private final AtomicInteger initEvents = new AtomicInteger(0);
+
+    /**
+     * counts number of TOPOLOGY_CHANGING events. An int should be enough,
+     * if there is 1 event per second this lasts 68 years
+     */
+    private final AtomicInteger changingEvents = new AtomicInteger(0);
+
+    /**
+     * counts number of TOPOLOGY_CHANGED events. An int should be enough,
+     * if there is 1 event per second this lasts 68 years
+     */
+    private final AtomicInteger changedEvents = new AtomicInteger(0);
+
+    /**
+     * counts number of PROPERTIES_CHANGED events.
+     * With a long if there is 1 event per millisecond it lasts 292471208 years
+     */
+    private final AtomicLong propertyChangedEvents = new AtomicLong(0);
+
+    /**
+     * This is either 0 or 1 - but since the Gauge should be as fast as possible
+     * we maintain an int.
+     * Note that while the topology is undefined, ownIsLeader is not updated
+     * either (it still represents the previous value).
+     * This one starts off as 1 until TOPOLOGY_INIT is received.
+     */
+    private final AtomicInteger topologyIsUndefined = new AtomicInteger(1);
+
+    /**
+     * Keeps track of number of instances in local cluster.
+     * There should really only be a small number, but even so an int is certainly enough.
+     */
+    private final AtomicInteger localClusterInstances = new AtomicInteger(0);
+
+    /**
+     * Counts the number of instances that joined the local cluster, over time.
+     * The order of magnitude is number of TOPOLOGY_CHANGED events multiplied
+     * by number of instances joining per such event. Still, an int sounds more than enough.
+     */
+    private final AtomicInteger localClusterJoins = new AtomicInteger(0);
+
+    /**
+     * Counts the number of instances that left the local cluster, over time.
+     * The order of magnitude is number of TOPOLOGY_CHANGED events multiplied
+     * by number of instances leaving per such event. Still, an int sounds more than enough.
+     */
+    private final AtomicInteger localClusterLeaves = new AtomicInteger(0);
+
+    /**
+     * The number of leader switches is at most the number of TOPOLOGY_CHANGED
+     * events, since a switch is only counted while handling such an event.
+     * So an int seems more than enough.
+     * Note that this counts only actual changes, not a leader being announced via TOPOLOGY_INIT.
+     */
+    private final AtomicInteger localClusterLeaderSwitches = new AtomicInteger(0);
+
+    /**
+     * The order of magnitude here is number of properties multiplied by number of instances in the local cluster.
+     * So this is an order of magnitude higher than ownProperties -
+     * but still, an int should be enough, really.
+     */
+    private final AtomicInteger localClusterProperties = new AtomicInteger(0);
+
+    /**
+     * This is either 0 or 1 - but since the Gauge should be as fast as possible
+     * we maintain an int.
+     * Note that this value is only valid if the topology is not changing currently
+     * (otherwise it still holds the value of the previous view).
+     */
+    private final AtomicInteger ownIsLeader = new AtomicInteger(0);
+
+    /**
+     * There shouldn't be an awful lot of properties, so int sounds more than enough
+     */
+    private final AtomicInteger ownProperties = new AtomicInteger(0);
+
+    /**
+     * Attached/remote clusters probably aren't too many, so again, int is enough
+     */
+    private final AtomicInteger remoteClusters = new AtomicInteger(0);
+
+    /**
+     * Attached/remote instances (sum of instances in remote clusters) -
+     * probably aren't too many, int is enough
+     */
+    private final AtomicInteger remoteInstances = new AtomicInteger(0);
+
+    @Activate // activation hook: exposes each counter of this class as a gauge in the metric registry
+    protected void activate() {
+        logger.debug("activate: start");
+        // number of TOPOLOGY_INIT events handled
+        createGauge(METRICS_NAME_TOPOLOGY_INIT_EVENTS, new Gauge<Integer>() {
+            @Override
+            public Integer getValue() {
+                return initEvents.get();
+            }
+        });
+        // number of TOPOLOGY_CHANGING events handled
+        createGauge(METRICS_NAME_TOPOLOGY_CHANGING_EVENTS, new Gauge<Integer>() {
+            @Override
+            public Integer getValue() {
+                return changingEvents.get();
+            }
+        });
+        // number of TOPOLOGY_CHANGED events handled
+        createGauge(METRICS_NAME_TOPOLOGY_CHANGED_EVENTS, new Gauge<Integer>() {
+            @Override
+            public Integer getValue() {
+                return changedEvents.get();
+            }
+        });
+        // number of PROPERTIES_CHANGED events handled (the only gauge backed by a long)
+        createGauge(METRICS_NAME_PROPERTY_CHANGED_EVENTS, new Gauge<Long>() {
+            @Override
+            public Long getValue() {
+                return propertyChangedEvents.get();
+            }
+        });
+        // 1 before TOPOLOGY_INIT and after TOPOLOGY_CHANGING, 0 once a view has been handled
+        createGauge(METRICS_NAME_TOPOLOGY_IS_UNDEFINED, new Gauge<Integer>() {
+            @Override
+            public Integer getValue() {
+                return topologyIsUndefined.get();
+            }
+        });
+        // number of instances in the local cluster of the last handled view
+        createGauge(METRICS_NAME_LOCAL_CLUSTER_INSTANCES, new Gauge<Integer>() {
+            @Override
+            public Integer getValue() {
+                return localClusterInstances.get();
+            }
+        });
+        // instances that joined the local cluster, accumulated over all handled views
+        createGauge(METRICS_NAME_LOCAL_CLUSTER_JOINS, new Gauge<Integer>() {
+            @Override
+            public Integer getValue() {
+                return localClusterJoins.get();
+            }
+        });
+        // instances that left the local cluster, accumulated over all handled views
+        createGauge(METRICS_NAME_LOCAL_CLUSTER_LEAVES, new Gauge<Integer>() {
+            @Override
+            public Integer getValue() {
+                return localClusterLeaves.get();
+            }
+        });
+        // number of TOPOLOGY_CHANGED events in which the local cluster leader differed from before
+        createGauge(METRICS_NAME_LOCAL_CLUSTER_LEADER_SWITCHES, new Gauge<Integer>() {
+            @Override
+            public Integer getValue() {
+                return localClusterLeaderSwitches.get();
+            }
+        });
+        // summed property count of all instances in the local cluster
+        createGauge(METRICS_NAME_LOCAL_CLUSTER_PROPERTIES, new Gauge<Integer>() {
+            @Override
+            public Integer getValue() {
+                return localClusterProperties.get();
+            }
+        });
+        // 1 if the local instance was leader in the last handled view, otherwise 0
+        createGauge(METRICS_NAME_OWN_IS_LEADER, new Gauge<Integer>() {
+            @Override
+            public Integer getValue() {
+                return ownIsLeader.get();
+            }
+        });
+        // property count of the local instance
+        createGauge(METRICS_NAME_OWN_PROPERTIES, new Gauge<Integer>() {
+            @Override
+            public Integer getValue() {
+                return ownProperties.get();
+            }
+        });
+        // number of cluster views in the topology minus one (the local cluster)
+        createGauge(METRICS_NAME_REMOTE_CLUSTERS, new Gauge<Integer>() {
+            @Override
+            public Integer getValue() {
+                return remoteClusters.get();
+            }
+        });
+        // summed instance count of all clusters whose id differs from the local cluster's id
+        createGauge(METRICS_NAME_REMOTE_INSTANCES, new Gauge<Integer>() {
+            @Override
+            public Integer getValue() {
+                return remoteInstances.get();
+            }
+        });
+
+        logger.info("activate: done.");
+    }
+
+    @SuppressWarnings("rawtypes") // raw Gauge lets one helper take both the Integer and the Long gauges
+    private void createGauge(String gaugeName, Gauge gauge) {
+        logger.debug("createGauge: registering gauge : " + gaugeName);
+        this.metricRegistry.register(gaugeName, gauge); // NOTE(review): presumably throws if the name is already registered - confirm against MetricRegistry
+        registeredGaugeNameList.add(gaugeName); // recorded only after register() returned, for unregisterGauges()
+    }
+
+    @Deactivate
+    protected void deactivate() { // counterpart of activate(): removes the recorded gauges from the metric registry
+        logger.debug("deactivate: deactivating.");
+        unregisterGauges();
+        logger.info("deactivate: done.");
+    }
+
+    private void unregisterGauges() { // drains the name list, so names do not pile up across activate/deactivate cycles
+        while (!registeredGaugeNameList.isEmpty()) {
+            logger.debug("unregisterGauges : unregistering gauge : " + registeredGaugeNameList.get(0));
+            metricRegistry.remove(registeredGaugeNameList.remove(0)); // remove(0) hands back the name and forgets it
+        }
+    }
+
+    /**
+     * Routes a topology event to the handler matching its type.
+     * <p>
+     * A null event is ignored; an exception raised while handling
+     * an event is logged and not rethrown.
+     */
+    @Override
+    public void handleTopologyEvent(TopologyEvent event) {
+        if (event == null) {
+            return; // this should not occur
+        }
+        try {
+            switch (event.getType()) {
+            case TOPOLOGY_INIT:
+                handleInit(event.getNewView());
+                break;
+            case TOPOLOGY_CHANGING:
+                handleChanging();
+                break;
+            case TOPOLOGY_CHANGED:
+                handleChanged(event.getOldView(), event.getNewView());
+                break;
+            case PROPERTIES_CHANGED:
+                handlePropertiesChanged(event.getNewView());
+                break;
+            }
+        } catch (Exception e) {
+            // not expected to happen; if it does, it is only logged here
+            logger.error("handleTopologyEvent: got Exception " + e, e);
+        }
+    }
+
+    private void handleInit(TopologyView newView) {
+        initEvents.incrementAndGet();
+        topologyIsUndefined.set(0);
+        // there is no old view at init: passing null counts every local cluster instance as a join
+        updateLocalClusterInstances(null, newView);
+        // the remaining gauges are set from the new view alone
+        updateProperties(newView);
+        updateOwnIsLeader(newView);
+        updateRemote(newView);
+    }
+
+    private void handleChanging() { // no view is passed in, so only the event is counted and the flag raised
+        changingEvents.incrementAndGet();
+        topologyIsUndefined.set(1); // set back to 0 by handleInit() / handleChanged()
+    }
+
+    private void handleChanged(TopologyView oldView, TopologyView newView) {
+        changedEvents.incrementAndGet();
+        topologyIsUndefined.set(0);
+        // joins and leaves are derived from the difference between the two local cluster views
+        updateLocalClusterInstances(oldView, newView);
+        // counts a leader switch if the leader differs, and also refreshes ownIsLeader
+        updateLeaderSwitch(oldView, newView);
+        // the remaining gauges are set from the new view alone
+        updateProperties(newView);
+        updateRemote(newView);
+    }
+
+    private void handlePropertiesChanged(TopologyView newView) {
+        propertyChangedEvents.incrementAndGet();
+        // only the two property gauges are refreshed here; all other gauges keep their value
+        updateProperties(newView);
+    }
+
+    /**
+     * Refreshes the local cluster size gauge and the join/leave counters.
+     *
+     * @param oldViewOrNull previous view; null means every current instance counts as a join
+     * @param newView the view that is now current
+     */
+    private void updateLocalClusterInstances(TopologyView oldViewOrNull, TopologyView newView) {
+        final ClusterView current = newView.getLocalInstance().getClusterView();
+        final int currentSize = current.getInstances().size();
+        localClusterInstances.set(currentSize);
+        if (oldViewOrNull == null) {
+            localClusterJoins.addAndGet(currentSize);
+            return;
+        }
+        final InstancesDiff diff = new InstancesDiff(oldViewOrNull.getLocalInstance().getClusterView(), current);
+        final Collection<InstanceDescription> joined = diff.added().get();
+        final Collection<InstanceDescription> left = diff.removed().get();
+        localClusterJoins.addAndGet(joined.size()); // adding 0 leaves the counter unchanged
+        localClusterLeaves.addAndGet(left.size());
+    }
+
+    /** Counts a leader switch if the local cluster's leader id differs between the two views, then refreshes ownIsLeader. */
+    private void updateLeaderSwitch(TopologyView oldView, TopologyView newView) {
+        final String previousLeaderId = oldView.getLocalInstance().getClusterView().getLeader().getSlingId();
+        final String currentLeaderId = newView.getLocalInstance().getClusterView().getLeader().getSlingId();
+        if (!previousLeaderId.equals(currentLeaderId)) {
+            localClusterLeaderSwitches.incrementAndGet();
+        }
+        updateOwnIsLeader(newView);
+    }
+
+    /**
+     * Stores whether the local instance of the given view is leader:
+     * 1 if it is, 0 if it is not.
+     */
+    private void updateOwnIsLeader(TopologyView newView) {
+        ownIsLeader.set(newView.getLocalInstance().isLeader() ? 1 : 0);
+    }
+
+    /** Stores the local instance's property count and the summed property count of all local cluster instances. */
+    private void updateProperties(TopologyView newView) {
+        final InstanceDescription self = newView.getLocalInstance();
+        ownProperties.set(self.getProperties().size());
+        int clusterTotal = 0;
+        for (InstanceDescription member : self.getClusterView().getInstances()) {
+            clusterTotal += member.getProperties().size();
+        }
+        localClusterProperties.set(clusterTotal);
+    }
+
+    /**
+     * Stores the number of remote clusters and the total number of
+     * instances in all clusters other than the local one.
+     */
+    private void updateRemote(TopologyView newView) {
+        remoteClusters.set(newView.getClusterViews().size() - 1); // minus 1 for our local cluster
+        final String ownClusterId = newView.getLocalInstance().getClusterView().getId();
+        int remoteTotal = 0;
+        for (ClusterView candidate : newView.getClusterViews()) {
+            remoteTotal += candidate.getId().equals(ownClusterId) ? 0 : candidate.getInstances().size();
+        }
+        remoteInstances.set(remoteTotal);
+    }
+
+}
diff --git a/src/test/java/org/apache/sling/discovery/impl/support/MetricReporterTest.java b/src/test/java/org/apache/sling/discovery/impl/support/MetricReporterTest.java
new file mode 100644
index 0000000..02f77b7
--- /dev/null
+++ b/src/test/java/org/apache/sling/discovery/impl/support/MetricReporterTest.java
@@ -0,0 +1,498 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.impl.support;
+
+import static org.apache.sling.discovery.impl.support.MetricReporter.METRICS_NAME_LOCAL_CLUSTER_INSTANCES;
+import static org.apache.sling.discovery.impl.support.MetricReporter.METRICS_NAME_LOCAL_CLUSTER_JOINS;
+import static org.apache.sling.discovery.impl.support.MetricReporter.METRICS_NAME_LOCAL_CLUSTER_LEADER_SWITCHES;
+import static org.apache.sling.discovery.impl.support.MetricReporter.METRICS_NAME_LOCAL_CLUSTER_LEAVES;
+import static org.apache.sling.discovery.impl.support.MetricReporter.METRICS_NAME_LOCAL_CLUSTER_PROPERTIES;
+import static org.apache.sling.discovery.impl.support.MetricReporter.METRICS_NAME_OWN_IS_LEADER;
+import static org.apache.sling.discovery.impl.support.MetricReporter.METRICS_NAME_OWN_PROPERTIES;
+import static org.apache.sling.discovery.impl.support.MetricReporter.METRICS_NAME_PROPERTY_CHANGED_EVENTS;
+import static org.apache.sling.discovery.impl.support.MetricReporter.METRICS_NAME_REMOTE_CLUSTERS;
+import static org.apache.sling.discovery.impl.support.MetricReporter.METRICS_NAME_REMOTE_INSTANCES;
+import static org.apache.sling.discovery.impl.support.MetricReporter.METRICS_NAME_TOPOLOGY_CHANGED_EVENTS;
+import static org.apache.sling.discovery.impl.support.MetricReporter.METRICS_NAME_TOPOLOGY_CHANGING_EVENTS;
+import static org.apache.sling.discovery.impl.support.MetricReporter.METRICS_NAME_TOPOLOGY_INIT_EVENTS;
+import static org.apache.sling.discovery.impl.support.MetricReporter.METRICS_NAME_TOPOLOGY_IS_UNDEFINED;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.sling.discovery.ClusterView;
+import org.apache.sling.discovery.InstanceDescription;
+import org.apache.sling.discovery.InstanceFilter;
+import org.apache.sling.discovery.commons.providers.BaseTopologyView;
+import org.apache.sling.discovery.commons.providers.DefaultClusterView;
+import org.apache.sling.discovery.commons.providers.DefaultInstanceDescription;
+import org.apache.sling.discovery.commons.providers.ViewStateManager;
+import org.apache.sling.discovery.commons.providers.base.ViewStateManagerFactory;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.codahale.metrics.MetricRegistry;
+
+public class MetricReporterTest {
+
+    static class TestTopologyView extends BaseTopologyView {
+        // Test fixture: a topology view assembled in memory by the static factory methods of this class.
+        private static final Random propertiesChangerRandom = new Random(54321); // fixed seed: the sequence of generated keys is reproducible
+
+        private final InstanceDescription localInstance;
+
+        private HashSet<ClusterView> clusterViews;
+
+        static Map<String, String> newEmptyProps() {
+            final Map<String, String> result = new HashMap<String, String>();
+            return result;
+        }
+        /** Creates a view of one local leader instance, using random sling and cluster ids. */
+        static TestTopologyView singleInstance() {
+            return singleInstance(UUID.randomUUID().toString(), UUID.randomUUID().toString());
+        }
+        /** Creates a view of one local leader instance without properties, using the given ids. */
+        static TestTopologyView singleInstance(String localSlingId, String localClusterId) {
+            final DefaultClusterView localCluster = new DefaultClusterView(localClusterId);
+            final InstanceDescription localInstance = new DefaultInstanceDescription(localCluster, true, true, localSlingId, newEmptyProps());
+            return new TestTopologyView(localInstance);
+        }
+        /** Same as the three-argument variant, with random sling and cluster ids. */
+        static BaseTopologyView localClusterWithRandomInstances(int numRandomInstances) {
+            return localClusterWithRandomInstances(UUID.randomUUID().toString(), UUID.randomUUID().toString(), numRandomInstances);
+        }
+        /** Creates a local cluster of one local leader instance plus numRandomInstances non-local, non-leader instances. */
+        static BaseTopologyView localClusterWithRandomInstances(String localSlingId, String clusterId, int numRandomInstances) {
+            final DefaultClusterView localCluster = new DefaultClusterView(clusterId);
+            final InstanceDescription localInstance = new DefaultInstanceDescription(localCluster, true, true, localSlingId, newEmptyProps());
+            for (int i = 0; i < numRandomInstances; i++) { // results are not kept: presumably the constructor adds each instance to localCluster - TODO confirm
+                new DefaultInstanceDescription(localCluster, false, false, UUID.randomUUID().toString(), newEmptyProps());
+            }
+
+            return new TestTopologyView(localInstance);
+        }
+        /** Copies the view's local cluster, moving the leader flag by offset positions (modulo cluster size) in iteration order. */
+        public static BaseTopologyView withLeaderSwitchedBy(BaseTopologyView view, int offset) {
+            final ClusterView cluster = view.getLocalInstance().getClusterView();
+            int index = 0;
+            for (InstanceDescription instance : cluster.getInstances()) {
+                if (instance.isLeader()) {
+                    break;
+                } else {
+                    index++;
+                }
+            }
+            int newLeader = (index + offset) % cluster.getInstances().size();
+            final DefaultClusterView localCluster = new DefaultClusterView(cluster.getId());
+            index = 0;
+            InstanceDescription local = null;
+            for (InstanceDescription instance : cluster.getInstances()) {
+                DefaultInstanceDescription aNewInstance = new DefaultInstanceDescription(localCluster, index++ == newLeader, instance.isLocal(),
+                        instance.getSlingId(), instance.getProperties());
+                if (instance.isLocal()) {
+                    local = aNewInstance;
+                }
+            }
+
+            return new TestTopologyView(local);
+        }
+        /** Copies the view's local cluster, adding random properties to the instance at iteration position offset. */
+        public static BaseTopologyView withRandomPropertiesAdded(BaseTopologyView view, int offset, int numRandomPropertiesChanged) {
+            final ClusterView cluster = view.getLocalInstance().getClusterView();
+            final DefaultClusterView localCluster = new DefaultClusterView(cluster.getId());
+            int index = 0;
+            InstanceDescription local = null;
+            for (InstanceDescription instance : cluster.getInstances()) {
+                Map<String, String> properties = instance.getProperties();
+                if (index++ == offset) {
+                    properties = withRandomPropertiesAdded(properties, numRandomPropertiesChanged);
+                }
+                DefaultInstanceDescription aNewInstance = new DefaultInstanceDescription(localCluster, instance.isLeader(), instance.isLocal(),
+                        instance.getSlingId(), properties);
+                if (instance.isLocal()) {
+                    local = aNewInstance;
+                }
+            }
+
+            return new TestTopologyView(local);
+        }
+        /** Returns a copy of the map after numRandomPropertiesChanged puts of random keys; a key that already exists gets an extended value. */
+        private static Map<String, String> withRandomPropertiesAdded(Map<String, String> properties, int numRandomPropertiesChanged) {
+            final Map<String, String> result = new HashMap<String, String>(properties);
+            for (int i = 0; i < numRandomPropertiesChanged; i++) {
+                final String key = "randomKey-" + propertiesChangerRandom.nextInt();
+                String current = result.put(key, "r");
+                if (current != null) {
+                    // the key existed already: store a value that differs from the previous one
+                    result.put(key, current + "r");
+                }
+            }
+            return result;
+        }
+        /** Creates one cluster per argument; only the first instance of the first cluster is flagged local and leader. */
+        public static BaseTopologyView multiCluster(int... instanceCounts) {
+            final Set<ClusterView> clusters = new HashSet<ClusterView>();
+            InstanceDescription localInstance = null;
+            for (int i = 0; i < instanceCounts.length; i++) {
+                final int instanceCount = instanceCounts[i];
+                // the first instance is created unconditionally, so even a count of 0 yields one instance
+                final DefaultClusterView cluster = new DefaultClusterView(UUID.randomUUID().toString());
+                final InstanceDescription firstInstance = new DefaultInstanceDescription(cluster, i == 0, i == 0, UUID.randomUUID().toString(),
+                        newEmptyProps());
+                if (i == 0) {
+                    localInstance = firstInstance;
+                }
+                for (int j = 0; j < instanceCount - 1; j++) {
+                    new DefaultInstanceDescription(cluster, false, false, UUID.randomUUID().toString(), newEmptyProps());
+                }
+
+                clusters.add(cluster);
+            }
+
+            TestTopologyView view = new TestTopologyView(localInstance);
+            view.setClusterViews(clusters);
+            return view;
+        }
+        /** Copies all clusters of the view and adds one more cluster of instanceCount instances, none flagged local or leader. */
+        public static BaseTopologyView addRemote(BaseTopologyView view, int instanceCount) {
+            final String localClusterId = view.getLocalInstance().getClusterView().getId();
+            final Set<ClusterView> newClusters = new HashSet<ClusterView>();
+            final Set<ClusterView> existing = view.getClusterViews();
+            InstanceDescription local = null;
+            for (ClusterView clusterView : existing) {
+                ClusterView clonedCluster = clone(clusterView);
+                if (clusterView.getId().equals(localClusterId)) {
+                    for (InstanceDescription inst : clonedCluster.getInstances()) {
+                        if (inst.getSlingId().equals(view.getLocalInstance().getSlingId())) {
+                            local = inst;
+                        }
+                    }
+                }
+                newClusters.add(clonedCluster);
+            }
+
+            final DefaultClusterView cluster = new DefaultClusterView(UUID.randomUUID().toString());
+            for (int j = 0; j < instanceCount; j++) {
+                new DefaultInstanceDescription(cluster, false, false, UUID.randomUUID().toString(), newEmptyProps());
+            }
+            newClusters.add(cluster);
+
+            TestTopologyView newView = new TestTopologyView(local);
+            newView.setClusterViews(newClusters);
+            return newView;
+        }
+        /** Copies a cluster view into new objects: same id, and per instance the same flags, sling id and properties. */
+        private static ClusterView clone(ClusterView original) {
+            final DefaultClusterView cluster = new DefaultClusterView(original.getId());
+            for (InstanceDescription inst : original.getInstances()) {
+                new DefaultInstanceDescription(cluster, inst.isLeader(), inst.isLocal(), inst.getSlingId(), inst.getProperties());
+            }
+            return cluster;
+        }
+        /** Creates a view whose cluster set initially holds only the cluster of the given local instance. */
+        private TestTopologyView(InstanceDescription localInstance) {
+            this.localInstance = localInstance;
+            final Set<ClusterView> result = new HashSet<ClusterView>();
+            result.add(localInstance.getClusterView());
+            setClusterViews(result);
+        }
+
+        @Override
+        public InstanceDescription getLocalInstance() {
+            return localInstance;
+        }
+
+        @Override
+        public Set<InstanceDescription> getInstances() {
+            // clusters are visited in random order so that tests do not come to
+            // rely on a particular order. NOTE(review): the result is a HashSet, whose
+            // iteration order is not the insertion order - confirm the shuffling has any effect
+            final Set<InstanceDescription> result = new HashSet<InstanceDescription>();
+            final Random r = new Random();
+            final List<ClusterView> clusters = new LinkedList<ClusterView>(clusterViews);
+            while (!clusters.isEmpty()) {
+                final ClusterView cluster = clusters.remove(r.nextInt(clusters.size()));
+                for (InstanceDescription instance : cluster.getInstances()) {
+                    result.add(instance);
+                }
+            }
+            return result;
+        }
+
+        @Override
+        public Set<InstanceDescription> findInstances(InstanceFilter filter) {
+            throw new IllegalStateException("not implemented");
+        }
+
+        @Override
+        public Set<ClusterView> getClusterViews() {
+            return clusterViews;
+        }
+
+        private void setClusterViews(Set<ClusterView> clusterViews) {
+            this.clusterViews = new HashSet<ClusterView>(clusterViews);
+        }
+
+        @Override
+        public String getLocalClusterSyncTokenId() {
+            throw new IllegalStateException("not implemented");
+        }
+
+    }
+
+    private MetricReporter reporter; // object under test, created anew in setup()
+    private MetricRegistry metricRegistry;
+    private ViewStateManager viewStateManager;
+    private Lock viewStateManagerLock;
+
+    @Before
+    public void setup() {
+        reporter = new MetricReporter();
+        metricRegistry = new MetricRegistry();
+        reporter.metricRegistry = metricRegistry; // assigned by hand here; in the component the field carries an @Reference annotation
+        viewStateManagerLock = new ReentrantLock();
+        viewStateManager = ViewStateManagerFactory.newViewStateManager(viewStateManagerLock, null);
+        viewStateManager.bind(reporter); // presumably makes the manager deliver its TopologyEvents to the reporter - TODO confirm
+    }
+
+    /**
+     * Activates and deactivates the reporter 100 times in a row; the test
+     * passes when none of these calls throws.
+     */
+    @Test
+    public void testActivateDeactivate() {
+        final int cycles = 100;
+        for (int cycle = 0; cycle < cycles; cycle++) {
+            reporter.activate();
+            reporter.deactivate();
+        }
+    }
+
+    /**
+     * Verifies the gauges before any topology view was announced: only the
+     * "topology is undefined" gauge is expected to be 1, both right after
+     * activating the reporter and after activating the view state manager.
+     */
+    @Test
+    public void testPreInitState() throws InterruptedException {
+        reporter.activate();
+        // the fifth argument is the expected METRICS_NAME_TOPOLOGY_IS_UNDEFINED value
+        assertGaugesEqual(0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0);
+        viewStateManager.handleActivated();
+        // activation alone is expected to leave every gauge unchanged
+        assertGaugesEqual(0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0);
+        reporter.deactivate();
+    }
+
+    /**
+     * Walks through activation, a first single-instance view and then ten
+     * changing / new-view cycles, checking the gauges after every step.
+     */
+    @Test
+    public void testStandardSequence() throws InterruptedException {
+        reporter.activate();
+        viewStateManager.handleActivated();
+        assertGaugesEqual(0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0);
+        // a changing notification before the first view leaves all gauges untouched
+        viewStateManager.handleChanging();
+        assertGaugesEqual(0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0);
+
+        // first view: one init event, one local instance that joined and is leader
+        final BaseTopologyView initialView = TestTopologyView.singleInstance();
+        viewStateManager.handleNewView(initialView);
+        viewStateManager.waitForAsyncEvents(5000);
+        assertGaugesEqual(1, 0, 0, 0, 0, 1, 1, 0, 0, 0, 1, 0, 0, 0);
+
+        for (int round = 0; round < 10; round++) {
+            final int changingSoFar = round + 1;
+            // changing: one more changing event, topology is undefined again
+            viewStateManager.handleChanging();
+            viewStateManager.waitForAsyncEvents(5000);
+            assertGaugesEqual(1, round, changingSoFar, 0, 1, 1, 1, 0, 0, 0, 1, 0, 0, 0);
+
+            // new view with the same ids: one more changed event, topology defined
+            final String slingId = initialView.getLocalInstance().getSlingId();
+            final String clusterId = initialView.getLocalInstance().getClusterView().getId();
+            viewStateManager.handleNewView(TestTopologyView.singleInstance(slingId, clusterId));
+            viewStateManager.waitForAsyncEvents(5000);
+            assertGaugesEqual(1, round + 1, changingSoFar, 0, 0, 1, 1, 0, 0, 0, 1, 0, 0, 0);
+        }
+    }
+
+    /**
+     * Repeatedly grows the local cluster by a pseudo-random number of instances
+     * and shrinks it back to the single local instance, checking that the join
+     * and leave gauges accumulate accordingly.
+     */
+    @Test
+    public void testRandomJoinsLeaves() throws InterruptedException {
+        reporter.activate();
+        viewStateManager.handleActivated();
+        assertGaugesEqual(0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0);
+
+        final BaseTopologyView baseView = TestTopologyView.singleInstance();
+        viewStateManager.handleNewView(baseView);
+        viewStateManager.waitForAsyncEvents(5000);
+        assertGaugesEqual(1, 0, 0, 0, 0, 1, 1, 0, 0, 0, 1, 0, 0, 0);
+
+        // fixed seed keeps the sequence of cluster sizes reproducible
+        final Random random = new Random(12345678);
+        // the local instance itself already counts as one join
+        int expectedJoins = 1;
+        int expectedLeaves = 0;
+        for (int round = 0; round < 10; round++) {
+            final int joining = random.nextInt(5) + 1;
+            expectedJoins += joining;
+            final String growSlingId = baseView.getLocalInstance().getSlingId();
+            final String growClusterId = baseView.getLocalInstance().getClusterView().getId();
+            viewStateManager.handleNewView(TestTopologyView.localClusterWithRandomInstances(growSlingId, growClusterId, joining));
+            viewStateManager.waitForAsyncEvents(5000);
+            assertGaugesEqual(1, 2 * round + 1, 2 * round + 1, 0, 0, 1 + joining, expectedJoins, expectedLeaves, 0, 0, 1, 0, 0, 0);
+
+            // back to only the local instance - all newly joined leave again
+            expectedLeaves += joining;
+            final String shrinkSlingId = baseView.getLocalInstance().getSlingId();
+            final String shrinkClusterId = baseView.getLocalInstance().getClusterView().getId();
+            viewStateManager.handleNewView(TestTopologyView.singleInstance(shrinkSlingId, shrinkClusterId));
+            viewStateManager.waitForAsyncEvents(5000);
+            assertGaugesEqual(1, 2 * round + 2, 2 * round + 2, 0, 0, 1, expectedJoins, expectedLeaves, 0, 0, 1, 0, 0, 0);
+        }
+    }
+
+    /**
+     * Switches the leader of an eleven-instance local cluster ten times and
+     * checks that the leader-switch gauge counts every switch and that the
+     * "own is leader" gauge follows the local instance's leader flag.
+     */
+    @Test
+    public void testLeader() throws InterruptedException {
+        reporter.activate();
+        viewStateManager.handleActivated();
+        assertGaugesEqual(0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0);
+
+        final int clusterSize = 11;
+        BaseTopologyView view = TestTopologyView.localClusterWithRandomInstances(clusterSize - 1);
+        viewStateManager.handleNewView(view);
+        viewStateManager.waitForAsyncEvents(5000);
+        assertGaugesEqual(1, 0, 0, 0, 0, clusterSize, clusterSize, 0, 0, 0, 1, 0, 0, 0);
+
+        final int leaderOffset = 1;
+        for (int switches = 1; switches <= 10; switches++) {
+            view = TestTopologyView.withLeaderSwitchedBy(view, leaderOffset);
+            viewStateManager.handleNewView(view);
+            viewStateManager.waitForAsyncEvents(5000);
+            final int ownIsLeader;
+            if (view.getLocalInstance().isLeader()) {
+                ownIsLeader = 1;
+            } else {
+                ownIsLeader = 0;
+            }
+            // every new view counts as one changed, one changing and one leader switch
+            assertGaugesEqual(1, switches, switches, 0, 0, clusterSize, clusterSize, 0, switches, 0, ownIsLeader, 0, 0, 0);
+        }
+    }
+
+    /**
+     * Checks the property gauges for a first view that already carries
+     * properties, and again after a second view adds more properties and a
+     * remote part.
+     */
+    @Test
+    public void testInitialAndChangedProperties() throws InterruptedException {
+        reporter.activate();
+        viewStateManager.handleActivated();
+        assertGaugesEqual(0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0);
+
+        // first view: 3 random instances plus the local one, with 4, 3 and 8
+        // properties added to the instances at index 0, 1 and 2
+        BaseTopologyView view = TestTopologyView.localClusterWithRandomInstances(3);
+        view = TestTopologyView.withRandomPropertiesAdded(view, 0, 4);
+        view = TestTopologyView.withRandomPropertiesAdded(view, 1, 3);
+        view = TestTopologyView.withRandomPropertiesAdded(view, 2, 8);
+        viewStateManager.handleNewView(view);
+        viewStateManager.waitForAsyncEvents(5000);
+        // expected: 4 + 3 + 8 local cluster properties, 4 own properties
+        // (presumably index 0 is the local instance - verify against TestTopologyView)
+        assertGaugesEqual(1, 0, 0, 0, 0, 3 + 1, 3 + 1, 0, 0, 4 + 3 + 8, 1, 4, 0, 0);
+
+        // second view: 7 more properties on the instance at index 1, plus a remote addition
+        view = TestTopologyView.withRandomPropertiesAdded(view, 1, 7);
+        view = TestTopologyView.addRemote(view, 1);
+        viewStateManager.handleNewView(view);
+        viewStateManager.waitForAsyncEvents(5000);
+        // expected: one changed and one changing event, 7 more local cluster
+        // properties, one remote cluster with one remote instance
+        assertGaugesEqual(1, 1, 1, 0, 0, 3 + 1, 3 + 1, 0, 0, 4 + 3 + 8 + 7, 1, 4, 1, 1);
+    }
+
+    /**
+     * Adds a pseudo-random number of properties to each instance of the local
+     * cluster in turn and checks the property related gauges after every
+     * resulting view.
+     */
+    @Test
+    public void testProperties() throws InterruptedException {
+        reporter.activate();
+        viewStateManager.handleActivated();
+        assertGaugesEqual(0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0);
+
+        final int num = 11;
+        BaseTopologyView view = TestTopologyView.localClusterWithRandomInstances(num - 1);
+        viewStateManager.handleNewView(view);
+        viewStateManager.waitForAsyncEvents(5000);
+        assertGaugesEqual(1, 0, 0, 0, 0, num, num, 0, 0, 0, 1, 0, 0, 0);
+
+        // fixed seed keeps the number of added properties reproducible
+        final Random random = new Random(100000);
+        for (int idx = 0; idx < num; idx++) {
+            final int addedProperties = random.nextInt(10);
+            view = TestTopologyView.withRandomPropertiesAdded(view, idx, addedProperties);
+            viewStateManager.handleNewView(view);
+            viewStateManager.waitForAsyncEvents(5000);
+
+            final int localPropertiesCount = localProperties(view);
+            assertTrue("no properties set", localPropertiesCount > 0);
+            final int ownIsLeader = view.getLocalInstance().isLeader() ? 1 : 0;
+            final int ownPropertiesCount = view.getLocalInstance().getProperties().size();
+            // each of these views is expected to add one property-changed event
+            assertGaugesEqual(1, 0, 0, idx + 1, 0, num, num, 0, 0, localPropertiesCount, ownIsLeader, ownPropertiesCount, 0, 0);
+        }
+    }
+
+    /**
+     * Checks the remote cluster and remote instance gauges for a multi-cluster
+     * view, and again after a further remote addition.
+     */
+    @Test
+    public void testRemoteCluster() throws InterruptedException {
+        reporter.activate();
+        viewStateManager.handleActivated();
+        assertGaugesEqual(0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0);
+
+        BaseTopologyView view = TestTopologyView.multiCluster(3, 2, 4);
+        viewStateManager.handleNewView(view);
+        viewStateManager.waitForAsyncEvents(5000);
+        int localInstances = view.getLocalInstance().getClusterView().getInstances().size();
+        int localJoins = view.getLocalInstance().getClusterView().getInstances().size();
+        // expected: 2 remote clusters holding 6 remote instances in total
+        assertGaugesEqual(1, 0, 0, 0, 0, localInstances, localJoins, 0, 0, 0, 1, 0, 2, 6);
+
+        view = TestTopologyView.addRemote(view, 5);
+        viewStateManager.handleNewView(view);
+        viewStateManager.waitForAsyncEvents(5000);
+        localInstances = view.getLocalInstance().getClusterView().getInstances().size();
+        localJoins = view.getLocalInstance().getClusterView().getInstances().size();
+        // expected: one more remote cluster and 5 more remote instances
+        assertGaugesEqual(1, 1, 1, 0, 0, localInstances, localJoins, 0, 0, 0, 1, 0, 3, 11);
+    }
+
+    /**
+     * Sums up the number of properties of all instances in the cluster view of
+     * the given view's local instance.
+     *
+     * @param view the topology view to inspect
+     * @return the total number of properties in the local cluster
+     */
+    private int localProperties(BaseTopologyView view) {
+        final ClusterView localCluster = view.getLocalInstance().getClusterView();
+        int total = 0;
+        for (InstanceDescription member : localCluster.getInstances()) {
+            total += member.getProperties().size();
+        }
+        return total;
+    }
+
+    /**
+     * Asserts that all gauges have the expected values, retrying for up to 5
+     * seconds before giving up.
+     * <p>
+     * The check is repeated every 100 milliseconds until it passes or the
+     * deadline is reached. After the deadline one last check is done whose
+     * failure is propagated to the caller.
+     *
+     * @param init expected value of METRICS_NAME_TOPOLOGY_INIT_EVENTS
+     * @param changed expected value of METRICS_NAME_TOPOLOGY_CHANGED_EVENTS
+     * @param changing expected value of METRICS_NAME_TOPOLOGY_CHANGING_EVENTS
+     * @param propertyChanged expected value of METRICS_NAME_PROPERTY_CHANGED_EVENTS
+     * @param isUndefined expected value of METRICS_NAME_TOPOLOGY_IS_UNDEFINED
+     * @param localInstances expected value of METRICS_NAME_LOCAL_CLUSTER_INSTANCES
+     * @param localJoins expected value of METRICS_NAME_LOCAL_CLUSTER_JOINS
+     * @param localLeaves expected value of METRICS_NAME_LOCAL_CLUSTER_LEAVES
+     * @param localLeaderSwitches expected value of METRICS_NAME_LOCAL_CLUSTER_LEADER_SWITCHES
+     * @param localPropertiesCount expected value of METRICS_NAME_LOCAL_CLUSTER_PROPERTIES
+     * @param ownIsLeader expected value of METRICS_NAME_OWN_IS_LEADER
+     * @param ownPropertiesCount expected value of METRICS_NAME_OWN_PROPERTIES
+     * @param remoteClustersCount expected value of METRICS_NAME_REMOTE_CLUSTERS
+     * @param remoteInstancesCount expected value of METRICS_NAME_REMOTE_INSTANCES
+     * @throws InterruptedException if the thread is interrupted while sleeping between retries
+     */
+    private void assertGaugesEqual(int init, int changed, int changing, long propertyChanged, int isUndefined, int localInstances, int localJoins,
+            int localLeaves, int localLeaderSwitches, int localPropertiesCount, int ownIsLeader, int ownPropertiesCount, int remoteClustersCount,
+            int remoteInstancesCount) throws InterruptedException {
+        final long deadline = System.currentTimeMillis() + 5000;
+        // retry while the deadline has NOT yet passed (the comparison used to be
+        // inverted, so the loop body never ran and no retry ever happened)
+        while (System.currentTimeMillis() < deadline) {
+            try {
+                assertGaugesEqualNoWait(init, changed, changing, propertyChanged, isUndefined, localInstances, localJoins, localLeaves,
+                        localLeaderSwitches, localPropertiesCount, ownIsLeader, ownPropertiesCount, remoteClustersCount, remoteInstancesCount);
+                // successful case
+                return;
+            } catch (Throwable ignored) {
+                // error case : deliberately ignored here, the final check below
+                // reports the failure if it persists - sleep and retry
+                Thread.sleep(100);
+            }
+        }
+        // timeout case : one last retry, outside try/catch to propagate the failure
+        assertGaugesEqualNoWait(init, changed, changing, propertyChanged, isUndefined, localInstances, localJoins, localLeaves, localLeaderSwitches,
+                localPropertiesCount, ownIsLeader, ownPropertiesCount, remoteClustersCount, remoteInstancesCount);
+    }
+
+    /**
+     * Asserts once, without waiting or retrying, that every gauge has its
+     * expected value. The gauges are checked in the order below, so the first
+     * mismatch is the one reported.
+     * <p>
+     * Each parameter is the expected value of the gauge it is compared with in
+     * the body; {@code propertyChanged} is the only one compared as a
+     * {@code long}.
+     */
+    private void assertGaugesEqualNoWait(int init, int changed, int changing, long propertyChanged, int isUndefined, int localInstances,
+            int localJoins, int localLeaves, int localLeaderSwitches, int localPropertiesCount, int ownIsLeader, int ownPropertiesCount,
+            int remoteClustersCount, int remoteInstancesCount) {
+        // event counters
+        assertGaugeEquals(init, METRICS_NAME_TOPOLOGY_INIT_EVENTS);
+        assertGaugeEquals(changed, METRICS_NAME_TOPOLOGY_CHANGED_EVENTS);
+        assertGaugeEquals(changing, METRICS_NAME_TOPOLOGY_CHANGING_EVENTS);
+        // resolves to the long overload of assertGaugeEquals
+        assertGaugeEquals(propertyChanged, METRICS_NAME_PROPERTY_CHANGED_EVENTS);
+
+        // topology state
+        assertGaugeEquals(isUndefined, METRICS_NAME_TOPOLOGY_IS_UNDEFINED);
+
+        // local cluster
+        assertGaugeEquals(localInstances, METRICS_NAME_LOCAL_CLUSTER_INSTANCES);
+        assertGaugeEquals(localJoins, METRICS_NAME_LOCAL_CLUSTER_JOINS);
+        assertGaugeEquals(localLeaves, METRICS_NAME_LOCAL_CLUSTER_LEAVES);
+        assertGaugeEquals(localLeaderSwitches, METRICS_NAME_LOCAL_CLUSTER_LEADER_SWITCHES);
+        assertGaugeEquals(localPropertiesCount, METRICS_NAME_LOCAL_CLUSTER_PROPERTIES);
+
+        // own instance
+        assertGaugeEquals(ownIsLeader, METRICS_NAME_OWN_IS_LEADER);
+        assertGaugeEquals(ownPropertiesCount, METRICS_NAME_OWN_PROPERTIES);
+
+        // remote clusters
+        assertGaugeEquals(remoteClustersCount, METRICS_NAME_REMOTE_CLUSTERS);
+        assertGaugeEquals(remoteInstancesCount, METRICS_NAME_REMOTE_INSTANCES);
+    }
+
+    /**
+     * Asserts that the gauge registered under the given name currently reports
+     * the expected value, compared as an {@link Integer}.
+     *
+     * @param expected the expected gauge value
+     * @param metricsName the name of the gauge in the metric registry, also
+     *            used as the assertion message
+     */
+    private void assertGaugeEquals(int expected, String metricsName) {
+        final Object actual = metricRegistry.getGauges().get(metricsName).getValue();
+        assertEquals(metricsName, Integer.valueOf(expected), actual);
+    }
+
+    /**
+     * Asserts that the gauge registered under the given name currently reports
+     * the expected value, compared as a {@link Long}.
+     *
+     * @param expected the expected gauge value
+     * @param metricsName the name of the gauge in the metric registry, also
+     *            used as the assertion message
+     */
+    private void assertGaugeEquals(long expected, String metricsName) {
+        final Object actual = metricRegistry.getGauges().get(metricsName).getValue();
+        assertEquals(metricsName, Long.valueOf(expected), actual);
+    }
+}