| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.sling.discovery.base.its; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNotEquals; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertNull; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.Semaphore; |
| |
| import org.apache.log4j.Level; |
| import org.apache.log4j.LogManager; |
| import org.apache.sling.discovery.ClusterView; |
| import org.apache.sling.discovery.InstanceDescription; |
| import org.apache.sling.discovery.TopologyEvent; |
| import org.apache.sling.discovery.TopologyEvent.Type; |
| import org.apache.sling.discovery.TopologyEventListener; |
| import org.apache.sling.discovery.TopologyView; |
| import org.apache.sling.discovery.base.commons.ClusterViewHelper; |
| import org.apache.sling.discovery.base.commons.ClusterViewService; |
| import org.apache.sling.discovery.base.commons.UndefinedClusterViewException; |
| import org.apache.sling.discovery.base.connectors.announcement.Announcement; |
| import org.apache.sling.discovery.base.connectors.announcement.AnnouncementFilter; |
| import org.apache.sling.discovery.base.connectors.announcement.AnnouncementRegistry; |
| import org.apache.sling.discovery.base.its.setup.VirtualInstance; |
| import org.apache.sling.discovery.base.its.setup.VirtualInstanceBuilder; |
| import org.apache.sling.discovery.base.its.setup.mock.AcceptsMultiple; |
| import org.apache.sling.discovery.base.its.setup.mock.AssertingTopologyEventListener; |
| import org.apache.sling.discovery.base.its.setup.mock.PropertyProviderImpl; |
| import org.junit.After; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| public abstract class AbstractClusterTest { |
| |
| /** Logger obtained for the runtime class of the test instance (this class is abstract, so that is a subclass). */ private final Logger logger = LoggerFactory.getLogger(this.getClass()); |
| |
| /** |
| * Describes one expected cluster simply as the group of instances that should form it. |
| * Declared static since it never touches the enclosing test instance. |
| */ |
| private static class SimpleClusterView { |
| |
| /** the instances expected to be the members of this cluster */ |
| private final VirtualInstance[] instances; |
| |
| SimpleClusterView(VirtualInstance... instances) { |
| this.instances = instances; |
| } |
| |
| /** |
| * @return the number of expected members followed by the slingId of each member, |
| * every slingId being terminated by a comma |
| */ |
| @Override |
| public String toString() { |
| final StringBuilder instanceSlingIds = new StringBuilder(); |
| for (final VirtualInstance instance : instances) { |
| instanceSlingIds.append(instance.slingId).append(','); |
| } |
| return "an expected cluster with "+instances.length+" instances: "+instanceSlingIds; |
| } |
| } |
| |
| // the instances stopped and cleared by tearDown(); setup() only creates instance1 and |
| // instance2, the others are created by the individual tests that need them |
| VirtualInstance instance1; |
| VirtualInstance instance1Restarted; |
| VirtualInstance instance2; |
| VirtualInstance instance3; |
| VirtualInstance instance4; |
| VirtualInstance instance5; |
| |
| // property names/values - presumably for property related tests elsewhere in this class |
| private String property1Name; |
| private String property1Value; |
| private String property2Name; |
| protected String property2Value; |
| |
| /** level of the discovery logger as found by setup(), put back by tearDown() */ |
| private Level logLevel; |
| |
| /** |
| * Supplies the builder from which the tests create their {@link VirtualInstance}s. |
| * |
| * @return a builder; the tests in this class request a new one for every instance they build |
| */ |
| protected abstract VirtualInstanceBuilder newBuilder(); |
| |
| /** |
| * Remembers the current level of the "org.apache.sling.discovery" log4j logger, switches it |
| * to TRACE and creates the default pair of instances: instance1 on a new repository and |
| * instance2 on the repository of instance1. |
| */ |
| @Before |
| public void setup() throws Exception { |
| // Logger.getLogger(String) is the static method the previous root-logger expression resolved to |
| final org.apache.log4j.Logger discoveryLog = org.apache.log4j.Logger.getLogger("org.apache.sling.discovery"); |
| logLevel = discoveryLog.getLevel(); |
| discoveryLog.setLevel(Level.TRACE); |
| logger.debug("here we are"); |
| |
| final VirtualInstanceBuilder firstBuilder = newBuilder(); |
| instance1 = firstBuilder.setDebugName("firstInstance") |
| .newRepository("/var/discovery/impl/", true) |
| .build(); |
| |
| final VirtualInstanceBuilder secondBuilder = newBuilder(); |
| instance2 = secondBuilder.setDebugName("secondInstance") |
| .useRepositoryOf(instance1) |
| .build(); |
| } |
| |
| /** |
| * Stops every instance that is still referenced, clears all instance references and puts the |
| * "org.apache.sling.discovery" log level back to the value remembered in {@code logLevel}. |
| * Besides running after each test, several tests call this directly to discard the default setup. |
| * |
| * @throws Exception whatever a {@code stop()} call throws; the references are cleared and the |
| * log level is restored even then |
| */ |
| @After |
| public void tearDown() throws Exception { |
| try { |
| // unchanged stop order; instance3 used to be listed (and stopped) twice by a copy/paste slip |
| final VirtualInstance[] toStop = { |
| instance5, instance4, instance3, instance2, instance1, instance1Restarted |
| }; |
| for (final VirtualInstance instance : toStop) { |
| if (instance != null) { |
| instance.stop(); |
| } |
| } |
| } finally { |
| // never leave stale references or a modified log level behind for the next test |
| instance1Restarted = null; |
| instance1 = null; |
| instance2 = null; |
| instance3 = null; |
| instance4 = null; |
| instance5 = null; |
| final org.apache.log4j.Logger discoveryLogger = LogManager.getRootLogger().getLogger("org.apache.sling.discovery"); |
| discoveryLogger.setLevel(logLevel); |
| } |
| } |
| |
| /** |
| * Leader behaviour with ascending slingIds (SLING-3253): the instance that is |
| * created first gets the lower slingId. |
| */ |
| @Test |
| public void testLeaderAsc() throws Throwable { |
| final String firstSlingId = "000"; |
| final String secondSlingId = "111"; |
| logger.info("testLeaderAsc: start"); |
| doTestLeader(firstSlingId, secondSlingId); |
| logger.info("testLeaderAsc: end"); |
| } |
| |
| /** |
| * Leader behaviour with descending slingIds (SLING-3253): the instance that is |
| * created first gets the higher slingId. |
| */ |
| @Test |
| public void testLeaderDesc() throws Throwable { |
| final String firstSlingId = "111"; |
| final String secondSlingId = "000"; |
| logger.info("testLeaderDesc: start"); |
| doTestLeader(firstSlingId, secondSlingId); |
| logger.info("testLeaderDesc: end"); |
| } |
| |
| private void doTestLeader(String slingId1, String slingId2) throws Throwable { |
| logger.info("doTestLeader("+slingId1+","+slingId2+"): start"); |
| // stop 1 and 2 and create them with a lower heartbeat timeout |
| instance2.stopViewChecker(); |
| instance1.stopViewChecker(); |
| instance2.stop(); |
| instance1.stop(); |
| instance1 = newBuilder().setDebugName("firstInstance") |
| .newRepository("/var/discovery/impl/", true) |
| .setConnectorPingTimeout(30) |
| .setMinEventDelay(1) |
| .setSlingId(slingId1).build(); |
| // sleep so that the two dont have the same startup time, and thus leaderElectionId is lower for instance1 |
| logger.info("doTestLeader: 1st sleep 200ms"); |
| Thread.sleep(200); |
| instance2 = newBuilder().setDebugName("secondInstance") |
| .useRepositoryOf(instance1) |
| .setConnectorPingTimeout(30) |
| .setMinEventDelay(1) |
| .setSlingId(slingId2).build(); |
| assertNotNull(instance1); |
| assertNotNull(instance2); |
| |
| // the two instances are still isolated - hence they throw an exception |
| try{ |
| instance1.getClusterViewService().getLocalClusterView(); |
| fail("should complain"); |
| } catch(UndefinedClusterViewException e) { |
| // ok |
| } |
| try{ |
| instance2.getClusterViewService().getLocalClusterView(); |
| fail("should complain"); |
| } catch(UndefinedClusterViewException e) { |
| // ok |
| } |
| |
| // let the sync/voting happen |
| for(int m=0; m<4; m++) { |
| instance1.heartbeatsAndCheckView(); |
| instance2.heartbeatsAndCheckView(); |
| logger.info("doTestLeader: sleep 500ms"); |
| Thread.sleep(500); |
| } |
| instance1.heartbeatsAndCheckView(); |
| instance2.heartbeatsAndCheckView(); |
| |
| // now they must be in the same cluster, so in a cluster of size 1 |
| assertEquals(2, instance1.getClusterViewService().getLocalClusterView().getInstances().size()); |
| assertEquals(2, instance2.getClusterViewService().getLocalClusterView().getInstances().size()); |
| |
| // the first instance should be the leader - since it was started first |
| assertTrue(instance1.getLocalInstanceDescription().isLeader()); |
| assertFalse(instance2.getLocalInstanceDescription().isLeader()); |
| logger.info("doTestLeader("+slingId1+","+slingId2+"): end"); |
| } |
| |
| /** |
| * Regression test for the stale announcements reported in SLING-4139. |
| * <p> |
| * An instance that crashes while holding announcements never cleans them up, so after a |
| * restart they are still around - even though the connector may no longer be in use |
| * (or may point somewhere else). One consequence is covered here: peers in the same cluster |
| * would, once the crashed/stopped instance restarts, take those stale announcements for |
| * correct and include them in the topology - hence reporting stale instances |
| * (which can be old instances or even duplicates). |
| */ |
| @Test |
| public void testStaleAnnouncementsVisibleToClusterPeers4139() throws Throwable { |
| logger.info("testStaleAnnouncementsVisibleToClusterPeers4139: start"); |
| final String crashedSlingId = prepare4139(); |
| |
| // the topology connector from instance3 to instance1 is removed simply by |
| // no longer pinging - there is nothing to assert additionally for that |
| |
| // start instance 1 again: same slingId, attached to the repository of instance2 |
| final VirtualInstanceBuilder restartBuilder = newBuilder(); |
| instance1Restarted = restartBuilder.setDebugName("firstInstance") |
| .useRepositoryOf(instance2) |
| .setConnectorPingTimeout(Integer.MAX_VALUE /* no timeout */) |
| .setMinEventDelay(1) |
| .setSlingId(crashedSlingId) |
| .build(); |
| |
| // three heartbeat rounds, 500ms apart |
| for (int round = 0; round < 3; round++) { |
| if (round > 0) { |
| Thread.sleep(500); |
| } |
| runHeartbeatOnceWith(instance1Restarted, instance2, instance3); |
| } |
| |
| // facts: connector 3->1 does not exist actively anymore, |
| // instance 1+2 should build a cluster, |
| // instance 3 should be isolated |
| logger.info("instance1Restarted.dump: "+instance1Restarted.slingId); |
| instance1Restarted.dumpRepo(); |
| |
| logger.info("instance2.dump: "+instance2.slingId); |
| instance2.dumpRepo(); |
| |
| logger.info("instance3.dump: "+instance3.slingId); |
| instance3.dumpRepo(); |
| |
| final SimpleClusterView seenByRestarted = new SimpleClusterView(instance1Restarted, instance2); |
| assertTopology(instance1Restarted, seenByRestarted); |
| final SimpleClusterView seenByRemote = new SimpleClusterView(instance3); |
| assertTopology(instance3, seenByRemote); |
| final SimpleClusterView seenByPeer = new SimpleClusterView(instance1Restarted, instance2); |
| assertTopology(instance2, seenByPeer); |
| |
| instance1Restarted.stop(); |
| logger.info("testStaleAnnouncementsVisibleToClusterPeers4139: end"); |
| } |
| |
| /** |
| * Covers the case where a connector pointed to instance1, instance1 eventually crashed |
| * and the connector was then pointed to instance2. Meanwhile instance1 got restarted; |
| * this test makes sure that instance3 is not reported twice in the topology. |
| * That did not happen before SLING-4139 and must not happen afterwards either. |
| */ |
| @Test |
| public void testDuplicateInstanceIn2Clusters4139() throws Throwable { |
| logger.info("testDuplicateInstanceIn2Clusters4139: start"); |
| final String crashedSlingId = prepare4139(); |
| |
| // the topology connector from instance3 to instance1 is removed by no longer pinging |
| // (nothing to assert additionally for that); instead, a connector from instance3 |
| // to instance2 is started now |
| pingConnector(instance3, instance2); |
| |
| // start instance 1 again: same slingId, attached to the repository of instance2 |
| final VirtualInstanceBuilder restartBuilder = newBuilder(); |
| instance1Restarted = restartBuilder.setDebugName("firstInstance") |
| .useRepositoryOf(instance2) |
| .setConnectorPingTimeout(Integer.MAX_VALUE /* no timeout */) |
| .setMinEventDelay(1) |
| .setSlingId(crashedSlingId) |
| .build(); |
| |
| // two identical verification passes, separated by a 500ms pause |
| final String[] iterationLabels = { "iteration 0", "iteration 1" }; |
| for (int iteration = 0; iteration < iterationLabels.length; iteration++) { |
| if (iteration > 0) { |
| Thread.sleep(500); |
| } |
| runHeartbeatOnceWith(instance1Restarted, instance2, instance3); |
| pingConnector(instance3, instance2); |
| runHeartbeatOnceWith(instance1Restarted, instance2, instance3); |
| pingConnector(instance3, instance2); |
| logger.info(iterationLabels[iteration]); |
| logger.info("instance1Restarted.slingId: "+instance1Restarted.slingId); |
| logger.info("instance2.slingId: "+instance2.slingId); |
| logger.info("instance3.slingId: "+instance3.slingId); |
| instance1Restarted.dumpRepo(); |
| assertSameTopology(new SimpleClusterView(instance1Restarted, instance2), new SimpleClusterView(instance3)); |
| } |
| instance1Restarted.stop(); |
| |
| logger.info("testDuplicateInstanceIn2Clusters4139: end"); |
| } |
| |
| /** |
| * Connector switching scenario; this test does the following: |
| * cluster A with instance 1 and instance 2 |
| * cluster B with instance 3 and instance 4 |
| * cluster C with instance 5 |
| * <p> |
| * Initially, instance3 is pinging instance1, and instance5 is pinging instance1 as well (MAC hub); |
| * that should result in instance3 and instance5 inheriting the rest from instance1. |
| * Then a load balancer switching from instance1 to instance2 is simulated - hence the pings go to instance2. |
| * Finally instance1 is restarted (as instance1Restarted, reusing the slingId of instance1) while |
| * instance4 is taken out, and the test waits until instance3 reports a cluster of size 1. |
| */ |
| @Test |
| public void testConnectorSwitching4139() throws Throwable { |
| final int MIN_EVENT_DELAY = 1; |
| |
| tearDown(); // reset any setup that was done - we start with a different setup than the default one |
| final org.apache.log4j.Logger discoveryLogger = LogManager.getRootLogger().getLogger("org.apache.sling.discovery"); |
| logLevel = discoveryLogger.getLevel(); |
| discoveryLogger.setLevel(Level.DEBUG); |
| |
| instance1 = newBuilder().setDebugName("instance1") |
| .newRepository("/var/discovery/clusterA/", true) |
| .setConnectorPingTimeout(10 /* sec */) |
| .setConnectorPingInterval(999) |
| .setMinEventDelay(MIN_EVENT_DELAY).build(); |
| instance2 = newBuilder().setDebugName("instance2") |
| .useRepositoryOf(instance1) |
| .setConnectorPingTimeout(10 /* sec */) |
| .setConnectorPingInterval(999) |
| .setMinEventDelay(MIN_EVENT_DELAY).build(); |
| // now launch the remote instances (clusters B and C) |
| instance3 = newBuilder().setDebugName("instance3") |
| .newRepository("/var/discovery/clusterB/", false) |
| .setConnectorPingTimeout(10 /* sec */) |
| .setConnectorPingInterval(999) |
| .setMinEventDelay(MIN_EVENT_DELAY).build(); |
| instance4 = newBuilder().setDebugName("instance4") |
| .useRepositoryOf(instance3) |
| .setConnectorPingTimeout(10 /* sec */) |
| .setConnectorPingInterval(999) |
| .setMinEventDelay(MIN_EVENT_DELAY).build(); |
| instance5 = newBuilder().setDebugName("instance5") |
| .newRepository("/var/discovery/clusterC/", false) |
| .setConnectorPingTimeout(10 /* sec */) |
| .setConnectorPingInterval(999) |
| .setMinEventDelay(MIN_EVENT_DELAY).build(); |
| |
| // join the instances to form their clusters by sending out heartbeats |
| runHeartbeatOnceWith(instance1, instance2, instance3, instance4, instance5); |
| Thread.sleep(500); |
| runHeartbeatOnceWith(instance1, instance2, instance3, instance4, instance5); |
| Thread.sleep(500); |
| runHeartbeatOnceWith(instance1, instance2, instance3, instance4, instance5); |
| Thread.sleep(500); |
| |
| assertSameTopology(new SimpleClusterView(instance1, instance2)); |
| assertSameTopology(new SimpleClusterView(instance3, instance4)); |
| assertSameTopology(new SimpleClusterView(instance5)); |
| |
| // create topology connectors from instance3 and from instance5 to instance1 |
| // -> corresponds to starting to ping |
| runHeartbeatOnceWith(instance1, instance2, instance3, instance4, instance5); |
| pingConnector(instance3, instance1); |
| pingConnector(instance5, instance1); |
| Thread.sleep(500); |
| runHeartbeatOnceWith(instance1, instance2, instance3, instance4, instance5); |
| pingConnector(instance3, instance1); |
| pingConnector(instance5, instance1); |
| Thread.sleep(500); |
| |
| // make asserts on the topology |
| logger.info("testConnectorSwitching4139: instance1.slingId="+instance1.slingId); |
| logger.info("testConnectorSwitching4139: instance2.slingId="+instance2.slingId); |
| logger.info("testConnectorSwitching4139: instance3.slingId="+instance3.slingId); |
| logger.info("testConnectorSwitching4139: instance4.slingId="+instance4.slingId); |
| logger.info("testConnectorSwitching4139: instance5.slingId="+instance5.slingId); |
| instance1.dumpRepo(); |
| |
| assertSameTopology(new SimpleClusterView(instance1, instance2), |
| new SimpleClusterView(instance3, instance4), |
| new SimpleClusterView(instance5)); |
| |
| // simulate a crash of instance1 (it is left out of the heartbeat rounds from here on), resulting in the load-balancer switching the pings to instance2 |
| boolean success = false; |
| for(int i=0; i<25; i++) { |
| // loop at most 25 times; even when both pings succeed, keep looping as long as i is below 20 |
| runHeartbeatOnceWith(instance2, instance3, instance4, instance5); |
| final boolean ping1 = pingConnector(instance3, instance2); |
| final boolean ping2 = pingConnector(instance5, instance2); |
| if (ping1 && ping2) { |
| // both pings were fine - hence break (once the minimum loop count is reached) |
| success = true; |
| logger.info("testConnectorSwitching4139: successfully switched all pings to instance2 after "+i+" rounds."); |
| if (i<20) { |
| logger.info("testConnectorSwitching4139: min loop cnt not yet reached: i="+i); |
| Thread.sleep(1000); // 20x1000ms = 20sec max - (vs 10sec timeout) - should be enough for timing out |
| continue; |
| } |
| break; |
| } |
| logger.info("testConnectorSwitching4139: looping cos ping1="+ping1+", ping2="+ping2); |
| Thread.sleep(1000); // 25x1000ms = 25sec max - (vs 10sec timeout) |
| |
| } |
| assertTrue(success); |
| // one final heartbeat |
| runHeartbeatOnceWith(instance2, instance3, instance4, instance5); |
| assertTrue(pingConnector(instance3, instance2)); |
| assertTrue(pingConnector(instance5, instance2)); |
| |
| instance2.dumpRepo(); |
| |
| assertSameTopology(new SimpleClusterView(instance2), |
| new SimpleClusterView(instance3, instance4), |
| new SimpleClusterView(instance5)); |
| |
| // restart instance1, crash instance4 (its view checker is stopped and it is left out of the heartbeat rounds below) |
| instance4.stopViewChecker(); |
| instance1Restarted = newBuilder().setDebugName("instance1") |
| .useRepositoryOf(instance2) |
| .setConnectorPingTimeout(Integer.MAX_VALUE /* no timeout */) |
| .setMinEventDelay(1) |
| .setSlingId(instance1.getSlingId()).build(); |
| runHeartbeatOnceWith(instance1Restarted, instance2, instance3, instance5); |
| // give these heartbeats/votes some time .. so sleep 2sec (timeout is 10sec, so should be safe) |
| Thread.sleep(2000); |
| assertTrue(pingConnector(instance3, instance2)); |
| assertTrue(pingConnector(instance5, instance2)); |
| success = false; |
| for(int i=0; i<40; i++) { |
| runHeartbeatOnceWith(instance1Restarted, instance2, instance3, instance5); |
| /* NOTE(review): this checks the view of the original instance1 object, not of instance1Restarted - confirm that this is intended */ instance1.getViewChecker().checkView(); |
| // we used to do: |
| //assertTrue(pingConnector(instance3, instance2)); |
| // but that could fail with the introduction of marking |
| // an establishedView as failing upon detecting a view change |
| // when the view changes, we're sending TOPOLOGY_CHANGING to listeners |
| // so getTopology() should also return isCurrent==false - which |
| // means that doing a ping will also fail, cos that wants to |
| // get a current topology to send as an announcement. |
| // which is a long way of saying: we cannot do an assert here |
| // since instance3 *can* have an undefined cluster view.. |
| try{ |
| pingConnector(instance3, instance2); |
| } catch(UndefinedClusterViewException ucve) { |
| // ignore |
| } |
| pingConnector(instance5, instance2); |
| final TopologyView topology = instance3.getDiscoveryService().getTopology(); |
| InstanceDescription i3 = null; |
| for (Iterator<InstanceDescription> it = topology.getInstances().iterator(); it.hasNext();) { |
| final InstanceDescription id = it.next(); |
| if (id.getSlingId().equals(instance3.slingId)) { |
| i3 = id; |
| break; |
| } |
| } |
| assertNotNull(i3); |
| assertEquals(instance3.slingId, i3.getSlingId()); |
| final ClusterView i3Cluster = i3.getClusterView(); |
| final int i3ClusterSize = i3Cluster.getInstances().size(); |
| if (i3ClusterSize==1) { |
| if (i<30) { |
| logger.info("testConnectorSwitching4139: [2] min loop cnt not yet reached: i="+i); |
| Thread.sleep(500); // 30x500ms = 15sec max - (vs 10sec-2sec[sleep] timeout) - should be enough for timing out |
| continue; |
| } |
| success = true; |
| logger.info("testConnectorSwitching4139: i3ClusterSize: "+i3ClusterSize+", i="+i+" (success)"); |
| break; |
| } |
| logger.info("testConnectorSwitching4139: i3ClusterSize: "+i3ClusterSize+", i="+i); |
| Thread.sleep(500); |
| } |
| |
| logger.info("testConnectorSwitching4139: instance1Restarted.slingId="+instance1Restarted.slingId); |
| logger.info("testConnectorSwitching4139: instance2.slingId="+instance2.slingId); |
| logger.info("testConnectorSwitching4139: instance3.slingId="+instance3.slingId); |
| logger.info("testConnectorSwitching4139: instance4.slingId="+instance4.slingId); |
| logger.info("testConnectorSwitching4139: instance5.slingId="+instance5.slingId); |
| instance1Restarted.dumpRepo(); |
| assertTrue(success); |
| |
| assertSameTopology(new SimpleClusterView(instance1Restarted, instance2), |
| new SimpleClusterView(instance3), |
| new SimpleClusterView(instance5)); |
| instance1Restarted.stop(); |
| |
| } |
| |
| /** |
| * Duplicate-instance scenario (the 3726 in the name presumably refers to SLING-3726 - confirm): |
| * cluster A with instance1 and instance2, cluster B with instance3 only, cluster C with instance5. |
| * instance3 and instance5 first ping instance1; then the view checker of instance1 is stopped and the |
| * pings are switched to instance2. Finally instance1 is restarted (reusing its slingId) and instance4 |
| * is started on the repository of instance3, after which the three expected clusters are asserted. |
| */ @Test |
| public void testDuplicateInstance3726() throws Throwable { |
| logger.info("testDuplicateInstance3726: start"); |
| final int MIN_EVENT_DELAY = 1; |
| |
| tearDown(); // reset any setup that was done - we start with a different setup than the default one |
| final org.apache.log4j.Logger discoveryLogger = LogManager.getRootLogger().getLogger("org.apache.sling.discovery"); |
| logLevel = discoveryLogger.getLevel(); |
| discoveryLogger.setLevel(Level.DEBUG); |
| |
| instance1 = newBuilder().setDebugName("instance1") |
| .newRepository("/var/discovery/clusterA/", true) |
| .setConnectorPingTimeout(15 /* sec */) |
| .setMinEventDelay(MIN_EVENT_DELAY).build(); |
| instance2 = newBuilder().setDebugName("instance2") |
| .useRepositoryOf(instance1) |
| .setConnectorPingTimeout(15 /* sec */) |
| .setMinEventDelay(MIN_EVENT_DELAY).build(); |
| // now launch the remote instances (clusters B and C) |
| instance3 = newBuilder().setDebugName("instance3") |
| .newRepository("/var/discovery/clusterB/", false) |
| .setConnectorPingTimeout(15 /* sec */) |
| .setMinEventDelay(MIN_EVENT_DELAY).build(); |
| instance5 = newBuilder().setDebugName("instance5") |
| .newRepository("/var/discovery/clusterC/", false) |
| .setConnectorPingTimeout(15 /* sec */) |
| .setMinEventDelay(MIN_EVENT_DELAY).build(); |
| |
| // join the instances to form their clusters by sending out heartbeats |
| runHeartbeatOnceWith(instance1, instance2, instance3, instance5); |
| Thread.sleep(500); |
| runHeartbeatOnceWith(instance1, instance2, instance3, instance5); |
| Thread.sleep(500); |
| runHeartbeatOnceWith(instance1, instance2, instance3, instance5); |
| Thread.sleep(500); |
| |
| assertSameTopology(new SimpleClusterView(instance1, instance2)); |
| assertSameTopology(new SimpleClusterView(instance3)); |
| assertSameTopology(new SimpleClusterView(instance5)); |
| |
| // create topology connectors from instance3 and from instance5 to instance1 |
| // -> corresponds to starting to ping |
| pingConnector(instance3, instance1); |
| pingConnector(instance5, instance1); |
| pingConnector(instance3, instance1); |
| pingConnector(instance5, instance1); |
| |
| // make asserts on the topology |
| logger.info("testDuplicateInstance3726: instance1.slingId="+instance1.slingId); |
| logger.info("testDuplicateInstance3726: instance2.slingId="+instance2.slingId); |
| logger.info("testDuplicateInstance3726: instance3.slingId="+instance3.slingId); |
| logger.info("testDuplicateInstance3726: instance5.slingId="+instance5.slingId); |
| instance1.dumpRepo(); |
| |
| assertSameTopology(new SimpleClusterView(instance1, instance2), |
| new SimpleClusterView(instance3/*, instance4*/), |
| new SimpleClusterView(instance5)); |
| |
| // simulate a crash of instance1, resulting in the load-balancer switching the pings to instance2 |
| instance1.stopViewChecker(); |
| boolean success = false; |
| for(int i=0; i<25; i++) { |
| // loop at most 25 times; even when both pings succeed, keep looping as long as i is below 20 |
| runHeartbeatOnceWith(instance2, instance3, /*instance4, */instance5); |
| final boolean ping1 = pingConnector(instance3, instance2); |
| final boolean ping2 = pingConnector(instance5, instance2); |
| if (ping1 && ping2) { |
| // both pings were fine - hence break (once the minimum loop count is reached) |
| success = true; |
| logger.info("testDuplicateInstance3726: successfully switched all pings to instance2 after "+i+" rounds."); |
| if (i<20) { |
| logger.info("testDuplicateInstance3726: min loop cnt not yet reached: i="+i); |
| Thread.sleep(1000); // 20x1000ms = 20sec max - (vs 15sec timeout) - should be enough for timing out |
| continue; |
| } |
| break; |
| } |
| logger.info("testDuplicateInstance3726: looping"); |
| Thread.sleep(1000); // 25x1000ms = 25sec max - (vs 15sec timeout) |
| |
| } |
| assertTrue(success); |
| // one final heartbeat |
| runHeartbeatOnceWith(instance2, instance3, instance5); |
| assertTrue(pingConnector(instance3, instance2)); |
| assertTrue(pingConnector(instance5, instance2)); |
| |
| instance2.dumpRepo(); |
| |
| assertSameTopology(new SimpleClusterView(instance2), |
| new SimpleClusterView(instance3), |
| new SimpleClusterView(instance5)); |
| |
| // restart instance1 (reusing its slingId, on the repository of instance2), start instance4 (on the repository of instance3) |
| instance1Restarted = newBuilder().setDebugName("instance1") |
| .useRepositoryOf(instance2) |
| .setConnectorPingTimeout(Integer.MAX_VALUE /* no timeout */) |
| .setMinEventDelay(1) |
| .setSlingId(instance1.getSlingId()).build(); |
| instance4 = newBuilder().setDebugName("instance4") |
| .useRepositoryOf(instance3) |
| .setConnectorPingTimeout(30 /* sec */) |
| .setMinEventDelay(MIN_EVENT_DELAY).build(); |
| for(int i=0; i<3; i++) { |
| runHeartbeatOnceWith(instance1Restarted, instance2, instance3, instance4, instance5); |
| Thread.sleep(250); |
| // since instance4 just started - hooked to instance3 - |
| // it is possible that it doesn't have a topology |
| // just yet - so we cannot do: |
| //assertTrue(pingConnector(instance3, instance2)); |
| // but instead do |
| try{ |
| pingConnector(instance3, instance2); |
| } catch(UndefinedClusterViewException ucve) { |
| // ignore |
| } |
| assertTrue(pingConnector(instance5, instance2)); |
| } |
| |
| instance1Restarted.dumpRepo(); |
| logger.info("testDuplicateInstance3726: instance1Restarted.slingId="+instance1Restarted.slingId); |
| logger.info("testDuplicateInstance3726: instance2.slingId="+instance2.slingId); |
| logger.info("testDuplicateInstance3726: instance3.slingId="+instance3.slingId); |
| logger.info("testDuplicateInstance3726: instance4.slingId="+instance4.slingId); |
| logger.info("testDuplicateInstance3726: instance5.slingId="+instance5.slingId); |
| assertTrue(success); |
| |
| assertSameTopology(new SimpleClusterView(instance1Restarted, instance2), |
| new SimpleClusterView(instance3, instance4), |
| new SimpleClusterView(instance5)); |
| instance1Restarted.stop(); |
| logger.info("testDuplicateInstance3726: end"); |
| } |
| |
| /** |
| * Checks a complete expectation consisting of one or more clusters. For every expected cluster |
| * its members are handed to assertSameClusterIds; every member is checked with assertTopology |
| * against the full expectation and with assertNotSameClusterIds against each instance of |
| * every other expected cluster. |
| * |
| * @param clusters the expected clusters; nothing is checked when this is null |
| */ |
| private void assertSameTopology(SimpleClusterView... clusters) throws UndefinedClusterViewException { |
| if (clusters == null) { |
| return; |
| } |
| for (final SimpleClusterView expected : clusters) { |
| assertSameClusterIds(expected.instances); |
| for (final VirtualInstance member : expected.instances) { |
| // every member must see the very same overall picture |
| assertTopology(member, clusters); |
| for (final SimpleClusterView other : clusters) { |
| if (other != expected) { |
| // the member's own cluster is skipped, all others are compared against |
| for (final VirtualInstance outsider : other.instances) { |
| assertNotSameClusterIds(member, outsider); |
| } |
| } |
| } |
| } |
| } |
| } |
| |
| /** |
| * Calls heartbeatsAndCheckView() once on each of the given instances, in the order given. |
| * |
| * @param instances the instances to trigger; a null array is silently ignored |
| */ |
| private void runHeartbeatOnceWith(VirtualInstance... instances) { |
| if (instances != null) { |
| for (final VirtualInstance instance : instances) { |
| instance.heartbeatsAndCheckView(); |
| } |
| } |
| } |
| |
| /** |
| * Tests a situation where a connector was done to instance1, which eventually |
| * crashed, then the connector is done to instance4 (which is in a separate, 3rd cluster). |
| * Meanwhile instance1 got restarted and this test assures that the instance3 is not reported |
| * twice in the topology. This used to happen prior to SLING-4139. |
| * <p> |
| * The expectation is asserted three times: instance1Restarted and instance2 form one cluster, |
| * while instance3 and instance4 are each a cluster of their own within a common topology. |
| */ |
| @Test |
| public void testStaleInstanceIn3Clusters4139() throws Throwable { |
| logger.info("testStaleInstanceIn3Clusters4139: start"); |
| final String instance1SlingId = prepare4139(); |
| |
| // remove topology connector from instance3 to instance1 |
| // -> corresponds to stop pinging |
| // (nothing to assert additionally here) |
| |
| // start instance4 in a separate cluster (it gets a repository of its own) |
| instance4 = newBuilder().setDebugName("remoteInstance4") |
| .newRepository("/var/discovery/implremote4/", false) |
| .setConnectorPingTimeout(Integer.MAX_VALUE /* no timeout */) |
| .setMinEventDelay(1).build(); |
| try{ |
| instance4.getClusterViewService().getLocalClusterView(); |
| fail("should complain"); |
| } catch(UndefinedClusterViewException e) { |
| // ok - no heartbeat has been run for instance4 yet |
| } |
| |
| // instead, now start a connector from instance3 to instance4 |
| instance4.heartbeatsAndCheckView(); |
| instance4.heartbeatsAndCheckView(); |
| pingConnector(instance3, instance4); |
| |
| // start instance 1 again: same slingId, attached to the repository of instance2 |
| instance1Restarted = newBuilder().setDebugName("firstInstance") |
| .useRepositoryOf(instance2) |
| .setConnectorPingTimeout(Integer.MAX_VALUE /* no timeout */) |
| .setMinEventDelay(1) |
| .setSlingId(instance1SlingId).build(); |
| runHeartbeatOnceWith(instance1Restarted, instance2, instance3, instance4); |
| pingConnector(instance3, instance4); |
| runHeartbeatOnceWith(instance1Restarted, instance2, instance3, instance4); |
| pingConnector(instance3, instance4); |
| logger.info("iteration 0"); |
| logger.info("instance1Restarted.slingId: "+instance1Restarted.slingId); |
| logger.info("instance2.slingId: "+instance2.slingId); |
| logger.info("instance3.slingId: "+instance3.slingId); |
| logger.info("instance4.slingId: "+instance4.slingId); |
| instance1Restarted.dumpRepo(); |
| assertSameTopology( |
| new SimpleClusterView(instance3), |
| new SimpleClusterView(instance4)); |
| assertSameTopology(new SimpleClusterView(instance1Restarted, instance2)); |
| |
| Thread.sleep(100); |
| runHeartbeatOnceWith(instance1Restarted, instance2, instance3, instance4); |
| pingConnector(instance3, instance4); |
| runHeartbeatOnceWith(instance1Restarted, instance2, instance3, instance4); |
| pingConnector(instance3, instance4); |
| logger.info("iteration 1"); |
| logger.info("instance1Restarted.slingId: "+instance1Restarted.slingId); |
| logger.info("instance2.slingId: "+instance2.slingId); |
| logger.info("instance3.slingId: "+instance3.slingId); |
| logger.info("instance4.slingId: "+instance4.slingId); |
| instance1Restarted.dumpRepo(); |
| assertSameTopology(new SimpleClusterView(instance1Restarted, instance2)); |
| assertSameTopology( |
| new SimpleClusterView(instance3), |
| new SimpleClusterView(instance4)); |
| |
| Thread.sleep(100); |
| runHeartbeatOnceWith(instance1Restarted, instance2, instance3, instance4); |
| pingConnector(instance3, instance4); |
| |
| // now the situation should (still) be as follows: |
| logger.info("iteration 2"); |
| logger.info("instance1Restarted.slingId: "+instance1Restarted.slingId); |
| logger.info("instance2.slingId: "+instance2.slingId); |
| logger.info("instance3.slingId: "+instance3.slingId); |
| logger.info("instance4.slingId: "+instance4.slingId); |
| instance1Restarted.dumpRepo(); |
| assertSameTopology(new SimpleClusterView(instance1Restarted, instance2)); |
| assertSameTopology( |
| new SimpleClusterView(instance3), |
| new SimpleClusterView(instance4)); |
| instance1Restarted.stop(); |
| |
| logger.info("testStaleInstanceIn3Clusters4139: end"); |
| } |
| |
| /** |
| * Preparation steps for SLING-4139 tests: |
| * Creates two clusters: A: with instance1 and 2, B with instance 3 |
| * instance 3 creates a connector to instance 1 |
| * then instance 1 is killed (crashes) |
| * @return the slingId of the original (crashed) instance1 |
| */ |
| private String prepare4139() throws Throwable, Exception, |
| InterruptedException { |
| tearDown(); // stop anything running.. |
| instance1 = newBuilder().setDebugName("firstInstance") |
| .newRepository("/var/discovery/impl/", true) |
| .setConnectorPingTimeout(Integer.MAX_VALUE /* no timeout */) |
| .setMinEventDelay(1).build(); |
| instance2 = newBuilder().setDebugName("secondInstance") |
| .useRepositoryOf(instance1) |
| .setConnectorPingTimeout(Integer.MAX_VALUE /* no timeout */) |
| .setMinEventDelay(1).build(); |
| // join the two instances to form a cluster by sending out heartbeats |
| runHeartbeatOnceWith(instance1, instance2); |
| Thread.sleep(100); |
| runHeartbeatOnceWith(instance1, instance2); |
| Thread.sleep(100); |
| runHeartbeatOnceWith(instance1, instance2); |
| assertSameClusterIds(instance1, instance2); |
| |
| // now launch the remote instance |
| instance3 = newBuilder().setDebugName("remoteInstance") |
| .newRepository("/var/discovery/implremote/", false) |
| .setConnectorPingTimeout(Integer.MAX_VALUE /* no timeout */) |
| .setMinEventDelay(1).build(); |
| assertSameClusterIds(instance1, instance2); |
| try{ |
| instance3.getClusterViewService().getLocalClusterView(); |
| fail("should complain"); |
| } catch(UndefinedClusterViewException ue) { |
| // ok |
| } |
| assertEquals(0, instance1.getAnnouncementRegistry().listLocalAnnouncements().size()); |
| assertEquals(0, instance1.getAnnouncementRegistry().listLocalIncomingAnnouncements().size()); |
| assertEquals(0, instance2.getAnnouncementRegistry().listLocalAnnouncements().size()); |
| assertEquals(0, instance2.getAnnouncementRegistry().listLocalIncomingAnnouncements().size()); |
| assertEquals(0, instance3.getAnnouncementRegistry().listLocalAnnouncements().size()); |
| assertEquals(0, instance3.getAnnouncementRegistry().listLocalIncomingAnnouncements().size()); |
| |
| // create a topology connector from instance3 to instance1 |
| // -> corresponds to starting to ping |
| instance3.heartbeatsAndCheckView(); |
| instance3.heartbeatsAndCheckView(); |
| Thread.sleep(1000); |
| pingConnector(instance3, instance1); |
| // make asserts on the topology |
| instance1.dumpRepo(); |
| assertSameTopology(new SimpleClusterView(instance1, instance2), new SimpleClusterView(instance3)); |
| |
| // kill instance 1 |
| logger.info("instance1.slingId="+instance1.slingId); |
| logger.info("instance2.slingId="+instance2.slingId); |
| logger.info("instance3.slingId="+instance3.slingId); |
| final String instance1SlingId = instance1.slingId; |
| instance1.stopViewChecker(); // and have instance3 no longer pinging instance1 |
| instance1.stop(); // otherwise it will have itself still registered with the observation manager and fiddle with future events.. |
| instance1 = null; // set to null to early fail if anyone still assumes (original) instance1 is up form now on |
| instance2.getConfig().setViewCheckTimeout(1); // set instance2's heartbeatTimeout to 1 sec to time out instance1 quickly! |
| instance3.getConfig().setViewCheckTimeout(1); // set instance3's heartbeatTimeout to 1 sec to time out instance1 quickly! |
| Thread.sleep(500); |
| runHeartbeatOnceWith(instance2, instance3); |
| Thread.sleep(500); |
| runHeartbeatOnceWith(instance2, instance3); |
| Thread.sleep(500); |
| runHeartbeatOnceWith(instance2, instance3); |
| // instance 2 should now be alone - in fact, 3 should be alone as well |
| instance2.dumpRepo(); |
| assertTopology(instance2, new SimpleClusterView(instance2)); |
| assertTopology(instance3, new SimpleClusterView(instance3)); |
| instance2.getConfig().setViewCheckTimeout(Integer.MAX_VALUE /* no timeout */); // set instance2's heartbeatTimeout back to Integer.MAX_VALUE /* no timeout */ |
| instance3.getConfig().setViewCheckTimeout(Integer.MAX_VALUE /* no timeout */); // set instance3's heartbeatTimeout back to Integer.MAX_VALUE /* no timeout */ |
| return instance1SlingId; |
| } |
| |
| private void assertNotSameClusterIds(VirtualInstance... instances) throws UndefinedClusterViewException { |
| if (instances==null) { |
| fail("must not pass empty set of instances here"); |
| } |
| if (instances.length<=1) { |
| fail("must not pass 0 or 1 instance only"); |
| } |
| final String clusterId1 = instances[0].getClusterViewService() |
| .getLocalClusterView().getId(); |
| for(int i=1; i<instances.length; i++) { |
| final String otherClusterId = instances[i].getClusterViewService() |
| .getLocalClusterView().getId(); |
| // cluster ids must NOT be the same |
| assertNotEquals(clusterId1, otherClusterId); |
| } |
| if (instances.length>2) { |
| final VirtualInstance[] subset = new VirtualInstance[instances.length-1]; |
| System.arraycopy(instances, 0, subset, 1, instances.length-1); |
| assertNotSameClusterIds(subset); |
| } |
| } |
| |
| private void assertSameClusterIds(VirtualInstance... instances) throws UndefinedClusterViewException { |
| if (instances==null) { |
| // then there is nothing to compare |
| return; |
| } |
| if (instances.length==1) { |
| // then there is nothing to compare |
| return; |
| } |
| final String clusterId1 = instances[0].getClusterViewService() |
| .getLocalClusterView().getId(); |
| for(int i=1; i<instances.length; i++) { |
| final String otherClusterId = instances[i].getClusterViewService() |
| .getLocalClusterView().getId(); |
| // cluster ids must be the same |
| if (!clusterId1.equals(otherClusterId)) { |
| logger.error("assertSameClusterIds: instances[0]: "+instances[0]); |
| logger.error("assertSameClusterIds: instances["+i+"]: "+instances[i]); |
| fail("mismatch in clusterIds: expected to equal: clusterId1="+clusterId1+", otherClusterId="+otherClusterId); |
| } |
| } |
| } |
| |
| private void assertTopology(VirtualInstance instance, SimpleClusterView... assertedClusterViews) { |
| final TopologyView topology = instance.getDiscoveryService().getTopology(); |
| logger.info("assertTopology: instance "+instance.slingId+" sees topology: "+topology+", expected: "+assertedClusterViews); |
| assertNotNull(topology); |
| if (assertedClusterViews.length!=topology.getClusterViews().size()) { |
| dumpFailureDetails(topology, assertedClusterViews); |
| fail("instance "+instance.slingId+ " expected "+assertedClusterViews.length+", got: "+topology.getClusterViews().size()); |
| } |
| final Set<ClusterView> actualClusters = new HashSet<ClusterView>(topology.getClusterViews()); |
| for(int i=0; i<assertedClusterViews.length; i++) { |
| final SimpleClusterView assertedClusterView = assertedClusterViews[i]; |
| boolean foundMatch = false; |
| for (Iterator<ClusterView> it = actualClusters.iterator(); it |
| .hasNext();) { |
| final ClusterView actualClusterView = it.next(); |
| if (matches(assertedClusterView, actualClusterView)) { |
| it.remove(); |
| foundMatch = true; |
| break; |
| } |
| } |
| if (!foundMatch) { |
| dumpFailureDetails(topology, assertedClusterViews); |
| fail("instance "+instance.slingId+ " could not find a match in the topology with instance="+instance.slingId+" and clusterViews="+assertedClusterViews.length); |
| } |
| } |
| assertEquals("not all asserted clusterviews are in the actual view with instance="+instance+" and clusterViews="+assertedClusterViews, actualClusters.size(), 0); |
| } |
| |
| private void dumpFailureDetails(TopologyView topology, SimpleClusterView... assertedClusterViews) { |
| logger.error("assertTopology: expected: "+assertedClusterViews.length); |
| for(int j=0; j<assertedClusterViews.length; j++) { |
| logger.error("assertTopology: ["+j+"]: "+assertedClusterViews[j].toString()); |
| } |
| final Set<ClusterView> clusterViews = topology.getClusterViews(); |
| final Set<InstanceDescription> instances = topology.getInstances(); |
| logger.error("assertTopology: actual: "+clusterViews.size()+" clusters with a total of "+instances.size()+" instances"); |
| for (Iterator<ClusterView> it = clusterViews.iterator(); it.hasNext();) { |
| final ClusterView aCluster = it.next(); |
| logger.error("assertTopology: a cluster: "+aCluster.getId()); |
| for (Iterator<InstanceDescription> it2 = aCluster.getInstances().iterator(); it2.hasNext();) { |
| final InstanceDescription id = it2.next(); |
| logger.error("assertTopology: - an instance "+id.getSlingId()); |
| } |
| } |
| logger.error("assertTopology: list of all instances: "+instances.size()); |
| for (Iterator<InstanceDescription> it = instances.iterator(); it.hasNext();) { |
| final InstanceDescription id = it.next(); |
| logger.error("assertTopology: - an instance: "+id.getSlingId()); |
| } |
| } |
| |
| private boolean matches(SimpleClusterView assertedClusterView, |
| ClusterView actualClusterView) { |
| assertNotNull(assertedClusterView); |
| assertNotNull(actualClusterView); |
| if (assertedClusterView.instances.length!=actualClusterView.getInstances().size()) { |
| return false; |
| } |
| final Set<InstanceDescription> actualInstances = new HashSet<InstanceDescription>(actualClusterView.getInstances()); |
| outerLoop:for(int i=0; i<assertedClusterView.instances.length; i++) { |
| final VirtualInstance assertedInstance = assertedClusterView.instances[i]; |
| for (Iterator<InstanceDescription> it = actualInstances.iterator(); it |
| .hasNext();) { |
| final InstanceDescription anActualInstance = it.next(); |
| if (assertedInstance.slingId.equals(anActualInstance.getSlingId())) { |
| continue outerLoop; |
| } |
| } |
| return false; |
| } |
| return true; |
| } |
| |
| private boolean pingConnector(final VirtualInstance from, final VirtualInstance to) throws UndefinedClusterViewException { |
| final Announcement fromAnnouncement = createFromAnnouncement(from); |
| Announcement replyAnnouncement = null; |
| try{ |
| replyAnnouncement = ping(to, fromAnnouncement); |
| } catch(AssertionError e) { |
| logger.warn("pingConnector: ping failed, assertionError: "+e); |
| return false; |
| } catch (UndefinedClusterViewException e) { |
| logger.warn("pingConnector: ping failed, currently the cluster view is undefined: "+e); |
| return false; |
| } |
| registerReplyAnnouncement(from, replyAnnouncement); |
| return true; |
| } |
| |
| private void registerReplyAnnouncement(VirtualInstance from, |
| Announcement inheritedAnnouncement) { |
| final AnnouncementRegistry announcementRegistry = from.getAnnouncementRegistry(); |
| if (inheritedAnnouncement.isLoop()) { |
| fail("loop detected"); |
| // we dont currently support loops here in the junit tests |
| return; |
| } else { |
| inheritedAnnouncement.setInherited(true); |
| if (announcementRegistry |
| .registerAnnouncement(inheritedAnnouncement)==-1) { |
| logger.info("ping: connector response is from an instance which I already see in my topology" |
| + inheritedAnnouncement); |
| return; |
| } |
| } |
| // resultingAnnouncement = inheritedAnnouncement; |
| // statusDetails = null; |
| } |
| |
| private Announcement ping(VirtualInstance to, final Announcement incomingTopologyAnnouncement) |
| throws UndefinedClusterViewException { |
| final String slingId = to.slingId; |
| final ClusterViewService clusterViewService = to.getClusterViewService(); |
| final AnnouncementRegistry announcementRegistry = to.getAnnouncementRegistry(); |
| |
| incomingTopologyAnnouncement.removeInherited(slingId); |
| |
| final Announcement replyAnnouncement = new Announcement( |
| slingId); |
| |
| long backoffInterval = -1; |
| final ClusterView clusterView = clusterViewService.getLocalClusterView(); |
| if (!incomingTopologyAnnouncement.isCorrectVersion()) { |
| fail("incorrect version"); |
| return null; // never reached |
| } else if (ClusterViewHelper.contains(clusterView, incomingTopologyAnnouncement |
| .getOwnerId())) { |
| fail("loop=true"); |
| return null; // never reached |
| } else if (ClusterViewHelper.containsAny(clusterView, incomingTopologyAnnouncement |
| .listInstances())) { |
| fail("incoming announcement contains instances that are part of my cluster"); |
| return null; // never reached |
| } else { |
| backoffInterval = announcementRegistry |
| .registerAnnouncement(incomingTopologyAnnouncement); |
| if (backoffInterval==-1) { |
| fail("rejecting an announcement from an instance that I already see in my topology: "); |
| return null; // never reached |
| } else { |
| // normal, successful case: replying with the part of the topology which this instance sees |
| replyAnnouncement.setLocalCluster(clusterView); |
| announcementRegistry.addAllExcept(replyAnnouncement, clusterView, |
| new AnnouncementFilter() { |
| |
| public boolean accept(final String receivingSlingId, Announcement announcement) { |
| if (announcement.getPrimaryKey().equals( |
| incomingTopologyAnnouncement |
| .getPrimaryKey())) { |
| return false; |
| } |
| return true; |
| } |
| }); |
| return replyAnnouncement; |
| } |
| } |
| } |
| |
| private Announcement createFromAnnouncement(final VirtualInstance from) throws UndefinedClusterViewException { |
| // TODO: refactor TopologyConnectorClient to avoid duplicating code from there (ping()) |
| Announcement topologyAnnouncement = new Announcement(from.slingId); |
| topologyAnnouncement.setServerInfo(from.slingId); |
| final ClusterView clusterView = from.getClusterViewService().getLocalClusterView(); |
| topologyAnnouncement.setLocalCluster(clusterView); |
| from.getAnnouncementRegistry().addAllExcept(topologyAnnouncement, clusterView, new AnnouncementFilter() { |
| |
| public boolean accept(final String receivingSlingId, final Announcement announcement) { |
| // filter out announcements that are of old cluster instances |
| // which I dont really have in my cluster view at the moment |
| final Iterator<InstanceDescription> it = |
| clusterView.getInstances().iterator(); |
| while(it.hasNext()) { |
| final InstanceDescription instance = it.next(); |
| if (instance.getSlingId().equals(receivingSlingId)) { |
| // then I have the receiving instance in my cluster view |
| // all fine then |
| return true; |
| } |
| } |
| // looks like I dont have the receiving instance in my cluster view |
| // then I should also not propagate that announcement anywhere |
| return false; |
| } |
| }); |
| return topologyAnnouncement; |
| } |
| |
| @Test |
| public void testStableClusterId() throws Throwable { |
| logger.info("testStableClusterId: start"); |
| // stop 1 and 2 and create them with a lower heartbeat timeout |
| instance2.stopViewChecker(); |
| instance1.stopViewChecker(); |
| instance2.stop(); |
| instance1.stop(); |
| // SLING-4302 : first set the heartbeatTimeout to 100 sec - large enough to work on all CI instances |
| instance1 = newBuilder().setDebugName("firstInstance") |
| .newRepository("/var/discovery/impl/", true) |
| .setConnectorPingTimeout(100) |
| .setMinEventDelay(1).build(); |
| instance2 = newBuilder().setDebugName("secondInstance") |
| .useRepositoryOf(instance1) |
| .setConnectorPingTimeout(100) |
| .setMinEventDelay(1).build(); |
| assertNotNull(instance1); |
| assertNotNull(instance2); |
| |
| try{ |
| instance1.getClusterViewService().getLocalClusterView(); |
| fail("should complain"); |
| } catch(UndefinedClusterViewException e) { |
| // ok |
| } |
| try{ |
| instance2.getClusterViewService().getLocalClusterView(); |
| fail("should complain"); |
| } catch(UndefinedClusterViewException e) { |
| // ok |
| } |
| |
| // let the sync/voting happen |
| instance1.heartbeatsAndCheckView(); |
| instance2.heartbeatsAndCheckView(); |
| Thread.sleep(500); |
| instance1.heartbeatsAndCheckView(); |
| instance2.heartbeatsAndCheckView(); |
| Thread.sleep(500); |
| instance1.heartbeatsAndCheckView(); |
| instance2.heartbeatsAndCheckView(); |
| |
| String newClusterId1 = instance1.getClusterViewService() |
| .getLocalClusterView().getId(); |
| String newClusterId2 = instance2.getClusterViewService() |
| .getLocalClusterView().getId(); |
| // both cluster ids must be the same |
| assertEquals(newClusterId1, newClusterId1); |
| |
| instance1.dumpRepo(); |
| assertEquals(2, instance1.getClusterViewService().getLocalClusterView().getInstances().size()); |
| assertEquals(2, instance2.getClusterViewService().getLocalClusterView().getInstances().size()); |
| |
| // let instance2 'die' by now longer doing heartbeats |
| // SLING-4302 : then set the heartbeatTimeouts back to 1 sec to have them properly time out with the sleeps applied below |
| instance2.getConfig().setViewCheckTimeout(1); |
| instance1.getConfig().setViewCheckTimeout(1); |
| instance2.stopViewChecker(); // would actually not be necessary as it was never started.. this test only runs heartbeats manually |
| instance1.heartbeatsAndCheckView(); |
| Thread.sleep(500); |
| instance1.heartbeatsAndCheckView(); |
| Thread.sleep(500); |
| instance1.heartbeatsAndCheckView(); |
| Thread.sleep(500); |
| instance1.heartbeatsAndCheckView(); |
| Thread.sleep(500); |
| instance1.heartbeatsAndCheckView(); |
| Thread.sleep(500); |
| instance1.heartbeatsAndCheckView(); |
| // the cluster should now have size 1 |
| assertEquals(1, instance1.getClusterViewService().getLocalClusterView().getInstances().size()); |
| // the instance 2 should be in isolated mode as it is no longer in the established view |
| // hence null |
| try{ |
| instance2.getViewChecker().checkView(); |
| instance2.getClusterViewService().getLocalClusterView(); |
| fail("should complain"); |
| } catch(UndefinedClusterViewException e) { |
| // ok |
| } |
| |
| // but the cluster id must have remained stable |
| instance1.dumpRepo(); |
| String actualClusterId = instance1.getClusterViewService() |
| .getLocalClusterView().getId(); |
| logger.info("expected cluster id: "+newClusterId1); |
| logger.info("actual cluster id: "+actualClusterId); |
| assertEquals(newClusterId1, actualClusterId); |
| logger.info("testStableClusterId: end"); |
| } |
| |
| @Test |
| public void testClusterView() throws Exception { |
| logger.info("testClusterView: start"); |
| assertNotNull(instance1); |
| assertNotNull(instance2); |
| assertNull(instance3); |
| instance3 = newBuilder().setDebugName("thirdInstance") |
| .useRepositoryOf(instance1) |
| .build(); |
| assertNotNull(instance3); |
| |
| assertEquals(instance1.getSlingId(), instance1.getClusterViewService() |
| .getSlingId()); |
| assertEquals(instance2.getSlingId(), instance2.getClusterViewService() |
| .getSlingId()); |
| assertEquals(instance3.getSlingId(), instance3.getClusterViewService() |
| .getSlingId()); |
| |
| try{ |
| instance1.getClusterViewService().getLocalClusterView(); |
| fail("should complain"); |
| } catch(UndefinedClusterViewException e) { |
| // ok |
| } |
| try{ |
| instance2.getClusterViewService().getLocalClusterView(); |
| fail("should complain"); |
| } catch(UndefinedClusterViewException e) { |
| // ok |
| } |
| try{ |
| instance3.getClusterViewService().getLocalClusterView(); |
| fail("should complain"); |
| } catch(UndefinedClusterViewException e) { |
| // ok |
| } |
| |
| instance1.dumpRepo(); |
| |
| instance1.heartbeatsAndCheckView(); |
| instance2.heartbeatsAndCheckView(); |
| instance3.heartbeatsAndCheckView(); |
| |
| instance1.dumpRepo(); |
| logger.info("testClusterView: 1st 2s sleep"); |
| Thread.sleep(2000); |
| |
| instance1.heartbeatsAndCheckView(); |
| instance2.heartbeatsAndCheckView(); |
| instance3.heartbeatsAndCheckView(); |
| logger.info("testClusterView: 2nd 2s sleep"); |
| Thread.sleep(2000); |
| |
| instance1.dumpRepo(); |
| String clusterId1 = instance1.getClusterViewService().getLocalClusterView() |
| .getId(); |
| logger.info("clusterId1=" + clusterId1); |
| String clusterId2 = instance2.getClusterViewService().getLocalClusterView() |
| .getId(); |
| logger.info("clusterId2=" + clusterId2); |
| String clusterId3 = instance3.getClusterViewService().getLocalClusterView() |
| .getId(); |
| logger.info("clusterId3=" + clusterId3); |
| assertEquals(clusterId1, clusterId2); |
| assertEquals(clusterId1, clusterId3); |
| |
| assertEquals(3, instance1.getClusterViewService().getLocalClusterView() |
| .getInstances().size()); |
| assertEquals(3, instance2.getClusterViewService().getLocalClusterView() |
| .getInstances().size()); |
| assertEquals(3, instance3.getClusterViewService().getLocalClusterView() |
| .getInstances().size()); |
| logger.info("testClusterView: end"); |
| } |
| |
| @Test |
| public void testAdditionalInstance() throws Throwable { |
| logger.info("testAdditionalInstance: start"); |
| assertNotNull(instance1); |
| assertNotNull(instance2); |
| |
| assertEquals(instance1.getSlingId(), instance1.getClusterViewService() |
| .getSlingId()); |
| assertEquals(instance2.getSlingId(), instance2.getClusterViewService() |
| .getSlingId()); |
| |
| try{ |
| instance1.getClusterViewService().getLocalClusterView(); |
| fail("should complain"); |
| } catch(UndefinedClusterViewException e) { |
| // ok |
| } |
| try{ |
| instance2.getClusterViewService().getLocalClusterView(); |
| fail("should complain"); |
| } catch(UndefinedClusterViewException e) { |
| // ok |
| } |
| |
| instance1.heartbeatsAndCheckView(); |
| instance2.heartbeatsAndCheckView(); |
| |
| instance1.dumpRepo(); |
| logger.info("testAdditionalInstance: 1st 2s sleep"); |
| Thread.sleep(2000); |
| |
| instance1.heartbeatsAndCheckView(); |
| instance2.heartbeatsAndCheckView(); |
| logger.info("testAdditionalInstance: 2nd 2s sleep"); |
| Thread.sleep(2000); |
| |
| instance1.dumpRepo(); |
| String clusterId1 = instance1.getClusterViewService().getLocalClusterView() |
| .getId(); |
| logger.info("clusterId1=" + clusterId1); |
| String clusterId2 = instance2.getClusterViewService().getLocalClusterView() |
| .getId(); |
| logger.info("clusterId2=" + clusterId2); |
| assertEquals(clusterId1, clusterId2); |
| |
| assertEquals(2, instance1.getClusterViewService().getLocalClusterView() |
| .getInstances().size()); |
| assertEquals(2, instance2.getClusterViewService().getLocalClusterView() |
| .getInstances().size()); |
| |
| AssertingTopologyEventListener assertingTopologyEventListener = new AssertingTopologyEventListener(); |
| assertingTopologyEventListener.addExpected(Type.TOPOLOGY_INIT); |
| assertEquals(1, assertingTopologyEventListener.getRemainingExpectedCount()); |
| instance1.bindTopologyEventListener(assertingTopologyEventListener); |
| Thread.sleep(500); // SLING-4755: async event sending requires some minimal wait time nowadays |
| assertEquals(0, assertingTopologyEventListener.getRemainingExpectedCount()); |
| |
| // startup instance 3 |
| AcceptsMultiple acceptsMultiple = new AcceptsMultiple( |
| Type.TOPOLOGY_CHANGING, Type.TOPOLOGY_CHANGED); |
| assertingTopologyEventListener.addExpected(acceptsMultiple); |
| assertingTopologyEventListener.addExpected(acceptsMultiple); |
| instance3 = newBuilder().setDebugName("thirdInstance") |
| .useRepositoryOf(instance1) |
| .build(); |
| for(int i=0; i<4; i++) { |
| instance1.heartbeatsAndCheckView(); |
| instance2.heartbeatsAndCheckView(); |
| instance3.heartbeatsAndCheckView(); |
| logger.info("testAdditionalInstance: i="+i+", 2s sleep"); |
| Thread.sleep(2000); |
| } |
| |
| assertEquals(1, acceptsMultiple.getEventCnt(Type.TOPOLOGY_CHANGING)); |
| assertEquals(1, acceptsMultiple.getEventCnt(Type.TOPOLOGY_CHANGED)); |
| logger.info("testAdditionalInstance: end"); |
| } |
| |
| @Test |
| public void testPropertyProviders() throws Throwable { |
| logger.info("testPropertyProviders: start"); |
| instance1.heartbeatsAndCheckView(); |
| instance2.heartbeatsAndCheckView(); |
| assertNull(instance3); |
| instance3 = newBuilder().setDebugName("thirdInstance") |
| .useRepositoryOf(instance1) |
| .build(); |
| instance3.heartbeatsAndCheckView(); |
| logger.info("testPropertyProviders: 1st 2s sleep"); |
| Thread.sleep(2000); |
| instance1.heartbeatsAndCheckView(); |
| instance2.heartbeatsAndCheckView(); |
| instance3.heartbeatsAndCheckView(); |
| logger.info("testPropertyProviders: 2nd 2s sleep"); |
| Thread.sleep(2000); |
| instance1.heartbeatsAndCheckView(); |
| instance2.heartbeatsAndCheckView(); |
| instance3.heartbeatsAndCheckView(); |
| logger.info("testPropertyProviders: 3rd 2s sleep"); |
| Thread.sleep(2000); |
| |
| property1Value = UUID.randomUUID().toString(); |
| property1Name = UUID.randomUUID().toString(); |
| PropertyProviderImpl pp1 = new PropertyProviderImpl(); |
| pp1.setProperty(property1Name, property1Value); |
| instance1.bindPropertyProvider(pp1, property1Name); |
| |
| property2Value = UUID.randomUUID().toString(); |
| property2Name = UUID.randomUUID().toString(); |
| PropertyProviderImpl pp2 = new PropertyProviderImpl(); |
| pp2.setProperty(property2Name, property2Value); |
| instance2.bindPropertyProvider(pp2, property2Name); |
| |
| assertPropertyValues(); |
| |
| property1Value = UUID.randomUUID().toString(); |
| pp1.setProperty(property1Name, property1Value); |
| instance1.heartbeatsAndCheckView(); |
| instance2.heartbeatsAndCheckView(); |
| |
| assertPropertyValues(); |
| assertNull(instance1.getClusterViewService().getLocalClusterView() |
| .getInstances().get(0) |
| .getProperty(UUID.randomUUID().toString())); |
| assertNull(instance2.getClusterViewService().getLocalClusterView() |
| .getInstances().get(0) |
| .getProperty(UUID.randomUUID().toString())); |
| logger.info("testPropertyProviders: end"); |
| } |
| |
| private void assertPropertyValues() throws UndefinedClusterViewException { |
| assertPropertyValues(instance1.getSlingId(), property1Name, |
| property1Value); |
| assertPropertyValues(instance2.getSlingId(), property2Name, |
| property2Value); |
| } |
| |
| private void assertPropertyValues(String slingId, String name, String value) throws UndefinedClusterViewException { |
| assertEquals(value, getInstance(instance1, slingId).getProperty(name)); |
| assertEquals(value, getInstance(instance2, slingId).getProperty(name)); |
| } |
| |
| private InstanceDescription getInstance(VirtualInstance instance, String slingId) throws UndefinedClusterViewException { |
| Iterator<InstanceDescription> it = instance.getClusterViewService() |
| .getLocalClusterView().getInstances().iterator(); |
| while (it.hasNext()) { |
| InstanceDescription id = it.next(); |
| if (id.getSlingId().equals(slingId)) { |
| return id; |
| } |
| } |
| throw new IllegalStateException("instance not found: instance=" |
| + instance + ", slingId=" + slingId); |
| } |
| |
| class LongRunningListener implements TopologyEventListener { |
| |
| String failMsg = null; |
| |
| boolean initReceived = false; |
| int noninitReceived; |
| |
| private Semaphore changedSemaphore = new Semaphore(0); |
| |
| public void assertNoFail() { |
| if (failMsg!=null) { |
| fail(failMsg); |
| } |
| } |
| |
| public Semaphore getChangedSemaphore() { |
| return changedSemaphore; |
| } |
| |
| public void handleTopologyEvent(TopologyEvent event) { |
| if (failMsg!=null) { |
| failMsg += "/ Already failed, got another event; "+event; |
| return; |
| } |
| if (!initReceived) { |
| if (event.getType()!=Type.TOPOLOGY_INIT) { |
| failMsg = "Expected TOPOLOGY_INIT first, got: "+event.getType(); |
| return; |
| } |
| initReceived = true; |
| return; |
| } |
| if (event.getType()==Type.TOPOLOGY_CHANGED) { |
| try { |
| changedSemaphore.acquire(); |
| } catch (InterruptedException e) { |
| throw new Error("don't interrupt me pls: "+e); |
| } |
| } |
| noninitReceived++; |
| } |
| } |
| |
| /** |
| * Test plan: |
| * * have a discoveryservice with two listeners registered |
| * * one of them (the 'first' one) is long running |
| * * during one of the topology changes, when the first |
| * one is hit, deactivate the discovery service |
| * * that deactivation used to block (SLING-4755) due |
| * to synchronized(lock) which was blocked by the |
| * long running listener. With having asynchronous |
| * event sending this should no longer be the case |
| * * also, once asserted that deactivation finished, |
| * and that the first listener is still busy, make |
| * sure that once the first listener finishes, that |
| * the second listener still gets the event |
| * @throws Throwable |
| */ |
| @Test |
| public void testLongRunningListener() throws Throwable { |
| // let the instance1 become alone, instance2 is idle |
| instance1.getConfig().setViewCheckTimeout(2); |
| instance2.getConfig().setViewCheckTimeout(2); |
| logger.info("testLongRunningListener : letting instance2 remain silent from now on"); |
| instance1.heartbeatsAndCheckView(); |
| Thread.sleep(1500); |
| instance1.heartbeatsAndCheckView(); |
| Thread.sleep(1500); |
| instance1.heartbeatsAndCheckView(); |
| Thread.sleep(1500); |
| instance1.heartbeatsAndCheckView(); |
| logger.info("testLongRunningListener : instance 2 should now be considered dead"); |
| // instance1.dumpRepo(); |
| |
| LongRunningListener longRunningListener1 = new LongRunningListener(); |
| AssertingTopologyEventListener fastListener2 = new AssertingTopologyEventListener(); |
| fastListener2.addExpected(Type.TOPOLOGY_INIT); |
| longRunningListener1.assertNoFail(); |
| assertEquals(1, fastListener2.getRemainingExpectedCount()); |
| logger.info("testLongRunningListener : binding longRunningListener1 ..."); |
| instance1.bindTopologyEventListener(longRunningListener1); |
| logger.info("testLongRunningListener : binding fastListener2 ..."); |
| instance1.bindTopologyEventListener(fastListener2); |
| logger.info("testLongRunningListener : waiting a bit for longRunningListener1 to receive the TOPOLOGY_INIT event"); |
| Thread.sleep(2500); // SLING-4755: async event sending requires some minimal wait time nowadays |
| assertEquals(0, fastListener2.getRemainingExpectedCount()); |
| assertTrue(longRunningListener1.initReceived); |
| |
| // after INIT, now do an actual change where listener1 will do a long-running handling |
| fastListener2.addExpected(Type.TOPOLOGY_CHANGING); |
| fastListener2.addExpected(Type.TOPOLOGY_CHANGED); |
| instance1.getConfig().setViewCheckTimeout(10); |
| instance2.getConfig().setViewCheckTimeout(10); |
| instance1.heartbeatsAndCheckView(); |
| instance2.heartbeatsAndCheckView(); |
| Thread.sleep(500); |
| instance1.heartbeatsAndCheckView(); |
| instance2.heartbeatsAndCheckView(); |
| Thread.sleep(500); |
| instance1.heartbeatsAndCheckView(); |
| instance2.heartbeatsAndCheckView(); |
| Thread.sleep(500); |
| |
| instance1.dumpRepo(); |
| longRunningListener1.assertNoFail(); |
| // nothing unexpected should arrive at listener2: |
| assertEquals(0, fastListener2.getUnexpectedCount()); |
| // however, listener2 should only get one (CHANGING) event, cos the CHANGED event is still blocked |
| assertEquals(1, fastListener2.getRemainingExpectedCount()); |
| // and also listener2 should only get CHANGING, the CHANGED is blocked via changedSemaphore |
| assertEquals(1, longRunningListener1.noninitReceived); |
| Thread.sleep(2000); |
| assertTrue(longRunningListener1.getChangedSemaphore().hasQueuedThreads()); |
| Thread.sleep(2000); |
| // even after a 2sec sleep things should be unchanged: |
| assertEquals(0, fastListener2.getUnexpectedCount()); |
| assertEquals(1, fastListener2.getRemainingExpectedCount()); |
| assertEquals(1, longRunningListener1.noninitReceived); |
| assertTrue(longRunningListener1.getChangedSemaphore().hasQueuedThreads()); |
| |
| // now let's simulate SLING-4755: deactivation while longRunningListener1 does long processing |
| // - which is simulated by waiting on changedSemaphore. |
| final List<Exception> asyncException = new LinkedList<Exception>(); |
| Thread th = new Thread(new Runnable() { |
| |
| public void run() { |
| try { |
| instance1.stop(); |
| } catch (Exception e) { |
| synchronized(asyncException) { |
| asyncException.add(e); |
| } |
| } |
| } |
| |
| }); |
| th.start(); |
| logger.info("Waiting max 4 sec..."); |
| th.join(4000); |
| logger.info("Done waiting max 4 sec..."); |
| if (th.isAlive()) { |
| logger.warn("Thread still alive: "+th.isAlive()); |
| // release before issuing fail as otherwise test will block forever |
| longRunningListener1.getChangedSemaphore().release(); |
| fail("Thread was still alive"); |
| } |
| logger.info("Thread was no longer alive: "+th.isAlive()); |
| synchronized(asyncException) { |
| logger.info("Async exceptions: "+asyncException.size()); |
| if (asyncException.size()!=0) { |
| // release before issuing fail as otherwise test will block forever |
| longRunningListener1.getChangedSemaphore().release(); |
| fail("async exceptions: "+asyncException.size()+", first: "+asyncException.get(0)); |
| } |
| } |
| |
| // now the test consists of |
| // a) the fact that we reached this place without unlocking the changedSemaphore |
| // b) when we now unlock the changedSemaphore the remaining events should flush through |
| longRunningListener1.getChangedSemaphore().release(); |
| Thread.sleep(500);// shouldn't take long and then things should have flushed: |
| assertEquals(0, fastListener2.getUnexpectedCount()); |
| assertEquals(0, fastListener2.getRemainingExpectedCount()); |
| assertEquals(2, longRunningListener1.noninitReceived); |
| assertFalse(longRunningListener1.getChangedSemaphore().hasQueuedThreads()); |
| } |
| |
| |
| } |