SLING-9589 : expose discovery metrics
diff --git a/pom.xml b/pom.xml index 418a24b..462ef1c 100644 --- a/pom.xml +++ b/pom.xml
@@ -60,6 +60,7 @@ <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> + <version>1.6.1</version> </dependency> <dependency> <groupId>org.osgi</groupId> @@ -81,5 +82,23 @@ <version>1.3.0</version> <scope>provided</scope> </dependency> + <dependency> + <groupId>org.apache.sling</groupId> + <artifactId>org.apache.sling.discovery.commons</artifactId> + <version>1.0.0</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>io.dropwizard.metrics</groupId> + <artifactId>metrics-core</artifactId> + <version>3.1.0</version> + <scope>provided</scope> + </dependency> + <!-- Testing --> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> </dependencies> </project>
diff --git a/src/main/java/org/apache/sling/discovery/impl/support/MetricReporter.java b/src/main/java/org/apache/sling/discovery/impl/support/MetricReporter.java new file mode 100644 index 0000000..205c786 --- /dev/null +++ b/src/main/java/org/apache/sling/discovery/impl/support/MetricReporter.java
@@ -0,0 +1,429 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.discovery.impl.support; + +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Reference; +import org.apache.felix.scr.annotations.Service; +import org.apache.sling.discovery.ClusterView; +import org.apache.sling.discovery.InstanceDescription; +import org.apache.sling.discovery.TopologyEvent; +import org.apache.sling.discovery.TopologyEventListener; +import org.apache.sling.discovery.TopologyView; +import org.apache.sling.discovery.commons.InstancesDiff; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.codahale.metrics.Gauge; +import com.codahale.metrics.MetricRegistry; + +/** + * MetricReporter is in charge of listening to TopologyEvents + * (by virtue of being a TopologyEventListener) and exposing + * metrics (via dropwizard). + */ +@Service(value = { TopologyEventListener.class }) +@Component(immediate = true) +public class MetricReporter implements TopologyEventListener { + + // event counters + static final String METRICS_NAME_TOPOLOGY_CHANGING_EVENTS = "discovery.oak.topology.changing.events"; + static final String METRICS_NAME_TOPOLOGY_INIT_EVENTS = "discovery.oak.topology.init.events"; + static final String METRICS_NAME_TOPOLOGY_CHANGED_EVENTS = "discovery.oak.topology.changed.events"; + static final String METRICS_NAME_PROPERTY_CHANGED_EVENTS = "discovery.oak.property.changed.events"; + + static final String METRICS_NAME_TOPOLOGY_IS_UNDEFINED = "discovery.oak.topology.is.undefined"; + + static final String METRICS_NAME_LOCAL_CLUSTER_INSTANCES = "discovery.oak.local.cluster.instances"; + static final String METRICS_NAME_LOCAL_CLUSTER_JOINS = "discovery.oak.local.cluster.joins"; + static final String METRICS_NAME_LOCAL_CLUSTER_LEAVES = "discovery.oak.local.cluster.leaves"; + static final String METRICS_NAME_LOCAL_CLUSTER_LEADER_SWITCHES = "discovery.oak.local.cluster.leader.switches"; + static final String METRICS_NAME_LOCAL_CLUSTER_PROPERTIES = "discovery.oak.local.cluster.properties"; + + static final String METRICS_NAME_OWN_IS_LEADER = "discovery.oak.own.is.leader"; + static final String METRICS_NAME_OWN_PROPERTIES = "discovery.oak.own.properties"; + + static final String METRICS_NAME_REMOTE_CLUSTERS = "discovery.oak.remote.cluster"; + static final String METRICS_NAME_REMOTE_INSTANCES = "discovery.oak.remote.instances"; + + private final Logger logger = LoggerFactory.getLogger(getClass()); + + @Reference(target = "(name=sling)") + MetricRegistry metricRegistry; + + private final List<String> registeredGaugeNameList = new LinkedList<String>(); + + /** + * for init there would really only be 2 values needed: 0 and 1 + * as there should only ever be 1 TOPOLOGY_INIT event. + * But for monitoring reasons it might be interesting to use a + * counter here nevertheless and ensure that it never goes above 1. + */ + private final AtomicInteger initEvents = new AtomicInteger(0); + + /** + * counts number of TOPOLOGY_CHANGING events. An int should be enough, + * if there is 1 event per second this lasts 68 years + */ + private final AtomicInteger changingEvents = new AtomicInteger(0); + + /** + * counts number of TOPOLOGY_CHANGED events. An int should be enough, + * if there is 1 event per second this lasts 68 years + */ + private final AtomicInteger changedEvents = new AtomicInteger(0); + + /** + * counts number of TOPOLOGY_CHANGED events. + * With a long if there is 1 event per millisecond it lasts 292471208 years + */ + private final AtomicLong propertyChangedEvents = new AtomicLong(0); + + /** + * This is either 0 or 1 - but since the Gauge should be as fast as possible + * we maintain an int. + * Note that if the topology is undefined the localLeader is also undefined + * (but still represents the previous value). + * This one starts off as 1 until TOPOLOGY_INIT is received. + */ + private final AtomicInteger topologyIsUndefined = new AtomicInteger(1); + + /** + * Keeps track of number of instances in local cluster. + * There should really only be a small number, but even so an int is certainly enough. + */ + private final AtomicInteger localClusterInstances = new AtomicInteger(0); + + /** + * Counts the number of instances that joined the local cluster, over time. + * The order of magnitude is number of TOPOLOGY_CHANGED events multiplied + * by number of instances joining per such event. Still, an int sounds more than enough. + */ + private final AtomicInteger localClusterJoins = new AtomicInteger(0); + + /** + * Counts the number of instances that left the local cluster, over time. + * The order of magnitude is number of TOPOLOGY_CHANGED events multiplied + * by number of instances leaving per such event. Still, an int sounds more than enough. + */ + private final AtomicInteger localClusterLeaves = new AtomicInteger(0); + + /** + * The number of leader changes should be smaller or equal to + * TOPOLOGY_CHANGED events +1. + * So an int seems more than enough. + * Note that this counts only actual changes, not a leader being announced via TOPOLOGY_INIT. + */ + private final AtomicInteger localClusterLeaderSwitches = new AtomicInteger(0); + + /** + * The order of magnitude here is number of properties multiplied by number of instances in the local cluster. + * So this is an order of magnitude higher than ownPropertiesCount - + * but still, an int should be enough, really. + */ + private final AtomicInteger localClusterProperties = new AtomicInteger(0); + + /** + * This is either 0 or 1 - but since the Gauge should be as fast as possible + * we maintain an int. + * Note that localLeader is only valid if the topology is not changing currently + * (otherwise it is undefined). + */ + private final AtomicInteger ownIsLeader = new AtomicInteger(0); + + /** + * There shouldn't be an aweful lot of properties, so int sounds more than enough + */ + private final AtomicInteger ownProperties = new AtomicInteger(0); + + /** + * Attached/remote clusters aren't probably too many, so again, int is enough + */ + private final AtomicInteger remoteClusters = new AtomicInteger(0); + + /** + * Attached/remote instances (sum of instances in remote clusters) - + * probably aren't too many, int is enough + */ + private final AtomicInteger remoteInstances = new AtomicInteger(0); + + @Activate + protected void activate() { + logger.debug("activate: start"); + + createGauge(METRICS_NAME_TOPOLOGY_INIT_EVENTS, new Gauge<Integer>() { + @Override + public Integer getValue() { + return initEvents.get(); + } + }); + + createGauge(METRICS_NAME_TOPOLOGY_CHANGING_EVENTS, new Gauge<Integer>() { + @Override + public Integer getValue() { + return changingEvents.get(); + } + }); + + createGauge(METRICS_NAME_TOPOLOGY_CHANGED_EVENTS, new Gauge<Integer>() { + @Override + public Integer getValue() { + return changedEvents.get(); + } + }); + + createGauge(METRICS_NAME_PROPERTY_CHANGED_EVENTS, new Gauge<Long>() { + @Override + public Long getValue() { + return propertyChangedEvents.get(); + } + }); + + createGauge(METRICS_NAME_TOPOLOGY_IS_UNDEFINED, new Gauge<Integer>() { + @Override + public Integer getValue() { + return topologyIsUndefined.get(); + } + }); + + createGauge(METRICS_NAME_LOCAL_CLUSTER_INSTANCES, new Gauge<Integer>() { + @Override + public Integer getValue() { + return localClusterInstances.get(); + } + }); + + createGauge(METRICS_NAME_LOCAL_CLUSTER_JOINS, new Gauge<Integer>() { + @Override + public Integer getValue() { + return localClusterJoins.get(); + } + }); + + createGauge(METRICS_NAME_LOCAL_CLUSTER_LEAVES, new Gauge<Integer>() { + @Override + public Integer getValue() { + return localClusterLeaves.get(); + } + }); + + createGauge(METRICS_NAME_LOCAL_CLUSTER_LEADER_SWITCHES, new Gauge<Integer>() { + @Override + public Integer getValue() { + return localClusterLeaderSwitches.get(); + } + }); + + createGauge(METRICS_NAME_LOCAL_CLUSTER_PROPERTIES, new Gauge<Integer>() { + @Override + public Integer getValue() { + return localClusterProperties.get(); + } + }); + + createGauge(METRICS_NAME_OWN_IS_LEADER, new Gauge<Integer>() { + @Override + public Integer getValue() { + return ownIsLeader.get(); + } + }); + + createGauge(METRICS_NAME_OWN_PROPERTIES, new Gauge<Integer>() { + @Override + public Integer getValue() { + return ownProperties.get(); + } + }); + + createGauge(METRICS_NAME_REMOTE_CLUSTERS, new Gauge<Integer>() { + @Override + public Integer getValue() { + return remoteClusters.get(); + } + }); + + createGauge(METRICS_NAME_REMOTE_INSTANCES, new Gauge<Integer>() { + @Override + public Integer getValue() { + return remoteInstances.get(); + } + }); + + logger.info("activate: done."); + } + + @SuppressWarnings("rawtypes") + private void createGauge(String gaugeName, Gauge gauge) { + logger.debug("createGauge: registering gauge : " + gaugeName); + this.metricRegistry.register(gaugeName, gauge); + registeredGaugeNameList.add(gaugeName); + } + + @Deactivate + protected void deactivate() { + logger.debug("deactivate: deactivating."); + unregisterGauges(); + logger.info("deactivate: done."); + } + + private void unregisterGauges() { + for (String registeredGaugeName : registeredGaugeNameList) { + logger.debug("unregisterGauges : unregistering gauge : " + registeredGaugeName); + metricRegistry.remove(registeredGaugeName); + } + } + + @Override + public void handleTopologyEvent(TopologyEvent event) { + if (event == null) { + // this should not occur + return; + } + + try { + switch (event.getType()) { + case TOPOLOGY_INIT: { + handleInit(event.getNewView()); + return; + } + case TOPOLOGY_CHANGING: { + handleChanging(event.getOldView()); + return; + } + case TOPOLOGY_CHANGED: { + handleChanged(event.getOldView(), event.getNewView()); + return; + } + case PROPERTIES_CHANGED: { + handlePropertiesChanged(event.getOldView(), event.getNewView()); + return; + } + } + } catch (Exception e) { + // we should not really see any of those, but just in case..: + logger.error("handleTopologyEvent: got Exception " + e, e); + } + } + + private void handleInit(TopologyView newView) { + initEvents.incrementAndGet(); + topologyIsUndefined.set(0); + + updateLocalClusterInstances(null, newView); + + updateProperties(newView); + updateOwnIsLeader(newView); + updateRemote(newView); + } + + private void handleChanging(TopologyView oldView) { + changingEvents.incrementAndGet(); + topologyIsUndefined.set(1); + } + + private void handleChanged(TopologyView oldView, TopologyView newView) { + changedEvents.incrementAndGet(); + topologyIsUndefined.set(0); + + updateLocalClusterInstances(oldView, newView); + + updateLeaderSwitch(oldView, newView); + + updateProperties(newView); + updateRemote(newView); + } + + private void handlePropertiesChanged(TopologyView oldView, TopologyView newView) { + propertyChangedEvents.incrementAndGet(); + + updateProperties(newView); + } + + private void updateLocalClusterInstances(TopologyView oldViewOrNull, TopologyView newView) { + final ClusterView newLocalClusterView = newView.getLocalInstance().getClusterView(); + localClusterInstances.set(newLocalClusterView.getInstances().size()); + + if (oldViewOrNull == null) { + localClusterJoins.addAndGet(newLocalClusterView.getInstances().size()); + } else { + final ClusterView oldLocalClusterView = oldViewOrNull.getLocalInstance().getClusterView(); + final InstancesDiff diff = new InstancesDiff(oldLocalClusterView, newLocalClusterView); + final Collection<InstanceDescription> added = diff.added().get(); + final Collection<InstanceDescription> removed = diff.removed().get(); + + if (added != null && added.size() > 0) { + localClusterJoins.addAndGet(added.size()); + } + if (removed != null && removed.size() > 0) { + localClusterLeaves.addAndGet(removed.size()); + } + } + } + + private void updateLeaderSwitch(TopologyView oldView, TopologyView newView) { + final InstanceDescription oldLeader = oldView.getLocalInstance().getClusterView().getLeader(); + final InstanceDescription newLeader = newView.getLocalInstance().getClusterView().getLeader(); + if (!oldLeader.getSlingId().equals(newLeader.getSlingId())) { + localClusterLeaderSwitches.incrementAndGet(); + } + + updateOwnIsLeader(newView); + } + + private void updateOwnIsLeader(TopologyView newView) { + if (newView.getLocalInstance().isLeader()) { + ownIsLeader.set(1); + } else { + ownIsLeader.set(0); + } + } + + private void updateProperties(TopologyView newView) { + ownProperties.set(newView.getLocalInstance().getProperties().size()); + final ClusterView localCluster = newView.getLocalInstance().getClusterView(); + int properties = 0; + for (InstanceDescription instance : localCluster.getInstances()) { + properties += instance.getProperties().size(); + } + localClusterProperties.set(properties); + } + + private void updateRemote(TopologyView newView) { + // remoteClusters only counts the remote ones, so we subtract 1 representing our local one + remoteClusters.set(newView.getClusterViews().size() - 1); + final String localClusterId = newView.getLocalInstance().getClusterView().getId(); + int instances = 0; + for (ClusterView cluster : newView.getClusterViews()) { + final String clusterId = cluster.getId(); + if (!clusterId.equals(localClusterId)) { + instances += cluster.getInstances().size(); + } + } + remoteInstances.set(instances); + } + +}
diff --git a/src/test/java/org/apache/sling/discovery/impl/support/MetricReporterTest.java b/src/test/java/org/apache/sling/discovery/impl/support/MetricReporterTest.java new file mode 100644 index 0000000..02f77b7 --- /dev/null +++ b/src/test/java/org/apache/sling/discovery/impl/support/MetricReporterTest.java
@@ -0,0 +1,498 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.discovery.impl.support; + +import static org.apache.sling.discovery.impl.support.MetricReporter.METRICS_NAME_LOCAL_CLUSTER_INSTANCES; +import static org.apache.sling.discovery.impl.support.MetricReporter.METRICS_NAME_LOCAL_CLUSTER_JOINS; +import static org.apache.sling.discovery.impl.support.MetricReporter.METRICS_NAME_LOCAL_CLUSTER_LEADER_SWITCHES; +import static org.apache.sling.discovery.impl.support.MetricReporter.METRICS_NAME_LOCAL_CLUSTER_LEAVES; +import static org.apache.sling.discovery.impl.support.MetricReporter.METRICS_NAME_LOCAL_CLUSTER_PROPERTIES; +import static org.apache.sling.discovery.impl.support.MetricReporter.METRICS_NAME_OWN_IS_LEADER; +import static org.apache.sling.discovery.impl.support.MetricReporter.METRICS_NAME_OWN_PROPERTIES; +import static org.apache.sling.discovery.impl.support.MetricReporter.METRICS_NAME_PROPERTY_CHANGED_EVENTS; +import static org.apache.sling.discovery.impl.support.MetricReporter.METRICS_NAME_REMOTE_CLUSTERS; +import static org.apache.sling.discovery.impl.support.MetricReporter.METRICS_NAME_REMOTE_INSTANCES; +import static org.apache.sling.discovery.impl.support.MetricReporter.METRICS_NAME_TOPOLOGY_CHANGED_EVENTS; +import static org.apache.sling.discovery.impl.support.MetricReporter.METRICS_NAME_TOPOLOGY_CHANGING_EVENTS; +import static org.apache.sling.discovery.impl.support.MetricReporter.METRICS_NAME_TOPOLOGY_INIT_EVENTS; +import static org.apache.sling.discovery.impl.support.MetricReporter.METRICS_NAME_TOPOLOGY_IS_UNDEFINED; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.sling.discovery.ClusterView; +import org.apache.sling.discovery.InstanceDescription; +import org.apache.sling.discovery.InstanceFilter; +import org.apache.sling.discovery.commons.providers.BaseTopologyView; +import org.apache.sling.discovery.commons.providers.DefaultClusterView; +import org.apache.sling.discovery.commons.providers.DefaultInstanceDescription; +import org.apache.sling.discovery.commons.providers.ViewStateManager; +import org.apache.sling.discovery.commons.providers.base.ViewStateManagerFactory; +import org.junit.Before; +import org.junit.Test; + +import com.codahale.metrics.MetricRegistry; + +public class MetricReporterTest { + + static class TestTopologyView extends BaseTopologyView { + + private static final Random propertiesChangerRandom = new Random(54321); + + private final InstanceDescription localInstance; + + private HashSet<ClusterView> clusterViews; + + static Map<String, String> newEmptyProps() { + final Map<String, String> result = new HashMap<String, String>(); + return result; + } + + static TestTopologyView singleInstance() { + return singleInstance(UUID.randomUUID().toString(), UUID.randomUUID().toString()); + } + + static TestTopologyView singleInstance(String localSlingId, String localClusterId) { + final DefaultClusterView localCluster = new DefaultClusterView(localClusterId); + final InstanceDescription localInstance = new DefaultInstanceDescription(localCluster, true, true, localSlingId, newEmptyProps()); + return new TestTopologyView(localInstance); + } + + static BaseTopologyView localClusterWithRandomInstances(int numRandomInstances) { + return localClusterWithRandomInstances(UUID.randomUUID().toString(), UUID.randomUUID().toString(), numRandomInstances); + } + + static BaseTopologyView localClusterWithRandomInstances(String localSlingId, String clusterId, int numRandomInstances) { + final DefaultClusterView localCluster = new DefaultClusterView(clusterId); + final InstanceDescription localInstance = new DefaultInstanceDescription(localCluster, true, true, localSlingId, newEmptyProps()); + for (int i = 0; i < numRandomInstances; i++) { + new DefaultInstanceDescription(localCluster, false, false, UUID.randomUUID().toString(), newEmptyProps()); + } + + return new TestTopologyView(localInstance); + } + + public static BaseTopologyView withLeaderSwitchedBy(BaseTopologyView view, int offset) { + final ClusterView cluster = view.getLocalInstance().getClusterView(); + int index = 0; + for (InstanceDescription instance : cluster.getInstances()) { + if (instance.isLeader()) { + break; + } else { + index++; + } + } + int newLeader = (index + offset) % cluster.getInstances().size(); + final DefaultClusterView localCluster = new DefaultClusterView(cluster.getId()); + index = 0; + InstanceDescription local = null; + for (InstanceDescription instance : cluster.getInstances()) { + DefaultInstanceDescription aNewInstance = new DefaultInstanceDescription(localCluster, index++ == newLeader, instance.isLocal(), + instance.getSlingId(), instance.getProperties()); + if (instance.isLocal()) { + local = aNewInstance; + } + } + + return new TestTopologyView(local); + } + + public static BaseTopologyView withRandomPropertiesAdded(BaseTopologyView view, int offset, int numRandomPropertiesChanged) { + final ClusterView cluster = view.getLocalInstance().getClusterView(); + final DefaultClusterView localCluster = new DefaultClusterView(cluster.getId()); + int index = 0; + InstanceDescription local = null; + for (InstanceDescription instance : cluster.getInstances()) { + Map<String, String> properties = instance.getProperties(); + if (index++ == offset) { + properties = withRandomPropertiesAdded(properties, numRandomPropertiesChanged); + } + DefaultInstanceDescription aNewInstance = new DefaultInstanceDescription(localCluster, instance.isLeader(), instance.isLocal(), + instance.getSlingId(), properties); + if (instance.isLocal()) { + local = aNewInstance; + } + } + + return new TestTopologyView(local); + } + + private static Map<String, String> withRandomPropertiesAdded(Map<String, String> properties, int numRandomPropertiesChanged) { + final Map<String, String> result = new HashMap<String, String>(properties); + for (int i = 0; i < numRandomPropertiesChanged; i++) { + final String key = "randomKey-" + propertiesChangerRandom.nextInt(); + String current = result.put(key, "r"); + if (current != null) { + // then make sure it is different than the current value + result.put(key, current + "r"); + } + } + return result; + } + + public static BaseTopologyView multiCluster(int... instanceCounts) { + final Set<ClusterView> clusters = new HashSet<ClusterView>(); + InstanceDescription localInstance = null; + for (int i = 0; i < instanceCounts.length; i++) { + final int instanceCount = instanceCounts[i]; + + final DefaultClusterView cluster = new DefaultClusterView(UUID.randomUUID().toString()); + final InstanceDescription firstInstance = new DefaultInstanceDescription(cluster, i == 0, i == 0, UUID.randomUUID().toString(), + newEmptyProps()); + if (i == 0) { + localInstance = firstInstance; + } + for (int j = 0; j < instanceCount - 1; j++) { + new DefaultInstanceDescription(cluster, false, false, UUID.randomUUID().toString(), newEmptyProps()); + } + + clusters.add(cluster); + } + + TestTopologyView view = new TestTopologyView(localInstance); + view.setClusterViews(clusters); + return view; + } + + public static BaseTopologyView addRemote(BaseTopologyView view, int instanceCount) { + final String localClusterId = view.getLocalInstance().getClusterView().getId(); + final Set<ClusterView> newClusters = new HashSet<ClusterView>(); + final Set<ClusterView> existing = view.getClusterViews(); + InstanceDescription local = null; + for (ClusterView clusterView : existing) { + ClusterView clonedCluster = clone(clusterView); + if (clusterView.getId().equals(localClusterId)) { + for (InstanceDescription inst : clonedCluster.getInstances()) { + if (inst.getSlingId().equals(view.getLocalInstance().getSlingId())) { + local = inst; + } + } + } + newClusters.add(clonedCluster); + } + + final DefaultClusterView cluster = new DefaultClusterView(UUID.randomUUID().toString()); + for (int j = 0; j < instanceCount; j++) { + new DefaultInstanceDescription(cluster, false, false, UUID.randomUUID().toString(), newEmptyProps()); + } + newClusters.add(cluster); + + TestTopologyView newView = new TestTopologyView(local); + newView.setClusterViews(newClusters); + return newView; + } + + private static ClusterView clone(ClusterView original) { + final DefaultClusterView cluster = new DefaultClusterView(original.getId()); + for (InstanceDescription inst : original.getInstances()) { + new DefaultInstanceDescription(cluster, inst.isLeader(), inst.isLocal(), inst.getSlingId(), inst.getProperties()); + } + return cluster; + } + + private TestTopologyView(InstanceDescription localInstance) { + this.localInstance = localInstance; + final Set<ClusterView> result = new HashSet<ClusterView>(); + result.add(localInstance.getClusterView()); + setClusterViews(result); + } + + @Override + public InstanceDescription getLocalInstance() { + return localInstance; + } + + @Override + public Set<InstanceDescription> getInstances() { + // to achieve some true randomness in the way this list is returned, + // we actually use a Random .. not that this matters much, but + // would be good to have tests not rely on this + final Set<InstanceDescription> result = new HashSet<InstanceDescription>(); + final Random r = new Random(); + final List<ClusterView> clusters = new LinkedList<ClusterView>(clusterViews); + while (!clusters.isEmpty()) { + final ClusterView cluster = clusters.remove(r.nextInt(clusters.size())); + for (InstanceDescription instance : cluster.getInstances()) { + result.add(instance); + } + } + return result; + } + + @Override + public Set<InstanceDescription> findInstances(InstanceFilter filter) { + throw new IllegalStateException("not implemented"); + } + + @Override + public Set<ClusterView> getClusterViews() { + return clusterViews; + } + + private void setClusterViews(Set<ClusterView> clusterViews) { + this.clusterViews = new HashSet<ClusterView>(clusterViews); + } + + @Override + public String getLocalClusterSyncTokenId() { + throw new IllegalStateException("not implemented"); + } + + } + + private MetricReporter reporter; + private MetricRegistry metricRegistry; + private ViewStateManager viewStateManager; + private Lock viewStateManagerLock; + + @Before + public void setup() { + reporter = new MetricReporter(); + metricRegistry = new MetricRegistry(); + reporter.metricRegistry = metricRegistry; + viewStateManagerLock = new ReentrantLock(); + viewStateManager = ViewStateManagerFactory.newViewStateManager(viewStateManagerLock, null); + viewStateManager.bind(reporter); + } + + @Test + public void testActivateDeactivate() { + for (int i = 0; i < 100; i++) { + reporter.activate(); + reporter.deactivate(); + } + } + + @Test + public void testPreInitState() throws InterruptedException { + reporter.activate(); + assertGaugesEqual(0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0); + viewStateManager.handleActivated(); + assertGaugesEqual(0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0); + reporter.deactivate(); + } + + @Test + public void testStandardSequence() throws InterruptedException { + reporter.activate(); + viewStateManager.handleActivated(); + assertGaugesEqual(0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0); + viewStateManager.handleChanging(); + assertGaugesEqual(0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0); + final BaseTopologyView myView = TestTopologyView.singleInstance(); + viewStateManager.handleNewView(myView); + viewStateManager.waitForAsyncEvents(5000); + assertGaugesEqual(1, 0, 0, 0, 0, 1, 1, 0, 0, 0, 1, 0, 0, 0); + for (int i = 0; i < 10; i++) { + viewStateManager.handleChanging(); + viewStateManager.waitForAsyncEvents(5000); + assertGaugesEqual(1, i, i + 1, 0, 1, 1, 1, 0, 0, 0, 1, 0, 0, 0); + viewStateManager.handleNewView( + TestTopologyView.singleInstance(myView.getLocalInstance().getSlingId(), myView.getLocalInstance().getClusterView().getId())); + viewStateManager.waitForAsyncEvents(5000); + assertGaugesEqual(1, i + 1, i + 1, 0, 0, 1, 1, 0, 0, 0, 1, 0, 0, 0); + } + } + + @Test + public void testRandomJoinsLeaves() throws InterruptedException { + reporter.activate(); + viewStateManager.handleActivated(); + assertGaugesEqual(0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0); + + final BaseTopologyView baseView = TestTopologyView.singleInstance(); + viewStateManager.handleNewView(baseView); + viewStateManager.waitForAsyncEvents(5000); + assertGaugesEqual(1, 0, 0, 0, 0, 1, 1, 0, 0, 0, 1, 0, 0, 0); + + final Random r = new Random(12345678); + int joinCnt = 1; + int leaveCnt = 0; + for (int i = 0; i < 10; i++) { + int numRandomInstances = r.nextInt(5) + 1; + joinCnt += numRandomInstances; + viewStateManager.handleNewView(TestTopologyView.localClusterWithRandomInstances(baseView.getLocalInstance().getSlingId(), + baseView.getLocalInstance().getClusterView().getId(), numRandomInstances)); + viewStateManager.waitForAsyncEvents(5000); + assertGaugesEqual(1, 2 * i + 1, 2 * i + 1, 0, 0, 1 + numRandomInstances, joinCnt, leaveCnt, 0, 0, 1, 0, 0, 0); + + // back to only my instance - all newly joined leave again + leaveCnt += numRandomInstances; + viewStateManager.handleNewView( + TestTopologyView.singleInstance(baseView.getLocalInstance().getSlingId(), baseView.getLocalInstance().getClusterView().getId())); + viewStateManager.waitForAsyncEvents(5000); + assertGaugesEqual(1, 2 * i + 2, 2 * i + 2, 0, 0, 1, joinCnt, leaveCnt, 0, 0, 1, 0, 0, 0); + } + } + + @Test + public void testLeader() throws InterruptedException { + reporter.activate(); + viewStateManager.handleActivated(); + assertGaugesEqual(0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0); + + final int num = 11; + BaseTopologyView view = TestTopologyView.localClusterWithRandomInstances(num - 1); + viewStateManager.handleNewView(view); + viewStateManager.waitForAsyncEvents(5000); + assertGaugesEqual(1, 0, 0, 0, 0, num, num, 0, 0, 0, 1, 0, 0, 0); + + for (int i = 0; i < 10; i++) { + int offset = 1; + view = TestTopologyView.withLeaderSwitchedBy(view, offset); + viewStateManager.handleNewView(view); + viewStateManager.waitForAsyncEvents(5000); + assertGaugesEqual(1, i + 1, i + 1, 0, 0, num, num, 0, i + 1, 0, view.getLocalInstance().isLeader() ? 1 : 0, 0, 0, 0); + } + } + + @Test + public void testInitialAndChangedProperties() throws InterruptedException { + reporter.activate(); + viewStateManager.handleActivated(); + assertGaugesEqual(0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0); + + BaseTopologyView view = TestTopologyView.localClusterWithRandomInstances(3); + view = TestTopologyView.withRandomPropertiesAdded(view, 0, 4); + view = TestTopologyView.withRandomPropertiesAdded(view, 1, 3); + view = TestTopologyView.withRandomPropertiesAdded(view, 2, 8); + viewStateManager.handleNewView(view); + viewStateManager.waitForAsyncEvents(5000); + assertGaugesEqual(1, 0, 0, 0, 0, 3 + 1, 3 + 1, 0, 0, 4 + 3 + 8, 1, 4, 0, 0); + + view = TestTopologyView.withRandomPropertiesAdded(view, 1, 7); + view = TestTopologyView.addRemote(view, 1); + viewStateManager.handleNewView(view); + viewStateManager.waitForAsyncEvents(5000); + assertGaugesEqual(1, 1, 1, 0, 0, 3 + 1, 3 + 1, 0, 0, 4 + 3 + 8 + 7, 1, 4, 1, 1); + } + + @Test + public void testProperties() throws InterruptedException { + reporter.activate(); + viewStateManager.handleActivated(); + assertGaugesEqual(0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0); + + final int num = 11; + BaseTopologyView view = TestTopologyView.localClusterWithRandomInstances(num - 1); + viewStateManager.handleNewView(view); + viewStateManager.waitForAsyncEvents(5000); + assertGaugesEqual(1, 0, 0, 0, 0, num, num, 0, 0, 0, 1, 0, 0, 0); + + final Random r = new Random(100000); + for (int i = 0; i < num; i++) { + int numRandomPropertiesChanged = r.nextInt(10); + view = TestTopologyView.withRandomPropertiesAdded(view, i, numRandomPropertiesChanged); + viewStateManager.handleNewView(view); + viewStateManager.waitForAsyncEvents(5000); + int localPropertiesCount = localProperties(view); + assertTrue("no properties set", localPropertiesCount > 0); + assertGaugesEqual(1, 0, 0, i + 1, 0, num, num, 0, 0, localPropertiesCount, view.getLocalInstance().isLeader() ? 1 : 0, + view.getLocalInstance().getProperties().size(), 0, 0); + } + } + + @Test + public void testRemoteCluster() throws InterruptedException { + reporter.activate(); + viewStateManager.handleActivated(); + assertGaugesEqual(0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0); + + BaseTopologyView view = TestTopologyView.multiCluster(3, 2, 4); + viewStateManager.handleNewView(view); + viewStateManager.waitForAsyncEvents(5000); + assertGaugesEqual(1, 0, 0, 0, 0, view.getLocalInstance().getClusterView().getInstances().size(), + view.getLocalInstance().getClusterView().getInstances().size(), 0, 0, 0, 1, 0, 2, 6); + + view = TestTopologyView.addRemote(view, 5); + viewStateManager.handleNewView(view); + viewStateManager.waitForAsyncEvents(5000); + assertGaugesEqual(1, 1, 1, 0, 0, view.getLocalInstance().getClusterView().getInstances().size(), + view.getLocalInstance().getClusterView().getInstances().size(), 0, 0, 0, 1, 0, 3, 11); + } + + private int localProperties(BaseTopologyView view) { + int properties = 0; + for (InstanceDescription instance : view.getLocalInstance().getClusterView().getInstances()) { + properties += instance.getProperties().size(); + } + return properties; + } + + private void assertGaugesEqual(int init, int changed, int changing, long propertyChanged, int isUndefined, int localInstances, int localJoins, + int localLeaves, int localLeaderSwitches, int localPropertiesCount, int ownIsLeader, int ownPropertiesCount, int remoteClustersCount, + int remoteInstancesCount) throws InterruptedException { + long maxWait = System.currentTimeMillis() + 5000; + while (maxWait < System.currentTimeMillis()) { + try { + assertGaugesEqualNoWait(init, changed, changing, propertyChanged, isUndefined, localInstances, localJoins, localLeaves, + localLeaderSwitches, localPropertiesCount, ownIsLeader, ownPropertiesCount, remoteClustersCount, remoteInstancesCount); + // successful case + return; + } catch (Throwable th) { + // error case : sleep and retry + Thread.sleep(100); + } + } + // timeout case : one last retry, outside try/catch to propagate the failure + assertGaugesEqualNoWait(init, changed, changing, propertyChanged, isUndefined, localInstances, localJoins, localLeaves, localLeaderSwitches, + localPropertiesCount, ownIsLeader, ownPropertiesCount, remoteClustersCount, remoteInstancesCount); + } + + private void assertGaugesEqualNoWait(int init, int changed, int changing, long propertyChanged, int isUndefined, int localInstances, + int localJoins, int localLeaves, int localLeaderSwitches, int localPropertiesCount, int ownIsLeader, int ownPropertiesCount, + int remoteClustersCount, int remoteInstancesCount) { + assertGaugeEquals(init, METRICS_NAME_TOPOLOGY_INIT_EVENTS); + assertGaugeEquals(changed, METRICS_NAME_TOPOLOGY_CHANGED_EVENTS); + assertGaugeEquals(changing, METRICS_NAME_TOPOLOGY_CHANGING_EVENTS); + assertGaugeEquals(propertyChanged, METRICS_NAME_PROPERTY_CHANGED_EVENTS); + + assertGaugeEquals(isUndefined, METRICS_NAME_TOPOLOGY_IS_UNDEFINED); + + assertGaugeEquals(localInstances, METRICS_NAME_LOCAL_CLUSTER_INSTANCES); + assertGaugeEquals(localJoins, METRICS_NAME_LOCAL_CLUSTER_JOINS); + assertGaugeEquals(localLeaves, METRICS_NAME_LOCAL_CLUSTER_LEAVES); + assertGaugeEquals(localLeaderSwitches, METRICS_NAME_LOCAL_CLUSTER_LEADER_SWITCHES); + assertGaugeEquals(localPropertiesCount, METRICS_NAME_LOCAL_CLUSTER_PROPERTIES); + + assertGaugeEquals(ownIsLeader, METRICS_NAME_OWN_IS_LEADER); + assertGaugeEquals(ownPropertiesCount, METRICS_NAME_OWN_PROPERTIES); + + assertGaugeEquals(remoteClustersCount, METRICS_NAME_REMOTE_CLUSTERS); + assertGaugeEquals(remoteInstancesCount, METRICS_NAME_REMOTE_INSTANCES); + } + + private void assertGaugeEquals(int expected, String metricsName) { + assertEquals(metricsName, expected, metricRegistry.getGauges().get(metricsName).getValue()); + } + + private void assertGaugeEquals(long expected, String metricsName) { + assertEquals(metricsName, expected, metricRegistry.getGauges().get(metricsName).getValue()); + } +}