GEODE-8650: Support multiple instances of DistributedReference (#5664)
* GEODE-8650: Support multiple instances of DistributedReference
Includes additional improvements for usage of DistributedReference
in tests.
* * Rename all CompletionUtils to close
* Annotate all overridden methods
* Suppress false warnings
diff --git a/geode-common/src/main/java/org/apache/geode/util/internal/CompletionUtils.java b/geode-common/src/main/java/org/apache/geode/util/internal/CompletionUtils.java
new file mode 100644
index 0000000..ed3eecc
--- /dev/null
+++ b/geode-common/src/main/java/org/apache/geode/util/internal/CompletionUtils.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.util.internal;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Collection of utilities for changing simple Java util structures to a state of completion or
+ * default.
+ *
+ * <p>
+ * All utilities herein throw checked exceptions wrapped within a runtime exception.
+ */
+public class CompletionUtils {
+
+ /**
+ * Closes the {@code AutoCloseable}.
+ */
+ public static void close(AutoCloseable autoCloseable) {
+ try {
+ autoCloseable.close();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Opens the {@code CountDownLatch} by counting it down.
+ */
+ public static void close(CountDownLatch countDownLatch) {
+ while (countDownLatch.getCount() > 0) {
+ countDownLatch.countDown();
+ }
+ }
+
+ /**
+ * Sets the {@code AtomicBoolean} to false.
+ */
+ public static void close(AtomicBoolean atomicBoolean) {
+ atomicBoolean.set(false);
+ }
+
+ /**
+ * Sets the {@code AtomicReference} to null.
+ */
+ public static void close(AtomicReference<?> atomicReference) {
+ atomicReference.set(null);
+ }
+}
diff --git a/geode-dunit/src/distributedTest/java/org/apache/geode/test/dunit/rules/tests/DistributedReferenceTest.java b/geode-dunit/src/distributedTest/java/org/apache/geode/test/dunit/rules/tests/DistributedReferenceTest.java
index efdbed0..817a03e 100644
--- a/geode-dunit/src/distributedTest/java/org/apache/geode/test/dunit/rules/tests/DistributedReferenceTest.java
+++ b/geode-dunit/src/distributedTest/java/org/apache/geode/test/dunit/rules/tests/DistributedReferenceTest.java
@@ -29,6 +29,8 @@
import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Before;
@@ -212,6 +214,69 @@
}
}
+ @Test
+ public void accessesAtomicBooleanInEachVm() {
+ runTestWithValidation(SetAtomicBooleanInLocalVm.class);
+ }
+
+ @Test
+ public void setsAtomicBooleanToFalseInEachVm() {
+ runTestWithValidation(SetAtomicBooleanInLocalVm.class);
+
+ for (VM vm : asList(getVM(0), getVM(1), getVM(2), getVM(3), getController())) {
+ vm.invoke(() -> {
+ assertThat(SetAtomicBooleanInLocalVm.atomicBoolean.get()).isFalse();
+ });
+ }
+ }
+
+ @Test
+ public void accessesCountDownLatchInEachVm() {
+ runTestWithValidation(SetCountDownLatchInLocalVm.class);
+ }
+
+ @Test
+ public void opensCountDownLatchInEachVm() {
+ runTestWithValidation(SetCountDownLatchInLocalVm.class);
+
+ for (VM vm : asList(getVM(0), getVM(1), getVM(2), getVM(3), getController())) {
+ vm.invoke(() -> {
+ assertThat(SetCountDownLatchInLocalVm.latch.get().getCount()).isZero();
+ });
+ }
+ }
+
+ @Test
+ public void accessesTwoReferencesInEachVm() {
+ runTestWithValidation(SetTwoCloseablesInEachVm.class);
+ }
+
+ @Test
+ public void closesTwoReferencesInEachVm() {
+ runTestWithValidation(SetTwoCloseablesInEachVm.class);
+
+ getController().invoke(() -> {
+ verify(SetTwoCloseablesInEachVm.withClose1.get()).close();
+ verify(SetTwoCloseablesInEachVm.withClose2.get()).close();
+ });
+ }
+
+ @Test
+ public void accessesManyReferencesInEachVm() {
+ runTestWithValidation(SetManyReferencesInEachVm.class);
+ }
+
+ @Test
+ public void closesManyReferencesInEachVm() {
+ runTestWithValidation(SetManyReferencesInEachVm.class);
+
+ getController().invoke(() -> {
+ verify(SetManyReferencesInEachVm.withClose.get()).close();
+ verify(SetManyReferencesInEachVm.withDisconnect.get()).disconnect();
+ verify(SetManyReferencesInEachVm.withStop.get()).stop();
+ });
+ }
+
public static class SetAutoCloseableInLocalVm implements Serializable {
private static final AtomicReference<AutoCloseable> autoCloseable = new AtomicReference<>();
@@ -587,6 +652,145 @@
}
}
+ public static class SetAtomicBooleanInLocalVm implements Serializable {
+
+ private static final AtomicReference<AtomicBoolean> atomicBoolean = new AtomicReference<>();
+
+ @Rule
+ public DistributedReference<AtomicBoolean> reference = new DistributedReference<>();
+
+ @Before
+ public void setUp() {
+ for (VM vm : asList(getVM(0), getVM(1), getVM(2), getVM(3), getController())) {
+ vm.invoke(() -> {
+ atomicBoolean.set(new AtomicBoolean(true));
+ reference.set(atomicBoolean.get());
+ });
+ }
+ }
+
+ @Test
+ public void hasAtomicBooleanInEachVm() {
+ for (VM vm : asList(getVM(0), getVM(1), getVM(2), getVM(3), getController())) {
+ vm.invoke(() -> {
+ assertThat(reference.get())
+ .isSameAs(atomicBoolean.get())
+ .isTrue();
+ });
+ }
+ }
+ }
+
+ public static class SetCountDownLatchInLocalVm implements Serializable {
+
+ private static final AtomicReference<CountDownLatch> latch = new AtomicReference<>();
+
+ @Rule
+ public DistributedReference<CountDownLatch> reference = new DistributedReference<>();
+
+ @Before
+ public void setUp() {
+ for (VM vm : asList(getVM(0), getVM(1), getVM(2), getVM(3), getController())) {
+ vm.invoke(() -> {
+ latch.set(new CountDownLatch(2));
+ reference.set(latch.get());
+ });
+ }
+ }
+
+ @Test
+ public void hasReferenceInLocalVm() {
+ for (VM vm : asList(getVM(0), getVM(1), getVM(2), getVM(3), getController())) {
+ vm.invoke(() -> {
+ assertThat(reference.get()).isSameAs(latch.get());
+ assertThat(latch.get().getCount()).isEqualTo(2);
+ });
+ }
+ }
+ }
+
+ public static class SetTwoCloseablesInEachVm implements Serializable {
+
+ private static final AtomicReference<WithClose> withClose1 = new AtomicReference<>();
+ private static final AtomicReference<WithClose> withClose2 = new AtomicReference<>();
+
+ @Rule
+ public DistributedReference<WithClose> reference1 = new DistributedReference<>();
+ @Rule
+ public DistributedReference<WithClose> reference2 = new DistributedReference<>();
+
+ @Before
+ public void setUp() {
+ for (VM vm : asList(getVM(0), getVM(1), getVM(2), getVM(3), getController())) {
+ vm.invoke(() -> {
+ withClose1.set(spy(new WithClose("WithClose1 in VM-" + vm.getId())));
+ reference1.set(withClose1.get());
+
+ withClose2.set(spy(new WithClose("WithClose2 in VM-" + vm.getId())));
+ reference2.set(withClose2.get());
+ });
+ }
+ }
+
+ @Test
+ public void hasTwoWithCloseInEachVm() {
+ for (VM vm : asList(getVM(0), getVM(1), getVM(2), getVM(3), getController())) {
+ vm.invoke(() -> {
+ assertThat(reference1.get()).isSameAs(withClose1.get());
+ assertThat(reference2.get()).isSameAs(withClose2.get());
+
+ assertThat(reference1.get().toString()).isEqualTo("WithClose1 in VM-" + vm.getId());
+ assertThat(reference2.get().toString()).isEqualTo("WithClose2 in VM-" + vm.getId());
+ });
+ }
+ }
+ }
+
+ public static class SetManyReferencesInEachVm implements Serializable {
+
+ private static final AtomicReference<WithClose> withClose = new AtomicReference<>();
+ private static final AtomicReference<WithDisconnect> withDisconnect = new AtomicReference<>();
+ private static final AtomicReference<WithStop> withStop = new AtomicReference<>();
+
+ @Rule
+ public DistributedReference<WithClose> refWithClose = new DistributedReference<>();
+ @Rule
+ public DistributedReference<WithDisconnect> refWithDisconnect = new DistributedReference<>();
+ @Rule
+ public DistributedReference<WithStop> refWithStop = new DistributedReference<>();
+
+ @Before
+ public void setUp() {
+ for (VM vm : asList(getVM(0), getVM(1), getVM(2), getVM(3), getController())) {
+ vm.invoke(() -> {
+ withClose.set(spy(new WithClose("WithClose in VM-" + vm.getId())));
+ withDisconnect.set(spy(new WithDisconnect("WithDisconnect in VM-" + vm.getId())));
+ withStop.set(spy(new WithStop("WithStop in VM-" + vm.getId())));
+
+ refWithClose.set(withClose.get());
+ refWithDisconnect.set(withDisconnect.get());
+ refWithStop.set(withStop.get());
+ });
+ }
+ }
+
+ @Test
+ public void hasManyReferencesInEachVm() {
+ for (VM vm : asList(getVM(0), getVM(1), getVM(2), getVM(3), getController())) {
+ vm.invoke(() -> {
+ assertThat(refWithClose.get()).isSameAs(withClose.get());
+ assertThat(refWithDisconnect.get()).isSameAs(withDisconnect.get());
+ assertThat(refWithStop.get()).isSameAs(withStop.get());
+
+ assertThat(refWithClose.get().toString()).isEqualTo("WithClose in VM-" + vm.getId());
+ assertThat(refWithDisconnect.get().toString())
+ .isEqualTo("WithDisconnect in VM-" + vm.getId());
+ assertThat(refWithStop.get().toString()).isEqualTo("WithStop in VM-" + vm.getId());
+ });
+ }
+ }
+ }
+
@SuppressWarnings("WeakerAccess")
public static class WithClose {
@@ -633,7 +837,7 @@
}
}
- @SuppressWarnings("unused")
+ @SuppressWarnings({"unused", "WeakerAccess"})
public static class WithStop {
private final String value;
diff --git a/geode-dunit/src/main/java/org/apache/geode/test/dunit/VM.java b/geode-dunit/src/main/java/org/apache/geode/test/dunit/VM.java
index e140e5d..5d99ddb 100644
--- a/geode-dunit/src/main/java/org/apache/geode/test/dunit/VM.java
+++ b/geode-dunit/src/main/java/org/apache/geode/test/dunit/VM.java
@@ -533,8 +533,9 @@
*
* @throws RMIException if an exception occurs while bouncing this {@code VM}
*/
- public void bounce() {
+ public VM bounce() {
bounce(version, false);
+ return this;
}
/**
@@ -550,12 +551,14 @@
*
* @throws RMIException if an exception occurs while bouncing this {@code VM}
*/
- public void bounceForcibly() {
+ public VM bounceForcibly() {
bounce(version, true);
+ return this;
}
- public void bounce(final String targetVersion) {
+ public VM bounce(final String targetVersion) {
bounce(targetVersion, false);
+ return this;
}
private synchronized void bounce(final String targetVersion, boolean force) {
diff --git a/geode-dunit/src/main/java/org/apache/geode/test/dunit/rules/DistributedReference.java b/geode-dunit/src/main/java/org/apache/geode/test/dunit/rules/DistributedReference.java
index 9e15293..c2422d1 100644
--- a/geode-dunit/src/main/java/org/apache/geode/test/dunit/rules/DistributedReference.java
+++ b/geode-dunit/src/main/java/org/apache/geode/test/dunit/rules/DistributedReference.java
@@ -17,14 +17,20 @@
package org.apache.geode.test.dunit.rules;
import static org.apache.geode.test.dunit.VM.DEFAULT_VM_COUNT;
+import static org.apache.geode.util.internal.CompletionUtils.close;
import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.geode.test.dunit.VM;
+
/**
* DistributedReference is a JUnit Rule that provides automated tearDown for a static
* reference in every distributed test {@code VM}s including the main JUnit controller {@code VM}.
@@ -149,12 +155,14 @@
* The {@code DistributedReference} value will still be set to null during tear down even
* if auto-closing is disabled.
*/
-@SuppressWarnings({"serial", "unused", "WeakerAccess"})
+@SuppressWarnings({"serial", "unused", "WeakerAccess",
+ "OverloadedMethodsWithSameNumberOfParameters"})
public class DistributedReference<V> extends AbstractDistributedRule {
- private static final AtomicReference<Object> REFERENCE = new AtomicReference<>();
+ private static final AtomicReference<Map<Integer, Object>> REFERENCE = new AtomicReference<>();
private final AtomicBoolean autoClose = new AtomicBoolean(true);
+ private final int identity;
public DistributedReference() {
this(DEFAULT_VM_COUNT);
@@ -162,6 +170,7 @@
public DistributedReference(int vmCount) {
super(vmCount);
+ identity = hashCode();
}
/**
@@ -178,7 +187,7 @@
* @return the current value
*/
public V get() {
- return uncheckedCast(REFERENCE.get());
+ return uncheckedCast(REFERENCE.get().get(identity));
}
/**
@@ -187,47 +196,80 @@
* @param newValue the new value
*/
public DistributedReference<V> set(V newValue) {
- REFERENCE.set(newValue);
+ REFERENCE.get().put(identity, newValue);
return this;
}
@Override
+ protected void before() {
+ invoker().invokeInEveryVMAndController(() -> invokeBefore());
+ }
+
+ @Override
protected void after() {
- invoker().invokeInEveryVMAndController(this::invokeAfter);
+ invoker().invokeInEveryVMAndController(() -> invokeAfter());
+ }
+
+ @Override
+ protected void afterCreateVM(VM vm) {
+ vm.invoke(() -> invokeBefore());
+ }
+
+ @Override
+ @SuppressWarnings("RedundantMethodOverride")
+ protected void beforeBounceVM(VM vm) {
+ // override if needed
+ }
+
+ @Override
+ protected void afterBounceVM(VM vm) {
+ vm.invoke(() -> invokeBefore());
+ }
+
+ private void invokeBefore() {
+ REFERENCE.compareAndSet(null, new HashMap<>());
+ REFERENCE.get().putIfAbsent(identity, null);
}
private void invokeAfter() {
- V value = get();
- if (value == null) {
+ Map<Integer, Object> references = REFERENCE.getAndSet(null);
+ if (references == null) {
return;
}
- REFERENCE.set(null);
+
+ for (Object object : references.values()) {
+ invokeAfter(object);
+ }
+ }
+
+ private void invokeAfter(Object object) {
+ if (object == null) {
+ return;
+ }
if (autoClose.get()) {
- autoClose(value);
+ autoClose(object);
}
}
- private void autoClose(V value) {
- if (value instanceof AutoCloseable) {
- close((AutoCloseable) value);
+ private void autoClose(Object object) {
+ if (object instanceof AutoCloseable) {
+ close((AutoCloseable) object);
- } else if (hasMethod(value.getClass(), "close")) {
- invokeMethod(value, "close");
+ } else if (object instanceof AtomicBoolean) {
+ close((AtomicBoolean) object);
- } else if (hasMethod(value.getClass(), "disconnect")) {
- invokeMethod(value, "disconnect");
+ } else if (object instanceof CountDownLatch) {
+ close((CountDownLatch) object);
- } else if (hasMethod(value.getClass(), "stop")) {
- invokeMethod(value, "stop");
- }
- }
+ } else if (hasMethod(object.getClass(), "close")) {
+ invokeMethod(object, "close");
- private static void close(AutoCloseable autoCloseable) {
- try {
- autoCloseable.close();
- } catch (Exception e) {
- throw new RuntimeException(e);
+ } else if (hasMethod(object.getClass(), "disconnect")) {
+ invokeMethod(object, "disconnect");
+
+ } else if (hasMethod(object.getClass(), "stop")) {
+ invokeMethod(object, "stop");
}
}