blob: 260ec473c6e36d23566caf404d03dfade4db2656 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.tier.sockets;
import static org.assertj.core.api.Assertions.fail;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.test.dunit.SerializableRunnableIF;
import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
@Category({ClientSubscriptionTest.class})
public class DurableClientHAQueuedDUnitTest extends DurableClientTestBase {
/**
 * Checks the HA queue size stat across a durable client disconnect and reconnect.
 *
 * <p>
 * Sequence: a durable client connects, registers three durable CQs and is then disconnected. A
 * publisher puts entries onto the server, after which the durable CQ count and the HA queue size
 * are checked. The durable client then reconnects, re-registers its CQs and signals that it is
 * ready; the CQ listener event counts are checked and, after a flush, the HA queue size is checked
 * again.
 */
@Test
public void testHAQueueSizeStat() {
  // The three continuous queries share one select clause and differ only in the ID predicate
  final String selectById = "select * from /" + regionName + " p where p.ID ";
  final String greaterThan5Query = selectById + "> 5";
  final String allQuery = selectById + "> -1";
  final String lessThan5Query = selectById + "< 5";
  final int secondsToWait = 15;

  // Bring up the only cache server used by this test
  server1Port =
      server1VM.invoke(() -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE));

  // Connect the durable client, register its durable CQs and signal readiness
  final String durableClientId = getName() + "_client";
  startDurableClient(durableClientVM, durableClientId, server1Port, regionName);
  createCq(durableClientVM, "GreaterThan5", greaterThan5Query, true);
  createCq(durableClientVM, "All", allQuery, true);
  createCq(durableClientVM, "LessThan5", lessThan5Query, true);
  sendClientReady(durableClientVM);

  // The server must know about the durable client before it goes away
  verifyDurableClientPresent(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT, durableClientId,
      server1VM);

  // Take the durable client down, then publish through a regular client
  disconnectDurableClient(true);
  startClient(publisherClientVM, server1Port, regionName);
  publishEntries(regionName, 10);

  // Check the durable CQ count and the HA queue size while the durable client is away
  checkNumDurableCqs(server1VM, durableClientId, 3);
  checkHAQueueSize(server1VM, durableClientId, 10, 11);

  // Bring the durable client back with the same CQs and signal readiness again
  startDurableClient(durableClientVM, durableClientId, server1Port, regionName);
  createCq(durableClientVM, "GreaterThan5", greaterThan5Query, true);
  createCq(durableClientVM, "All", allQuery, true);
  createCq(durableClientVM, "LessThan5", lessThan5Query, true);
  sendClientReady(durableClientVM);

  // Each CQ listener should have received its share of the ten published entries
  checkCqListenerEvents(durableClientVM, "GreaterThan5", 4 /* numEventsExpected */,
      secondsToWait);
  checkCqListenerEvents(durableClientVM, "LessThan5", 5 /* numEventsExpected */,
      secondsToWait);
  checkCqListenerEvents(durableClientVM, "All", 10 /* numEventsExpected */, secondsToWait);

  // DurableHARegionQueue calls remove after dispatch, so an event whose ack arrives "later" can
  // linger in the queue until the next dispatch. Sending one more message triggers another
  // remove that clears any remaining acked events before the queue size is checked.
  flushEntries(server1VM, durableClientVM, regionName);
  checkHAQueueSize(server1VM, durableClientId, 0, 1);

  // Shut everything down: durable client, publisher client, then the server
  durableClientVM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
  publisherClientVM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
  server1VM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
}
/**
 * Checks the HA queue size stat when the durable client times out before it reconnects.
 *
 * <p>
 * Sequence: a durable client connects with a 20 second durable timeout, registers three durable
 * CQs and is then disconnected. A publisher puts entries onto the server, and the durable CQ
 * count and HA queue size are checked. The test then sleeps for longer than the durable timeout.
 * When the durable client reconnects and re-registers its CQs, no CQ events are expected and the
 * HA queue size is checked again after a flush.
 */
