blob: dfda8c76c360caf58d55f31a9599ccbfb53240b1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.cache.query.cq.dunit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.query.cq.internal.CqServiceImpl;
import org.apache.geode.cache.query.data.Portfolio;
import org.apache.geode.cache.query.internal.cq.CqServiceProvider;
import org.apache.geode.cache30.CacheSerializableRunnable;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.internal.AvailablePortHelper;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.Invoke;
import org.apache.geode.test.dunit.LogWriterUtils;
import org.apache.geode.test.dunit.NetworkUtils;
import org.apache.geode.test.dunit.SerializableRunnable;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.Wait;
import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
/**
* Test class for testing {@link CqServiceImpl#EXECUTE_QUERY_DURING_INIT} flag
*
*/
@Category({ClientSubscriptionTest.class})
public class CqQueryOptimizedExecuteDUnitTest extends CqQueryDUnitTest {
public CqQueryOptimizedExecuteDUnitTest() {
super();
}
@Override
protected final void postSetUpCqQueryDUnitTest() throws Exception {
Invoke.invokeInEveryVM(new SerializableRunnable("getSystem") {
@Override
public void run() {
CqServiceImpl.EXECUTE_QUERY_DURING_INIT = false;
}
});
}
@Override
public final void preTearDownCacheTestCase() throws Exception {
Invoke.invokeInEveryVM(new SerializableRunnable("getSystem") {
@Override
public void run() {
CqServiceImpl.EXECUTE_QUERY_DURING_INIT = true;
CqServiceProvider.MAINTAIN_KEYS = true;
}
});
}
@Test
public void testCqExecuteWithoutQueryExecution() throws Exception {
final Host host = Host.getHost(0);
final VM server = host.getVM(0);
final VM client = host.getVM(1);
final int numOfEntries = 10;
final String cqName = "testCqExecuteWithoutQueryExecution_1";
createServer(server);
// Create values.
createValues(server, regions[0], numOfEntries);
final int thePort = server.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
final String host0 = NetworkUtils.getServerHostName(server.getHost());
// Create client.
createClient(client, thePort, host0);
/* Create CQs. */
createCQ(client, cqName, cqs[0]);
validateCQCount(client, 1);
executeCQ(client, cqName, false, null);
server.invoke(new CacheSerializableRunnable("execute cq") {
@Override
public void run2() throws CacheException {
assertFalse("CqServiceImpl.EXECUTE_QUERY_DURING_INIT flag should be false ",
CqServiceImpl.EXECUTE_QUERY_DURING_INIT);
long numOfQueryExecutions = (Long) ((GemFireCacheImpl) getCache()).getCachePerfStats()
.getStats().get("queryExecutions");
assertEquals("Number of query executions for cq.execute should be 0 ", 0,
numOfQueryExecutions);
}
});
// Create more values.
server.invoke(new CacheSerializableRunnable("Create values") {
@Override
public void run2() throws CacheException {
Region region1 = getRootRegion().getSubregion(regions[0]);
for (int i = numOfEntries + 1; i <= numOfEntries * 2; i++) {
region1.put(KEY + i, new Portfolio(i));
}
LogWriterUtils.getLogWriter()
.info("### Number of Entries in Region :" + region1.keySet().size());
}
});
waitForCreated(client, cqName, KEY + numOfEntries * 2);
validateCQ(client, cqName, /* resultSize: */ noTest, /* creates: */ numOfEntries,
/* updates: */ 0, /* deletes; */ 0, /* queryInserts: */ numOfEntries, /* queryUpdates: */ 0,
/* queryDeletes: */ 0, /* totalEvents: */ numOfEntries);
// Update values.
createValues(server, regions[0], 5);
createValues(server, regions[0], 10);
waitForUpdated(client, cqName, KEY + numOfEntries);
// validate Update events.
validateCQ(client, cqName, /* resultSize: */ noTest, /* creates: */ numOfEntries,
/* updates: */ 15, /* deletes; */ 0, /* queryInserts: */ numOfEntries,
/* queryUpdates: */ 15, /* queryDeletes: */ 0, /* totalEvents: */ numOfEntries + 15);
// Validate delete events.
deleteValues(server, regions[0], 5);
waitForDestroyed(client, cqName, KEY + 5);
validateCQ(client, cqName, /* resultSize: */ noTest, /* creates: */ numOfEntries,
/* updates: */ 15, /* deletes; */5, /* queryInserts: */ numOfEntries,
/* queryUpdates: */ 15, /* queryDeletes: */ 5, /* totalEvents: */ numOfEntries + 15 + 5);
closeClient(client);
closeServer(server);
}
@Test
public void testCqExecuteWithoutQueryExecutionAndNoRSCaching() throws Exception {
final Host host = Host.getHost(0);
final VM server = host.getVM(0);
final VM client = host.getVM(1);
final int numOfEntries = 10;
final String cqName = "testCqExecuteWithoutQueryExecution_1";
server.invoke(new CacheSerializableRunnable("execute cq") {
@Override
public void run2() throws CacheException {
CqServiceProvider.MAINTAIN_KEYS = false;
}
});
createServer(server);
// Create values.
createValues(server, regions[0], numOfEntries);
final int thePort = server.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
final String host0 = NetworkUtils.getServerHostName(server.getHost());
// Create client.
createClient(client, thePort, host0);
/* Create CQs. */
createCQ(client, cqName, cqs[0]);
validateCQCount(client, 1);
executeCQ(client, cqName, false, null);
server.invoke(new CacheSerializableRunnable("execute cq") {
@Override
public void run2() throws CacheException {
assertFalse("CqServiceImpl.EXECUTE_QUERY_DURING_INIT flag should be false ",
CqServiceImpl.EXECUTE_QUERY_DURING_INIT);
assertFalse(DistributionConfig.GEMFIRE_PREFIX + "cq.MAINTAIN_KEYS flag should be false ",
CqServiceProvider.MAINTAIN_KEYS);
long numOfQueryExecutions = (Long) ((GemFireCacheImpl) getCache()).getCachePerfStats()
.getStats().get("queryExecutions");
assertEquals("Number of query executions for cq.execute should be 0 ", 0,
numOfQueryExecutions);
}
});
// Create more values.
server.invoke(new CacheSerializableRunnable("Create values") {
@Override
public void run2() throws CacheException {
Region region1 = getRootRegion().getSubregion(regions[0]);
for (int i = numOfEntries + 1; i <= numOfEntries * 2; i++) {
region1.put(KEY + i, new Portfolio(i));
}
LogWriterUtils.getLogWriter()
.info("### Number of Entries in Region :" + region1.keySet().size());
}
});
waitForCreated(client, cqName, KEY + numOfEntries * 2);
validateCQ(client, cqName, /* resultSize: */ noTest, /* creates: */ numOfEntries,
/* updates: */ 0, /* deletes; */ 0, /* queryInserts: */ numOfEntries, /* queryUpdates: */ 0,
/* queryDeletes: */ 0, /* totalEvents: */ numOfEntries);
// Update values.
createValues(server, regions[0], 5);
createValues(server, regions[0], 10);
waitForUpdated(client, cqName, KEY + numOfEntries);
// validate Update events.
validateCQ(client, cqName, /* resultSize: */ noTest, /* creates: */ numOfEntries,
/* updates: */ 15, /* deletes; */ 0, /* queryInserts: */ numOfEntries,
/* queryUpdates: */ 15, /* queryDeletes: */ 0, /* totalEvents: */ numOfEntries + 15);
// Validate delete events.
deleteValues(server, regions[0], 5);
waitForDestroyed(client, cqName, KEY + 5);
validateCQ(client, cqName, /* resultSize: */ noTest, /* creates: */ numOfEntries,
/* updates: */ 15, /* deletes; */5, /* queryInserts: */ numOfEntries,
/* queryUpdates: */ 15, /* queryDeletes: */ 5, /* totalEvents: */ numOfEntries + 15 + 5);
closeClient(client);
closeServer(server);
}
// Moved from CqQueryDUnitTest class.
// Tests when two pools each are configured to a different server
// Each cq uses a different pool and the servers are shutdown.
// The listeners for each cq should receive a connect and disconnect
// when their respective servers are shutdown
@Override
@Test
public void testCQAllServersLeaveMultiplePool() throws Exception {
final Host host = Host.getHost(0);
VM server1 = host.getVM(1);
VM server2 = host.getVM(2);
VM client = host.getVM(3);
createServer(server1);
final int port1 = server1.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
final String host0 = NetworkUtils.getServerHostName(server1.getHost());
final int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(1);
createServer(server2, ports[0]);
final int thePort2 = server2.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
Wait.pause(8 * 1000);
// Create client
createClientWith2Pools(client, new int[] {port1}, new int[] {thePort2}, host0, "-1");
// Create CQs.
createCQ(client, "testCQAllServersLeave_" + 11, cqs[11], true);
executeCQ(client, "testCQAllServersLeave_" + 11, false, null);
createCQ(client, "testCQAllServersLeave_" + 12, cqs[12], true);
executeCQ(client, "testCQAllServersLeave_" + 12, false, null);
Wait.pause(5 * 1000);
waitForCqsConnected(client, "testCQAllServersLeave_11", 1);
waitForCqsConnected(client, "testCQAllServersLeave_12", 1);
// CREATE.
createValues(server2, regions[0], 10);
createValues(server2, regions[1], 10);
waitForCreated(client, "testCQAllServersLeave_11", KEY + 10);
// Close server1 pause is unnecessary here since each cq has it's own pool
// and each pool is configured only to 1 server.
closeServer(server1);
waitForCqsDisconnected(client, "testCQAllServersLeave_11", 1);
createValues(server2, regions[1], 20);
waitForCreated(client, "testCQAllServersLeave_12", KEY + 19);
// Close server 2 pause unnecessary here since each cq has it's own pool
closeServer(server2);
waitForCqsDisconnected(client, "testCQAllServersLeave_12", 1);
// Close.
closeClient(client);
}
}