/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.sling.discovery.base.its;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Semaphore;

import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.spi.RootLogger;
import org.apache.sling.commons.testing.junit.categories.Slow;
import org.apache.sling.discovery.ClusterView;
import org.apache.sling.discovery.InstanceDescription;
import org.apache.sling.discovery.TopologyEvent;
import org.apache.sling.discovery.TopologyEvent.Type;
import org.apache.sling.discovery.TopologyEventListener;
import org.apache.sling.discovery.TopologyView;
import org.apache.sling.discovery.base.commons.ClusterViewHelper;
import org.apache.sling.discovery.base.commons.ClusterViewService;
import org.apache.sling.discovery.base.commons.UndefinedClusterViewException;
import org.apache.sling.discovery.base.connectors.announcement.Announcement;
import org.apache.sling.discovery.base.connectors.announcement.AnnouncementFilter;
import org.apache.sling.discovery.base.connectors.announcement.AnnouncementRegistry;
import org.apache.sling.discovery.base.its.setup.VirtualInstance;
import org.apache.sling.discovery.base.its.setup.VirtualInstanceBuilder;
import org.apache.sling.discovery.base.its.setup.mock.AcceptsMultiple;
import org.apache.sling.discovery.base.its.setup.mock.AssertingTopologyEventListener;
import org.apache.sling.discovery.base.its.setup.mock.PropertyProviderImpl;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractClusterTest {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    private class SimpleClusterView {

    	private VirtualInstance[] instances;

    	SimpleClusterView(VirtualInstance... instances) {
    		this.instances = instances;
    	}

    	@Override
    	public String toString() {
    	    String instanceSlingIds = "";
    	    for(int i=0; i<instances.length; i++) {
    	        instanceSlingIds = instanceSlingIds + instances[i].slingId + ",";
    	    }
            return "an expected cluster with "+instances.length+" instances: "+instanceSlingIds;
    	}
    }

    VirtualInstance instance1;
    VirtualInstance instance2;
    VirtualInstance instance3;

    private String property1Value;

    protected String property2Value;

    private String property1Name;

    private String property2Name;
    VirtualInstance instance4;
    VirtualInstance instance5;
    VirtualInstance instance1Restarted;
    private Level logLevel;

    protected abstract VirtualInstanceBuilder newBuilder();

    @Before
    public void setup() throws Exception {
        final org.apache.log4j.Logger discoveryLogger = RootLogger.getLogger("org.apache.sling.discovery");
        logLevel = discoveryLogger.getLevel();
        discoveryLogger.setLevel(Level.TRACE);
        logger.debug("here we are");
        instance1 = newBuilder().setDebugName("firstInstance").newRepository("/var/discovery/impl/", true).build();
        instance2 = newBuilder().setDebugName("secondInstance").useRepositoryOf(instance1).build();
    }

    @After
    public void tearDown() throws Exception {
        if (instance5 != null) {
            instance5.stop();
        }
        if (instance4 != null) {
            instance4.stop();
        }
        if (instance3 != null) {
            instance3.stop();
        }
        if (instance3 != null) {
            instance3.stop();
        }
        if (instance2 != null) {
        	instance2.stop();
        }
        if (instance1 != null) {
            instance1.stop();
        }
        if (instance1Restarted != null) {
            instance1Restarted.stop();
        }
        instance1Restarted = null;
        instance1 = null;
        instance2 = null;
        instance3 = null;
        instance4 = null;
        instance5 = null;
        final org.apache.log4j.Logger discoveryLogger = RootLogger.getLogger("org.apache.sling.discovery");
        discoveryLogger.setLevel(logLevel);
    }

    /** test leader behaviour with ascending slingIds, SLING-3253 **/
    @Test
    public void testLeaderAsc() throws Throwable {
        logger.info("testLeaderAsc: start");
    	doTestLeader("000", "111");
        logger.info("testLeaderAsc: end");
    }

    /** test leader behaviour with descending slingIds, SLING-3253 **/
    @Test
    public void testLeaderDesc() throws Throwable {
        logger.info("testLeaderDesc: start");
    	doTestLeader("111", "000");
        logger.info("testLeaderDesc: end");
    }

    private void doTestLeader(String slingId1, String slingId2) throws Throwable {
        logger.info("doTestLeader("+slingId1+","+slingId2+"): start");
    	// stop 1 and 2 and create them with a lower heartbeat timeout
    	instance2.stopViewChecker();
    	instance1.stopViewChecker();
        instance2.stop();
        instance1.stop();
        instance1 = newBuilder().setDebugName("firstInstance")
                .newRepository("/var/discovery/impl/", true)
                .setConnectorPingTimeout(30)
                .setMinEventDelay(1)
                .setSlingId(slingId1).build();
        // sleep so that the two dont have the same startup time, and thus leaderElectionId is lower for instance1
        logger.info("doTestLeader: 1st sleep 200ms");
        Thread.sleep(200);
        instance2 = newBuilder().setDebugName("secondInstance")
                .useRepositoryOf(instance1)
                .setConnectorPingTimeout(30)
                .setMinEventDelay(1)
                .setSlingId(slingId2).build();
        assertNotNull(instance1);
        assertNotNull(instance2);

        // the two instances are still isolated - hence they throw an exception
        try{
            instance1.getClusterViewService().getLocalClusterView();
            fail("should complain");
        } catch(UndefinedClusterViewException e) {
            // ok
        }
        try{
            instance2.getClusterViewService().getLocalClusterView();
            fail("should complain");
        } catch(UndefinedClusterViewException e) {
            // ok
        }

        // let the sync/voting happen
        for(int m=0; m<4; m++) {
            instance1.heartbeatsAndCheckView();
            instance2.heartbeatsAndCheckView();
            logger.info("doTestLeader: sleep 500ms");
            Thread.sleep(500);
        }
        instance1.heartbeatsAndCheckView();
        instance2.heartbeatsAndCheckView();

        // now they must be in the same cluster, so in a cluster of size 1
        assertEquals(2, instance1.getClusterViewService().getLocalClusterView().getInstances().size());
        assertEquals(2, instance2.getClusterViewService().getLocalClusterView().getInstances().size());

        // the first instance should be the leader - since it was started first
        assertTrue(instance1.getLocalInstanceDescription().isLeader());
        assertFalse(instance2.getLocalInstanceDescription().isLeader());
        logger.info("doTestLeader("+slingId1+","+slingId2+"): end");
    }

    /**
     * Tests stale announcement reported in SLING-4139:
     * An instance which crashes but had announcements, never cleans up those announcements.
     * Thus on a restart, those announcements are still there, even if the connector
     * would no longer be in use (or point somewhere else etc).
     * That has various effects, one of them tested in this method: peers in the same cluster,
     * after the crashed/stopped instance restarts, will assume those stale announcements
     * as being correct and include them in the topology - hence reporting stale instances
     * (which can be old instances or even duplicates).
     */
    @Test
    public void testStaleAnnouncementsVisibleToClusterPeers4139() throws Throwable {
        logger.info("testStaleAnnouncementsVisibleToClusterPeers4139: start");
    	final String instance1SlingId = prepare4139();

        // remove topology connector from instance3 to instance1
        // -> corresponds to stop pinging
        // (nothing to assert additionally here)

        // start instance 1
        instance1Restarted = newBuilder().setDebugName("firstInstance")
                .useRepositoryOf(instance2)
                .setConnectorPingTimeout(Integer.MAX_VALUE /* no timeout */)
                .setMinEventDelay(1)
                .setSlingId(instance1SlingId).build();
        runHeartbeatOnceWith(instance1Restarted, instance2, instance3);
        Thread.sleep(500);
        runHeartbeatOnceWith(instance1Restarted, instance2, instance3);
        Thread.sleep(500);
        runHeartbeatOnceWith(instance1Restarted, instance2, instance3);

        // facts: connector 3->1 does not exist actively anymore,
        //        instance 1+2 should build a cluster,
        //        instance 3 should be isolated
        logger.info("instance1Restarted.dump: "+instance1Restarted.slingId);
        instance1Restarted.dumpRepo();

        logger.info("instance2.dump: "+instance2.slingId);
        instance2.dumpRepo();

        logger.info("instance3.dump: "+instance3.slingId);
        instance3.dumpRepo();

        assertTopology(instance1Restarted, new SimpleClusterView(instance1Restarted, instance2));
        assertTopology(instance3, new SimpleClusterView(instance3));
        assertTopology(instance2, new SimpleClusterView(instance1Restarted, instance2));
        instance1Restarted.stop();
        logger.info("testStaleAnnouncementsVisibleToClusterPeers4139: end");
    }

    /**
     * Tests a situation where a connector was done to instance1, which eventually
     * crashed, then the connector is done to instance2. Meanwhile instance1
     * got restarted and this test assures that the instance3 is not reported
     * twice in the topology. Did not happen before 4139, but should never afterwards neither
     */
    @Test
    public void testDuplicateInstanceIn2Clusters4139() throws Throwable {
        logger.info("testDuplicateInstanceIn2Clusters4139: start");
        final String instance1SlingId = prepare4139();

        // remove topology connector from instance3 to instance1
        // -> corresponds to stop pinging
        // (nothing to assert additionally here)
        // instead, now start a connector from instance3 to instance2
        pingConnector(instance3, instance2);

        // start instance 1
        instance1Restarted = newBuilder().setDebugName("firstInstance")
                .useRepositoryOf(instance2)
                .setConnectorPingTimeout(Integer.MAX_VALUE /* no timeout */)
                .setMinEventDelay(1)
                .setSlingId(instance1SlingId).build();
        runHeartbeatOnceWith(instance1Restarted, instance2, instance3);
        pingConnector(instance3, instance2);
        runHeartbeatOnceWith(instance1Restarted, instance2, instance3);
        pingConnector(instance3, instance2);
        logger.info("iteration 0");
        logger.info("instance1Restarted.slingId: "+instance1Restarted.slingId);
        logger.info("instance2.slingId: "+instance2.slingId);
        logger.info("instance3.slingId: "+instance3.slingId);
        instance1Restarted.dumpRepo();
        assertSameTopology(new SimpleClusterView(instance1Restarted, instance2), new SimpleClusterView(instance3));

        Thread.sleep(500);
        runHeartbeatOnceWith(instance1Restarted, instance2, instance3);
        pingConnector(instance3, instance2);
        runHeartbeatOnceWith(instance1Restarted, instance2, instance3);
        pingConnector(instance3, instance2);
        logger.info("iteration 1");
        logger.info("instance1Restarted.slingId: "+instance1Restarted.slingId);
        logger.info("instance2.slingId: "+instance2.slingId);
        logger.info("instance3.slingId: "+instance3.slingId);
        instance1Restarted.dumpRepo();
        assertSameTopology(new SimpleClusterView(instance1Restarted, instance2), new SimpleClusterView(instance3));
        instance1Restarted.stop();

        logger.info("testDuplicateInstanceIn2Clusters4139: end");
    }

/*    ok, this test should do the following:
         * cluster A with instance 1 and instance 2
         * cluster B with instance 3 and instance 4
         * cluster C with instance 5

         * initially, instance3 is pinging instance1, and instance 5 is pinging instance1 as well (MAC hub)
          * that should result in instance3 and 5 to inherit the rest from instance1
         * then simulate load balancer switching from instance1 to instance2 - hence pings go to instance2
         *
         */
    @Category(Slow.class) //TODO: this takes env 45sec
    @Test
    public void testConnectorSwitching4139() throws Throwable {
        final int MIN_EVENT_DELAY = 1;

        tearDown(); // reset any setup that was done - we start with a different setup than the default one
        final org.apache.log4j.Logger discoveryLogger = RootLogger.getLogger("org.apache.sling.discovery");
        logLevel = discoveryLogger.getLevel();
        discoveryLogger.setLevel(Level.DEBUG);

        instance1 = newBuilder().setDebugName("instance1")
                .newRepository("/var/discovery/clusterA/", true)
                .setConnectorPingTimeout(10 /* sec */)
                .setConnectorPingInterval(999)
                .setMinEventDelay(MIN_EVENT_DELAY).build();
        instance2 = newBuilder().setDebugName("instance2")
                .useRepositoryOf(instance1)
                .setConnectorPingTimeout(10 /* sec */)
                .setConnectorPingInterval(999)
                .setMinEventDelay(MIN_EVENT_DELAY).build();
        // now launch the remote instance
        instance3 = newBuilder().setDebugName("instance3")
                .newRepository("/var/discovery/clusterB/", false)
                .setConnectorPingTimeout(10 /* sec */)
                .setConnectorPingInterval(999)
                .setMinEventDelay(MIN_EVENT_DELAY).build();
        instance4 = newBuilder().setDebugName("instance4")
                .useRepositoryOf(instance3)
                .setConnectorPingTimeout(10 /* sec */)
                .setConnectorPingInterval(999)
                .setMinEventDelay(MIN_EVENT_DELAY).build();
        instance5 = newBuilder().setDebugName("instance5")
                .newRepository("/var/discovery/clusterC/", false)
                .setConnectorPingTimeout(10 /* sec */)
                .setConnectorPingInterval(999)
                .setMinEventDelay(MIN_EVENT_DELAY).build();

        // join the instances to form a cluster by sending out heartbeats
        runHeartbeatOnceWith(instance1, instance2, instance3, instance4, instance5);
        Thread.sleep(500);
        runHeartbeatOnceWith(instance1, instance2, instance3, instance4, instance5);
        Thread.sleep(500);
        runHeartbeatOnceWith(instance1, instance2, instance3, instance4, instance5);
        Thread.sleep(500);

        assertSameTopology(new SimpleClusterView(instance1, instance2));
        assertSameTopology(new SimpleClusterView(instance3, instance4));
        assertSameTopology(new SimpleClusterView(instance5));

        // create a topology connector from instance3 to instance1
        // -> corresponds to starting to ping
        runHeartbeatOnceWith(instance1, instance2, instance3, instance4, instance5);
        pingConnector(instance3, instance1);
        pingConnector(instance5, instance1);
        Thread.sleep(500);
        runHeartbeatOnceWith(instance1, instance2, instance3, instance4, instance5);
        pingConnector(instance3, instance1);
        pingConnector(instance5, instance1);
        Thread.sleep(500);

        // make asserts on the topology
        logger.info("testConnectorSwitching4139: instance1.slingId="+instance1.slingId);
        logger.info("testConnectorSwitching4139: instance2.slingId="+instance2.slingId);
        logger.info("testConnectorSwitching4139: instance3.slingId="+instance3.slingId);
        logger.info("testConnectorSwitching4139: instance4.slingId="+instance4.slingId);
        logger.info("testConnectorSwitching4139: instance5.slingId="+instance5.slingId);
        instance1.dumpRepo();

        assertSameTopology(new SimpleClusterView(instance1, instance2),
                new SimpleClusterView(instance3, instance4),
                new SimpleClusterView(instance5));

        // simulate a crash of instance1, resulting in load-balancer to switch the pings
        boolean success = false;
        for(int i=0; i<25; i++) {
            // loop for max 25 times, min 20 times
            runHeartbeatOnceWith(instance2, instance3, instance4, instance5);
            final boolean ping1 = pingConnector(instance3, instance2);
            final boolean ping2 = pingConnector(instance5, instance2);
            if (ping1 && ping2) {
                // both pings were fine - hence break
                success = true;
                logger.info("testConnectorSwitching4139: successfully switched all pings to instance2 after "+i+" rounds.");
                if (i<20) {
                    logger.info("testConnectorSwitching4139: min loop cnt not yet reached: i="+i);
                    Thread.sleep(1000); // 20x1000ms = 20sec max - (vs 10sec timeout) - should be enough for timing out
                    continue;
                }
                break;
            }
            logger.info("testConnectorSwitching4139: looping cos ping1="+ping1+", ping2="+ping2);
            Thread.sleep(1000); // 25x1000ms = 25sec max - (vs 10sec timeout)

        }
        assertTrue(success);
        // one final heartbeat
        runHeartbeatOnceWith(instance2, instance3, instance4, instance5);
        assertTrue(pingConnector(instance3, instance2));
        assertTrue(pingConnector(instance5, instance2));

        instance2.dumpRepo();

        assertSameTopology(new SimpleClusterView(instance2),
                new SimpleClusterView(instance3, instance4),
                new SimpleClusterView(instance5));

        // restart instance1, crash instance4
        instance4.stopViewChecker();
        instance1Restarted = newBuilder().setDebugName("instance1")
                .useRepositoryOf(instance2)
                .setConnectorPingTimeout(Integer.MAX_VALUE /* no timeout */)
                .setMinEventDelay(1)
                .setSlingId(instance1.getSlingId()).build();
        runHeartbeatOnceWith(instance1Restarted, instance2, instance3, instance5);
        // give these heartbeats/votes some time .. so sleep 2sec (timeout is 10sec, so should be safe)
        Thread.sleep(2000);
        assertTrue(pingConnector(instance3, instance2));
        assertTrue(pingConnector(instance5, instance2));
        success = false;
        for(int i=0; i<40; i++) {
            runHeartbeatOnceWith(instance1Restarted, instance2, instance3, instance5);
            instance1.getViewChecker().checkView();
            // we used to do:
            //assertTrue(pingConnector(instance3, instance2));
            // but that could fail with the introduction of marking
            // an establishedView as failing upon detecting a view change
            // when the view changes, we're sending TOPOLOGY_CHANGING to listeners
            // so getTopology() should also return isCurrent==false - which
            // means that doing a ping will also fail, cos that wants to
            // get a current topology to send as an announcement.
            // which is a long way of saying: we cannot do an assert here
            // since instance3 *can* have an undefined cluster view..
            try{
                pingConnector(instance3, instance2);
            } catch(UndefinedClusterViewException ucve) {
                // ignore
            }
            pingConnector(instance5, instance2);
            final TopologyView topology = instance3.getDiscoveryService().getTopology();
            InstanceDescription i3 = null;
            for (Iterator<InstanceDescription> it = topology.getInstances().iterator(); it.hasNext();) {
                final InstanceDescription id = it.next();
                if (id.getSlingId().equals(instance3.slingId)) {
                    i3 = id;
                    break;
                }
            }
            assertNotNull(i3);
            assertEquals(instance3.slingId, i3.getSlingId());
            final ClusterView i3Cluster = i3.getClusterView();
            final int i3ClusterSize = i3Cluster.getInstances().size();
            if (i3ClusterSize==1) {
                if (i<30) {
                    logger.info("testConnectorSwitching4139: [2] min loop cnt not yet reached: i="+i);
                    Thread.sleep(500); // 30x500ms = 15sec max - (vs 10sec-2sec[sleep] timeout) - should be enough for timing out
                    continue;
                }
                success = true;
                logger.info("testConnectorSwitching4139: i3ClusterSize: "+i3ClusterSize+", i="+i+" (success)");
                break;
            }
            logger.info("testConnectorSwitching4139: i3ClusterSize: "+i3ClusterSize+", i="+i);
            Thread.sleep(500);
        }

        logger.info("testConnectorSwitching4139: instance1Restarted.slingId="+instance1Restarted.slingId);
        logger.info("testConnectorSwitching4139: instance2.slingId="+instance2.slingId);
        logger.info("testConnectorSwitching4139: instance3.slingId="+instance3.slingId);
        logger.info("testConnectorSwitching4139: instance4.slingId="+instance4.slingId);
        logger.info("testConnectorSwitching4139: instance5.slingId="+instance5.slingId);
        instance1Restarted.dumpRepo();
        assertTrue(success);

        assertSameTopology(new SimpleClusterView(instance1Restarted, instance2),
                new SimpleClusterView(instance3),
                new SimpleClusterView(instance5));
        instance1Restarted.stop();

    }

    @Category(Slow.class) //TODO: this takes env 25sec
    @Test
    public void testDuplicateInstance3726() throws Throwable {
        logger.info("testDuplicateInstance3726: start");
        final int MIN_EVENT_DELAY = 1;

        tearDown(); // reset any setup that was done - we start with a different setup than the default one
        final org.apache.log4j.Logger discoveryLogger = RootLogger.getLogger("org.apache.sling.discovery");
        logLevel = discoveryLogger.getLevel();
        discoveryLogger.setLevel(Level.DEBUG);

        instance1 = newBuilder().setDebugName("instance1")
                .newRepository("/var/discovery/clusterA/", true)
                .setConnectorPingTimeout(15 /* sec */)
                .setMinEventDelay(MIN_EVENT_DELAY).build();
        instance2 = newBuilder().setDebugName("instance2")
                .useRepositoryOf(instance1)
                .setConnectorPingTimeout(15 /* sec */)
                .setMinEventDelay(MIN_EVENT_DELAY).build();
        // now launch the remote instance
        instance3 = newBuilder().setDebugName("instance3")
                .newRepository("/var/discovery/clusterB/", false)
                .setConnectorPingTimeout(15 /* sec */)
                .setMinEventDelay(MIN_EVENT_DELAY).build();
        instance5 = newBuilder().setDebugName("instance5")
                .newRepository("/var/discovery/clusterC/", false)
                .setConnectorPingTimeout(15 /* sec */)
                .setMinEventDelay(MIN_EVENT_DELAY).build();

        // join the instances to form a cluster by sending out heartbeats
        runHeartbeatOnceWith(instance1, instance2, instance3, instance5);
        Thread.sleep(500);
        runHeartbeatOnceWith(instance1, instance2, instance3, instance5);
        Thread.sleep(500);
        runHeartbeatOnceWith(instance1, instance2, instance3, instance5);
        Thread.sleep(500);

        assertSameTopology(new SimpleClusterView(instance1, instance2));
        assertSameTopology(new SimpleClusterView(instance3));
        assertSameTopology(new SimpleClusterView(instance5));

        // create a topology connector from instance3 to instance1
        // -> corresponds to starting to ping
        pingConnector(instance3, instance1);
        pingConnector(instance5, instance1);
        pingConnector(instance3, instance1);
        pingConnector(instance5, instance1);

        // make asserts on the topology
        logger.info("testDuplicateInstance3726: instance1.slingId="+instance1.slingId);
        logger.info("testDuplicateInstance3726: instance2.slingId="+instance2.slingId);
        logger.info("testDuplicateInstance3726: instance3.slingId="+instance3.slingId);
        logger.info("testDuplicateInstance3726: instance5.slingId="+instance5.slingId);
        instance1.dumpRepo();

        assertSameTopology(new SimpleClusterView(instance1, instance2),
                new SimpleClusterView(instance3/*, instance4*/),
                new SimpleClusterView(instance5));

        // simulate a crash of instance1, resulting in load-balancer to switch the pings
        instance1.stopViewChecker();
        boolean success = false;
        for(int i=0; i<25; i++) {
            // loop for max 25 times, min 20 times
            runHeartbeatOnceWith(instance2, instance3, /*instance4, */instance5);
            final boolean ping1 = pingConnector(instance3, instance2);
            final boolean ping2 = pingConnector(instance5, instance2);
            if (ping1 && ping2) {
                // both pings were fine - hence break
                success = true;
                logger.info("testDuplicateInstance3726: successfully switched all pings to instance2 after "+i+" rounds.");
                if (i<20) {
                    logger.info("testDuplicateInstance3726: min loop cnt not yet reached: i="+i);
                    Thread.sleep(1000); // 20x1000ms = 20sec max - (vs 15sec timeout) - should be enough for timing out
                    continue;
                }
                break;
            }
            logger.info("testDuplicateInstance3726: looping");
            Thread.sleep(1000); // 25x1000ms = 25sec max - (vs 15sec timeout)

        }
        assertTrue(success);
        // one final heartbeat
        runHeartbeatOnceWith(instance2, instance3, instance5);
        assertTrue(pingConnector(instance3, instance2));
        assertTrue(pingConnector(instance5, instance2));

        instance2.dumpRepo();

        assertSameTopology(new SimpleClusterView(instance2),
                new SimpleClusterView(instance3),
                new SimpleClusterView(instance5));

        // restart instance1, start instance4
        instance1Restarted = newBuilder().setDebugName("instance1")
                .useRepositoryOf(instance2)
                .setConnectorPingTimeout(Integer.MAX_VALUE /* no timeout */)
                .setMinEventDelay(1)
                .setSlingId(instance1.getSlingId()).build();
        instance4 = newBuilder().setDebugName("instance4")
                .useRepositoryOf(instance3)
                .setConnectorPingTimeout(30 /* sec */)
                .setMinEventDelay(MIN_EVENT_DELAY).build();
        for(int i=0; i<3; i++) {
            runHeartbeatOnceWith(instance1Restarted, instance2, instance3, instance4, instance5);
            Thread.sleep(250);
            // since instance4 just started - hooked to instance3
            // it is possible that it doesn't just have a topology
            // yet - so we cannot do:
            //assertTrue(pingConnector(instance3, instance2));
            // but instead do
            try{
                pingConnector(instance3, instance2);
            } catch(UndefinedClusterViewException ucve) {
                // ignore
            }
            assertTrue(pingConnector(instance5, instance2));
        }

        instance1Restarted.dumpRepo();
        logger.info("testDuplicateInstance3726: instance1Restarted.slingId="+instance1Restarted.slingId);
        logger.info("testDuplicateInstance3726: instance2.slingId="+instance2.slingId);
        logger.info("testDuplicateInstance3726: instance3.slingId="+instance3.slingId);
        logger.info("testDuplicateInstance3726: instance4.slingId="+instance4.slingId);
        logger.info("testDuplicateInstance3726: instance5.slingId="+instance5.slingId);
        assertTrue(success);

        assertSameTopology(new SimpleClusterView(instance1Restarted, instance2),
                new SimpleClusterView(instance3, instance4),
                new SimpleClusterView(instance5));
        instance1Restarted.stop();
        logger.info("testDuplicateInstance3726: end");
    }

    private void assertSameTopology(SimpleClusterView... clusters) throws UndefinedClusterViewException {
        if (clusters==null) {
            return;
        }
        for(int i=0; i<clusters.length; i++) { // go through all clusters
            final SimpleClusterView aCluster = clusters[i];
            assertSameClusterIds(aCluster.instances);
            for(int j=0; j<aCluster.instances.length; j++) { // and all instances therein
                final VirtualInstance anInstance = aCluster.instances[j];
                assertTopology(anInstance, clusters); // an verify that they all see the same
                for(int k=0; k<clusters.length; k++) {
                    final SimpleClusterView otherCluster = clusters[k];
                    if (aCluster==otherCluster) {
                        continue; // then ignore this one
                    }
                    for(int m=0; m<otherCluster.instances.length; m++) {
                        assertNotSameClusterIds(anInstance, otherCluster.instances[m]);
                    }
                }
            }
        }
    }

    private void runHeartbeatOnceWith(VirtualInstance... instances) {
        if (instances==null) {
            return;
        }
        for(int i=0; i<instances.length; i++) {
            instances[i].heartbeatsAndCheckView();
        }
    }

    /**
     * Tests a situation where a connector was done to instance1, which eventually
     * crashed, then the connector is done to instance4 (which is in a separate, 3rd cluster).
     * Meanwhile instance1 got restarted and this test assures that the instance3 is not reported
     * twice in the topology. This used to happen prior to SLING-4139
     */
    @Test
    public void testStaleInstanceIn3Clusters4139() throws Throwable {
        logger.info("testStaleInstanceIn3Clusters4139: start");
        final String instance1SlingId = prepare4139();

        // remove topology connector from instance3 to instance1
        // -> corresponds to stop pinging
        // (nothing to assert additionally here)

        // start instance4 in a separate cluster
        instance4 = newBuilder().setDebugName("remoteInstance4")
                .newRepository("/var/discovery/implremote4/", false)
                .setConnectorPingTimeout(Integer.MAX_VALUE /* no timeout */)
                .setMinEventDelay(1).build();
        try{
            instance4.getClusterViewService().getLocalClusterView();
            fail("should complain");
        } catch(UndefinedClusterViewException e) {
            // ok
        }

        // instead, now start a connector from instance3 to instance2
        instance4.heartbeatsAndCheckView();
        instance4.heartbeatsAndCheckView();
        pingConnector(instance3, instance4);

        // start instance 1
        instance1Restarted = newBuilder().setDebugName("firstInstance")
                .useRepositoryOf(instance2)
                .setConnectorPingTimeout(Integer.MAX_VALUE /* no timeout */)
                .setMinEventDelay(1)
                .setSlingId(instance1SlingId).build();
        runHeartbeatOnceWith(instance1Restarted, instance2, instance3, instance4);
        pingConnector(instance3, instance4);
        runHeartbeatOnceWith(instance1Restarted, instance2, instance3, instance4);
        pingConnector(instance3, instance4);
        logger.info("iteration 0");
        logger.info("instance1Restarted.slingId: "+instance1Restarted.slingId);
        logger.info("instance2.slingId: "+instance2.slingId);
        logger.info("instance3.slingId: "+instance3.slingId);
        logger.info("instance4.slingId: "+instance4.slingId);
        instance1Restarted.dumpRepo();
        assertSameTopology(
                new SimpleClusterView(instance3),
                new SimpleClusterView(instance4));
        assertSameTopology(new SimpleClusterView(instance1Restarted, instance2));

        Thread.sleep(100);
        runHeartbeatOnceWith(instance1Restarted, instance2, instance3, instance4);
        pingConnector(instance3, instance4);
        runHeartbeatOnceWith(instance1Restarted, instance2, instance3, instance4);
        pingConnector(instance3, instance4);
        logger.info("iteration 1");
        logger.info("instance1Restarted.slingId: "+instance1Restarted.slingId);
        logger.info("instance2.slingId: "+instance2.slingId);
        logger.info("instance3.slingId: "+instance3.slingId);
        logger.info("instance4.slingId: "+instance4.slingId);
        instance1Restarted.dumpRepo();
        assertSameTopology(new SimpleClusterView(instance1Restarted, instance2));
        assertSameTopology(
                new SimpleClusterView(instance3),
                new SimpleClusterView(instance4));

        Thread.sleep(100);
        runHeartbeatOnceWith(instance1Restarted, instance2, instance3, instance4);
        pingConnector(instance3, instance4);

        // now the situation should be as follows:
        logger.info("iteration 2");
        logger.info("instance1Restarted.slingId: "+instance1Restarted.slingId);
        logger.info("instance2.slingId: "+instance2.slingId);
        logger.info("instance3.slingId: "+instance3.slingId);
        logger.info("instance4.slingId: "+instance4.slingId);
        instance1Restarted.dumpRepo();
        assertSameTopology(new SimpleClusterView(instance1Restarted, instance2));
        assertSameTopology(
                new SimpleClusterView(instance3),
                new SimpleClusterView(instance4));
        instance1Restarted.stop();

        logger.info("testStaleInstanceIn3Clusters4139: end");
    }

    /**
     * Preparation steps for SLING-4139 tests:
     * Creates two clusters: A: with instance1 and 2, B with instance 3
     * instance 3 creates a connector to instance 1
     * then instance 1 is killed (crashes)
     * @return the slingId of the original (crashed) instance1
     */
	private String prepare4139() throws Throwable, Exception,
			InterruptedException {
	    tearDown(); // stop anything running..
        instance1 = newBuilder().setDebugName("firstInstance")
                .newRepository("/var/discovery/impl/", true)
                .setConnectorPingTimeout(Integer.MAX_VALUE /* no timeout */)
                .setMinEventDelay(1).build();
        instance2 = newBuilder().setDebugName("secondInstance")
                .useRepositoryOf(instance1)
                .setConnectorPingTimeout(Integer.MAX_VALUE /* no timeout */)
                .setMinEventDelay(1).build();
        // join the two instances to form a cluster by sending out heartbeats
        runHeartbeatOnceWith(instance1, instance2);
        Thread.sleep(100);
        runHeartbeatOnceWith(instance1, instance2);
        Thread.sleep(100);
        runHeartbeatOnceWith(instance1, instance2);
        assertSameClusterIds(instance1, instance2);

        // now launch the remote instance
        instance3 = newBuilder().setDebugName("remoteInstance")
                .newRepository("/var/discovery/implremote/", false)
                .setConnectorPingTimeout(Integer.MAX_VALUE /* no timeout */)
                .setMinEventDelay(1).build();
        assertSameClusterIds(instance1, instance2);
        try{
            instance3.getClusterViewService().getLocalClusterView();
            fail("should complain");
        } catch(UndefinedClusterViewException ue) {
            // ok
        }
        assertEquals(0, instance1.getAnnouncementRegistry().listLocalAnnouncements().size());
        assertEquals(0, instance1.getAnnouncementRegistry().listLocalIncomingAnnouncements().size());
        assertEquals(0, instance2.getAnnouncementRegistry().listLocalAnnouncements().size());
        assertEquals(0, instance2.getAnnouncementRegistry().listLocalIncomingAnnouncements().size());
        assertEquals(0, instance3.getAnnouncementRegistry().listLocalAnnouncements().size());
        assertEquals(0, instance3.getAnnouncementRegistry().listLocalIncomingAnnouncements().size());

        // create a topology connector from instance3 to instance1
        // -> corresponds to starting to ping
        instance3.heartbeatsAndCheckView();
        instance3.heartbeatsAndCheckView();
        Thread.sleep(1000);
        pingConnector(instance3, instance1);
        // make asserts on the topology
        instance1.dumpRepo();
        assertSameTopology(new SimpleClusterView(instance1, instance2), new SimpleClusterView(instance3));

        // kill instance 1
        logger.info("instance1.slingId="+instance1.slingId);
        logger.info("instance2.slingId="+instance2.slingId);
        logger.info("instance3.slingId="+instance3.slingId);
        final String instance1SlingId = instance1.slingId;
        instance1.stopViewChecker(); // and have instance3 no longer pinging instance1
        instance1.stop(); // otherwise it will have itself still registered with the observation manager and fiddle with future events..
        instance1 = null; // set to null to early fail if anyone still assumes (original) instance1 is up form now on
        instance2.getConfig().setViewCheckTimeout(1); // set instance2's heartbeatTimeout to 1 sec to time out instance1 quickly!
        instance3.getConfig().setViewCheckTimeout(1); // set instance3's heartbeatTimeout to 1 sec to time out instance1 quickly!
        Thread.sleep(500);
        runHeartbeatOnceWith(instance2, instance3);
        Thread.sleep(500);
        runHeartbeatOnceWith(instance2, instance3);
        Thread.sleep(500);
        runHeartbeatOnceWith(instance2, instance3);
        // instance 2 should now be alone - in fact, 3 should be alone as well
        instance2.dumpRepo();
        assertTopology(instance2, new SimpleClusterView(instance2));
        assertTopology(instance3, new SimpleClusterView(instance3));
        instance2.getConfig().setViewCheckTimeout(Integer.MAX_VALUE /* no timeout */); // set instance2's heartbeatTimeout back to Integer.MAX_VALUE /* no timeout */
        instance3.getConfig().setViewCheckTimeout(Integer.MAX_VALUE /* no timeout */); // set instance3's heartbeatTimeout back to Integer.MAX_VALUE /* no timeout */
		return instance1SlingId;
	}

    private void assertNotSameClusterIds(VirtualInstance... instances) throws UndefinedClusterViewException {
    	if (instances==null) {
    		fail("must not pass empty set of instances here");
    	}
    	if (instances.length<=1) {
    		fail("must not pass 0 or 1 instance only");
    	}
        final String clusterId1 = instances[0].getClusterViewService()
                .getLocalClusterView().getId();
        for(int i=1; i<instances.length; i++) {
	        final String otherClusterId = instances[i].getClusterViewService()
	                .getLocalClusterView().getId();
	        // cluster ids must NOT be the same
	        assertNotEquals(clusterId1, otherClusterId);
        }
        if (instances.length>2) {
        	final VirtualInstance[] subset = new VirtualInstance[instances.length-1];
        	System.arraycopy(instances, 0, subset, 1, instances.length-1);
        	assertNotSameClusterIds(subset);
        }
	}

	private void assertSameClusterIds(VirtualInstance... instances) throws UndefinedClusterViewException {
    	if (instances==null) {
            // then there is nothing to compare
            return;
    	}
    	if (instances.length==1) {
    	    // then there is nothing to compare
    	    return;
    	}
        final String clusterId1 = instances[0].getClusterViewService()
                .getLocalClusterView().getId();
        for(int i=1; i<instances.length; i++) {
	        final String otherClusterId = instances[i].getClusterViewService()
	                .getLocalClusterView().getId();
	        // cluster ids must be the same
	        if (!clusterId1.equals(otherClusterId)) {
	            logger.error("assertSameClusterIds: instances[0]: "+instances[0]);
	            logger.error("assertSameClusterIds: instances["+i+"]: "+instances[i]);
	            fail("mismatch in clusterIds: expected to equal: clusterId1="+clusterId1+", otherClusterId="+otherClusterId);
	        }
        }
	}

	private void assertTopology(VirtualInstance instance, SimpleClusterView... assertedClusterViews) {
    	final TopologyView topology = instance.getDiscoveryService().getTopology();
    	logger.info("assertTopology: instance "+instance.slingId+" sees topology: "+topology+", expected: "+assertedClusterViews);
    	assertNotNull(topology);
    	if (assertedClusterViews.length!=topology.getClusterViews().size()) {
            dumpFailureDetails(topology, assertedClusterViews);
    	    fail("instance "+instance.slingId+ " expected "+assertedClusterViews.length+", got: "+topology.getClusterViews().size());
    	}
    	final Set<ClusterView> actualClusters = new HashSet<ClusterView>(topology.getClusterViews());
    	for(int i=0; i<assertedClusterViews.length; i++) {
    		final SimpleClusterView assertedClusterView = assertedClusterViews[i];
    		boolean foundMatch = false;
    		for (Iterator<ClusterView> it = actualClusters.iterator(); it
					.hasNext();) {
				final ClusterView actualClusterView = it.next();
				if (matches(assertedClusterView, actualClusterView)) {
					it.remove();
					foundMatch = true;
					break;
				}
			}
    		if (!foundMatch) {
    		    dumpFailureDetails(topology, assertedClusterViews);
    			fail("instance "+instance.slingId+ " could not find a match in the topology with instance="+instance.slingId+" and clusterViews="+assertedClusterViews.length);
    		}
    	}
    	assertEquals("not all asserted clusterviews are in the actual view with instance="+instance+" and clusterViews="+assertedClusterViews, actualClusters.size(), 0);
	}

    private void dumpFailureDetails(TopologyView topology, SimpleClusterView... assertedClusterViews) {
        logger.error("assertTopology: expected: "+assertedClusterViews.length);
        for(int j=0; j<assertedClusterViews.length; j++) {
            logger.error("assertTopology:  ["+j+"]: "+assertedClusterViews[j].toString());
        }
        final Set<ClusterView> clusterViews = topology.getClusterViews();
        final Set<InstanceDescription> instances = topology.getInstances();
        logger.error("assertTopology: actual: "+clusterViews.size()+" clusters with a total of "+instances.size()+" instances");
        for (Iterator<ClusterView> it = clusterViews.iterator(); it.hasNext();) {
            final ClusterView aCluster = it.next();
            logger.error("assertTopology:  a cluster: "+aCluster.getId());
            for (Iterator<InstanceDescription> it2 = aCluster.getInstances().iterator(); it2.hasNext();) {
                final InstanceDescription id = it2.next();
                logger.error("assertTopology:   - an instance "+id.getSlingId());
            }
        }
        logger.error("assertTopology: list of all instances: "+instances.size());
        for (Iterator<InstanceDescription> it = instances.iterator(); it.hasNext();) {
            final InstanceDescription id = it.next();
            logger.error("assertTopology: - an instance: "+id.getSlingId());
        }
    }

	private boolean matches(SimpleClusterView assertedClusterView,
			ClusterView actualClusterView) {
		assertNotNull(assertedClusterView);
		assertNotNull(actualClusterView);
		if (assertedClusterView.instances.length!=actualClusterView.getInstances().size()) {
			return false;
		}
		final Set<InstanceDescription> actualInstances = new HashSet<InstanceDescription>(actualClusterView.getInstances());
		outerLoop:for(int i=0; i<assertedClusterView.instances.length; i++) {
			final VirtualInstance assertedInstance = assertedClusterView.instances[i];
			for (Iterator<InstanceDescription> it = actualInstances.iterator(); it
					.hasNext();) {
				final InstanceDescription anActualInstance = it.next();
				if (assertedInstance.slingId.equals(anActualInstance.getSlingId())) {
					continue outerLoop;
				}
			}
			return false;
		}
		return true;
	}

	private boolean pingConnector(final VirtualInstance from, final VirtualInstance to) throws UndefinedClusterViewException {
	    final Announcement fromAnnouncement = createFromAnnouncement(from);
	    Announcement replyAnnouncement = null;
	    try{
            replyAnnouncement = ping(to, fromAnnouncement);
	    } catch(AssertionError e) {
	        logger.warn("pingConnector: ping failed, assertionError: "+e);
	        return false;
	    } catch (UndefinedClusterViewException e) {
            logger.warn("pingConnector: ping failed, currently the cluster view is undefined: "+e);
            return false;
        }
        registerReplyAnnouncement(from, replyAnnouncement);
        return true;
    }

	private void registerReplyAnnouncement(VirtualInstance from,
			Announcement inheritedAnnouncement) {
		final AnnouncementRegistry announcementRegistry = from.getAnnouncementRegistry();
        if (inheritedAnnouncement.isLoop()) {
        	fail("loop detected");
        	// we dont currently support loops here in the junit tests
        	return;
        } else {
            inheritedAnnouncement.setInherited(true);
            if (announcementRegistry
                    .registerAnnouncement(inheritedAnnouncement)==-1) {
                logger.info("ping: connector response is from an instance which I already see in my topology"
                        + inheritedAnnouncement);
                return;
            }
        }
//        resultingAnnouncement = inheritedAnnouncement;
//        statusDetails = null;
	}

	private Announcement ping(VirtualInstance to, final Announcement incomingTopologyAnnouncement)
	        throws UndefinedClusterViewException {
		final String slingId = to.slingId;
		final ClusterViewService clusterViewService = to.getClusterViewService();
		final AnnouncementRegistry announcementRegistry = to.getAnnouncementRegistry();

		incomingTopologyAnnouncement.removeInherited(slingId);

        final Announcement replyAnnouncement = new Announcement(
                slingId);

        long backoffInterval = -1;
        final ClusterView clusterView = clusterViewService.getLocalClusterView();
        if (!incomingTopologyAnnouncement.isCorrectVersion()) {
        	fail("incorrect version");
            return null; // never reached
        } else if (ClusterViewHelper.contains(clusterView, incomingTopologyAnnouncement
                .getOwnerId())) {
        	fail("loop=true");
            return null; // never reached
        } else if (ClusterViewHelper.containsAny(clusterView, incomingTopologyAnnouncement
                .listInstances())) {
        	fail("incoming announcement contains instances that are part of my cluster");
            return null; // never reached
        } else {
            backoffInterval = announcementRegistry
                    .registerAnnouncement(incomingTopologyAnnouncement);
            if (backoffInterval==-1) {
            	fail("rejecting an announcement from an instance that I already see in my topology: ");
                return null; // never reached
            } else {
                // normal, successful case: replying with the part of the topology which this instance sees
                replyAnnouncement.setLocalCluster(clusterView);
                announcementRegistry.addAllExcept(replyAnnouncement, clusterView,
                        new AnnouncementFilter() {

                            @Override
                            public boolean accept(final String receivingSlingId, Announcement announcement) {
                                if (announcement.getPrimaryKey().equals(
                                        incomingTopologyAnnouncement
                                                .getPrimaryKey())) {
                                    return false;
                                }
                                return true;
                            }
                        });
                return replyAnnouncement;
            }
        }
	}

	private Announcement createFromAnnouncement(final VirtualInstance from) throws UndefinedClusterViewException {
		// TODO: refactor TopologyConnectorClient to avoid duplicating code from there (ping())
		Announcement topologyAnnouncement = new Announcement(from.slingId);
        topologyAnnouncement.setServerInfo(from.slingId);
        final ClusterView clusterView = from.getClusterViewService().getLocalClusterView();
        topologyAnnouncement.setLocalCluster(clusterView);
        from.getAnnouncementRegistry().addAllExcept(topologyAnnouncement, clusterView, new AnnouncementFilter() {

            @Override
            public boolean accept(final String receivingSlingId, final Announcement announcement) {
                // filter out announcements that are of old cluster instances
                // which I dont really have in my cluster view at the moment
                final Iterator<InstanceDescription> it =
                        clusterView.getInstances().iterator();
                while(it.hasNext()) {
                    final InstanceDescription instance = it.next();
                    if (instance.getSlingId().equals(receivingSlingId)) {
                        // then I have the receiving instance in my cluster view
                        // all fine then
                        return true;
                    }
                }
                // looks like I dont have the receiving instance in my cluster view
                // then I should also not propagate that announcement anywhere
                return false;
            }
        });
        return topologyAnnouncement;
	}

	@Test
    public void testStableClusterId() throws Throwable {
        logger.info("testStableClusterId: start");
    	// stop 1 and 2 and create them with a lower heartbeat timeout
    	instance2.stopViewChecker();
    	instance1.stopViewChecker();
        instance2.stop();
        instance1.stop();
	// SLING-4302 : first set the heartbeatTimeout to 100 sec - large enough to work on all CI instances
        instance1 = newBuilder().setDebugName("firstInstance")
                .newRepository("/var/discovery/impl/", true)
                .setConnectorPingTimeout(100)
                .setMinEventDelay(1).build();
        instance2 = newBuilder().setDebugName("secondInstance")
                .useRepositoryOf(instance1)
                .setConnectorPingTimeout(100)
                .setMinEventDelay(1).build();
        assertNotNull(instance1);
        assertNotNull(instance2);

        try{
            instance1.getClusterViewService().getLocalClusterView();
            fail("should complain");
        } catch(UndefinedClusterViewException e) {
            // ok
        }
        try{
            instance2.getClusterViewService().getLocalClusterView();
            fail("should complain");
        } catch(UndefinedClusterViewException e) {
            // ok
        }

        // let the sync/voting happen
        instance1.heartbeatsAndCheckView();
        instance2.heartbeatsAndCheckView();
        Thread.sleep(500);
        instance1.heartbeatsAndCheckView();
        instance2.heartbeatsAndCheckView();
        Thread.sleep(500);
        instance1.heartbeatsAndCheckView();
        instance2.heartbeatsAndCheckView();

        String newClusterId1 = instance1.getClusterViewService()
                .getLocalClusterView().getId();
        String newClusterId2 = instance2.getClusterViewService()
                .getLocalClusterView().getId();
        // both cluster ids must be the same
        assertEquals(newClusterId1, newClusterId1);

        instance1.dumpRepo();
        assertEquals(2, instance1.getClusterViewService().getLocalClusterView().getInstances().size());
        assertEquals(2, instance2.getClusterViewService().getLocalClusterView().getInstances().size());

        // let instance2 'die' by now longer doing heartbeats
	// SLING-4302 : then set the heartbeatTimeouts back to 1 sec to have them properly time out with the sleeps applied below
        instance2.getConfig().setViewCheckTimeout(1);
        instance1.getConfig().setViewCheckTimeout(1);
        instance2.stopViewChecker(); // would actually not be necessary as it was never started.. this test only runs heartbeats manually
        instance1.heartbeatsAndCheckView();
        Thread.sleep(500);
        instance1.heartbeatsAndCheckView();
        Thread.sleep(500);
        instance1.heartbeatsAndCheckView();
        Thread.sleep(500);
        instance1.heartbeatsAndCheckView();
        Thread.sleep(500);
        instance1.heartbeatsAndCheckView();
        Thread.sleep(500);
        instance1.heartbeatsAndCheckView();
        // the cluster should now have size 1
        assertEquals(1, instance1.getClusterViewService().getLocalClusterView().getInstances().size());
        // the instance 2 should be in isolated mode as it is no longer in the established view
        // hence null
        try{
            instance2.getViewChecker().checkView();
            instance2.getClusterViewService().getLocalClusterView();
            fail("should complain");
        } catch(UndefinedClusterViewException e) {
            // ok
        }

        // but the cluster id must have remained stable
        instance1.dumpRepo();
        String actualClusterId = instance1.getClusterViewService()
                .getLocalClusterView().getId();
        logger.info("expected cluster id: "+newClusterId1);
        logger.info("actual   cluster id: "+actualClusterId);
		assertEquals(newClusterId1, actualClusterId);
        logger.info("testStableClusterId: end");
    }

    @Test
    public void testClusterView() throws Exception {
        logger.info("testClusterView: start");
        assertNotNull(instance1);
        assertNotNull(instance2);
        assertNull(instance3);
        instance3 = newBuilder().setDebugName("thirdInstance")
                .useRepositoryOf(instance1)
                .build();
        assertNotNull(instance3);

        assertEquals(instance1.getSlingId(), instance1.getClusterViewService()
                .getSlingId());
        assertEquals(instance2.getSlingId(), instance2.getClusterViewService()
                .getSlingId());
        assertEquals(instance3.getSlingId(), instance3.getClusterViewService()
                .getSlingId());

        try{
            instance1.getClusterViewService().getLocalClusterView();
            fail("should complain");
        } catch(UndefinedClusterViewException e) {
            // ok
        }
        try{
            instance2.getClusterViewService().getLocalClusterView();
            fail("should complain");
        } catch(UndefinedClusterViewException e) {
            // ok
        }
        try{
            instance3.getClusterViewService().getLocalClusterView();
            fail("should complain");
        } catch(UndefinedClusterViewException e) {
            // ok
        }

        instance1.dumpRepo();

        instance1.heartbeatsAndCheckView();
        instance2.heartbeatsAndCheckView();
        instance3.heartbeatsAndCheckView();

        instance1.dumpRepo();
        logger.info("testClusterView: 1st 2s sleep");
        Thread.sleep(2000);

        instance1.heartbeatsAndCheckView();
        instance2.heartbeatsAndCheckView();
        instance3.heartbeatsAndCheckView();
        logger.info("testClusterView: 2nd 2s sleep");
        Thread.sleep(2000);

        instance1.dumpRepo();
        String clusterId1 = instance1.getClusterViewService().getLocalClusterView()
                .getId();
        logger.info("clusterId1=" + clusterId1);
        String clusterId2 = instance2.getClusterViewService().getLocalClusterView()
                .getId();
        logger.info("clusterId2=" + clusterId2);
        String clusterId3 = instance3.getClusterViewService().getLocalClusterView()
                .getId();
        logger.info("clusterId3=" + clusterId3);
        assertEquals(clusterId1, clusterId2);
        assertEquals(clusterId1, clusterId3);

        assertEquals(3, instance1.getClusterViewService().getLocalClusterView()
                .getInstances().size());
        assertEquals(3, instance2.getClusterViewService().getLocalClusterView()
                .getInstances().size());
        assertEquals(3, instance3.getClusterViewService().getLocalClusterView()
                .getInstances().size());
        logger.info("testClusterView: end");
    }

    @Category(Slow.class) //TODO: this takes env 15sec
    @Test
    public void testAdditionalInstance() throws Throwable {
        logger.info("testAdditionalInstance: start");
        assertNotNull(instance1);
        assertNotNull(instance2);

        assertEquals(instance1.getSlingId(), instance1.getClusterViewService()
                .getSlingId());
        assertEquals(instance2.getSlingId(), instance2.getClusterViewService()
                .getSlingId());

        try{
            instance1.getClusterViewService().getLocalClusterView();
            fail("should complain");
        } catch(UndefinedClusterViewException e) {
            // ok
        }
        try{
            instance2.getClusterViewService().getLocalClusterView();
            fail("should complain");
        } catch(UndefinedClusterViewException e) {
            // ok
        }

        instance1.heartbeatsAndCheckView();
        instance2.heartbeatsAndCheckView();

        instance1.dumpRepo();
        logger.info("testAdditionalInstance: 1st 2s sleep");
        Thread.sleep(2000);

        instance1.heartbeatsAndCheckView();
        instance2.heartbeatsAndCheckView();
        logger.info("testAdditionalInstance: 2nd 2s sleep");
        Thread.sleep(2000);

        instance1.dumpRepo();
        String clusterId1 = instance1.getClusterViewService().getLocalClusterView()
                .getId();
        logger.info("clusterId1=" + clusterId1);
        String clusterId2 = instance2.getClusterViewService().getLocalClusterView()
                .getId();
        logger.info("clusterId2=" + clusterId2);
        assertEquals(clusterId1, clusterId2);

        assertEquals(2, instance1.getClusterViewService().getLocalClusterView()
                .getInstances().size());
        assertEquals(2, instance2.getClusterViewService().getLocalClusterView()
                .getInstances().size());

        AssertingTopologyEventListener assertingTopologyEventListener = new AssertingTopologyEventListener();
        assertingTopologyEventListener.addExpected(Type.TOPOLOGY_INIT);
        assertEquals(1, assertingTopologyEventListener.getRemainingExpectedCount());
        instance1.bindTopologyEventListener(assertingTopologyEventListener);
        Thread.sleep(500); // SLING-4755: async event sending requires some minimal wait time nowadays
        assertEquals(0, assertingTopologyEventListener.getRemainingExpectedCount());

        // startup instance 3
        AcceptsMultiple acceptsMultiple = new AcceptsMultiple(
                Type.TOPOLOGY_CHANGING, Type.TOPOLOGY_CHANGED);
        assertingTopologyEventListener.addExpected(acceptsMultiple);
        assertingTopologyEventListener.addExpected(acceptsMultiple);
        instance3 = newBuilder().setDebugName("thirdInstance")
                .useRepositoryOf(instance1)
                .build();
        for(int i=0; i<4; i++) {
            instance1.heartbeatsAndCheckView();
            instance2.heartbeatsAndCheckView();
            instance3.heartbeatsAndCheckView();
            logger.info("testAdditionalInstance: i="+i+", 2s sleep");
            Thread.sleep(2000);
        }

        assertEquals(1, acceptsMultiple.getEventCnt(Type.TOPOLOGY_CHANGING));
        assertEquals(1, acceptsMultiple.getEventCnt(Type.TOPOLOGY_CHANGED));
        logger.info("testAdditionalInstance: end");
    }

    @Test
    public void testPropertyProviders() throws Throwable {
        logger.info("testPropertyProviders: start");
        instance1.heartbeatsAndCheckView();
        instance2.heartbeatsAndCheckView();
        assertNull(instance3);
        instance3 = newBuilder().setDebugName("thirdInstance")
                .useRepositoryOf(instance1)
                .build();
        instance3.heartbeatsAndCheckView();
        logger.info("testPropertyProviders: 1st 2s sleep");
        Thread.sleep(2000);
        instance1.heartbeatsAndCheckView();
        instance2.heartbeatsAndCheckView();
        instance3.heartbeatsAndCheckView();
        logger.info("testPropertyProviders: 2nd 2s sleep");
        Thread.sleep(2000);
        instance1.heartbeatsAndCheckView();
        instance2.heartbeatsAndCheckView();
        instance3.heartbeatsAndCheckView();
        logger.info("testPropertyProviders: 3rd 2s sleep");
        Thread.sleep(2000);

        property1Value = UUID.randomUUID().toString();
        property1Name = UUID.randomUUID().toString();
        PropertyProviderImpl pp1 = new PropertyProviderImpl();
        pp1.setProperty(property1Name, property1Value);
        instance1.bindPropertyProvider(pp1, property1Name);

        property2Value = UUID.randomUUID().toString();
        property2Name = UUID.randomUUID().toString();
        PropertyProviderImpl pp2 = new PropertyProviderImpl();
        pp2.setProperty(property2Name, property2Value);
        instance2.bindPropertyProvider(pp2, property2Name);

        assertPropertyValues();

        property1Value = UUID.randomUUID().toString();
        pp1.setProperty(property1Name, property1Value);
        instance1.heartbeatsAndCheckView();
        instance2.heartbeatsAndCheckView();

        assertPropertyValues();
        assertNull(instance1.getClusterViewService().getLocalClusterView()
                .getInstances().get(0)
                .getProperty(UUID.randomUUID().toString()));
        assertNull(instance2.getClusterViewService().getLocalClusterView()
                .getInstances().get(0)
                .getProperty(UUID.randomUUID().toString()));
        logger.info("testPropertyProviders: end");
    }

    private void assertPropertyValues() throws UndefinedClusterViewException {
        assertPropertyValues(instance1.getSlingId(), property1Name,
                property1Value);
        assertPropertyValues(instance2.getSlingId(), property2Name,
                property2Value);
    }

    private void assertPropertyValues(String slingId, String name, String value) throws UndefinedClusterViewException {
        assertEquals(value, getInstance(instance1, slingId).getProperty(name));
        assertEquals(value, getInstance(instance2, slingId).getProperty(name));
    }

    private InstanceDescription getInstance(VirtualInstance instance, String slingId) throws UndefinedClusterViewException {
        Iterator<InstanceDescription> it = instance.getClusterViewService()
                .getLocalClusterView().getInstances().iterator();
        while (it.hasNext()) {
            InstanceDescription id = it.next();
            if (id.getSlingId().equals(slingId)) {
                return id;
            }
        }
        throw new IllegalStateException("instance not found: instance="
                + instance + ", slingId=" + slingId);
    }

    class LongRunningListener implements TopologyEventListener {

        String failMsg = null;

        boolean initReceived = false;
        int noninitReceived;

        private Semaphore changedSemaphore = new Semaphore(0);

        public void assertNoFail() {
            if (failMsg!=null) {
                fail(failMsg);
            }
        }

        public Semaphore getChangedSemaphore() {
            return changedSemaphore;
        }

        @Override
        public void handleTopologyEvent(TopologyEvent event) {
            if (failMsg!=null) {
                failMsg += "/ Already failed, got another event; "+event;
                return;
            }
            if (!initReceived) {
                if (event.getType()!=Type.TOPOLOGY_INIT) {
                    failMsg = "Expected TOPOLOGY_INIT first, got: "+event.getType();
                    return;
                }
                initReceived = true;
                return;
            }
            if (event.getType()==Type.TOPOLOGY_CHANGED) {
                try {
                    changedSemaphore.acquire();
                } catch (InterruptedException e) {
                    throw new Error("don't interrupt me pls: "+e);
                }
            }
            noninitReceived++;
        }
    }

    /**
     * Test plan:
     *  * have a discoveryservice with two listeners registered
     *  * one of them (the 'first' one) is long running
     *  * during one of the topology changes, when the first
     *    one is hit, deactivate the discovery service
     *  * that deactivation used to block (SLING-4755) due
     *    to synchronized(lock) which was blocked by the
     *    long running listener. With having asynchronous
     *    event sending this should no longer be the case
     *  * also, once asserted that deactivation finished,
     *    and that the first listener is still busy, make
     *    sure that once the first listener finishes, that
     *    the second listener still gets the event
     * @throws Throwable
     */
    @Category(Slow.class) //TODO: this takes env 15sec
    @Test
    public void testLongRunningListener() throws Throwable {
        // let the instance1 become alone, instance2 is idle
        instance1.getConfig().setViewCheckTimeout(2);
        instance2.getConfig().setViewCheckTimeout(2);
        logger.info("testLongRunningListener : letting instance2 remain silent from now on");
        instance1.heartbeatsAndCheckView();
        Thread.sleep(1500);
        instance1.heartbeatsAndCheckView();
        Thread.sleep(1500);
        instance1.heartbeatsAndCheckView();
        Thread.sleep(1500);
        instance1.heartbeatsAndCheckView();
        logger.info("testLongRunningListener : instance 2 should now be considered dead");
//        instance1.dumpRepo();

        LongRunningListener longRunningListener1 = new LongRunningListener();
        AssertingTopologyEventListener fastListener2 = new AssertingTopologyEventListener();
        fastListener2.addExpected(Type.TOPOLOGY_INIT);
        longRunningListener1.assertNoFail();
        assertEquals(1, fastListener2.getRemainingExpectedCount());
        logger.info("testLongRunningListener : binding longRunningListener1 ...");
        instance1.bindTopologyEventListener(longRunningListener1);
        logger.info("testLongRunningListener : binding fastListener2 ...");
        instance1.bindTopologyEventListener(fastListener2);
        logger.info("testLongRunningListener : waiting a bit for longRunningListener1 to receive the TOPOLOGY_INIT event");
        Thread.sleep(2500); // SLING-4755: async event sending requires some minimal wait time nowadays
        assertEquals(0, fastListener2.getRemainingExpectedCount());
        assertTrue(longRunningListener1.initReceived);

        // after INIT, now do an actual change where listener1 will do a long-running handling
        fastListener2.addExpected(Type.TOPOLOGY_CHANGING);
        fastListener2.addExpected(Type.TOPOLOGY_CHANGED);
        instance1.getConfig().setViewCheckTimeout(10);
        instance2.getConfig().setViewCheckTimeout(10);
        instance1.heartbeatsAndCheckView();
        instance2.heartbeatsAndCheckView();
        Thread.sleep(500);
        instance1.heartbeatsAndCheckView();
        instance2.heartbeatsAndCheckView();
        Thread.sleep(500);
        instance1.heartbeatsAndCheckView();
        instance2.heartbeatsAndCheckView();
        Thread.sleep(500);

        instance1.dumpRepo();
        longRunningListener1.assertNoFail();
        // nothing unexpected should arrive at listener2:
        assertEquals(0, fastListener2.getUnexpectedCount());
        // however, listener2 should only get one (CHANGING) event, cos the CHANGED event is still blocked
        assertEquals(1, fastListener2.getRemainingExpectedCount());
        // and also listener2 should only get CHANGING, the CHANGED is blocked via changedSemaphore
        assertEquals(1, longRunningListener1.noninitReceived);
        Thread.sleep(2000);
        assertTrue(longRunningListener1.getChangedSemaphore().hasQueuedThreads());
        Thread.sleep(2000);
        // even after a 2sec sleep things should be unchanged:
        assertEquals(0, fastListener2.getUnexpectedCount());
        assertEquals(1, fastListener2.getRemainingExpectedCount());
        assertEquals(1, longRunningListener1.noninitReceived);
        assertTrue(longRunningListener1.getChangedSemaphore().hasQueuedThreads());

        // now let's simulate SLING-4755: deactivation while longRunningListener1 does long processing
        // - which is simulated by waiting on changedSemaphore.
        final List<Exception> asyncException = new LinkedList<Exception>();
        Thread th = new Thread(new Runnable() {

            @Override
            public void run() {
                try {
                    instance1.stop();
                } catch (Exception e) {
                    synchronized(asyncException) {
                        asyncException.add(e);
                    }
                }
            }

        });
        th.start();
        logger.info("Waiting max 4 sec...");
        th.join(4000);
        logger.info("Done waiting max 4 sec...");
        if (th.isAlive()) {
            logger.warn("Thread still alive: "+th.isAlive());
            // release before issuing fail as otherwise test will block forever
            longRunningListener1.getChangedSemaphore().release();
            fail("Thread was still alive");
        }
        logger.info("Thread was no longer alive: "+th.isAlive());
        synchronized(asyncException) {
            logger.info("Async exceptions: "+asyncException.size());
            if (asyncException.size()!=0) {
                // release before issuing fail as otherwise test will block forever
                longRunningListener1.getChangedSemaphore().release();
                fail("async exceptions: "+asyncException.size()+", first: "+asyncException.get(0));
            }
        }

        // now the test consists of
        // a) the fact that we reached this place without unlocking the changedSemaphore
        // b) when we now unlock the changedSemaphore the remaining events should flush through
        longRunningListener1.getChangedSemaphore().release();
        Thread.sleep(500);// shouldn't take long and then things should have flushed:
        assertEquals(0, fastListener2.getUnexpectedCount());
        assertEquals(0, fastListener2.getRemainingExpectedCount());
        assertEquals(2, longRunningListener1.noninitReceived);
        assertFalse(longRunningListener1.getChangedSemaphore().hasQueuedThreads());
    }


}