@Test
public void testHAQueueSizeStatExpired() {
  final int timeoutInSeconds = 20;

  // The three continuous queries share one select clause and differ only in the ID predicate
  final String selectById = "select * from /" + regionName + " p where p.ID ";
  final String greaterThan5Query = selectById + "> 5";
  final String allQuery = selectById + "> -1";
  final String lessThan5Query = selectById + "< 5";
  final int secondsToWait = 5;

  // Start server 1
  server1Port =
      server1VM.invoke(() -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE));

  // Connect the durable client with the short timeout, register durable CQs, signal readiness
  final String durableClientId = getName() + "_client";
  startDurableClient(durableClientVM, durableClientId, server1Port, regionName, timeoutInSeconds);
  createCq(durableClientVM, "GreaterThan5", greaterThan5Query, true);
  createCq(durableClientVM, "All", allQuery, true);
  createCq(durableClientVM, "LessThan5", lessThan5Query, true);
  sendClientReady(durableClientVM);

  // Verify durable client on server
  verifyDurableClientPresent(timeoutInSeconds, durableClientId, server1VM);

  // Stop the durable client, then publish through a regular client
  disconnectDurableClient(true);
  startClient(publisherClientVM, server1Port, regionName);
  publishEntries(regionName, 10);

  // Check the durable CQ count and the HA queue size before the timeout elapses
  checkNumDurableCqs(server1VM, durableClientId, 3);
  checkHAQueueSize(server1VM, durableClientId, 10, 11);

  // Pause until the durable timeout (plus a two second margin) has passed
  try {
    Thread.sleep((timeoutInSeconds + 2) * 1000L);
  } catch (InterruptedException ie) {
    // Restore the interrupt status so code further up the stack can still observe it
    Thread.currentThread().interrupt();
    fail("interrupted", ie);
  }

  // Restart the durable client (without the explicit timeout argument) and re-register the CQs
  startDurableClient(durableClientVM, durableClientId, server1Port, regionName);
  createCq(durableClientVM, "GreaterThan5", greaterThan5Query, true);
  createCq(durableClientVM, "All", allQuery, true);
  createCq(durableClientVM, "LessThan5", lessThan5Query, true);
  sendClientReady(durableClientVM);

  // No events are expected for any of the CQs after the timeout
  checkCqListenerEvents(durableClientVM, "GreaterThan5", 0 /* numEventsExpected */,
      secondsToWait);
  checkCqListenerEvents(durableClientVM, "LessThan5", 0 /* numEventsExpected */,
      secondsToWait);
  checkCqListenerEvents(durableClientVM, "All", 0 /* numEventsExpected */, secondsToWait);

  // DurableHARegionQueue calls remove after dispatch, so an event whose ack arrives "later" can
  // linger in the queue until the next dispatch. Sending one more message triggers another
  // remove that clears any remaining acked events before the queue size is checked.
  flushEntries(server1VM, durableClientVM, regionName);
  checkHAQueueSize(server1VM, durableClientId, 0, 1);

  // Stop the durable client, the publisher client and the server
  durableClientVM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
  publisherClientVM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
  server1VM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
}
/**
 * Checks the HA queue size stat on a server that is restarted after events were queued.
 *
 * <p>
 * Sequence: two servers are started and server 2 is shut down again. A durable client connects,
 * registers three durable CQs and is then disconnected. A publisher puts entries onto server 1.
 * The durable client reconnects but does not yet signal that it is ready. Server 2 is brought
 * back up and its durable CQ count and HA queue size are checked (the original notes say these
 * events arrive through GII). Server 1 is then closed, the durable client re-registers its CQs
 * and signals ready, and the listener counts and the stats on server 2 are checked.
 */
@Test
public void testHAQueueSizeStatForGII() {
  // The three continuous queries share one select clause and differ only in the ID predicate
  final String selectById = "select * from /" + regionName + " p where p.ID ";
  final String greaterThan5Query = selectById + "> 5";
  final String allQuery = selectById + "> -1";
  final String lessThan5Query = selectById + "< 5";
  final int secondsToWait = 15;

  // Start server 1
  server1Port =
      server1VM.invoke(() -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE));

  // Start server 2 to obtain its port, then shut it down again right away
  final int server2Port =
      server2VM.invoke(() -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE));
  closeCache(server2VM);

  // Connect the durable client with endpoint shuffling disabled
  final String durableClientId = getName() + "_client";
  durableClientVM.invoke(CacheServerTestUtil::disableShufflingOfEndpoints);
  startDurableClient(durableClientVM, durableClientId, server1Port, server2Port, regionName);

  // Register durable CQs and signal readiness
  createCq(durableClientVM, "GreaterThan5", greaterThan5Query, true);
  createCq(durableClientVM, "All", allQuery, true);
  createCq(durableClientVM, "LessThan5", lessThan5Query, true);
  sendClientReady(durableClientVM);

  // Server 1 must know the durable client and its three durable CQs
  verifyDurableClientPresent(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT, durableClientId,
      server1VM);
  checkNumDurableCqs(server1VM, durableClientId, 3);

  // Stop the durable client, then publish through a regular client
  disconnectDurableClient(true);
  startClient(publisherClientVM, server1Port, regionName);
  publishEntries(regionName, 10);

  // Restart the durable client; it does not signal readiness yet
  startDurableClient(durableClientVM, durableClientId, server1Port, server2Port, regionName);

  // Restart server 2 on its earlier port; this is the first time it sees the durable client
  server2VM.invoke(
      () -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE, server2Port));

  // Server 2 must now know the durable client, its CQs and the queued events
  verifyDurableClientPresent(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT, durableClientId,
      server2VM);
  checkNumDurableCqs(server2VM, durableClientId, 3);
  checkHAQueueSize(server2VM, durableClientId, 10, 11);

  // Take server 1 away so that only server 2 is left
  closeCache(server1VM);

  // Re-register the durable CQs and signal readiness
  createCq(durableClientVM, "GreaterThan5", greaterThan5Query, true);
  createCq(durableClientVM, "All", allQuery, true);
  createCq(durableClientVM, "LessThan5", lessThan5Query, true);
  sendClientReady(durableClientVM);

  // Each CQ listener should have received its share of the ten published entries
  checkCqListenerEvents(durableClientVM, "GreaterThan5", 4 /* numEventsExpected */,
      secondsToWait);
  checkCqListenerEvents(durableClientVM, "LessThan5", 5 /* numEventsExpected */,
      secondsToWait);
  checkCqListenerEvents(durableClientVM, "All", 10 /* numEventsExpected */, secondsToWait);

  // After a flush, the queue size and per-CQ stats on server 2 are checked again
  flushEntries(server2VM, durableClientVM, regionName);
  checkHAQueueSize(server2VM, durableClientId, 0, 1);
  checkCqStatOnServer(server2VM, durableClientId, "LessThan5", 0);
  checkCqStatOnServer(server2VM, durableClientId, "GreaterThan5", 0);
  checkCqStatOnServer(server2VM, durableClientId, "All", 0);

  // Shut everything down: durable client, publisher client, then both servers
  durableClientVM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
  publisherClientVM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
  server1VM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
  server2VM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
}
/**
 * Checks the per-CQ HA queued stat across a durable client disconnect and reconnect.
 *
 * <p>
 * Sequence: a durable client connects, registers three durable CQs and is then disconnected. A
 * publisher puts entries onto the server, after which the durable CQ count and the stat of each
 * CQ are checked. The durable client then reconnects, re-registers its CQs and signals ready;
 * the listener counts are checked and, after a flush, each CQ stat is checked against 0.
 */
