/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.partitioned;
import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
import static org.apache.geode.test.dunit.VM.getHostName;
import static org.apache.geode.test.dunit.VM.getVM;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionFactory;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.client.ClientCacheFactory;
import org.apache.geode.cache.client.ClientRegionFactory;
import org.apache.geode.cache.client.ClientRegionShortcut;
import org.apache.geode.cache.client.internal.InternalClientCache;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.InternalRegion;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.rules.CacheRule;
import org.apache.geode.test.dunit.rules.ClientCacheRule;
import org.apache.geode.test.dunit.rules.DistributedRule;
import org.apache.geode.test.junit.categories.RegionsTest;
import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
/**
 * Functional distributed tests for Register Interest with KEYS_VALUES policy.
 *
 * <p>
 * Every test starts a cache server in three server VMs, loads entries into the region, registers
 * interest from a client VM whose pool points at the first server only, and then checks what ended
 * up in the client's local region.
 *
 * <p>
 * TRAC #43684: Register interest with policy KEYS_VALUES is inefficient
 */
@Category(RegionsTest.class)
@SuppressWarnings("serial")
public class RegisterInterestKeysValuesDistributedTest implements Serializable {

  /** Regular expression that selects the {@code X_KEY_*} entries written by {@code doPuts}. */
  private static final String X_KEY_REGEX = "^[X][_].*";

  /** Number of {@code OPKEY_*} entries that {@code doOps} creates. */
  private static final int OP_KEY_COUNT = 14;

  /** Index of the first {@code OPKEY_*} entry that {@code doOps} destroys again. */
  private static final int FIRST_DESTROYED_OP_KEY = 7;

  private VM server1;
  private VM server2;
  private VM server3;
  private VM client1;

  private String regionName;
  private String hostName;
  private int bucketCount;
  private int regexCount;
  private int port1;

  @Rule
  public DistributedRule distributedRule = new DistributedRule();

  @Rule
  public CacheRule cacheRule = new CacheRule();

  @Rule
  public ClientCacheRule clientCacheRule = new ClientCacheRule();

  @Rule
  public SerializableTestName testName = new SerializableTestName();

  /**
   * Assigns the four VMs and the per-test configuration. The fields set here travel to the remote
   * VMs together with this (serializable) test instance whenever a lambda that uses them is
   * invoked.
   */
  @Before
  public void setUp() throws Exception {
    server1 = getVM(0);
    server2 = getVM(1);
    server3 = getVM(2);
    client1 = getVM(3);

    regionName = getClass().getSimpleName() + "_" + testName.getMethodName();
    hostName = getHostName();

    bucketCount = 11;
    regexCount = 20;

    addIgnoredException("Connection refused: connect");
  }

  @Test
  public void testRIWithSingleKeyOnRR() {
    startReplicateServers();

    doRegisterInterest("KEY_1", null, bucketCount);
  }

  @Test
  public void testRIWithAllKeysOnRR() {
    startReplicateServers();

    doRegisterInterest(null, null, bucketCount);
  }

  @Test
  public void testRIWithKeyListOnRR() {
    startReplicateServers();

    doRegisterInterest(interestKeyList(), null, bucketCount);
  }

  @Test
  public void testRIWithRegularExpressionOnRR() {
    startReplicateServers();

    doRegisterInterest(null, X_KEY_REGEX, bucketCount);
  }

  @Test
  public void testRIWithSingleKeyOnPR() {
    startPartitionedServers();

    doRegisterInterest("KEY_1", null, bucketCount);
  }

  @Test
  public void testRIWithAllKeysOnPR() {
    startPartitionedServers();

    doRegisterInterest(null, null, bucketCount);
  }

  @Test
  public void testRIWithKeyListOnPR() {
    startPartitionedServers();

    doRegisterInterest(interestKeyList(), null, bucketCount);
  }

  @Test
  public void testRIWithRegularExpressionOnPR() {
    startPartitionedServers();

    doRegisterInterest(null, X_KEY_REGEX, bucketCount);
  }

  @Test
  public void testRIWithMoreEntriesOnPR() {
    startPartitionedServers();

    doRegisterInterest(null, null, 5147);
  }

  @Test
  public void testRIWithSingleKeyOnEmptyPrimaryOnPR() {
    startPartitionedServersWithEmptyPrimary();

    doRegisterInterest("KEY_1", null, bucketCount);
  }

  @Test
  public void testRIWithAllKeysOnEmptyPrimaryOnPR() {
    startPartitionedServersWithEmptyPrimary();

    doRegisterInterest(null, null, bucketCount);
  }

