SLING-9589 : expose discovery metrics
diff --git a/pom.xml b/pom.xml
index 418a24b..462ef1c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -60,6 +60,7 @@
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
+ <version>1.6.1</version>
</dependency>
<dependency>
<groupId>org.osgi</groupId>
@@ -81,5 +82,23 @@
<version>1.3.0</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.discovery.commons</artifactId>
+ <version>1.0.0</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.dropwizard.metrics</groupId>
+ <artifactId>metrics-core</artifactId>
+ <version>3.1.0</version>
+ <scope>provided</scope>
+ </dependency>
+ <!-- Testing -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/src/main/java/org/apache/sling/discovery/impl/support/MetricReporter.java b/src/main/java/org/apache/sling/discovery/impl/support/MetricReporter.java
new file mode 100644
index 0000000..205c786
--- /dev/null
+++ b/src/main/java/org/apache/sling/discovery/impl/support/MetricReporter.java
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.impl.support;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.discovery.ClusterView;
+import org.apache.sling.discovery.InstanceDescription;
+import org.apache.sling.discovery.TopologyEvent;
+import org.apache.sling.discovery.TopologyEventListener;
+import org.apache.sling.discovery.TopologyView;
+import org.apache.sling.discovery.commons.InstancesDiff;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.MetricRegistry;
+
+/**
+ * MetricReporter is in charge of listening to TopologyEvents
+ * (by virtue of being a TopologyEventListener) and exposing
+ * metrics (via dropwizard).
+ */
+@Service(value = { TopologyEventListener.class })
+@Component(immediate = true)
+public class MetricReporter implements TopologyEventListener {
+
+ // event counters
+ static final String METRICS_NAME_TOPOLOGY_CHANGING_EVENTS = "discovery.oak.topology.changing.events";
+ static final String METRICS_NAME_TOPOLOGY_INIT_EVENTS = "discovery.oak.topology.init.events";
+ static final String METRICS_NAME_TOPOLOGY_CHANGED_EVENTS = "discovery.oak.topology.changed.events";
+ static final String METRICS_NAME_PROPERTY_CHANGED_EVENTS = "discovery.oak.property.changed.events";
+
+ static final String METRICS_NAME_TOPOLOGY_IS_UNDEFINED = "discovery.oak.topology.is.undefined";
+
+ static final String METRICS_NAME_LOCAL_CLUSTER_INSTANCES = "discovery.oak.local.cluster.instances";
+ static final String METRICS_NAME_LOCAL_CLUSTER_JOINS = "discovery.oak.local.cluster.joins";
+ static final String METRICS_NAME_LOCAL_CLUSTER_LEAVES = "discovery.oak.local.cluster.leaves";
+ static final String METRICS_NAME_LOCAL_CLUSTER_LEADER_SWITCHES = "discovery.oak.local.cluster.leader.switches";
+ static final String METRICS_NAME_LOCAL_CLUSTER_PROPERTIES = "discovery.oak.local.cluster.properties";
+
+ static final String METRICS_NAME_OWN_IS_LEADER = "discovery.oak.own.is.leader";
+ static final String METRICS_NAME_OWN_PROPERTIES = "discovery.oak.own.properties";
+
+ static final String METRICS_NAME_REMOTE_CLUSTERS = "discovery.oak.remote.cluster";
+ static final String METRICS_NAME_REMOTE_INSTANCES = "discovery.oak.remote.instances";
+
+ private final Logger logger = LoggerFactory.getLogger(getClass());
+
+ @Reference(target = "(name=sling)")
+ MetricRegistry metricRegistry;
+
+ private final List<String> registeredGaugeNameList = new LinkedList<String>();
+
+ /**
+ * for init there would really only be 2 values needed: 0 and 1
+ * as there should only ever be 1 TOPOLOGY_INIT event.
+ * But for monitoring reasons it might be interesting to use a
+ * counter here nevertheless and ensure that it never goes above 1.
+ */
+ private final AtomicInteger initEvents = new AtomicInteger(0);
+
+ /**
+ * counts number of TOPOLOGY_CHANGING events. An int should be enough,
+ * if there is 1 event per second this lasts 68 years
+ */
+ private final AtomicInteger changingEvents = new AtomicInteger(0);
+
+ /**
+ * counts number of TOPOLOGY_CHANGED events. An int should be enough,
+ * if there is 1 event per second this lasts 68 years
+ */
+ private final AtomicInteger changedEvents = new AtomicInteger(0);
+
+ /**
+ * counts number of PROPERTIES_CHANGED events.
+ * With a long if there is 1 event per millisecond it lasts 292471208 years
+ */
+ private final AtomicLong propertyChangedEvents = new AtomicLong(0);
+
+ /**
+ * This is either 0 or 1 - but since the Gauge should be as fast as possible
+ * we maintain an int.
+ * Note that if the topology is undefined the localLeader is also undefined
+ * (but still represents the previous value).
+ * This one starts off as 1 until TOPOLOGY_INIT is received.
+ */
+ private final AtomicInteger topologyIsUndefined = new AtomicInteger(1);
+
+ /**
+ * Keeps track of number of instances in local cluster.
+ * There should really only be a small number, but even so an int is certainly enough.
+ */
+ private final AtomicInteger localClusterInstances = new AtomicInteger(0);
+
+ /**
+ * Counts the number of instances that joined the local cluster, over time.
+ * The order of magnitude is number of TOPOLOGY_CHANGED events multiplied
+ * by number of instances joining per such event. Still, an int sounds more than enough.
+ */
+ private final AtomicInteger localClusterJoins = new AtomicInteger(0);
+
+ /**
+ * Counts the number of instances that left the local cluster, over time.
+ * The order of magnitude is number of TOPOLOGY_CHANGED events multiplied
+ * by number of instances leaving per such event. Still, an int sounds more than enough.
+ */
+ private final AtomicInteger localClusterLeaves = new AtomicInteger(0);
+
+ /**
+ * The number of leader changes should be smaller or equal to
+ * TOPOLOGY_CHANGED events +1.
+ * So an int seems more than enough.
+ * Note that this counts only actual changes, not a leader being announced via TOPOLOGY_INIT.
+ */
+ private final AtomicInteger localClusterLeaderSwitches = new AtomicInteger(0);
+
+ /**
+ * The order of magnitude here is number of properties multiplied by number of instances in the local cluster.
+ * So this is an order of magnitude higher than ownPropertiesCount -
+ * but still, an int should be enough, really.
+ */
+ private final AtomicInteger localClusterProperties = new AtomicInteger(0);
+
+ /**
+ * This is either 0 or 1 - but since the Gauge should be as fast as possible
+ * we maintain an int.
+ * Note that localLeader is only valid if the topology is not changing currently
+ * (otherwise it is undefined).
+ */
+ private final AtomicInteger ownIsLeader = new AtomicInteger(0);
+
+ /**
+ * There shouldn't be an awful lot of properties, so int sounds more than enough
+ */
+ private final AtomicInteger ownProperties = new AtomicInteger(0);
+
+ /**
+ * Attached/remote clusters aren't probably too many, so again, int is enough
+ */
+ private final AtomicInteger remoteClusters = new AtomicInteger(0);
+
+ /**
+ * Attached/remote instances (sum of instances in remote clusters) -
+ * probably aren't too many, int is enough
+ */
+ private final AtomicInteger remoteInstances = new AtomicInteger(0);
+
+ /**
+ * Activation callback (annotated with @Activate).
+ * Registers one Gauge per metric name with the metricRegistry via createGauge.
+ * Each Gauge only returns the current value of the corresponding atomic
+ * counter field of this class, so reading a Gauge does no topology work.
+ */
+ @Activate
+ protected void activate() {
+ logger.debug("activate: start");
+
+ // --- event counters ---
+ createGauge(METRICS_NAME_TOPOLOGY_INIT_EVENTS, new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ return initEvents.get();
+ }
+ });
+
+ createGauge(METRICS_NAME_TOPOLOGY_CHANGING_EVENTS, new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ return changingEvents.get();
+ }
+ });
+
+ createGauge(METRICS_NAME_TOPOLOGY_CHANGED_EVENTS, new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ return changedEvents.get();
+ }
+ });
+
+ // the only Long-typed gauge: propertyChangedEvents is an AtomicLong
+ createGauge(METRICS_NAME_PROPERTY_CHANGED_EVENTS, new Gauge<Long>() {
+ @Override
+ public Long getValue() {
+ return propertyChangedEvents.get();
+ }
+ });
+
+ // --- topology state ---
+ createGauge(METRICS_NAME_TOPOLOGY_IS_UNDEFINED, new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ return topologyIsUndefined.get();
+ }
+ });
+
+ // --- local cluster ---
+ createGauge(METRICS_NAME_LOCAL_CLUSTER_INSTANCES, new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ return localClusterInstances.get();
+ }
+ });
+
+ createGauge(METRICS_NAME_LOCAL_CLUSTER_JOINS, new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ return localClusterJoins.get();
+ }
+ });
+
+ createGauge(METRICS_NAME_LOCAL_CLUSTER_LEAVES, new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ return localClusterLeaves.get();
+ }
+ });
+
+ createGauge(METRICS_NAME_LOCAL_CLUSTER_LEADER_SWITCHES, new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ return localClusterLeaderSwitches.get();
+ }
+ });
+
+ createGauge(METRICS_NAME_LOCAL_CLUSTER_PROPERTIES, new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ return localClusterProperties.get();
+ }
+ });
+
+ // --- own (local) instance ---
+ createGauge(METRICS_NAME_OWN_IS_LEADER, new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ return ownIsLeader.get();
+ }
+ });
+
+ createGauge(METRICS_NAME_OWN_PROPERTIES, new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ return ownProperties.get();
+ }
+ });
+
+ // --- remote clusters ---
+ createGauge(METRICS_NAME_REMOTE_CLUSTERS, new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ return remoteClusters.get();
+ }
+ });
+
+ createGauge(METRICS_NAME_REMOTE_INSTANCES, new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ return remoteInstances.get();
+ }
+ });
+
+ logger.info("activate: done.");
+ }
+
+    /**
+     * Registers the given gauge under the given name with the
+     * metricRegistry and remembers the name in registeredGaugeNameList.
+     * <p>
+     * The parameter is typed as {@code Gauge<?>} instead of the raw type,
+     * so no warning suppression is needed; callers passing a
+     * {@code Gauge<Integer>} or {@code Gauge<Long>} are unaffected.
+     *
+     * @param gaugeName the metric name to register the gauge under
+     * @param gauge the gauge to register
+     */
+    private void createGauge(String gaugeName, Gauge<?> gauge) {
+        logger.debug("createGauge: registering gauge : {}", gaugeName);
+        this.metricRegistry.register(gaugeName, gauge);
+        // only remember the name once register() returned normally
+        registeredGaugeNameList.add(gaugeName);
+    }
+
+    /**
+     * Deactivation callback (annotated with @Deactivate): removes all gauges
+     * that were registered through createGauge.
+     */
+    @Deactivate
+    protected void deactivate() {
+        logger.debug("deactivate: deactivating.");
+        unregisterGauges();
+        logger.info("deactivate: done.");
+    }
+
+    /**
+     * Removes every remembered gauge name from the metricRegistry and then
+     * forgets the names.
+     * <p>
+     * Clearing the list matters when the component is activated and
+     * deactivated repeatedly: without it the list would grow by one entry
+     * per gauge on every cycle and each deactivate would try to remove all
+     * names of all previous cycles again.
+     */
+    private void unregisterGauges() {
+        for (String registeredGaugeName : registeredGaugeNameList) {
+            logger.debug("unregisterGauges : unregistering gauge : {}", registeredGaugeName);
+            metricRegistry.remove(registeredGaugeName);
+        }
+        registeredGaugeNameList.clear();
+    }
+
+    /**
+     * Dispatches the given event to the handler matching its type.
+     * A null event is ignored. Any Exception thrown while handling is
+     * logged and not propagated to the caller.
+     *
+     * @param event the topology event to account for
+     */
+    @Override
+    public void handleTopologyEvent(TopologyEvent event) {
+        if (event != null) {
+            try {
+                switch (event.getType()) {
+                case TOPOLOGY_INIT:
+                    handleInit(event.getNewView());
+                    break;
+                case TOPOLOGY_CHANGING:
+                    handleChanging(event.getOldView());
+                    break;
+                case TOPOLOGY_CHANGED:
+                    handleChanged(event.getOldView(), event.getNewView());
+                    break;
+                case PROPERTIES_CHANGED:
+                    handlePropertiesChanged(event.getOldView(), event.getNewView());
+                    break;
+                default:
+                    // no other event type is accounted for
+                    break;
+                }
+            } catch (Exception e) {
+                // not expected, but metrics must never break event delivery
+                logger.error("handleTopologyEvent: got Exception " + e, e);
+            }
+        }
+        // event == null should not occur and is silently ignored
+    }
+
+ /**
+ * Handles a TOPOLOGY_INIT event: counts the event, marks the topology
+ * as defined (topologyIsUndefined = 0) and refreshes all values that
+ * are derived from the view.
+ *
+ * @param newView the view delivered with the event
+ */
+ private void handleInit(TopologyView newView) {
+ initEvents.incrementAndGet();
+ topologyIsUndefined.set(0);
+
+ // no old view on init: all instances of the local cluster count as joins
+ updateLocalClusterInstances(null, newView);
+
+ updateProperties(newView);
+ // leader is only recorded here, it is not counted as a leader switch
+ updateOwnIsLeader(newView);
+ updateRemote(newView);
+ }
+
+ /**
+ * Handles a TOPOLOGY_CHANGING event: counts the event and flags the
+ * topology as undefined (topologyIsUndefined = 1). All other values
+ * are left untouched.
+ *
+ * @param oldView the view delivered with the event; not read by this method
+ */
+ private void handleChanging(TopologyView oldView) {
+ changingEvents.incrementAndGet();
+ topologyIsUndefined.set(1);
+ }
+
+ /**
+ * Handles a TOPOLOGY_CHANGED event: counts the event, marks the topology
+ * as defined again and refreshes the view-derived values.
+ *
+ * @param oldView the view before the change, used to compute joins, leaves and leader switches
+ * @param newView the view after the change
+ */
+ private void handleChanged(TopologyView oldView, TopologyView newView) {
+ changedEvents.incrementAndGet();
+ topologyIsUndefined.set(0);
+
+ updateLocalClusterInstances(oldView, newView);
+
+ // also refreshes ownIsLeader from the new view
+ updateLeaderSwitch(oldView, newView);
+
+ updateProperties(newView);
+ updateRemote(newView);
+ }
+
+ /**
+ * Handles a PROPERTIES_CHANGED event: counts the event and recomputes
+ * the property counts from the new view.
+ *
+ * @param oldView the view before the change; not read by this method
+ * @param newView the view carrying the changed properties
+ */
+ private void handlePropertiesChanged(TopologyView oldView, TopologyView newView) {
+ propertyChangedEvents.incrementAndGet();
+
+ updateProperties(newView);
+ }
+
+    /**
+     * Records the size of the local cluster of the new view and adds to the
+     * join/leave counters.
+     * <p>
+     * Without an old view every instance of the new local cluster counts as
+     * a join. Otherwise the joins and leaves are taken from an InstancesDiff
+     * between the old and the new local cluster.
+     *
+     * @param oldViewOrNull the previous view, or null if there is none
+     * @param newView the current view
+     */
+    private void updateLocalClusterInstances(TopologyView oldViewOrNull, TopologyView newView) {
+        final ClusterView currentCluster = newView.getLocalInstance().getClusterView();
+        localClusterInstances.set(currentCluster.getInstances().size());
+
+        if (oldViewOrNull == null) {
+            localClusterJoins.addAndGet(currentCluster.getInstances().size());
+            return;
+        }
+
+        final ClusterView previousCluster = oldViewOrNull.getLocalInstance().getClusterView();
+        final InstancesDiff delta = new InstancesDiff(previousCluster, currentCluster);
+        final Collection<InstanceDescription> joined = delta.added().get();
+        final Collection<InstanceDescription> left = delta.removed().get();
+
+        if (joined != null && !joined.isEmpty()) {
+            localClusterJoins.addAndGet(joined.size());
+        }
+        if (left != null && !left.isEmpty()) {
+            localClusterLeaves.addAndGet(left.size());
+        }
+    }
+
+    /**
+     * Counts a leader switch if the sling id of the local cluster's leader
+     * differs between the old and the new view, then refreshes ownIsLeader.
+     *
+     * @param oldView the view before the change
+     * @param newView the view after the change
+     */
+    private void updateLeaderSwitch(TopologyView oldView, TopologyView newView) {
+        final InstanceDescription previousLeader = oldView.getLocalInstance().getClusterView().getLeader();
+        final InstanceDescription currentLeader = newView.getLocalInstance().getClusterView().getLeader();
+        final boolean sameLeader = previousLeader.getSlingId().equals(currentLeader.getSlingId());
+        if (!sameLeader) {
+            localClusterLeaderSwitches.incrementAndGet();
+        }
+
+        updateOwnIsLeader(newView);
+    }
+
+ private void updateOwnIsLeader(TopologyView newView) {
+ if (newView.getLocalInstance().isLeader()) {
+ ownIsLeader.set(1);
+ } else {
+ ownIsLeader.set(0);
+ }
+ }
+
+    /**
+     * Recomputes both property gauges from the given view: the number of
+     * properties of the local instance, and the sum of the property counts
+     * of all instances in the local cluster.
+     *
+     * @param newView the current view
+     */
+    private void updateProperties(TopologyView newView) {
+        final int ownCount = newView.getLocalInstance().getProperties().size();
+        ownProperties.set(ownCount);
+
+        final ClusterView ownCluster = newView.getLocalInstance().getClusterView();
+        int total = 0;
+        for (InstanceDescription member : ownCluster.getInstances()) {
+            total = total + member.getProperties().size();
+        }
+        localClusterProperties.set(total);
+    }
+
+    /**
+     * Recomputes the number of remote clusters and the number of instances
+     * in those remote clusters from the given view.
+     * <p>
+     * A cluster is remote if its id differs from the id of the local
+     * instance's cluster. Both gauges are derived from the same loop so
+     * that they always agree with each other; in particular the cluster
+     * count can no longer become -1 when the view reports no clusters.
+     *
+     * @param newView the current view
+     */
+    private void updateRemote(TopologyView newView) {
+        final String localClusterId = newView.getLocalInstance().getClusterView().getId();
+        int clusters = 0;
+        int instances = 0;
+        for (ClusterView cluster : newView.getClusterViews()) {
+            final String clusterId = cluster.getId();
+            if (!clusterId.equals(localClusterId)) {
+                clusters++;
+                instances += cluster.getInstances().size();
+            }
+        }
+        remoteClusters.set(clusters);
+        remoteInstances.set(instances);
+    }
+
+}
diff --git a/src/test/java/org/apache/sling/discovery/impl/support/MetricReporterTest.java b/src/test/java/org/apache/sling/discovery/impl/support/MetricReporterTest.java
new file mode 100644
index 0000000..02f77b7
--- /dev/null
+++ b/src/test/java/org/apache/sling/discovery/impl/support/MetricReporterTest.java
@@ -0,0 +1,498 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.impl.support;
+
+import static org.apache.sling.discovery.impl.support.MetricReporter.METRICS_NAME_LOCAL_CLUSTER_INSTANCES;
+import static org.apache.sling.discovery.impl.support.MetricReporter.METRICS_NAME_LOCAL_CLUSTER_JOINS;
+import static org.apache.sling.discovery.impl.support.MetricReporter.METRICS_NAME_LOCAL_CLUSTER_LEADER_SWITCHES;
+import static org.apache.sling.discovery.impl.support.MetricReporter.METRICS_NAME_LOCAL_CLUSTER_LEAVES;
+import static org.apache.sling.discovery.impl.support.MetricReporter.METRICS_NAME_LOCAL_CLUSTER_PROPERTIES;
+import static org.apache.sling.discovery.impl.support.MetricReporter.METRICS_NAME_OWN_IS_LEADER;
+import static org.apache.sling.discovery.impl.support.MetricReporter.METRICS_NAME_OWN_PROPERTIES;
+import static org.apache.sling.discovery.impl.support.MetricReporter.METRICS_NAME_PROPERTY_CHANGED_EVENTS;
+import static org.apache.sling.discovery.impl.support.MetricReporter.METRICS_NAME_REMOTE_CLUSTERS;
+import static org.apache.sling.discovery.impl.support.MetricReporter.METRICS_NAME_REMOTE_INSTANCES;
+import static org.apache.sling.discovery.impl.support.MetricReporter.METRICS_NAME_TOPOLOGY_CHANGED_EVENTS;
+import static org.apache.sling.discovery.impl.support.MetricReporter.METRICS_NAME_TOPOLOGY_CHANGING_EVENTS;
+import static org.apache.sling.discovery.impl.support.MetricReporter.METRICS_NAME_TOPOLOGY_INIT_EVENTS;
+import static org.apache.sling.discovery.impl.support.MetricReporter.METRICS_NAME_TOPOLOGY_IS_UNDEFINED;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.sling.discovery.ClusterView;
+import org.apache.sling.discovery.InstanceDescription;
+import org.apache.sling.discovery.InstanceFilter;
+import org.apache.sling.discovery.commons.providers.BaseTopologyView;
+import org.apache.sling.discovery.commons.providers.DefaultClusterView;
+import org.apache.sling.discovery.commons.providers.DefaultInstanceDescription;
+import org.apache.sling.discovery.commons.providers.ViewStateManager;
+import org.apache.sling.discovery.commons.providers.base.ViewStateManagerFactory;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.codahale.metrics.MetricRegistry;
+
+public class MetricReporterTest {
+
+ static class TestTopologyView extends BaseTopologyView {
+
+ private static final Random propertiesChangerRandom = new Random(54321);
+
+ private final InstanceDescription localInstance;
+
+ private HashSet<ClusterView> clusterViews;
+
+ static Map<String, String> newEmptyProps() {
+ final Map<String, String> result = new HashMap<String, String>();
+ return result;
+ }
+
+ static TestTopologyView singleInstance() {
+ return singleInstance(UUID.randomUUID().toString(), UUID.randomUUID().toString());
+ }
+
+ static TestTopologyView singleInstance(String localSlingId, String localClusterId) {
+ final DefaultClusterView localCluster = new DefaultClusterView(localClusterId);
+ final InstanceDescription localInstance = new DefaultInstanceDescription(localCluster, true, true, localSlingId, newEmptyProps());
+ return new TestTopologyView(localInstance);
+ }
+
+ static BaseTopologyView localClusterWithRandomInstances(int numRandomInstances) {
+ return localClusterWithRandomInstances(UUID.randomUUID().toString(), UUID.randomUUID().toString(), numRandomInstances);
+ }
+
+ static BaseTopologyView localClusterWithRandomInstances(String localSlingId, String clusterId, int numRandomInstances) {
+ final DefaultClusterView localCluster = new DefaultClusterView(clusterId);
+ final InstanceDescription localInstance = new DefaultInstanceDescription(localCluster, true, true, localSlingId, newEmptyProps());
+ for (int i = 0; i < numRandomInstances; i++) {
+ new DefaultInstanceDescription(localCluster, false, false, UUID.randomUUID().toString(), newEmptyProps());
+ }
+
+ return new TestTopologyView(localInstance);
+ }
+
+        /**
+         * Builds a copy of the local cluster of the given view in which the
+         * leader flag has moved by {@code offset} positions (in the iteration
+         * order of the cluster's instances, wrapping around).
+         * <p>
+         * The target index is normalized to be non-negative, so a negative
+         * offset moves the leader backwards instead of producing a cluster
+         * without any leader.
+         *
+         * @param view the view whose local cluster is copied
+         * @param offset number of positions to move the leader by; may be negative
+         * @return a new view containing only the copied local cluster
+         */
+        public static BaseTopologyView withLeaderSwitchedBy(BaseTopologyView view, int offset) {
+            final ClusterView cluster = view.getLocalInstance().getClusterView();
+            final int size = cluster.getInstances().size();
+
+            // position of the current leader (== size if none is flagged)
+            int index = 0;
+            for (InstanceDescription instance : cluster.getInstances()) {
+                if (instance.isLeader()) {
+                    break;
+                } else {
+                    index++;
+                }
+            }
+
+            // Java's % keeps the sign of the dividend, hence the second step
+            final int newLeader = (((index + offset) % size) + size) % size;
+
+            final DefaultClusterView localCluster = new DefaultClusterView(cluster.getId());
+            index = 0;
+            InstanceDescription local = null;
+            for (InstanceDescription instance : cluster.getInstances()) {
+                DefaultInstanceDescription aNewInstance = new DefaultInstanceDescription(localCluster, index++ == newLeader, instance.isLocal(),
+                        instance.getSlingId(), instance.getProperties());
+                if (instance.isLocal()) {
+                    local = aNewInstance;
+                }
+            }
+
+            return new TestTopologyView(local);
+        }
+
+        /**
+         * Builds a copy of the local cluster of the given view in which the
+         * instance at position {@code offset} (in iteration order) received
+         * additional random properties; all other instances keep theirs.
+         *
+         * @param view the view whose local cluster is copied
+         * @param offset position of the instance to modify
+         * @param numRandomPropertiesChanged number of random properties to put
+         * @return a new view containing only the copied local cluster
+         */
+        public static BaseTopologyView withRandomPropertiesAdded(BaseTopologyView view, int offset, int numRandomPropertiesChanged) {
+            final ClusterView source = view.getLocalInstance().getClusterView();
+            final DefaultClusterView copy = new DefaultClusterView(source.getId());
+            InstanceDescription localCopy = null;
+            int position = 0;
+            for (InstanceDescription original : source.getInstances()) {
+                final boolean selected = (position == offset);
+                position++;
+                final Map<String, String> props = selected
+                        ? withRandomPropertiesAdded(original.getProperties(), numRandomPropertiesChanged)
+                        : original.getProperties();
+                final DefaultInstanceDescription replica = new DefaultInstanceDescription(copy, original.isLeader(), original.isLocal(),
+                        original.getSlingId(), props);
+                if (original.isLocal()) {
+                    localCopy = replica;
+                }
+            }
+
+            return new TestTopologyView(localCopy);
+        }
+
+        /**
+         * Returns a copy of the given map with random "randomKey-..." entries
+         * put into it. If a generated key already existed, its value gets an
+         * "r" appended so that the value is guaranteed to change.
+         *
+         * @param properties the map to copy; it is not modified
+         * @param numRandomPropertiesChanged number of random keys to put
+         * @return the modified copy
+         */
+        private static Map<String, String> withRandomPropertiesAdded(Map<String, String> properties, int numRandomPropertiesChanged) {
+            final Map<String, String> copy = new HashMap<String, String>(properties);
+            int remaining = numRandomPropertiesChanged;
+            while (remaining-- > 0) {
+                final String key = "randomKey-" + propertiesChangerRandom.nextInt();
+                final String previous = copy.put(key, "r");
+                if (previous != null) {
+                    // key collision: make sure the value differs from before
+                    copy.put(key, previous + "r");
+                }
+            }
+            return copy;
+        }
+
+ public static BaseTopologyView multiCluster(int... instanceCounts) {
+ final Set<ClusterView> clusters = new HashSet<ClusterView>();
+ InstanceDescription localInstance = null;
+ for (int i = 0; i < instanceCounts.length; i++) {
+ final int instanceCount = instanceCounts[i];
+
+ final DefaultClusterView cluster = new DefaultClusterView(UUID.randomUUID().toString());
+ final InstanceDescription firstInstance = new DefaultInstanceDescription(cluster, i == 0, i == 0, UUID.randomUUID().toString(),
+ newEmptyProps());
+ if (i == 0) {
+ localInstance = firstInstance;
+ }
+ for (int j = 0; j < instanceCount - 1; j++) {
+ new DefaultInstanceDescription(cluster, false, false, UUID.randomUUID().toString(), newEmptyProps());
+ }
+
+ clusters.add(cluster);
+ }
+
+ TestTopologyView view = new TestTopologyView(localInstance);
+ view.setClusterViews(clusters);
+ return view;
+ }
+
+ /**
+ * Builds a new view that contains clones of all clusters of the given
+ * view plus one additional cluster with {@code instanceCount} instances,
+ * none of which is flagged as leader or local.
+ *
+ * @param view the view to extend; it is not modified
+ * @param instanceCount number of instances in the added cluster
+ * @return the new view
+ */
+ public static BaseTopologyView addRemote(BaseTopologyView view, int instanceCount) {
+ final String localClusterId = view.getLocalInstance().getClusterView().getId();
+ final Set<ClusterView> newClusters = new HashSet<ClusterView>();
+ final Set<ClusterView> existing = view.getClusterViews();
+ InstanceDescription local = null;
+ for (ClusterView clusterView : existing) {
+ ClusterView clonedCluster = clone(clusterView);
+ if (clusterView.getId().equals(localClusterId)) {
+ // find the clone of the local instance by sling id, as the
+ // new view must reference the cloned instance
+ for (InstanceDescription inst : clonedCluster.getInstances()) {
+ if (inst.getSlingId().equals(view.getLocalInstance().getSlingId())) {
+ local = inst;
+ }
+ }
+ }
+ newClusters.add(clonedCluster);
+ }
+
+ final DefaultClusterView cluster = new DefaultClusterView(UUID.randomUUID().toString());
+ for (int j = 0; j < instanceCount; j++) {
+ new DefaultInstanceDescription(cluster, false, false, UUID.randomUUID().toString(), newEmptyProps());
+ }
+ newClusters.add(cluster);
+
+ // NOTE(review): local stays null if the view's cluster set does not
+ // contain the local cluster; the constructor then fails on
+ // localInstance.getClusterView()
+ TestTopologyView newView = new TestTopologyView(local);
+ newView.setClusterViews(newClusters);
+ return newView;
+ }
+
+        /**
+         * Creates a new cluster view with the same id as the original and one
+         * new instance description per original instance, carrying over the
+         * leader flag, local flag, sling id and properties.
+         *
+         * @param original the cluster view to copy
+         * @return the copy
+         */
+        private static ClusterView clone(ClusterView original) {
+            final DefaultClusterView copy = new DefaultClusterView(original.getId());
+            for (InstanceDescription source : original.getInstances()) {
+                final boolean leader = source.isLeader();
+                final boolean local = source.isLocal();
+                // presumably the constructor attaches the instance to 'copy' - verify against DefaultInstanceDescription
+                new DefaultInstanceDescription(copy, leader, local, source.getSlingId(), source.getProperties());
+            }
+            return copy;
+        }
+
+ private TestTopologyView(InstanceDescription localInstance) {
+ this.localInstance = localInstance;
+ final Set<ClusterView> result = new HashSet<ClusterView>();
+ result.add(localInstance.getClusterView());
+ setClusterViews(result);
+ }
+
+ @Override
+ public InstanceDescription getLocalInstance() {
+ return localInstance;
+ }
+
+        /**
+         * Returns the instances of all clusters of this view.
+         * The clusters are visited in a random order so that tests do not
+         * come to rely on a particular order.
+         */
+        @Override
+        public Set<InstanceDescription> getInstances() {
+            final Random random = new Random();
+            final List<ClusterView> pending = new LinkedList<ClusterView>(clusterViews);
+            final Set<InstanceDescription> all = new HashSet<InstanceDescription>();
+            while (pending.size() > 0) {
+                final int pick = random.nextInt(pending.size());
+                final ClusterView chosen = pending.remove(pick);
+                all.addAll(chosen.getInstances());
+            }
+            return all;
+        }
+
+ @Override
+ public Set<InstanceDescription> findInstances(InstanceFilter filter) {
+ throw new IllegalStateException("not implemented");
+ }
+
+ @Override
+ public Set<ClusterView> getClusterViews() {
+ return clusterViews;
+ }
+
+ private void setClusterViews(Set<ClusterView> clusterViews) {
+ this.clusterViews = new HashSet<ClusterView>(clusterViews);
+ }
+
+ @Override
+ public String getLocalClusterSyncTokenId() {
+ throw new IllegalStateException("not implemented");
+ }
+
+ }
+
+ private MetricReporter reporter;
+ private MetricRegistry metricRegistry;
+ private ViewStateManager viewStateManager;
+ private Lock viewStateManagerLock;
+
+ /**
+ * Creates a fresh MetricReporter wired to a fresh MetricRegistry and
+ * binds it as a listener to a new ViewStateManager, so that the tests
+ * can drive the reporter through the ViewStateManager.
+ */
+ @Before
+ public void setup() {
+ reporter = new MetricReporter();
+ metricRegistry = new MetricRegistry();
+ // the field is normally injected via @Reference; set it by hand here
+ reporter.metricRegistry = metricRegistry;
+ viewStateManagerLock = new ReentrantLock();
+ viewStateManager = ViewStateManagerFactory.newViewStateManager(viewStateManagerLock, null);
+ viewStateManager.bind(reporter);
+ }
+
+ /**
+ * Activates and deactivates the reporter 100 times in a row.
+ * The test has no assertions; it passes if no call throws.
+ * NOTE(review): presumably this guards against gauges left registered
+ * by deactivate(), which would make the next activate() fail - confirm
+ * against MetricRegistry.register.
+ */
+ @Test
+ public void testActivateDeactivate() {
+ for (int i = 0; i < 100; i++) {
+ reporter.activate();
+ reporter.deactivate();
+ }
+ }
+
+ @Test
+ public void testPreInitState() throws InterruptedException {
+ reporter.activate();
+ assertGaugesEqual(0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0);
+ viewStateManager.handleActivated();
+ assertGaugesEqual(0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0);
+ reporter.deactivate();
+ }
+
+ @Test
+ public void testStandardSequence() throws InterruptedException {
+ reporter.activate();
+ viewStateManager.handleActivated();
+ assertGaugesEqual(0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0);
+ viewStateManager.handleChanging();
+ assertGaugesEqual(0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0);
+ final BaseTopologyView myView = TestTopologyView.singleInstance();
+ viewStateManager.handleNewView(myView);
+ viewStateManager.waitForAsyncEvents(5000);
+ assertGaugesEqual(1, 0, 0, 0, 0, 1, 1, 0, 0, 0, 1, 0, 0, 0);
+ for (int i = 0; i < 10; i++) {
+ viewStateManager.handleChanging();
+ viewStateManager.waitForAsyncEvents(5000);
+ assertGaugesEqual(1, i, i + 1, 0, 1, 1, 1, 0, 0, 0, 1, 0, 0, 0);
+ viewStateManager.handleNewView(
+ TestTopologyView.singleInstance(myView.getLocalInstance().getSlingId(), myView.getLocalInstance().getClusterView().getId()));
+ viewStateManager.waitForAsyncEvents(5000);
+ assertGaugesEqual(1, i + 1, i + 1, 0, 0, 1, 1, 0, 0, 0, 1, 0, 0, 0);
+ }
+ }
+
+ @Test
+ public void testRandomJoinsLeaves() throws InterruptedException {
+ reporter.activate();
+ viewStateManager.handleActivated();
+ assertGaugesEqual(0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0);
+
+ final BaseTopologyView baseView = TestTopologyView.singleInstance();
+ viewStateManager.handleNewView(baseView);
+ viewStateManager.waitForAsyncEvents(5000);
+ assertGaugesEqual(1, 0, 0, 0, 0, 1, 1, 0, 0, 0, 1, 0, 0, 0);
+
+ final Random r = new Random(12345678);
+ int joinCnt = 1;
+ int leaveCnt = 0;
+ for (int i = 0; i < 10; i++) {
+ int numRandomInstances = r.nextInt(5) + 1;
+ joinCnt += numRandomInstances;
+ viewStateManager.handleNewView(TestTopologyView.localClusterWithRandomInstances(baseView.getLocalInstance().getSlingId(),
+ baseView.getLocalInstance().getClusterView().getId(), numRandomInstances));
+ viewStateManager.waitForAsyncEvents(5000);
+ assertGaugesEqual(1, 2 * i + 1, 2 * i + 1, 0, 0, 1 + numRandomInstances, joinCnt, leaveCnt, 0, 0, 1, 0, 0, 0);
+
+ // back to only my instance - all newly joined leave again
+ leaveCnt += numRandomInstances;
+ viewStateManager.handleNewView(
+ TestTopologyView.singleInstance(baseView.getLocalInstance().getSlingId(), baseView.getLocalInstance().getClusterView().getId()));
+ viewStateManager.waitForAsyncEvents(5000);
+ assertGaugesEqual(1, 2 * i + 2, 2 * i + 2, 0, 0, 1, joinCnt, leaveCnt, 0, 0, 1, 0, 0, 0);
+ }
+ }
+
+ @Test
+ public void testLeader() throws InterruptedException {
+ reporter.activate();
+ viewStateManager.handleActivated();
+ assertGaugesEqual(0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0);
+
+ final int num = 11;
+ BaseTopologyView view = TestTopologyView.localClusterWithRandomInstances(num - 1);
+ viewStateManager.handleNewView(view);
+ viewStateManager.waitForAsyncEvents(5000);
+ assertGaugesEqual(1, 0, 0, 0, 0, num, num, 0, 0, 0, 1, 0, 0, 0);
+
+ for (int i = 0; i < 10; i++) {
+ int offset = 1;
+ view = TestTopologyView.withLeaderSwitchedBy(view, offset);
+ viewStateManager.handleNewView(view);
+ viewStateManager.waitForAsyncEvents(5000);
+ assertGaugesEqual(1, i + 1, i + 1, 0, 0, num, num, 0, i + 1, 0, view.getLocalInstance().isLeader() ? 1 : 0, 0, 0, 0);
+ }
+ }
+
+ @Test
+ public void testInitialAndChangedProperties() throws InterruptedException {
+ reporter.activate();
+ viewStateManager.handleActivated();
+ assertGaugesEqual(0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0);
+
+ BaseTopologyView view = TestTopologyView.localClusterWithRandomInstances(3);
+ view = TestTopologyView.withRandomPropertiesAdded(view, 0, 4);
+ view = TestTopologyView.withRandomPropertiesAdded(view, 1, 3);
+ view = TestTopologyView.withRandomPropertiesAdded(view, 2, 8);
+ viewStateManager.handleNewView(view);
+ viewStateManager.waitForAsyncEvents(5000);
+ assertGaugesEqual(1, 0, 0, 0, 0, 3 + 1, 3 + 1, 0, 0, 4 + 3 + 8, 1, 4, 0, 0);
+
+ view = TestTopologyView.withRandomPropertiesAdded(view, 1, 7);
+ view = TestTopologyView.addRemote(view, 1);
+ viewStateManager.handleNewView(view);
+ viewStateManager.waitForAsyncEvents(5000);
+ assertGaugesEqual(1, 1, 1, 0, 0, 3 + 1, 3 + 1, 0, 0, 4 + 3 + 8 + 7, 1, 4, 1, 1);
+ }
+
+ /**
+  * Adds a pseudo-random number of properties to one instance index after the
+  * other and expects the property-changed gauge to grow by one per step while
+  * the property counts follow the view.
+  */
+ @Test
+ public void testProperties() throws InterruptedException {
+     reporter.activate();
+     viewStateManager.handleActivated();
+     assertGaugesEqual(0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0);
+
+     final int clusterSize = 11;
+     BaseTopologyView current = TestTopologyView.localClusterWithRandomInstances(clusterSize - 1);
+     viewStateManager.handleNewView(current);
+     viewStateManager.waitForAsyncEvents(5000);
+     assertGaugesEqual(1, 0, 0, 0, 0, clusterSize, clusterSize, 0, 0, 0, 1, 0, 0, 0);
+
+     // fixed seed: the sequence of generated counts is the same on every run
+     final Random random = new Random(100000);
+     for (int index = 0; index < clusterSize; index++) {
+         // NOTE(review): nextInt(10) may return 0; the expectations below rely on
+         // the seeded sequence actually adding properties - confirm
+         final int addedCount = random.nextInt(10);
+         current = TestTopologyView.withRandomPropertiesAdded(current, index, addedCount);
+         viewStateManager.handleNewView(current);
+         viewStateManager.waitForAsyncEvents(5000);
+
+         final int clusterProperties = localProperties(current);
+         assertTrue("no properties set", clusterProperties > 0);
+
+         final int expectedPropertyEvents = index + 1;
+         final int expectedOwnIsLeader = current.getLocalInstance().isLeader() ? 1 : 0;
+         final int expectedOwnProperties = current.getLocalInstance().getProperties().size();
+         assertGaugesEqual(1, 0, 0, expectedPropertyEvents, 0, clusterSize, clusterSize, 0, 0, clusterProperties,
+                 expectedOwnIsLeader, expectedOwnProperties, 0, 0);
+     }
+ }
+
+ @Test
+ public void testRemoteCluster() throws InterruptedException {
+ reporter.activate();
+ viewStateManager.handleActivated();
+ assertGaugesEqual(0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0);
+
+ BaseTopologyView view = TestTopologyView.multiCluster(3, 2, 4);
+ viewStateManager.handleNewView(view);
+ viewStateManager.waitForAsyncEvents(5000);
+ assertGaugesEqual(1, 0, 0, 0, 0, view.getLocalInstance().getClusterView().getInstances().size(),
+ view.getLocalInstance().getClusterView().getInstances().size(), 0, 0, 0, 1, 0, 2, 6);
+
+ view = TestTopologyView.addRemote(view, 5);
+ viewStateManager.handleNewView(view);
+ viewStateManager.waitForAsyncEvents(5000);
+ assertGaugesEqual(1, 1, 1, 0, 0, view.getLocalInstance().getClusterView().getInstances().size(),
+ view.getLocalInstance().getClusterView().getInstances().size(), 0, 0, 0, 1, 0, 3, 11);
+ }
+
+ /**
+  * Adds up the property counts of all instances in the local cluster of the
+  * given view.
+  *
+  * @param view the topology view whose local cluster is inspected
+  * @return the sum of {@code getProperties().size()} over those instances
+  */
+ private int localProperties(BaseTopologyView view) {
+     int total = 0;
+     for (InstanceDescription member : view.getLocalInstance().getClusterView().getInstances()) {
+         total = total + member.getProperties().size();
+     }
+     return total;
+ }
+
+ private void assertGaugesEqual(int init, int changed, int changing, long propertyChanged, int isUndefined, int localInstances, int localJoins,
+ int localLeaves, int localLeaderSwitches, int localPropertiesCount, int ownIsLeader, int ownPropertiesCount, int remoteClustersCount,
+ int remoteInstancesCount) throws InterruptedException {
+ long maxWait = System.currentTimeMillis() + 5000;
+ while (maxWait < System.currentTimeMillis()) {
+ try {
+ assertGaugesEqualNoWait(init, changed, changing, propertyChanged, isUndefined, localInstances, localJoins, localLeaves,
+ localLeaderSwitches, localPropertiesCount, ownIsLeader, ownPropertiesCount, remoteClustersCount, remoteInstancesCount);
+ // successful case
+ return;
+ } catch (Throwable th) {
+ // error case : sleep and retry
+ Thread.sleep(100);
+ }
+ }
+ // timeout case : one last retry, outside try/catch to propagate the failure
+ assertGaugesEqualNoWait(init, changed, changing, propertyChanged, isUndefined, localInstances, localJoins, localLeaves, localLeaderSwitches,
+ localPropertiesCount, ownIsLeader, ownPropertiesCount, remoteClustersCount, remoteInstancesCount);
+ }
+
+ /**
+  * Checks every gauge exactly once, without any retry; the first mismatch
+  * fails the calling test. Each parameter is the expected value of the gauge
+  * registered under the {@code METRICS_NAME_*} constant it is paired with
+  * below.
+  */
+ private void assertGaugesEqualNoWait(
+         int init,
+         int changed,
+         int changing,
+         long propertyChanged,
+         int isUndefined,
+         int localInstances,
+         int localJoins,
+         int localLeaves,
+         int localLeaderSwitches,
+         int localPropertiesCount,
+         int ownIsLeader,
+         int ownPropertiesCount,
+         int remoteClustersCount,
+         int remoteInstancesCount) {
+     // topology event gauges (propertyChanged is the only long-typed expectation)
+     assertGaugeEquals(init, METRICS_NAME_TOPOLOGY_INIT_EVENTS);
+     assertGaugeEquals(changed, METRICS_NAME_TOPOLOGY_CHANGED_EVENTS);
+     assertGaugeEquals(changing, METRICS_NAME_TOPOLOGY_CHANGING_EVENTS);
+     assertGaugeEquals(propertyChanged, METRICS_NAME_PROPERTY_CHANGED_EVENTS);
+
+     // topology state gauge
+     assertGaugeEquals(isUndefined, METRICS_NAME_TOPOLOGY_IS_UNDEFINED);
+
+     // local cluster gauges
+     assertGaugeEquals(localInstances, METRICS_NAME_LOCAL_CLUSTER_INSTANCES);
+     assertGaugeEquals(localJoins, METRICS_NAME_LOCAL_CLUSTER_JOINS);
+     assertGaugeEquals(localLeaves, METRICS_NAME_LOCAL_CLUSTER_LEAVES);
+     assertGaugeEquals(localLeaderSwitches, METRICS_NAME_LOCAL_CLUSTER_LEADER_SWITCHES);
+     assertGaugeEquals(localPropertiesCount, METRICS_NAME_LOCAL_CLUSTER_PROPERTIES);
+
+     // own instance gauges
+     assertGaugeEquals(ownIsLeader, METRICS_NAME_OWN_IS_LEADER);
+     assertGaugeEquals(ownPropertiesCount, METRICS_NAME_OWN_PROPERTIES);
+
+     // remote gauges
+     assertGaugeEquals(remoteClustersCount, METRICS_NAME_REMOTE_CLUSTERS);
+     assertGaugeEquals(remoteInstancesCount, METRICS_NAME_REMOTE_INSTANCES);
+ }
+
+ /**
+  * Asserts that the gauge registered under the given name reports the
+  * expected int value. Fails with a message naming the gauge when no gauge is
+  * registered under that name, instead of a bare NullPointerException.
+  * <p>
+  * NOTE(review): presumably this resolves to the Object-based assertEquals,
+  * so the gauge would have to report an Integer - confirm.
+  *
+  * @param expected the value the gauge is expected to report
+  * @param metricsName the name the gauge is registered under
+  */
+ private void assertGaugeEquals(int expected, String metricsName) {
+     assertTrue("gauge not registered: " + metricsName, metricRegistry.getGauges().containsKey(metricsName));
+     assertEquals(metricsName, expected, metricRegistry.getGauges().get(metricsName).getValue());
+ }
+
+ /**
+  * Asserts that the gauge registered under the given name reports the
+  * expected long value. Fails with a message naming the gauge when no gauge
+  * is registered under that name, instead of a bare NullPointerException.
+  * <p>
+  * NOTE(review): presumably this resolves to the Object-based assertEquals,
+  * so the gauge would have to report a Long - confirm.
+  *
+  * @param expected the value the gauge is expected to report
+  * @param metricsName the name the gauge is registered under
+  */
+ private void assertGaugeEquals(long expected, String metricsName) {
+     assertTrue("gauge not registered: " + metricsName, metricRegistry.getGauges().containsKey(metricsName));
+     assertEquals(metricsName, expected, metricRegistry.getGauges().get(metricsName).getValue());
+ }
+}