@Test
public void testHAQueuedCqStat() {
  // The three continuous queries share one select clause and differ only in the ID predicate
  final String selectById = "select * from /" + regionName + " p where p.ID ";
  final String greaterThan5Query = selectById + "> 5";
  final String allQuery = selectById + "> -1";
  final String lessThan5Query = selectById + "< 5";
  final int secondsToWait = 15;

  // Start server 1
  server1Port =
      server1VM.invoke(() -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE));

  // Note: durableClientId is not declared in this method, so this assigns a member variable
  durableClientId = getName() + "_client";
  startDurableClient(durableClientVM, durableClientId, server1Port, regionName);

  // Register durable CQs and signal readiness
  createCq(durableClientVM, "GreaterThan5", greaterThan5Query, true);
  createCq(durableClientVM, "All", allQuery, true);
  createCq(durableClientVM, "LessThan5", lessThan5Query, true);
  sendClientReady(durableClientVM);

  // Verify durable client on server
  verifyDurableClientPresent(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT, durableClientId,
      server1VM);

  // Stop the durable client, then publish through a regular client
  disconnectDurableClient(true);
  startClient(publisherClientVM, server1Port, regionName);
  publishEntries(regionName, 10);

  // Check the durable CQ count and each CQ's stat while the durable client is away
  checkNumDurableCqs(server1VM, durableClientId, 3);
  checkCqStatOnServer(server1VM, durableClientId, "All", 10);
  checkCqStatOnServer(server1VM, durableClientId, "GreaterThan5", 4);
  checkCqStatOnServer(server1VM, durableClientId, "LessThan5", 5);

  // Restart the durable client, re-register the durable CQs and signal readiness
  startDurableClient(durableClientVM, durableClientId, server1Port, regionName);
  createCq(durableClientVM, "GreaterThan5", greaterThan5Query, true);
  createCq(durableClientVM, "All", allQuery, true);
  createCq(durableClientVM, "LessThan5", lessThan5Query, true);
  sendClientReady(durableClientVM);

  // Each CQ listener should have received its share of the ten published entries
  checkCqListenerEvents(durableClientVM, "GreaterThan5", 4 /* numEventsExpected */,
      secondsToWait);
  checkCqListenerEvents(durableClientVM, "LessThan5", 5 /* numEventsExpected */,
      secondsToWait);
  checkCqListenerEvents(durableClientVM, "All", 10 /* numEventsExpected */, secondsToWait);

  // DurableHARegionQueue calls remove after dispatch, so an event whose ack arrives "later" can
  // linger in the queue until the next dispatch. Sending one more message triggers another
  // remove that clears any remaining acked events before the stats are checked.
  flushEntries(server1VM, durableClientVM, regionName);
  checkCqStatOnServer(server1VM, durableClientId, "LessThan5", 0);
  checkCqStatOnServer(server1VM, durableClientId, "GreaterThan5", 0);
  checkCqStatOnServer(server1VM, durableClientId, "All", 0);

  // Shut everything down: durable client, publisher client, then the server
  durableClientVM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
  publisherClientVM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
  server1VM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
}
/**
 * Checks the per-CQ HA queued stat on both servers when the durable client is connected to two
 * servers.
 *
 * <p>
 * Sequence: two servers are started and a durable client connects to both, registers three
 * durable CQs and is then disconnected. A publisher puts entries onto server 1, after which the
 * durable CQ count and the stat of each CQ are checked on both servers. The durable client then
 * reconnects, re-registers its CQs and signals ready; the listener counts are checked and, after
 * a flush, each CQ stat is checked against 0 on both servers.
 */