  @Test
  public void testRIWithKeyListOnEmptyPrimaryOnPR() {
    startPartitionedServersWithEmptyPrimary();

    doRegisterInterest(interestKeyList(), null, bucketCount);
  }

  @Test
  public void testRIWithRegularExpressionOnEmptyPrimaryOnPR() {
    startPartitionedServersWithEmptyPrimary();

    doRegisterInterest(null, X_KEY_REGEX, bucketCount);
  }

  /**
   * Creates {@code OPKEY_0..13} through a client cache in the controller VM, destroys the upper
   * half again, and then has client1 register interest in all fourteen keys. Client1 must see the
   * surviving keys as entries and the destroyed keys as tombstones.
   */
  @Test
  public void testNativeClientIssueOnPR() {
    startPartitionedServers();

    List<String> keys = new ArrayList<>();
    for (int i = 0; i < OP_KEY_COUNT; i++) {
      keys.add("OPKEY_" + i);
    }

    client1.invoke(() -> createClientCache(port1));

    // The controller VM gets a client cache of its own and performs the operations through it.
    createClientCache(port1);
    doOps();

    client1.invoke(() -> registerInterest(keys, null));

    client1.invoke(() -> {
      InternalRegion region = (InternalRegion) clientCache().getRegion(regionName);
      for (int i = 0; i < FIRST_DESTROYED_OP_KEY; i++) {
        assertTrue(region.containsKey("OPKEY_" + i));
      }
      for (int i = FIRST_DESTROYED_OP_KEY; i < OP_KEY_COUNT; i++) {
        assertFalse(region.containsKey("OPKEY_" + i));
        assertTrue(region.containsTombstone("OPKEY_" + i));
      }
    });
  }

  /** Starts all three servers with a REPLICATE region. */
  private void startReplicateServers() {
    startServers(true, false);
  }

  /** Starts all three servers with a PARTITION region. */
  private void startPartitionedServers() {
    startServers(false, false);
  }

  /**
   * Starts server1 with a PARTITION_PROXY region and the other two servers with a PARTITION
   * region. Server1 is the server the client connects to.
   */
  private void startPartitionedServersWithEmptyPrimary() {
    startServers(false, true);
  }

  /**
   * Starts a cache server in each of the three server VMs and remembers server1's port in
   * {@code port1}.
   *
   * @param isReplicated {@code true} for replicate regions, {@code false} for partitioned regions
   * @param isFirstServerEmpty whether server1 (and only server1) uses the proxy variant of the
   *        region shortcut
   */
  private void startServers(boolean isReplicated, boolean isFirstServerEmpty) {
    port1 = server1.invoke(() -> createServerCache(isReplicated, isFirstServerEmpty));
    server2.invoke(() -> createServerCache(isReplicated, false));
    server3.invoke(() -> createServerCache(isReplicated, false));
  }

  /**
   * Returns a fresh list of the seven {@code KEY_*} keys used by the key-list tests. The result is
   * an {@link ArrayList} because it is captured by lambdas that are shipped to another VM.
   */
  private static List<String> interestKeyList() {
    List<String> riKeys = new ArrayList<>();
    riKeys.add("KEY_0");
    riKeys.add("KEY_1");
    riKeys.add("KEY_2");
    riKeys.add("KEY_5");
    riKeys.add("KEY_6");
    riKeys.add("KEY_7");
    riKeys.add("KEY_9");
    return riKeys;
  }

  /**
   * Runs the common scenario: populate the region on server1, connect client1 to server1, register
   * interest, close server1's cache and finally compare the client's region size with the expected
   * number of entries.
   *
   * @param keys a single key, a {@link List} of keys, or {@code null}
   * @param regEx a key regular expression, or {@code null}; only consulted when {@code keys} is
   *        {@code null}
   * @param numOfPuts number of {@code KEY_*} entries to create before registering interest
   */
  private void doRegisterInterest(Object keys, String regEx, int numOfPuts) {
    server1.invoke(() -> doPuts(numOfPuts, regEx, regexCount));

    client1.invoke(() -> createClientCache(port1));
    client1.invoke(() -> registerInterest(keys, regEx));

    // The server the client is connected to goes away before the client's region is inspected.
    server1.invoke(() -> cache().close());

    client1.invoke(() -> verifyResponse(getExpectedSize(keys, regEx, numOfPuts)));
  }

