| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.internal.cache; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.fail; |
| |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| import org.junit.Test; |
| |
| import org.apache.geode.cache.Region; |
| import org.apache.geode.cache.RegionShortcut; |
| import org.apache.geode.distributed.internal.ClusterDistributionManager; |
| import org.apache.geode.distributed.internal.DistributionMessage; |
| import org.apache.geode.distributed.internal.DistributionMessageObserver; |
| import org.apache.geode.internal.cache.UpdateOperation.UpdateMessage; |
| import org.apache.geode.test.dunit.AsyncInvocation; |
| import org.apache.geode.test.dunit.Host; |
| import org.apache.geode.test.dunit.Invoke; |
| import org.apache.geode.test.dunit.SerializableCallable; |
| import org.apache.geode.test.dunit.VM; |
| import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase; |
| |
| /** |
| * Tests interrupting gemfire threads during a put operation to see what happens |
| * |
| */ |
| |
| public class InterruptsDUnitTest extends JUnit4CacheTestCase { |
| |
| private static volatile Thread puttingThread; |
| private static final long MAX_WAIT = 60 * 1000; |
| private static AtomicBoolean doInterrupt = new AtomicBoolean(false); |
| |
| public InterruptsDUnitTest() { |
| super(); |
| } |
| |
| |
| |
| @Override |
| public final void preTearDownCacheTestCase() throws Exception { |
| Invoke.invokeInEveryVM(new SerializableCallable() { |
| |
| @Override |
| public Object call() throws Exception { |
| puttingThread = null; |
| return null; |
| } |
| }); |
| } |
| |
| /** |
| * A simple test case that we are actually persisting with a PR. |
| * |
| */ |
| @Test |
| public void testDRPutWithInterrupt() throws Throwable { |
| Host host = Host.getHost(0); |
| final VM vm0 = host.getVM(0); |
| VM vm1 = host.getVM(1); |
| |
| createRegion(vm0); |
| |
| // put some data in vm0 |
| createData(vm0, 0, 10, "a"); |
| |
| final SerializableCallable interruptTask = new SerializableCallable() { |
| @Override |
| public Object call() throws Exception { |
| puttingThread.interrupt(); |
| return null; |
| } |
| }; |
| |
| vm1.invoke(new SerializableCallable() { |
| |
| @Override |
| public Object call() throws Exception { |
| disconnectFromDS(); |
| DistributionMessageObserver.setInstance(new DistributionMessageObserver() { |
| |
| @Override |
| public void beforeProcessMessage(ClusterDistributionManager dm, |
| DistributionMessage message) { |
| if (message instanceof UpdateMessage |
| && ((UpdateMessage) message).regionPath.contains("region") |
| && doInterrupt.compareAndSet(true, false)) { |
| vm0.invoke(interruptTask); |
| DistributionMessageObserver.setInstance(null); |
| } |
| } |
| |
| }); |
| return null; |
| } |
| }); |
| |
| createRegion(vm1); |
| |
| SerializableCallable doPuts = new SerializableCallable() { |
| |
| @Override |
| public Object call() throws Exception { |
| puttingThread = Thread.currentThread(); |
| Region<Object, Object> region = getCache().getRegion("region"); |
| long value = 0; |
| long end = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(MAX_WAIT); |
| while (!Thread.currentThread().isInterrupted()) { |
| region.put(0, value); |
| if (System.nanoTime() > end) { |
| fail("Did not get interrupted in 60 seconds"); |
| } |
| } |
| return null; |
| } |
| }; |
| |
| AsyncInvocation async0 = vm0.invokeAsync(doPuts); |
| |
| vm1.invoke(new SerializableCallable() { |
| @Override |
| public Object call() throws Exception { |
| doInterrupt.set(true); |
| return null; |
| } |
| }); |
| |
| // vm0.invoke(new SerializableCallable() { |
| // |
| // @Override |
| // public Object call() throws Exception { |
| // long end = System.nanoTime() + TimeUnit.SECONDS.toNanos(MAX_WAIT); |
| // while(puttingThread == null) { |
| // Thread.sleep(50); |
| // if(System.nanoTime() > end) { |
| // fail("Putting thread not set in 60 seconds"); |
| // } |
| // } |
| // |
| // puttingThread.interrupt(); |
| // return null; |
| // } |
| // }); |
| |
| async0.getResult(); |
| |
| Object value0 = checkCacheAndGetValue(vm0); |
| Object value1 = checkCacheAndGetValue(vm1); |
| |
| assertEquals(value0, value1); |
| |
| } |
| |
| private Object checkCacheAndGetValue(VM vm) { |
| return vm.invoke(new SerializableCallable() { |
| @Override |
| public Object call() throws Exception { |
| assertFalse(basicGetCache().isClosed()); |
| Region<Object, Object> region = getCache().getRegion("region"); |
| return region.get(0); |
| } |
| }); |
| } |
| |
| |
| |
| private void createRegion(VM vm) { |
| vm.invoke(new SerializableCallable() { |
| @Override |
| public Object call() throws Exception { |
| getCache().createRegionFactory(RegionShortcut.REPLICATE).create("region"); |
| return null; |
| } |
| }); |
| } |
| |
| private void createData(VM vm, final int start, final int end, final String value) { |
| vm.invoke(new SerializableCallable() { |
| @Override |
| public Object call() throws Exception { |
| Region<Object, Object> region = getCache().getRegion("region"); |
| for (int i = start; i < end; i++) { |
| region.put(i, value); |
| } |
| return null; |
| } |
| }); |
| } |
| } |