GEODE-8650: Support multiple instances of DistributedReference (#5664)

* GEODE-8650: Support multiple instances of DistributedReference

Includes additional improvements for usage of DistributedReference
in tests.

* * Rename all CompletionUtils to close
* Annotate all overridden methods
* Suppress false warnings
diff --git a/geode-common/src/main/java/org/apache/geode/util/internal/CompletionUtils.java b/geode-common/src/main/java/org/apache/geode/util/internal/CompletionUtils.java
new file mode 100644
index 0000000..ed3eecc
--- /dev/null
+++ b/geode-common/src/main/java/org/apache/geode/util/internal/CompletionUtils.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.util.internal;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Collection of utilities for changing simple Java util structures to a state of completion or
+ * default.
+ *
+ * <p>
+ * All utilities herein throw checked exceptions wrapped within a runtime exception.
+ */
+public class CompletionUtils {
+
+  /**
+   * Closes the {@code AutoCloseable}.
+   */
+  public static void close(AutoCloseable autoCloseable) {
+    try {
+      autoCloseable.close();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Opens the {@code CountDownLatch} by counting it down.
+   */
+  public static void close(CountDownLatch countDownLatch) {
+    while (countDownLatch.getCount() > 0) {
+      countDownLatch.countDown();
+    }
+  }
+
+  /**
+   * Sets the {@code AtomicBoolean} to false.
+   */
+  public static void close(AtomicBoolean atomicBoolean) {
+    atomicBoolean.set(false);
+  }
+
+  /**
+   * Sets the {@code AtomicReference} to null.
+   */
+  public static void close(AtomicReference<?> atomicReference) {
+    atomicReference.set(null);
+  }
+}
diff --git a/geode-dunit/src/distributedTest/java/org/apache/geode/test/dunit/rules/tests/DistributedReferenceTest.java b/geode-dunit/src/distributedTest/java/org/apache/geode/test/dunit/rules/tests/DistributedReferenceTest.java
index efdbed0..817a03e 100644
--- a/geode-dunit/src/distributedTest/java/org/apache/geode/test/dunit/rules/tests/DistributedReferenceTest.java
+++ b/geode-dunit/src/distributedTest/java/org/apache/geode/test/dunit/rules/tests/DistributedReferenceTest.java
@@ -29,6 +29,8 @@
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.junit.Before;
@@ -212,6 +214,69 @@
     }
   }
 
+  @Test
+  public void accessesAtomicBooleanInEachVm() {
+    runTestWithValidation(SetAtomicBooleanInLocalVm.class);
+  }
+
+  @Test
+  public void setsAtomicBooleanToFalseInEachVm() {
+    runTestWithValidation(SetAtomicBooleanInLocalVm.class);
+
+    for (VM vm : asList(getVM(0), getVM(1), getVM(2), getVM(3), getController())) {
+      vm.invoke(() -> {
+        assertThat(SetAtomicBooleanInLocalVm.atomicBoolean.get()).isFalse();
+      });
+    }
+  }
+
+  @Test
+  public void accessesCountDownLatchInEachVm() {
+    runTestWithValidation(SetCountDownLatchInLocalVm.class);
+  }
+
+  @Test
+  public void opensCountDownLatchInEachVm() {
+    runTestWithValidation(SetCountDownLatchInLocalVm.class);
+
+    for (VM vm : asList(getVM(0), getVM(1), getVM(2), getVM(3), getController())) {
+      vm.invoke(() -> {
+        assertThat(SetCountDownLatchInLocalVm.latch.get().getCount()).isZero();
+      });
+    }
+  }
+
+  @Test
+  public void accessesTwoReferencesInEachVm() {
+    runTestWithValidation(SetTwoCloseablesInEachVm.class);
+  }
+
+  @Test
+  public void closesTwoReferencesInEachVm() {
+    runTestWithValidation(SetTwoCloseablesInEachVm.class);
+
+    getController().invoke(() -> {
+      verify(SetTwoCloseablesInEachVm.withClose1.get()).close();
+      verify(SetTwoCloseablesInEachVm.withClose2.get()).close();
+    });
+  }
+
+  @Test
+  public void accessesManyReferencesInEachVm() {
+    runTestWithValidation(SetManyReferencesInEachVm.class);
+  }
+
+  @Test
+  public void closesManyReferencesInEachVm() {
+    runTestWithValidation(SetManyReferencesInEachVm.class);
+
+    getController().invoke(() -> {
+      verify(SetManyReferencesInEachVm.withClose.get()).close();
+      verify(SetManyReferencesInEachVm.withDisconnect.get()).disconnect();
+      verify(SetManyReferencesInEachVm.withStop.get()).stop();
+    });
+  }
+
   public static class SetAutoCloseableInLocalVm implements Serializable {
 
     private static final AtomicReference<AutoCloseable> autoCloseable = new AtomicReference<>();
@@ -587,6 +652,145 @@
     }
   }
 