@Test
public void testHAQueuedCqStatOnSecondary() {
  // The three continuous queries share one select clause and differ only in the ID predicate
  final String selectById = "select * from /" + regionName + " p where p.ID ";
  final String greaterThan5Query = selectById + "> 5";
  final String allQuery = selectById + "> -1";
  final String lessThan5Query = selectById + "< 5";
  final int secondsToWait = 15;

  // Start server 1
  server1Port =
      server1VM.invoke(() -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE));

  // Start server 2
  final int server2Port =
      server2VM.invoke(() -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE));

  // Connect the durable client to both servers with endpoint shuffling disabled
  final String durableClientId = getName() + "_client";
  durableClientVM.invoke(CacheServerTestUtil::disableShufflingOfEndpoints);
  startDurableClient(durableClientVM, durableClientId, server1Port, server2Port, regionName);

  // Register durable CQs and signal readiness
  createCq(durableClientVM, "GreaterThan5", greaterThan5Query, true);
  createCq(durableClientVM, "All", allQuery, true);
  createCq(durableClientVM, "LessThan5", lessThan5Query, true);
  sendClientReady(durableClientVM);

  // Both servers must know the durable client
  verifyDurableClientPresent(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT, durableClientId,
      server2VM);
  verifyDurableClientPresent(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT, durableClientId,
      server1VM);

  // Stop the durable client, then publish through a regular client
  disconnectDurableClient(true);
  startClient(publisherClientVM, server1Port, regionName);
  publishEntries(regionName, 10);

  // Check the durable CQ count and each CQ's stat on server 1
  checkNumDurableCqs(server1VM, durableClientId, 3);
  checkCqStatOnServer(server1VM, durableClientId, "All", 10);
  checkCqStatOnServer(server1VM, durableClientId, "GreaterThan5", 4);
  checkCqStatOnServer(server1VM, durableClientId, "LessThan5", 5);

  // The same values are expected on server 2
  checkNumDurableCqs(server2VM, durableClientId, 3);
  checkCqStatOnServer(server2VM, durableClientId, "All", 10);
  checkCqStatOnServer(server2VM, durableClientId, "GreaterThan5", 4);
  checkCqStatOnServer(server2VM, durableClientId, "LessThan5", 5);

  // Restart the durable client, re-register the durable CQs and signal readiness
  startDurableClient(durableClientVM, durableClientId, server1Port, server2Port, regionName);
  createCq(durableClientVM, "GreaterThan5", greaterThan5Query, true);
  createCq(durableClientVM, "All", allQuery, true);
  createCq(durableClientVM, "LessThan5", lessThan5Query, true);
  sendClientReady(durableClientVM);

  // Each CQ listener should have received its share of the ten published entries
  checkCqListenerEvents(durableClientVM, "GreaterThan5", 4 /* numEventsExpected */,
      secondsToWait);
  checkCqListenerEvents(durableClientVM, "LessThan5", 5 /* numEventsExpected */,
      secondsToWait);
  checkCqListenerEvents(durableClientVM, "All", 10 /* numEventsExpected */, secondsToWait);

  // After a flush, the stats should be 0 on both servers
  flushEntries(server1VM, durableClientVM, regionName);
  checkCqStatOnServer(server1VM, durableClientId, "LessThan5", 0);
  checkCqStatOnServer(server1VM, durableClientId, "GreaterThan5", 0);
  checkCqStatOnServer(server1VM, durableClientId, "All", 0);
  checkCqStatOnServer(server2VM, durableClientId, "LessThan5", 0);
  checkCqStatOnServer(server2VM, durableClientId, "GreaterThan5", 0);
  checkCqStatOnServer(server2VM, durableClientId, "All", 0);

  // Stop the durable client and the publisher client
  durableClientVM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
  publisherClientVM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);

  // Stop both servers. Server 2 is still running at this point, so it is closed here as well,
  // matching the other two-server tests in this class.
  server1VM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
  server2VM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
}
/**
 * Checks the per-CQ HA queued stat on a server that is restarted after events were queued, with
 * a later switch to that server.
 *
 * <p>
 * Sequence: both servers come up, the durable client connects and registers three durable CQs,
 * and then server 2 is shut down. The durable client is disconnected and a publisher puts
 * entries onto server 1. Server 2 is restarted (the original notes say it should receive the
 * events through GII), the durable client is restarted, and the stats on server 2 are checked.
 * Server 1 is then closed; the durable client re-registers its CQs, signals ready and receives
 * all events, and the stats on server 2 are checked against 0 after a flush.
 */
