| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.internal.cache; |
| |
| import static org.apache.geode.cache.Region.SEPARATOR; |
| import static org.apache.geode.distributed.ConfigurationProperties.CONFLATE_EVENTS; |
| import static org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_ID; |
| import static org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_TIMEOUT; |
| import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS; |
| import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT; |
| import static org.apache.geode.internal.cache.CacheServerImpl.generateNameForClientMsgsRegion; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| |
| import java.io.File; |
| import java.util.Properties; |
| |
| import org.junit.Test; |
| |
| import org.apache.geode.DeltaTestImpl; |
| import org.apache.geode.InvalidDeltaException; |
| import org.apache.geode.LogWriter; |
| import org.apache.geode.cache.AttributesFactory; |
| import org.apache.geode.cache.Cache; |
| import org.apache.geode.cache.CacheFactory; |
| import org.apache.geode.cache.CacheListener; |
| import org.apache.geode.cache.DataPolicy; |
| import org.apache.geode.cache.DiskStoreFactory; |
| import org.apache.geode.cache.EntryEvent; |
| import org.apache.geode.cache.EvictionAction; |
| import org.apache.geode.cache.EvictionAttributes; |
| import org.apache.geode.cache.ExpirationAttributes; |
| import org.apache.geode.cache.Operation; |
| import org.apache.geode.cache.Region; |
| import org.apache.geode.cache.RegionAttributes; |
| import org.apache.geode.cache.RegionEvent; |
| import org.apache.geode.cache.Scope; |
| import org.apache.geode.cache.client.Pool; |
| import org.apache.geode.cache.client.PoolFactory; |
| import org.apache.geode.cache.client.PoolManager; |
| import org.apache.geode.cache.client.internal.PoolImpl; |
| import org.apache.geode.cache.server.CacheServer; |
| import org.apache.geode.cache.util.CacheListenerAdapter; |
| import org.apache.geode.cache30.ClientServerTestCase; |
| import org.apache.geode.compression.Compressor; |
| import org.apache.geode.compression.SnappyCompressor; |
| import org.apache.geode.distributed.DistributedSystem; |
| import org.apache.geode.distributed.internal.DistributionConfig; |
| import org.apache.geode.internal.Assert; |
| import org.apache.geode.internal.AvailablePort; |
| import org.apache.geode.internal.cache.eviction.EvictionController; |
| import org.apache.geode.internal.cache.ha.HARegionQueue; |
| import org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil; |
| import org.apache.geode.internal.cache.tier.sockets.ConflationDUnitTestHelper; |
| import org.apache.geode.internal.tcp.ConnectionTable; |
| import org.apache.geode.test.awaitility.GeodeAwaitility; |
| import org.apache.geode.test.dunit.Host; |
| import org.apache.geode.test.dunit.SerializableCallableIF; |
| import org.apache.geode.test.dunit.SerializableRunnableIF; |
| import org.apache.geode.test.dunit.VM; |
| import org.apache.geode.test.dunit.WaitCriterion; |
| import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase; |
| |
/**
 * Distributed tests verifying delta propagation of {@link org.apache.geode.Delta} values
 * between clients and servers, and between peer servers, under conflation, eviction,
 * HA-queue overflow, failover and durable-client scenarios.
 *
 * @since GemFire 6.1
 */