+  public static class SetAtomicBooleanInLocalVm implements Serializable {
+
+    private static final AtomicReference<AtomicBoolean> atomicBoolean = new AtomicReference<>();
+
+    @Rule
+    public DistributedReference<AtomicBoolean> reference = new DistributedReference<>();
+
+    @Before
+    public void setUp() {
+      for (VM vm : asList(getVM(0), getVM(1), getVM(2), getVM(3), getController())) {
+        vm.invoke(() -> {
+          atomicBoolean.set(new AtomicBoolean(true));
+          reference.set(atomicBoolean.get());
+        });
+      }
+    }
+
+    @Test
+    public void hasAtomicBooleanInEachVm() {
+      for (VM vm : asList(getVM(0), getVM(1), getVM(2), getVM(3), getController())) {
+        vm.invoke(() -> {
+          assertThat(reference.get())
+              .isSameAs(atomicBoolean.get())
+              .isTrue();
+        });
+      }
+    }
+  }
+
+  public static class SetCountDownLatchInLocalVm implements Serializable {
+
+    private static final AtomicReference<CountDownLatch> latch = new AtomicReference<>();
+
+    @Rule
+    public DistributedReference<CountDownLatch> reference = new DistributedReference<>();
+
+    @Before
+    public void setUp() {
+      for (VM vm : asList(getVM(0), getVM(1), getVM(2), getVM(3), getController())) {
+        vm.invoke(() -> {
+          latch.set(new CountDownLatch(2));
+          reference.set(latch.get());
+        });
+      }
+    }
+
+    @Test
+    public void hasReferenceInLocalVm() {
+      for (VM vm : asList(getVM(0), getVM(1), getVM(2), getVM(3), getController())) {
+        vm.invoke(() -> {
+          assertThat(reference.get()).isSameAs(latch.get());
+          assertThat(latch.get().getCount()).isEqualTo(2);
+        });
+      }
+    }
+  }
+
+  public static class SetTwoCloseablesInEachVm implements Serializable {
+
+    private static final AtomicReference<WithClose> withClose1 = new AtomicReference<>();
+    private static final AtomicReference<WithClose> withClose2 = new AtomicReference<>();
+
+    @Rule
+    public DistributedReference<WithClose> reference1 = new DistributedReference<>();
+    @Rule
+    public DistributedReference<WithClose> reference2 = new DistributedReference<>();
+
+    @Before
+    public void setUp() {
+      for (VM vm : asList(getVM(0), getVM(1), getVM(2), getVM(3), getController())) {
+        vm.invoke(() -> {
+          withClose1.set(spy(new WithClose("WithClose1 in VM-" + vm.getId())));
+          reference1.set(withClose1.get());
+
+          withClose2.set(spy(new WithClose("WithClose2 in VM-" + vm.getId())));
+          reference2.set(withClose2.get());
+        });
+      }
+    }
+
+    @Test
+    public void hasTwoWithCloseInEachVm() {
+      for (VM vm : asList(getVM(0), getVM(1), getVM(2), getVM(3), getController())) {
+        vm.invoke(() -> {
+          assertThat(reference1.get()).isSameAs(withClose1.get());
+          assertThat(reference2.get()).isSameAs(withClose2.get());
+
+          assertThat(reference1.get().toString()).isEqualTo("WithClose1 in VM-" + vm.getId());
+          assertThat(reference2.get().toString()).isEqualTo("WithClose2 in VM-" + vm.getId());
+        });
+      }
+    }
+  }
+
+  public static class SetManyReferencesInEachVm implements Serializable {
+
+    private static final AtomicReference<WithClose> withClose = new AtomicReference<>();
+    private static final AtomicReference<WithDisconnect> withDisconnect = new AtomicReference<>();
+    private static final AtomicReference<WithStop> withStop = new AtomicReference<>();
+
+    @Rule
+    public DistributedReference<WithClose> refWithClose = new DistributedReference<>();
+    @Rule
+    public DistributedReference<WithDisconnect> refWithDisconnect = new DistributedReference<>();
+    @Rule
+    public DistributedReference<WithStop> refWithStop = new DistributedReference<>();
+
+    @Before
+    public void setUp() {
+      for (VM vm : asList(getVM(0), getVM(1), getVM(2), getVM(3), getController())) {
+        vm.invoke(() -> {
+          withClose.set(spy(new WithClose("WithClose in VM-" + vm.getId())));
+          withDisconnect.set(spy(new WithDisconnect("WithDisconnect in VM-" + vm.getId())));
+          withStop.set(spy(new WithStop("WithStop in VM-" + vm.getId())));
+
+          refWithClose.set(withClose.get());
+          refWithDisconnect.set(withDisconnect.get());
+          refWithStop.set(withStop.get());
+        });
+      }
+    }
+
+    @Test
+    public void hasManyReferencesInEachVm() {
+      for (VM vm : asList(getVM(0), getVM(1), getVM(2), getVM(3), getController())) {
+        vm.invoke(() -> {
+          assertThat(refWithClose.get()).isSameAs(withClose.get());
+          assertThat(refWithDisconnect.get()).isSameAs(withDisconnect.get());
+          assertThat(refWithStop.get()).isSameAs(withStop.get());
+
+          assertThat(refWithClose.get().toString()).isEqualTo("WithClose in VM-" + vm.getId());
+          assertThat(refWithDisconnect.get().toString())
+              .isEqualTo("WithDisconnect in VM-" + vm.getId());
+          assertThat(refWithStop.get().toString()).isEqualTo("WithStop in VM-" + vm.getId());
+        });
+      }
+    }
+  }
+
   @SuppressWarnings("WeakerAccess")
   public static class WithClose {
 
@@ -633,7 +837,7 @@
     }
   }
 