@Test
public void testHAQueuedCqStatForGII() {
  // The three continuous queries share one select clause and differ only in the ID predicate
  final String selectById = "select * from /" + regionName + " p where p.ID ";
  final String greaterThan5Query = selectById + "> 5";
  final String allQuery = selectById + "> -1";
  final String lessThan5Query = selectById + "< 5";
  final int secondsToWait = 15;

  // Start server 1
  server1Port =
      server1VM.invoke(() -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE));

  // Start server 2
  final int server2Port =
      server2VM.invoke(() -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE));

  // Connect the durable client to both servers with endpoint shuffling disabled
  final String durableClientId = getName() + "_client";
  durableClientVM.invoke(CacheServerTestUtil::disableShufflingOfEndpoints);
  startDurableClient(durableClientVM, durableClientId, server1Port, server2Port, regionName);

  // Register durable CQs and signal readiness
  createCq(durableClientVM, "GreaterThan5", greaterThan5Query, true);
  createCq(durableClientVM, "All", allQuery, true);
  createCq(durableClientVM, "LessThan5", lessThan5Query, true);
  sendClientReady(durableClientVM);

  // Both servers must know the durable client and its three durable CQs
  verifyDurableClientPresent(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT, durableClientId,
      server2VM);
  verifyDurableClientPresent(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT, durableClientId,
      server1VM);
  checkNumDurableCqs(server1VM, durableClientId, 3);
  checkNumDurableCqs(server2VM, durableClientId, 3);

  // Shut down server 2, then the durable client
  closeCache(server2VM);
  disconnectDurableClient(true);

  // Publish through a regular client while server 2 is down
  startClient(publisherClientVM, server1Port, regionName);
  publishEntries(regionName, 10);

  // Restart server 2 on its earlier port
  server2VM.invoke(
      () -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE, server2Port));

  // Restart the durable client
  startDurableClient(durableClientVM, durableClientId, server1Port, server2Port, regionName);

  // Check the durable CQ count and each CQ's stat on the restarted server 2
  checkNumDurableCqs(server2VM, durableClientId, 3);
  checkCqStatOnServer(server2VM, durableClientId, "All", 10);
  checkCqStatOnServer(server2VM, durableClientId, "GreaterThan5", 4);
  checkCqStatOnServer(server2VM, durableClientId, "LessThan5", 5);

  // Take server 1 away so that only server 2 is left
  closeCache(server1VM);

  // Re-register the durable CQs and signal readiness
  createCq(durableClientVM, "GreaterThan5", greaterThan5Query, true);
  createCq(durableClientVM, "All", allQuery, true);
  createCq(durableClientVM, "LessThan5", lessThan5Query, true);
  sendClientReady(durableClientVM);

  // Each CQ listener should have received its share of the ten published entries
  checkCqListenerEvents(durableClientVM, "GreaterThan5", 4 /* numEventsExpected */,
      secondsToWait);
  checkCqListenerEvents(durableClientVM, "LessThan5", 5 /* numEventsExpected */,
      secondsToWait);
  checkCqListenerEvents(durableClientVM, "All", 10 /* numEventsExpected */, secondsToWait);

  // After a flush, the stats should be 0 on server 2 (server 1 was closed above)
  flushEntries(server2VM, durableClientVM, regionName);
  checkCqStatOnServer(server2VM, durableClientId, "LessThan5", 0);
  checkCqStatOnServer(server2VM, durableClientId, "GreaterThan5", 0);
  checkCqStatOnServer(server2VM, durableClientId, "All", 0);

  // Stop the durable client and the publisher client
  durableClientVM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
  publisherClientVM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);

  // Stop server 2, the only server still running
  server2VM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
}
/**
 * Checks the per-CQ HA queued stat on a server that first meets the durable client only after
 * events were queued.
 *
 * <p>
 * Sequence: both servers are started, but server 2 is shut down before the durable client has
 * connected. The durable client connects to server 1, registers three durable CQs and is then
 * disconnected. A publisher puts entries, and the durable client reconnects without signalling
 * ready. Server 2 is restarted and its CQ stats are checked (the original notes attribute them
 * to GII). Server 1 is then closed; the durable client re-registers its CQs, signals ready and
 * receives the events, and the stats on server 2 are rechecked after a flush.
 */
