blob: 4d7cd48c02a922a60c270a0909cd99258a8476c4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.cache.query.cq.dunit;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.Matchers.equalTo;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.logging.log4j.Logger;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.query.CqAttributes;
import org.apache.geode.cache.query.CqAttributesFactory;
import org.apache.geode.cache.query.CqListener;
import org.apache.geode.cache.query.CqQuery;
import org.apache.geode.cache.query.QueryService;
import org.apache.geode.cache.query.cq.internal.CqServiceImpl;
import org.apache.geode.cache.query.cq.internal.ServerCQImpl;
import org.apache.geode.cache.query.internal.DefaultQueryService;
import org.apache.geode.cache.query.internal.cq.CqService;
import org.apache.geode.cache.query.internal.cq.InternalCqQuery;
import org.apache.geode.cache30.CacheSerializableRunnable;
import org.apache.geode.internal.AvailablePortHelper;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.test.dunit.Invoke;
import org.apache.geode.test.dunit.LogWriterUtils;
import org.apache.geode.test.dunit.NetworkUtils;
import org.apache.geode.test.dunit.SerializableRunnable;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.Wait;
import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
/**
* This class tests the ContinuousQuery mechanism in GemFire. This includes the test with different
* data activities.
*/
@SuppressWarnings("SpellCheckingInspection")
@Category({ClientSubscriptionTest.class})
public class CqPerfDUnitTest extends JUnit4CacheTestCase {
private final Logger logger = LogService.getLogger();
@SuppressWarnings("CanBeFinal")
private CqQueryDUnitTest cqDUnitTest = new CqQueryDUnitTest();
public CqPerfDUnitTest() {
super();
}
@Override
public final void postSetUp() {
// avoid IllegalStateException from HandShake by connecting all vms tor
// system before creating connection pools
getSystem();
Invoke.invokeInEveryVM(new SerializableRunnable("getSystem") {
@Override
public void run() {
getSystem();
}
});
}
/**
* Tests the cq performance.
*/
@Ignore("perf")
@Test
public void testCQPerf() {
VM server = VM.getVM(0);
VM client = VM.getVM(1);
cqDUnitTest.createServer(server);
final int port = server.invoke(CqQueryDUnitTest::getCacheServerPort);
final String host0 = NetworkUtils.getServerHostName();
// Create client.
cqDUnitTest.createClient(client, port, host0);
final String cqName = "testCQPerf_0";
client.invoke(() -> {
logger.info("### Create CQ. ###" + cqName);
// Get CQ Service.
QueryService cqService =
getCache().getQueryService();
// Create CQ Attributes.
CqAttributesFactory cqf = new CqAttributesFactory();
CqListener[] cqListeners = {new CqTimeTestListener(LogWriterUtils.getLogWriter())};
((CqTimeTestListener) cqListeners[0]).cqName = cqName;
cqf.initCqListeners(cqListeners);
CqAttributes cqa = cqf.create();
// Create and Execute CQ.
CqQuery cq1 = cqService.newCq(cqName, cqDUnitTest.cqs[0], cqa);
assertThat(cq1.getState().isStopped()).describedAs("newCq() state mismatch").isTrue();
cq1.execute();
});
final int size = 50;
// Create values.
cqDUnitTest.createValuesWithTime(client, cqDUnitTest.regions[0], size);
Wait.pause(5000);
// Update values
cqDUnitTest.createValuesWithTime(client, cqDUnitTest.regions[0], size);
client.invoke(new CacheSerializableRunnable("Validate CQs") {
@Override
public void run2() throws CacheException {
logger.info("### Validating CQ. ### " + cqName);
// Get CQ Service.
QueryService cqService = getCache().getQueryService();
CqQuery cQuery = cqService.getCq(cqName);
assertThat(cQuery).isNotNull();
}
});
Wait.pause(10 * 60 * 1000);
// Close.
cqDUnitTest.closeClient(client);
cqDUnitTest.closeServer(server);
}
/**
* Test for maintaining keys for update optimization.
*/
@Test
public void testKeyMaintenance() {
VM server = VM.getVM(0);
VM client = VM.getVM(1);
cqDUnitTest.createServer(server);
final int port = server.invoke(CqQueryDUnitTest::getCacheServerPort);
final String host0 = NetworkUtils.getServerHostName();
cqDUnitTest.createClient(client, port, host0);
// Cq1
cqDUnitTest.createCQ(client, "testKeyMaintenance_0", cqDUnitTest.cqs[0]);
cqDUnitTest.executeCQ(client, "testKeyMaintenance_0", false, null);
// Cq2
cqDUnitTest.createCQ(client, "testKeyMaintenance_1", cqDUnitTest.cqs[10]);
cqDUnitTest.executeCQ(client, "testKeyMaintenance_1", false, null);
cqDUnitTest.createValues(server, cqDUnitTest.regions[0], 1);
cqDUnitTest.waitForCreated(client, "testKeyMaintenance_0", CqQueryDUnitTest.KEY + 1);
// Entry is made into the CQs cache hashSet.
// testKeyMaintenance_0 with 1 entry and testKeyMaintenance_1 with 0
server.invoke(() -> {
CqService cqService = ((DefaultQueryService) getCache().getQueryService()).getCqService();
Collection<? extends InternalCqQuery> cqs = cqService.getAllCqs();
for (InternalCqQuery cq : cqs) {
ServerCQImpl cqQuery = (ServerCQImpl) cq;
String serverCqName = cqQuery.getServerCqName();
if (serverCqName.startsWith("testKeyMaintenance_0")) {
assertThat(cqQuery.getCqResultKeysSize())
.describedAs("The number of keys cached for cq testKeyMaintenance_0 is wrong.")
.isEqualTo(1);
} else if (serverCqName.startsWith("testKeyMaintenance_1")) {
assertThat(cqQuery.getCqResultKeysSize())
.describedAs("The number of keys cached for cq testKeyMaintenance_1 is wrong.")
.isEqualTo(0);
}
}
});
// Update 1.
cqDUnitTest.createValues(server, cqDUnitTest.regions[0], 10);
cqDUnitTest.waitForCreated(client, "testKeyMaintenance_0", CqQueryDUnitTest.KEY + 10);
// Entry/check is made into the CQs cache hashSet.
// testKeyMaintenance_0 with 1 entry and testKeyMaintenance_1 with 1
server.invoke(() -> {
CqService cqService = ((DefaultQueryService) getCache().getQueryService()).getCqService();
Collection<? extends InternalCqQuery> cqs = cqService.getAllCqs();
for (InternalCqQuery cq : cqs) {
ServerCQImpl cqQuery = (ServerCQImpl) cq;
String serverCqName = cqQuery.getServerCqName();
if (serverCqName.startsWith("testKeyMaintenance_0")) {
assertThat(cqQuery.getCqResultKeysSize())
.describedAs("The number of keys cached for cq testKeyMaintenance_0 is wrong.")
.isEqualTo(10);
} else if (serverCqName.startsWith("testKeyMaintenance_1")) {
assertThat(cqQuery.getCqResultKeysSize())
.describedAs("The number of keys cached for cq testKeyMaintenance_1 is wrong.")
.isEqualTo(5);
}
}
});
// Update.
cqDUnitTest.createValues(server, cqDUnitTest.regions[0], 12);
cqDUnitTest.waitForCreated(client, "testKeyMaintenance_0", CqQueryDUnitTest.KEY + 12);
// Entry/check is made into the CQs cache hashSet.
// testKeyMaintenance_0 with 1 entry and testKeyMaintenance_1 with 1
server.invoke(() -> {
CqService cqService = ((DefaultQueryService) getCache().getQueryService()).getCqService();
Collection<? extends InternalCqQuery> cqs = cqService.getAllCqs();
for (InternalCqQuery cq : cqs) {
ServerCQImpl cqQuery = (ServerCQImpl) cq;
String serverCqName = cqQuery.getServerCqName();
if (serverCqName.startsWith("testKeyMaintenance_0")) {
assertThat(cqQuery.getCqResultKeysSize())
.describedAs("The number of keys cached for cq testKeyMaintenance_0 is wrong.")
.isEqualTo(12);
} else if (serverCqName.startsWith("testKeyMaintenance_1")) {
assertThat(cqQuery.getCqResultKeysSize())
.describedAs("The number of keys cached for cq testKeyMaintenance_1 is wrong.")
.isEqualTo(6);
}
}
});
// Delete.
cqDUnitTest.deleteValues(server, cqDUnitTest.regions[0], 6);
cqDUnitTest.waitForDestroyed(client, "testKeyMaintenance_0", CqQueryDUnitTest.KEY + 6);
// Entry/check is made into the CQs cache hashSet.
// testKeyMaintenance_0 with 1 entry and testKeyMaintenance_1 with 1
server.invoke(() -> {
CqService cqService = ((DefaultQueryService) getCache().getQueryService()).getCqService();
Collection<? extends InternalCqQuery> cqs = cqService.getAllCqs();
for (InternalCqQuery cq : cqs) {
ServerCQImpl cqQuery = (ServerCQImpl) cq;
String serverCqName = cqQuery.getServerCqName();
if (serverCqName.startsWith("testKeyMaintenance_0")) {
assertThat(cqQuery.getCqResultKeysSize())
.describedAs("The number of keys cached for cq testKeyMaintenance_0 is wrong.")
.isEqualTo(6);
} else if (serverCqName.startsWith("testKeyMaintenance_1")) {
assertThat(cqQuery.getCqResultKeysSize())
.describedAs("The number of keys cached for cq testKeyMaintenance_1 is wrong.")
.isEqualTo(3);
}
}
});
// Stop CQ.
// This should still needs to process the events so that Results are up-to-date.
cqDUnitTest.stopCQ(client, "testKeyMaintenance_1");
cqDUnitTest.createValues(server, cqDUnitTest.regions[0], 12);
server.invoke(() -> {
CqService cqService = ((DefaultQueryService) getCache().getQueryService()).getCqService();
Collection<? extends InternalCqQuery> cqs = cqService.getAllCqs();
for (InternalCqQuery cq : cqs) {
ServerCQImpl cqQuery = (ServerCQImpl) cq;
String serverCqName = cqQuery.getServerCqName();
if (serverCqName.startsWith("testKeyMaintenance_0")) {
assertThat(cqQuery.getCqResultKeysSize())
.describedAs("The number of keys cached for cq testKeyMaintenance_0 is wrong.")
.isEqualTo(12);
} else if (serverCqName.startsWith("testKeyMaintenance_1")) {
assertThat(cqQuery.getCqResultKeysSize())
.describedAs("The number of keys cached for cq testKeyMaintenance_1 is wrong.")
.isEqualTo(6);
}
}
});
// re-start the CQ.
cqDUnitTest.executeCQ(client, "testKeyMaintenance_1", false, null);
// This will remove the caching for this CQ.
cqDUnitTest.closeCQ(client, "testKeyMaintenance_1");
server.invoke(() -> {
CqService cqService = ((DefaultQueryService) getCache().getQueryService()).getCqService();
Collection<? extends InternalCqQuery> cqs = cqService.getAllCqs();
for (InternalCqQuery cq : cqs) {
ServerCQImpl cqQuery = (ServerCQImpl) cq;
String serverCqName = cqQuery.getServerCqName();
assertThat(serverCqName.startsWith("testKeyMaintenance_1")).isFalse();
if (serverCqName.startsWith("testKeyMaintenance_0")) {
assertThat(cqQuery.getCqResultKeysSize())
.describedAs("The number of keys cached for cq testKeyMaintenance_0 is wrong.")
.isEqualTo(12);
}
}
});
// Close.
cqDUnitTest.closeClient(client);
cqDUnitTest.closeServer(server);
}
/**
* Test for common CQs. To test the changes relating to, executing CQ only once for all similar
* CQs.
*/
@Test
public void testMatchingCqs() {
VM server = VM.getVM(0);
VM client = VM.getVM(1);
cqDUnitTest.createServer(server);
final int port = server.invoke(CqQueryDUnitTest::getCacheServerPort);
final String host0 = NetworkUtils.getServerHostName();
cqDUnitTest.createClient(client, port, host0);
// Create and Execute same kind of CQs.
for (int i = 0; i < 4; i++) {
cqDUnitTest.createCQ(client, "testMatchingCqs_" + i, cqDUnitTest.cqs[0]);
cqDUnitTest.executeCQ(client, "testMatchingCqs_" + i, false, null);
}
validateMatchingCqs(server, 1, cqDUnitTest.cqs[0], 4);
int size = 1;
// Create.
cqDUnitTest.createValues(server, cqDUnitTest.regions[0], size);
cqDUnitTest.waitForCreated(client, "testMatchingCqs_0", CqQueryDUnitTest.KEY + size);
cqDUnitTest.waitForCreated(client, "testMatchingCqs_3", CqQueryDUnitTest.KEY + size);
// Close one of the CQ.
cqDUnitTest.closeCQ(client, "testMatchingCqs_0");
validateMatchingCqs(server, 1, cqDUnitTest.cqs[0], 3);
// Update.
cqDUnitTest.createValues(server, cqDUnitTest.regions[0], size);
cqDUnitTest.waitForUpdated(client, "testMatchingCqs_3", CqQueryDUnitTest.KEY + size);
// Stop one of the CQ.
cqDUnitTest.stopCQ(client, "testMatchingCqs_1");
validateMatchingCqs(server, 1, cqDUnitTest.cqs[0], 2);
// Update - 2.
cqDUnitTest.clearCQListenerEvents(client, "testMatchingCqs_3");
cqDUnitTest.createValues(server, cqDUnitTest.regions[0], size);
cqDUnitTest.waitForUpdated(client, "testMatchingCqs_3", CqQueryDUnitTest.KEY + size);
// stopped CQ should not receive 2nd/previous updates.
cqDUnitTest.validateCQ(client, "testMatchingCqs_1", /* resultSize: */ CqQueryDUnitTest.noTest,
/* creates: */ size, /* updates: once */ size, /* deletes; */ 0, /* queryInserts: */ size,
/* queryUpdates: */ size, /* queryDeletes: */ 0, /* totalEvents: */ size * 2);
// Execute the stopped CQ.
cqDUnitTest.executeCQ(client, "testMatchingCqs_1", false, null);
// Update - 3.
cqDUnitTest.clearCQListenerEvents(client, "testMatchingCqs_3");
cqDUnitTest.createValues(server, cqDUnitTest.regions[0], size);
cqDUnitTest.waitForUpdated(client, "testMatchingCqs_3", CqQueryDUnitTest.KEY + size);
cqDUnitTest.validateCQ(client, "testMatchingCqs_1", /* resultSize: */ CqQueryDUnitTest.noTest,
/* creates: */ size, /* updates: 2 */ size * 2, /* deletes; */ 0, /* queryInserts: */ size,
/* queryUpdates: */ size * 2, /* queryDeletes: */ 0, /* totalEvents: */ size * 3);
// Create different kind of CQs.
cqDUnitTest.createCQ(client, "testMatchingCqs_4", cqDUnitTest.cqs[1]);
cqDUnitTest.executeCQ(client, "testMatchingCqs_4", false, null);
cqDUnitTest.createCQ(client, "testMatchingCqs_5", cqDUnitTest.cqs[1]);
cqDUnitTest.executeCQ(client, "testMatchingCqs_5", false, null);
cqDUnitTest.createCQ(client, "testMatchingCqs_6", cqDUnitTest.cqs[2]);
cqDUnitTest.executeCQ(client, "testMatchingCqs_6", false, null);
validateMatchingCqs(server, 3, cqDUnitTest.cqs[1], 2);
cqDUnitTest.closeCQ(client, "testMatchingCqs_6");
validateMatchingCqs(server, 2, cqDUnitTest.cqs[1], 2);
cqDUnitTest.closeCQ(client, "testMatchingCqs_5");
cqDUnitTest.closeCQ(client, "testMatchingCqs_4");
cqDUnitTest.closeCQ(client, "testMatchingCqs_3");
cqDUnitTest.closeCQ(client, "testMatchingCqs_2");
cqDUnitTest.closeCQ(client, "testMatchingCqs_1");
validateMatchingCqs(server, 0, null, 0);
// update 4
cqDUnitTest.createValues(server, cqDUnitTest.regions[0], size);
// Close.
cqDUnitTest.closeClient(client);
cqDUnitTest.closeServer(server);
}
/**
* Test for common CQs. To test the changes relating to, executing CQ only once for all similar
* CQs.
*/
@Test
public void testMatchingCQWithMultipleClients() {
VM server = VM.getVM(0);
VM client1 = VM.getVM(1);
VM client2 = VM.getVM(2);
VM client3 = VM.getVM(3);
VM clients[] = new VM[] {client1, client2, client3};
cqDUnitTest.createServer(server);
final int port = server.invoke(CqQueryDUnitTest::getCacheServerPort);
final String host0 = NetworkUtils.getServerHostName();
for (int clientIndex = 0; clientIndex < 3; clientIndex++) {
cqDUnitTest.createClient(clients[clientIndex], port, host0);
// Create and Execute same kind of CQs.
for (int i = 0; i < 4; i++) {
cqDUnitTest.createCQ(clients[clientIndex], "testMatchingCQWithMultipleClients_" + i,
cqDUnitTest.cqs[0]);
cqDUnitTest.executeCQ(clients[clientIndex], "testMatchingCQWithMultipleClients_" + i, false,
null);
}
}
validateMatchingCqs(server, 1, cqDUnitTest.cqs[0], clients.length * 4);
int size = 1;
// Create.
cqDUnitTest.createValues(server, cqDUnitTest.regions[0], size);
for (int clientIndex = 0; clientIndex < 3; clientIndex++) {
cqDUnitTest.waitForCreated(clients[clientIndex], "testMatchingCQWithMultipleClients_0",
CqQueryDUnitTest.KEY + size);
cqDUnitTest.waitForCreated(clients[clientIndex], "testMatchingCQWithMultipleClients_3",
CqQueryDUnitTest.KEY + size);
}
// Close one of the CQ.
cqDUnitTest.closeCQ(client1, "testMatchingCQWithMultipleClients_0");
validateMatchingCqs(server, 1, cqDUnitTest.cqs[0], (clients.length * 4) - 1);
// Update.
cqDUnitTest.createValues(server, cqDUnitTest.regions[0], size);
for (int clientIndex = 0; clientIndex < 3; clientIndex++) {
cqDUnitTest.waitForUpdated(clients[clientIndex], "testMatchingCQWithMultipleClients_3",
CqQueryDUnitTest.KEY + size);
}
// Stop one of the CQ.
cqDUnitTest.stopCQ(client2, "testMatchingCQWithMultipleClients_1");
validateMatchingCqs(server, 1, cqDUnitTest.cqs[0], (clients.length * 4) - 2);
// Update - 2.
for (int clientIndex = 0; clientIndex < 3; clientIndex++) {
cqDUnitTest.clearCQListenerEvents(clients[clientIndex],
"testMatchingCQWithMultipleClients_3");
}
cqDUnitTest.createValues(server, cqDUnitTest.regions[0], size);
for (int clientIndex = 0; clientIndex < 3; clientIndex++) {
cqDUnitTest.waitForUpdated(clients[clientIndex], "testMatchingCQWithMultipleClients_3",
CqQueryDUnitTest.KEY + size);
}
// stopped CQ should not receive 2nd/previous updates.
cqDUnitTest.validateCQ(client2, "testMatchingCQWithMultipleClients_1",
/* resultSize: */ CqQueryDUnitTest.noTest, /* creates: */ size, /* updates: once */ size,
/* deletes; */ 0, /* queryInserts: */ size, /* queryUpdates: */ size, /* queryDeletes: */ 0,
/* totalEvents: */ size * 2);
// Execute the stopped CQ.
cqDUnitTest.executeCQ(client2, "testMatchingCQWithMultipleClients_1", false, null);
// Update - 3.
for (int clientIndex = 0; clientIndex < 3; clientIndex++) {
cqDUnitTest.clearCQListenerEvents(clients[clientIndex],
"testMatchingCQWithMultipleClients_3");
}
validateMatchingCqs(server, 1, cqDUnitTest.cqs[0], (clients.length * 4) - 1);
cqDUnitTest.createValues(server, cqDUnitTest.regions[0], size);
for (int clientIndex = 0; clientIndex < 3; clientIndex++) {
cqDUnitTest.waitForUpdated(clients[clientIndex], "testMatchingCQWithMultipleClients_3",
CqQueryDUnitTest.KEY + size);
}
cqDUnitTest.validateCQ(client2, "testMatchingCQWithMultipleClients_1",
/* resultSize: */ CqQueryDUnitTest.noTest, /* creates: */ size, /* updates: 2 */ size * 2,
/* deletes; */ 0, /* queryInserts: */ size, /* queryUpdates: */ size * 2,
/* queryDeletes: */ 0, /* totalEvents: */ size * 3);
// Create different kind of CQs.
for (int clientIndex = 0; clientIndex < 3; clientIndex++) {
cqDUnitTest.createCQ(clients[clientIndex], "testMatchingCQWithMultipleClients_4",
cqDUnitTest.cqs[1]);
cqDUnitTest.executeCQ(clients[clientIndex], "testMatchingCQWithMultipleClients_4", false,
null);
cqDUnitTest.createCQ(clients[clientIndex], "testMatchingCQWithMultipleClients_5",
cqDUnitTest.cqs[1]);
cqDUnitTest.executeCQ(clients[clientIndex], "testMatchingCQWithMultipleClients_5", false,
null);
cqDUnitTest.createCQ(clients[clientIndex], "testMatchingCQWithMultipleClients_6",
cqDUnitTest.cqs[2]);
cqDUnitTest.executeCQ(clients[clientIndex], "testMatchingCQWithMultipleClients_6", false,
null);
}
validateMatchingCqs(server, 3, cqDUnitTest.cqs[1], 2 * clients.length);
for (int clientIndex = 0; clientIndex < 3; clientIndex++) {
cqDUnitTest.closeCQ(clients[clientIndex], "testMatchingCQWithMultipleClients_6");
}
validateMatchingCqs(server, 2, cqDUnitTest.cqs[1], 2 * clients.length);
// update 4
cqDUnitTest.createValues(server, cqDUnitTest.regions[0], size);
// Close.
cqDUnitTest.closeClient(client3);
validateMatchingCqs(server, 2, cqDUnitTest.cqs[1], 2 * (clients.length - 1));
for (int clientIndex = 0; clientIndex < 2; clientIndex++) {
cqDUnitTest.closeCQ(clients[clientIndex], "testMatchingCQWithMultipleClients_5");
cqDUnitTest.closeCQ(clients[clientIndex], "testMatchingCQWithMultipleClients_4");
cqDUnitTest.closeCQ(clients[clientIndex], "testMatchingCQWithMultipleClients_3");
cqDUnitTest.closeCQ(clients[clientIndex], "testMatchingCQWithMultipleClients_2");
cqDUnitTest.closeCQ(clients[clientIndex], "testMatchingCQWithMultipleClients_1");
if (clientIndex != 0) {
cqDUnitTest.closeCQ(clients[clientIndex], "testMatchingCQWithMultipleClients_0");
}
}
validateMatchingCqs(server, 0, null, 0);
cqDUnitTest.closeClient(client2);
cqDUnitTest.closeClient(client1);
cqDUnitTest.closeServer(server);
}
@Test
public void testMatchingCQsWithMultipleServers() {
VM server1 = VM.getVM(0);
VM server2 = VM.getVM(1);
VM client1 = VM.getVM(2);
VM client2 = VM.getVM(3);
cqDUnitTest.createServer(server1);
VM clients[] = new VM[] {client1, client2};
final int port1 = server1.invoke(CqQueryDUnitTest::getCacheServerPort);
final String host0 = NetworkUtils.getServerHostName();
// Create client.
// Create client with redundancyLevel -1
final int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(1);
cqDUnitTest.createClient(client1, new int[] {port1, ports[0]}, host0, "-1");
cqDUnitTest.createClient(client2, new int[] {port1, ports[0]}, host0, "-1");
int numCQs = 3;
for (int i = 0; i < numCQs; i++) {
cqDUnitTest.createCQ(client1, "testMatchingCQsWithMultipleServers_" + i, cqDUnitTest.cqs[i]);
cqDUnitTest.executeCQ(client1, "testMatchingCQsWithMultipleServers_" + i, false, null);
cqDUnitTest.createCQ(client2, "testMatchingCQsWithMultipleServers_" + i, cqDUnitTest.cqs[i]);
cqDUnitTest.executeCQ(client2, "testMatchingCQsWithMultipleServers_" + i, false, null);
}
validateMatchingCqs(server1, numCQs, cqDUnitTest.cqs[0], clients.length);
validateMatchingCqs(server1, numCQs, cqDUnitTest.cqs[1], clients.length);
Wait.pause(1000);
// CREATE.
cqDUnitTest.createValues(server1, cqDUnitTest.regions[0], 10);
cqDUnitTest.createValues(server1, cqDUnitTest.regions[1], 10);
for (int i = 1; i <= 10; i++) {
cqDUnitTest.waitForCreated(client1, "testMatchingCQsWithMultipleServers_0",
CqQueryDUnitTest.KEY + i);
}
cqDUnitTest.createServer(server2, ports[0]);
final int port2 = server2.invoke(CqQueryDUnitTest::getCacheServerPort);
System.out
.println("### Port on which server1 running : " + port1 + " Server2 running : " + port2);
// UPDATE - 1.
for (int k = 0; k < numCQs; k++) {
cqDUnitTest.clearCQListenerEvents(client1, "testMatchingCQsWithMultipleServers_" + k);
cqDUnitTest.clearCQListenerEvents(client2, "testMatchingCQsWithMultipleServers_" + k);
}
cqDUnitTest.createValues(server1, cqDUnitTest.regions[0], 10);
cqDUnitTest.createValues(server1, cqDUnitTest.regions[1], 10);
// Wait for updates on regions[0]
for (int i = 1; i <= 10; i++) {
cqDUnitTest.waitForUpdated(client1, "testMatchingCQsWithMultipleServers_0",
CqQueryDUnitTest.KEY + i);
cqDUnitTest.waitForUpdated(client2, "testMatchingCQsWithMultipleServers_0",
CqQueryDUnitTest.KEY + i);
}
// Wait for updates on regions[1] - Waiting for last key is good enough.
cqDUnitTest.waitForUpdated(client1, "testMatchingCQsWithMultipleServers_2",
CqQueryDUnitTest.KEY + 4);
cqDUnitTest.waitForUpdated(client2, "testMatchingCQsWithMultipleServers_2",
CqQueryDUnitTest.KEY + 4);
int[] resultsCnt = new int[] {10, 1, 2};
for (int i = 0; i < numCQs; i++) {
cqDUnitTest.validateCQ(client1, "testMatchingCQsWithMultipleServers_" + i,
CqQueryDUnitTest.noTest, resultsCnt[i], resultsCnt[i], CqQueryDUnitTest.noTest);
cqDUnitTest.validateCQ(client2, "testMatchingCQsWithMultipleServers_" + i,
CqQueryDUnitTest.noTest, resultsCnt[i], resultsCnt[i], CqQueryDUnitTest.noTest);
}
cqDUnitTest.closeClient(client2);
cqDUnitTest.closeServer(server2);
}
@Test
public void testFailOverMatchingCQsWithMultipleServers() {
VM server1 = VM.getVM(0);
VM server2 = VM.getVM(1);
VM client1 = VM.getVM(2);
VM client2 = VM.getVM(3);
logger.info("Ready to create server 1");
cqDUnitTest.createServer(server1);
logger.info("Ready to create server 1");
VM clients[] = new VM[] {client1, client2};
final int port1 = server1.invoke(CqQueryDUnitTest::getCacheServerPort);
final String host0 = NetworkUtils.getServerHostName();
// Create client.
// Create client with redundancyLevel -1
final int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(1);
cqDUnitTest.createClient(client1, new int[] {port1, ports[0]}, host0, "-1");
cqDUnitTest.createClient(client2, new int[] {port1, ports[0]}, host0, "-1");
int numCQs = 3;
for (int i = 0; i < numCQs; i++) {
cqDUnitTest.createCQ(client1, "testMatchingCQsWithMultipleServers_" + i, cqDUnitTest.cqs[i]);
cqDUnitTest.executeCQ(client1, "testMatchingCQsWithMultipleServers_" + i, false, null);
cqDUnitTest.createCQ(client2, "testMatchingCQsWithMultipleServers_" + i, cqDUnitTest.cqs[i]);
cqDUnitTest.executeCQ(client2, "testMatchingCQsWithMultipleServers_" + i, false, null);
}
validateMatchingCqs(server1, numCQs, cqDUnitTest.cqs[0], clients.length);
validateMatchingCqs(server1, numCQs, cqDUnitTest.cqs[1], clients.length);
cqDUnitTest.createServer(server2, ports[0]);
// Close server1.
cqDUnitTest.closeServer(server1);
validateMatchingCqs(server2, numCQs, cqDUnitTest.cqs[0], clients.length);
// Close.
cqDUnitTest.closeClient(client1);
validateMatchingCqs(server2, numCQs, cqDUnitTest.cqs[0], (clients.length - 1));
cqDUnitTest.closeClient(client2);
cqDUnitTest.closeServer(server2);
}
/**
* Test for CQ Fail over.
*/
@Test
public void testMatchingCQsOnDataNodeWithMultipleServers() {
VM server1 = VM.getVM(0);
VM server2 = VM.getVM(1);
VM client1 = VM.getVM(2);
VM client2 = VM.getVM(3);
cqDUnitTest.createServerOnly(server1, 0);
cqDUnitTest.createServerOnly(server2, 0);
cqDUnitTest.createPartitionRegion(server1, cqDUnitTest.regions);
cqDUnitTest.createPartitionRegion(server2, cqDUnitTest.regions);
VM clients[] = new VM[] {client1, client2};
final int port1 = server1.invoke(CqQueryDUnitTest::getCacheServerPort);
final String host0 = NetworkUtils.getServerHostName();
final int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(1);
cqDUnitTest.createLocalRegion(client1, new int[] {port1, ports[0]}, host0, "-1",
cqDUnitTest.regions);
cqDUnitTest.createLocalRegion(client2, new int[] {port1, ports[0]}, host0, "-1",
cqDUnitTest.regions);
int numCQs = cqDUnitTest.prCqs.length;
for (int i = 0; i < numCQs; i++) {
cqDUnitTest.createCQ(client1, "testMatchingCQsWithMultipleServers_" + i,
cqDUnitTest.prCqs[i]);
cqDUnitTest.executeCQ(client1, "testMatchingCQsWithMultipleServers_" + i, false, null);
cqDUnitTest.createCQ(client2, "testMatchingCQsWithMultipleServers_" + i,
cqDUnitTest.prCqs[i]);
cqDUnitTest.executeCQ(client2, "testMatchingCQsWithMultipleServers_" + i, false, null);
}
validateMatchingCqs(server1, numCQs, cqDUnitTest.prCqs[0], clients.length);
validateMatchingCqs(server1, numCQs, cqDUnitTest.prCqs[1], clients.length);
validateMatchingCqs(server2, numCQs, cqDUnitTest.prCqs[0], clients.length);
validateMatchingCqs(server2, numCQs, cqDUnitTest.prCqs[1], clients.length);
// Close.
cqDUnitTest.closeClient(client1);
cqDUnitTest.closeClient(client2);
cqDUnitTest.closeServer(server1);
cqDUnitTest.closeServer(server2);
}
/**
* Performance test for Matching CQ optimization changes.
*/
@Ignore("perf")
@Test
public void testPerformanceForMatchingCQs() {
VM server1 = VM.getVM(0);
VM server2 = VM.getVM(1);
VM client1 = VM.getVM(2);
VM client2 = VM.getVM(3);
cqDUnitTest.createServer(server1);
cqDUnitTest.createServer(server2);
// VM clients[] = new VM[]{client1, client2};
final int port1 = server1.invoke(CqQueryDUnitTest::getCacheServerPort);
final int port2 = server2.invoke(CqQueryDUnitTest::getCacheServerPort);
final String host0 = NetworkUtils.getServerHostName();
// Create client.
// final int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(1);
// Client1 connects to server1.
cqDUnitTest.createClient(client1, new int[] {port1}, host0, "-1");
// Client2 connects to server2.
cqDUnitTest.createClient(client2, new int[] {port2}, host0, "-1");
// Client1 registers matching CQs on server1.
String[] matchingCqs = this.generateCqQueries(false);
for (int i = 0; i < matchingCqs.length; i++) {
cqDUnitTest.createCQ(client1, "testPerformanceForMatchingCQs_" + i, matchingCqs[i]);
cqDUnitTest.executeCQ(client1, "testPerformanceForMatchingCQs_" + i, false, null);
}
// Client2 registers non-matching CQs on server2.
matchingCqs = this.generateCqQueries(true);
for (int i = 0; i < matchingCqs.length; i++) {
cqDUnitTest.createCQ(client2, "testPerformanceForMatchingCQs_" + i, matchingCqs[i]);
cqDUnitTest.executeCQ(client2, "testPerformanceForMatchingCQs_" + i, false, null);
}
Wait.pause(1000);
// CREATE.
int size = 1000;
cqDUnitTest.createValues(server1, cqDUnitTest.regions[0], size);
cqDUnitTest.createValues(server2, cqDUnitTest.regions[1], size);
// Update couple of times;
for (int j = 0; j < 5; j++) {
cqDUnitTest.createValues(server1, cqDUnitTest.regions[0], size - 1);
cqDUnitTest.createValues(server2, cqDUnitTest.regions[1], size - 1);
}
for (int j = 0; j < 4; j++) {
cqDUnitTest.createValues(server2, cqDUnitTest.regions[0], size - 1);
cqDUnitTest.createValues(server1, cqDUnitTest.regions[1], size - 1);
}
// Update the last key.
cqDUnitTest.createValues(server2, cqDUnitTest.regions[0], size);
cqDUnitTest.createValues(server1, cqDUnitTest.regions[1], size);
for (int k = 1; k <= size; k++) {
cqDUnitTest.waitForUpdated(client1, "testPerformanceForMatchingCQs_0",
CqQueryDUnitTest.KEY + k);
}
Wait.pause(1000);
printCqQueryExecutionTime(server1);
printCqQueryExecutionTime(server2);
// Close.
cqDUnitTest.closeClient(client1);
cqDUnitTest.closeClient(client2);
cqDUnitTest.closeServer(server2);
cqDUnitTest.closeServer(server1);
}
private void validateMatchingCqs(VM server, final int mapSize, final String query,
final int numCqSize) {
server.invoke(() -> {
CqServiceImpl cqService =
(CqServiceImpl) ((DefaultQueryService) getCache().getQueryService()).getCqService();
Map matchedCqMap = cqService.getMatchingCqMap();
await()
.until(matchedCqMap::size, equalTo(mapSize));
if (query != null) {
assertThat(matchedCqMap.containsKey(query)).isTrue();
Collection cqs = (Collection) matchedCqMap.get(query);
await()
.until(cqs::size, equalTo(numCqSize));
}
});
}
private void printCqQueryExecutionTime(VM server) {
server.invoke(() -> {
CqServiceImpl cqService =
(CqServiceImpl) ((DefaultQueryService) getCache().getQueryService()).getCqService();
long timeTaken = cqService.getCqServiceVsdStats().getCqQueryExecutionTime();
logger.info("Total Time taken to Execute CQ Query :" + timeTaken);
});
}
private String[] generateCqQueries(boolean uniqueQueries) {
List<String> initQueries = new ArrayList<>();
// From Portfolio object.
String[] names = {"aaa", "bbb", "ccc", "ddd"};
int nameIndex;
// Construct few unique Queries.
for (int i = 0; i < 3; i++) {
for (int cnt = 0; cnt < 5; cnt++) {
String query = cqDUnitTest.cqs[i];
if (cnt > 0) {
nameIndex = (cnt % names.length);
query += " or p.names[" + nameIndex + "] = '" + names[nameIndex] + cnt + "'";
}
initQueries.add(query);
}
}
int numMatchedQueries = 10;
List<String> cqQueries = new ArrayList<>();
for (String query : initQueries) {
for (int cnt = 0; cnt < numMatchedQueries; cnt++) {
if (uniqueQueries) {
// Append blank string, so that query string is different but the
// Query constraint remains same.
query += " ";
}
cqQueries.add(query);
}
}
String[] queries = new String[cqQueries.size()];
cqQueries.toArray(queries);
return queries;
}
}