-  @SuppressWarnings("unused")
+  @SuppressWarnings({"unused", "WeakerAccess"})
   public static class WithStop {
 
     private final String value;
diff --git a/geode-dunit/src/main/java/org/apache/geode/test/dunit/VM.java b/geode-dunit/src/main/java/org/apache/geode/test/dunit/VM.java
index e140e5d..5d99ddb 100644
--- a/geode-dunit/src/main/java/org/apache/geode/test/dunit/VM.java
+++ b/geode-dunit/src/main/java/org/apache/geode/test/dunit/VM.java
@@ -533,8 +533,9 @@
    *
    * @throws RMIException if an exception occurs while bouncing this {@code VM}
    */
-  public void bounce() {
+  public VM bounce() {
     bounce(version, false);
+    return this;
   }
 
   /**
@@ -550,12 +551,14 @@
    *
    * @throws RMIException if an exception occurs while bouncing this {@code VM}
    */
-  public void bounceForcibly() {
+  public VM bounceForcibly() {
     bounce(version, true);
+    return this;
   }
 
-  public void bounce(final String targetVersion) {
+  public VM bounce(final String targetVersion) {
     bounce(targetVersion, false);
+    return this;
   }
 
   private synchronized void bounce(final String targetVersion, boolean force) {
diff --git a/geode-dunit/src/main/java/org/apache/geode/test/dunit/rules/DistributedReference.java b/geode-dunit/src/main/java/org/apache/geode/test/dunit/rules/DistributedReference.java
index 9e15293..c2422d1 100644
--- a/geode-dunit/src/main/java/org/apache/geode/test/dunit/rules/DistributedReference.java
+++ b/geode-dunit/src/main/java/org/apache/geode/test/dunit/rules/DistributedReference.java
@@ -17,14 +17,20 @@
 package org.apache.geode.test.dunit.rules;
 
 import static org.apache.geode.test.dunit.VM.DEFAULT_VM_COUNT;
+import static org.apache.geode.util.internal.CompletionUtils.close;
 import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast;
 
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.lang.reflect.Modifier;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.geode.test.dunit.VM;
+
 /**
  * DistributedReference is a JUnit Rule that provides automated tearDown for a static
  * reference in every distributed test {@code VM}s including the main JUnit controller {@code VM}.
@@ -149,12 +155,14 @@
  * The {@code DistributedReference} value will still be set to null during tear down even
  * if auto-closing is disabled.
  */
-@SuppressWarnings({"serial", "unused", "WeakerAccess"})
+@SuppressWarnings({"serial", "unused", "WeakerAccess",
+    "OverloadedMethodsWithSameNumberOfParameters"})
 public class DistributedReference<V> extends AbstractDistributedRule {
 
-  private static final AtomicReference<Object> REFERENCE = new AtomicReference<>();
+  private static final AtomicReference<Map<Integer, Object>> REFERENCE = new AtomicReference<>();
 
   private final AtomicBoolean autoClose = new AtomicBoolean(true);
+  private final int identity;
 
   public DistributedReference() {
     this(DEFAULT_VM_COUNT);
@@ -162,6 +170,7 @@
 
   public DistributedReference(int vmCount) {
     super(vmCount);
+    identity = hashCode();
   }
 
   /**
@@ -178,7 +187,7 @@
    * @return the current value
    */
   public V get() {
-    return uncheckedCast(REFERENCE.get());
+    return uncheckedCast(REFERENCE.get().get(identity));
   }
 
   /**
@@ -187,47 +196,80 @@
    * @param newValue the new value
    */
   public DistributedReference<V> set(V newValue) {
-    REFERENCE.set(newValue);
+    REFERENCE.get().put(identity, newValue);
     return this;
   }
 
   @Override
+  protected void before() {
+    invoker().invokeInEveryVMAndController(() -> invokeBefore());
+  }
+
+  @Override
   protected void after() {
-    invoker().invokeInEveryVMAndController(this::invokeAfter);
+    invoker().invokeInEveryVMAndController(() -> invokeAfter());
+  }
+
+  @Override
+  protected void afterCreateVM(VM vm) {
+    vm.invoke(() -> invokeBefore());
+  }
+
+  @Override
+  @SuppressWarnings("RedundantMethodOverride")
+  protected void beforeBounceVM(VM vm) {
+    // override if needed
+  }
+
+  @Override
+  protected void afterBounceVM(VM vm) {
+    vm.invoke(() -> invokeBefore());
+  }
+
+  private void invokeBefore() {
+    REFERENCE.compareAndSet(null, new HashMap<>());
+    REFERENCE.get().putIfAbsent(identity, null);
   }
 
   private void invokeAfter() {
-    V value = get();
-    if (value == null) {
+    Map<Integer, Object> references = REFERENCE.getAndSet(null);
+    if (references == null) {
       return;
     }
-    REFERENCE.set(null);
+
+    for (Object object : references.values()) {
+      invokeAfter(object);
+    }
+  }
+
+  private void invokeAfter(Object object) {
+    if (object == null) {
+      return;
+    }
 
     if (autoClose.get()) {
-      autoClose(value);
+      autoClose(object);
     }
   }
 
-  private void autoClose(V value) {
-    if (value instanceof AutoCloseable) {
-      close((AutoCloseable) value);
+  private void autoClose(Object object) {
+    if (object instanceof AutoCloseable) {
+      close((AutoCloseable) object);
 
-    } else if (hasMethod(value.getClass(), "close")) {
-      invokeMethod(value, "close");
+    } else if (object instanceof AtomicBoolean) {
+      close((AtomicBoolean) object);
 
-    } else if (hasMethod(value.getClass(), "disconnect")) {
-      invokeMethod(value, "disconnect");
+    } else if (object instanceof CountDownLatch) {
+      close((CountDownLatch) object);
 
-    } else if (hasMethod(value.getClass(), "stop")) {
-      invokeMethod(value, "stop");
-    }
-  }
+    } else if (hasMethod(object.getClass(), "close")) {
+      invokeMethod(object, "close");
 
-  private static void close(AutoCloseable autoCloseable) {
-    try {
-      autoCloseable.close();
-    } catch (Exception e) {
-      throw new RuntimeException(e);
+    } else if (hasMethod(object.getClass(), "disconnect")) {
+      invokeMethod(object, "disconnect");
+
+    } else if (hasMethod(object.getClass(), "stop")) {
+      invokeMethod(object, "stop");
     }
   }