blob: 1feaecd27746598f87fb98ddb91a5380cb35ea20 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sling.discovery.oak.cluster;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.sling.discovery.InstanceDescription;
import org.apache.sling.discovery.TopologyEvent;
import org.apache.sling.discovery.base.commons.ClusterViewService;
import org.apache.sling.discovery.base.commons.DefaultTopologyView;
import org.apache.sling.discovery.base.commons.UndefinedClusterViewException;
import org.apache.sling.discovery.base.its.setup.VirtualInstance;
import org.apache.sling.discovery.commons.providers.base.DummyListener;
import org.apache.sling.discovery.commons.providers.spi.LocalClusterView;
import org.apache.sling.discovery.commons.providers.spi.base.IdMapService;
import org.apache.sling.discovery.commons.providers.spi.base.SyncTokenService;
import org.apache.sling.discovery.oak.JoinerDelayTest;
import org.apache.sling.discovery.oak.Log4jErrorCatcher;
import org.apache.sling.discovery.oak.its.setup.OakVirtualInstanceBuilder;
import org.apache.sling.discovery.oak.its.setup.SimulatedLeaseCollection;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class OakClusterViewServiceTest {
enum Speed { SLOW, FAST };
private static final boolean REUSE_CLUSTERNODEID = true;
private static final boolean NEW_CLUSTERNODEID = false;
private static final boolean REUSE_SLINGID = true;
private static final boolean NEW_SLINGID = false;
private Log4jErrorCatcher catcher;
@Before
public void setup() {
catcher = Log4jErrorCatcher.installErrorCatcher(OakClusterViewService.class);
}
@After
public void teardown() {
if (catcher != null) {
final String errorMsg = catcher.getLastErrorMsg();
if (errorMsg != null) {
fail("Unexpected error msg : " + errorMsg);
}
catcher.uninstall();
catcher = null;
}
}
@Test
public void testLocalClusterView() throws Exception {
OakVirtualInstanceBuilder builder1 =
(OakVirtualInstanceBuilder) new OakVirtualInstanceBuilder()
.setDebugName("instance1")
.newRepository("/foo/barrio/foo/", true)
.setConnectorPingInterval(999)
.setConnectorPingTimeout(999);
VirtualInstance instance1 = builder1.build();
ClusterViewService cvs1 = instance1.getClusterViewService();
try {
cvs1.getLocalClusterView();
fail("should throw UndefinedClusterViewException");
} catch (UndefinedClusterViewException e) {
// ok
}
OakVirtualInstanceBuilder builder2 =
(OakVirtualInstanceBuilder) new OakVirtualInstanceBuilder()
.setDebugName("instance2")
.useRepositoryOf(instance1)
.setConnectorPingInterval(999)
.setConnectorPingTimeout(999);
VirtualInstance instance2 = builder2.build();
instance1.heartbeatsAndCheckView();
instance2.heartbeatsAndCheckView();
instance1.heartbeatsAndCheckView();
instance2.heartbeatsAndCheckView();
// this should not fail:
assertNotNull(cvs1.getLocalClusterView());
final ClusterViewService cvs2 = instance2.getClusterViewService();
final IdMapService idMapService = builder2.getIdMapService();
idMapService.clearCache();
idMapService.clearCache();
// this should not fail neither:
assertNotNull(cvs2.getLocalClusterView());
instance2.shutdownRepository();
try {
// but this should fail as the repository is shutdown
cvs2.getLocalClusterView();
fail("should throw UndefinedClusterViewException");
} catch(UndefinedClusterViewException e) {
// ok
}
}
@Test
public void testGetResourceResolverException() throws Exception {
OakClusterViewService cvs = OakClusterViewService.testConstructor(null, null, null, null);
try {
cvs.getLocalClusterView();
fail("should throw UndefinedClusterViewException");
} catch (UndefinedClusterViewException e) {
// ok
}
}
private void doTestRejoining(boolean reuseClusterNodeId, boolean reuseSlingId, Speed speed, int expectedInstanceCount, int expectedEventCount) throws Exception {
OakVirtualInstanceBuilder builder1 =
(OakVirtualInstanceBuilder) new OakVirtualInstanceBuilder()
.setDebugName("instance1")
.newRepository("/foo/barrio/foo/", true)
.setConnectorPingInterval(9970)
.setConnectorPingTimeout(9971);
builder1.getConfig().setMinEventDelay(0);
builder1.getConfig().setJoinerDelaySeconds(0);
builder1.getConfig().setDiscoveryLiteCheckInterval(9974);
builder1.getConfig().setSyncTokenEnabled(true);
builder1.getConfig().setSuppressPartiallyStartedInstance(true);
final VirtualInstance instance1 = builder1.build();
ClusterViewService cvs1 = instance1.getClusterViewService();
DummyListener listener1 = JoinerDelayTest.newDummyListener(instance1);
// first let instance1 be by its own
System.out.println("[A] instance 1 heartbeat, slingId = "+ instance1.getSlingId() + ", id = " + builder1.getLease().getClusterNodeId());
instance1.heartbeatsAndCheckView();
Thread.sleep(1000);
System.out.println("[B] instance 1 heartbeat, slingId = "+ instance1.getSlingId() + ", id = " + builder1.getLease().getClusterNodeId());
instance1.heartbeatsAndCheckView();
Thread.sleep(1000);
System.out.println("[C] instance 1 heartbeat, slingId = "+ instance1.getSlingId() + ", id = " + builder1.getLease().getClusterNodeId());
instance1.heartbeatsAndCheckView();
assertEquals(1, listener1.countEvents());
OakVirtualInstanceBuilder builder2 =
(OakVirtualInstanceBuilder) new OakVirtualInstanceBuilder()
.setDebugName("instance2")
.useRepositoryOf(instance1)
.setConnectorPingInterval(9980)
.setConnectorPingTimeout(9981);
builder2.getConfig().setMinEventDelay(0);
builder2.getConfig().setJoinerDelaySeconds(0);
builder2.getConfig().setDiscoveryLiteCheckInterval(9984);
builder2.getConfig().setSyncTokenEnabled(true);
builder2.getConfig().setSuppressPartiallyStartedInstance(true);
VirtualInstance instance2 = builder2.build();
builder2.getSimulatedLeaseCollection().incSeqNum();
ClusterViewService cvs2 = instance2.getClusterViewService();
// join the 2 nodes
System.out.println("[D] instance 2 heartbeat, slingId = "+ instance2.getSlingId() + ", id = " + builder2.getLease().getClusterNodeId());
instance2.heartbeatsAndCheckView();
System.out.println("[E] instance 1 heartbeat, slingId = "+ instance1.getSlingId() + ", id = " + builder1.getLease().getClusterNodeId());
instance1.heartbeatsAndCheckView();
System.out.println("[E] instance 2 heartbeat, slingId = "+ instance2.getSlingId() + ", id = " + builder2.getLease().getClusterNodeId());
instance2.heartbeatsAndCheckView();
Thread.sleep(1000);
System.out.println("[F] instance 1 heartbeat, slingId = "+ instance1.getSlingId() + ", id = " + builder1.getLease().getClusterNodeId());
instance1.heartbeatsAndCheckView();
System.out.println("[G] instance 2 heartbeat, slingId = "+ instance2.getSlingId() + ", id = " + builder2.getLease().getClusterNodeId());
instance2.heartbeatsAndCheckView();
Thread.sleep(1000);
System.out.println("[I] instance 1 heartbeat, slingId = "+ instance1.getSlingId() + ", id = " + builder1.getLease().getClusterNodeId());
instance1.heartbeatsAndCheckView();
System.out.println("[J] instance 2 heartbeat, slingId = "+ instance2.getSlingId() + ", id = " + builder2.getLease().getClusterNodeId());
instance2.heartbeatsAndCheckView();
Thread.sleep(1000);
System.out.println("[I] instance 1 heartbeat, slingId = "+ instance1.getSlingId() + ", id = " + builder1.getLease().getClusterNodeId());
instance1.heartbeatsAndCheckView();
System.out.println("[J] instance 2 heartbeat, slingId = "+ instance2.getSlingId() + ", id = " + builder2.getLease().getClusterNodeId());
instance2.heartbeatsAndCheckView();
assertEquals(3, listener1.countEvents());
// assert the join worked as expected
assertEquals(2, cvs1.getLocalClusterView().getInstances().size());
assertEquals(2, cvs2.getLocalClusterView().getInstances().size());
final OakVirtualInstanceBuilder builder2b =
(OakVirtualInstanceBuilder) new OakVirtualInstanceBuilder()
.setDebugName("instance2b")
.useRepositoryOf(instance1)
.setConnectorPingInterval(9990)
.setConnectorPingTimeout(9991);
if (reuseSlingId) {
builder2b.setSlingId(builder2.getSlingId());
}
if (reuseClusterNodeId) {
final int id2 = builder2.getLease().getClusterNodeId();
builder2b.getLease().setClusterNodeIdHint(id2);
} else {
final int id2 = 42;
builder2b.getLease().setClusterNodeIdHint(id2);
}
builder2b.getConfig().setMinEventDelay(0);
builder2b.getConfig().setJoinerDelaySeconds(0);
builder2b.getConfig().setDiscoveryLiteCheckInterval(9993);
builder2b.getConfig().setSyncTokenEnabled(true);
builder2b.getConfig().setSuppressPartiallyStartedInstance(true);
System.out.println();
System.out.println(" => 2B BUILDING");
System.out.println();
builder2.getLease().deactivate();
// now step 1 is to join through the leases, without yet doing the syncToken thingy
SimulatedLeaseCollection c = builder2b.getSimulatedLeaseCollection();//new
c.incSeqNum(2);//new
System.out.println();
System.out.println(" UPDATING SEQ NUM TO " + c.getSeqNum());
System.out.println();
builder2b.updateLease();
System.out.println();
System.out.println("sleep 1s");
Thread.sleep(1000);
// now let instance1 go through a heartbeat - without checkView
builder1.updateLease();
// that would have updated the lease but also gotten to read the new state
// we do that last part explicitly here again
// since rejoined instance2 did not yet syncToken, it should for sure not be in the view
// but also not cause an exception
cvs1.getLocalClusterView();
instance1.heartbeatsAndCheckView();
AtomicReference<Exception> lastAsyncException = new AtomicReference<>();
final AtomicReference<VirtualInstance> instance2bRef = new AtomicReference<>();
if (speed == Speed.FAST) {
VirtualInstance instance2b = builder2b.build();
instance2bRef.set(instance2b);
instance1.heartbeatsAndCheckView();
System.out.println();
System.out.println(" => 2B HEARTBEAT ... " + instance2b.getSlingId());
System.out.println();
instance2b.heartbeatsAndCheckView();
instance1.heartbeatsAndCheckView();
Thread.sleep(1000);
System.out.println();
System.out.println(" => 2B HEARTBEAT ...");
System.out.println();
instance2b.heartbeatsAndCheckView();
instance1.heartbeatsAndCheckView();
Thread.sleep(1000);
System.out.println();
System.out.println(" => 2B HEARTBEAT ...");
System.out.println();
instance2b.heartbeatsAndCheckView();
instance1.heartbeatsAndCheckView();
} else {
Runnable r2bb = new Runnable() {
@Override
public void run() {
try {
Thread.sleep(4000);
VirtualInstance instance2b = builder2b.build();
instance2bRef.set(instance2b);
instance2b.heartbeatsAndCheckView();
instance1.heartbeatsAndCheckView();
Thread.sleep(1000);
instance2b.heartbeatsAndCheckView();
instance1.heartbeatsAndCheckView();
Thread.sleep(1000);
instance2b.heartbeatsAndCheckView();
instance1.heartbeatsAndCheckView();
} catch (InterruptedException e) {
e.printStackTrace();
lastAsyncException.set(e);
} catch (Exception e) {
e.printStackTrace();
lastAsyncException.set(e);
}
}
};
Thread th = new Thread(r2bb);
th.start();
}
instance1.heartbeatsAndCheckView();
Thread.sleep(1000);
instance1.heartbeatsAndCheckView();
Thread.sleep(1000);
System.out.println("instance 1 is " + instance1.getSlingId() + " - " + builder1.getLease().getClusterNodeId() + " - " + builder1.getLease());
System.out.println("instance 2 is " + instance2.getSlingId() + " - " + builder2.getLease().getClusterNodeId() + " - " + builder2.getLease());
if (speed == Speed.FAST) System.out.println("instance 2b is " + instance2bRef.get().getSlingId() + " - " + builder2b.getLease().getClusterNodeId() + " - " + builder2b.getLease());
final Set<InstanceDescription> instances = listener1.getLastView().getInstances();
for (InstanceDescription id : instances) {
System.out.println("id " + id);
}
List<TopologyEvent> ev = listener1.getEvents();
for (TopologyEvent e : ev) {
System.out.println(" e = " + e);
}
assertEquals(expectedEventCount, listener1.countEvents());
assertEquals(expectedInstanceCount, instances.size());
if (speed == Speed.SLOW) {
Thread.sleep(5000);
}
// and now for full heartbeating
instance1.heartbeatsAndCheckView();
instance2bRef.get().heartbeatsAndCheckView();
instance1.heartbeatsAndCheckView();
instance2bRef.get().heartbeatsAndCheckView();
assertEquals(2, cvs1.getLocalClusterView().getInstances().size());
assertEquals(2, instance2bRef.get().getClusterViewService().getLocalClusterView().getInstances().size());
instance1.shutdownRepository();
}
// new rules
// R1 : new clusterNodeId joining, without all sync data => suppression
// R2 : existing clusterNodeId (re)joining
// R2.1 : same slingId but not syncToken => no suppression
// R2.2 : new slingId => suppression
// R2.3 : same slingId, different leaderElectionId => suppression
/**
* R2.1 : same slingId but not syncToken => no suppression
* an AEM reusing the clusterNodeId and slingId - rejoins fast
*/
@Test
public void testRejoining_reuseClusterNodeId_reuseSlingId_fast() throws Exception {
doTestRejoining(REUSE_CLUSTERNODEID, REUSE_SLINGID, Speed.FAST, 2, 5);
}
/**
* R2.2 : new slingId => suppression
* an AEM reusing the clusterNodeId but not slingId (hence a new slingId) - rejoins fast
*/
@Test
public void testRejoining_reuseClusterNodeId_newSlingId_fast() throws Exception {
doTestRejoining(REUSE_CLUSTERNODEID, NEW_SLINGID, Speed.FAST, 2, 5);
}
/**
* R1 : new clusterNodeId joining, without all sync data => suppression
* an AEM reusing the slingId but not clusterNoddId - rejoins fast
*/
@Test
public void testRejoining_newClusterNodeId_reuseSlingId_fast() throws Exception {
doTestRejoining(NEW_CLUSTERNODEID, REUSE_SLINGID, Speed.FAST, 2, 5);
}
/**
* R1 : new clusterNodeId joining, without all sync data => suppression
* an AEM coming with fresh clusterNodeId and slingId - fast
*/
@Test
public void testRejoining_newClusterNodeId_newSlingId_fast() throws Exception {
doTestRejoining(NEW_CLUSTERNODEID, NEW_SLINGID, Speed.FAST, 2, 5);
}
/**
* R2.1 : same slingId but not syncToken => no suppression
* an AEM reusing the clusterNodeId and slingId
*/
@Test
public void testRejoining_reuseClusterNodeId_reuseSlingId_slow() throws Exception {
// expecting only 4 events (thus no_topology) due to slowness of rejoining instance
// combined with discovery noticing this (due to missing syncToken)
doTestRejoining(REUSE_CLUSTERNODEID, REUSE_SLINGID, Speed.SLOW, 2, 4);
}
/**
* R2.2 : new slingId => suppression
* an AEM reusing the clusterNodeId but not slingId, a new slingId - rejoins slow
*/
@Test
public void testRejoining_reuseClusterNodeId_newSlingId_slow() throws Exception {
// expecting only 4 events (thus no_topology) due to slowness of rejoining instance
// which only once it does write its slingId will be considered for suppressing, not before
// (due to reusing clusterNodeId)
doTestRejoining(REUSE_CLUSTERNODEID, NEW_SLINGID, Speed.SLOW, 2, 4);
}
/**
* R1 : new clusterNodeId joining, without all sync data => suppression
* an AEM reusing the slingId but not clusterNoddId but a new clusterNodeId - rejoins slow
*/
@Test
public void testRejoining_newClusterNodeId_reuseSlingId_slow() throws Exception {
doTestRejoining(NEW_CLUSTERNODEID, REUSE_SLINGID, Speed.SLOW, 1/* only instance1 remaining*/,
5 /* kicks out instance2, no new instance, but a stable topology*/);
}
/**
* R1 : new clusterNodeId joining, without all sync data => suppression
* an AEM coming with fresh clusterNodeId and slingId - rejoins slow
*/
@Test
public void testRejoining_newClusterNodeId_newSlingId_slow() throws Exception {
doTestRejoining(NEW_CLUSTERNODEID, NEW_SLINGID, Speed.SLOW, 1 /* only instance1 remaining*/,
5 /* kicks out instance2, no new instance, but a stable topology*/);
}
@Test
public void testSuppression() throws Exception {
OakVirtualInstanceBuilder builder1 =
(OakVirtualInstanceBuilder) new OakVirtualInstanceBuilder()
.setDebugName("instance1")
.newRepository("/foo/barrio/foo/", true)
.setConnectorPingInterval(999)
.setConnectorPingTimeout(999);
builder1.getConfig().setMinEventDelay(99);
builder1.getConfig().setJoinerDelaySeconds(99);
builder1.getConfig().setDiscoveryLiteCheckInterval(99);
builder1.getConfig().setSyncTokenEnabled(true);
builder1.getConfig().setSuppressPartiallyStartedInstance(true);
VirtualInstance instance1 = builder1.build();
ClusterViewService cvs1 = instance1.getClusterViewService();
OakVirtualInstanceBuilder builder2 =
(OakVirtualInstanceBuilder) new OakVirtualInstanceBuilder()
.setDebugName("instance2")
.useRepositoryOf(instance1)
.setConnectorPingInterval(999)
.setConnectorPingTimeout(999);
builder2.getConfig().setMinEventDelay(99);
builder2.getConfig().setJoinerDelaySeconds(99);
builder2.getConfig().setDiscoveryLiteCheckInterval(99);
builder2.getConfig().setSyncTokenEnabled(true);
builder2.getConfig().setSuppressPartiallyStartedInstance(true);
VirtualInstance instance2 = builder2.build();
instance1.heartbeatsAndCheckView();
instance2.heartbeatsAndCheckView();
instance1.heartbeatsAndCheckView();
LocalClusterView view1 = cvs1.getLocalClusterView();
assertEquals(2, view1.getInstances().size());
// now do a switch (seqNum++) but don't update instance2's syncToken
cvs1.getLocalClusterView();
builder1.getSimulatedLeaseCollection().incSeqNum();
builder1.updateLease();
LocalClusterView view2 = cvs1.getLocalClusterView();
final DefaultTopologyView topology = new DefaultTopologyView();
topology.setLocalClusterView(view2);
SyncTokenService s = builder2.getSyncTokenService();
final AtomicBoolean syncDone = new AtomicBoolean(false);
s.sync(topology, new Runnable() {
@Override
public void run() {
syncDone.set(true);
}
});
s.cancelSync();
assertFalse(syncDone.get());
Thread.sleep(1000);
assertFalse(syncDone.get());
assertFalse(view2.hasPartiallyStartedInstances());
}
}