| |
| public class DeltaPropagationDUnitTest extends JUnit4DistributedTestCase { |
| |
| private final Compressor compressor = SnappyCompressor.getDefaultInstance(); |
| |
| protected static Cache cache = null; |
| |
| protected VM vm0 = null; |
| |
| protected VM vm1 = null; |
| |
| protected VM vm2 = null; |
| |
| protected VM vm3 = null; |
| |
| private int PORT1; |
| |
| private int PORT2; |
| |
| private static final String regionName = DeltaPropagationDUnitTest.class.getSimpleName(); |
| |
| private static LogWriter logger = null; |
| |
| private static final int EVENTS_SIZE = 6; |
| |
| private static boolean lastKeyReceived = false; |
| |
| private boolean markerReceived = false; |
| |
| private int numOfCreates; |
| |
| private int numOfUpdates; |
| |
| private static int numOfInvalidates; |
| |
| private static int numOfDestroys; |
| |
| private static DeltaTestImpl[] deltaPut = new DeltaTestImpl[EVENTS_SIZE]; |
| |
| private boolean areListenerResultsValid = true; |
| |
| private boolean closeCache = false; |
| |
| private StringBuffer listenerError = new StringBuffer(""); |
| |
| private static final String DELTA_KEY = "DELTA_KEY"; |
| |
| private static final String LAST_KEY = "LAST_KEY"; |
| |
| private static final int NO_LISTENER = 0; |
| |
| private static final int CLIENT_LISTENER = 1; |
| |
| private static final int SERVER_LISTENER = 2; |
| |
| private static final int C2S2S_SERVER_LISTENER = 3; |
| |
| private static final int LAST_KEY_LISTENER = 4; |
| |
| private static final int DURABLE_CLIENT_LISTENER = 5; |
| |
| private static final int CLIENT_LISTENER_2 = 6; |
| |
| private static final String CREATE = "CREATE"; |
| |
| private static final String UPDATE = "UPDATE"; |
| |
| private static final String INVALIDATE = "INVALIDATE"; |
| |
| private static final String DESTROY = "DESTROY"; |
| |
| @Override |
| public final void postSetUp() throws Exception { |
| final Host host = Host.getHost(0); |
| vm0 = host.getVM(0); |
| vm1 = host.getVM(1); |
| vm2 = host.getVM(2); |
| vm3 = host.getVM(3); |
| |
| vm0.invoke(this::resetAll); |
| vm1.invoke(this::resetAll); |
| vm2.invoke(this::resetAll); |
| vm3.invoke(this::resetAll); |
| resetAll(); |
| } |
| |
| @Override |
| public final void preTearDown() throws Exception { |
| closeCache(); |
| vm2.invoke((SerializableCallableIF) this::closeCache); |
| vm3.invoke((SerializableCallableIF) this::closeCache); |
| |
| // Unset the isSlowStartForTesting flag |
| vm0.invoke(ConflationDUnitTestHelper::unsetIsSlowStart); |
| vm1.invoke(ConflationDUnitTestHelper::unsetIsSlowStart); |
| // then close the servers |
| vm0.invoke((SerializableRunnableIF) this::closeCache); |
| vm1.invoke((SerializableRunnableIF) this::closeCache); |
| disconnectAllFromDS(); |
| } |
| |
| @Test |
| public void testS2CSuccessfulDeltaPropagationWithCompression() throws Exception { |
| PORT1 = vm0.invoke(() -> createServerCache(HARegionQueue.HA_EVICTION_POLICY_NONE, 1, |
| NO_LISTENER, false, compressor)); |
| |
| vm0.invoke( |
| () -> assertTrue(cache.getRegion(regionName).getAttributes().getCompressor() != null)); |
| |
| createClientCache(PORT1, -1, "0", CLIENT_LISTENER); |
| |
| registerInterestListAll(); |
| |
| vm0.invoke(this::prepareDeltas); |
| prepareDeltas(); |
| |
| vm0.invoke(this::createAndUpdateDeltas); |
| |
| waitForLastKey(); |
| |
| long toDeltas = vm0.invoke(DeltaTestImpl::getToDeltaInvokations); |
| long fromDeltas = DeltaTestImpl.getFromDeltaInvokations(); |
| assertTrue((EVENTS_SIZE - 1) + " deltas were to be sent but were " + toDeltas, |
| toDeltas == (EVENTS_SIZE - 1)); |
| assertTrue((EVENTS_SIZE - 1) + " deltas were to be received but were " + fromDeltas, |
| fromDeltas == toDeltas); |
| |
| verifyData(2, EVENTS_SIZE - 1); |
| assertTrue(listenerError.toString(), areListenerResultsValid); |
| } |
| |
| @Test |
| public void testS2CSuccessfulDeltaPropagation() throws Exception { |
| PORT1 = vm0.invoke(() -> createServerCache(HARegionQueue.HA_EVICTION_POLICY_MEMORY)); |
| |
| createClientCache(PORT1, -1, "0", CLIENT_LISTENER); |
| registerInterestListAll(); |
| |
| vm0.invoke(this::prepareDeltas); |
| prepareDeltas(); |
| |
| vm0.invoke(this::createAndUpdateDeltas); |
| |
| waitForLastKey(); |
| |
| long toDeltas = vm0.invoke(DeltaTestImpl::getToDeltaInvokations); |
| long fromDeltas = DeltaTestImpl.getFromDeltaInvokations(); |
| assertTrue((EVENTS_SIZE - 1) + " deltas were to be sent but were " + toDeltas, |
| toDeltas == (EVENTS_SIZE - 1)); |
| assertTrue((EVENTS_SIZE - 1) + " deltas were to be received but were " + fromDeltas, |
| fromDeltas == toDeltas); |
| |
| verifyData(2, EVENTS_SIZE - 1); |
| assertTrue(listenerError.toString(), areListenerResultsValid); |
| } |
| |
| @Test |
| public void testS2CFailureInToDeltaMethod() throws Exception { |
| PORT1 = vm0.invoke(() -> createServerCache(HARegionQueue.HA_EVICTION_POLICY_MEMORY)); |
| |
| createClientCache(PORT1, -1, "0", CLIENT_LISTENER_2); |
| registerInterestListAll(); |
| |
| vm0.invoke(this::prepareErroneousDeltasForToDelta); |
| prepareErroneousDeltasForToDelta(); |
| |
| vm0.invoke(this::createAndUpdateDeltas); |
| |
| waitForLastKey(); |
| |
| long toDeltas = vm0.invoke(DeltaTestImpl::getToDeltaInvokations); |
| long fromDeltas = DeltaTestImpl.getFromDeltaInvokations(); |
| long toDeltafailures = vm0.invoke(DeltaTestImpl::getToDeltaFailures); |
| assertTrue((EVENTS_SIZE - 1) + " deltas were to be sent but were " + toDeltas, |
| toDeltas == (EVENTS_SIZE - 1)); |
| assertTrue((EVENTS_SIZE - 1 - 1/* |
| * This is because the one failed in toDelta will be sent as full |
| * value. So client will not see it as 'delta'. |
| */) + " deltas were to be received but were " + fromDeltas, |
| fromDeltas == (EVENTS_SIZE - 1 - 1)); |
| assertTrue(1 + " deltas were to be failed while extracting but were " + toDeltafailures, |
| toDeltafailures == 1); |
| |
| verifyData(2, EVENTS_SIZE - 1 - 1 /* Full value no more sent if toDelta() fails */); |
| assertTrue(listenerError.toString(), areListenerResultsValid); |
| } |
| |
| @Test |
| public void testS2CFailureInFromDeltaMethod() throws Exception { |
| PORT1 = vm0.invoke(() -> createServerCache(HARegionQueue.HA_EVICTION_POLICY_MEMORY)); |
| |
| createClientCache(PORT1, -1, "0", CLIENT_LISTENER); |
| registerInterestListAll(); |
| |
| vm0.invoke(this::prepareErroneousDeltasForFromDelta); |
| prepareErroneousDeltasForFromDelta(); |
| |
| vm0.invoke(this::createAndUpdateDeltas); |
| |
| waitForLastKey(); |
| |
| long toDeltas = vm0.invoke(DeltaTestImpl::getToDeltaInvokations); |
| long fromDeltas = DeltaTestImpl.getFromDeltaInvokations(); |
| long fromDeltafailures = DeltaTestImpl.getFromDeltaFailures(); |
| assertTrue((EVENTS_SIZE - 1) + " deltas were to be sent but were " + toDeltas, |
| toDeltas == (EVENTS_SIZE - 1)); |
| assertTrue((EVENTS_SIZE - 1) + " deltas were to be received but were " + fromDeltas, |
| fromDeltas == toDeltas); |
| assertTrue(1 + " deltas were to be failed while applying but were " + fromDeltafailures, |
| fromDeltafailures == 1); |
| |
| verifyData(2, EVENTS_SIZE - 1); |
| assertTrue(listenerError.toString(), areListenerResultsValid); |
| } |
| |
| @Test |
| public void testS2CWithOldValueAtClientOverflownToDisk() throws Exception { |
| PORT1 = vm0.invoke(() -> createServerCache(HARegionQueue.HA_EVICTION_POLICY_MEMORY)); |
| |
| EvictionAttributes evAttr = |
| EvictionAttributes.createLRUEntryAttributes(1, EvictionAction.OVERFLOW_TO_DISK); |
| |
| createClientCache(PORT1, -1, "0", true/* add listener */, evAttr); |
| registerInterestListAll(); |
| |
| vm0.invoke(this::prepareDeltas); |
| prepareDeltas(); |
| |
| vm0.invoke(this::createDelta); |
| vm0.invoke(this::createAnEntry); |
| Thread.sleep(5000); // TODO: Find a better 'n reliable alternative |
| // assert overflow occurred on client vm |
| verifyOverflowOccurred(1L, 2); |
| vm0.invoke(this::updateDelta); |
| |
| waitForLastKey(); |
| |
| long toDeltas = vm0.invoke(DeltaTestImpl::getToDeltaInvokations); |
| long fromDeltas = DeltaTestImpl.getFromDeltaInvokations(); |
| |
| assertTrue((EVENTS_SIZE - 1) + " deltas were to be sent but were " + toDeltas, |
| toDeltas == (EVENTS_SIZE - 1)); |
| assertTrue((EVENTS_SIZE - 1) + " deltas were to be received but were " + fromDeltas, |
| fromDeltas == (EVENTS_SIZE - 1)); |
| |
| verifyData(3, EVENTS_SIZE - 1); |
| assertTrue(listenerError.toString(), areListenerResultsValid); |
| } |
| |
| @Test |
| public void testS2CWithLocallyDestroyedOldValueAtClient() throws Exception { |
| PORT1 = vm0.invoke(() -> createServerCache(HARegionQueue.HA_EVICTION_POLICY_MEMORY)); |
| |
| EvictionAttributes evAttr = |
| EvictionAttributes.createLRUEntryAttributes(1, EvictionAction.LOCAL_DESTROY); |
| |
| createClientCache(PORT1, -1, "0", true/* add listener */, evAttr); |
| registerInterestListAll(); |
| |
| vm0.invoke(this::prepareDeltas); |
| prepareDeltas(); |
| |
| vm0.invoke(this::createDelta); |
| vm0.invoke(this::createAnEntry); |
| Thread.sleep(5000); // TODO: Find a better 'n reliable alternative |
| // assert overflow occurred on client vm |
| verifyOverflowOccurred(1L, 1); |
| vm0.invoke(this::updateDelta); |
| |
| waitForLastKey(); |
| |
| long toDeltas = vm0.invoke(DeltaTestImpl::getToDeltaInvokations); |
| long fromDeltas = DeltaTestImpl.getFromDeltaInvokations(); |
| |
| assertTrue((EVENTS_SIZE - 1) + " deltas were to be sent but were " + toDeltas, |
| toDeltas == (EVENTS_SIZE - 1)); |
| assertTrue( |
| (EVENTS_SIZE - 1 - 1/* destroyed */) + " deltas were to be received but were " + fromDeltas, |
| fromDeltas == (EVENTS_SIZE - 1 - 1)); |
| |
| verifyData(4, EVENTS_SIZE - 2); |
| } |
| |
| @Test |
| public void testS2CWithInvalidatedOldValueAtClient() throws Exception { |
| PORT1 = vm0.invoke(() -> createServerCache(HARegionQueue.HA_EVICTION_POLICY_MEMORY)); |
| |
| createClientCache(PORT1, -1, "0", CLIENT_LISTENER); |
| registerInterestListAll(); |
| |
| vm0.invoke(this::prepareDeltas); |
| prepareDeltas(); |
| |
| vm0.invoke(this::createDelta); |
| vm0.invoke(this::invalidateDelta); |
| vm0.invoke(this::updateDelta); |
| |
| waitForLastKey(); |
| |
| long toDeltas = vm0.invoke(DeltaTestImpl::getToDeltaInvokations); |
| long fromDeltas = DeltaTestImpl.getFromDeltaInvokations(); |
| |
| assertTrue((EVENTS_SIZE - 1) + " deltas were to be sent but were " + toDeltas, |
| toDeltas == (EVENTS_SIZE - 1)); |
| assertTrue((EVENTS_SIZE - 1 - 1/* invalidated */) + " deltas were to be received but were " |
| + fromDeltas, fromDeltas == (EVENTS_SIZE - 1 - 1)); |
| |
| verifyData(2, EVENTS_SIZE - 1); |
| assertTrue(listenerError.toString(), areListenerResultsValid); |
| } |
| |
| @Test |
| public void testS2CDeltaPropagationWithClientConflationON() throws Exception { |
| PORT1 = vm0.invoke(() -> createServerCache(HARegionQueue.HA_EVICTION_POLICY_MEMORY)); |
| |
| createClientCache(PORT1, -1, "0", DistributionConfig.CLIENT_CONFLATION_PROP_VALUE_ON, |
| LAST_KEY_LISTENER, null, null); |
| |
| registerInterestListAll(); |
| vm0.invoke(this::prepareDeltas); |
| |
| vm0.invoke(this::createAndUpdateDeltas); |
| |
| waitForLastKey(); |
| |
| // TODO: (Amogh) get CCPStats and assert 0 deltas sent. |
| assertEquals(0, DeltaTestImpl.getFromDeltaInvokations().longValue()); |
| } |
| |
| @Test |
| public void testS2CDeltaPropagationWithServerConflationON() throws Exception { |
| vm0.invoke((SerializableRunnableIF) this::closeCache); |
| PORT1 = vm0.invoke(() -> createServerCache(HARegionQueue.HA_EVICTION_POLICY_MEMORY, 1, |
| NO_LISTENER, true /* conflate */, null)); |
| |
| createClientCache(PORT1, -1, "0", DistributionConfig.CLIENT_CONFLATION_PROP_VALUE_DEFAULT, |
| LAST_KEY_LISTENER, null, null); |
| |
| vm3.invoke(() -> createClientCache(PORT1, -1, "0", |
| DistributionConfig.CLIENT_CONFLATION_PROP_VALUE_OFF, LAST_KEY_LISTENER, null, null)); |
| |
| registerInterestListAll(); |
| vm3.invoke(this::registerInterestListAll); |
| |
| vm0.invoke(this::prepareDeltas); |
| vm0.invoke(this::createAndUpdateDeltas); |
| |
| waitForLastKey(); |
| vm3.invoke(this::waitForLastKey); |
| |
| // TODO: (Amogh) use CCPStats. |
| assertEquals("Delta Propagation feature used.", 0, |
| DeltaTestImpl.getFromDeltaInvokations().longValue()); |
| long fromDeltaInvocations = vm3.invoke(DeltaTestImpl::getFromDeltaInvokations); |
| assertEquals((EVENTS_SIZE - 1), fromDeltaInvocations); |
| } |
| |
| @Test |
| public void testS2CDeltaPropagationWithOnlyCreateEvents() throws Exception { |
| PORT1 = vm0.invoke(() -> createServerCache(HARegionQueue.HA_EVICTION_POLICY_MEMORY)); |
| |
| createClientCache(PORT1, -1, "0", LAST_KEY_LISTENER); |
| registerInterestListAll(); |
| |
| vm0.invoke(this::createDeltas); |
| waitForLastKey(); |
| |
| assertEquals(0l, ((Long) vm0.invoke(DeltaTestImpl::getToDeltaInvokations)).longValue()); |
| assertTrue("Delta Propagation feature used.", DeltaTestImpl.getFromDeltaInvokations() == 0); |
| } |
| |
| /** |
| * Tests that an update on a server with full Delta object causes distribution of the full Delta |
| * instance, and not its delta bits, to other peers, even if that instance's |
| * <code>hasDelta()</code> returns true. |
| */ |
| @Test |
| public void testC2S2SDeltaPropagation() throws Exception { |
| prepareDeltas(); |
| vm0.invoke(this::prepareDeltas); |
| vm1.invoke(this::prepareDeltas); |
| |
| DeltaTestImpl val = deltaPut[1]; |
| vm0.invoke((SerializableRunnableIF) this::closeCache); |
| |
| PORT1 = vm0.invoke( |
| () -> createServerCache(HARegionQueue.HA_EVICTION_POLICY_MEMORY, 1, C2S2S_SERVER_LISTENER)); |
| PORT2 = vm1.invoke( |
| () -> createServerCache(HARegionQueue.HA_EVICTION_POLICY_MEMORY, 1, C2S2S_SERVER_LISTENER)); |
| |
| createClientCache(PORT1, -1, "0", NO_LISTENER); |
| |
| Region r = cache.getRegion("/" + regionName); |
| assertNotNull(r); |
| |
| r.create(DELTA_KEY, deltaPut[0]); |
| |
| // Invalidate the value at both the servers. |
| vm0.invoke(() -> doLocalOp(INVALIDATE, regionName, DELTA_KEY)); |
| vm1.invoke(() -> doLocalOp(INVALIDATE, regionName, DELTA_KEY)); |
| |
| vm0.invoke(() -> assertOp(INVALIDATE, 1)); |
| vm1.invoke(() -> assertOp(INVALIDATE, 1)); |
| |
| r.put(DELTA_KEY, val); |
| Thread.sleep(5000); |
| |
| // Assert that vm0 distributed val as full value to vm1. |
| vm1.invoke(() -> assertValue(regionName, DELTA_KEY, val)); |
| |
| assertTrue("Delta Propagation feature used.", !vm0.invoke(DeltaTestImpl::deltaFeatureUsed)); |
| assertTrue("Delta Propagation feature used.", !vm1.invoke(DeltaTestImpl::deltaFeatureUsed)); |
| assertTrue("Delta Propagation feature NOT used.", DeltaTestImpl.deltaFeatureUsed()); |
| } |
| |
| @Test |
| public void testS2S2CDeltaPropagationWithHAOverflow() throws Exception { |
| prepareDeltas(); |
| vm0.invoke(this::prepareDeltas); |
| vm1.invoke(this::prepareDeltas); |
| |
| vm0.invoke((SerializableRunnableIF) this::closeCache); |
| |
| PORT1 = vm0.invoke(() -> createServerCache(HARegionQueue.HA_EVICTION_POLICY_NONE, 1)); |
| PORT2 = vm1.invoke(() -> createServerCache(HARegionQueue.HA_EVICTION_POLICY_ENTRY, 1)); |
| |
| vm0.invoke(() -> ConflationDUnitTestHelper.setIsSlowStart("60000")); |
| vm1.invoke(() -> ConflationDUnitTestHelper.setIsSlowStart("60000")); |
| |
| createClientCache(PORT2, -1, "0", CLIENT_LISTENER); |
| |
| Region r = cache.getRegion("/" + regionName); |
| assertNotNull(r); |
| r.registerInterest("ALL_KEYS"); |
| |
| vm0.invoke(this::createAndUpdateDeltas); |
| vm1.invoke(() -> confirmEviction(PORT2)); |
| |
| vm1.invoke(ConflationDUnitTestHelper::unsetIsSlowStart); |
| |
| waitForLastKey(); |
| |
| long toDeltasOnServer1 = (Long) vm0.invoke(DeltaTestImpl::getToDeltaInvokations); |
| long fromDeltasOnServer2 = (Long) vm1.invoke(DeltaTestImpl::getFromDeltaInvokations); |
| long toDeltasOnServer2 = (Long) vm1.invoke(DeltaTestImpl::getToDeltaInvokations); |
| long fromDeltasOnClient = DeltaTestImpl.getFromDeltaInvokations(); |
| |
| assertTrue((EVENTS_SIZE - 1) + " deltas were to be sent but were " + toDeltasOnServer1, |
| toDeltasOnServer1 == (EVENTS_SIZE - 1)); |
| assertTrue((EVENTS_SIZE - 1) + " deltas were to be received but were " + fromDeltasOnServer2, |
| fromDeltasOnServer2 == (EVENTS_SIZE - 1)); |
| assertTrue("0 toDelta() were to be invoked but were " + toDeltasOnServer2, |
| toDeltasOnServer2 == 0); |
| assertTrue((EVENTS_SIZE - 1) + " deltas were to be received but were " + fromDeltasOnClient, |
| fromDeltasOnClient == (EVENTS_SIZE - 1)); |
| } |
| |
| @Test |
| public void testS2CDeltaPropagationWithGIIAndFailover() throws Exception { |
| prepareDeltas(); |
| vm0.invoke(this::prepareDeltas); |
| vm1.invoke(this::prepareDeltas); |
| vm2.invoke(this::prepareDeltas); |
| |
| vm0.invoke((SerializableRunnableIF) this::closeCache); |
| |
| PORT1 = |
| vm0.invoke(() -> createServerCache(HARegionQueue.HA_EVICTION_POLICY_NONE, 1, NO_LISTENER)); |
| PORT2 = |
| vm1.invoke(() -> createServerCache(HARegionQueue.HA_EVICTION_POLICY_NONE, 1, NO_LISTENER)); |
| int port3 = |
| vm2.invoke(() -> createServerCache(HARegionQueue.HA_EVICTION_POLICY_NONE, 1, NO_LISTENER)); |
| |
| // Do puts after slowing the dispatcher. |
| try { |
| vm0.invoke(() -> ConflationDUnitTestHelper.setIsSlowStart("60000")); |
| vm1.invoke(() -> ConflationDUnitTestHelper.setIsSlowStart("60000")); |
| vm2.invoke(() -> ConflationDUnitTestHelper.setIsSlowStart("60000")); |
| |
| createClientCache(new int[] {PORT1, PORT2, port3}, "1", |
| DistributionConfig.CLIENT_CONFLATION_PROP_VALUE_DEFAULT, CLIENT_LISTENER, null, null); |
| Region r = cache.getRegion("/" + regionName); |
| assertNotNull(r); |
| r.registerInterest("ALL_KEYS"); |
| |
| Pool testPool = PoolManager.getAll().values().stream().findFirst().get(); |
| |
| VM primary = (((PoolImpl) testPool).getPrimaryPort() == PORT1) ? vm0 |
| : ((((PoolImpl) testPool).getPrimaryPort() == PORT2) ? vm1 : vm2); |
| |
| primary.invoke(this::createAndUpdateDeltas); |
| Thread.sleep(5000); |
| |
| primary.invoke((SerializableRunnableIF) this::closeCache); |
| Thread.sleep(5000); |
| |
| primary = (((PoolImpl) testPool).getPrimaryPort() == PORT1) ? vm0 |
| : ((((PoolImpl) testPool).getPrimaryPort() == PORT2) ? vm1 : vm2); |
| |
| vm0.invoke(ConflationDUnitTestHelper::unsetIsSlowStart); |
| vm1.invoke(ConflationDUnitTestHelper::unsetIsSlowStart); |
| vm2.invoke(ConflationDUnitTestHelper::unsetIsSlowStart); |
| |
| primary.invoke((SerializableRunnableIF) this::closeCache); |
| Thread.sleep(5000); |
| |
| org.apache.geode.test.dunit.LogWriterUtils.getLogWriter() |
| .info("waiting for client to receive last_key"); |
| waitForLastKey(); |
| |
| long fromDeltasOnClient = DeltaTestImpl.getFromDeltaInvokations(); |
| assertTrue((EVENTS_SIZE - 1) + " deltas were to be received but were " + fromDeltasOnClient, |
| fromDeltasOnClient == (EVENTS_SIZE - 1)); |
| } finally { |
| vm0.invoke(ConflationDUnitTestHelper::unsetIsSlowStart); |
| vm1.invoke(ConflationDUnitTestHelper::unsetIsSlowStart); |
| vm2.invoke(ConflationDUnitTestHelper::unsetIsSlowStart); |
| } |
| } |
| |
| @Test |
| public void testBug40165ClientReconnects() throws Exception { |
| PORT1 = vm0.invoke(() -> createServerCache(HARegionQueue.HA_EVICTION_POLICY_MEMORY)); |
| |
| /** |
| * 1. Create a cache server with slow dispatcher 2. Start a durable client with a custom cache |
| * listener which shuts itself down as soon as it receives a marker message. 3. Do some puts on |
| * the server region 4. Let the dispatcher start dispatching 5. Verify that durable client is |
| * disconnected as soon as it processes the marker. Server will retain its queue which has some |
| * events (containing deltas) in it. 6. Restart the durable client without the self-destructing |
| * listener. 7. Wait till the durable client processes all its events. 8. Verify that no deltas |
| * are received by it. |
| */ |
| |
| // Step 0 |
| prepareDeltas(); |
| vm0.invoke(this::prepareDeltas); |
| |
| // Step 1 |
| try { |
| vm0.invoke(() -> ConflationDUnitTestHelper.setIsSlowStart("60000")); |
| |
| // Step 2 |
| String durableClientId = getName() + "_client"; |
| PoolFactory pf = PoolManager.createFactory(); |
| pf.addServer("localhost", PORT1).setSubscriptionEnabled(true).setSubscriptionAckInterval(1); |
| ((PoolFactoryImpl) pf).getPoolAttributes(); |
| |
| Properties properties = new Properties(); |
| properties.setProperty(MCAST_PORT, "0"); |
| properties.setProperty(LOCATORS, ""); |
| properties.setProperty(DURABLE_CLIENT_ID, durableClientId); |
| properties.setProperty(DURABLE_CLIENT_TIMEOUT, String.valueOf(60)); |
| |
| createDurableCacheClient(((PoolFactoryImpl) pf).getPoolAttributes(), regionName, properties, |
| DURABLE_CLIENT_LISTENER, true); |
| |
| // Step 3 |
| vm0.invoke((SerializableRunnableIF) this::doPuts); |
| |
| // Step 4 |
| vm0.invoke(ConflationDUnitTestHelper::unsetIsSlowStart); |
| |
| // Step 5 |
| // verifyDurableClientDisconnected(); |
| Thread.sleep(5000); |
| |
| // Step 6 |
| createDurableCacheClient(((PoolFactoryImpl) pf).getPoolAttributes(), regionName, properties, |
| DURABLE_CLIENT_LISTENER, false); |
| |
| // Step 7 |
| waitForLastKey(); |
| |
| // Step 8 |
| long fromDeltasOnClient = DeltaTestImpl.getFromDeltaInvokations(); |
| assertTrue("No deltas were to be received but received: " + fromDeltasOnClient, |
| fromDeltasOnClient < 1); |
| } finally { |
| // Step 4 |
| vm0.invoke(ConflationDUnitTestHelper::unsetIsSlowStart); |
| } |
| |
| } |
| |
| @Test |
| public void testBug40165ClientFailsOver() throws Exception { |
| PORT1 = vm0.invoke(() -> createServerCache(HARegionQueue.HA_EVICTION_POLICY_MEMORY)); |
| |
| /** |
| * 1. Create two cache servers with slow dispatcher 2. Start a durable client with a custom |
| * cache listener 3. Do some puts on the server region 4. Let the dispatcher start dispatching |
| * 5. Wait till the durable client receives marker from its primary. 6. Kill the primary server, |
| * so that the second one becomes primary. 7. Wait till the durable client processes all its |
| * events. 8. Verify that expected number of deltas are received by it. |
| */ |
| |
| // Step 0 |
| prepareDeltas(); |
| vm0.invoke(this::prepareDeltas); |
| vm1.invoke(this::prepareDeltas); |
| |
| try { |
| // Step 1 |
| vm0.invoke(() -> ConflationDUnitTestHelper.setIsSlowStart("60000")); |
| PORT2 = vm1.invoke(() -> createServerCache(HARegionQueue.HA_EVICTION_POLICY_MEMORY)); |
| vm1.invoke(() -> ConflationDUnitTestHelper.setIsSlowStart("60000")); |
| |
| // Step 2 |
| String durableClientId = getName() + "_client"; |
| PoolFactory pf = PoolManager.createFactory(); |
| pf.addServer("localhost", PORT1).addServer("localhost", PORT2).setSubscriptionEnabled(true) |
| .setSubscriptionAckInterval(1).setSubscriptionRedundancy(2); |
| ((PoolFactoryImpl) pf).getPoolAttributes(); |
| |
| Properties properties = new Properties(); |
| properties.setProperty(MCAST_PORT, "0"); |
| properties.setProperty(LOCATORS, ""); |
| properties.setProperty(DURABLE_CLIENT_ID, durableClientId); |
| properties.setProperty(DURABLE_CLIENT_TIMEOUT, String.valueOf(60)); |
| |
| createDurableCacheClient(((PoolFactoryImpl) pf).getPoolAttributes(), regionName, properties, |
| DURABLE_CLIENT_LISTENER, false); |
| |
| // Step 3 |
| vm0.invoke((SerializableRunnableIF) this::doPuts); |
| } finally { |
| // Step 4 |
| vm0.invoke(ConflationDUnitTestHelper::unsetIsSlowStart); |
| vm1.invoke(ConflationDUnitTestHelper::unsetIsSlowStart); |
| } |
| |
| // Step 5 |
| Pool testPool = PoolManager.getAll().values().stream().findFirst().get(); |
| VM pVM = (((PoolImpl) testPool).getPrimaryPort() == PORT1) ? vm0 : vm1; |
| while (!markerReceived) { |
| Thread.sleep(50); |
| } |
| |
| // Step 6 |
| pVM.invoke((SerializableRunnableIF) this::closeCache); |
| Thread.sleep(5000); |
| |
| // Step 7 |
| waitForLastKey(); |
| |
| // Step 8 |
| long fromDeltasOnClient = DeltaTestImpl.getFromDeltaInvokations(); |
| assertTrue("Atleast 99 deltas were to be received but received: " + fromDeltasOnClient, |
| fromDeltasOnClient >= 99); |
| } |
| |
| public void doLocalOp(String op, String rName, String key) { |
| try { |
| Region r = cache.getRegion("/" + rName); |
| assertNotNull(r); |
| if (INVALIDATE.equals(op)) { |
| r.localInvalidate(key); |
| } else if (DESTROY.equals(op)) { |
| r.localDestroy(key); |
| } |
| } catch (Exception e) { |
| org.apache.geode.test.dunit.Assert.fail("failed in doLocalOp()", e); |
| } |
| } |
| |
| public void assertOp(String op, Integer num) { |
| final int expected = num; |
| WaitCriterion wc = null; |
| if (INVALIDATE.equals(op)) { |
| wc = new WaitCriterion() { |
| @Override |
| public boolean done() { |
| return numOfInvalidates == expected; |
| } |
| |
| @Override |
| public String description() { |
| return "numOfInvalidates was expected to be " + expected + " but is " + numOfInvalidates; |
| } |
| }; |
| } else if (DESTROY.equals(op)) { |
| wc = new WaitCriterion() { |
| @Override |
| public boolean done() { |
| return numOfInvalidates == expected; |
| } |
| |
| @Override |
| public String description() { |
| return "numOfDestroys was expected to be " + expected + " but is " + numOfDestroys; |
| } |
| }; |
| } |
| GeodeAwaitility.await().untilAsserted(wc); |
| } |
| |
| private void assertValue(String rName, String key, Object expected) { |
| try { |
| Region r = cache.getRegion("/" + rName); |
| assertNotNull(r); |
| Object value = r.getEntry(key).getValue(); |
| assertTrue("Value against " + key + " is " + value + ". It should be " + expected, |
| expected.equals(value)); |
| } catch (Exception e) { |
| org.apache.geode.test.dunit.Assert.fail("failed in assertValue()", e); |
| } |
| } |
| |
| private void confirmEviction(Integer port) { |
| final EvictionController cc = ((VMLRURegionMap) ((LocalRegion) cache.getRegion( |
| SEPARATOR + generateNameForClientMsgsRegion(port))).entries) |
| .getEvictionController(); |
| |
| WaitCriterion wc = new WaitCriterion() { |
| @Override |
| public boolean done() { |
| return cc.getCounters().getEvictions() > 0; |
| } |
| |
| @Override |
| public String description() { |
| return "HA Overflow did not occure."; |
| } |
| }; |
| GeodeAwaitility.await().untilAsserted(wc); |
| } |
| |
| private void waitForLastKey() { |
| WaitCriterion wc = new WaitCriterion() { |
| @Override |
| public boolean done() { |
| return isLastKeyReceived(); |
| } |
| |
| @Override |
| public String description() { |
| return "Last key NOT received."; |
| } |
| }; |
| GeodeAwaitility.await().untilAsserted(wc); |
| } |
| |
| private void prepareDeltas() { |
| for (int i = 0; i < EVENTS_SIZE; i++) { |
| deltaPut[i] = |
| new DeltaTestImpl(0, "0", 0d, new byte[0], new TestObjectWithIdentifier("0", 0)); |
| } |
| deltaPut[1].setIntVar(5); |
| deltaPut[2].setIntVar(5); |
| deltaPut[3].setIntVar(5); |
| deltaPut[4].setIntVar(5); |
| deltaPut[5].setIntVar(5); |
| |
| deltaPut[2].resetDeltaStatus(); |
| deltaPut[2].setByteArr(new byte[] {1, 2, 3, 4, 5}); |
| deltaPut[3].setByteArr(new byte[] {1, 2, 3, 4, 5}); |
| deltaPut[4].setByteArr(new byte[] {1, 2, 3, 4, 5}); |
| deltaPut[5].setByteArr(new byte[] {1, 2, 3, 4, 5}); |
| |
| deltaPut[3].resetDeltaStatus(); |
| deltaPut[3].setDoubleVar(5d); |
| deltaPut[4].setDoubleVar(5d); |
| deltaPut[5].setDoubleVar(5d); |
| |
| deltaPut[4].resetDeltaStatus(); |
| deltaPut[4].setStr("str changed"); |
| deltaPut[5].setStr("str changed"); |
| |
| deltaPut[5].resetDeltaStatus(); |
| deltaPut[5].setIntVar(100); |
| deltaPut[5].setTestObj(new TestObjectWithIdentifier("CHANGED", 100)); |
| } |
| |
| private void prepareErroneousDeltasForToDelta() { |
| for (int i = 0; i < EVENTS_SIZE; i++) { |
| deltaPut[i] = |
| new DeltaTestImpl(0, "0", 0d, new byte[0], new TestObjectWithIdentifier("0", 0)); |
| } |
| deltaPut[1].setIntVar(5); |
| deltaPut[2].setIntVar(5); |
| deltaPut[3].setIntVar(DeltaTestImpl.ERRONEOUS_INT_FOR_TO_DELTA); |
| deltaPut[4].setIntVar(5); |
| deltaPut[5].setIntVar(5); |
| |
| deltaPut[2].setByteArr(new byte[] {1, 2, 3, 4, 5}); |
| deltaPut[3].setByteArr(new byte[] {1, 2, 3, 4, 5}); |
| deltaPut[4].setByteArr(new byte[] {1, 2, 3, 4, 5}); |
| deltaPut[5].setByteArr(new byte[] {1, 2, 3, 4, 5}); |
| |
| deltaPut[3].setDoubleVar(5d); |
| deltaPut[4].setDoubleVar(5d); |
| deltaPut[5].setDoubleVar(5d); |
| |
| deltaPut[4].setStr("str changed"); |
| deltaPut[5].setStr("str changed"); |
| |
| deltaPut[5].setIntVar(100); |
| deltaPut[5].setTestObj(new TestObjectWithIdentifier("CHANGED", 100)); |
| } |
| |
| private void prepareErroneousDeltasForFromDelta() { |
| for (int i = 0; i < EVENTS_SIZE; i++) { |
| deltaPut[i] = |
| new DeltaTestImpl(0, "0", 0d, new byte[0], new TestObjectWithIdentifier("0", 0)); |
| } |
| deltaPut[1].setIntVar(5); |
| deltaPut[2].setIntVar(5); |
| deltaPut[3].setIntVar(5); |
| deltaPut[4].setIntVar(5); |
| deltaPut[5].setIntVar(5); |
| |
| deltaPut[2].setByteArr(new byte[] {1, 2, 3, 4, 5}); |
| deltaPut[3].setByteArr(new byte[] {1, 2, 3, 4, 5}); |
| deltaPut[4].setByteArr(new byte[] {1, 2, 3, 4, 5}); |
| deltaPut[5].setByteArr(new byte[] {1, 2, 3, 4, 5}); |
| |
| deltaPut[3].setDoubleVar(5d); |
| deltaPut[4].setDoubleVar(5d); |
| deltaPut[5].setDoubleVar(5d); |
| |
| deltaPut[4].setStr("str changed"); |
| deltaPut[5].setStr(DeltaTestImpl.ERRONEOUS_STRING_FOR_FROM_DELTA); |
| |
| deltaPut[5].setIntVar(100); |
| deltaPut[5].setTestObj(new TestObjectWithIdentifier("CHANGED", 100)); |
| } |
| |
  /** Convenience overload: performs the default 100 puts against DELTA_KEY, then the LAST_KEY marker. */
  private void doPuts() {
    doPuts(100);
  }
| |
| private void doPuts(Integer num) { |
| try { |
| Region r = cache.getRegion("/" + regionName); |
| assertNotNull(r); |
| |
| for (int i = 0; i < num; i++) { |
| DeltaTestImpl val = new DeltaTestImpl(); |
| val.setStr("" + i); |
| r.put(DELTA_KEY, val); |
| } |
| r.put(LAST_KEY, ""); |
| } catch (Exception ex) { |
| org.apache.geode.test.dunit.Assert.fail("failed in createDelta()", ex); |
| } |
| } |
| |
  /** Creates the DELTA_KEY entry (deltaPut[0]) and then applies the remaining delta updates. */
  private void createAndUpdateDeltas() {
    createDelta();
    updateDelta();
  }
| |
| private void createDelta() { |
| try { |
| Region r = cache.getRegion("/" + regionName); |
| assertNotNull(r); |
| |
| r.create(DELTA_KEY, deltaPut[0]); |
| } catch (Exception ex) { |
| org.apache.geode.test.dunit.Assert.fail("failed in createDelta()", ex); |
| } |
| } |
| |
| private void updateDelta() { |
| try { |
| Region r = cache.getRegion("/" + regionName); |
| assertNotNull(r); |
| |
| for (int i = 1; i < EVENTS_SIZE; i++) { |
| try { |
| r.put(DELTA_KEY, deltaPut[i]); |
| } catch (InvalidDeltaException ide) { |
| assertTrue("InvalidDeltaException not expected for deltaPut[" + i + "]", |
| deltaPut[i].getIntVar() == DeltaTestImpl.ERRONEOUS_INT_FOR_TO_DELTA); |
| } |
| } |
| r.put(LAST_KEY, ""); |
| } catch (Exception ex) { |
| org.apache.geode.test.dunit.Assert.fail("failed in updateDelta()", ex); |
| } |
| } |
| |
| private void createDeltas() { |
| try { |
| Region r = cache.getRegion("/" + regionName); |
| assertNotNull(r); |
| |
| for (int i = 0; i < 100; i++) { |
| r.create(DELTA_KEY + i, new DeltaTestImpl()); |
| } |
| r.create(LAST_KEY, ""); |
| } catch (Exception ex) { |
| org.apache.geode.test.dunit.Assert.fail("failed in createDeltas()", ex); |
| } |
| } |
| |
| private void createAnEntry() { |
| try { |
| Region r = cache.getRegion("/" + regionName); |
| assertNotNull(r); |
| |
| r.create("KEY-A", "I push the delta out to disk :)"); |
| } catch (Exception ex) { |
| org.apache.geode.test.dunit.Assert.fail("failed in createAnEntry()", ex); |
| } |
| } |
| |
| private void invalidateDelta() { |
| try { |
| Region r = cache.getRegion("/" + regionName); |
| assertNotNull(r); |
| |
| r.invalidate(DELTA_KEY); |
| } catch (Exception ex) { |
| fail("failed in invalidateDelta()" + ex.getMessage()); |
| } |
| } |
| |
| private void verifyOverflowOccurred(long evictions, int regionsize) { |
| EvictionController cc = ((VMLRURegionMap) ((LocalRegion) cache.getRegion(regionName)).entries) |
| .getEvictionController(); |
| Assert.assertTrue(cc.getCounters().getEvictions() == evictions, |
| "Number of evictions expected to be " + evictions + " but was " |
| + cc.getCounters().getEvictions()); |
| int rSize = ((LocalRegion) cache.getRegion(regionName)).getRegionMap().size(); |
| Assert.assertTrue(rSize == regionsize, |
| "Region size expected to be " + regionsize + " but was " + rSize); |
| } |
| |
  /** Asserts the listener-observed create/update counters match the expected counts. */
  private void verifyData(int creates, int updates) {
    assertEquals(creates, numOfCreates);
    assertEquals(updates, numOfUpdates);
  }
| |
  /** Overload: server with the given eviction policy, subscription capacity 1, no listener. */
  private Integer createServerCache(String ePolicy) throws Exception {
    return createServerCache(ePolicy, 1);
  }
| |
  /** Overload: server with the given eviction policy and capacity, no cache listener. */
  private Integer createServerCache(String ePolicy, Integer cap) throws Exception {
    return createServerCache(ePolicy, cap, NO_LISTENER);
  }
| |
  /** Overload: server with the given policy/capacity/listener; conflation off, no compressor. */
  private Integer createServerCache(String ePolicy, Integer cap, Integer listenerCode)
      throws Exception {
    return createServerCache(ePolicy, cap, listenerCode, false, null);
  }
| |
  /**
   * Creates the peer cache hosting the test region and starts a cache server on a random port.
   *
   * <p>For C2S2S_SERVER_LISTENER the region is NORMAL/DISTRIBUTED_NO_ACK and pre-seeded with the
   * DELTA_KEY entry; otherwise it is REPLICATE/DISTRIBUTED_ACK. When {@code ePolicy} is non-null,
   * the server's client-subscription queue gets that eviction policy, the given capacity, and a
   * dedicated disk store ("bsi") for overflow.
   *
   * @param ePolicy client-subscription eviction policy, or null for no overflow configuration
   * @param cap client-subscription queue capacity (used only when ePolicy != null)
   * @param listenerCode listener selector for getCacheListener(); 0 installs none
   * @param conflate whether subscription conflation is enabled on the region
   * @param compressor optional region compressor, or null
   * @return the port the cache server is listening on
   */
  private Integer createServerCache(String ePolicy, Integer cap, Integer listenerCode,
      Boolean conflate, Compressor compressor) throws Exception {
    ConnectionTable.threadWantsSharedResources();
    createCache(new Properties());
    AttributesFactory factory = new AttributesFactory();
    factory.setEnableSubscriptionConflation(conflate);
    if (listenerCode != 0) {
      factory.addCacheListener(getCacheListener(listenerCode));
    }
    if (compressor != null) {
      factory.setCompressor(compressor);
    }
    if (listenerCode == C2S2S_SERVER_LISTENER) {
      factory.setScope(Scope.DISTRIBUTED_NO_ACK);
      factory.setDataPolicy(DataPolicy.NORMAL);
      factory.setConcurrencyChecksEnabled(false);
      RegionAttributes attrs = factory.create();
      Region r = cache.createRegion(regionName, attrs);
      logger = cache.getLogger();
      // Pre-seed the baseline delta value for the client-to-server-to-server scenario.
      r.create(DELTA_KEY, deltaPut[0]);
    } else {
      factory.setScope(Scope.DISTRIBUTED_ACK);
      factory.setDataPolicy(DataPolicy.REPLICATE);
      factory.setConcurrencyChecksEnabled(false);
      RegionAttributes attrs = factory.create();
      cache.createRegion(regionName, attrs);
      logger = cache.getLogger();
    }

    int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
    CacheServer server1 = cache.addCacheServer();
    server1.setPort(port);
    server1.setNotifyBySubscription(true);
    if (ePolicy != null) {
      // Per-server overflow directory; named by port to avoid collisions between servers.
      File overflowDirectory = new File("bsi_overflow_" + port);
      overflowDirectory.mkdir();
      DiskStoreFactory dsf = cache.createDiskStoreFactory();
      File[] dirs1 = new File[] {overflowDirectory};

      server1.getClientSubscriptionConfig().setEvictionPolicy(ePolicy);
      server1.getClientSubscriptionConfig().setCapacity(cap);
      // specify diskstore for this server
      server1.getClientSubscriptionConfig()
          .setDiskStoreName(dsf.setDiskDirs(dirs1).create("bsi").getName());
    }
    server1.start();
    return server1.getPort();
  }
| |
  /**
   * Builds the CacheListener corresponding to {@code code}. Returns null for code 0 and
   * SERVER_LISTENER (no listener installed). The listeners mutate the test's shared counters and
   * flags (numOfCreates, numOfUpdates, lastKeyReceived, areListenerResultsValid, listenerError,
   * markerReceived) and validate received values against the expected deltaPut entries.
   *
   * @param code one of the listener-selector constants declared on this class
   * @return the matching listener, or null
   */
  private CacheListener getCacheListener(Integer code) {
    CacheListener listener = null;
    switch (code) {
      case 0:
        // NO_LISTENER: leave listener null.
        break;
      case SERVER_LISTENER:
        // Intentionally no listener for the server side (kept as a placeholder).
        // listener = new CacheListenerAdapter() {};
        break;
      case CLIENT_LISTENER:
        // Validates every create/update against the expected deltaPut values; any mismatch is
        // recorded in listenerError and flips areListenerResultsValid.
        listener = new CacheListenerAdapter() {
          @Override
          public void afterCreate(EntryEvent event) {
            numOfCreates++;
            logger.fine("Create Event: <" + event.getKey() + ", " + event.getNewValue() + ">");
            if (DELTA_KEY.equals(event.getKey()) && !deltaPut[0].equals(event.getNewValue())) {
              areListenerResultsValid = false;
              listenerError.append("Create event:\n |-> sent: " + deltaPut[0] + "\n |-> rcvd: "
                  + event.getNewValue() + "\n");
            } else if (LAST_KEY.equals(event.getKey())) {
              lastKeyReceived = true;
            }
          }

          @Override
          public void afterUpdate(EntryEvent event) {
            numOfUpdates++;
            logger.fine("Update Event: <" + event.getKey() + ", " + event.getNewValue() + ">"
                + ", numOfUpdates: " + numOfUpdates);
            // The Nth update is expected to carry deltaPut[N].
            if (!deltaPut[numOfUpdates].equals(event.getNewValue())) {
              areListenerResultsValid = false;
              listenerError.append("\nUpdate event(" + numOfUpdates + "):\n |-> sent: "
                  + deltaPut[numOfUpdates] + "\n |-> recd: " + event.getNewValue());
            }
          }
        };
        break;
      case CLIENT_LISTENER_2:
        // Like CLIENT_LISTENER, but the expected-value index skips past the deliberately
        // erroneous delta (presumably from prepareErroneousDeltasForToDelta — TODO confirm
        // against the tests that use this code).
        listener = new CacheListenerAdapter() {
          @Override
          public void afterCreate(EntryEvent event) {
            numOfCreates++;
            logger.fine("Create Event: <" + event.getKey() + ", " + event.getNewValue() + ">");
            if (DELTA_KEY.equals(event.getKey()) && !deltaPut[0].equals(event.getNewValue())) {
              areListenerResultsValid = false;
              listenerError.append("Create event:\n |-> sent: " + deltaPut[0] + "\n |-> rcvd: "
                  + event.getNewValue() + "\n");
            } else if (LAST_KEY.equals(event.getKey())) {
              lastKeyReceived = true;
            }
          }

          @Override
          public void afterUpdate(EntryEvent event) {
            int tmp = ++numOfUpdates;
            logger.fine("Update Event: <" + event.getKey() + ", " + event.getNewValue() + ">"
                + ", numOfUpdates: " + numOfUpdates);
            // Hack to ignore illegal delta put: from the 3rd update on, the expected index is
            // shifted by one because one delta was dropped server-side.
            tmp = (tmp >= 3) ? ++tmp : tmp;
            if (!deltaPut[tmp].equals(event.getNewValue())) {
              areListenerResultsValid = false;
              listenerError.append("\nUpdate event(" + numOfUpdates + "):\n |-> sent: "
                  + deltaPut[tmp] + "\n |-> recd: " + event.getNewValue());
            }
          }
        };
        break;
      case C2S2S_SERVER_LISTENER:
        // Pure event counter for the client->server->server scenario; no value validation.
        listener = new CacheListenerAdapter() {
          @Override
          public void afterCreate(EntryEvent event) {
            numOfCreates++;
            logger.fine("Create Event: <" + event.getKey() + ", " + event.getNewValue() + ">");
            if (LAST_KEY.equals(event.getKey())) {
              lastKeyReceived = true;
            }
          }

          @Override
          public void afterUpdate(EntryEvent event) {
            numOfUpdates++;
            logger.fine("Update Event: <" + event.getKey() + ", " + event.getNewValue() + ">"
                + ", numOfUpdates: " + numOfUpdates);
          }

          @Override
          public void afterInvalidate(EntryEvent event) {
            numOfInvalidates++;
            logger.fine("Invalidate Event: <" + event.getKey() + ", " + event.getOldValue() + ">"
                + ", numOfInvalidates: " + numOfInvalidates);
          }

          @Override
          public void afterDestroy(EntryEvent event) {
            numOfDestroys++;
            logger.fine("Destroy Event: <" + event.getKey() + ", " + event.getOldValue() + ">"
                + ", numOfDestroys: " + numOfDestroys);
          }
        };
        break;
      case LAST_KEY_LISTENER:
        // Only tracks arrival of the LAST_KEY marker.
        listener = new CacheListenerAdapter() {
          @Override
          public void afterCreate(EntryEvent event) {
            if (LAST_KEY.equals(event.getKey())) {
              lastKeyReceived = true;
            }
          }
        };
        break;
      case DURABLE_CLIENT_LISTENER:
        // Durable-client listener: records the durable MARKER event and optionally closes the
        // cache with keepAlive when the marker arrives (driven by the closeCache flag).
        listener = new CacheListenerAdapter() {
          @Override
          public void afterRegionLive(RegionEvent event) {
            logger.fine("Marker received");
            if (Operation.MARKER == event.getOperation()) {
              markerReceived = true;
              if (closeCache) {
                logger.fine("Closing the durable client cache...");
                closeCache(true); // keepAlive
              }
            }
          }

          @Override
          public void afterCreate(EntryEvent event) {
            logger.fine("CREATE received");
            if (LAST_KEY.equals(event.getKey())) {
              logger.fine("LAST KEY received");
              lastKeyReceived = true;
            }
          }

          @Override
          public void afterUpdate(EntryEvent event) {
            assertNotNull(event.getNewValue());
          }
        };
        break;
      default:
        fail("Invalid listener code");
        break;
    }
    return listener;

  }
| |
  /** Overload: client with default conflation, the standard CLIENT_LISTENER, no eviction/expiry. */
  private void createClientCache(Integer port1, Integer port2, String rLevel) throws Exception {
    createClientCache(port1, port2, rLevel, DistributionConfig.CLIENT_CONFLATION_PROP_VALUE_DEFAULT,
        CLIENT_LISTENER, null, null);
  }
| |
  /**
   * Overload: client with the given eviction attributes.
   * NOTE(review): {@code addListener} is ignored — CLIENT_LISTENER is always installed; confirm
   * whether callers rely on that before changing.
   */
  private void createClientCache(Integer port1, Integer port2, String rLevel, Boolean addListener,
      EvictionAttributes evictAttrs) throws Exception {
    createClientCache(port1, port2, rLevel, DistributionConfig.CLIENT_CONFLATION_PROP_VALUE_DEFAULT,
        CLIENT_LISTENER, evictAttrs, null);
  }
| |
  /**
   * Overload: client with the given entry time-to-live expiration attributes.
   * NOTE(review): {@code addListener} is ignored — CLIENT_LISTENER is always installed; confirm
   * whether callers rely on that before changing.
   */
  private void createClientCache(Integer port1, Integer port2, String rLevel, Boolean addListener,
      ExpirationAttributes expAttrs) throws Exception {
    createClientCache(port1, port2, rLevel, DistributionConfig.CLIENT_CONFLATION_PROP_VALUE_DEFAULT,
        CLIENT_LISTENER, null, expAttrs);
  }
| |
  /** Overload: client with default conflation and a caller-selected listener code. */
  private void createClientCache(Integer port1, Integer port2, String rLevel, Integer listener)
      throws Exception {
    createClientCache(port1, port2, rLevel, DistributionConfig.CLIENT_CONFLATION_PROP_VALUE_DEFAULT,
        listener, null, null);
  }
| |
| private void createClientCache(Integer port1, Integer port2, String rLevel, String conflate, |
| Integer listener, EvictionAttributes evictAttrs, ExpirationAttributes expAttrs) |
| throws Exception { |
| int[] ports = null; |
| if (port2 != -1) { |
| ports = new int[] {port1, port2}; |
| } else { |
| ports = new int[] {port1}; |
| } |
| assertTrue("No server ports provided", ports != null); |
| createClientCache(ports, rLevel, conflate, listener, evictAttrs, expAttrs); |
| } |
| |
  /**
   * Creates the client cache and local test region connected to the given server ports.
   *
   * @param ports server ports the connection pool targets
   * @param rLevel subscription redundancy level, parsed from string
   * @param conflate value for the conflate-events distributed system property
   * @param listener listener selector for getCacheListener(); 0 installs none
   * @param evictAttrs optional eviction attributes; overflow-to-disk also provisions a disk store
   * @param expAttrs optional entry time-to-live expiration attributes
   */
  private void createClientCache(int[] ports, String rLevel, String conflate, Integer listener,
      EvictionAttributes evictAttrs, ExpirationAttributes expAttrs) throws Exception {
    CacheServerTestUtil.disableShufflingOfEndpoints();

    Properties props = new Properties();
    props.setProperty(MCAST_PORT, "0");
    props.setProperty(LOCATORS, "");
    props.setProperty(CONFLATE_EVENTS, conflate);
    createCache(props);
    AttributesFactory factory = new AttributesFactory();
    // Subscription-enabled pool against "localhost" with the requested redundancy.
    ClientServerTestCase.configureConnectionPool(factory, "localhost", ports, true,
        Integer.parseInt(rLevel), 2, null, 1000, 250, -2);

    factory.setScope(Scope.LOCAL);

    if (listener != 0) {
      factory.addCacheListener(getCacheListener(listener));
    }

    if (evictAttrs != null) {
      factory.setEvictionAttributes(evictAttrs);
    }
    if (expAttrs != null) {
      factory.setEntryTimeToLive(expAttrs);
    }
    if (evictAttrs != null && evictAttrs.getAction().isOverflowToDisk()) {
      // create diskstore with overflow dir
      // since it's overflow, no need to recover, so we can use random number as dir name
      int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
      File dir = new File("overflow_" + port);
      if (!dir.exists()) {
        dir.mkdir();
      }
      File[] dir1 = new File[] {dir};
      DiskStoreFactory dsf = cache.createDiskStoreFactory();
      factory.setDiskStoreName(dsf.setDiskDirs(dir1).create("client_overflow_ds").getName());
    }
    factory.setConcurrencyChecksEnabled(false);
    RegionAttributes attrs = factory.create();
    cache.createRegion(regionName, attrs);
    logger = cache.getLogger();
  }
| |
  /**
   * Creates this VM's cache from the given distributed-system properties.
   * The connect/disconnect/reconnect sequence presumably forces a fresh DistributedSystem carrying
   * exactly {@code props} rather than reusing a cached one — TODO confirm against getSystem().
   */
  private void createCache(Properties props) throws Exception {
    DistributedSystem ds = getSystem(props);
    ds.disconnect();
    ds = getSystem(props);
    assertNotNull(ds);
    cache = CacheFactory.create(ds);
    assertNotNull(cache);
  }
| |
  /**
   * Creates a durable client cache: builds a pool from {@code poolAttr}, creates a LOCAL region
   * bound to it, registers interest in all keys, and signals readiness for queued events.
   *
   * @param poolAttr pool attributes to copy into the new pool
   * @param regionName name of the client region to create
   * @param dsProperties distributed-system properties (must carry the durable-client settings)
   * @param listenerCode listener selector for getCacheListener(); 0 installs none
   * @param close stored in the closeCache flag; the durable listener closes the cache on MARKER
   *        when this is true
   */
  private void createDurableCacheClient(Pool poolAttr, String regionName, Properties dsProperties,
      Integer listenerCode, Boolean close) throws Exception {
    new DeltaPropagationDUnitTest().createCache(dsProperties);
    PoolFactoryImpl poolFactory = (PoolFactoryImpl) PoolManager.createFactory();
    poolFactory.init(poolAttr);
    PoolImpl pool = (PoolImpl) poolFactory.create("DeltaPropagationDUnitTest");
    AttributesFactory factory = new AttributesFactory();
    factory.setScope(Scope.LOCAL);
    factory.setConcurrencyChecksEnabled(false);
    factory.setPoolName(pool.getName());
    if (listenerCode != 0) {
      factory.addCacheListener(getCacheListener(listenerCode));
    }
    RegionAttributes attrs = factory.create();
    Region r = cache.createRegion(regionName, attrs);
    r.registerInterest("ALL_KEYS");
    // Tell the server this durable client is ready to receive its queued events.
    cache.readyForEvents();
    logger = cache.getLogger();
    closeCache = close;
  }
| |
| private void registerInterestListAll() { |
| try { |
| Region r = cache.getRegion("/" + regionName); |
| assertNotNull(r); |
| r.registerInterest("ALL_KEYS"); |
| } catch (Exception ex) { |
| org.apache.geode.test.dunit.Assert.fail("failed in registerInterestListAll", ex); |
| } |
| } |
| |
| private Object closeCache() { |
| if (cache != null && !cache.isClosed()) { |
| cache.close(); |
| cache.getDistributedSystem().disconnect(); |
| return null; |
| } |
| return null; |
| } |
| |
| private void closeCache(boolean keepalive) { |
| if (cache != null && !cache.isClosed()) { |
| cache.close(keepalive); |
| cache.getDistributedSystem().disconnect(); |
| } |
| } |
| |
  /** @return true once a listener has observed the LAST_KEY marker entry */
  private boolean isLastKeyReceived() {
    return lastKeyReceived;
  }
| |
| private void resetAll() { |
| DeltaTestImpl.resetDeltaInvokationCounters(); |
| numOfCreates = numOfUpdates = numOfInvalidates = numOfDestroys = 0; |
| lastKeyReceived = false; |
| markerReceived = false; |
| areListenerResultsValid = true; |
| listenerError = new StringBuffer(""); |
| } |
| } |