| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.internal.cache.tier.sockets; |
| |
| import static org.apache.geode.test.awaitility.GeodeAwaitility.await; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertTrue; |
| |
| import java.net.ConnectException; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| |
| import org.junit.Test; |
| import org.junit.experimental.categories.Category; |
| import org.junit.runner.RunWith; |
| import org.junit.runners.Parameterized; |
| import org.mockito.Mockito; |
| |
| import org.apache.geode.cache.Cache; |
| import org.apache.geode.cache.Region; |
| import org.apache.geode.cache.client.Pool; |
| import org.apache.geode.cache.client.PoolManager; |
| import org.apache.geode.cache.client.internal.PoolImpl; |
| import org.apache.geode.cache.query.CqAttributesFactory; |
| import org.apache.geode.cache.query.CqListener; |
| import org.apache.geode.cache.query.CqQuery; |
| import org.apache.geode.cache.server.CacheServer; |
| import org.apache.geode.distributed.DistributedSystem; |
| import org.apache.geode.internal.Version; |
| import org.apache.geode.internal.cache.EventID; |
| import org.apache.geode.internal.cache.LocalRegion; |
| import org.apache.geode.test.dunit.Host; |
| import org.apache.geode.test.dunit.IgnoredException; |
| import org.apache.geode.test.dunit.NetworkUtils; |
| import org.apache.geode.test.dunit.VM; |
| import org.apache.geode.test.junit.categories.BackwardCompatibilityTest; |
| import org.apache.geode.test.junit.categories.ClientServerTest; |
| import org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactory; |
| import org.apache.geode.test.version.VersionManager; |
| |
| @Category({ClientServerTest.class, BackwardCompatibilityTest.class}) |
| @RunWith(Parameterized.class) |
| @Parameterized.UseParametersRunnerFactory(CategoryWithParameterizedRunnerFactory.class) |
| public class ClientServerMiscBCDUnitTest extends ClientServerMiscDUnitTestBase { |
| @Parameterized.Parameters |
| public static Collection<String> data() { |
| List<String> result = VersionManager.getInstance().getVersionsWithoutCurrent(); |
| if (result.size() < 1) { |
| throw new RuntimeException("No older versions of Geode were found to test against"); |
| } else { |
| System.out.println("running against these versions: " + result); |
| } |
| return result; |
| } |
| |
| public ClientServerMiscBCDUnitTest(String version) { |
| super(); |
| testVersion = version; |
| } |
| |
| @Override |
| void createClientCacheAndVerifyPingIntervalIsSet(String host, int port) throws Exception { |
| // this functionality was introduced in 1.5. If we let the test run in older |
| // clients it will throw a NoSuchMethodError |
| if (Version.CURRENT_ORDINAL >= 80 /* GEODE_1_5_0 */) { |
| super.createClientCacheAndVerifyPingIntervalIsSet(host, port); |
| } |
| } |
| |
| @Test |
| public void testSubscriptionWithCurrentServerAndOldClients() throws Exception { |
| // start server first |
| int serverPort = initServerCache(true); |
| VM client1 = Host.getHost(0).getVM(testVersion, 1); |
| VM client2 = Host.getHost(0).getVM(testVersion, 3); |
| String hostname = NetworkUtils.getServerHostName(Host.getHost(0)); |
| client1.invoke("create client1 cache", () -> { |
| createClientCache(hostname, serverPort); |
| populateCache(); |
| registerInterest(); |
| }); |
| client2.invoke("create client2 cache", () -> { |
| Pool ignore = createClientCache(hostname, serverPort); |
| }); |
| |
| client2.invoke("putting data in client2", () -> putForClient()); |
| |
| // client1 will receive client2's updates asynchronously |
| client1.invoke(() -> { |
| Region r2 = getCache().getRegion(REGION_NAME2); |
| MemberIDVerifier verifier = (MemberIDVerifier) ((LocalRegion) r2).getCacheListener(); |
| await().until(() -> verifier.eventReceived); |
| }); |
| |
| // client2's update should have included a memberID - GEODE-2954 |
| client1.invoke(() -> { |
| Region r2 = getCache().getRegion(REGION_NAME2); |
| MemberIDVerifier verifier = (MemberIDVerifier) ((LocalRegion) r2).getCacheListener(); |
| assertFalse(verifier.memberIDNotReceived); |
| }); |
| } |
| |
| @Test |
| public void testSubscriptionWithMixedServersAndNewPeerFeed() throws Exception { |
| doTestSubscriptionWithMixedServersAndPeerFeed(VersionManager.CURRENT_VERSION, true); |
| } |
| |
| @Test |
| public void testSubscriptionWithMixedServersAndOldPeerFeed() throws Exception { |
| doTestSubscriptionWithMixedServersAndPeerFeed(testVersion, true); |
| } |
| |
| @Test |
| public void testSubscriptionWithMixedServersAndOldClientFeed() throws Exception { |
| doTestSubscriptionWithMixedServersAndPeerFeed(testVersion, false); |
| } |
| |
| private void doTestSubscriptionWithMixedServersAndPeerFeed(String version, |
| boolean usePeerForFeed) { |
| server1 = Host.getHost(0).getVM(testVersion, 2); |
| server2 = Host.getHost(0).getVM(VersionManager.CURRENT_VERSION, 3); |
| VM server3 = Host.getHost(0).getVM(VersionManager.CURRENT_VERSION, 4); |
| VM interestClient = Host.getHost(0).getVM(testVersion, 0); |
| VM feeder = Host.getHost(0).getVM(version, 1); |
| |
| // start servers first |
| int server1Port = initServerCache(true); |
| |
| int server2Port = initServerCache2(); |
| |
| int server3Port = server3.invoke(() -> createServerCache(true, getMaxThreads(), false)); |
| |
| System.out.println("old server is vm 2 and new server is vm 3"); |
| System.out |
| .println("old server port is " + server1Port + " and new server port is " + server2Port); |
| |
| String hostname = NetworkUtils.getServerHostName(Host.getHost(0)); |
| interestClient.invoke("create interestClient cache", () -> { |
| createClientCache(hostname, 300000, false, server1Port, server2Port, server3Port); |
| populateCache(); |
| registerInterest(); |
| }); |
| |
| if (!usePeerForFeed) { |
| feeder.invoke("create client cache for feed", () -> { |
| Pool ignore = createClientCache(hostname, server1Port); |
| }); |
| } |
| feeder.invoke("putting data in feeder", () -> putForClient()); |
| |
| // interestClient will receive feeder's updates asynchronously |
| interestClient.invoke("verification 1", () -> { |
| Region r2 = getCache().getRegion(REGION_NAME2); |
| MemberIDVerifier verifier = (MemberIDVerifier) ((LocalRegion) r2).getCacheListener(); |
| await().until(() -> verifier.eventReceived); |
| verifier.reset(); |
| }); |
| |
| server1.invoke("shutdown old server", () -> { |
| getCache().getDistributedSystem().disconnect(); |
| }); |
| |
| server2.invoke("wait for failover queue to drain", () -> { |
| CacheClientProxy proxy = |
| CacheClientNotifier.getInstance().getClientProxies().iterator().next(); |
| await() |
| .until(() -> proxy.getHARegionQueue().isEmpty()); |
| }); |
| |
| // the client should now get duplicate events from the current-version server |
| interestClient.invoke("verification 2", () -> { |
| Cache cache = getCache(); |
| Region r2 = cache.getRegion(REGION_NAME2); |
| MemberIDVerifier verifier = (MemberIDVerifier) ((LocalRegion) r2).getCacheListener(); |
| assertFalse(verifier.eventReceived); // no duplicate events should have arrived |
| PoolImpl pool = (PoolImpl) PoolManager.find("ClientServerMiscDUnitTestPool"); |
| |
| Map seqMap = pool.getThreadIdToSequenceIdMap(); |
| assertEquals(3, seqMap.size()); // one for each server and one for the feed |
| verifier.reset(); |
| }); |
| |
| server2.invoke("shutdown new server", () -> { |
| getCache().getDistributedSystem().disconnect(); |
| }); |
| |
| server3.invoke("wait for failover queue to drain", () -> { |
| CacheClientProxy proxy = |
| CacheClientNotifier.getInstance().getClientProxies().iterator().next(); |
| await() |
| .until(() -> proxy.getHARegionQueue().isEmpty()); |
| }); |
| |
| // the client should now get duplicate events from the current-version server |
| interestClient.invoke("verification 3", () -> { |
| Cache cache = getCache(); |
| Region r2 = cache.getRegion(REGION_NAME2); |
| MemberIDVerifier verifier = (MemberIDVerifier) ((LocalRegion) r2).getCacheListener(); |
| assertFalse(verifier.eventReceived); // no duplicate events should have arrived |
| PoolImpl pool = (PoolImpl) PoolManager.find("ClientServerMiscDUnitTestPool"); |
| |
| Map seqMap = pool.getThreadIdToSequenceIdMap(); |
| assertEquals(4, seqMap.size()); // one for each server and one for the feed |
| }); |
| } |
| |
| @Test |
| public void giiEventQueueFromOldToCurrentMemberShouldSucceed() { |
| giiEventQueueShouldSucceedWithMixedVersions(testVersion, VersionManager.CURRENT_VERSION); |
| } |
| |
| @Test |
| public void giiEventQueueFromCurrentToOldMemberShouldSucceed() { |
| final IgnoredException expectedEx = |
| IgnoredException.addIgnoredException(ConnectException.class.getName()); |
| giiEventQueueShouldSucceedWithMixedVersions(VersionManager.CURRENT_VERSION, testVersion); |
| expectedEx.remove(); |
| } |
| |
| public void giiEventQueueShouldSucceedWithMixedVersions(String server1Version, |
| String server2Version) { |
| VM interestClient = Host.getHost(0).getVM(testVersion, 0); |
| VM feeder = Host.getHost(0).getVM(VersionManager.CURRENT_VERSION, 1); |
| server1 = Host.getHost(0).getVM(server1Version, 2); |
| server2 = Host.getHost(0).getVM(server2Version, 3); |
| |
| // start servers first |
| int server1Port = initServerCache(true, server1, true); |
| int server2Port = initServerCache(true, server2, true); |
| server2.invoke(() -> { |
| getCache().getCacheServers().stream().forEach(CacheServer::stop); |
| }); |
| |
| |
| String hostname = NetworkUtils.getServerHostName(Host.getHost(0)); |
| interestClient.invoke("create interestClient cache", () -> { |
| createClientCache(hostname, 300000, false, server1Port, server2Port); |
| registerInterest(); |
| registerCQ(); |
| }); |
| |
| feeder.invoke("putting data in feeder", () -> putForClient()); |
| |
| // Start server 2 |
| server2.invoke(() -> { |
| for (CacheServer server : getCache().getCacheServers()) { |
| server.start(); |
| } |
| }); |
| |
| // Make sure server 2 copies the queue |
| server2.invoke(() -> { |
| await().untilAsserted(() -> { |
| final Collection<CacheClientProxy> clientProxies = |
| CacheClientNotifier.getInstance().getClientProxies(); |
| assertFalse(clientProxies.isEmpty()); |
| CacheClientProxy proxy = clientProxies.iterator().next(); |
| assertFalse(proxy.getHARegionQueue().isEmpty()); |
| }); |
| }); |
| |
| // interestClient will receive feeder's updates asynchronously |
| interestClient.invoke("verification 1", () -> { |
| Region r2 = getCache().getRegion(REGION_NAME2); |
| MemberIDVerifier verifier = (MemberIDVerifier) ((LocalRegion) r2).getCacheListener(); |
| await().until(() -> verifier.eventReceived); |
| verifier.reset(); |
| }); |
| |
| server1.invoke("shutdown old server", () -> { |
| getCache().getDistributedSystem().disconnect(); |
| }); |
| |
| server2.invoke("wait for failover queue to drain", () -> { |
| CacheClientProxy proxy = |
| CacheClientNotifier.getInstance().getClientProxies().iterator().next(); |
| await() |
| .until(() -> proxy.getHARegionQueue().isEmpty()); |
| }); |
| } |
| |
| public static void registerCQ() throws Exception { |
| Cache cache = new ClientServerMiscDUnitTestBase().getCache(); |
| Region r = cache.getRegion(Region.SEPARATOR + REGION_NAME2); |
| assertNotNull(r); |
| CqAttributesFactory cqAttributesFactory = new CqAttributesFactory(); |
| cqAttributesFactory.addCqListener(Mockito.mock(CqListener.class)); |
| final CqQuery cq = cache.getQueryService().newCq("testCQ", "select * from " + r.getFullPath(), |
| cqAttributesFactory.create()); |
| cq.execute(); |
| } |
| |
| @Test |
| public void testDistributedMemberBytesWithCurrentServerAndOldClient() throws Exception { |
| // Start current version server |
| int serverPort = initServerCache(true); |
| |
| // Start old version client and do puts |
| VM client = Host.getHost(0).getVM(testVersion, 1); |
| String hostname = NetworkUtils.getServerHostName(Host.getHost(0)); |
| client.invoke("create client cache", () -> { |
| createClientCache(hostname, serverPort); |
| populateCache(); |
| }); |
| |
| // Get client member id byte array on client |
| byte[] clientMembershipIdBytesOnClient = |
| client.invoke(() -> getClientMembershipIdBytesOnClient()); |
| |
| // Get client member id byte array on server |
| byte[] clientMembershipIdBytesOnServer = |
| server1.invoke(() -> getClientMembershipIdBytesOnServer()); |
| |
| // Verify member id bytes on client and server are equal |
| String complaint = "size on client=" + clientMembershipIdBytesOnClient.length |
| + "; size on server=" + clientMembershipIdBytesOnServer.length + "\nclient bytes=" |
| + Arrays.toString(clientMembershipIdBytesOnClient) + "\nserver bytes=" |
| + Arrays.toString(clientMembershipIdBytesOnServer); |
| assertTrue(complaint, |
| Arrays.equals(clientMembershipIdBytesOnClient, clientMembershipIdBytesOnServer)); |
| } |
| |
| private byte[] getClientMembershipIdBytesOnClient() { |
| DistributedSystem system = getCache().getDistributedSystem(); |
| byte[] result = |
| EventID.getMembershipId(new ClientProxyMembershipID(system.getDistributedMember())); |
| System.out.println("client ID bytes are " + Arrays.toString(result)); |
| return result; |
| } |
| |
| private byte[] getClientMembershipIdBytesOnServer() { |
| Set cpmIds = ClientHealthMonitor.getInstance().getClientHeartbeats().keySet(); |
| assertEquals(1, cpmIds.size()); |
| ClientProxyMembershipID cpmId = (ClientProxyMembershipID) cpmIds.iterator().next(); |
| System.out.println("client ID on server is " + cpmId.getDistributedMember()); |
| byte[] result = EventID.getMembershipId(cpmId); |
| System.out.println("client ID bytes are " + Arrays.toString(result)); |
| return result; |
| } |
| } |