| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.internal.cache.tier.sockets; |
| |
| import static org.apache.geode.cache.Region.SEPARATOR; |
| import static org.apache.geode.distributed.ConfigurationProperties.DELTA_PROPAGATION; |
| import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS; |
| import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT; |
| import static org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPort; |
| import static org.assertj.core.api.Assertions.assertThat; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertTrue; |
| |
| import java.io.DataInput; |
| import java.io.DataOutput; |
| import java.io.IOException; |
| import java.util.Properties; |
| |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.experimental.categories.Category; |
| |
| import org.apache.geode.DeltaTestImpl; |
| import org.apache.geode.LogWriter; |
| import org.apache.geode.cache.AttributesFactory; |
| import org.apache.geode.cache.AttributesMutator; |
| import org.apache.geode.cache.Cache; |
| import org.apache.geode.cache.CacheFactory; |
| import org.apache.geode.cache.DataPolicy; |
| import org.apache.geode.cache.EntryEvent; |
| import org.apache.geode.cache.InterestPolicy; |
| import org.apache.geode.cache.Region; |
| import org.apache.geode.cache.RegionAttributes; |
| import org.apache.geode.cache.Scope; |
| import org.apache.geode.cache.SubscriptionAttributes; |
| import org.apache.geode.cache.client.PoolManager; |
| import org.apache.geode.cache.client.internal.PoolImpl; |
| import org.apache.geode.cache.query.CqAttributes; |
| import org.apache.geode.cache.query.CqAttributesFactory; |
| import org.apache.geode.cache.query.CqEvent; |
| import org.apache.geode.cache.query.CqQuery; |
| import org.apache.geode.cache.server.CacheServer; |
| import org.apache.geode.cache.util.CacheListenerAdapter; |
| import org.apache.geode.cache.util.CqListenerAdapter; |
| import org.apache.geode.distributed.DistributedSystem; |
| import org.apache.geode.internal.cache.CacheServerImpl; |
| import org.apache.geode.internal.cache.LocalRegion; |
| import org.apache.geode.internal.cache.TestObjectWithIdentifier; |
| import org.apache.geode.test.awaitility.GeodeAwaitility; |
| import org.apache.geode.test.dunit.Host; |
| import org.apache.geode.test.dunit.NetworkUtils; |
| import org.apache.geode.test.dunit.VM; |
| import org.apache.geode.test.dunit.WaitCriterion; |
| import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase; |
| import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties; |
| import org.apache.geode.test.junit.categories.ClientSubscriptionTest; |
| import org.apache.geode.test.junit.categories.SerializationTest; |
| |
| /** |
| * Test client to server flow for delta propagation |
| * |
| * @since GemFire 6.1 |
| */ |
| @Category({ClientSubscriptionTest.class, SerializationTest.class}) |
| public class ClientToServerDeltaDUnitTest extends JUnit4DistributedTestCase { |
| /* |
| * Test configuration one server one client also 2 server 2 client |
| */ |
| private static Cache cache = null; |
| |
| private static LogWriter logger = null; |
| |
| VM server = null; |
| VM server2 = null; |
| VM client = null; |
| VM client2 = null; |
| |
| private static final String KEY1 = "DELTA_KEY_1"; |
| |
| private static final String REGION_NAME = "ClientToServerDeltaDunitTest_region"; |
| |
| private static final int NO_PUT_OPERATION = 3; |
| |
| private static PoolImpl pool; |
| |
| private static Object[] putDelta = {"Anil", DeltaTestImpl.ERRONEOUS_STRING_FOR_FROM_DELTA, 100, |
| DeltaTestImpl.ERRONEOUS_INT_FOR_TO_DELTA}; |
| |
| private static int updates = 0; |
| |
| private static int cqUpdates = 0; |
| |
| private static int create = 0; |
| |
| private static Object firstUpdate = null; |
| |
| private static Object secondUpdate = null; |
| |
| private static Boolean error; |
| |
| private static boolean lastKeyReceived = false; |
| |
| private static Region region = null; |
| |
| private static CSDeltaTestImpl csDelta = null; |
| |
| public static String DELTA_KEY = "DELTA_KEY"; |
| |
| private static final String[] CQs = new String[] {"select * from " + SEPARATOR + REGION_NAME, |
| "select * from " + SEPARATOR + REGION_NAME + " where intVar = 0", |
| "select * from " + SEPARATOR + REGION_NAME + " where intVar > 0", |
| "select * from " + SEPARATOR + REGION_NAME + " where intVar < 0"}; |
| |
| public static String LAST_KEY = "LAST_KEY"; |
| |
| |
| @Rule |
| public DistributedRestoreSystemProperties restoreSystemProperties = |
| new DistributedRestoreSystemProperties(); |
| |
| @Override |
| public final void postSetUp() throws Exception { |
| disconnectAllFromDS(); |
| final Host host = Host.getHost(0); |
| server = host.getVM(0); |
| server2 = host.getVM(1); |
| client = host.getVM(2); |
| client2 = host.getVM(3); |
| } |
| |
| @Override |
| public final void preTearDown() throws Exception { |
| // reset all flags |
| DeltaTestImpl.resetDeltaInvokationCounters(); |
| server.invoke(() -> DeltaTestImpl.resetDeltaInvokationCounters()); |
| server2.invoke(() -> DeltaTestImpl.resetDeltaInvokationCounters()); |
| client.invoke(() -> DeltaTestImpl.resetDeltaInvokationCounters()); |
| client2.invoke(() -> DeltaTestImpl.resetDeltaInvokationCounters()); |
| // close the clients first |
| client.invoke(() -> ClientToServerDeltaDUnitTest.closeCache()); |
| client2.invoke(() -> ClientToServerDeltaDUnitTest.closeCache()); |
| // then close the servers |
| server.invoke(() -> ClientToServerDeltaDUnitTest.closeCache()); |
| server2.invoke(() -> ClientToServerDeltaDUnitTest.closeCache()); |
| } |
| |
| public void initialise(Boolean cq) { |
| initialise(Boolean.TRUE/* clone */, null, cq, Boolean.TRUE/* RI */, |
| Boolean.TRUE/* enable delta */); |
| } |
| |
| public void initialise(Boolean clone, String[] queries, Boolean cq, Boolean RI, |
| Boolean enableDelta) { |
| Integer PORT1 = ((Integer) server.invoke(() -> ClientToServerDeltaDUnitTest |
| .createServerCache(Boolean.TRUE, Boolean.FALSE, clone, enableDelta))).intValue(); |
| |
| Integer PORT2 = ((Integer) server2.invoke(() -> ClientToServerDeltaDUnitTest |
| .createServerCache(Boolean.TRUE, Boolean.FALSE, clone, enableDelta))).intValue(); |
| |
| client.invoke(() -> ClientToServerDeltaDUnitTest.createClientCache( |
| NetworkUtils.getServerHostName(server.getHost()), new Integer(PORT1), Boolean.FALSE, |
| Boolean.FALSE, Boolean.FALSE)); |
| |
| client2.invoke(() -> ClientToServerDeltaDUnitTest.createClientCache( |
| NetworkUtils.getServerHostName(server2.getHost()), new Integer(PORT2), Boolean.TRUE, |
| Boolean.FALSE, cq, queries, RI)); |
| } |
| |
| // Same as initialise() except listener flag is false. |
| public void initialise2(Boolean clone, String[] queries, Boolean cq, Boolean RI, |
| Boolean enableDelta) { |
| Integer PORT1 = ((Integer) server.invoke(() -> ClientToServerDeltaDUnitTest |
| .createServerCache(Boolean.FALSE, Boolean.FALSE, clone, enableDelta))).intValue(); |
| |
| Integer PORT2 = ((Integer) server2.invoke(() -> ClientToServerDeltaDUnitTest |
| .createServerCache(Boolean.FALSE, Boolean.FALSE, clone, enableDelta))).intValue(); |
| |
| client.invoke(() -> ClientToServerDeltaDUnitTest.createClientCache( |
| NetworkUtils.getServerHostName(server.getHost()), new Integer(PORT1), Boolean.FALSE, |
| Boolean.FALSE, Boolean.FALSE)); |
| |
| client2.invoke(() -> ClientToServerDeltaDUnitTest.createClientCache( |
| NetworkUtils.getServerHostName(server2.getHost()), new Integer(PORT2), Boolean.TRUE, |
| Boolean.FALSE, cq, queries, RI)); |
| } |
| |
| /** |
| * This test does the following for single key (<b>failure of fromDelta- resending of full |
| * object</b>):<br> |
| * 1)Verifies that we donot loose attributes updates when delta fails<br> |
| */ |
| @Test |
| public void testSendingofFullDeltaObjectsWhenFromDeltaFails() { |
| initialise(false); |
| // set expected value on server |
| server.invoke(() -> ClientToServerDeltaDUnitTest.setFirstSecondUpdate(putDelta[1], |
| (Integer) putDelta[2])); |
| client.invoke(() -> ClientToServerDeltaDUnitTest.putWithFromDeltaERR(KEY1)); |
| |
| assertTrue("to Delta Propagation feature NOT used.", |
| ((Boolean) client.invoke(() -> DeltaTestImpl.toDeltaFeatureUsed())).booleanValue()); |
| assertTrue("from Delta Propagation feature NOT used.", |
| ((Boolean) server.invoke(() -> DeltaTestImpl.fromDeltaFeatureUsed())).booleanValue()); |
| |
| assertFalse("Delta Propagation toDeltaFailed", |
| ((Boolean) client.invoke(() -> DeltaTestImpl.isToDeltaFailure())).booleanValue()); |
| assertTrue("Delta Propagation fromDelta not Failed", |
| ((Boolean) server.invoke(() -> DeltaTestImpl.isFromDeltaFailure())).booleanValue()); |
| |
| boolean err = |
| ((Boolean) server.invoke(() -> ClientToServerDeltaDUnitTest.getError())).booleanValue(); |
| assertFalse("validation fails", err); |
| } |
| |
| /** |
| * This test does the following for single key</br> |
| * 1)Verifies that we donot loose attributes updates when delta fails<br> |
| */ |
| @Test |
| public void testPutForDeltaObjects() { |
| initialise(false); |
| // set expected value on server |
| server.invoke(() -> ClientToServerDeltaDUnitTest.setFirstSecondUpdate(putDelta[0], |
| (Integer) putDelta[2])); |
| |
| client.invoke(() -> ClientToServerDeltaDUnitTest.put(KEY1)); |
| |
| assertTrue("to Delta Propagation feature NOT used.", |
| ((Boolean) client.invoke(() -> DeltaTestImpl.toDeltaFeatureUsed())).booleanValue()); |
| assertTrue("from Delta Propagation feature NOT used.", |
| ((Boolean) server.invoke(() -> DeltaTestImpl.fromDeltaFeatureUsed())).booleanValue()); |
| assertFalse("Delta Propagation toDeltaFailed", |
| ((Boolean) client.invoke(() -> DeltaTestImpl.isToDeltaFailure())).booleanValue()); |
| assertFalse("Delta Propagation fromDeltaFailed", |
| ((Boolean) server.invoke(() -> DeltaTestImpl.isFromDeltaFailure())).booleanValue()); |
| boolean err = |
| ((Boolean) server.invoke(() -> ClientToServerDeltaDUnitTest.getError())).booleanValue(); |
| assertFalse("validation fails", err); |
| } |
| |
| /** |
| * This test does the following for single key (<b> full cycle</b>):<br> |
| * 1)Verifies that client-server, peer-peer and server-client processing delta<br> |
| */ |
| @Test |
| public void testClientToClientDeltaPropagation() throws Exception { |
| initialise(false); |
| // set expected value on s1,s1 and c2 |
| server.invoke(() -> ClientToServerDeltaDUnitTest.setFirstSecondUpdate(putDelta[0], |
| (Integer) putDelta[2])); |
| server2.invoke(() -> ClientToServerDeltaDUnitTest.setFirstSecondUpdate(putDelta[0], |
| (Integer) putDelta[2])); |
| client2.invoke(() -> ClientToServerDeltaDUnitTest.setFirstSecondUpdate(putDelta[0], |
| (Integer) putDelta[2])); |
| |
| client.invoke(() -> ClientToServerDeltaDUnitTest.putDelta(KEY1)); |
| |
| Thread.sleep(5000); |
| |
| assertTrue("To Delta Propagation feature NOT used.", |
| ((Boolean) client.invoke(() -> DeltaTestImpl.toDeltaFeatureUsed())).booleanValue()); |
| assertTrue("From Delta Propagation feature NOT used.", |
| ((Boolean) server.invoke(() -> DeltaTestImpl.fromDeltaFeatureUsed())).booleanValue()); |
| // toDelta() should not be invoked |
| assertFalse("To Delta Propagation feature used.", |
| ((Boolean) server.invoke(() -> DeltaTestImpl.toDeltaFeatureUsed())).booleanValue()); |
| assertTrue("From Delta Propagation feature NOT used.", |
| ((Boolean) server2.invoke(() -> DeltaTestImpl.fromDeltaFeatureUsed())).booleanValue()); |
| // toDelta() should not be invoked |
| assertFalse("To Delta Propagation feature used.", |
| ((Boolean) server2.invoke(() -> DeltaTestImpl.toDeltaFeatureUsed())).booleanValue()); |
| assertTrue("from Delta Propagation feature NOT used.", |
| ((Boolean) client2.invoke(() -> DeltaTestImpl.fromDeltaFeatureUsed())).booleanValue()); |
| |
| boolean err = |
| ((Boolean) server.invoke(() -> ClientToServerDeltaDUnitTest.getError())).booleanValue(); |
| err = ((Boolean) server2.invoke(() -> ClientToServerDeltaDUnitTest.getError())).booleanValue(); |
| err = ((Boolean) client2.invoke(() -> ClientToServerDeltaDUnitTest.getError())).booleanValue(); |
| assertFalse("validation fails", err); |
| } |
| |
| @Test |
| public void testClientDeltaPropogationPutFetchesTheLatestValueWhenClientVersionIsOlder() |
| throws Exception { |
| // client did not register interest |
| Integer PORT1 = ((Integer) server.invoke(() -> ClientToServerDeltaDUnitTest |
| .createServerCache(Boolean.TRUE, Boolean.FALSE, Boolean.TRUE, Boolean.TRUE))).intValue(); |
| |
| ClientToServerDeltaDUnitTest.createClientCache( |
| NetworkUtils.getServerHostName(server.getHost()), new Integer(PORT1), Boolean.FALSE, |
| Boolean.FALSE, Boolean.FALSE, null, Boolean.FALSE); |
| Region r = cache.getRegion(REGION_NAME); |
| DeltaTestImpl val = new DeltaTestImpl(0, "0", new Double(0), new byte[0], |
| new TestObjectWithIdentifier("0", 0)); |
| r.put(KEY1, val); |
| |
| server.invoke(() -> { |
| Region region = cache.getRegion(REGION_NAME); |
| DeltaTestImpl val1 = (DeltaTestImpl) region.get(KEY1); |
| val1.NEED_TO_RESET_T0_DELTA = false; |
| val1.setIntVar(1); |
| region.put(KEY1, val1); |
| }); |
| |
| DeltaTestImpl val2 = new DeltaTestImpl(0, "0", new Double(0), new byte[0], |
| new TestObjectWithIdentifier("0", 0)); |
| val2.setStr("1"); |
| val2.NEED_TO_RESET_T0_DELTA = false; |
| r.put(KEY1, val2); |
| |
| server.invoke(() -> { |
| Region region = cache.getRegion(REGION_NAME); |
| assertThat((DeltaTestImpl) region.get(KEY1)).isNotNull(); |
| }); |
| |
| DeltaTestImpl expected = new DeltaTestImpl(1, "1", new Double(0), new byte[0], |
| new TestObjectWithIdentifier("0", 0)); |
| |
| server.invoke(() -> { |
| Region region = cache.getRegion(REGION_NAME); |
| GeodeAwaitility.await() |
| .untilAsserted(() -> assertThat((DeltaTestImpl) region.get(KEY1)).isEqualTo(expected)); |
| }); |
| |
| GeodeAwaitility.await() |
| .untilAsserted(() -> assertThat((DeltaTestImpl) r.get(KEY1)).isEqualTo(expected)); |
| } |
| |
| @Test |
| public void clientDeltaPutFetchesTheLatestVersionIfNotYetReceivedQueuedEvent() { |
| server.invoke(() -> setSlowStartForTesting()); |
| server2.invoke(() -> setSlowStartForTesting()); |
| initialise(false); |
| |
| DeltaTestImpl original = new DeltaTestImpl(0, "0", new Double(0), new byte[0], |
| new TestObjectWithIdentifier("0", 0)); |
| client.invoke(() -> { |
| Region r = cache.getRegion(REGION_NAME); |
| original.NEED_TO_RESET_T0_DELTA = false; |
| r.put(KEY1, original); |
| }); |
| |
| client2.invoke(() -> { |
| Region r = cache.getRegion(REGION_NAME); |
| assertThat(r.get(KEY1)).isEqualTo(original); |
| }); |
| |
| client.invoke(() -> { |
| Region r = cache.getRegion(REGION_NAME); |
| DeltaTestImpl val = (DeltaTestImpl) r.get(KEY1); |
| assertThat(val).isEqualTo(original); |
| val.NEED_TO_RESET_T0_DELTA = false; |
| val.setIntVar(1); |
| r.put(KEY1, val); |
| }); |
| |
| client2.invoke(() -> { |
| Region r = cache.getRegion(REGION_NAME); |
| DeltaTestImpl val = (DeltaTestImpl) r.get(KEY1); |
| // delta update should not arrive yet due to slow dispatcher |
| assertThat(val).isEqualTo(original); |
| val.NEED_TO_RESET_T0_DELTA = false; |
| val.setStr("1"); |
| r.put(KEY1, val); |
| Object o = r.get(KEY1); |
| logger.info("object is " + o); |
| }); |
| |
| server.invoke(() -> { |
| Region r = cache.getRegion(REGION_NAME); |
| r.get(KEY1); |
| }); |
| |
| server2.invoke(() -> { |
| Region r = cache.getRegion(REGION_NAME); |
| r.get(KEY1); |
| }); |
| |
| DeltaTestImpl expected = new DeltaTestImpl(1, "1", new Double(0), new byte[0], |
| new TestObjectWithIdentifier("0", 0)); |
| |
| client.invoke(() -> { |
| Region r = cache.getRegion(REGION_NAME); |
| GeodeAwaitility.await() |
| .untilAsserted(() -> assertThat((DeltaTestImpl) r.get(KEY1)).isEqualTo(expected)); |
| }); |
| |
| client2.invoke(() -> { |
| Region r = cache.getRegion(REGION_NAME); |
| GeodeAwaitility.await() |
| .untilAsserted(() -> assertThat((DeltaTestImpl) r.get(KEY1)).isEqualTo(expected)); |
| }); |
| } |
| |
| private void setSlowStartForTesting() { |
| CacheClientProxy.isSlowStartForTesting = true; |
| System.setProperty("slowStartTimeForTesting", "5000"); |
| } |
| |
| private static void putDeltaForCQ(String key, Integer numOfPuts, Integer[] cqIndices, |
| Boolean[] satisfyQuery) { |
| Region region = cache.getRegion(REGION_NAME); |
| DeltaTestImpl val = null; |
| for (int j = 0; j < numOfPuts; j++) { |
| val = new DeltaTestImpl(0, "0", new Double(0), new byte[0], |
| new TestObjectWithIdentifier("0", 0)); |
| for (int i = 0; i < cqIndices.length; i++) { |
| switch (i) { |
| case 0: |
| val.setStr("CASE_0"); |
| // select * |
| break; |
| case 1: |
| val.setStr("CASE_1"); |
| // select where intVar = 0 |
| if (satisfyQuery[i]) { |
| val.setIntVar(0); |
| } else { |
| val.setIntVar(100); |
| } |
| break; |
| case 2: |
| val.setStr("CASE_2"); |
| // select where intVar > 0 |
| if (satisfyQuery[i]) { |
| val.setIntVar(100); |
| } else { |
| val.setIntVar(-100); |
| } |
| break; |
| case 3: |
| val.setStr("CASE_3"); |
| // select where intVar < 0 |
| if (satisfyQuery[i]) { |
| val.setIntVar(-100); |
| } else { |
| val.setIntVar(100); |
| } |
| break; |
| default: |
| break; |
| } |
| } |
| region.put(key, val); |
| } |
| } |
| |
| /* |
| * put delta ; not previous deltas |
| */ |
| private static void putDelta(String key) { |
| Region r = cache.getRegion(REGION_NAME); |
| DeltaTestImpl val = null; |
| for (int i = 0; i < NO_PUT_OPERATION; i++) { |
| val = new DeltaTestImpl(0, "0", new Double(0), new byte[0], |
| new TestObjectWithIdentifier("0", 0)); |
| switch (i) { |
| case 1: |
| val.setStr((String) putDelta[0]); |
| break; |
| case 2: |
| val.setIntVar(((Integer) putDelta[2]).intValue()); |
| break; |
| } |
| r.put(key, val); |
| } |
| } |
| |
| private static void putLastKey() { |
| Region r = cache.getRegion(REGION_NAME); |
| r.put(LAST_KEY, LAST_KEY); |
| } |
| |
| private static void putLastKeyWithDelta() { |
| Region r = cache.getRegion(REGION_NAME); |
| r.put(LAST_KEY, new DeltaTestImpl()); |
| } |
| |
| private static void setFirstSecondUpdate(Object first, Object second) { |
| firstUpdate = first; |
| secondUpdate = second; |
| } |
| |
| private static Boolean getError() { |
| return error; |
| } |
| |
| /* |
| * put delta full cycle |
| */ |
| private static void put(String key) { |
| Region r = cache.getRegion(REGION_NAME); |
| DeltaTestImpl val = null; |
| for (int i = 0; i < NO_PUT_OPERATION; i++) { |
| switch (i) { |
| case 0: |
| val = new DeltaTestImpl(0, "0", new Double(0), new byte[0], |
| new TestObjectWithIdentifier("0", 0)); |
| break; |
| case 1: |
| val = new DeltaTestImpl(0, "0", new Double(0), new byte[0], |
| new TestObjectWithIdentifier("0", 0)); |
| val.setStr((String) putDelta[0]); |
| break; |
| case 2: |
| val = new DeltaTestImpl(0, (String) putDelta[1], new Double(0), new byte[0], |
| new TestObjectWithIdentifier("0", 0)); |
| val.setIntVar(((Integer) putDelta[2]).intValue()); |
| break; |
| } |
| r.put(key, val); |
| } |
| } |
| |
| /* |
| * put delta with some times fromDelta fails to apply client sends back full object |
| */ |
| private static void putWithFromDeltaERR(String key) { |
| Region r = cache.getRegion(REGION_NAME); |
| DeltaTestImpl val = null; |
| for (int i = 0; i < NO_PUT_OPERATION; i++) { |
| switch (i) { |
| case 0: |
| val = new DeltaTestImpl(0, "0", new Double(0), new byte[0], |
| new TestObjectWithIdentifier("0", 0)); |
| break; |
| case 1: |
| val = new DeltaTestImpl(0, "0", new Double(0), new byte[0], |
| new TestObjectWithIdentifier("0", 0)); |
| val.setStr((String) putDelta[1]); |
| break; |
| case 2: |
| val = new DeltaTestImpl(0, (String) putDelta[1], new Double(0), new byte[0], |
| new TestObjectWithIdentifier("0", 0)); |
| val.setIntVar(((Integer) putDelta[2]).intValue()); |
| break; |
| } |
| r.put(key, val); |
| } |
| } |
| |
| /* |
| * put delta with some times toDelta fails to apply; client sends back full object |
| */ |
| private static void putWithTODeltaERR(String key) { |
| Region r = cache.getRegion(REGION_NAME); |
| DeltaTestImpl val = null; |
| for (int i = 0; i < NO_PUT_OPERATION; i++) { |
| switch (i) { |
| case 0: |
| val = new DeltaTestImpl(0, "0", new Double(0), new byte[0], |
| new TestObjectWithIdentifier("0", 0)); |
| break; |
| case 1: |
| val = new DeltaTestImpl(0, "0", new Double(0), new byte[0], |
| new TestObjectWithIdentifier("0", 0)); |
| val.setIntVar(((Integer) putDelta[3]).intValue()); |
| break; |
| case 2: |
| val = new DeltaTestImpl(((Integer) putDelta[3]).intValue(), "0", new Double(0), |
| new byte[0], new TestObjectWithIdentifier("0", 0)); |
| val.setStr((String) putDelta[0]); |
| break; |
| } |
| r.put(key, val); |
| } |
| } |
| |
| public static Integer createServerCache(Boolean attachListener, Boolean isEmpty) |
| throws Exception { |
| return createServerCache(attachListener, isEmpty, Boolean.TRUE/* clone */, |
| Boolean.TRUE/* enable delta */); |
| } |
| |
| /* |
| * create server cache |
| */ |
| public static Integer createServerCache(Boolean attachListener, Boolean isEmpty, Boolean clone, |
| Boolean enableDelta) throws Exception { |
| // for validation |
| updates = 0; |
| create = 0; |
| firstUpdate = null; |
| secondUpdate = null; |
| error = false; |
| Properties props = new Properties(); |
| props.setProperty(DELTA_PROPAGATION, enableDelta.toString()); |
| new ClientToServerDeltaDUnitTest().createCache(props); |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.DISTRIBUTED_ACK); |
| factory.setConcurrencyChecksEnabled(true); |
| if (isEmpty) { |
| factory.setSubscriptionAttributes(new SubscriptionAttributes(InterestPolicy.ALL)); |
| factory.setDataPolicy(DataPolicy.EMPTY); |
| |
| } else { |
| factory.setDataPolicy(DataPolicy.REPLICATE); |
| } |
| factory.setCloningEnabled(clone); |
| RegionAttributes attrs = factory.create(); |
| region = cache.createRegion(REGION_NAME, attrs); |
| |
| AttributesMutator am = region.getAttributesMutator(); |
| if (attachListener) { |
| am.addCacheListener(new CacheListenerAdapter() { |
| @Override |
| public void afterCreate(EntryEvent event) { |
| create++; |
| } |
| |
| @Override |
| public void afterUpdate(EntryEvent event) { |
| switch (updates) { |
| case 0: |
| // first delta |
| validateUpdates(event, firstUpdate, "FIRST"); |
| updates++; |
| break; |
| case 1: |
| // combine delta |
| validateUpdates(event, firstUpdate, "FIRST"); |
| validateUpdates(event, secondUpdate, "SECOND"); |
| updates++; |
| break; |
| default: |
| break; |
| } |
| } |
| }); |
| } else if (!isEmpty) { |
| am.addCacheListener(new CacheListenerAdapter() { |
| @Override |
| public void afterCreate(EntryEvent event) { |
| switch (create) { |
| case 1: |
| validateUpdates(event, firstUpdate, "FIRST"); |
| create++; |
| break; |
| case 2: |
| validateUpdates(event, secondUpdate, "SECOND"); |
| create++; |
| break; |
| default: |
| create++; |
| break; |
| } |
| } |
| /* |
| * public void afterUpdate(EntryEvent event) { updates++; } |
| */ |
| }); |
| |
| } |
| |
| CacheServer server = cache.addCacheServer(); |
| int port = getRandomAvailableTCPPort(); |
| server.setPort(port); |
| // ensures updates to be sent instead of invalidations |
| server.setNotifyBySubscription(true); |
| server.start(); |
| return new Integer(server.getPort()); |
| } |
| |
| public static void createClientCache(String host, Integer port, Boolean attachListener, |
| Boolean isEmpty, Boolean isCq) throws Exception { |
| createClientCache(host, port, attachListener, isEmpty, isCq, new String[0], true); |
| } |
| |
| public static void createClientCache(String host, Integer port, Boolean attachListener, |
| Boolean isEmpty, Boolean isCq, String[] cqQueryString) throws Exception { |
| createClientCache(host, port, attachListener, isEmpty, isCq, cqQueryString, true); |
| } |
| |
| public static void createClientCache(String host, Integer port, Boolean attachListener, |
| Boolean isEmpty, Boolean isCq, String[] cqQueryString, Boolean registerInterestAll) |
| throws Exception { |
| createClientCache(host, port, attachListener, isEmpty, isCq, cqQueryString, registerInterestAll, |
| true); |
| } |
| |
| /* |
| * create client cache |
| */ |
| public static void createClientCache(String host, Integer port, Boolean attachListener, |
| Boolean isEmpty, Boolean isCq, String[] cqQueryString, Boolean registerInterestAll, |
| Boolean enableSubscription) throws Exception { |
| updates = 0; |
| create = 0; |
| firstUpdate = null; |
| secondUpdate = null; |
| error = false; |
| lastKeyReceived = false; |
| Properties props = new Properties(); |
| props.setProperty(MCAST_PORT, "0"); |
| props.setProperty(LOCATORS, ""); |
| new ClientToServerDeltaDUnitTest().createCache(props); |
| pool = (PoolImpl) PoolManager.createFactory().addServer(host, port.intValue()) |
| .setMinConnections(2) |
| .setSubscriptionEnabled(enableSubscription).setSubscriptionRedundancy(0) |
| .setReadTimeout(10000).setPingInterval(1000).setSocketBufferSize(32768) |
| .create("ClientToServerDeltaDunitTestPool"); |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.DISTRIBUTED_ACK); |
| factory.setConcurrencyChecksEnabled(true); |
| if (isEmpty) { |
| factory.setSubscriptionAttributes(new SubscriptionAttributes(InterestPolicy.ALL)); |
| factory.setDataPolicy(DataPolicy.EMPTY); |
| } else { |
| factory.setDataPolicy(DataPolicy.NORMAL); |
| } |
| factory.setPoolName(pool.getName()); |
| factory.setCloningEnabled(false); |
| |
| // region with empty data policy |
| RegionAttributes attrs = factory.create(); |
| region = cache.createRegion(REGION_NAME, attrs); |
| if (attachListener) { |
| region.getAttributesMutator().addCacheListener(new CacheListenerAdapter() { |
| @Override |
| public void afterCreate(EntryEvent event) { |
| create++; |
| if (LAST_KEY.equals(event.getKey())) { |
| lastKeyReceived = true; |
| } ; |
| } |
| |
| @Override |
| public void afterUpdate(EntryEvent event) { |
| switch (updates) { |
| case 0: |
| // first delta |
| validateUpdates(event, firstUpdate, "FIRST"); |
| updates++; |
| break; |
| case 1: |
| // combine delta |
| validateUpdates(event, firstUpdate, "FIRST"); |
| validateUpdates(event, secondUpdate, "SECOND"); |
| updates++; |
| break; |
| default: |
| break; |
| } |
| } |
| }); |
| } |
| if (registerInterestAll) { |
| region.registerInterest("ALL_KEYS"); |
| } |
| if (isCq) { |
| CqAttributesFactory cqf = new CqAttributesFactory(); |
| CqListenerAdapter cqlist = new CqListenerAdapter() { |
| @Override |
| public void onEvent(CqEvent cqEvent) { |
| Object key = cqEvent.getKey(); |
| if (LAST_KEY.equals(key)) { |
| lastKeyReceived = true; |
| } |
| logger.fine( |
| "CQ event received for (key, value): (" + key + ", " + cqEvent.getNewValue() + ")"); |
| } |
| |
| @Override |
| public void onError(CqEvent cqEvent) { |
| logger.fine("CQ error received for key: " + cqEvent.getKey()); |
| } |
| }; |
| cqf.addCqListener(cqlist); |
| CqAttributes cqa = cqf.create(); |
| for (int i = 0; i < cqQueryString.length; i++) { |
| CqQuery cq = cache.getQueryService().newCq("Delta_Query_" + i, cqQueryString[i], cqa); |
| cq.execute(); |
| } |
| } |
| } |
| |
| /* |
| * create cache with properties |
| */ |
| private void createCache(Properties props) throws Exception { |
| DistributedSystem ds = getSystem(props); |
| cache = CacheFactory.create(ds); |
| assertNotNull(cache); |
| logger = cache.getLogger(); |
| } |
| |
| // to validate updates in listener |
| private static void validateUpdates(EntryEvent event, Object obj, String str) { |
| if (obj instanceof String) |
| assertTrue(str + " update missed ", |
| ((DeltaTestImpl) event.getNewValue()).getStr().equals((String) obj)); |
| else if (obj instanceof Integer) |
| assertTrue(str + " update missed ", |
| ((DeltaTestImpl) event.getNewValue()).getIntVar() == (Integer) obj); |
| else |
| error = true; |
| } |
| |
| /* |
| * close cache |
| */ |
| public static void closeCache() { |
| if (cache != null && !cache.isClosed()) { |
| cache.close(); |
| cache.getDistributedSystem().disconnect(); |
| lastKeyReceived = false; |
| } |
| } |
| |
| /** |
| * This test does the following for single key:<br> |
| * 1)Verifies that cacheless client calls toDelta <br> |
| */ |
| @Test |
| public void testEmptyClientAsFeederToServer() { |
| Integer PORT1 = ((Integer) server |
| .invoke(() -> ClientToServerDeltaDUnitTest.createServerCache(Boolean.FALSE, Boolean.FALSE))) |
| .intValue(); |
| |
| server2 |
| .invoke(() -> ClientToServerDeltaDUnitTest.createServerCache(Boolean.FALSE, Boolean.FALSE)); |
| |
| client.invoke(() -> ClientToServerDeltaDUnitTest.createClientCache( |
| NetworkUtils.getServerHostName(server.getHost()), new Integer(PORT1), Boolean.FALSE, |
| Boolean.TRUE, Boolean.FALSE)); |
| |
| /* |
| * server2.invoke(() -> ClientToServerDeltaDUnitTest.setFirstSecondUpdate( putDelta[0], |
| * (Integer)putDelta[2] )); |
| */ |
| |
| client.invoke(() -> ClientToServerDeltaDUnitTest.putDelta(KEY1)); |
| |
| client.invoke(() -> ClientToServerDeltaDUnitTest.checkTodeltaCounter(new Integer(2))); |
| server.invoke(() -> ClientToServerDeltaDUnitTest.checkFromdeltaCounter()); |
| server.invoke(() -> ClientToServerDeltaDUnitTest.checkTodeltaCounter(new Integer(0))); |
| server2.invoke(() -> ClientToServerDeltaDUnitTest.checkFromdeltaCounter()); |
| server2.invoke(() -> ClientToServerDeltaDUnitTest.checkTodeltaCounter(new Integer(0))); |
| } |
| |
| /** |
| * This test does the following for single key:<br> |
| * 1)Verifies that from delta should not called on server with empty data policy just by passed |
| * delta to data store<br> |
| */ |
| @Test |
| public void testEmptyServerAsFeederToPeer() { |
| Integer PORT1 = ((Integer) server |
| .invoke(() -> ClientToServerDeltaDUnitTest.createServerCache(Boolean.FALSE, Boolean.TRUE))) |
| .intValue(); |
| |
| server2 |
| .invoke(() -> ClientToServerDeltaDUnitTest.createServerCache(Boolean.FALSE, Boolean.FALSE)); |
| |
| client.invoke(() -> ClientToServerDeltaDUnitTest.createClientCache( |
| NetworkUtils.getServerHostName(server.getHost()), new Integer(PORT1), Boolean.FALSE, |
| Boolean.TRUE, Boolean.FALSE)); |
| |
| client.invoke(() -> ClientToServerDeltaDUnitTest.putDelta(KEY1)); |
| |
| client.invoke(() -> ClientToServerDeltaDUnitTest.checkTodeltaCounter(new Integer(2))); |
| |
| server.invoke(() -> ClientToServerDeltaDUnitTest.checkDeltaFeatureNotUsed()); |
| |
| server2.invoke(() -> ClientToServerDeltaDUnitTest.checkTodeltaCounter(new Integer(0))); |
| server2.invoke(() -> ClientToServerDeltaDUnitTest.checkFromdeltaCounter()); |
| } |
| |
| /** |
| * This test does verifies that server with empty data policy sends deltas to the client which can |
| * handle deltas. Server sends full values which can not handle deltas. |
| */ |
| @Test |
| public void testClientsConnectedToEmptyServer() { |
| Integer PORT1 = ((Integer) server |
| .invoke(() -> ClientToServerDeltaDUnitTest.createServerCache(Boolean.FALSE, Boolean.TRUE))) |
| .intValue(); |
| |
| server2 |
| .invoke(() -> ClientToServerDeltaDUnitTest.createServerCache(Boolean.FALSE, Boolean.FALSE)); |
| |
| client.invoke(() -> ClientToServerDeltaDUnitTest.createClientCache( |
| NetworkUtils.getServerHostName(server.getHost()), new Integer(PORT1), Boolean.TRUE, |
| Boolean.TRUE, Boolean.FALSE)); |
| |
| client2.invoke(() -> ClientToServerDeltaDUnitTest.createClientCache( |
| NetworkUtils.getServerHostName(server.getHost()), new Integer(PORT1), Boolean.TRUE, |
| Boolean.FALSE, Boolean.FALSE)); |
| |
| int deltaSent = |
| (Integer) server2.invoke(() -> ClientToServerDeltaDUnitTest.putsWhichReturnsDeltaSent()); |
| |
| client.invoke(() -> ClientToServerDeltaDUnitTest.waitForLastKey()); |
| client.invoke(() -> ClientToServerDeltaDUnitTest.checkForDelta()); |
| |
| client2.invoke(() -> ClientToServerDeltaDUnitTest.waitForLastKey()); |
| client2.invoke(() -> ClientToServerDeltaDUnitTest.checkDeltaInvoked(new Integer(deltaSent))); |
| } |
| |
| /** |
| * This test does the following for single key:<br> |
| * 1)Verifies that To delta called on client should be equal to fromDeltaCounter on datastore <br> |
| */ |
| @Test |
| public void testClientNonEmptyEmptyServerAsFeederToPeer() { |
| Integer PORT1 = ((Integer) server |
| .invoke(() -> ClientToServerDeltaDUnitTest.createServerCache(Boolean.FALSE, Boolean.TRUE))) |
| .intValue(); |
| |
| server2 |
| .invoke(() -> ClientToServerDeltaDUnitTest.createServerCache(Boolean.FALSE, Boolean.FALSE)); |
| |
| client.invoke(() -> ClientToServerDeltaDUnitTest.createClientCache( |
| NetworkUtils.getServerHostName(server.getHost()), new Integer(PORT1), Boolean.FALSE, |
| Boolean.FALSE, Boolean.FALSE)); |
| |
| client.invoke(() -> ClientToServerDeltaDUnitTest.putDelta(KEY1)); |
| |
| client.invoke(() -> ClientToServerDeltaDUnitTest.checkTodeltaCounter(new Integer(2))); |
| |
| server.invoke(() -> ClientToServerDeltaDUnitTest.checkDeltaFeatureNotUsed()); |
| |
| server2.invoke(() -> ClientToServerDeltaDUnitTest.checkTodeltaCounter(new Integer(0))); |
| server2.invoke(() -> ClientToServerDeltaDUnitTest.checkFromdeltaCounter()); |
| } |
| |
| /** |
| * This test has client1 connected to server1 and client2 connected to server2. client2 has a CQ |
| * registered on the server2. client1 creates a key on server1 which satisfies the CQ and it then |
| * updates the same key with a value so that it sould not satisfy the CQ. The cloning is set to |
| * false. The test ensures that the client2 gets the update event. This implies that server2 |
| * clones the old value prior to applying delta received from server1 and then processes the CQ. |
| */ |
| @Test |
| public void testC2CDeltaPropagationWithCQ() throws Exception { |
| initialise(false/* clone */, new String[] {CQs[1]}, true/* CQ */, true/* RI */, |
| true/* enable delta */); |
| client.invoke(ClientToServerDeltaDUnitTest.class, "createKeys", |
| new Object[] {new String[] {KEY1}}); |
| client.invoke(ClientToServerDeltaDUnitTest.class, "putDeltaForCQ", new Object[] {KEY1, |
| new Integer(1), new Integer[] {1}, new Boolean[] {false, true, false, false}}); |
| |
| client.invoke(() -> ClientToServerDeltaDUnitTest.putLastKeyWithDelta()); |
| |
| client2.invoke(() -> ClientToServerDeltaDUnitTest.waitForLastKey()); |
| client2.invoke(() -> ClientToServerDeltaDUnitTest.verifyDeltaReceived()); |
| } |
| |
| /** |
| * This test ensures that a server sends delta bytes to a client even if that client is not |
| * interested in that event but is getting the event only because the event satisfies a CQ which |
| * the client has registered with the server. |
| */ |
| @Test |
| public void testC2CDeltaPropagationWithCQWithoutRI() throws Exception { |
| initialise(false/* clone */, new String[] {CQs[1]}, true/* CQ */, false/* RI */, |
| true/* enable delta */); |
| |
| client.invoke(ClientToServerDeltaDUnitTest.class, "createKeys", |
| new Object[] {new String[] {KEY1}}); |
| client.invoke(ClientToServerDeltaDUnitTest.class, "putDeltaForCQ", new Object[] {KEY1, |
| new Integer(1), new Integer[] {1}, new Boolean[] {false, true, false, false}}); |
| |
| client.invoke(() -> ClientToServerDeltaDUnitTest.putLastKeyWithDelta()); |
| |
| client2.invoke(() -> ClientToServerDeltaDUnitTest.waitForLastKey()); |
| server2.invoke(() -> ClientToServerDeltaDUnitTest.verifyDeltaSent(Integer.valueOf(1))); |
| } |
| |
| @Test |
| public void testClientSendsFullValueToServerWhenDeltaOffAtServer() { |
| initialise2(false/* clone */, new String[] {CQs[1]}, false/* CQ */, true/* RI */, |
| false/* enable delta */); |
| // set expected value on server |
| server.invoke(() -> ClientToServerDeltaDUnitTest.setFirstSecondUpdate(putDelta[0], |
| (Integer) putDelta[2])); |
| |
| client.invoke(() -> ClientToServerDeltaDUnitTest.putDelta(DELTA_KEY)); |
| |
| assertFalse("Delta Propagation feature used at client.", |
| (Boolean) client.invoke(() -> DeltaTestImpl.toDeltaFeatureUsed())); |
| assertFalse("Delta Propagation feature used at server.", |
| (Boolean) server.invoke(() -> DeltaTestImpl.fromDeltaFeatureUsed())); |
| assertFalse( |
| "Failures at client while calculating delta. But delta-propagation is false at server.", |
| ((Boolean) client.invoke(() -> DeltaTestImpl.isToDeltaFailure())).booleanValue()); |
| assertFalse( |
| "Failures at server while applying delta. But delta-propagation is false at server.", |
| (Boolean) server.invoke(() -> DeltaTestImpl.isFromDeltaFailure())); |
| } |
| |
| @Test |
| public void testC2SDeltaPropagationWithOldValueInvalidatedAtServer() throws Exception { |
| String key = "DELTA_KEY"; |
| Integer port1 = (Integer) server |
| .invoke(() -> ClientToServerDeltaDUnitTest.createServerCache(false, false, false, true)); |
| createClientCache("localhost", port1, false, false, false, null, false, false); |
| |
| LocalRegion region = (LocalRegion) cache.getRegion(REGION_NAME); |
| region.put(key, new DeltaTestImpl()); |
| |
| server.invoke(() -> ClientToServerDeltaDUnitTest.doInvalidate(key)); |
| DeltaTestImpl value = new DeltaTestImpl(); |
| value.setStr("UPDATED_VALUE"); |
| region.put(key, value); |
| |
| assertTrue(region.getCachePerfStats().getDeltasSent() == 1); |
| assertTrue(region.getCachePerfStats().getDeltaFullValuesSent() == 1); |
| } |
| |
| public static void doInvalidate(String key) { |
| Region region = cache.getRegion(REGION_NAME); |
| region.invalidate(key); |
| } |
| |
| public static void verifyDeltaReceived() { |
| assertTrue( |
| "Expected 1 fromDelta() invokation but was " + DeltaTestImpl.getFromDeltaInvokations(), |
| DeltaTestImpl.getFromDeltaInvokations() == 1); |
| } |
| |
| public static void verifyDeltaSent(Integer deltas) { |
| CacheClientNotifier ccn = ((CacheServerImpl) cache.getCacheServers().toArray()[0]).getAcceptor() |
| .getCacheClientNotifier(); |
| |
| int numOfDeltasSent = ((CacheClientProxy) ccn.getClientProxies().toArray()[0]).getStatistics() |
| .getDeltaMessagesSent(); |
| assertTrue("Expected " + deltas + " deltas to be sent but " + numOfDeltasSent + " were sent.", |
| numOfDeltasSent == deltas); |
| } |
| |
| public static void checkFromdeltaCounter() { |
| assertTrue( |
| "FromDelta counters do not match, expected: " + (NO_PUT_OPERATION - 1) + ", actual: " |
| + DeltaTestImpl.getFromDeltaInvokations(), |
| DeltaTestImpl.getFromDeltaInvokations() >= (NO_PUT_OPERATION - 1)); |
| } |
| |
| public static void checkTodeltaCounter(Integer count) { |
| assertTrue( |
| "ToDelta counters do not match, expected: " + count.intValue() + ", actual: " |
| + DeltaTestImpl.getToDeltaInvokations(), |
| DeltaTestImpl.getToDeltaInvokations() >= count.intValue()); |
| } |
| |
| public static void checkDeltaFeatureNotUsed() { |
| assertTrue("Delta Propagation feature used.", !(DeltaTestImpl.deltaFeatureUsed())); |
| } |
| |
| public static void createKeys(String[] keys) { |
| Region region = cache.getRegion(REGION_NAME); |
| for (int i = 0; i < keys.length; i++) { |
| region.create(keys[i], new DeltaTestImpl()); |
| } |
| } |
| |
| public static int putsWhichReturnsDeltaSent() throws Exception { |
| csDelta = new CSDeltaTestImpl(); |
| region.put(DELTA_KEY, csDelta); |
| for (int i = 0; i < NO_PUT_OPERATION; i++) { |
| csDelta.setIntVar(i); |
| region.put(DELTA_KEY, csDelta); |
| } |
| region.put(LAST_KEY, ""); |
| |
| return csDelta.deltaSent; |
| } |
| |
| public static void checkDeltaInvoked(Integer deltaSent) { |
| assertTrue( |
| "Delta applied :" + ((CSDeltaTestImpl) region.get(DELTA_KEY)).getDeltaApplied() |
| + "\n Delta sent :" + deltaSent, |
| ((CSDeltaTestImpl) region.get(DELTA_KEY)).getDeltaApplied() == deltaSent); |
| } |
| |
| public static void checkForDelta() { |
| assertTrue("Delta sent to EMPTY data policy region", |
| DeltaTestImpl.getFromDeltaInvokations() == 0); |
| } |
| |
| public static void waitForLastKey() { |
| WaitCriterion wc = new WaitCriterion() { |
| @Override |
| public boolean done() { |
| return ClientToServerDeltaDUnitTest.lastKeyReceived; |
| } |
| |
| @Override |
| public String description() { |
| return "Last key NOT received."; |
| } |
| }; |
| GeodeAwaitility.await().untilAsserted(wc); |
| } |
| |
| static class CSDeltaTestImpl extends DeltaTestImpl { |
| |
| int deltaSent = 0; |
| int deltaApplied = 0; |
| |
| public CSDeltaTestImpl() {} |
| |
| @Override |
| public void toDelta(DataOutput out) throws IOException { |
| super.toDelta(out); |
| deltaSent++; |
| } |
| |
| @Override |
| public void fromDelta(DataInput in) throws IOException { |
| super.fromDelta(in); |
| deltaApplied++; |
| } |
| |
| public int getDeltaSent() { |
| return deltaSent; |
| } |
| |
| public int getDeltaApplied() { |
| return deltaApplied; |
| } |
| |
| @Override |
| public String toString() { |
| return "CSDeltaTestImpl[deltaApplied=" + deltaApplied + "]" + super.toString(); |
| } |
| } |
| |
| } |