@Test
public void testHAQueuedCqStatForGII2() {
  // The three continuous queries share one select clause and differ only in the ID predicate
  final String selectById = "select * from /" + regionName + " p where p.ID ";
  final String greaterThan5Query = selectById + "> 5";
  final String allQuery = selectById + "> -1";
  final String lessThan5Query = selectById + "< 5";
  final int secondsToWait = 15;

  // Start server 1
  server1Port =
      server1VM.invoke(() -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE));

  // Start server 2 to obtain its port, then shut it down again right away
  final int server2Port =
      server2VM.invoke(() -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE));
  closeCache(server2VM);

  // Note: durableClientId is not declared in this method, so this assigns a member variable
  durableClientId = getName() + "_client";
  durableClientVM.invoke(CacheServerTestUtil::disableShufflingOfEndpoints);
  startDurableClient(durableClientVM, durableClientId, server1Port, server2Port, regionName);

  // Register durable CQs and signal readiness
  createCq(durableClientVM, "GreaterThan5", greaterThan5Query, true);
  createCq(durableClientVM, "All", allQuery, true);
  createCq(durableClientVM, "LessThan5", lessThan5Query, true);
  sendClientReady(durableClientVM);

  // Server 1 must know the durable client and its three durable CQs
  verifyDurableClientPresent(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT, durableClientId,
      server1VM);
  checkNumDurableCqs(server1VM, durableClientId, 3);

  // Stop the durable client, then publish through a regular client
  disconnectDurableClient(true);
  startClient(publisherClientVM, server1Port, regionName);
  publishEntries(regionName, 10);

  // Restart the durable client; it does not signal readiness yet
  startDurableClient(durableClientVM, durableClientId, server1Port, server2Port, regionName);

  // Restart server 2 on its earlier port; this is the first time it sees the durable client
  server2VM.invoke(
      () -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE, server2Port));

  // Server 2 must now know the durable client, its CQs and the per-CQ stats
  verifyDurableClientPresent(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT, durableClientId,
      server2VM);
  checkNumDurableCqs(server2VM, durableClientId, 3);
  checkCqStatOnServer(server2VM, durableClientId, "All", 10);
  checkCqStatOnServer(server2VM, durableClientId, "GreaterThan5", 4);
  checkCqStatOnServer(server2VM, durableClientId, "LessThan5", 5);

  // Take server 1 away so that only server 2 is left
  closeCache(server1VM);

  // Re-register the durable CQs and signal readiness
  createCq(durableClientVM, "GreaterThan5", greaterThan5Query, true);
  createCq(durableClientVM, "All", allQuery, true);
  createCq(durableClientVM, "LessThan5", lessThan5Query, true);
  sendClientReady(durableClientVM);

  // Each CQ listener should have received its share of the ten published entries
  checkCqListenerEvents(durableClientVM, "GreaterThan5", 4 /* numEventsExpected */,
      secondsToWait);
  checkCqListenerEvents(durableClientVM, "LessThan5", 5 /* numEventsExpected */,
      secondsToWait);
  checkCqListenerEvents(durableClientVM, "All", 10 /* numEventsExpected */, secondsToWait);

  // After a flush, the stats should be 0 on server 2 (server 1 was closed above)
  flushEntries(server2VM, durableClientVM, regionName);
  checkCqStatOnServer(server2VM, durableClientId, "LessThan5", 0);
  checkCqStatOnServer(server2VM, durableClientId, "GreaterThan5", 0);
  checkCqStatOnServer(server2VM, durableClientId, "All", 0);

  // Shut everything down: durable client, publisher client, then both servers
  durableClientVM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
  publisherClientVM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
  server1VM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
  server2VM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
}
/**
 * Checks the per-CQ HA queued stat on a restarted server when server 1 keeps running throughout.
 *
 * <p>
 * Sequence: both servers come up, the durable client connects and registers three durable CQs,
 * and then server 2 is shut down. The durable client is disconnected and a publisher puts
 * entries onto server 1. Server 2 is restarted (the original notes say it should receive the
 * events through GII), the durable client is restarted, and the stats on server 2 are checked.
 * The durable client then re-registers its CQs, signals ready and receives all events; after a
 * flush the stats are checked against 0 on both servers.
 */
