| /*========================================================================= |
| * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. |
| * This product is protected by U.S. and international copyright |
| * and intellectual property laws. Pivotal products are covered by |
| * one or more patents listed at http://www.pivotal.io/patents. |
| *========================================================================= |
| */ |
| package com.gemstone.gemfire.cache30;
|
|
|
| import java.util.Map;
|
| import java.util.Properties;
|
| |
| import com.gemstone.gemfire.cache.AttributesFactory;
|
| import com.gemstone.gemfire.cache.CacheException;
|
| import com.gemstone.gemfire.cache.CacheListener;
|
| import com.gemstone.gemfire.cache.DataPolicy;
|
| import com.gemstone.gemfire.cache.EntryEvent;
|
| import com.gemstone.gemfire.cache.EntryNotFoundException;
|
| import com.gemstone.gemfire.cache.RegionAttributes;
|
| import com.gemstone.gemfire.cache.RegionFactory;
|
| import com.gemstone.gemfire.cache.RegionShortcut;
|
| import com.gemstone.gemfire.cache.Scope;
|
| import com.gemstone.gemfire.cache.util.CacheListenerAdapter;
|
| import com.gemstone.gemfire.distributed.internal.DistributionConfig;
|
| import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; |
| import com.gemstone.gemfire.internal.cache.LocalRegion;
|
| |
| import dunit.AsyncInvocation;
|
| import dunit.Host;
|
| import dunit.SerializableRunnable;
|
| import dunit.VM;
|
|
|
| public class DistributedNoAckRegionCCEDUnitTest extends
|
| DistributedNoAckRegionDUnitTest {
|
|
|
| static volatile boolean ListenerBlocking;
|
|
|
| public DistributedNoAckRegionCCEDUnitTest(String name) {
|
| super(name);
|
| }
|
|
|
| @Override
|
| public Properties getDistributedSystemProperties() {
|
| Properties p = super.getDistributedSystemProperties();
|
| p.put(DistributionConfig.CONSERVE_SOCKETS_NAME, "false");
|
| if (distributedSystemID > 0) {
|
| p.put(DistributionConfig.DISTRIBUTED_SYSTEM_ID_NAME, ""+distributedSystemID);
|
| }
|
| p.put(DistributionConfig.SOCKET_BUFFER_SIZE_NAME, ""+2000000);
|
| return p;
|
| }
|
|
|
|
|
| /**
|
| * Returns region attributes for a <code>GLOBAL</code> region
|
| */
|
| protected RegionAttributes getRegionAttributes() {
|
| AttributesFactory factory = new AttributesFactory();
|
| factory.setScope(Scope.DISTRIBUTED_NO_ACK);
|
| factory.setDataPolicy(DataPolicy.REPLICATE);
|
| factory.setConcurrencyChecksEnabled(true);
|
| return factory.create();
|
| }
|
| protected RegionAttributes getRegionAttributes(String type) {
|
| RegionAttributes ra = getCache().getRegionAttributes(type);
|
| if (ra == null) {
|
| throw new IllegalStateException("The region shortcut " + type
|
| + " has been removed.");
|
| }
|
| AttributesFactory factory = new AttributesFactory(ra);
|
| factory.setScope(Scope.DISTRIBUTED_NO_ACK);
|
| factory.setConcurrencyChecksEnabled(true);
|
| return factory.create();
|
| }
|
|
|
| @Override
|
| public void sendSerialMessageToAll() {
|
| try {
|
| com.gemstone.gemfire.distributed.internal.SerialAckedMessage msg = new com.gemstone.gemfire.distributed.internal.SerialAckedMessage();
|
| msg.send(InternalDistributedSystem.getConnectedInstance().getDM().getNormalDistributionManagerIds(), false);
|
| }
|
| catch (Exception e) {
|
| throw new RuntimeException("Unable to send serial message due to exception", e);
|
| }
|
| }
|
|
|
|
|
| @Override
|
| public void testLocalDestroy() throws InterruptedException {
|
| // replicates don't allow local destroy
|
| }
|
|
|
| @Override
|
| public void testEntryTtlLocalDestroy() throws InterruptedException {
|
| // replicates don't allow local destroy
|
| }
|
|
|
| public void testClearWithManyEventsInFlight() throws Exception {
|
| Host host = Host.getHost(0);
|
| VM vm0 = host.getVM(0);
|
| VM vm1 = host.getVM(1);
|
| VM vm2 = host.getVM(2);
|
| VM vm3 = host.getVM(3);
|
|
|
| // create replicated regions in VM 0 and 1, then perform concurrent ops
|
| // on the same key while creating the region in VM2. Afterward make
|
| // sure that all three regions are consistent
|
|
|
| final String name = this.getUniqueName() + "-CC";
|
| createRegionWithAttribute(vm0, name, false);
|
| createRegionWithAttribute(vm1, name, false);
|
| createRegionWithAttribute(vm2, name, false);
|
| createRegionWithAttribute(vm3, name, false);
|
| vm0.invoke(DistributedNoAckRegionCCEDUnitTest.class, "addBlockingListener");
|
| vm1.invoke(DistributedNoAckRegionCCEDUnitTest.class, "addBlockingListener");
|
| vm2.invoke(DistributedNoAckRegionCCEDUnitTest.class, "addBlockingListener");
|
| AsyncInvocation vm0Ops = vm0.invokeAsync(DistributedNoAckRegionCCEDUnitTest.class, "doManyOps");
|
| AsyncInvocation vm1Ops = vm1.invokeAsync(DistributedNoAckRegionCCEDUnitTest.class, "doManyOps");
|
| AsyncInvocation vm2Ops = vm2.invokeAsync(DistributedNoAckRegionCCEDUnitTest.class, "doManyOps");
|
| // pause to let a bunch of operations build up
|
| pause(5000);
|
| AsyncInvocation a0 = vm3.invokeAsync(DistributedNoAckRegionCCEDUnitTest.class, "clearRegion");
|
| vm0.invoke(DistributedNoAckRegionCCEDUnitTest.class, "unblockListener");
|
| vm1.invoke(DistributedNoAckRegionCCEDUnitTest.class, "unblockListener");
|
| vm2.invoke(DistributedNoAckRegionCCEDUnitTest.class, "unblockListener");
|
| waitForAsyncProcessing(a0, "");
|
| waitForAsyncProcessing(vm0Ops, "");
|
| waitForAsyncProcessing(vm1Ops, "");
|
| waitForAsyncProcessing(vm2Ops, "");
|
|
|
| // if (a0failed && a1failed) {
|
| // fail("neither member saw event conflation - check stats for " + name);
|
| // }
|
|
|
| // check consistency of the regions
|
| Map r0Contents = (Map)vm0.invoke(this.getClass(), "getCCRegionContents");
|
| Map r1Contents = (Map)vm1.invoke(this.getClass(), "getCCRegionContents");
|
| Map r2Contents = (Map)vm2.invoke(this.getClass(), "getCCRegionContents");
|
| Map r3Contents = (Map)vm3.invoke(this.getClass(), "getCCRegionContents");
|
|
|
| for (int i=0; i<10; i++) {
|
| String key = "cckey" + i;
|
| assertEquals("region contents are not consistent", r0Contents.get(key), r1Contents.get(key));
|
| assertEquals("region contents are not consistent", r1Contents.get(key), r2Contents.get(key));
|
| assertEquals("region contents are not consistent", r2Contents.get(key), r3Contents.get(key));
|
| for (int subi=1; subi<3; subi++) {
|
| String subkey = key + "-" + subi;
|
| assertEquals("region contents are not consistent", r0Contents.get(subkey), r1Contents.get(subkey));
|
| assertEquals("region contents are not consistent", r1Contents.get(subkey), r2Contents.get(subkey));
|
| assertEquals("region contents are not consistent", r2Contents.get(subkey), r3Contents.get(subkey));
|
| }
|
| }
|
| }
|
|
|
| static void addBlockingListener() {
|
| ListenerBlocking = true;
|
| CCRegion.getAttributesMutator().addCacheListener(new CacheListenerAdapter(){
|
| public void afterCreate(EntryEvent event) {
|
| onEvent(event);
|
| }
|
| private void onEvent(EntryEvent event) {
|
| boolean blocked = false;
|
| if (event.isOriginRemote()) {
|
| synchronized(this) {
|
| while (ListenerBlocking) {
|
| getLogWriter().info("blocking cache operations for " + event.getDistributedMember());
|
| blocked = true;
|
| try {
|
| wait();
|
| } catch (InterruptedException e) {
|
| Thread.currentThread().interrupt();
|
| getLogWriter().info("blocking cache listener interrupted");
|
| return;
|
| }
|
| }
|
| }
|
| if (blocked) {
|
| getLogWriter().info("allowing cache operations for " + event.getDistributedMember());
|
| }
|
| }
|
| }
|
| @Override
|
| public void close() {
|
| getLogWriter().info("closing blocking listener");
|
| ListenerBlocking = false;
|
| synchronized(this) {
|
| notifyAll();
|
| }
|
| }
|
| @Override
|
| public void afterUpdate(EntryEvent event) {
|
| onEvent(event);
|
| }
|
| @Override
|
| public void afterInvalidate(EntryEvent event) {
|
| onEvent(event);
|
| }
|
| @Override
|
| public void afterDestroy(EntryEvent event) {
|
| onEvent(event);
|
| }
|
| });
|
| }
|
|
|
| static void doManyOps() {
|
| // do not include putAll, which requires an Ack to detect failures
|
| doOpsLoopNoFlush(5000, false, false);
|
| }
|
|
|
| static void unblockListener() {
|
| CacheListener listener = CCRegion.getCacheListener();
|
| ListenerBlocking = false;
|
| synchronized(listener) {
|
| listener.notifyAll();
|
| }
|
| }
|
|
|
| static void clearRegion() {
|
| CCRegion.clear();
|
| }
|
|
|
| /**
|
| * This test creates a server cache in vm0 and a peer cache in vm1.
|
| * It then tests to see if GII transferred tombstones to vm1 like it's supposed to.
|
| * A client cache is created in vm2 and the same sort of check is performed
|
| * for register-interest.
|
| */
|
|
|
| public void testGIISendsTombstones() throws Exception {
|
| versionTestGIISendsTombstones();
|
| }
|
|
|
|
|
| protected void do_version_recovery_if_necessary(final VM vm0, final VM vm1, final VM vm2, final Object[] params) {
|
| // do nothing here
|
| }
|
|
|
| /**
|
| * This tests the concurrency versioning system to ensure that event conflation
|
| * happens correctly and that the statistic is being updated properly
|
| */
|
| public void testConcurrentEvents() throws Exception {
|
| versionTestConcurrentEvents();
|
| }
|
|
|
|
|
| public void testClearWithConcurrentEvents() throws Exception {
|
| // need to figure out how to flush clear() ops for verification steps
|
| }
|
|
|
| public void testClearWithConcurrentEventsAsync() throws Exception {
|
| // need to figure out how to flush clear() ops for verification steps
|
| }
|
|
|
| public void testClearOnNonReplicateWithConcurrentEvents() throws Exception {
|
| // need to figure out how to flush clear() ops for verification steps
|
| }
|
|
|
|
|
| public void testTombstones() {
|
| versionTestTombstones();
|
| }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| public void testOneHopKnownIssues() {
|
| Host host = Host.getHost(0);
|
| VM vm0 = host.getVM(0);
|
| VM vm1 = host.getVM(1);
|
| VM vm2 = host.getVM(2);
|
| VM vm3 = host.getVM(3); // this VM, but treat as a remote for uniformity
|
|
|
| // create an empty region in vm0 and replicated regions in VM 1 and 3,
|
| // then perform concurrent ops
|
| // on the same key while creating the region in VM2. Afterward make
|
| // sure that all three regions are consistent
|
|
|
| final String name = this.getUniqueName() + "-CC";
|
| SerializableRunnable createRegion = new SerializableRunnable("Create Region") {
|
| public void run() {
|
| try {
|
| final RegionFactory f;
|
| int vmNumber = VM.getCurrentVMNum();
|
| switch (vmNumber) {
|
| case 0:
|
| f = getCache().createRegionFactory(getRegionAttributes(RegionShortcut.REPLICATE_PROXY.toString()));
|
| break;
|
| case 1:
|
| f = getCache().createRegionFactory(getRegionAttributes(RegionShortcut.REPLICATE.toString()));
|
| f.setDataPolicy(DataPolicy.NORMAL);
|
| break;
|
| default:
|
| f = getCache().createRegionFactory(getRegionAttributes());
|
| break;
|
| }
|
| CCRegion = (LocalRegion)f.create(name);
|
| } catch (CacheException ex) {
|
| fail("While creating region", ex);
|
| }
|
| }
|
| };
|
|
|
| vm0.invoke(createRegion); // empty
|
| vm1.invoke(createRegion); // normal
|
| vm2.invoke(createRegion); // replicate
|
|
|
| // case 1: entry already invalid on vm2 (replicate) is invalidated by vm0 (empty)
|
| final String invalidationKey = "invalidationKey";
|
| final String destroyKey = "destroyKey";
|
| SerializableRunnable test = new SerializableRunnable("case 1: second invalidation not applied or distributed") {
|
| public void run() {
|
| CCRegion.put(invalidationKey, "initialValue");
|
|
|
| int invalidationCount = CCRegion.getCachePerfStats().getInvalidates();
|
| CCRegion.invalidate(invalidationKey);
|
| CCRegion.invalidate(invalidationKey);
|
| assertEquals(invalidationCount+1, CCRegion.getCachePerfStats().getInvalidates());
|
|
|
| // also test destroy() while we're at it. It should throw an exception
|
| int destroyCount = CCRegion.getCachePerfStats().getDestroys();
|
| CCRegion.destroy(invalidationKey);
|
| try {
|
| CCRegion.destroy(invalidationKey);
|
| fail("expected an EntryNotFoundException");
|
| } catch (EntryNotFoundException e) {
|
| // expected
|
| }
|
| assertEquals(destroyCount+1, CCRegion.getCachePerfStats().getDestroys());
|
| }
|
| };
|
| vm0.invoke(test);
|
|
|
| // now do the same with the datapolicy=normal region
|
| test.setName("case 2: second invalidation not applied or distributed");
|
| vm1.invoke(test);
|
| }
|
|
|
| /**
|
| * This tests the concurrency versioning system to ensure that event conflation
|
| * happens correctly and that the statistic is being updated properly
|
| */
|
| public void testConcurrentEventsOnEmptyRegion() {
|
| versionTestConcurrentEventsOnEmptyRegion();
|
| }
|
|
|
|
|
|
|
|
|
| /**
|
| * This tests the concurrency versioning system to ensure that event conflation
|
| * happens correctly and that the statistic is being updated properly
|
| */
|
| public void testConcurrentEventsOnNonReplicatedRegion() {
|
| versionTestConcurrentEventsOnNonReplicatedRegion();
|
| }
|
|
|
|
|
| public void testGetAllWithVersions() {
|
| versionTestGetAllWithVersions();
|
| }
|
|
|
|
|
| public void testRegionVersionVectors() throws Exception {
|
| versionTestRegionVersionVectors();
|
| }
|
|
|
|
|
|
|
| // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
| // these methods can be uncommented to inhibit test execution
|
| // when new tests are added
|
| // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
|
|
| // @Override
|
| // public void testNonblockingGetInitialImage() throws Throwable {
|
| // }
|
| // @Override
|
| // public void testConcurrentOperations() throws Exception {
|
| // }
|
| //
|
| // @Override
|
| // public void testDistributedUpdate() {
|
| // }
|
| //
|
| // @Override
|
| // public void testDistributedGet() {
|
| // }
|
| //
|
| // @Override
|
| // public void testDistributedPutNoUpdate() throws InterruptedException {
|
| // }
|
| //
|
| // @Override
|
| // public void testDefinedEntryUpdated() {
|
| // }
|
| //
|
| // @Override
|
| // public void testDistributedDestroy() throws InterruptedException {
|
| // }
|
| //
|
| // @Override
|
| // public void testDistributedRegionDestroy() throws InterruptedException {
|
| // }
|
| //
|
| // @Override
|
| // public void testLocalRegionDestroy() throws InterruptedException {
|
| // }
|
| //
|
| // @Override
|
| // public void testDistributedInvalidate() {
|
| // }
|
| //
|
| // @Override
|
| // public void testDistributedInvalidate4() throws InterruptedException {
|
| // }
|
| //
|
| // @Override
|
| // public void testDistributedRegionInvalidate() throws InterruptedException {
|
| // }
|
| //
|
| // @Override
|
| // public void testRemoteCacheListener() throws InterruptedException {
|
| // }
|
| //
|
| // @Override
|
| // public void testRemoteCacheListenerInSubregion() throws InterruptedException {
|
| // }
|
| //
|
| // @Override
|
| // public void testRemoteCacheLoader() throws InterruptedException {
|
| // }
|
| //
|
| // @Override
|
| // public void testRemoteCacheLoaderArg() throws InterruptedException {
|
| // }
|
| //
|
| // @Override
|
| // public void testRemoteCacheLoaderException() throws InterruptedException {
|
| // }
|
| //
|
| // @Override
|
| // public void testCacheLoaderWithNetSearch() throws CacheException {
|
| // }
|
| //
|
| // @Override
|
| // public void testCacheLoaderWithNetLoad() throws CacheException {
|
| // }
|
| //
|
| // @Override
|
| // public void testNoRemoteCacheLoader() throws InterruptedException {
|
| // }
|
| //
|
| // @Override
|
| // public void testNoLoaderWithInvalidEntry() {
|
| // }
|
| //
|
| // @Override
|
| // public void testRemoteCacheWriter() throws InterruptedException {
|
| // }
|
| //
|
| // @Override
|
| // public void testLocalAndRemoteCacheWriters() throws InterruptedException {
|
| // }
|
| //
|
| // @Override
|
| // public void testCacheLoaderModifyingArgument() throws InterruptedException {
|
| // }
|
| //
|
| // @Override
|
| // public void testRemoteLoaderNetSearch() throws CacheException {
|
| // }
|
| //
|
| // @Override
|
| // public void testLocalCacheLoader() {
|
| // }
|
| //
|
| // @Override
|
| // public void testDistributedPut() throws Exception {
|
| // }
|
| //
|
| // @Override
|
| // public void testReplicate() throws InterruptedException {
|
| // }
|
| //
|
| // @Override
|
| // public void testDeltaWithReplicate() throws InterruptedException {
|
| // }
|
| //
|
| // @Override
|
| // public void testGetInitialImage() {
|
| // }
|
| //
|
| // @Override
|
| // public void testLargeGetInitialImage() {
|
| // }
|
| //
|
| // @Override
|
| // public void testMirroredDataFromNonMirrored() throws InterruptedException {
|
| // }
|
| //
|
| // @Override
|
| // public void testNoMirroredDataToNonMirrored() throws InterruptedException {
|
| // }
|
| //
|
| // @Override
|
| // public void testMirroredLocalLoad() {
|
| // }
|
| //
|
| // @Override
|
| // public void testMirroredNetLoad() {
|
| // }
|
| //
|
| // @Override
|
| // public void testNoRegionKeepAlive() throws InterruptedException {
|
| // }
|
| //
|
| // @Override
|
| // public void testNetSearchObservesTtl() throws InterruptedException {
|
| // }
|
| //
|
| // @Override
|
| // public void testNetSearchObservesIdleTime() throws InterruptedException {
|
| // }
|
| //
|
| // @Override
|
| // public void testEntryTtlDestroyEvent() throws InterruptedException {
|
| // }
|
| //
|
| // @Override
|
| // public void testUpdateResetsIdleTime() throws InterruptedException {
|
| // }
|
| // @Override
|
| // public void testTXNonblockingGetInitialImage() throws Throwable {
|
| // }
|
| //
|
| // @Override
|
| // public void testNBRegionInvalidationDuringGetInitialImage() throws Throwable {
|
| // }
|
| //
|
| // @Override
|
| // public void testNBRegionDestructionDuringGetInitialImage() throws Throwable {
|
| // }
|
| //
|
| // @Override
|
| // public void testNoDataSerializer() {
|
| // }
|
| //
|
| // @Override
|
| // public void testNoInstantiator() {
|
| // }
|
| //
|
| // @Override
|
| // public void testTXSimpleOps() throws Exception {
|
| // }
|
| //
|
| // @Override
|
| // public void testTXUpdateLoadNoConflict() throws Exception {
|
| // }
|
| //
|
| // @Override
|
| // public void testTXMultiRegion() throws Exception {
|
| // }
|
| //
|
| // @Override
|
| // public void testTXRmtMirror() throws Exception {
|
| // }
|
|
|
|
|
| }
|