  /**
   * Computes how many entries the client region should hold after registering interest.
   *
   * @return the list size for a key list, 1 for any other non-null key, {@code numOfPuts} when
   *         neither keys nor a regular expression were given, otherwise {@code regexCount}
   */
  private int getExpectedSize(Object keys, String regEx, int numOfPuts) {
    if (keys instanceof List) {
      return ((List<?>) keys).size();
    }
    if (keys != null) {
      return 1;
    }
    return regEx == null ? numOfPuts : regexCount;
  }

  /**
   * Creates the cache and the test region in the current VM and starts a cache server.
   *
   * @param isReplicated {@code true} to use a replicate shortcut, {@code false} for a partitioned
   *        region with {@code bucketCount} buckets
   * @param isPrimaryEmpty {@code true} to use the {@code _PROXY} variant of the chosen shortcut
   * @return the port reported by the cache server after it was started on port 0
   * @throws IOException if the cache server fails to start
   */
  private int createServerCache(boolean isReplicated, boolean isPrimaryEmpty) throws IOException {
    createCache();

    RegionFactory<String, String> regionFactory;
    if (isReplicated) {
      regionFactory = cache().createRegionFactory(
          isPrimaryEmpty ? RegionShortcut.REPLICATE_PROXY : RegionShortcut.REPLICATE);
    } else {
      regionFactory = cache().createRegionFactory(
          isPrimaryEmpty ? RegionShortcut.PARTITION_PROXY : RegionShortcut.PARTITION);

      PartitionAttributesFactory<String, String> partitionAttributes =
          new PartitionAttributesFactory<>();
      partitionAttributes.setTotalNumBuckets(bucketCount);
      regionFactory.setPartitionAttributes(partitionAttributes.create());
    }
    regionFactory.create(regionName);

    CacheServer server = cache().addCacheServer();
    server.setPort(0);
    server.start();
    return server.getPort();
  }

  /**
   * Creates a subscription-enabled client cache whose pool points at {@code hostName:port} and a
   * CACHING_PROXY client region named {@code regionName}.
   */
  private void createClientCache(int port) {
    ClientCacheFactory clientCacheFactory = new ClientCacheFactory();
    clientCacheFactory.addPoolServer(hostName, port).setPoolSubscriptionEnabled(true);
    createClientCache(clientCacheFactory);

    ClientRegionFactory<String, String> regionFactory =
        clientCache().createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY);
    regionFactory.create(regionName);
  }

  /**
   * Registers interest on the client region. With neither keys nor a regular expression, interest
   * is registered in {@code "ALL_KEYS"}. With keys, interest is registered in those keys and
   * additionally in {@code "UNKNOWN_KEY"}. Otherwise interest is registered by regular expression.
   */
  private void registerInterest(Object keys, String regEx) {
    Region<Object, Object> region = clientCache().getRegion(regionName);
    if (keys != null) {
      region.registerInterest(keys);
      region.registerInterest("UNKNOWN_KEY");
    } else if (regEx != null) {
      region.registerInterestRegex(regEx);
    } else {
      region.registerInterest("ALL_KEYS");
    }
  }

  /**
   * Creates {@code count} entries named {@code KEY_<i>} in the server region and, when a regular
   * expression is under test, {@code regexNum} further entries named {@code X_KEY_<i>}.
   */
  private void doPuts(int count, String regex, int regexNum) {
    Region<String, String> region = cache().getRegion(regionName);
    for (int i = 0; i < count; i++) {
      region.create("KEY_" + i, "VALUE__" + i);
    }
    if (regex == null) {
      return;
    }
    for (int i = 0; i < regexNum; i++) {
      region.create("X_KEY_" + i, "X_VALUE__" + i);
    }
  }

  /**
   * Through the local client cache, creates {@code OPKEY_0..13} and then destroys
   * {@code OPKEY_7..13}.
   */
  private void doOps() {
    Region<String, String> region = clientCache().getRegion(regionName);
    for (int i = 0; i < OP_KEY_COUNT; i++) {
      region.create("OPKEY_" + i, "OPVALUE__" + i);
    }
    for (int i = FIRST_DESTROYED_OP_KEY; i < OP_KEY_COUNT; i++) {
      region.destroy("OPKEY_" + i);
    }
  }

  /** Asserts that the client region holds exactly {@code size} entries. */
  private void verifyResponse(int size) {
    Region<?, ?> region = clientCache().getRegion(regionName);
    assertEquals(size, region.size());
  }

  private InternalCache cache() {
    return cacheRule.getCache();
  }

  private InternalClientCache clientCache() {
    return clientCacheRule.getClientCache();
  }

  private void createCache() {
    cacheRule.createCache();
  }

  private void createClientCache(ClientCacheFactory clientCacheFactory) {
    clientCacheRule.createClientCache(clientCacheFactory);
  }
}