@Test
public void testHAQueuedCqStatForGIINoFailover() {
  // The three continuous queries share one select clause and differ only in the ID predicate
  final String selectById = "select * from /" + regionName + " p where p.ID ";
  final String greaterThan5Query = selectById + "> 5";
  final String allQuery = selectById + "> -1";
  final String lessThan5Query = selectById + "< 5";
  final int secondsToWait = 15;

  // Start server 1
  server1Port =
      server1VM.invoke(() -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE));

  // Start server 2
  final int server2Port =
      server2VM.invoke(() -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE));

  // Note: durableClientId is not declared in this method, so this assigns a member variable
  durableClientId = getName() + "_client";
  durableClientVM.invoke(CacheServerTestUtil::disableShufflingOfEndpoints);
  startDurableClient(durableClientVM, durableClientId, server1Port, server2Port, regionName);

  // Register durable CQs and signal readiness
  createCq(durableClientVM, "GreaterThan5", greaterThan5Query, true);
  createCq(durableClientVM, "All", allQuery, true);
  createCq(durableClientVM, "LessThan5", lessThan5Query, true);
  sendClientReady(durableClientVM);

  // Both servers must know the durable client and its three durable CQs
  verifyDurableClientPresent(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT, durableClientId,
      server2VM);
  verifyDurableClientPresent(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT, durableClientId,
      server1VM);
  checkNumDurableCqs(server1VM, durableClientId, 3);
  checkNumDurableCqs(server2VM, durableClientId, 3);

  // Shut down server 2, then the durable client
  closeCache(server2VM);
  disconnectDurableClient(true);

  // Publish through a regular client while server 2 is down
  startClient(publisherClientVM, server1Port, regionName);
  publishEntries(regionName, 10);

  // Restart server 2 on its earlier port
  server2VM.invoke(
      () -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE, server2Port));

  // Restart the durable client
  startDurableClient(durableClientVM, durableClientId, server1Port, server2Port, regionName);

  // Check the durable CQ count and each CQ's stat on the restarted server 2
  checkNumDurableCqs(server2VM, durableClientId, 3);
  checkCqStatOnServer(server2VM, durableClientId, "All", 10);
  checkCqStatOnServer(server2VM, durableClientId, "GreaterThan5", 4);
  checkCqStatOnServer(server2VM, durableClientId, "LessThan5", 5);

  // Re-register the durable CQs and signal readiness
  createCq(durableClientVM, "GreaterThan5", greaterThan5Query, true);
  createCq(durableClientVM, "All", allQuery, true);
  createCq(durableClientVM, "LessThan5", lessThan5Query, true);
  sendClientReady(durableClientVM);

  // Each CQ listener should have received its share of the ten published entries
  checkCqListenerEvents(durableClientVM, "GreaterThan5", 4 /* numEventsExpected */,
      secondsToWait);
  checkCqListenerEvents(durableClientVM, "LessThan5", 5 /* numEventsExpected */,
      secondsToWait);
  checkCqListenerEvents(durableClientVM, "All", 10 /* numEventsExpected */, secondsToWait);

  // After a flush, the stats should be 0 on both servers (server 1 is never stopped in this
  // test, so both are still running here)
  flushEntries(server2VM, durableClientVM, regionName);
  checkCqStatOnServer(server2VM, durableClientId, "LessThan5", 0);
  checkCqStatOnServer(server2VM, durableClientId, "GreaterThan5", 0);
  checkCqStatOnServer(server2VM, durableClientId, "All", 0);
  checkCqStatOnServer(server1VM, durableClientId, "LessThan5", 0);
  checkCqStatOnServer(server1VM, durableClientId, "GreaterThan5", 0);
  checkCqStatOnServer(server1VM, durableClientId, "All", 0);

  // Stop the durable client and the publisher client
  durableClientVM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
  publisherClientVM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);

  // Stop both servers
  server1VM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
  server2VM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
}
/**
 * Failover scenario for the HA queued CQ stat: a durable client registers three durable CQs
 * against servers 1 and 2 and disconnects, a publisher puts 10 entries, and server 1 is then
 * closed. Server 2 must report the queued counts, the reconnected client's CQ listeners must
 * observe the events, and the stats on server 2 must end at 0.
 */
@Test
public void testHAQueuedCqStatForFailover() {
  final String gt5Cq = "select * from /" + regionName + " p where p.ID > 5";
  final String allCq = "select * from /" + regionName + " p where p.ID > -1";
  final String lt5Cq = "select * from /" + regionName + " p where p.ID < 5";

  // Bring up server 1, then server 2
  server1Port =
      server1VM.invoke(() -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE));
  final int server2Port =
      server2VM.invoke(() -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE));

  // Durable client that is kept alive on the server when it stops normally
  final String durableClientId = getName() + "_client";
  durableClientVM.invoke(CacheServerTestUtil::disableShufflingOfEndpoints);
  startDurableClient(durableClientVM, durableClientId, server1Port, server2Port, regionName);

  // Register the three durable cqs and signal readiness
  createCq(durableClientVM, "GreaterThan5", gt5Cq, true);
  createCq(durableClientVM, "All", allCq, true);
  createCq(durableClientVM, "LessThan5", lt5Cq, true);
  sendClientReady(durableClientVM);

  // The durable client must be known to both servers (server 2 is checked first)
  verifyDurableClientPresent(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT, durableClientId,
      server2VM);
  verifyDurableClientPresent(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT, durableClientId,
      server1VM);

  // Both servers must hold all three durable cqs
  checkNumDurableCqs(server1VM, durableClientId, 3);
  checkNumDurableCqs(server2VM, durableClientId, 3);

  // Disconnect the durable client
  disconnectDurableClient(true);

  // A regular publisher client connects to server 1 and puts 10 entries
  startClient(publisherClientVM, server1Port, regionName);
  publishEntries(regionName, 10);

  // Take server 1 down; only server 2 remains from here on
  closeCache(server1VM);

  // Server 2 must still have the cqs and report the queued event counts
  checkNumDurableCqs(server2VM, durableClientId, 3);
  checkCqStatOnServer(server2VM, durableClientId, "All", 10);
  checkCqStatOnServer(server2VM, durableClientId, "GreaterThan5", 4);
  checkCqStatOnServer(server2VM, durableClientId, "LessThan5", 5);

  // Bring the durable client back, re-register the same cqs and signal readiness
  startDurableClient(durableClientVM, durableClientId, server1Port, server2Port, regionName);
  createCq(durableClientVM, "GreaterThan5", gt5Cq, true);
  createCq(durableClientVM, "All", allCq, true);
  createCq(durableClientVM, "LessThan5", lt5Cq, true);
  sendClientReady(durableClientVM);

  // The client-side cq listeners must see the queued events (waiting up to 15 seconds each)
  checkCqListenerEvents(durableClientVM, "GreaterThan5", 4 /* numEventsExpected */,
      15 /* secondsToWait */);
  checkCqListenerEvents(durableClientVM, "LessThan5", 5 /* numEventsExpected */,
      15 /* secondsToWait */);
  checkCqListenerEvents(durableClientVM, "All", 10 /* numEventsExpected */,
      15 /* secondsToWait */);

  // After a flush the stats on server 2 must be back at 0 (server 1 was closed above)
  flushEntries(server2VM, durableClientVM, regionName);
  checkCqStatOnServer(server2VM, durableClientId, "LessThan5", 0);
  checkCqStatOnServer(server2VM, durableClientId, "GreaterThan5", 0);
  checkCqStatOnServer(server2VM, durableClientId, "All", 0);

  // Tear down: durable client, publisher client, then the remaining server
  durableClientVM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
  publisherClientVM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
  server2VM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
}
/**
 * Tests the HA queued CQ stat when the durable client stays away longer than its durable timeout.
 * A durable client is started with a 20 second timeout, registers three durable CQs and
 * disconnects. A publisher puts 10 entries and the queued CQ stats are checked on the server. The
 * test then sleeps past the timeout, reconnects the client, expects its CQ listeners to receive
 * no events, and finally expects the CQ stats on the server to be 0.
 */
