| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.internal.cache.tier.sockets; |
| |
| import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS; |
| import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL; |
| import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT; |
| import static org.apache.geode.test.awaitility.GeodeAwaitility.await; |
| import static org.assertj.core.api.Assertions.assertThat; |
| import static org.assertj.core.api.Assertions.assertThatThrownBy; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNotEquals; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertNull; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| |
| import java.io.ByteArrayInputStream; |
| import java.io.DataInputStream; |
| import java.lang.reflect.InvocationTargetException; |
| import java.lang.reflect.Method; |
| import java.util.Arrays; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.Set; |
| |
| import org.junit.Test; |
| import org.junit.experimental.categories.Category; |
| |
| import org.apache.geode.GemFireConfigException; |
| import org.apache.geode.GemFireIOException; |
| import org.apache.geode.cache.AttributesFactory; |
| import org.apache.geode.cache.Cache; |
| import org.apache.geode.cache.CacheException; |
| import org.apache.geode.cache.CacheWriterException; |
| import org.apache.geode.cache.DataPolicy; |
| import org.apache.geode.cache.EntryEvent; |
| import org.apache.geode.cache.InterestResultPolicy; |
| import org.apache.geode.cache.PartitionAttributesFactory; |
| import org.apache.geode.cache.Region; |
| import org.apache.geode.cache.RegionAttributes; |
| import org.apache.geode.cache.RegionShortcut; |
| import org.apache.geode.cache.Scope; |
| import org.apache.geode.cache.client.ClientCache; |
| import org.apache.geode.cache.client.ClientCacheFactory; |
| import org.apache.geode.cache.client.ClientRegionShortcut; |
| import org.apache.geode.cache.client.NoAvailableServersException; |
| import org.apache.geode.cache.client.Pool; |
| import org.apache.geode.cache.client.PoolFactory; |
| import org.apache.geode.cache.client.PoolManager; |
| import org.apache.geode.cache.client.internal.Connection; |
| import org.apache.geode.cache.client.internal.Op; |
| import org.apache.geode.cache.client.internal.PoolImpl; |
| import org.apache.geode.cache.client.internal.QueueConnectionImpl; |
| import org.apache.geode.cache.client.internal.RegisterInterestTracker; |
| import org.apache.geode.cache.server.CacheServer; |
| import org.apache.geode.cache.util.CacheListenerAdapter; |
| import org.apache.geode.cache30.CacheSerializableRunnable; |
| import org.apache.geode.distributed.DistributedMember; |
| import org.apache.geode.distributed.DistributedSystem; |
| import org.apache.geode.distributed.DistributedSystemDisconnectedException; |
| import org.apache.geode.distributed.internal.DistributionConfig; |
| import org.apache.geode.distributed.internal.membership.InternalDistributedMember; |
| import org.apache.geode.internal.AvailablePort; |
| import org.apache.geode.internal.OSProcess; |
| import org.apache.geode.internal.cache.CachePerfStats; |
| import org.apache.geode.internal.cache.CacheServerImpl; |
| import org.apache.geode.internal.cache.GemFireCacheImpl; |
| import org.apache.geode.internal.cache.LocalRegion; |
| import org.apache.geode.internal.cache.ha.ThreadIdentifier; |
| import org.apache.geode.logging.internal.log4j.api.LogService; |
| import org.apache.geode.test.dunit.DistributedTestUtils; |
| import org.apache.geode.test.dunit.Host; |
| import org.apache.geode.test.dunit.IgnoredException; |
| import org.apache.geode.test.dunit.NetworkUtils; |
| import org.apache.geode.test.dunit.VM; |
| import org.apache.geode.test.dunit.Wait; |
| import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase; |
| import org.apache.geode.test.dunit.internal.DUnitLauncher; |
| import org.apache.geode.test.junit.categories.ClientServerTest; |
| import org.apache.geode.test.version.VersionManager; |
| |
| /** |
 * Tests client/server corner cases between Region and Pool.
| */ |
| @Category({ClientServerTest.class}) |
| public class ClientServerMiscDUnitTestBase extends JUnit4CacheTestCase { |
| |
| protected static PoolImpl pool = null; |
| |
| protected static Connection conn = null; |
| |
| static Cache static_cache; |
| |
| static int PORT1; |
| |
| private static final String k1 = "k1"; |
| |
| private static final String k2 = "k2"; |
| |
| static final String server_k1 = "server-k1"; |
| |
| static final String server_k2 = "server-k2"; |
| |
| static final String REGION_NAME1 = "ClientServerMiscDUnitTest_region1"; |
| |
| static final String REGION_NAME2 = "ClientServerMiscDUnitTest_region2"; |
| |
| private static final String PR_REGION_NAME = "ClientServerMiscDUnitTest_PRregion"; |
| |
| private static Host host; |
| |
| protected static VM server1; |
| |
| protected static VM server2; |
| |
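  // region attributes captured when a client cache is created; reused by tests that later
  // recreate regions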
| private static RegionAttributes attrs; |
| |
| // variables for concurrent map API test |
| Properties props = new Properties(); |
| private final int putRange_1Start = 1; |
| private final int putRange_1End = 5; |
| private final int putRange_2Start = 6; |
| private final int putRange_2End = 10; |
| |
| |
  String testVersion; // version for client caches, for backward-compatibility testing
| |
| public ClientServerMiscDUnitTestBase() { |
| testVersion = VersionManager.CURRENT_VERSION; |
| } |
| |
| @Override |
| public final void postSetUp() { |
| host = Host.getHost(0); |
| server1 = host.getVM(VersionManager.CURRENT_VERSION, 2); |
| server2 = host.getVM(VersionManager.CURRENT_VERSION, 3); |
| } |
| |
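  // convenience overloads for starting a cache server in server1 or server2, optionally with HA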
| int initServerCache(boolean notifyBySub) { |
| return initServerCache(notifyBySub, false); |
| } |
| |
| int initServerCache2() { |
| return initServerCache2(false); |
| } |
| |
| private int initServerCache(boolean notifyBySub, boolean isHA) { |
| return initServerCache(notifyBySub, server1, isHA); |
| } |
| |
| private int initServerCache2(boolean isHA) { |
| return initServerCache(true, server2, isHA); |
| } |
| |
| int initServerCache(boolean notifyBySub, VM vm, boolean isHA) { |
| return vm.invoke(() -> createServerCache(notifyBySub, getMaxThreads(), isHA)); |
| } |
| |
| @Test |
| public void testConcurrentOperationsWithDRandPR() { |
| int port1 = initServerCache(true); // vm0 |
| int port2 = initServerCache2(); // vm1 |
| String serverName = NetworkUtils.getServerHostName(); |
| host.getVM(testVersion, 0).invoke(() -> createClientCacheV(serverName, port1)); |
| host.getVM(testVersion, 1).invoke(() -> createClientCacheV(serverName, port2)); |
| LogService.getLogger() |
| .info("Testing concurrent map operations from a client with a distributed region"); |
| concurrentMapTest(host.getVM(testVersion, 0), "/" + REGION_NAME1); |
| // TODO add verification in vm1 |
| LogService.getLogger() |
| .info("Testing concurrent map operations from a client with a partitioned region"); |
| concurrentMapTest(host.getVM(testVersion, 0), "/" + PR_REGION_NAME); |
| // TODO add verification in vm1 |
| } |
| |
| /** |
| * When a client's subscription thread connects to a server it should receive the server's |
| * pingInterval setting. This is used by the client to set a read-timeout in order to avoid |
| * hanging should the server's machine crash. |
| */ |
| @Test |
| public void testClientReceivesPingIntervalSetting() { |
| VM clientVM = Host.getHost(0).getVM(testVersion, 0); |
| |
| final int port = initServerCache(true); |
| final String host = NetworkUtils.getServerHostName(); |
| |
| clientVM.invoke("create client cache and verify", () -> { |
| createClientCacheAndVerifyPingIntervalIsSet(host, port); |
| }); |
| } |
| |
| void createClientCacheAndVerifyPingIntervalIsSet(String host, int port) throws Exception { |
| PoolImpl pool = null; |
| try { |
| Properties props = new Properties(); |
| props.setProperty(MCAST_PORT, "0"); |
| props.setProperty(LOCATORS, ""); |
| |
| createCache(props); |
| |
| pool = (PoolImpl) PoolManager.createFactory().addServer(host, port) |
| .setSubscriptionEnabled(true).setReadTimeout(1000) |
| .setSocketBufferSize(32768).setMinConnections(1).setSubscriptionRedundancy(-1) |
| .setPingInterval(2000).create("test pool"); |
| |
| Region<Object, Object> region = cache.createRegionFactory(RegionShortcut.LOCAL) |
| .setPoolName("test pool").create(REGION_NAME1); |
| region.registerInterest(".*"); |
| |
      // get the subscription connection and verify that it has the correct timeout setting
| QueueConnectionImpl primaryConnection = (QueueConnectionImpl) pool.getPrimaryConnection(); |
| int pingInterval = ((CacheClientUpdater) primaryConnection.getUpdater()) |
| .getServerQueueStatus().getPingInterval(); |
| assertNotEquals(0, pingInterval); |
| assertEquals(CacheClientNotifier.getClientPingInterval(), pingInterval); |
| } finally { |
| cache.close(); |
| } |
| |
| } |
| |
| @Test |
| public void testConcurrentOperationsWithDRandPRandEmptyClient() { |
| int port1 = initServerCache(true); // vm0 |
| int port2 = initServerCache2(); // vm1 |
| String serverName = NetworkUtils.getServerHostName(); |
| host.getVM(testVersion, 0).invoke(() -> createEmptyClientCache(serverName, port1)); |
| host.getVM(testVersion, 1).invoke(() -> createClientCacheV(serverName, port2)); |
| LogService.getLogger() |
| .info("Testing concurrent map operations from a client with a distributed region"); |
| concurrentMapTest(host.getVM(testVersion, 0), "/" + REGION_NAME1); |
| // TODO add verification in vm1 |
| LogService.getLogger() |
| .info("Testing concurrent map operations from a client with a partitioned region"); |
| concurrentMapTest(host.getVM(testVersion, 0), "/" + PR_REGION_NAME); |
| // TODO add verification in vm1 |
| } |
| |
| /** |
| * Do putIfAbsent(), replace(Object, Object), replace(Object, Object, Object), remove(Object, |
| * Object) operations |
| */ |
| private void concurrentMapTest(final VM clientVM, final String rName) { |
| |
| clientVM.invoke(new CacheSerializableRunnable("doConcurrentMapOperations") { |
| @Override |
| public void run2() throws CacheException { |
| Cache cache = getCache(); |
| final Region pr = cache.getRegion(rName); |
| assertNotNull(rName + " not created", pr); |
| boolean isEmpty = pr.getAttributes().getDataPolicy() == DataPolicy.EMPTY; |
| |
| // test successful putIfAbsent |
| for (int i = putRange_1Start; i <= putRange_1End; i++) { |
| Object putResult = pr.putIfAbsent(Integer.toString(i), Integer.toString(i)); |
| assertNull("Expected null, but got " + putResult + " for key " + i, putResult); |
| } |
| int size; |
| if (!isEmpty) { |
| size = pr.size(); |
| assertEquals("Size doesn't return expected value", putRange_1End, size); |
| assertFalse("isEmpty doesnt return proper state of the PartitionedRegion", pr.isEmpty()); |
| } |
| |
| // test unsuccessful putIfAbsent |
| for (int i = putRange_1Start; i <= putRange_1End; i++) { |
| Object putResult = pr.putIfAbsent(Integer.toString(i), Integer.toString(i + 1)); |
| assertEquals("for i=" + i, Integer.toString(i), putResult); |
| assertEquals("for i=" + i, Integer.toString(i), pr.get(Integer.toString(i))); |
| } |
| if (!isEmpty) { |
| size = pr.size(); |
| assertEquals("Size doesn't return expected value", putRange_1End, size); |
| assertFalse("isEmpty doesnt return proper state of the PartitionedRegion", pr.isEmpty()); |
| } |
| |
| // test successful replace(key, oldValue, newValue) |
| for (int i = putRange_1Start; i <= putRange_1End; i++) { |
| boolean replaceSucceeded = |
| pr.replace(Integer.toString(i), Integer.toString(i), "replaced" + i); |
| assertTrue("for i=" + i, replaceSucceeded); |
| assertEquals("for i=" + i, "replaced" + i, pr.get(Integer.toString(i))); |
| } |
| if (!isEmpty) { |
| size = pr.size(); |
| assertEquals("Size doesn't return expected value", putRange_1End, size); |
| assertFalse("isEmpty doesnt return proper state of the PartitionedRegion", pr.isEmpty()); |
| } |
| |
| // test unsuccessful replace(key, oldValue, newValue) |
| for (int i = putRange_1Start; i <= putRange_2End; i++) { |
          // the expected old value is deliberately wrong, so the replace must fail
          boolean replaceSucceeded =
              pr.replace(Integer.toString(i), Integer.toString(i), "not" + i);
| assertFalse("for i=" + i, replaceSucceeded); |
| assertEquals("for i=" + i, i <= putRange_1End ? "replaced" + i : null, |
| pr.get(Integer.toString(i))); |
| } |
| if (!isEmpty) { |
| size = pr.size(); |
| assertEquals("Size doesn't return expected value", putRange_1End, size); |
| assertFalse("isEmpty doesnt return proper state of the PartitionedRegion", pr.isEmpty()); |
| } |
| |
| // test successful replace(key, value) |
| for (int i = putRange_1Start; i <= putRange_1End; i++) { |
| Object replaceResult = pr.replace(Integer.toString(i), "twice replaced" + i); |
| assertEquals("for i=" + i, "replaced" + i, replaceResult); |
| assertEquals("for i=" + i, "twice replaced" + i, pr.get(Integer.toString(i))); |
| } |
| if (!isEmpty) { |
| size = pr.size(); |
| assertEquals("Size doesn't return expected value", putRange_1End, size); |
| assertFalse("isEmpty doesnt return proper state of the PartitionedRegion", pr.isEmpty()); |
| } |
| |
| // test unsuccessful replace(key, value) |
| for (int i = putRange_2Start; i <= putRange_2End; i++) { |
| Object replaceResult = pr.replace(Integer.toString(i), "thrice replaced" + i); |
| assertNull("for i=" + i, replaceResult); |
| assertNull("for i=" + i, pr.get(Integer.toString(i))); |
| } |
| if (!isEmpty) { |
| size = pr.size(); |
| assertEquals("Size doesn't return expected value", putRange_1End, size); |
| assertFalse("isEmpty doesnt return proper state of the PartitionedRegion", pr.isEmpty()); |
| } |
| |
| // test unsuccessful remove(key, value) |
| for (int i = putRange_1Start; i <= putRange_2End; i++) { |
| boolean removeResult = pr.remove(Integer.toString(i), Integer.toString(-i)); |
| assertFalse("for i=" + i, removeResult); |
| assertEquals("for i=" + i, i <= putRange_1End ? "twice replaced" + i : null, |
| pr.get(Integer.toString(i))); |
| } |
| if (!isEmpty) { |
| size = pr.size(); |
| assertEquals("Size doesn't return expected value", putRange_1End, size); |
| assertFalse("isEmpty doesnt return proper state of the PartitionedRegion", pr.isEmpty()); |
| } |
| |
| // test successful remove(key, value) |
| for (int i = putRange_1Start; i <= putRange_1End; i++) { |
| boolean removeResult = pr.remove(Integer.toString(i), "twice replaced" + i); |
| assertTrue("for i=" + i, removeResult); |
| assertNull("for i=" + i, pr.get(Integer.toString(i))); |
| } |
| if (!isEmpty) { |
| size = pr.size(); |
| assertEquals("Size doesn't return expected value", 0, size); |
| pr.localClear(); |
| assertTrue("isEmpty doesnt return proper state of the PartitionedRegion", pr.isEmpty()); |
| } |
| |
| if (!isEmpty) { |
| // bug #42169 - entry not updated on server when locally destroyed on client |
| String key42169 = "key42169"; |
| pr.put(key42169, "initialValue42169"); |
| pr.localDestroy(key42169); |
| boolean success = pr.replace(key42169, "initialValue42169", "newValue42169"); |
| assertTrue("expected replace to succeed", success); |
| pr.destroy(key42169); |
| pr.put(key42169, "secondRound"); |
| pr.localDestroy(key42169); |
| Object result = pr.putIfAbsent(key42169, null); |
| assertEquals("expected putIfAbsent to fail", result, "secondRound"); |
| pr.destroy(key42169); |
| } |
| |
| if (isEmpty) { |
| String key41265 = "key41265"; |
| boolean success = pr.remove(key41265, null); |
| assertFalse("expected remove to fail because key does not exist", success); |
| } |
| |
| // test null values |
| |
| // putIfAbsent with null value creates invalid entry |
| Object oldValue = pr.putIfAbsent("keyForNull", null); |
| assertNull(oldValue); |
| if (!isEmpty) { |
| assertTrue(pr.containsKey("keyForNull")); |
          assertFalse(pr.containsValueForKey("keyForNull"));
| } |
| |
| // replace allows null value for oldValue, meaning invalidated entry |
| assertTrue(pr.replace("keyForNull", null, "no longer invalid")); |
| |
| // replace does not allow null value for new value |
| assertThatThrownBy(() -> pr.replace("keyForNull", "no longer invalid", null)) |
| .isExactlyInstanceOf(NullPointerException.class); |
| |
| // other variant of replace does not allow null value for new value |
| assertThatThrownBy(() -> pr.replace("keyForNull", null)) |
| .isExactlyInstanceOf(NullPointerException.class); |
| |
        // replace with a null oldValue matches an invalidated entry
| pr.putIfAbsent("otherKeyForNull", null); |
| CachePerfStats stats = ((GemFireCacheImpl) pr.getCache()).getCachePerfStats(); |
| |
| Number puts = getNumPuts(stats); |
| boolean success = pr.replace("otherKeyForNull", null, "no longer invalid"); |
| assertTrue(success); |
| |
| Number newputs = getNumPuts(stats); |
| assertEquals("stats not updated properly or replace malfunctioned", newputs.longValue(), |
| puts.longValue() + 1); |
| |
| } |
| |
| public Number getNumPuts(CachePerfStats stats) { |
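        // getPuts() is invoked reflectively, and the result widened to Number, because this test
        // base also runs against older product versions where the method is assumed to have a
        // different primitive return type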
        try {
          Method getPutsMethod = stats.getClass().getMethod("getPuts");
          return (Number) getPutsMethod.invoke(stats);
        } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
          fail(e.getMessage());
          return null; // unreachable: fail() throws
        }
| } |
| }); |
| } |
| |
| /** |
 * Test two regions with notify-by-subscription set to true. For region1 the interest list is
 * empty; for region2 the interest list is all keys. If an update/create is made on region1, the
 * client should not receive any events. If a create/update is made on region2, the client should
 * receive the update.
| */ |
| @Test |
| public void testForTwoRegionHavingDifferentInterestList() { |
| // start server first |
| PORT1 = initServerCache(true); |
| int serverPort = PORT1; |
| VM client1 = Host.getHost(0).getVM(testVersion, 1); |
| String hostname = NetworkUtils.getServerHostName(); |
| client1.invoke("create client1 cache", () -> { |
| createClientCache(hostname, serverPort); |
| populateCache(); |
| registerInterest(); |
| }); |
| |
| server1.invoke("putting entries in server1", () -> put()); |
| |
| client1.invoke(() -> verifyUpdates()); |
| } |
| |
| /** |
 * Test two regions with notify-by-subscription set to true. Both regions have registered
 * interest in all keys. Now close region1 on the client. Region1 should be removed from the
 * interest list in the CCP on the server, and any update to region1 on the server should not be
 * pushed to the client. Ensure that the related message is not added to the client's queue at
 * all (which is different from not receiving a callback on the client). If an update to region2
 * is made on the server, the client should receive the callback.
| */ |
| @Test |
| public void testForTwoRegionHavingALLKEYSInterest() throws Exception { |
| // start server first |
| PORT1 = initServerCache(true); |
| createClientCache(NetworkUtils.getServerHostName(), PORT1); |
| populateCache(); |
| registerInterestInBothTheRegions(); |
| closeRegion1(); |
| Wait.pause(6000); |
| server1.invoke(() -> ClientServerMiscDUnitTestBase.verifyInterestListOnServer()); |
| server1.invoke(() -> ClientServerMiscDUnitTestBase.put()); |
| verifyUpdatesOnRegion2(); |
| } |
| |
| /** |
 * Test two regions with notify-by-subscription set to true. Both regions have registered
 * interest in all keys. Close both regions. When the last region is closed, it should close the
 * ConnectionProxy on the client, close all the server connection threads on the server, and
 * remove the CacheClientProxy from the CacheClientNotifier.
| */ |
| @Test |
| public void testRegionClose() throws Exception { |
| // start server first |
| PORT1 = initServerCache(true); |
| pool = (PoolImpl) createClientCache(NetworkUtils.getServerHostName(), PORT1); |
| populateCache(); |
| registerInterestInBothTheRegions(); |
| closeBothRegions(); |
| |
| assertFalse(pool.isDestroyed()); |
| pool.destroy(); |
| assertTrue(pool.isDestroyed()); |
| server1.invoke(() -> ClientServerMiscDUnitTestBase.verifyNoCacheClientProxyOnServer()); |
| |
| } |
| |
| /** |
 * Test two regions with notify-by-subscription set to true. Both regions have registered
 * interest in all keys. Destroy region1 on the client. The destroy should reach the server,
 * destroy the region there, and propagate to the interested clients, but it should keep the
 * CacheClientProxy alive. Destroy region2. That destroy should reach the server, close the
 * connection proxy, destroy region2 on the server, remove the CacheClientProxy from the
 * CacheClientNotifier, and propagate to the clients. Then create a third region and verify that
 * no CacheClientProxy is created on the server.
| */ |
| @Test |
| public void testCCPDestroyOnLastDestroyRegion() throws Exception { |
| PORT1 = initServerCache(true); |
| PoolImpl pool = |
| (PoolImpl) createClientCache(NetworkUtils.getServerHostName(), PORT1); |
| destroyRegion1(); |
| server1.invoke( |
| () -> ClientServerMiscDUnitTestBase.verifyCacheClientProxyOnServer(REGION_NAME1)); |
| Connection conn = pool.acquireConnection(); |
| assertNotNull(conn); |
| assertEquals(1, pool.getConnectedServerCount()); |
| assertFalse(pool.isDestroyed()); |
| destroyRegion2(); |
| assertFalse(pool.isDestroyed()); |
| destroyPRRegion(); |
| assertFalse(pool.isDestroyed()); |
| pool.destroy(); |
| assertTrue(pool.isDestroyed()); |
| server1.invoke(() -> ClientServerMiscDUnitTestBase.verifyNoCacheClientProxyOnServer()); |
| |
| assertThatThrownBy(() -> getCache().createRegion(REGION_NAME2, attrs)) |
| .isExactlyInstanceOf(IllegalStateException.class); |
| } |
| |
| /** |
 * Test two regions: if notify-by-subscription is false, both regions should receive invalidates
 * for the updates made on the server in their respective regions.
| */ |
| @Test |
| public void testInvalidatesPropagateOnTwoRegions() throws Exception { |
| // start server first |
| PORT1 = initServerCache(false); |
| createClientCache(NetworkUtils.getServerHostName(), PORT1); |
| registerInterestForInvalidatesInBothTheRegions(); |
| populateCache(); |
| server1.invoke(() -> ClientServerMiscDUnitTestBase.put()); |
| verifyInvalidatesOnBothRegions(); |
| |
| } |
| |
| /** |
| * Test for bug 43407, where LRU in the client caused an entry to be evicted with DESTROY(), then |
| * the client invalidated the entry and did a get(). After the get() the entry was not seen to be |
| * in the client's cache. This turned out to be expected behavior, but we now have this test to |
| * guarantee that the product behaves as expected. |
| */ |
| @Test |
| public void testGetInClientCreatesEntry() throws Exception { |
| // start server first |
| PORT1 = initServerCache(false); |
| createClientCache(NetworkUtils.getServerHostName(), PORT1); |
| registerInterestForInvalidatesInBothTheRegions(); |
| Region region = static_cache.getRegion(REGION_NAME1); |
| populateCache(); |
| region.put("invalidationKey", "invalidationValue"); |
| |
| region.localDestroy("invalidationKey"); |
| assertThat(region.containsKey("invalidationKey")).isFalse(); |
| |
| region.invalidate("invalidationKey"); |
| assertThat(region.containsKey("invalidationKey")).isTrue(); |
| |
| Object value = region.get("invalidationKey"); |
| assertThat(value).isNull(); |
| assertThat(region.containsKeyOnServer("invalidationKey")).isTrue(); |
| } |
| |
| /** |
 * GEODE-478: a message that is too large to send to the server should surface as a
 * GemFireIOException whose cause is a MessageTooLargeException.
| */ |
| @Test |
| public void testLargeMessageIsRejected() throws Exception { |
| PORT1 = initServerCache(false); |
| createClientCache(NetworkUtils.getServerHostName(), PORT1); |
| Region region = static_cache.getRegion(REGION_NAME1); |
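    // an Op that simulates a too-large outbound message by failing immediately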
| Op operation = new Op() { |
| @Override |
| public Object attempt(Connection cnx) throws Exception { |
| throw new MessageTooLargeException("message is too big"); |
| } |
| }; |
    assertThatThrownBy(() -> ((LocalRegion) region).getServerProxy().getPool().execute(operation))
        .isInstanceOf(GemFireIOException.class)
        .hasCauseInstanceOf(MessageTooLargeException.class);
| } |
| |
| /** |
 * Create a cache and a pool with notify-by-subscription=false, and create regions on both the
 * client and the server without attaching the pool to them. Populate some entries in the regions
 * on both client and server, then update the entries on the server. Because the regions have no
 * pool, the client should not receive the updates or invalidates, and its entries should retain
 * their locally-put values.
| */ |
| @Test |
| public void testInvalidatesPropagateOnRegionHavingNoPool() { |
| // start server first |
| PORT1 = initServerCache(false); |
| Properties props = new Properties(); |
| props.setProperty(MCAST_PORT, "0"); |
| props.setProperty(LOCATORS, ""); |
| new ClientServerMiscDUnitTestBase().createCache(props); |
| String host = NetworkUtils.getServerHostName(); |
| PoolImpl p = |
| (PoolImpl) PoolManager.createFactory().addServer(host, PORT1).setSubscriptionEnabled(true) |
| .setReadTimeout(1000).setSocketBufferSize(32768) |
| .setMinConnections(3).setSubscriptionRedundancy(-1).setPingInterval(2000) |
| .create("testInvalidatesPropagateOnRegionHavingNoPool"); |
| |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.DISTRIBUTED_ACK); |
    // the pool is deliberately not attached to these regions
| |
| attrs = factory.create(); |
| final Region region1 = getCache().createRegion(REGION_NAME1, attrs); |
| final Region region2 = getCache().createRegion(REGION_NAME2, attrs); |
| assertNotNull(region1); |
| assertNotNull(region2); |
| pool = p; |
| conn = pool.acquireConnection(); |
| assertNotNull(conn); |
| |
| populateCache(); |
| server1.invoke(() -> ClientServerMiscDUnitTestBase.put()); |
| |
| await().until(() -> { |
| Object val = region1.getEntry(k1).getValue(); |
| return k1.equals(val); |
| }); |
| |
| await().until(() -> { |
| Object val = region1.getEntry(k2).getValue(); |
| return k2.equals(val); |
| }); |
| |
| await().until(() -> { |
| Object val = region2.getEntry(k1).getValue(); |
| return k1.equals(val); |
| }); |
| |
| await().until(() -> { |
| Object val = region2.getEntry(k2).getValue(); |
| return k2.equals(val); |
| }); |
| |
| } |
| |
| /** |
 * Create the pool before cache creation, create the cache, create two regions, and attach the
 * same pool to both regions. Register interest in ALL_KEYS on region2 with
 * notify-by-subscription=true. The CacheClientProxy on the server should then have only region2
 * in its interest list.
 */
| @Test |
| public void testProxyCreationBeforeCacheCreation() { |
| Properties props = new Properties(); |
| props.setProperty(MCAST_PORT, "0"); |
| props.setProperty(LOCATORS, ""); |
| DistributedSystem ds = getSystem(props); |
| assertNotNull(ds); |
| ds.disconnect(); |
| ds = getSystem(props); |
| PORT1 = initServerCache(true); |
| String host = NetworkUtils.getServerHostName(); |
| Pool p = PoolManager.createFactory().addServer(host, PORT1).setSubscriptionEnabled(true) |
| .setSubscriptionRedundancy(-1) |
| .create("testProxyCreationBeforeCacheCreationPool"); |
| |
| Cache cache = getCache(); |
| assertNotNull(cache); |
| |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.DISTRIBUTED_ACK); |
| factory.setPoolName(p.getName()); |
| |
| RegionAttributes myAttrs = factory.create(); |
| Region region1 = cache.createRegion(REGION_NAME1, myAttrs); |
| Region region2 = cache.createRegion(REGION_NAME2, myAttrs); |
| assertNotNull(region1); |
| assertNotNull(region2); |
    // interest is deliberately not registered on region1
| region2.registerInterest("ALL_KEYS"); |
| Wait.pause(6000); |
| server1.invoke(() -> ClientServerMiscDUnitTestBase.verifyInterestListOnServer()); |
| |
| } |
| |
| /** |
| * |
| * bug 35380: Cycling a DistributedSystem with an initialized pool causes interest registration |
| * NPE |
| * |
| * Test Scenario: |
| * |
| * Create a DistributedSystem (DS1). Create a pool, initialize (creates a proxy with DS1 memberid) |
| * Disconnect DS1. Create a DistributedSystem (DS2). Create a Region with pool, it attempts to |
| * register interest using DS2 memberid, gets NPE. |
| * |
| */ |
| @Test |
| public void testSystemCanBeCycledWithAnInitializedPool() { |
| // work around GEODE-477 |
| IgnoredException.addIgnoredException("Connection reset"); |
| Properties props = new Properties(); |
| props.setProperty(MCAST_PORT, "0"); |
| props.setProperty(LOCATORS, ""); |
| DistributedSystem ds = getSystem(props); |
| assertNotNull(ds); |
| |
| PORT1 = initServerCache(true); |
| String host = NetworkUtils.getServerHostName(); |
| Pool p = PoolManager.createFactory().addServer(host, PORT1).setSubscriptionEnabled(true) |
| .setSubscriptionRedundancy(-1) |
| .create("testBug35380Pool"); |
| |
| Cache cache = getCache(); |
| |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.DISTRIBUTED_ACK); |
| factory.setPoolName(p.getName()); |
| |
| RegionAttributes myAttrs = factory.create(); |
| Region region1 = cache.createRegion(REGION_NAME1, myAttrs); |
| Region region2 = cache.createRegion(REGION_NAME2, myAttrs); |
| assertNotNull(region1); |
| assertNotNull(region2); |
| |
| region2.registerInterest("ALL_KEYS"); |
| |
| ds.disconnect(); |
| Properties prop = new Properties(); |
| prop.setProperty(MCAST_PORT, "0"); |
| prop.setProperty(LOCATORS, ""); |
| ds = getSystem(prop); |
| |
    final Cache cacheForLambda = getCache();
    assertNotNull(cacheForLambda);
| |
| AttributesFactory factory1 = new AttributesFactory(); |
| factory1.setScope(Scope.DISTRIBUTED_ACK); |
    // reuse the pool from the previous DistributedSystem
| factory1.setPoolName(p.getName()); |
| |
| final RegionAttributes attrs1 = factory1.create(); |
    assertThatThrownBy(() -> cacheForLambda.createRegion(REGION_NAME1, attrs1))
| .isInstanceOfAny(IllegalStateException.class, DistributedSystemDisconnectedException.class); |
| } |
| |
| @Test(expected = GemFireConfigException.class) |
| public void clientIsPreventedFromConnectingToLocatorAsServer() { |
| IgnoredException.addIgnoredException("Improperly configured client detected"); |
| ClientCacheFactory clientCacheFactory = new ClientCacheFactory(); |
| clientCacheFactory.addPoolServer("localhost", DistributedTestUtils.getDUnitLocatorPort()); |
| clientCacheFactory.setPoolSubscriptionEnabled(true); |
| getClientCache(clientCacheFactory); |
| Region region = ((ClientCache) cache).createClientRegionFactory(ClientRegionShortcut.PROXY) |
| .create(REGION_NAME1); |
| region.registerInterest(k1); |
| } |
| |
| |
| private void createCache(Properties props) { |
| createCacheV(props); |
| } |
| |
| private Cache createCacheV(Properties props) { |
| DistributedSystem ds = getSystem(props); |
| assertNotNull(ds); |
| ds.disconnect(); |
| ds = getSystem(props); |
| Cache cache = getCache(); |
| assertNotNull(cache); |
| return cache; |
| } |
| |
| public static void createClientCacheV(String h, int port) { |
| _createClientCache(h, false, -1, port); |
| } |
| |
| public static void createEmptyClientCache(String h, int... ports) { |
| _createClientCache(h, false, -1, ports); |
| } |
| |
| public static Pool createClientCache(String h, int... ports) { |
| return _createClientCache(h, false, -1, ports); |
| } |
| |
| public static Pool createClientCache(String h, int subscriptionAckInterval, boolean empty, |
| int... ports) { |
| return _createClientCache(h, empty, subscriptionAckInterval, ports); |
| } |
| |
| private static PoolFactory addServers(PoolFactory factory, String h, int... ports) { |
| for (int port : ports) { |
| factory.addServer(h, port); |
| } |
| return factory; |
| } |
| |
| public static Pool _createClientCache(String h, boolean empty, int subscriptionAckInterval, |
| int... ports) { |
| Properties props = new Properties(); |
| props.setProperty(MCAST_PORT, "0"); |
| props.setProperty(LOCATORS, ""); |
| props.setProperty(LOG_LEVEL, DUnitLauncher.logLevel); |
| Cache cache = new ClientServerMiscDUnitTestBase().createCacheV(props); |
| ClientServerMiscDUnitTestBase.static_cache = cache; |
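    // disable endpoint shuffling while the pool is created so the client connects to the given
    // servers in a deterministic order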
| System.setProperty(DistributionConfig.GEMFIRE_PREFIX + "bridge.disableShufflingOfEndpoints", |
| "true"); |
| PoolImpl p; |
| try { |
| PoolFactory poolFactory = PoolManager.createFactory(); |
| addServers(poolFactory, h, ports).setSubscriptionEnabled(true) |
| .setReadTimeout(5000).setSocketBufferSize(32768).setMinConnections(3) |
| .setSubscriptionRedundancy(1).setPingInterval(2000); |
| if (subscriptionAckInterval > 0) { |
| poolFactory.setSubscriptionAckInterval(subscriptionAckInterval); |
| } |
| p = (PoolImpl) poolFactory.create("ClientServerMiscDUnitTestPool"); |
| |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.DISTRIBUTED_ACK); |
| if (empty) { |
| factory.setDataPolicy(DataPolicy.EMPTY); |
| } |
| factory.setPoolName(p.getName()); |
| |
| attrs = factory.create(); |
| } finally { |
| System.getProperties() |
| .remove(DistributionConfig.GEMFIRE_PREFIX + "bridge.disableShufflingOfEndpoints"); |
| } |
| |
| Region region1 = cache.createRegion(REGION_NAME1, attrs); |
| Region region2 = cache.createRegion(REGION_NAME2, attrs); |
| Region prRegion = cache.createRegion(PR_REGION_NAME, attrs); |
| assertNotNull(region1); |
| assertNotNull(region2); |
| assertNotNull(prRegion); |
| pool = p; |
| |
| await().until(() -> { |
| try { |
| conn = pool.acquireConnection(); |
| return conn != null; |
| } catch (NoAvailableServersException e) { |
| return false; |
| } |
| }); |
| |
| return p; |
| } |
| |
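  /**
   * Cache listener that records whether an entry event was received and whether the event
   * carried the originating member's id.
   */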
| static class MemberIDVerifier extends CacheListenerAdapter { |
| boolean memberIDNotReceived = true; |
| boolean eventReceived = false; |
| |
| @Override |
| public void afterCreate(EntryEvent event) { |
| eventReceived(event); |
| } |
| |
| @Override |
| public void afterUpdate(EntryEvent event) { |
| eventReceived(event); |
| } |
| |
| private void eventReceived(EntryEvent event) { |
| eventReceived = true; |
| DistributedMember memberID = event.getDistributedMember(); |
| memberIDNotReceived = (memberID == null); |
| // System.out.println("received event " + event); |
| } |
| |
| public void reset() { |
| memberIDNotReceived = true; |
| eventReceived = false; |
| } |
| } |
| |
| public static void dumpPoolIdentifiers() throws Exception { |
| // duplicate events were received, so let's look at the thread identifiers we have |
| PoolImpl pool = (PoolImpl) PoolManager.find("ClientServerMiscDUnitTestPool"); |
| |
| Map seqMap = pool.getThreadIdToSequenceIdMap(); |
| for (Object o : seqMap.keySet()) { |
| ThreadIdentifier tid = (ThreadIdentifier) o; |
| byte[] memberBytes = tid.getMembershipID(); |
| dumpMemberId(tid, memberBytes); |
| } |
| } |
| |
| public static void dumpMemberId(Object holder, byte[] memberBytes) throws Exception { |
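    // pad the raw membership id with zero bytes; readEssentialData is assumed to need trailing
    // fields beyond the id bytes themselves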
| byte[] newBytes = new byte[memberBytes.length + 17]; |
| System.arraycopy(memberBytes, 0, newBytes, 0, memberBytes.length); |
| ByteArrayInputStream bais = new ByteArrayInputStream(newBytes); |
| DataInputStream dataIn = new DataInputStream(bais); |
| InternalDistributedMember memberId = InternalDistributedMember.readEssentialData(dataIn); |
    String message = "<" + Thread.currentThread().getName() + "> " + holder
        + " is " + memberId + " byte count = " + memberBytes.length
        + " bytes = " + Arrays.toString(memberBytes);
    System.out.println(message);
| } |
| |
| |
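  /**
   * Creates two replicated regions and one partitioned region, then starts a cache server for
   * them on a random port. Returns the port the server is listening on.
   */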
| public static Integer createServerCache(Boolean notifyBySubscription, Integer maxThreads, |
| boolean isHA) throws Exception { |
| Cache cache = new ClientServerMiscDUnitTestBase().createCacheV(new Properties()); |
| unsetSlowDispatcherFlag(); |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.DISTRIBUTED_ACK); |
| factory.setEnableConflation(true); |
| factory.setDataPolicy(DataPolicy.REPLICATE); |
| factory.setConcurrencyChecksEnabled(true); |
| RegionAttributes myAttrs = factory.create(); |
| Region r1 = cache.createRegion(REGION_NAME1, myAttrs); |
| Region r2 = cache.createRegion(REGION_NAME2, myAttrs); |
| factory = new AttributesFactory(); |
| factory.setDataPolicy(DataPolicy.PARTITION); |
| if (isHA) { |
| PartitionAttributesFactory paf = new PartitionAttributesFactory().setRedundantCopies(1); |
| factory.setPartitionAttributes(paf.create()); |
| } |
| RegionAttributes prAttrs = factory.create(); |
| |
| Region pr = cache.createRegion(PR_REGION_NAME, prAttrs); |
| assertNotNull(r1); |
| assertNotNull(r2); |
| assertNotNull(pr); |
| |
| CacheServer server = cache.addCacheServer(); |
| int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); |
| r1.getCache().getDistributedSystem().getLogWriter().info("Starting server on port " + port); |
| server.setPort(port); |
| server.setMaxThreads(maxThreads); |
| server.setNotifyBySubscription(notifyBySubscription); |
| server.start(); |
| r1.getCache().getDistributedSystem().getLogWriter() |
| .info("Started server on port " + server.getPort()); |
| return server.getPort(); |
| |
| } |
| |
| protected int getMaxThreads() { |
| return 0; |
| } |
| |
| public static void registerInterest() { |
| Cache cache = new ClientServerMiscDUnitTestBase().getCache(); |
| Region r = cache.getRegion(Region.SEPARATOR + REGION_NAME2); |
| assertNotNull(r); |
| r.registerInterest("ALL_KEYS"); |
| r.getAttributesMutator().addCacheListener(new MemberIDVerifier()); |
| } |
| |
| private static void registerInterestForInvalidatesInBothTheRegions() { |
| try { |
| Cache cache = new ClientServerMiscDUnitTestBase().getCache(); |
| Region r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME1); |
| assertNotNull(r1); |
| Region r2 = cache.getRegion(Region.SEPARATOR + REGION_NAME2); |
| assertNotNull(r2); |
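      // receiveValues=false: the client is sent invalidates instead of the new values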
| r1.registerInterestForAllKeys(InterestResultPolicy.KEYS, false, false); |
| r2.registerInterestForAllKeys(InterestResultPolicy.KEYS, false, false); |
| } catch (CacheWriterException e) { |
| e.printStackTrace(); |
| fail("Test failed due to CacheWriterException during registerInterestnBothRegions" + e); |
| } |
| } |
| |
| private static void registerInterestInBothTheRegions() { |
| try { |
| Cache cache = new ClientServerMiscDUnitTestBase().getCache(); |
| Region r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME1); |
| assertNotNull(r1); |
| Region r2 = cache.getRegion(Region.SEPARATOR + REGION_NAME2); |
| assertNotNull(r2); |
| r1.registerInterestForAllKeys(); |
| r2.registerInterestForAllKeys(); |
| } catch (CacheWriterException e) { |
| e.printStackTrace(); |
| fail("Test failed due to CacheWriterException during registerInterestnBothRegions" + e); |
| } |
| } |
| |
| private static void closeRegion1() { |
| try { |
| Cache cache = new ClientServerMiscDUnitTestBase().getCache(); |
| Region<String, String> r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME1); |
| assertNotNull(r1); |
| r1.close(); |
| } catch (Exception e) { |
| e.printStackTrace(); |
| fail("Test failed due to Exception during closeRegion1" + e); |
| } |
| } |
| |
| private static void closeBothRegions() { |
| try { |
| Cache cache = new ClientServerMiscDUnitTestBase().getCache(); |
| Region r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME1); |
| Region r2 = cache.getRegion(Region.SEPARATOR + REGION_NAME2); |
| Region pr = cache.getRegion(Region.SEPARATOR + PR_REGION_NAME); |
| assertNotNull(r1); |
| assertNotNull(r2); |
| assertNotNull(pr); |
| r1.close(); |
| r2.close(); |
| pr.close(); |
| } catch (Exception e) { |
| e.printStackTrace(); |
| fail("Test failed due to Exception during closeBothRegions" + e); |
| } |
| } |
| |
| private static void destroyRegion1() { |
| try { |
| Cache cache = new ClientServerMiscDUnitTestBase().getCache(); |
| Region r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME1); |
| assertNotNull(r1); |
| r1.destroyRegion(); |
| } catch (Exception e) { |
| e.printStackTrace(); |
| fail("Test failed due to Exception during closeBothRegions" + e); |
| } |
| } |
| |
| private static void destroyRegion2() { |
| try { |
| Cache cache = new ClientServerMiscDUnitTestBase().getCache(); |
| Region r2 = cache.getRegion(Region.SEPARATOR + REGION_NAME2); |
| assertNotNull(r2); |
| r2.destroyRegion(); |
| } catch (Exception e) { |
| e.printStackTrace(); |
| fail("Test failed due to Exception during closeBothRegions" + e); |
| } |
| } |
| |
| private static void destroyPRRegion() { |
| try { |
| Cache cache = new ClientServerMiscDUnitTestBase().getCache(); |
| Region r2 = cache.getRegion(Region.SEPARATOR + PR_REGION_NAME); |
| assertNotNull(r2); |
| r2.destroyRegion(); |
| } catch (Exception e) { |
      e.printStackTrace();
      fail("Test failed due to Exception during destroyPRRegion: " + e);
| } |
| } |
| |
| private static void verifyInterestListOnServer() { |
| try { |
| Cache cache = new ClientServerMiscDUnitTestBase().getCache(); |
| assertEquals("More than one BridgeServer", 1, cache.getCacheServers().size()); |
| CacheServerImpl bs = (CacheServerImpl) cache.getCacheServers().iterator().next(); |
| assertNotNull(bs); |
| assertNotNull(bs.getAcceptor()); |
| assertNotNull(bs.getAcceptor().getCacheClientNotifier()); |
| for (CacheClientProxy ccp : bs.getAcceptor().getCacheClientNotifier().getClientProxies()) { |
| // CCP should not contain region1 |
| Set<String> akr = ccp.cils[RegisterInterestTracker.interestListIndex].regions; |
| assertNotNull(akr); |
        assertFalse(akr.contains(Region.SEPARATOR + REGION_NAME1));
| // CCP should contain region2 |
| assertTrue(akr.contains(Region.SEPARATOR + REGION_NAME2)); |
| assertEquals(1, akr.size()); |
| } |
| } catch (Exception ex) { |
| ex.printStackTrace(); |
| fail("while setting verifyInterestListOnServer " + ex); |
| } |
| } |
| |
| private static void verifyNoCacheClientProxyOnServer() { |
| try { |
| Cache cache = new ClientServerMiscDUnitTestBase().getCache(); |
| assertEquals("More than one BridgeServer", 1, cache.getCacheServers().size()); |
| CacheServerImpl cacheServer = (CacheServerImpl) cache.getCacheServers().iterator().next(); |
| assertNotNull(cacheServer); |
| assertNotNull(cacheServer.getAcceptor()); |
| final CacheClientNotifier ccn = cacheServer.getAcceptor().getCacheClientNotifier(); |
| |
| assertNotNull(ccn); |
      await().until(() -> ccn.getClientProxies().isEmpty());
| } catch (Exception ex) { |
| System.out.println("The size of the client proxies != 0"); |
| OSProcess.printStacks(0); |
| throw ex; |
| } |
| } |
| |
| private static void verifyCacheClientProxyOnServer(String regionName) { |
| try { |
| Cache cache = new ClientServerMiscDUnitTestBase().getCache(); |
| assertNull(cache.getRegion(Region.SEPARATOR + regionName)); |
      verifyCacheClientProxyOnServer();
| } catch (Exception ex) { |
| ex.printStackTrace(); |
| fail("while setting verifyNoCacheClientProxyOnServer " + ex); |
| } |
| } |
| |
| private static void verifyCacheClientProxyOnServer() { |
| Cache cache = new ClientServerMiscDUnitTestBase().getCache(); |
| assertEquals("More than one BridgeServer", 1, cache.getCacheServers().size()); |
| CacheServerImpl bs = (CacheServerImpl) cache.getCacheServers().iterator().next(); |
| assertNotNull(bs); |
| assertNotNull(bs.getAcceptor()); |
| final CacheClientNotifier ccn = bs.getAcceptor().getCacheClientNotifier(); |
| |
| assertNotNull(ccn); |
| |
    await().until(() -> ccn.getClientProxies().size() == 1);
| } |
| |
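  /**
   * Creates entries k1 and k2 (with values equal to their keys) in region1 and region2, if they
   * are not already present.
   */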
| public static void populateCache() { |
| Cache cache = new ClientServerMiscDUnitTestBase().getCache(); |
| Region r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME1); |
| Region r2 = cache.getRegion(Region.SEPARATOR + REGION_NAME2); |
| assertNotNull(r1); |
| assertNotNull(r2); |
| |
    if (!r1.containsKey(k1)) {
      r1.create(k1, k1);
    }
    if (!r1.containsKey(k2)) {
      r1.create(k2, k2);
    }
    if (!r2.containsKey(k1)) {
      r2.create(k1, k1);
    }
    if (!r2.containsKey(k2)) {
      r2.create(k2, k2);
    }
| |
    assertEquals(k1, r1.getEntry(k1).getValue());
    assertEquals(k2, r1.getEntry(k2).getValue());
    assertEquals(k1, r2.getEntry(k1).getValue());
    assertEquals(k2, r2.getEntry(k2).getValue());
| } |
| |
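  /**
   * Overwrites k1 and k2 in region1 and region2 with the server-side values; invoked in the
   * server VMs.
   */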
| public static void put() { |
| Cache cache = new ClientServerMiscDUnitTestBase().getCache(); |
| Region r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME1); |
| Region r2 = cache.getRegion(Region.SEPARATOR + REGION_NAME2); |
| assertNotNull(r1); |
| assertNotNull(r2); |
| |
| r1.put(k1, server_k1); |
| r1.put(k2, server_k2); |
| |
| r2.put(k1, server_k1); |
| r2.put(k2, server_k2); |
| |
    assertEquals(server_k1, r1.getEntry(k1).getValue());
    assertEquals(server_k2, r1.getEntry(k2).getValue());
    assertEquals(server_k1, r2.getEntry(k1).getValue());
    assertEquals(server_k2, r2.getEntry(k2).getValue());
| } |
| |
| static void putForClient() { |
| Cache cache = new ClientServerMiscDUnitTestBase().getCache(); |
| Region r2 = cache.getRegion(Region.SEPARATOR + REGION_NAME2); |
| if (r2 == null) { |
| r2 = cache.createRegionFactory(RegionShortcut.REPLICATE).create(REGION_NAME2); |
| } |
| |
| r2.put(k1, "client2_k1"); |
| r2.put(k2, "client2_k2"); |
| } |
| |
| private static void verifyUpdates() { |
| Cache cache = new ClientServerMiscDUnitTestBase().getCache(); |
| final Region r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME1); |
| final Region r2 = cache.getRegion(Region.SEPARATOR + REGION_NAME2); |
| assertNotNull(r1); |
| assertNotNull(r2); |
| |
| // no interest registered in region1 - it should hold client values, which are |
| // the same as the keys |
| await().until(() -> { |
| Object val = r1.getEntry(k1).getValue(); |
| return k1.equals(val); |
| }); |
| |
| await().until(() -> { |
| Object val = r1.getEntry(k2).getValue(); |
| return k2.equals(val); |
| }); |
| |
| // interest was registered in region2 - it should contain server values |
| await().until(() -> { |
| Object val = r2.getEntry(k1).getValue(); |
| return server_k1.equals(val); |
| }); |
| |
| await().until(() -> { |
| Object val = r2.getEntry(k2).getValue(); |
| return server_k2.equals(val); |
| }); |
| |
| // events should have contained a memberID |
| MemberIDVerifier verifier = (MemberIDVerifier) ((LocalRegion) r2).getCacheListener(); |
| assertTrue("client should have received a listener event", verifier.eventReceived); |
| assertFalse("client received an update but the event had no member id", |
| verifier.memberIDNotReceived); |
| verifier.reset(); |
| |
| } |
| |
| private static void verifyInvalidatesOnBothRegions() { |
| try { |
| Cache cache = new ClientServerMiscDUnitTestBase().getCache(); |
| final Region r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME1); |
| final Region r2 = cache.getRegion(Region.SEPARATOR + REGION_NAME2); |
| assertNotNull(r1); |
| assertNotNull(r2); |
| |
| await() |
| .until(() -> r1.getEntry(k1).getValue() == null); |
| |
| await() |
| .until(() -> r1.getEntry(k2).getValue() == null); |
| |
| await() |
| .until(() -> r2.getEntry(k1).getValue() == null); |
| |
| await() |
| .until(() -> r2.getEntry(k2).getValue() == null); |
| |
| } catch (Exception ex) { |
| fail("failed while verifyInvalidatesOnBothRegions()" + ex); |
| } |
| } |
| |
| private static void verifyUpdatesOnRegion2() { |
| try { |
| Cache cache = new ClientServerMiscDUnitTestBase().getCache(); |
| final Region r2 = cache.getRegion(Region.SEPARATOR + REGION_NAME2); |
| assertNotNull(r2); |
| |
| await() |
| .until(() -> server_k1.equals(r2.getEntry(k1).getValue())); |
| |
| await() |
| .until(() -> server_k2.equals(r2.getEntry(k2).getValue())); |
| |
| } catch (Exception ex) { |
| fail("failed while verifyUpdatesOnRegion2()" + ex); |
| } |
| } |
| |
| /** |
 * Sets the flag that delays starting the dispatcher thread back to FALSE. This is just a
 * precaution in case any test set it to true and did not unset it on completion.
| */ |
| private static void unsetSlowDispatcherFlag() { |
| CacheClientProxy.isSlowStartForTesting = false; |
| } |
| |
| @Test |
  public void testOnServerMethodsWithProxyClient() throws Exception {
| testOnServerMethods(false, false); |
| } |
| |
| @Test |
  public void testOnServerMethodsWithCachingProxyClient() throws Exception {
| testOnServerMethods(true, false); |
| } |
| |
| @Test |
  public void testOnServerMethodsWithProxyClientHA() throws Exception {
| testOnServerMethods(false, true); |
| } |
| |
| @Test |
  public void testOnServerMethodsWithCachingProxyClientHA() throws Exception {
| testOnServerMethods(true, true); |
| } |
| |
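  /**
   * Drives sizeOnServer and isEmptyOnServer checks against client regions backed by two (or, in
   * the HA case, three) servers.
   */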
| private void testOnServerMethods(boolean isCachingProxy, boolean isHA) throws Exception { |
| int port1 = initServerCache(true, isHA); // vm0 |
| int port2 = initServerCache2(isHA); // vm1 |
| String serverName = NetworkUtils.getServerHostName(); |
| if (isCachingProxy) { |
| createClientCache(serverName, port1, port2); |
| } else { |
| createEmptyClientCache(serverName, port1, port2); |
| } |
| if (isHA) { |
| // add another server for HA scenario |
| initServerCache(true, host.getVM(VersionManager.CURRENT_VERSION, 1), true); |
| } |
| String rName = "/" + REGION_NAME1; |
| String prName = "/" + PR_REGION_NAME; |
| |
| verifyIsEmptyOnServer(rName, true); |
| verifyIsEmptyOnServer(prName, true); |
| |
| int size = 10; |
| putIntoRegion(rName, size, isCachingProxy); |
| verifySizeOnServer(rName, size); |
| |
| putIntoRegion(prName, size, isCachingProxy); |
| if (isHA) { |
| server1.invoke(() -> closeMyCache()); |
| } |
| verifySizeOnServer(prName, size); |
| |
| verifyIsEmptyOnServer(rName, false); |
| verifyIsEmptyOnServer(prName, false); |
| |
| destroyEntries(rName, size); |
| destroyEntries(prName, size); |
| |
| verifyIsEmptyOnServer(rName, true); |
| verifyIsEmptyOnServer(prName, true); |
| verifySizeOnServer(rName, 0); |
| verifySizeOnServer(prName, 0); |
| } |
| |
| private void putIntoRegion(String regionName, int size, boolean isCachingProxy) { |
| Cache cache = getCache(); |
| final Region region = cache.getRegion(regionName); |
| for (int i = 0; i < size; i++) { |
| region.put(i, i); |
| } |
| |
| if (isCachingProxy) { |
| for (int i = 0; i < size; i++) { |
| region.localDestroy(i, i); |
| } |
| } |
| } |
| |
| private void destroyEntries(String regionName, int size) { |
| Cache cache = getCache(); |
| final Region region = cache.getRegion(regionName); |
| |
| for (int i = 0; i < size; i++) { |
| region.destroy(i); |
| } |
| } |
| |
| private void verifySizeOnServer(String regionName, int expectedSize) { |
| Cache cache = getCache(); |
| final Region region = cache.getRegion(regionName); |
| int actualSize = region.sizeOnServer(); |
| assertEquals( |
| "sizeOnServer returns unexpected " + actualSize + " instead of expected " + expectedSize, |
| expectedSize, actualSize); |
| } |
| |
| private void verifyIsEmptyOnServer(String regionName, boolean expected) { |
| Cache cache = getCache(); |
| final Region region = cache.getRegion(regionName); |
| boolean isEmptyOnServer = region.isEmptyOnServer(); |
| assertEquals("isEmptyOnServer returns unexpected " + isEmptyOnServer + " instead of expected " |
| + expected, expected, isEmptyOnServer); |
| } |
| |
| private void closeMyCache() { |
| Cache cache = getCache(); |
| cache.close(); |
| } |
| |
| } |