| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.internal.cache.wan; |
| |
| import static org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_ID; |
| import static org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_TIMEOUT; |
| import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS; |
| import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT; |
| import static org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPort; |
| import static org.apache.geode.test.awaitility.GeodeAwaitility.await; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertNull; |
| |
| import java.io.IOException; |
| import java.util.List; |
| import java.util.Properties; |
| |
| import org.junit.Test; |
| import org.junit.experimental.categories.Category; |
| |
| import org.apache.geode.cache.CacheFactory; |
| import org.apache.geode.cache.DiskStore; |
| import org.apache.geode.cache.EvictionAction; |
| import org.apache.geode.cache.EvictionAttributes; |
| import org.apache.geode.cache.Region; |
| import org.apache.geode.cache.RegionAttributes; |
| import org.apache.geode.cache.RegionFactory; |
| import org.apache.geode.cache.RegionShortcut; |
| import org.apache.geode.cache.Scope; |
| import org.apache.geode.cache.client.Pool; |
| import org.apache.geode.cache.client.PoolManager; |
| import org.apache.geode.cache.server.CacheServer; |
| import org.apache.geode.cache.server.ClientSubscriptionConfig; |
| import org.apache.geode.distributed.internal.InternalDistributedSystem; |
| import org.apache.geode.internal.cache.CacheServerImpl; |
| import org.apache.geode.internal.cache.InternalCache; |
| import org.apache.geode.internal.cache.InternalCacheServer; |
| import org.apache.geode.internal.cache.ha.HAContainerRegion; |
| import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier; |
| import org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil; |
| import org.apache.geode.test.dunit.LogWriterUtils; |
| import org.apache.geode.test.dunit.SerializableRunnable; |
| import org.apache.geode.test.dunit.VM; |
| import org.apache.geode.test.junit.categories.WanTest; |
| |
| @Category({WanTest.class}) |
| public class CacheClientNotifierDUnitTest extends WANTestBase { |
| |
| private static final int NUM_KEYS = 10; |
| |
| private int createCacheServerWithCSC(VM vm, final boolean withCSC, final int capacity, |
| final String policy, final String diskStoreName) { |
| final int serverPort = getRandomAvailableTCPPort(); |
| |
| SerializableRunnable createCacheServer = new SerializableRunnable() { |
| @Override |
| public void run() throws Exception { |
| CacheServerImpl server = (CacheServerImpl) cache.addCacheServer(); |
| server.setPort(serverPort); |
| if (withCSC) { |
| if (diskStoreName != null) { |
| DiskStore ds = cache.findDiskStore(diskStoreName); |
| if (ds == null) { |
| ds = cache.createDiskStoreFactory().create(diskStoreName); |
| } |
| } |
| ClientSubscriptionConfig csc = server.getClientSubscriptionConfig(); |
| csc.setCapacity(capacity); |
| csc.setEvictionPolicy(policy); |
| csc.setDiskStoreName(diskStoreName); |
| server.setHostnameForClients("localhost"); |
| // server.setGroups(new String[]{"serv"}); |
| } |
| try { |
| server.start(); |
| } catch (IOException e) { |
| org.apache.geode.test.dunit.Assert.fail("Failed to start server ", e); |
| } |
| } |
| }; |
| vm.invoke(createCacheServer); |
| return serverPort; |
| } |
| |
| private void checkCacheServer(VM vm, final int serverPort, final boolean withCSC, |
| final int capacity) { |
| SerializableRunnable checkCacheServer = new SerializableRunnable() { |
| |
| @Override |
| public void run() throws Exception { |
| List<InternalCacheServer> cacheServers = |
| ((InternalCache) cache).getCacheServersAndGatewayReceiver(); |
| CacheServerImpl server = null; |
| for (CacheServer cs : cacheServers) { |
| if (cs.getPort() == serverPort) { |
| server = (CacheServerImpl) cs; |
| break; |
| } |
| } |
| assertNotNull(server); |
| CacheClientNotifier ccn = server.getAcceptor().getCacheClientNotifier(); |
| HAContainerRegion haContainer = (HAContainerRegion) ccn.getHaContainer(); |
| if (server.getAcceptor().isGatewayReceiver()) { |
| assertNull(haContainer); |
| return; |
| } |
| Region internalRegion = haContainer.getMapForTest(); |
| RegionAttributes ra = internalRegion.getAttributes(); |
| EvictionAttributes ea = ra.getEvictionAttributes(); |
| if (withCSC) { |
| assertNotNull(ea); |
| assertEquals(capacity, ea.getMaximum()); |
| assertEquals(EvictionAction.OVERFLOW_TO_DISK, ea.getAction()); |
| } else { |
| assertNull(ea); |
| } |
| } |
| }; |
| vm.invoke(checkCacheServer); |
| } |
| |
| public static void closeACacheServer(final int serverPort) { |
| List<CacheServer> cacheServers = cache.getCacheServers(); |
| CacheServerImpl server = null; |
| for (CacheServer cs : cacheServers) { |
| if (cs.getPort() == serverPort) { |
| server = (CacheServerImpl) cs; |
| break; |
| } |
| } |
| assertNotNull(server); |
| server.stop(); |
| } |
| |
| private void verifyRegionSize(VM vm, final int expect) { |
| SerializableRunnable verifyRegionSize = new SerializableRunnable() { |
| @Override |
| public void run() throws Exception { |
| final Region region = cache.getRegion(getTestMethodName() + "_PR"); |
| |
| await().untilAsserted(() -> assertEquals(expect, region.size())); |
| } |
| }; |
| vm.invoke(verifyRegionSize); |
| } |
| |
| /** |
| * The test will start several cache servers, including gateway receivers. Shutdown them and |
| * verify the CacheClientNotifier for each server is correct |
| */ |
| @Test |
| public void testNormalClient2MultipleCacheServer() throws Exception { |
| doMultipleCacheServer(false); |
| } |
| |
| public void doMultipleCacheServer(boolean durable) throws Exception { |
| /* test scenario: */ |
| /* create 1 GatewaySender on vm5 */ |
| /* create 1 GatewayReceiver on vm2 */ |
| /* create 2 cache servers on vm2, one with overflow. */ |
| /* verify if the cache server2 still has the overflow attributes */ |
| /* create 1 cache client1 on vm3 to register interest on cache server1 */ |
| /* create 1 cache client2 on vm4 to register interest on cache server1 */ |
| /* do some puts to GatewaySender on vm5 */ |
| |
| // start locators |
| Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); |
| Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); |
| |
| // create receiver and cache servers will be at ny |
| vm2.invoke(() -> WANTestBase.createCache(nyPort)); |
| int receiverPort = vm2.invoke(() -> WANTestBase.createReceiver()); |
| checkCacheServer(vm2, receiverPort, false, 0); |
| |
| // create PR for receiver |
| vm2.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName() + "_PR", |
| null, 1, 100, isOffHeap())); |
| |
| // create cache server1 with overflow |
| int serverPort = createCacheServerWithCSC(vm2, true, 3, "entry", "DEFAULT"); |
| checkCacheServer(vm2, serverPort, true, 3); |
| |
| /* |
| * GEODE-6344: not to create the 2nd cacheserver on the same jvm until refix of GEODE-1183, |
| * otherwise the test might be hang |
| * // create cache server 2 |
| * final int serverPort2 = createCacheServerWithCSC(vm2, false, 0, null, null); |
| * // Currently, only the first cache server's overflow attributes will take effect |
| * // It will be enhanced in GEODE-1102 |
| * checkCacheServer(vm2, serverPort2, true, 3); |
| * LogService.getLogger().info("receiverPort=" + receiverPort + ",serverPort=" + serverPort |
| * + ",serverPort2=" + serverPort2); |
| */ |
| |
| vm3.invoke(() -> createClientWithLocator(nyPort, "localhost", getTestMethodName() + "_PR", |
| "123", durable)); |
| vm4.invoke(() -> createClientWithLocator(nyPort, "localhost", getTestMethodName() + "_PR", |
| "124", durable)); |
| |
| // create sender at ln |
| vm5.invoke(() -> WANTestBase.createCache(lnPort)); |
| vm5.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 400, false, false, null, true)); |
| vm5.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName() + "_PR", |
| "ln", 1, 100, isOffHeap())); |
| vm5.invoke(() -> WANTestBase.startSender("ln")); |
| vm5.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", NUM_KEYS)); |
| |
| /* verify */ |
| verifyRegionSize(vm5, NUM_KEYS); |
| verifyRegionSize(vm2, NUM_KEYS); |
| verifyRegionSize(vm4, NUM_KEYS); |
| verifyRegionSize(vm3, NUM_KEYS); |
| |
| /* |
| * GEODE-6344 |
| * // close a cache server, then re-test |
| * vm2.invoke(() -> closeACacheServer(serverPort2)); |
| */ |
| |
| vm5.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", NUM_KEYS * 2)); |
| |
| /* verify */ |
| verifyRegionSize(vm5, NUM_KEYS * 2); |
| verifyRegionSize(vm2, NUM_KEYS * 2); |
| verifyRegionSize(vm4, NUM_KEYS * 2); |
| verifyRegionSize(vm3, NUM_KEYS * 2); |
| |
| disconnectAllFromDS(); |
| } |
| |
| public static void createClientWithLocator(int port0, String host, String regionName, |
| String clientId, boolean isDurable) { |
| WANTestBase test = new WANTestBase(); |
| Properties props = test.getDistributedSystemProperties(); |
| props.setProperty(MCAST_PORT, "0"); |
| props.setProperty(LOCATORS, ""); |
| if (isDurable) { |
| props.setProperty(DURABLE_CLIENT_ID, clientId); |
| props.setProperty(DURABLE_CLIENT_TIMEOUT, "" + 200); |
| } |
| |
| InternalDistributedSystem ds = test.getSystem(props); |
| cache = (InternalCache) CacheFactory.create(ds); |
| |
| assertNotNull(cache); |
| CacheServerTestUtil.disableShufflingOfEndpoints(); |
| Pool p; |
| try { |
| p = PoolManager.createFactory().addLocator(host, port0).setPingInterval(250) |
| .setSubscriptionEnabled(true).setSubscriptionRedundancy(-1).setSocketBufferSize(1000) |
| .setMinConnections(6).setMaxConnections(10).setRetryAttempts(3).create(regionName); |
| } finally { |
| CacheServerTestUtil.enableShufflingOfEndpoints(); |
| } |
| |
| RegionFactory factory = cache.createRegionFactory(RegionShortcut.LOCAL) |
| .setScope(Scope.DISTRIBUTED_NO_ACK) |
| .setPoolName(p.getName()); |
| region = factory.create(regionName); |
| region.registerInterest("ALL_KEYS"); |
| assertNotNull(region); |
| if (isDurable) { |
| cache.readyForEvents(); |
| } |
| LogWriterUtils.getLogWriter() |
| .info("Distributed Region " + regionName + " created Successfully :" + region.toString() |
| + " in a " + (isDurable ? "durable" : "") + " client"); |
| } |
| } |