@Test
public void testHAQueuedCqStatWithTimeOut() {
  String greaterThan5Query = "select * from /" + regionName + " p where p.ID > 5";
  String allQuery = "select * from /" + regionName + " p where p.ID > -1";
  String lessThan5Query = "select * from /" + regionName + " p where p.ID < 5";
  int timeoutInSeconds = 20;

  // Start server 1
  server1Port = this.server1VM.invoke(
      () -> CacheServerTestUtil.createCacheServer(regionName, Boolean.TRUE));

  // Start the durable client with the short durable timeout
  final String durableClientId = getName() + "_client";
  startDurableClient(durableClientVM, durableClientId, server1Port, regionName, timeoutInSeconds);

  // register durable cqs
  createCq(durableClientVM, "GreaterThan5", greaterThan5Query, true);
  createCq(durableClientVM, "All", allQuery, true);
  createCq(durableClientVM, "LessThan5", lessThan5Query, true);

  // send client ready
  sendClientReady(durableClientVM);

  // Verify durable client on server
  verifyDurableClientPresent(timeoutInSeconds, durableClientId, server1VM);

  // Stop the durable client
  this.disconnectDurableClient(true);

  // Start normal publisher client
  startClient(publisherClientVM, server1Port, regionName);

  // Publish some entries
  publishEntries(regionName, 10);

  // verify cq stats are correct
  checkNumDurableCqs(server1VM, durableClientId, 3);
  checkCqStatOnServer(server1VM, durableClientId, "All", 10);
  checkCqStatOnServer(server1VM, durableClientId, "GreaterThan5", 4);
  checkCqStatOnServer(server1VM, durableClientId, "LessThan5", 5);

  // Pause until the durable timeout (plus a 5 second margin) has elapsed
  try {
    Thread.sleep((timeoutInSeconds + 5) * 1000L);
  } catch (InterruptedException e) {
    // Restore the interrupt status so the caller can still observe the interruption,
    // then fail the test with the original cause attached.
    Thread.currentThread().interrupt();
    fail("interrupted", e);
  }

  // Restart the durable client (this time without passing an explicit timeout)
  startDurableClient(durableClientVM, durableClientId, server1Port, regionName);

  // Reregister durable cqs
  createCq(durableClientVM, "GreaterThan5", greaterThan5Query, true);
  createCq(durableClientVM, "All", allQuery, true);
  createCq(durableClientVM, "LessThan5", lessThan5Query, true);

  // send client ready
  sendClientReady(durableClientVM);

  // Make sure all events are expired and are not sent
  checkCqListenerEvents(durableClientVM, "GreaterThan5", 0 /* numEventsExpected */,
      5 /* secondsToWait */);
  checkCqListenerEvents(durableClientVM, "LessThan5", 0 /* numEventsExpected */,
      5 /* secondsToWait */);
  checkCqListenerEvents(durableClientVM, "All", 0 /* numEventsExpected */,
      5 /* secondsToWait */);

  // Due to the implementation of DurableHARegionQueue, remove is called after dispatch.
  // This can cause events to linger in the queue due to a "later" ack and only be cleared on
  // the next dispatch. We need to send one more message to dispatch, which calls remove one more
  // time and clears any remaining acks (with or without this final published event's ack).
  flushEntries(server1VM, durableClientVM, regionName);
  checkCqStatOnServer(server1VM, durableClientId, "LessThan5", 0);
  checkCqStatOnServer(server1VM, durableClientId, "GreaterThan5", 0);
  checkCqStatOnServer(server1VM, durableClientId, "All", 0);

  // Stop the durable client
  this.durableClientVM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);

  // Stop the publisher client
  this.publisherClientVM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);

  // Stop the server
  this.server1VM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache);
}
}