| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| |
| package org.apache.geode.cache30; |
| |
| import static org.apache.geode.cache.Region.SEPARATOR; |
| import static org.apache.geode.distributed.ConfigurationProperties.CONSERVE_SOCKETS; |
| import static org.apache.geode.distributed.ConfigurationProperties.DISTRIBUTED_SYSTEM_ID; |
| import static org.apache.geode.internal.cache.LocalRegion.InitializationLevel.BEFORE_INITIAL_IMAGE; |
| import static org.apache.geode.test.awaitility.GeodeAwaitility.await; |
| import static org.assertj.core.api.Assertions.assertThat; |
| import static org.assertj.core.api.Assertions.fail; |
| import static org.assertj.core.api.Assumptions.assumeThat; |
| |
| import java.util.Properties; |
| import java.util.Set; |
| |
| import org.junit.Ignore; |
| import org.junit.Test; |
| import org.junit.experimental.categories.Category; |
| |
| import org.apache.geode.cache.AttributesFactory; |
| import org.apache.geode.cache.DataPolicy; |
| import org.apache.geode.cache.Operation; |
| import org.apache.geode.cache.RegionAttributes; |
| import org.apache.geode.cache.RegionDestroyedException; |
| import org.apache.geode.cache.RegionFactory; |
| import org.apache.geode.cache.Scope; |
| import org.apache.geode.distributed.internal.ClusterDistributionManager; |
| import org.apache.geode.distributed.internal.DistributionAdvisor; |
| import org.apache.geode.distributed.internal.membership.InternalDistributedMember; |
| import org.apache.geode.internal.cache.DistributedCacheOperation; |
| import org.apache.geode.internal.cache.DistributedRegion; |
| import org.apache.geode.internal.cache.EntryEventImpl; |
| import org.apache.geode.internal.cache.GemFireCacheImpl; |
| import org.apache.geode.internal.cache.InitialImageOperation; |
| import org.apache.geode.internal.cache.LocalRegion; |
| import org.apache.geode.internal.cache.LocalRegion.InitializationLevel; |
| import org.apache.geode.internal.cache.RegionEntry; |
| import org.apache.geode.internal.cache.TombstoneService; |
| import org.apache.geode.internal.cache.UpdateOperation; |
| import org.apache.geode.internal.cache.VersionTagHolder; |
| import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID; |
| import org.apache.geode.internal.cache.versions.VMVersionTag; |
| import org.apache.geode.internal.cache.versions.VersionStamp; |
| import org.apache.geode.internal.cache.versions.VersionTag; |
| import org.apache.geode.internal.cache.vmotion.VMotionObserver; |
| import org.apache.geode.internal.cache.vmotion.VMotionObserverHolder; |
| import org.apache.geode.test.dunit.AsyncInvocation; |
| import org.apache.geode.test.dunit.SerializableRunnable; |
| import org.apache.geode.test.dunit.VM; |
| import org.apache.geode.test.junit.categories.OffHeapTest; |
| |
| @Category({OffHeapTest.class}) |
| public class DistributedAckRegionCCEDUnitTest extends DistributedAckRegionDUnitTest { |
| |
| @Override |
| protected boolean supportsTransactions() { |
| return true; |
| } |
| |
| @Override |
| public Properties getDistributedSystemProperties() { |
| Properties p = super.getDistributedSystemProperties(); |
| p.put(CONSERVE_SOCKETS, "false"); |
| if (distributedSystemID > 0) { |
| p.put(DISTRIBUTED_SYSTEM_ID, "" + distributedSystemID); |
| } |
| return p; |
| } |
| |
| /** |
| * Returns region attributes for a <code>GLOBAL</code> region |
| */ |
| @Override |
| protected <K, V> RegionAttributes<K, V> getRegionAttributes() { |
| AttributesFactory<K, V> factory = new AttributesFactory<>(); |
| factory.setScope(Scope.DISTRIBUTED_ACK); |
| factory.setDataPolicy(DataPolicy.REPLICATE); |
| factory.setConcurrencyChecksEnabled(true); |
| return factory.create(); |
| } |
| |
| @Override |
| protected <K, V> RegionAttributes<K, V> getRegionAttributes(String type) { |
| RegionAttributes<K, V> ra = getCache().getRegionAttributes(type); |
| if (ra == null) { |
| throw new IllegalStateException("The region shortcut " + type + " has been removed."); |
| } |
| AttributesFactory<K, V> factory = new AttributesFactory<>(ra); |
| factory.setConcurrencyChecksEnabled(true); |
| return factory.create(); |
| } |
| |
| @Ignore("replicates don't allow local destroy") |
| @Override |
| @Test |
| public void testLocalDestroy() { |
| // replicates don't allow local destroy |
| } |
| |
| @Ignore("replicates don't allow local destroy") |
| @Override |
| @Test |
| public void testEntryTtlLocalDestroy() { |
| // replicates don't allow local destroy |
| } |
| |
| /** |
| * This test creates a server cache in vm0 and a peer cache in vm1. It then tests to see if GII |
| * transferred tombstones to vm1 like it's supposed to. A client cache is created in vm2 and the |
| * same sort of check is performed for register-interest. |
| */ |
| @Test |
| public void testGIISendsTombstones() { |
| versionTestGIISendsTombstones(); |
| } |
| |
| /** |
| * test for bug #45564. a create() is received by region creator and then a later destroy() is |
| * received in initial image and while the version info from the destroy is recorded we keep the |
| * value from the create event |
| */ |
| @Test |
| public void testConcurrentOpWithGII() throws InterruptedException { |
| if (getClass() != DistributedAckRegionCCEDUnitTest.class) { |
| return; // not really a scope-related thing |
| } |
| final String name = getUniqueName() + "-CC"; |
| final String key = "mykey"; |
| VM vm1 = VM.getVM(1); |
| VM vm2 = VM.getVM(2); |
| |
| // create some destroyed entries so the GC service is populated |
| // do conflicting update() and destroy() on the region. We want the update() to |
| // be sent with a message and the destroy() to be transferred in the initial image |
| // and be the value that we want to keep |
| vm1.invoke("create region", () -> { |
| RegionFactory f1 = getCache().createRegionFactory(getRegionAttributes()); |
| CCRegion = (LocalRegion) f1.create(name); |
| return CCRegion.getDistributionManager().getDistributionManagerId(); |
| }); |
| |
| AsyncInvocation<Void> partialCreate = |
| vm2.invokeAsync("create region with stall", () -> { |
| |
| final GemFireCacheImpl cache = (GemFireCacheImpl) getCache(); |
| RegionFactory f = cache.createRegionFactory(getRegionAttributes()); |
| InitialImageOperation.VMOTION_DURING_GII = true; |
| // this will stall region creation at the point of asking for an initial image |
| VMotionObserverHolder.setInstance(new VMotionObserver() { |
| @Override |
| public void vMotionBeforeCQRegistration() {} |
| |
| @Override |
| public void vMotionBeforeRegisterInterest() {} |
| |
| @Override |
| public void vMotionDuringGII(Set recipientSet, LocalRegion region) { |
| InitialImageOperation.VMOTION_DURING_GII = false; |
| final InitializationLevel oldLevel = |
| LocalRegion.setThreadInitLevelRequirement(BEFORE_INITIAL_IMAGE); |
| LocalRegion ccregion = (LocalRegion) cache.getInternalRegionByPath(SEPARATOR + name); |
| try { |
| // wait for the update op (sent below from vm1) to arrive, then allow the GII to |
| // happen |
| await().ignoreException(RegionDestroyedException.class) |
| .until(() -> ccregion.isDestroyed() || ccregion.getRegionEntry(key) != null); |
| } finally { |
| LocalRegion.setThreadInitLevelRequirement(oldLevel); |
| } |
| } |
| }); |
| try { |
| CCRegion = (LocalRegion) f.create(name); |
| // at this point we should have received the update op and then the GII, which should |
| // overwrite the conflicting update op |
| assertThat(CCRegion.containsKey(key)) |
| .describedAs("entry was destroyed by initial image transfer") |
| .isFalse(); |
| } finally { |
| InitialImageOperation.VMOTION_DURING_GII = false; |
| } |
| return null; |
| }); |
| |
| vm1.invoke("create conflicting events", () -> { |
| // wait for the other to come on line |
| DistributionAdvisor adv = ((DistributedRegion) CCRegion).getCacheDistributionAdvisor(); |
| |
| await().until(() -> !adv.adviseGeneric().isEmpty()); |
| assertThat(adv.adviseGeneric()) |
| .withFailMessage("other member never came on line") |
| .isNotEmpty(); |
| |
| DistributedCacheOperation.LOSS_SIMULATION_RATIO = 200.0; // inhibit all messaging |
| try { |
| CCRegion.put("mykey", "initialValue"); |
| CCRegion.destroy("mykey"); |
| } finally { |
| DistributedCacheOperation.LOSS_SIMULATION_RATIO = 0.0; |
| } |
| |
| // generate a fake version tag for the message |
| final VersionStamp versionStamp = CCRegion.getRegionEntry(key).getVersionStamp(); |
| VersionTag<InternalDistributedMember> tag = |
| (VersionTag<InternalDistributedMember>) versionStamp.asVersionTag(); |
| // create a fake member ID that will be < mine and lose a concurrency check |
| InternalDistributedMember nm = |
| CCRegion.getDistributionManager().getDistributionManagerId(); |
| InternalDistributedMember mbr = null; |
| |
| mbr = new InternalDistributedMember(nm.getInetAddress().getCanonicalHostName(), |
| nm.getMembershipPort() - 1, "fake_id", "fake_id_ustring", |
| ClusterDistributionManager.NORMAL_DM_TYPE, null, null); |
| tag.setMemberID(mbr); |
| |
| |
| // generate an event to distribute that contains the fake version tag |
| EntryEventImpl event = |
| EntryEventImpl.create(CCRegion, Operation.UPDATE, key, false, mbr, true, false); |
| event.setNewValue("newValue"); |
| event.setVersionTag(tag); |
| |
| // this should update the controller's cache with the updated value but leave this cache |
| // alone |
| DistributedCacheOperation op = new UpdateOperation(event, tag.getVersionTimeStamp()); |
| op.distribute(); |
| event.release(); |
| }); |
| |
| partialCreate.get(); |
| |
| } |
| |
| /** |
| * This tests the concurrency versioning system to ensure that event conflation happens correctly |
| * and that the statistic is being updated properly |
| */ |
| @Test |
| public void testConcurrentEvents() throws Exception { |
| versionTestConcurrentEvents(); |
| } |
| |
| @Test |
| public void testClearWithConcurrentEvents() { |
| versionTestClearWithConcurrentEvents(); |
| } |
| |
| @Test |
| public void testClearWithConcurrentEventsAsync() { |
| versionTestClearWithConcurrentEventsAsync(); |
| } |
| |
| @Test |
| public void testClearOnNonReplicateWithConcurrentEvents() { |
| versionTestClearOnNonReplicateWithConcurrentEvents(); |
| } |
| |
| @Test |
| public void testTombstones() { |
| versionTestTombstones(); |
| } |
| |
| /** |
| * make sure that an operation performed on a new region entry created after a tombstone has been |
| * reaped is accepted by another member that has yet to reap the tombstone |
| */ |
| @Test |
| public void testTombstoneExpirationRace() { |
| VM vm0 = VM.getVM(0); |
| VM vm1 = VM.getVM(1); |
| |
| final String name = getUniqueName() + "-CC"; |
| SerializableRunnable createRegion = new SerializableRunnable() { |
| @Override |
| public void run() { |
| RegionFactory f = getCache().createRegionFactory(getRegionAttributes()); |
| CCRegion = (LocalRegion) f.create(name); |
| CCRegion.put("cckey0", "ccvalue"); |
| CCRegion.put("cckey0", "ccvalue"); // version number will end up at 4; |
| } |
| }; |
| |
| vm0.invoke("Create Region", createRegion); |
| vm1.invoke("Create Region", createRegion); |
| vm1.invoke("Create local tombstone and adjust time", () -> { |
| // make the entry for cckey0 a tombstone in this VM and set its modification time to be |
| // older than the tombstone GC interval. This means it could be in the process of being |
| // reaped by distributed-GC |
| RegionEntry entry = CCRegion.getRegionEntry("cckey0"); |
| VersionTag tag = entry.getVersionStamp().asVersionTag(); |
| assertThat(tag.getEntryVersion()).isGreaterThan(1); |
| tag.setVersionTimeStamp( |
| System.currentTimeMillis() - TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT - 1000); |
| entry.getVersionStamp().setVersionTimeStamp(tag.getVersionTimeStamp()); |
| entry.makeTombstone(CCRegion, tag); |
| |
| }); |
| // now remove the entry on vm0, simulating that it initiated a GC, and perform a CREATE with a |
| // new version number |
| vm0.invoke("Locally destroy the entry and do a create that will be propagated with v1", () -> { |
| CCRegion.getRegionMap().removeEntry("cckey0", CCRegion.getRegionEntry("cckey0"), true); |
| if (CCRegion.getRegionEntry("ckey0") != null) { |
| fail("expected removeEntry to remove the entry from the region's map"); |
| } |
| CCRegion.put("cckey0", "updateAfterReap"); |
| }); |
| vm1.invoke("Check that the create() was applied", () -> { |
| RegionEntry entry = CCRegion.getRegionEntry("cckey0"); |
| assertThat(entry.getVersionStamp().getEntryVersion()).isEqualTo(1); |
| }); |
| disconnectAllFromDS(); |
| } |
| |
| /** |
| * Test for bug #46087 and #46089 where the waiting thread pool is flooded with threads performing |
| * distributed-GC. This could be moved to a JUnit test class. |
| */ |
| @Test |
| public void testAggressiveTombstoneReaping() throws InterruptedException { |
| final String name = getUniqueName() + "-CC"; |
| |
| final int saveExpiredTombstoneLimit = TombstoneService.EXPIRED_TOMBSTONE_LIMIT; |
| final long saveTombstoneTimeout = TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT; |
| TombstoneService.EXPIRED_TOMBSTONE_LIMIT = 50; |
| TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT = 500; |
| try { |
| // create some destroyed entries so the GC service is populated |
| RegionFactory<Object, Object> f = getCache().createRegionFactory(getRegionAttributes()); |
| CCRegion = (LocalRegion) f.create(name); |
| final long initialCount = CCRegion.getCachePerfStats().getTombstoneGCCount(); |
| for (int i = 0; i < 100; i++) { |
| CCRegion.put("cckey" + i, "ccvalue" + i); |
| CCRegion.destroy("cckey" + i); |
| } |
| // now simulate a low free-memory condition |
| TombstoneService.FORCE_GC_MEMORY_EVENTS = true; |
| await() |
| .until(() -> CCRegion.getCachePerfStats().getTombstoneGCCount() > initialCount); |
| |
| Thread.sleep(5000); |
| long gcCount = CCRegion.getCachePerfStats().getTombstoneGCCount(); |
| assertThat(gcCount) |
| .withFailMessage("expected a few GCs, but not " + (gcCount - initialCount)) |
| .isLessThan(initialCount + 20); |
| } finally { |
| TombstoneService.EXPIRED_TOMBSTONE_LIMIT = saveExpiredTombstoneLimit; |
| TombstoneService.FORCE_GC_MEMORY_EVENTS = false; |
| TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT = saveTombstoneTimeout; |
| } |
| } |
| |
| /** |
| * This tests the concurrency versioning system to ensure that event conflation happens correctly |
| * and that the statistic is being updated properly |
| */ |
| @Test |
| public void testConcurrentEventsOnEmptyRegion() { |
| versionTestConcurrentEventsOnEmptyRegion(); |
| } |
| |
| /** |
| * This tests the concurrency versioning system to ensure that event conflation happens correctly |
| * and that the statistic is being updated properly |
| */ |
| @Test |
| public void testConcurrentEventsOnNonReplicatedRegion() { |
| versionTestConcurrentEventsOnNonReplicatedRegion(); |
| } |
| |
| @Test |
| public void testGetAllWithVersions() { |
| versionTestGetAllWithVersions(); |
| } |
| |
| @Test |
| public void testEntryVersionRollover() throws Exception { |
| assumeThat(getClass() == DistributedAckRegionCCEDUnitTest.class).isTrue(); |
| |
| final String name = getUniqueName() + "-CC"; |
| final int numEntries = 1; |
| SerializableRunnable createRegion = new SerializableRunnable() { |
| @Override |
| public void run() { |
| RegionFactory f = getCache().createRegionFactory(getRegionAttributes()); |
| CCRegion = (LocalRegion) f.create(name); |
| for (int i = 0; i < numEntries; i++) { |
| CCRegion.put("cckey" + i, "ccvalue"); |
| } |
| assertThat(CCRegion.getCachePerfStats().getConflatedEventsCount()) |
| .describedAs("conflated event count") |
| .isEqualTo(0); |
| |
| } |
| }; |
| VM vm0 = VM.getVM(0); |
| vm0.invoke("Create Region", createRegion); |
| try { |
| createRegion.run(); |
| VersionTag tag = new VMVersionTag(); |
| // set the version to the max - it should make the system think there's a rollover and reject |
| // the change. Then apply it to the cache as if it is a replayed client operation. That should |
| // cause the cache to apply the op locally |
| tag.setEntryVersion(0xFFFFFF); |
| tag.setDistributedSystemId(1); |
| tag.setRegionVersion(CCRegion.getVersionVector().getNextVersion()); |
| VersionTagHolder holder = new VersionTagHolder(tag); |
| ClientProxyMembershipID id = ClientProxyMembershipID |
| .getNewProxyMembership(CCRegion.getDistributionManager().getSystem()); |
| CCRegion.basicBridgePut("cckey0", "newvalue", null, true, null, id, holder, true); |
| vm0.invoke("check conflation count", () -> { |
| // after changed the 3rd try of AUO.doPutOrCreate to be ifOld=false ifNew=false |
| // ARM.updateEntry will be called one more time, so there will be 2 conflicted events |
| assertThat(CCRegion.getCachePerfStats().getConflatedEventsCount()) |
| .describedAs("conflated event count") |
| .isEqualTo(2); |
| }); |
| } finally { |
| disconnectAllFromDS(); |
| } |
| } |
| |
| |
| |
| } |