| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.internal.cache.tier.sockets; |
| |
| import static org.apache.geode.cache.InterestResultPolicy.KEYS; |
| import static org.apache.geode.cache.Region.Entry; |
| import static org.apache.geode.cache.Region.SEPARATOR; |
| import static org.apache.geode.cache.client.internal.RegisterInterestTracker.interestListIndex; |
| import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS; |
| import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT; |
| import static org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPort; |
| import static org.apache.geode.internal.cache.tier.InterestType.KEY; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Properties; |
| import java.util.Set; |
| |
| import org.junit.experimental.categories.Category; |
| |
| import org.apache.geode.cache.AttributesFactory; |
| import org.apache.geode.cache.Cache; |
| import org.apache.geode.cache.CacheFactory; |
| import org.apache.geode.cache.InterestResultPolicy; |
| import org.apache.geode.cache.MirrorType; |
| import org.apache.geode.cache.Region; |
| import org.apache.geode.cache.RegionAttributes; |
| import org.apache.geode.cache.Scope; |
| import org.apache.geode.cache.client.PoolManager; |
| import org.apache.geode.cache.client.internal.Connection; |
| import org.apache.geode.cache.client.internal.PoolImpl; |
| import org.apache.geode.cache.client.internal.ServerRegionProxy; |
| import org.apache.geode.cache.server.CacheServer; |
| import org.apache.geode.distributed.DistributedSystem; |
| import org.apache.geode.distributed.internal.ServerLocation; |
| import org.apache.geode.internal.cache.CacheServerImpl; |
| import org.apache.geode.internal.cache.ClientServerObserverAdapter; |
| import org.apache.geode.internal.cache.ClientServerObserverHolder; |
| import org.apache.geode.internal.cache.LocalRegion; |
| import org.apache.geode.internal.cache.tier.InterestType; |
| import org.apache.geode.test.awaitility.GeodeAwaitility; |
| import org.apache.geode.test.dunit.Host; |
| import org.apache.geode.test.dunit.IgnoredException; |
| import org.apache.geode.test.dunit.ThreadUtils; |
| import org.apache.geode.test.dunit.VM; |
| import org.apache.geode.test.dunit.WaitCriterion; |
| import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase; |
| import org.apache.geode.test.junit.categories.ClientSubscriptionTest; |
| |
| /** |
| * Tests Interest Registration Functionality |
| */ |
| @Category({ClientSubscriptionTest.class}) |
| @SuppressWarnings({"deprecation", "rawtypes", "serial", "unchecked"}) |
| public class HAInterestTestCase extends JUnit4DistributedTestCase { |
| |
| protected static final int TIMEOUT_MILLIS = 60 * 1000; |
| protected static final int INTERVAL_MILLIS = 10; |
| |
| protected static final String REGION_NAME = "HAInterestBaseTest_region"; |
| |
| protected static final String k1 = "k1"; |
| protected static final String k2 = "k2"; |
| protected static final String client_k1 = "client-k1"; |
| protected static final String client_k2 = "client-k2"; |
| protected static final String server_k1 = "server-k1"; |
| protected static final String server_k2 = "server-k2"; |
| protected static final String server_k1_updated = "server_k1_updated"; |
| |
| protected static Cache cache = null; |
| protected static PoolImpl pool = null; |
| protected static Connection conn = null; |
| |
| protected static int PORT1; |
| protected static int PORT2; |
| protected static int PORT3; |
| |
| protected static boolean isBeforeRegistrationCallbackCalled = false; |
| protected static boolean isBeforeInterestRecoveryCallbackCalled = false; |
| protected static boolean isAfterRegistrationCallbackCalled = false; |
| |
| protected static Host host = null; |
| protected static VM server1 = null; |
| protected static VM server2 = null; |
| protected static VM server3 = null; |
| |
| protected static volatile boolean exceptionOccurred = false; |
| |
| @Override |
| public final void postSetUp() throws Exception { |
| host = Host.getHost(0); |
| server1 = host.getVM(0); |
| server2 = host.getVM(1); |
| server3 = host.getVM(2); |
| CacheServerTestUtil.disableShufflingOfEndpoints(); |
| // start servers first |
| PORT1 = ((Integer) server1.invoke(() -> HAInterestTestCase.createServerCache())).intValue(); |
| PORT2 = ((Integer) server2.invoke(() -> HAInterestTestCase.createServerCache())).intValue(); |
| PORT3 = ((Integer) server3.invoke(() -> HAInterestTestCase.createServerCache())).intValue(); |
| exceptionOccurred = false; |
| IgnoredException.addIgnoredException("java.net.ConnectException: Connection refused: connect"); |
| } |
| |
| @Override |
| public final void preTearDown() throws Exception { |
| // close the clients first |
| closeCache(); |
| |
| // then close the servers |
| server1.invoke(() -> HAInterestTestCase.closeCache()); |
| server2.invoke(() -> HAInterestTestCase.closeCache()); |
| server3.invoke(() -> HAInterestTestCase.closeCache()); |
| CacheServerTestUtil.resetDisableShufflingOfEndpointsFlag(); |
| } |
| |
| public static void closeCache() { |
| PoolImpl.AFTER_REGISTER_CALLBACK_FLAG = false; |
| PoolImpl.BEFORE_PRIMARY_IDENTIFICATION_FROM_BACKUP_CALLBACK_FLAG = false; |
| PoolImpl.BEFORE_RECOVER_INTEREST_CALLBACK_FLAG = false; |
| PoolImpl.BEFORE_REGISTER_CALLBACK_FLAG = false; |
| HAInterestTestCase.isAfterRegistrationCallbackCalled = false; |
| HAInterestTestCase.isBeforeInterestRecoveryCallbackCalled = false; |
| HAInterestTestCase.isBeforeRegistrationCallbackCalled = false; |
| if (cache != null && !cache.isClosed()) { |
| cache.close(); |
| cache.getDistributedSystem().disconnect(); |
| } |
| cache = null; |
| pool = null; |
| conn = null; |
| } |
| |
| /** |
| * Return the current primary waiting for a primary to exist. |
| * |
| * @since GemFire 5.7 |
| */ |
| public static VM getPrimaryVM() { |
| return getPrimaryVM(null); |
| } |
| |
| /** |
| * Return the current primary waiting for a primary to exist and for it not to be the oldPrimary |
| * (if oldPrimary is NOT null). |
| * |
| * @since GemFire 5.7 |
| */ |
| public static VM getPrimaryVM(final VM oldPrimary) { |
| WaitCriterion wc = new WaitCriterion() { |
| @Override |
| public boolean done() { |
| int primaryPort = pool.getPrimaryPort(); |
| if (primaryPort == -1) { |
| return false; |
| } |
| // we have a primary |
| VM currentPrimary = getServerVM(primaryPort); |
| if (currentPrimary != oldPrimary) { |
| return true; |
| } |
| return false; |
| } |
| |
| @Override |
| public String description() { |
| return "waiting for primary"; |
| } |
| }; |
| GeodeAwaitility.await().untilAsserted(wc); |
| |
| int primaryPort = pool.getPrimaryPort(); |
| assertTrue(primaryPort != -1); |
| VM currentPrimary = getServerVM(primaryPort); |
| assertTrue(currentPrimary != oldPrimary); |
| return currentPrimary; |
| } |
| |
| public static VM getBackupVM() { |
| return getBackupVM(null); |
| } |
| |
| public static VM getBackupVM(VM stoppedBackup) { |
| VM currentPrimary = getPrimaryVM(null); |
| if (currentPrimary != server2 && server2 != stoppedBackup) { |
| return server2; |
| } else if (currentPrimary != server3 && server3 != stoppedBackup) { |
| return server3; |
| } else if (currentPrimary != server1 && server1 != stoppedBackup) { |
| return server1; |
| } else { |
| fail("expected currentPrimary " + currentPrimary + " to be " + server1 + ", or " + server2 |
| + ", or " + server3); |
| return null; |
| } |
| } |
| |
| /** |
| * Given a server vm (server1, server2, or server3) return its port. |
| * |
| * @since GemFire 5.7 |
| */ |
| public static int getServerPort(VM vm) { |
| if (vm == server1) { |
| return PORT1; |
| } else if (vm == server2) { |
| return PORT2; |
| } else if (vm == server3) { |
| return PORT3; |
| } else { |
| fail("expected vm " + vm + " to be " + server1 + ", or " + server2 + ", or " + server3); |
| return -1; |
| } |
| } |
| |
| /** |
| * Given a server port (PORT1, PORT2, or PORT3) return its vm. |
| * |
| * @since GemFire 5.7 |
| */ |
| public static VM getServerVM(int port) { |
| if (port == PORT1) { |
| return server1; |
| } else if (port == PORT2) { |
| return server2; |
| } else if (port == PORT3) { |
| return server3; |
| } else { |
| fail("expected port " + port + " to be " + PORT1 + ", or " + PORT2 + ", or " + PORT3); |
| return null; |
| } |
| } |
| |
| public static void verifyRefreshedEntriesFromServer() { |
| final Region r1 = cache.getRegion(SEPARATOR + REGION_NAME); |
| assertNotNull(r1); |
| |
| WaitCriterion wc = new WaitCriterion() { |
| @Override |
| public boolean done() { |
| Entry re = r1.getEntry(k1); |
| if (re == null) |
| return false; |
| Object val = re.getValue(); |
| return client_k1.equals(val); |
| } |
| |
| @Override |
| public String description() { |
| return "waiting for client_k1 refresh from server"; |
| } |
| }; |
| GeodeAwaitility.await().untilAsserted(wc); |
| |
| wc = new WaitCriterion() { |
| @Override |
| public boolean done() { |
| Entry re = r1.getEntry(k2); |
| if (re == null) |
| return false; |
| Object val = re.getValue(); |
| return client_k2.equals(val); |
| } |
| |
| @Override |
| public String description() { |
| return "waiting for client_k2 refresh from server"; |
| } |
| }; |
| GeodeAwaitility.await().untilAsserted(wc); |
| } |
| |
| public static void verifyDeadAndLiveServers(final int expectedDeadServers, |
| final int expectedLiveServers) { |
| WaitCriterion wc = new WaitCriterion() { |
| @Override |
| public boolean done() { |
| return pool.getConnectedServerCount() == expectedLiveServers; |
| } |
| |
| @Override |
| public String description() { |
| return "waiting for pool.getConnectedServerCount() == expectedLiveServer"; |
| } |
| }; |
| GeodeAwaitility.await().untilAsserted(wc); |
| } |
| |
| public static void putK1andK2() { |
| Region r1 = cache.getRegion(SEPARATOR + REGION_NAME); |
| assertNotNull(r1); |
| r1.put(k1, server_k1); |
| r1.put(k2, server_k2); |
| } |
| |
| public static void setClientServerObserverForBeforeInterestRecoveryFailure() { |
| PoolImpl.BEFORE_RECOVER_INTEREST_CALLBACK_FLAG = true; |
| ClientServerObserverHolder.setInstance(new ClientServerObserverAdapter() { |
| @Override |
| public void beforeInterestRecovery() { |
| synchronized (HAInterestTestCase.class) { |
| Thread t = new Thread() { |
| @Override |
| public void run() { |
| getBackupVM().invoke(() -> HAInterestTestCase.startServer()); |
| getPrimaryVM().invoke(() -> HAInterestTestCase.stopServer()); |
| } |
| }; |
| t.start(); |
| try { |
| ThreadUtils.join(t, 30 * 1000); |
| } catch (Exception ignore) { |
| exceptionOccurred = true; |
| } |
| HAInterestTestCase.isBeforeInterestRecoveryCallbackCalled = true; |
| HAInterestTestCase.class.notify(); |
| PoolImpl.BEFORE_RECOVER_INTEREST_CALLBACK_FLAG = false; |
| } |
| } |
| }); |
| } |
| |
| public static void setClientServerObserverForBeforeInterestRecovery() { |
| PoolImpl.BEFORE_RECOVER_INTEREST_CALLBACK_FLAG = true; |
| ClientServerObserverHolder.setInstance(new ClientServerObserverAdapter() { |
| @Override |
| public void beforeInterestRecovery() { |
| synchronized (HAInterestTestCase.class) { |
| Thread t = new Thread() { |
| @Override |
| public void run() { |
| Region r1 = cache.getRegion(SEPARATOR + REGION_NAME); |
| assertNotNull(r1); |
| r1.put(k1, server_k1_updated); |
| } |
| }; |
| t.start(); |
| |
| HAInterestTestCase.isBeforeInterestRecoveryCallbackCalled = true; |
| HAInterestTestCase.class.notify(); |
| PoolImpl.BEFORE_RECOVER_INTEREST_CALLBACK_FLAG = false; |
| } |
| } |
| }); |
| } |
| |
| public static void waitForBeforeInterestRecoveryCallBack() throws InterruptedException { |
| assertNotNull(cache); |
| synchronized (HAInterestTestCase.class) { |
| while (!isBeforeInterestRecoveryCallbackCalled) { |
| HAInterestTestCase.class.wait(); |
| } |
| } |
| } |
| |
| public static void setClientServerObserverForBeforeRegistration(final VM vm) { |
| PoolImpl.BEFORE_REGISTER_CALLBACK_FLAG = true; |
| ClientServerObserverHolder.setInstance(new ClientServerObserverAdapter() { |
| @Override |
| public void beforeInterestRegistration() { |
| synchronized (HAInterestTestCase.class) { |
| vm.invoke(() -> HAInterestTestCase.startServer()); |
| HAInterestTestCase.isBeforeRegistrationCallbackCalled = true; |
| HAInterestTestCase.class.notify(); |
| PoolImpl.BEFORE_REGISTER_CALLBACK_FLAG = false; |
| } |
| } |
| }); |
| } |
| |
| public static void waitForBeforeRegistrationCallback() throws InterruptedException { |
| assertNotNull(cache); |
| synchronized (HAInterestTestCase.class) { |
| while (!isBeforeRegistrationCallbackCalled) { |
| HAInterestTestCase.class.wait(); |
| } |
| } |
| } |
| |
| public static void setClientServerObserverForAfterRegistration(final VM vm) { |
| PoolImpl.AFTER_REGISTER_CALLBACK_FLAG = true; |
| ClientServerObserverHolder.setInstance(new ClientServerObserverAdapter() { |
| @Override |
| public void afterInterestRegistration() { |
| synchronized (HAInterestTestCase.class) { |
| vm.invoke(() -> HAInterestTestCase.startServer()); |
| HAInterestTestCase.isAfterRegistrationCallbackCalled = true; |
| HAInterestTestCase.class.notify(); |
| PoolImpl.AFTER_REGISTER_CALLBACK_FLAG = false; |
| } |
| } |
| }); |
| } |
| |
| public static void waitForAfterRegistrationCallback() throws InterruptedException { |
| assertNotNull(cache); |
| if (!isAfterRegistrationCallbackCalled) { |
| synchronized (HAInterestTestCase.class) { |
| while (!isAfterRegistrationCallbackCalled) { |
| HAInterestTestCase.class.wait(); |
| } |
| } |
| } |
| } |
| |
| public static void unSetClientServerObserverForRegistrationCallback() { |
| synchronized (HAInterestTestCase.class) { |
| PoolImpl.BEFORE_REGISTER_CALLBACK_FLAG = false; |
| PoolImpl.AFTER_REGISTER_CALLBACK_FLAG = false; |
| HAInterestTestCase.isBeforeRegistrationCallbackCalled = false; |
| HAInterestTestCase.isAfterRegistrationCallbackCalled = false; |
| } |
| } |
| |
| public static void verifyDispatcherIsAlive() { |
| assertEquals("More than one BridgeServer", 1, cache.getCacheServers().size()); |
| |
| WaitCriterion wc = new WaitCriterion() { |
| @Override |
| public boolean done() { |
| return cache.getCacheServers().size() == 1; |
| } |
| |
| @Override |
| public String description() { |
| return "waiting for cache.getCacheServers().size() == 1"; |
| } |
| }; |
| GeodeAwaitility.await().untilAsserted(wc); |
| |
| CacheServerImpl bs = (CacheServerImpl) cache.getCacheServers().iterator().next(); |
| assertNotNull(bs); |
| assertNotNull(bs.getAcceptor()); |
| assertNotNull(bs.getAcceptor().getCacheClientNotifier()); |
| final CacheClientNotifier ccn = bs.getAcceptor().getCacheClientNotifier(); |
| |
| wc = new WaitCriterion() { |
| @Override |
| public boolean done() { |
| return ccn.getClientProxies().size() > 0; |
| } |
| |
| @Override |
| public String description() { |
| return "waiting for ccn.getClientProxies().size() > 0"; |
| } |
| }; |
| GeodeAwaitility.await().untilAsserted(wc); |
| |
| wc = new WaitCriterion() { |
| Iterator iter_prox; |
| CacheClientProxy proxy; |
| |
| @Override |
| public boolean done() { |
| iter_prox = ccn.getClientProxies().iterator(); |
| if (iter_prox.hasNext()) { |
| proxy = (CacheClientProxy) iter_prox.next(); |
| return proxy._messageDispatcher.isAlive(); |
| } else { |
| return false; |
| } |
| } |
| |
| @Override |
| public String description() { |
| return "waiting for CacheClientProxy _messageDispatcher to be alive"; |
| } |
| }; |
| GeodeAwaitility.await().untilAsserted(wc); |
| } |
| |
| public static void verifyDispatcherIsNotAlive() { |
| WaitCriterion wc = new WaitCriterion() { |
| @Override |
| public boolean done() { |
| return cache.getCacheServers().size() == 1; |
| } |
| |
| @Override |
| public String description() { |
| return "cache.getCacheServers().size() == 1"; |
| } |
| }; |
| GeodeAwaitility.await().untilAsserted(wc); |
| |
| CacheServerImpl bs = (CacheServerImpl) cache.getCacheServers().iterator().next(); |
| assertNotNull(bs); |
| assertNotNull(bs.getAcceptor()); |
| assertNotNull(bs.getAcceptor().getCacheClientNotifier()); |
| final CacheClientNotifier ccn = bs.getAcceptor().getCacheClientNotifier(); |
| |
| wc = new WaitCriterion() { |
| @Override |
| public boolean done() { |
| return ccn.getClientProxies().size() > 0; |
| } |
| |
| @Override |
| public String description() { |
| return "waiting for ccn.getClientProxies().size() > 0"; |
| } |
| }; |
| GeodeAwaitility.await().untilAsserted(wc); |
| |
| Iterator iter_prox = ccn.getClientProxies().iterator(); |
| if (iter_prox.hasNext()) { |
| CacheClientProxy proxy = (CacheClientProxy) iter_prox.next(); |
| assertFalse("Dispatcher on secondary should not be alive", |
| proxy._messageDispatcher.isAlive()); |
| } |
| } |
| |
| public static void createEntriesK1andK2OnServer() { |
| Region r1 = cache.getRegion(SEPARATOR + REGION_NAME); |
| assertNotNull(r1); |
| if (!r1.containsKey(k1)) { |
| r1.create(k1, server_k1); |
| } |
| if (!r1.containsKey(k2)) { |
| r1.create(k2, server_k2); |
| } |
| assertEquals(r1.getEntry(k1).getValue(), server_k1); |
| assertEquals(r1.getEntry(k2).getValue(), server_k2); |
| } |
| |
| public static void createEntriesK1andK2() { |
| Region r1 = cache.getRegion(SEPARATOR + REGION_NAME); |
| assertNotNull(r1); |
| if (!r1.containsKey(k1)) { |
| r1.create(k1, client_k1); |
| } |
| if (!r1.containsKey(k2)) { |
| r1.create(k2, client_k2); |
| } |
| assertEquals(r1.getEntry(k1).getValue(), client_k1); |
| assertEquals(r1.getEntry(k2).getValue(), client_k2); |
| } |
| |
| public static void createServerEntriesK1andK2() { |
| Region r1 = cache.getRegion(SEPARATOR + REGION_NAME); |
| assertNotNull(r1); |
| if (!r1.containsKey(k1)) { |
| r1.create(k1, server_k1); |
| } |
| if (!r1.containsKey(k2)) { |
| r1.create(k2, server_k2); |
| } |
| assertEquals(r1.getEntry(k1).getValue(), server_k1); |
| assertEquals(r1.getEntry(k2).getValue(), server_k2); |
| } |
| |
| public static void registerK1AndK2() { |
| Region r = cache.getRegion(SEPARATOR + REGION_NAME); |
| assertNotNull(r); |
| List list = new ArrayList(); |
| list.add(k1); |
| list.add(k2); |
| r.registerInterest(list, InterestResultPolicy.KEYS_VALUES); |
| } |
| |
| public static void reRegisterK1AndK2() { |
| Region r = cache.getRegion(SEPARATOR + REGION_NAME); |
| assertNotNull(r); |
| List list = new ArrayList(); |
| list.add(k1); |
| list.add(k2); |
| r.registerInterest(list); |
| } |
| |
| public static void startServer() throws IOException { |
| Cache c = CacheFactory.getAnyInstance(); |
| assertEquals("More than one BridgeServer", 1, c.getCacheServers().size()); |
| CacheServerImpl bs = (CacheServerImpl) c.getCacheServers().iterator().next(); |
| assertNotNull(bs); |
| bs.start(); |
| } |
| |
| public static void stopServer() { |
| assertEquals("More than one BridgeServer", 1, cache.getCacheServers().size()); |
| CacheServerImpl bs = (CacheServerImpl) cache.getCacheServers().iterator().next(); |
| assertNotNull(bs); |
| bs.stop(); |
| } |
| |
| public static void stopPrimaryAndRegisterK1AndK2AndVerifyResponse() { |
| LocalRegion r = (LocalRegion) cache.getRegion(SEPARATOR + REGION_NAME); |
| assertNotNull(r); |
| ServerRegionProxy srp = new ServerRegionProxy(r); |
| |
| WaitCriterion wc = new WaitCriterion() { |
| @Override |
| public boolean done() { |
| return pool.getConnectedServerCount() == 3; |
| } |
| |
| @Override |
| public String description() { |
| return "connected server count never became 3"; |
| } |
| }; |
| GeodeAwaitility.await().untilAsserted(wc); |
| |
| // close primaryEP |
| getPrimaryVM().invoke(() -> stopServer()); |
| List list = new ArrayList(); |
| list.add(k1); |
| list.add(k2); |
| List serverKeys = srp.registerInterest(list, KEY, KEYS, false, |
| r.getAttributes().getDataPolicy().ordinal); |
| assertNotNull(serverKeys); |
| List resultKeys = (List) serverKeys.get(0); |
| assertEquals(2, resultKeys.size()); |
| assertTrue(resultKeys.contains(k1)); |
| assertTrue(resultKeys.contains(k2)); |
| } |
| |
| public static void stopPrimaryAndUnregisterRegisterK1() { |
| LocalRegion r = (LocalRegion) cache.getRegion(SEPARATOR + REGION_NAME); |
| assertNotNull(r); |
| ServerRegionProxy srp = new ServerRegionProxy(r); |
| |
| WaitCriterion wc = new WaitCriterion() { |
| @Override |
| public boolean done() { |
| return pool.getConnectedServerCount() == 3; |
| } |
| |
| @Override |
| public String description() { |
| return "connected server count never became 3"; |
| } |
| }; |
| GeodeAwaitility.await().untilAsserted(wc); |
| |
| // close primaryEP |
| getPrimaryVM().invoke(() -> stopServer()); |
| List list = new ArrayList(); |
| list.add(k1); |
| srp.unregisterInterest(list, KEY, false, false); |
| } |
| |
| public static void stopBothPrimaryAndSecondaryAndRegisterK1AndK2AndVerifyResponse() { |
| LocalRegion r = (LocalRegion) cache.getRegion(SEPARATOR + REGION_NAME); |
| assertNotNull(r); |
| ServerRegionProxy srp = new ServerRegionProxy(r); |
| |
| WaitCriterion wc = new WaitCriterion() { |
| @Override |
| public boolean done() { |
| return pool.getConnectedServerCount() == 3; |
| } |
| |
| @Override |
| public String description() { |
| return "connected server count never became 3"; |
| } |
| }; |
| GeodeAwaitility.await().untilAsserted(wc); |
| |
| // close primaryEP |
| VM backup = getBackupVM(); |
| getPrimaryVM().invoke(() -> stopServer()); |
| // close secondary |
| backup.invoke(() -> stopServer()); |
| List list = new ArrayList(); |
| list.add(k1); |
| list.add(k2); |
| List serverKeys = srp.registerInterest(list, KEY, KEYS, false, |
| r.getAttributes().getDataPolicy().ordinal); |
| |
| assertNotNull(serverKeys); |
| List resultKeys = (List) serverKeys.get(0); |
| assertEquals(2, resultKeys.size()); |
| assertTrue(resultKeys.contains(k1)); |
| assertTrue(resultKeys.contains(k2)); |
| } |
| |
| /** |
| * returns the secondary that was stopped |
| */ |
| public static VM stopSecondaryAndRegisterK1AndK2AndVerifyResponse() { |
| LocalRegion r = (LocalRegion) cache.getRegion(SEPARATOR + REGION_NAME); |
| assertNotNull(r); |
| ServerRegionProxy srp = new ServerRegionProxy(r); |
| |
| WaitCriterion wc = new WaitCriterion() { |
| @Override |
| public boolean done() { |
| return pool.getConnectedServerCount() == 3; |
| } |
| |
| @Override |
| public String description() { |
| return "Never got three connected servers"; |
| } |
| }; |
| GeodeAwaitility.await().untilAsserted(wc); |
| |
| // close secondary EP |
| VM result = getBackupVM(); |
| result.invoke(() -> stopServer()); |
| List list = new ArrayList(); |
| list.add(k1); |
| list.add(k2); |
| List serverKeys = srp.registerInterest(list, KEY, KEYS, false, |
| r.getAttributes().getDataPolicy().ordinal); |
| |
| assertNotNull(serverKeys); |
| List resultKeys = (List) serverKeys.get(0); |
| assertEquals(2, resultKeys.size()); |
| assertTrue(resultKeys.contains(k1)); |
| assertTrue(resultKeys.contains(k2)); |
| return result; |
| } |
| |
| /** |
| * returns the secondary that was stopped |
| */ |
| public static VM stopSecondaryAndUNregisterK1() { |
| LocalRegion r = (LocalRegion) cache.getRegion(SEPARATOR + REGION_NAME); |
| assertNotNull(r); |
| ServerRegionProxy srp = new ServerRegionProxy(r); |
| |
| WaitCriterion wc = new WaitCriterion() { |
| @Override |
| public boolean done() { |
| return pool.getConnectedServerCount() == 3; |
| } |
| |
| @Override |
| public String description() { |
| return "connected server count never became 3"; |
| } |
| }; |
| GeodeAwaitility.await().untilAsserted(wc); |
| |
| // close secondary EP |
| VM result = getBackupVM(); |
| result.invoke(() -> stopServer()); |
| List list = new ArrayList(); |
| list.add(k1); |
| srp.unregisterInterest(list, KEY, false, false); |
| return result; |
| } |
| |
| public static void registerK1AndK2OnPrimaryAndSecondaryAndVerifyResponse() { |
| ServerLocation primary = pool.getPrimary(); |
| ServerLocation secondary = (ServerLocation) pool.getRedundants().get(0); |
| LocalRegion r = (LocalRegion) cache.getRegion(SEPARATOR + REGION_NAME); |
| assertNotNull(r); |
| ServerRegionProxy srp = new ServerRegionProxy(r); |
| List list = new ArrayList(); |
| list.add(k1); |
| list.add(k2); |
| |
| // Primary server |
| List serverKeys1 = srp.registerInterestOn(primary, list, InterestType.KEY, |
| InterestResultPolicy.KEYS, false, r.getAttributes().getDataPolicy().ordinal); |
| assertNotNull(serverKeys1); |
| // expect serverKeys in response from primary |
| List resultKeys = (List) serverKeys1.get(0); |
| assertEquals(2, resultKeys.size()); |
| assertTrue(resultKeys.contains(k1)); |
| assertTrue(resultKeys.contains(k2)); |
| |
| // Secondary server |
| List serverKeys2 = srp.registerInterestOn(secondary, list, InterestType.KEY, |
| InterestResultPolicy.KEYS, false, r.getAttributes().getDataPolicy().ordinal); |
| // if the list is null then it is empty |
| if (serverKeys2 != null) { |
| // no serverKeys in response from secondary |
| assertTrue(serverKeys2.isEmpty()); |
| } |
| } |
| |
| public static void verifyInterestRegistration() { |
| WaitCriterion wc = new WaitCriterion() { |
| @Override |
| public boolean done() { |
| return cache.getCacheServers().size() == 1; |
| } |
| |
| @Override |
| public String description() { |
| return "waiting for cache.getCacheServers().size() == 1"; |
| } |
| }; |
| GeodeAwaitility.await().untilAsserted(wc); |
| |
| CacheServerImpl bs = (CacheServerImpl) cache.getCacheServers().iterator().next(); |
| assertNotNull(bs); |
| assertNotNull(bs.getAcceptor()); |
| assertNotNull(bs.getAcceptor().getCacheClientNotifier()); |
| final CacheClientNotifier ccn = bs.getAcceptor().getCacheClientNotifier(); |
| |
| wc = new WaitCriterion() { |
| @Override |
| public boolean done() { |
| return ccn.getClientProxies().size() > 0; |
| } |
| |
| @Override |
| public String description() { |
| return "waiting for ccn.getClientProxies().size() > 0"; |
| } |
| }; |
| GeodeAwaitility.await().untilAsserted(wc); |
| |
| Iterator iter_prox = ccn.getClientProxies().iterator(); |
| |
| if (iter_prox.hasNext()) { |
| final CacheClientProxy ccp = (CacheClientProxy) iter_prox.next(); |
| |
| wc = new WaitCriterion() { |
| @Override |
| public boolean done() { |
| Set keysMap = (Set) ccp.cils[interestListIndex] |
| .getProfile(SEPARATOR + REGION_NAME).getKeysOfInterestFor(ccp.getProxyID()); |
| return keysMap != null && keysMap.size() == 2; |
| } |
| |
| @Override |
| public String description() { |
| return "waiting for keys of interest to include 2 keys"; |
| } |
| }; |
| GeodeAwaitility.await().untilAsserted(wc); |
| |
| Set keysMap = (Set) ccp.cils[interestListIndex] |
| .getProfile(SEPARATOR + REGION_NAME).getKeysOfInterestFor(ccp.getProxyID()); |
| assertNotNull(keysMap); |
| assertEquals(2, keysMap.size()); |
| assertTrue(keysMap.contains(k1)); |
| assertTrue(keysMap.contains(k2)); |
| } |
| } |
| |
| public static void verifyInterestUNRegistration() { |
| WaitCriterion wc = new WaitCriterion() { |
| @Override |
| public boolean done() { |
| return cache.getCacheServers().size() == 1; |
| } |
| |
| @Override |
| public String description() { |
| return "waiting for cache.getCacheServers().size() == 1"; |
| } |
| }; |
| GeodeAwaitility.await().untilAsserted(wc); |
| |
| CacheServerImpl bs = (CacheServerImpl) cache.getCacheServers().iterator().next(); |
| assertNotNull(bs); |
| assertNotNull(bs.getAcceptor()); |
| assertNotNull(bs.getAcceptor().getCacheClientNotifier()); |
| final CacheClientNotifier ccn = bs.getAcceptor().getCacheClientNotifier(); |
| |
| wc = new WaitCriterion() { |
| @Override |
| public boolean done() { |
| return ccn.getClientProxies().size() > 0; |
| } |
| |
| @Override |
| public String description() { |
| return "waiting for ccn.getClientProxies().size() > 0"; |
| } |
| }; |
| GeodeAwaitility.await().untilAsserted(wc); |
| |
| Iterator iter_prox = ccn.getClientProxies().iterator(); |
| if (iter_prox.hasNext()) { |
| final CacheClientProxy ccp = (CacheClientProxy) iter_prox.next(); |
| |
| wc = new WaitCriterion() { |
| @Override |
| public boolean done() { |
| Set keysMap = (Set) ccp.cils[interestListIndex] |
| .getProfile(SEPARATOR + REGION_NAME).getKeysOfInterestFor(ccp.getProxyID()); |
| return keysMap != null; |
| } |
| |
| @Override |
| public String description() { |
| return "waiting for keys of interest to not be null"; |
| } |
| }; |
| GeodeAwaitility.await().untilAsserted(wc); |
| |
| Set keysMap = (Set) ccp.cils[interestListIndex] |
| .getProfile(SEPARATOR + REGION_NAME).getKeysOfInterestFor(ccp.getProxyID()); |
| assertNotNull(keysMap); |
| assertEquals(1, keysMap.size()); |
| assertFalse(keysMap.contains(k1)); |
| assertTrue(keysMap.contains(k2)); |
| } |
| } |
| |
| private void createCache(Properties props) throws Exception { |
| DistributedSystem ds = getSystem(props); |
| assertNotNull(ds); |
| ds.disconnect(); |
| ds = getSystem(props); |
| cache = CacheFactory.create(ds); |
| assertNotNull(cache); |
| } |
| |
| public static void createClientPoolCache(String testName, String host) throws Exception { |
| Properties props = new Properties(); |
| props.setProperty(MCAST_PORT, "0"); |
| props.setProperty(LOCATORS, ""); |
| new HAInterestTestCase().createCache(props); |
| CacheServerTestUtil.disableShufflingOfEndpoints(); |
| PoolImpl p; |
| try { |
| p = (PoolImpl) PoolManager.createFactory().addServer(host, PORT1).addServer(host, PORT2) |
| .addServer(host, PORT3).setSubscriptionEnabled(true).setSubscriptionRedundancy(-1) |
| .setReadTimeout(10000).setPingInterval(1000) |
| // retryInterval should be more so that only registerInterste thread |
| // will initiate failover |
| // .setRetryInterval(20000) |
| .create("HAInterestBaseTestPool"); |
| } finally { |
| CacheServerTestUtil.enableShufflingOfEndpoints(); |
| } |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.LOCAL); |
| factory.setConcurrencyChecksEnabled(true); |
| factory.setPoolName(p.getName()); |
| |
| cache.createRegion(REGION_NAME, factory.create()); |
| pool = p; |
| conn = pool.acquireConnection(); |
| assertNotNull(conn); |
| } |
| |
| public static void createClientPoolCacheWithSmallRetryInterval(String testName, String host) |
| throws Exception { |
| Properties props = new Properties(); |
| props.setProperty(MCAST_PORT, "0"); |
| props.setProperty(LOCATORS, ""); |
| new HAInterestTestCase().createCache(props); |
| CacheServerTestUtil.disableShufflingOfEndpoints(); |
| PoolImpl p; |
| try { |
| p = (PoolImpl) PoolManager.createFactory().addServer(host, PORT1).addServer(host, PORT2) |
| .setSubscriptionEnabled(true).setSubscriptionRedundancy(-1).setReadTimeout(10000) |
| .setSocketBufferSize(32768).setMinConnections(6).setPingInterval(200) |
| // .setRetryInterval(200) |
| // retryAttempts 3 |
| .create("HAInterestBaseTestPool"); |
| } finally { |
| CacheServerTestUtil.enableShufflingOfEndpoints(); |
| } |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.LOCAL); |
| factory.setConcurrencyChecksEnabled(true); |
| factory.setPoolName(p.getName()); |
| |
| cache.createRegion(REGION_NAME, factory.create()); |
| |
| pool = p; |
| conn = pool.acquireConnection(); |
| assertNotNull(conn); |
| } |
| |
| public static void createClientPoolCacheConnectionToSingleServer(String testName, String hostName) |
| throws Exception { |
| Properties props = new Properties(); |
| props.setProperty(MCAST_PORT, "0"); |
| props.setProperty(LOCATORS, ""); |
| new HAInterestTestCase().createCache(props); |
| PoolImpl p = (PoolImpl) PoolManager.createFactory().addServer(hostName, PORT1) |
| .setSubscriptionEnabled(true).setSubscriptionRedundancy(-1).setReadTimeout(10000) |
| // .setRetryInterval(20) |
| .create("HAInterestBaseTestPool"); |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.LOCAL); |
| factory.setConcurrencyChecksEnabled(true); |
| factory.setPoolName(p.getName()); |
| |
| cache.createRegion(REGION_NAME, factory.create()); |
| |
| pool = p; |
| conn = pool.acquireConnection(); |
| assertNotNull(conn); |
| } |
| |
| public static Integer createServerCache() throws Exception { |
| new HAInterestTestCase().createCache(new Properties()); |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.DISTRIBUTED_ACK); |
| factory.setEnableBridgeConflation(true); |
| factory.setMirrorType(MirrorType.KEYS_VALUES); |
| factory.setConcurrencyChecksEnabled(true); |
| cache.createRegion(REGION_NAME, factory.create()); |
| |
| CacheServer server = cache.addCacheServer(); |
| int port = getRandomAvailableTCPPort(); |
| server.setPort(port); |
| server.setMaximumTimeBetweenPings(180000); |
| // ensures updates to be sent instead of invalidations |
| server.setNotifyBySubscription(true); |
| server.start(); |
| return new Integer(server.getPort()); |
| } |
| |
| public static Integer createServerCacheWithLocalRegion() throws Exception { |
| new HAInterestTestCase().createCache(new Properties()); |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.LOCAL); |
| factory.setConcurrencyChecksEnabled(true); |
| RegionAttributes attrs = factory.create(); |
| cache.createRegion(REGION_NAME, attrs); |
| |
| CacheServer server = cache.addCacheServer(); |
| int port = getRandomAvailableTCPPort(); |
| server.setPort(port); |
| // ensures updates to be sent instead of invalidations |
| server.setNotifyBySubscription(true); |
| server.setMaximumTimeBetweenPings(180000); |
| server.start(); |
| return new Integer(server.getPort()); |
| } |
| } |