| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.cache30; |
| |
| import static org.apache.geode.cache.Scope.LOCAL; |
| import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS; |
| import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT; |
| import static org.apache.geode.test.dunit.Assert.assertEquals; |
| import static org.apache.geode.test.dunit.Assert.assertNotNull; |
| import static org.apache.geode.test.dunit.Assert.assertTrue; |
| import static org.apache.geode.test.dunit.Assert.fail; |
| import static org.apache.geode.test.dunit.Host.getHost; |
| import static org.apache.geode.test.dunit.LogWriterUtils.getLogWriter; |
| import static org.apache.geode.test.dunit.NetworkUtils.getServerHostName; |
| |
| import java.io.IOException; |
| import java.util.Properties; |
| |
| import org.junit.Ignore; |
| import org.junit.Test; |
| import org.junit.experimental.categories.Category; |
| |
| import org.apache.geode.cache.AttributesFactory; |
| import org.apache.geode.cache.Cache; |
| import org.apache.geode.cache.CacheException; |
| import org.apache.geode.cache.CacheFactory; |
| import org.apache.geode.cache.CacheLoaderException; |
| import org.apache.geode.cache.Region; |
| import org.apache.geode.cache.RegionShortcut; |
| import org.apache.geode.cache.Scope; |
| import org.apache.geode.cache.client.ClientCache; |
| import org.apache.geode.cache.client.ClientCacheFactory; |
| import org.apache.geode.cache.client.ClientRegionShortcut; |
| import org.apache.geode.cache.client.ServerOperationException; |
| import org.apache.geode.cache.client.SubscriptionNotEnabledException; |
| import org.apache.geode.cache.client.internal.PoolImpl; |
| import org.apache.geode.cache.server.CacheServer; |
| import org.apache.geode.test.awaitility.GeodeAwaitility; |
| import org.apache.geode.test.dunit.Host; |
| import org.apache.geode.test.dunit.LogWriterUtils; |
| import org.apache.geode.test.dunit.NetworkUtils; |
| import org.apache.geode.test.dunit.VM; |
| import org.apache.geode.test.dunit.WaitCriterion; |
| import org.apache.geode.test.junit.categories.ClientSubscriptionTest; |
| |
| /** |
| * Tests client register-interest behavior against cache servers. |
| * |
| * @since GemFire 4.2.3 |
| */ |
| @Category({ClientSubscriptionTest.class}) |
| public class ClientRegisterInterestDUnitTest extends ClientServerTestCase { |
| |
| /** |
| * Port of the cache server started in the JVM where this field is written. The server-side |
| * runnables in this class assign it after starting their cache server, and the controller reads |
| * it back by invoking {@code getBridgeServerPort()} inside the server VM. |
| */ |
| protected static int bridgeServerPort; |
| |
| /** |
| * Tear-down hook overridden from the superclass; disconnects everything from the distributed |
| * system after each test. |
| */ |
| @Override |
| public final void postTearDownCacheTestCase() throws Exception { |
| // cleans up cache server and client and lonerDS |
| disconnectAllFromDS(); |
| } |
| |
| /** |
| * Tests for Bug 35381 Calling register interest if establishCallbackConnection is not set causes |
| * cache server NPE. |
| */ |
| @Test |
| public void testBug35381() throws Exception { |
| final Host host = Host.getHost(0); |
| final String name = this.getUniqueName(); |
| final int[] ports = new int[1]; // 1 server in this test |
| |
| final int whichVM = 0; |
| final VM vm = Host.getHost(0).getVM(whichVM); |
| vm.invoke(new CacheSerializableRunnable("Create cache server") { |
| @Override |
| public void run2() throws CacheException { |
| LogWriterUtils.getLogWriter().info("[testBug35381] Create BridgeServer"); |
| getSystem(); |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.LOCAL); |
| Region region = createRegion(name, factory.create()); |
| assertNotNull(region); |
| assertNotNull(getRootRegion().getSubregion(name)); |
| region.put("KEY-1", "VAL-1"); |
| |
| try { |
| bridgeServerPort = startBridgeServer(0); |
| } catch (IOException e) { |
| LogWriterUtils.getLogWriter().error("startBridgeServer threw IOException", e); |
| fail("startBridgeServer threw IOException ", e); |
| } |
| |
| assertTrue(bridgeServerPort != 0); |
| |
| LogWriterUtils.getLogWriter().info("[testBug35381] port=" + bridgeServerPort); |
| LogWriterUtils.getLogWriter().info("[testBug35381] serverMemberId=" + getMemberId()); |
| } |
| }); |
| ports[whichVM] = vm.invoke(() -> ClientRegisterInterestDUnitTest.getBridgeServerPort()); |
| assertTrue(ports[whichVM] != 0); |
| |
| LogWriterUtils.getLogWriter().info("[testBug35381] create bridge client"); |
| Properties config = new Properties(); |
| config.setProperty(MCAST_PORT, "0"); |
| config.setProperty(LOCATORS, ""); |
| getSystem(config); |
| getCache(); |
| |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.LOCAL); |
| |
| LogWriterUtils.getLogWriter().info("[testBug35381] creating connection pool"); |
| boolean establishCallbackConnection = false; // SOURCE OF BUG 35381 |
| ClientServerTestCase.configureConnectionPool(factory, NetworkUtils.getServerHostName(host), |
| ports, establishCallbackConnection, -1, -1, null); |
| Region region = createRegion(name, factory.create()); |
| assertNotNull(getRootRegion().getSubregion(name)); |
| try { |
| region.registerInterest("KEY-1"); |
| fail( |
| "registerInterest failed to throw SubscriptionNotEnabledException with establishCallbackConnection set to false"); |
| } catch (SubscriptionNotEnabledException expected) { |
| } |
| } |
| |
| /** |
| * Returns the current value of the static {@code bridgeServerPort} field in the JVM where this |
| * method runs. The tests invoke it inside a server VM to read back the port stored there. |
| */ |
| private static int getBridgeServerPort() { |
| return ClientRegisterInterestDUnitTest.bridgeServerPort; |
| } |
| |
| /** |
| * Tests failover of register interest from client point of view. Related bugs include: |
| * |
| * <p> |
| * Bug 35654 "failed re-registration may never be detected and thus may never re-re-register" |
| * |
| * <p> |
| * Bug 35639 "registerInterest re-registration happens everytime a healthy server is detected" |
| * |
| * <p> |
| * Bug 35655 "a single failed re-registration causes all other pending re-registrations to be |
| * cancelled" |
| * |
| * <p> |
| * Scenario: the first server hosts three regions, the second server hosts only the first and the |
| * third. The client (the controller VM) registers interest in one key per region while connected |
| * to the first server, then the first server is stopped and the second one started. After the |
| * failover the test expects the interest on the second region to be gone while the other two keep |
| * receiving updates, and expects a fresh registration on the second region to work once that |
| * region exists on the second server. |
| */ |
| @Ignore("TODO") |
| @Test |
| public void testRegisterInterestFailover() throws Exception { |
| // controller is bridge client |
| |
| final Host host = getHost(0); |
| final String name = this.getUniqueName(); |
| final String regionName1 = name + "-1"; |
| final String regionName2 = name + "-2"; |
| final String regionName3 = name + "-3"; |
| final String key1 = "KEY-" + regionName1 + "-1"; |
| final String key2 = "KEY-" + regionName1 + "-2"; |
| final String key3 = "KEY-" + regionName1 + "-3"; |
| // NOTE(review): three slots are allocated but only two servers are started below, so the last |
| // entry stays 0 when the array is handed to the connection pool - confirm this is intended |
| final int[] ports = new int[3]; // 3 servers in this test |
| |
| // create first cache server with region for client... |
| final int firstServerIdx = 0; |
| final VM firstServerVM = getHost(0).getVM(firstServerIdx); |
| firstServerVM.invoke(new CacheSerializableRunnable("Create first cache server") { |
| @Override |
| public void run2() throws CacheException { |
| getLogWriter() |
| .info("[testRegisterInterestFailover] Create first cache server"); |
| getSystem(); |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(LOCAL); |
| Region region1 = createRootRegion(regionName1, factory.create()); |
| Region region2 = createRootRegion(regionName2, factory.create()); |
| Region region3 = createRootRegion(regionName3, factory.create()); |
| region1.put(key1, "VAL-1"); |
| region2.put(key2, "VAL-1"); |
| region3.put(key3, "VAL-1"); |
| |
| try { |
| bridgeServerPort = startBridgeServer(0); |
| } catch (IOException e) { |
| getLogWriter().error("startBridgeServer threw IOException", e); |
| fail("startBridgeServer threw IOException ", e); |
| } |
| |
| assertTrue(bridgeServerPort != 0); |
| |
| getLogWriter() |
| .info("[testRegisterInterestFailover] " + "firstServer port=" + bridgeServerPort); |
| getLogWriter() |
| .info("[testRegisterInterestFailover] " + "firstServer memberId=" + getMemberId()); |
| } |
| }); |
| |
| // create second cache server missing region for client... |
| final int secondServerIdx = 1; |
| final VM secondServerVM = getHost(0).getVM(secondServerIdx); |
| secondServerVM.invoke(new CacheSerializableRunnable("Create second cache server") { |
| @Override |
| public void run2() throws CacheException { |
| getLogWriter() |
| .info("[testRegisterInterestFailover] Create second cache server"); |
| getSystem(); |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(LOCAL); |
| Region region1 = createRootRegion(regionName1, factory.create()); |
| Region region3 = createRootRegion(regionName3, factory.create()); |
| region1.put(key1, "VAL-2"); |
| region3.put(key3, "VAL-2"); |
| |
| try { |
| bridgeServerPort = startBridgeServer(0); |
| } catch (IOException e) { |
| getLogWriter().error("startBridgeServer threw IOException", e); |
| fail("startBridgeServer threw IOException ", e); |
| } |
| |
| assertTrue(bridgeServerPort != 0); |
| |
| getLogWriter() |
| .info("[testRegisterInterestFailover] " + "secondServer port=" + bridgeServerPort); |
| getLogWriter() |
| .info("[testRegisterInterestFailover] " + "secondServer memberId=" + getMemberId()); |
| } |
| }); |
| |
| // get the cache server ports... |
| ports[firstServerIdx] = |
| firstServerVM.invoke(() -> getBridgeServerPort()); |
| assertTrue(ports[firstServerIdx] != 0); |
| ports[secondServerIdx] = |
| secondServerVM.invoke(() -> getBridgeServerPort()); |
| assertTrue(ports[secondServerIdx] != 0); |
| assertTrue(ports[firstServerIdx] != ports[secondServerIdx]); |
| |
| // stop the second server; only the first one is running when the client connects |
| secondServerVM.invoke(new CacheSerializableRunnable("Stop second cache server") { |
| @Override |
| public void run2() throws CacheException { |
| stopBridgeServers(getCache()); |
| } |
| }); |
| |
| // create the bridge client |
| getLogWriter().info("[testRegisterInterestFailover] create bridge client"); |
| Properties config = new Properties(); |
| config.setProperty(MCAST_PORT, "0"); |
| config.setProperty(LOCATORS, ""); |
| getSystem(config); |
| getCache(); |
| |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(LOCAL); |
| |
| getLogWriter().info("[testRegisterInterestFailover] creating connection pool"); |
| boolean establishCallbackConnection = true; |
| final PoolImpl p = (PoolImpl) configureConnectionPool(factory, |
| getServerHostName(host), ports, establishCallbackConnection, -1, -1, null); |
| |
| final Region region1 = createRootRegion(regionName1, factory.create()); |
| final Region region2 = createRootRegion(regionName2, factory.create()); |
| final Region region3 = createRootRegion(regionName3, factory.create()); |
| |
| assertTrue(region1.getInterestList().isEmpty()); |
| assertTrue(region2.getInterestList().isEmpty()); |
| assertTrue(region3.getInterestList().isEmpty()); |
| |
| region1.registerInterest(key1); |
| region2.registerInterest(key2); |
| region3.registerInterest(key3); |
| |
| assertTrue(region1.getInterestList().contains(key1)); |
| assertTrue(region2.getInterestList().contains(key2)); |
| assertTrue(region3.getInterestList().contains(key3)); |
| |
| assertTrue(region1.getInterestListRegex().isEmpty()); |
| assertTrue(region2.getInterestListRegex().isEmpty()); |
| assertTrue(region3.getInterestListRegex().isEmpty()); |
| |
| // get ConnectionProxy and wait until connected to first server |
| WaitCriterion ev = new WaitCriterion() { |
| @Override |
| public boolean done() { |
| return p.getPrimaryPort() != -1; |
| } |
| |
| @Override |
| public String description() { |
| return "primary port remained invalid"; |
| } |
| }; |
| GeodeAwaitility.await().untilAsserted(ev); |
| assertEquals(ports[firstServerIdx], p.getPrimaryPort()); |
| |
| // assert initial values |
| assertEquals("VAL-1", region1.get(key1)); |
| assertEquals("VAL-1", region2.get(key2)); |
| assertEquals("VAL-1", region3.get(key3)); |
| |
| // do puts on server1 and make sure values come thru for all 3 registrations |
| firstServerVM.invoke(new CacheSerializableRunnable("Puts from first cache server") { |
| @Override |
| public void run2() throws CacheException { |
| Region region1 = getCache().getRegion(regionName1); |
| region1.put(key1, "VAL-1-1"); |
| Region region2 = getCache().getRegion(regionName2); |
| region2.put(key2, "VAL-1-1"); |
| Region region3 = getCache().getRegion(regionName3); |
| region3.put(key3, "VAL-1-1"); |
| } |
| }); |
| |
| ev = new WaitCriterion() { |
| @Override |
| public boolean done() { |
| return "VAL-1-1".equals(region1.get(key1)) && "VAL-1-1".equals(region2.get(key2)) |
| && "VAL-1-1".equals(region3.get(key3)); |
| } |
| |
| @Override |
| public String description() { |
| return "VAL-1-1 from first cache server never arrived for all three registrations"; |
| } |
| }; |
| GeodeAwaitility.await().untilAsserted(ev); |
| assertEquals("VAL-1-1", region1.get(key1)); |
| assertEquals("VAL-1-1", region2.get(key2)); |
| assertEquals("VAL-1-1", region3.get(key3)); |
| |
| // force failover to server 2 |
| secondServerVM.invoke(new CacheSerializableRunnable("Start second cache server") { |
| @Override |
| public void run2() throws CacheException { |
| try { |
| startBridgeServer(ports[secondServerIdx]); |
| } catch (IOException e) { |
| getLogWriter().error("startBridgeServer threw IOException", e); |
| fail("startBridgeServer threw IOException ", e); |
| } |
| } |
| }); |
| |
| firstServerVM.invoke(new CacheSerializableRunnable("Stop first cache server") { |
| @Override |
| public void run2() throws CacheException { |
| stopBridgeServers(getCache()); |
| } |
| }); |
| |
| // wait for failover to second server |
| ev = new WaitCriterion() { |
| @Override |
| public boolean done() { |
| return ports[secondServerIdx] == p.getPrimaryPort(); |
| } |
| |
| @Override |
| public String description() { |
| return "primary port never became " + ports[secondServerIdx]; |
| } |
| }; |
| GeodeAwaitility.await().untilAsserted(ev); |
| |
| // region2 does not exist on the second server yet, so the lookup is expected to throw |
| try { |
| assertEquals(null, region2.get(key2)); |
| fail("CacheLoaderException expected"); |
| } catch (CacheLoaderException expected) { |
| // expected |
| } |
| |
| // region2 registration should be gone now |
| // create region2 on server2 so that the puts made there further below have a target |
| secondServerVM.invoke(new CacheSerializableRunnable("Create region2 on second cache server") { |
| @Override |
| public void run2() throws CacheException { |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(LOCAL); |
| createRootRegion(regionName2, factory.create()); |
| } |
| }); |
| |
| // assert that there is no actively registered interest on region2 |
| assertTrue(region2.getInterestList().isEmpty()); |
| assertTrue(region2.getInterestListRegex().isEmpty()); |
| |
| region2.put(key2, "VAL-0"); |
| |
| // do puts on server2 and make sure values come thru for only 2 registrations |
| secondServerVM.invoke(new CacheSerializableRunnable("Put from second cache server") { |
| @Override |
| public void run2() throws CacheException { |
| Region region1 = getCache().getRegion(regionName1); |
| region1.put(key1, "VAL-2-2"); |
| Region region2 = getCache().getRegion(regionName2); |
| region2.put(key2, "VAL-2-1"); |
| Region region3 = getCache().getRegion(regionName3); |
| region3.put(key3, "VAL-2-2"); |
| } |
| }); |
| |
| // wait for updates to come thru |
| ev = new WaitCriterion() { |
| @Override |
| public boolean done() { |
| return "VAL-2-2".equals(region1.get(key1)) && "VAL-2-2".equals(region3.get(key3)); |
| } |
| |
| @Override |
| public String description() { |
| return "VAL-2-2 from second cache server never arrived for region1 and region3"; |
| } |
| }; |
| GeodeAwaitility.await().untilAsserted(ev); |
| assertEquals("VAL-2-2", region1.get(key1)); |
| assertEquals("VAL-0", region2.get(key2)); |
| assertEquals("VAL-2-2", region3.get(key3)); |
| |
| // assert again that there is no actively registered interest on region2 |
| assertTrue(region2.getInterestList().isEmpty()); |
| |
| // register interest again on region2 and make sure the server's current value is visible |
| region2.registerInterest(key2); |
| assertEquals("VAL-2-1", region2.get(key2)); |
| |
| secondServerVM.invoke(new CacheSerializableRunnable("Put from second cache server") { |
| @Override |
| public void run2() throws CacheException { |
| Region region1 = getCache().getRegion(regionName1); |
| region1.put(key1, "VAL-2-3"); |
| Region region2 = getCache().getRegion(regionName2); |
| region2.put(key2, "VAL-2-2"); |
| Region region3 = getCache().getRegion(regionName3); |
| region3.put(key3, "VAL-2-3"); |
| } |
| }); |
| |
| // wait for updates to come thru |
| ev = new WaitCriterion() { |
| @Override |
| public boolean done() { |
| return "VAL-2-3".equals(region1.get(key1)) && "VAL-2-2".equals(region2.get(key2)) |
| && "VAL-2-3".equals(region3.get(key3)); |
| } |
| |
| @Override |
| public String description() { |
| return "final updates from second cache server never arrived for all three registrations"; |
| } |
| }; |
| GeodeAwaitility.await().untilAsserted(ev); |
| assertEquals("VAL-2-3", region1.get(key1)); |
| assertEquals("VAL-2-2", region2.get(key2)); |
| assertEquals("VAL-2-3", region3.get(key3)); |
| |
| // assert public methods report actively registered interest on region2 |
| assertTrue(region2.getInterestList().contains(key2)); |
| } |
| |
| |
| |
| /** |
| * Verifies that registering interest in a partitioned region is rejected when the server was |
| * created with mcast-port 0 and no locator. |
| * |
| * <p> |
| * A cache server hosting a partitioned region is started in VM 1. The controller then connects as |
| * a subscription-enabled client and registers a regex interest, which must fail with a |
| * {@link ServerOperationException} whose root cause carries the expected message. |
| */ |
| @Test |
| public void rejectAttemptToRegisterInterestInLonerSystem() throws Exception { |
| final String name = this.getUniqueName(); |
| final String regionName1 = name + "-1"; |
| final String expectedMessage = |
| "Should not register interest for a partitioned region when mcast-port is 0 and no locator is present"; |
| |
| // create first cache server with region for client... |
| final int firstServerIdx = 1; |
| |
| final VM firstServerVM = Host.getHost(0).getVM(firstServerIdx); |
| firstServerVM.invoke(new CacheSerializableRunnable("Create first cache server") { |
| @Override |
| public void run2() throws CacheException { |
| Cache cache = new CacheFactory().set(MCAST_PORT, "0").create(); |
| |
| try { |
| CacheServer bridge = cache.addCacheServer(); |
| bridge.setPort(0); |
| bridge.setMaxThreads(getMaxThreads()); |
| bridge.start(); |
| bridgeServerPort = bridge.getPort(); |
| } catch (IOException e) { |
| LogWriterUtils.getLogWriter().error("startBridgeServer threw IOException", e); |
| fail("startBridgeServer threw IOException ", e); |
| } |
| assertTrue(bridgeServerPort != 0); |
| |
| // the partitioned region is the one the client will try to register interest in |
| cache.createRegionFactory(RegionShortcut.PARTITION).create(regionName1); |
| } |
| }); |
| |
| // get the cache server ports... |
| int port = firstServerVM.invoke(() -> ClientRegisterInterestDUnitTest.getBridgeServerPort()); |
| |
| try { |
| ClientCache clientCache = |
| new ClientCacheFactory().addPoolServer(firstServerVM.getHost().getHostName(), port) |
| .setPoolSubscriptionEnabled(true).create(); |
| Region region = clientCache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY) |
| .create(regionName1); |
| region.registerInterestRegex(".*"); |
| fail("registerInterestRegex failed to throw ServerOperationException with root cause: " |
| + expectedMessage); |
| } catch (ServerOperationException e) { |
| // expected; assertEquals reports the actual message on mismatch and tolerates a null message |
| assertEquals(expectedMessage, e.getRootCause().getMessage()); |
| } |
| } |
| |
| } |