GEODE-9990: turn DiskAccessException into CacheClosedException (#7334)

* GEODE-9990: turn DiskAccessException into CacheClosedException

- when DiskInitFile is in closed state and DiskStoreImpl is closed or
  closing
- catch DiskAccessException in PRHARedundancyProvider and turn into
  CacheClosedException if cache closing is in progress
- change CreateBucketMessage to handle DiskAccessException as cause of
  ReplyException
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/DiskInitFileJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/DiskInitFileJUnitTest.java
index 927616d..56b3e55 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/DiskInitFileJUnitTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/DiskInitFileJUnitTest.java
@@ -15,10 +15,15 @@
 package org.apache.geode.internal.cache;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.io.File;
@@ -31,8 +36,11 @@
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+import org.apache.geode.CancelCriterion;
 import org.apache.geode.Statistics;
 import org.apache.geode.StatisticsFactory;
+import org.apache.geode.cache.CacheClosedException;
+import org.apache.geode.cache.DiskAccessException;
 import org.apache.geode.internal.cache.persistence.DiskRegionView;
 import org.apache.geode.internal.cache.persistence.DiskStoreID;
 
@@ -134,4 +142,68 @@
     assertThat(dif.hasKrf(2)).isFalse();
     dif.destroy();
   }
+
+  @Test
+  public void markInitializedThrowsDiskAccessExceptionWhenInitFileClosedAndParentAndCacheNotClosing() {
+    markInitializedTestSetup();
+
+    DiskInitFile diskInitFile =
+        new DiskInitFile("testThrows", mockedDiskStoreImpl, false, Collections.emptySet());
+    diskInitFile.close();
+
+    assertThatThrownBy(() -> diskInitFile.markInitialized(mockDiskRegionView)).isInstanceOf(
+        DiskAccessException.class);
+  }
+
+  @Test
+  public void markInitializedThrowsCacheClosedExceptionWhenInitFileClosedAndParentIsClosedOrClosing() {
+    markInitializedTestSetup();
+    when(mockedDiskStoreImpl.isClosed()).thenReturn(Boolean.TRUE);
+
+    DiskInitFile diskInitFile =
+        new DiskInitFile("testThrows", mockedDiskStoreImpl, false, Collections.emptySet());
+    diskInitFile.close();
+
+    assertThatThrownBy(() -> diskInitFile.markInitialized(mockDiskRegionView)).isInstanceOf(
+        CacheClosedException.class);
+  }
+
+  @Test
+  public void markInitializedThrowsCacheClosedExceptionWhenCacheIsClosing() {
+    CancelCriterion cancelCriterion = markInitializedTestSetup();
+    CacheClosedException cacheClosedException = new CacheClosedException("boom");
+    doThrow(cacheClosedException).when(cancelCriterion).checkCancelInProgress();
+
+    DiskInitFile diskInitFile =
+        new DiskInitFile("testThrows", mockedDiskStoreImpl, false, Collections.emptySet());
+    diskInitFile.close();
+
+    assertThatThrownBy(() -> diskInitFile.markInitialized(mockDiskRegionView)).isEqualTo(
+        cacheClosedException);
+  }
+
+  @Test
+  public void markInitializedCacheCloseIsCalledWhenParentHandlesDiskAccessException() {
+    markInitializedTestSetup();
+
+    DiskInitFile diskInitFile =
+        new DiskInitFile("testThrows", mockedDiskStoreImpl, false, Collections.emptySet());
+    diskInitFile.close();
+
+    assertThatThrownBy(() -> diskInitFile.markInitialized(mockDiskRegionView))
+        .isInstanceOf(DiskAccessException.class);
+    verify(mockedDiskStoreImpl, times(1)).handleDiskAccessException(any(DiskAccessException.class));
+  }
+
+  private CancelCriterion markInitializedTestSetup() {
+    InternalCache internalCache = mock(InternalCache.class);
+    CancelCriterion cancelCriterion = mock(CancelCriterion.class);
+    DiskRegion diskRegion = mock(DiskRegion.class);
+
+    when(mockedDiskStoreImpl.getCache()).thenReturn(internalCache);
+    when(mockedDiskStoreImpl.getById(anyLong())).thenReturn(diskRegion);
+    when(internalCache.getCancelCriterion()).thenReturn(cancelCriterion);
+
+    return cancelCriterion;
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskInitFile.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskInitFile.java
index a9651e5..c6baad2 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskInitFile.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskInitFile.java
@@ -51,6 +51,7 @@
 import org.apache.geode.CancelException;
 import org.apache.geode.DataSerializer;
 import org.apache.geode.Instantiator;
+import org.apache.geode.cache.CacheClosedException;
 import org.apache.geode.cache.DiskAccessException;
 import org.apache.geode.cache.EvictionAction;
 import org.apache.geode.cache.EvictionAlgorithm;
@@ -1307,11 +1308,24 @@
     writeIFRecord(bb, true);
   }
 
+  private void checkClosed() {
+    if (closed) {
+      parent.getCache().getCancelCriterion().checkCancelInProgress();
+
+      if (parent.isClosed() || parent.isClosing()) {
+        throw new CacheClosedException("The disk store is closed or closing");
+      }
+
+      DiskAccessException dae = new DiskAccessException("The disk init file is closed", parent);
+      parent.handleDiskAccessException(dae);
+
+      throw dae;
+    }
+  }
+
   private void writeIFRecord(ByteBuffer bb, boolean doStats) throws IOException {
     assert lock.isHeldByCurrentThread();
-    if (closed) {
-      throw new DiskAccessException("The disk store is closed", parent);
-    }
+    checkClosed();
 
     ifRAF.write(bb.array(), 0, bb.position());
     if (logger.isTraceEnabled(LogMarker.PERSIST_WRITES_VERBOSE)) {
@@ -1327,9 +1341,8 @@
 
   private void writeIFRecord(HeapDataOutputStream hdos, boolean doStats) throws IOException {
     assert lock.isHeldByCurrentThread();
-    if (closed) {
-      throw new DiskAccessException("The disk store is closed", parent);
-    }
+    checkClosed();
+
     hdos.sendTo(ifRAF);
     if (logger.isTraceEnabled(LogMarker.PERSIST_WRITES_VERBOSE)) {
       logger.trace(LogMarker.PERSIST_WRITES_VERBOSE, "DiskInitFile writeIFRecord HDOS");
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java
index e07ab55..0e24e0f 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java
@@ -43,11 +43,13 @@
 
 import org.apache.logging.log4j.Logger;
 
+import org.apache.geode.CancelCriterion;
 import org.apache.geode.CancelException;
 import org.apache.geode.SystemFailure;
 import org.apache.geode.annotations.Immutable;
 import org.apache.geode.annotations.VisibleForTesting;
 import org.apache.geode.annotations.internal.MakeNotStatic;
+import org.apache.geode.cache.DiskAccessException;
 import org.apache.geode.cache.PartitionedRegionStorageException;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionDestroyedException;
@@ -726,6 +728,14 @@
             return bucketPrimary;
           }
         }
+      } catch (DiskAccessException dae) {
+        CancelCriterion cancelCriterion = partitionedRegion.getCancelCriterion();
+        if (cancelCriterion.isCancelInProgress()) {
+          needToElectPrimary = false;
+          cancelCriterion.checkCancelInProgress(dae);
+        }
+
+        throw dae;
       } catch (CancelException | RegionDestroyedException e) {
         // We don't need to elect a primary if the cache was closed. The other members will
         // take care of it. This ensures we don't compromise redundancy.
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/CreateBucketMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/CreateBucketMessage.java
index ad9565e..9fcb5b2 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/CreateBucketMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/CreateBucketMessage.java
@@ -23,6 +23,7 @@
 
 import org.apache.geode.CancelException;
 import org.apache.geode.DataSerializer;
+import org.apache.geode.cache.DiskAccessException;
 import org.apache.geode.cache.PartitionedRegionStorageException;
 import org.apache.geode.distributed.internal.ClusterDistributionManager;
 import org.apache.geode.distributed.internal.DistributionManager;
@@ -338,9 +339,9 @@
         waitForRepliesUninterruptibly();
       } catch (ReplyException e) {
         Throwable t = e.getCause();
-        if (t instanceof CancelException) {
+        if (t instanceof DiskAccessException || t instanceof CancelException) {
           logger.debug(
-              "NodeResponse got remote cancellation, throwing PartitionedRegionCommunication Exception {}",
+              "NodeResponse got remote exception, throwing PartitionedRegionCommunication Exception {}",
               t.getMessage(), t);
           return null;
         }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/PRHARedundancyProviderTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/PRHARedundancyProviderTest.java
index 6cd69b7..7cc089e 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/PRHARedundancyProviderTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/PRHARedundancyProviderTest.java
@@ -16,16 +16,21 @@
 
 import static org.apache.geode.cache.Region.SEPARATOR;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.same;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
@@ -44,10 +49,13 @@
 import org.apache.geode.CancelCriterion;
 import org.apache.geode.cache.CacheClosedException;
 import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.DiskAccessException;
 import org.apache.geode.cache.PartitionAttributes;
 import org.apache.geode.cache.RegionDestroyedException;
 import org.apache.geode.distributed.DistributedSystem;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.cache.control.InternalResourceManager;
+import org.apache.geode.internal.cache.partitioned.Bucket;
 import org.apache.geode.internal.cache.partitioned.InternalPRInfo;
 import org.apache.geode.internal.cache.partitioned.LoadProbe;
 import org.apache.geode.internal.cache.partitioned.PartitionedRegionRebalanceOp;
@@ -248,6 +256,70 @@
   }
 
   @Test
+  public void createBucketAtomicallyConvertsDiskAccessExceptionWhenCacheCloseInProgress() {
+    String partitionName = "partitionName";
+    DiskAccessException diskAccessException = new DiskAccessException("boom");
+    CacheClosedException cacheClosedException = new CacheClosedException(diskAccessException);
+    InternalDistributedMember internalDistributedMember = mock(InternalDistributedMember.class);
+    Set<InternalDistributedMember> memberSet = Collections.singleton(internalDistributedMember);
+    InternalCache internalCache = mock(InternalCache.class);
+    RegionAdvisor regionAdvisor = mock(RegionAdvisor.class);
+    Bucket bucket = mock(Bucket.class);
+    BucketAdvisor bucketAdvisor = mock(BucketAdvisor.class);
+    CancelCriterion cancelCriterion = mock(CancelCriterion.class);
+
+    prHaRedundancyProvider = new PRHARedundancyProvider(partitionedRegion, resourceManager);
+
+    when(partitionedRegion.getRegionAdvisor()).thenReturn(regionAdvisor);
+    when(partitionedRegion.getCache()).thenReturn(internalCache);
+    when(partitionedRegion.getCancelCriterion()).thenReturn(cancelCriterion);
+    when(regionAdvisor.getBucket(anyInt())).thenReturn(bucket);
+    when(bucket.getBucketAdvisor()).thenReturn(bucketAdvisor);
+    when(internalCache.isCacheAtShutdownAll())
+        .thenReturn(Boolean.FALSE)
+        .thenThrow(diskAccessException);
+    when(cancelCriterion.isCancelInProgress()).thenReturn(Boolean.TRUE);
+    doThrow(cacheClosedException).when(cancelCriterion).checkCancelInProgress(diskAccessException);
+    when(partitionedRegion.getRegionAdvisor().adviseFixedPartitionDataStores(partitionName))
+        .thenReturn(memberSet);
+
+    assertThatThrownBy(
+        () -> prHaRedundancyProvider.createBucketAtomically(1, 5000, false, partitionName))
+            .isEqualTo(cacheClosedException);
+  }
+
+  @Test
+  public void createBucketAtomicallyPropagatesDiskAccessExceptionWhenCacheCloseNotInProgress() {
+    String partitionName = "partitionName";
+    DiskAccessException diskAccessException = new DiskAccessException("boom");
+    InternalDistributedMember internalDistributedMember = mock(InternalDistributedMember.class);
+    Set<InternalDistributedMember> memberSet = Collections.singleton(internalDistributedMember);
+    InternalCache internalCache = mock(InternalCache.class);
+    RegionAdvisor regionAdvisor = mock(RegionAdvisor.class);
+    Bucket bucket = mock(Bucket.class);
+    BucketAdvisor bucketAdvisor = mock(BucketAdvisor.class);
+    CancelCriterion cancelCriterion = mock(CancelCriterion.class);
+
+    prHaRedundancyProvider = new PRHARedundancyProvider(partitionedRegion, resourceManager);
+
+    when(partitionedRegion.getRegionAdvisor()).thenReturn(regionAdvisor);
+    when(partitionedRegion.getCache()).thenReturn(internalCache);
+    when(partitionedRegion.getCancelCriterion()).thenReturn(cancelCriterion);
+    when(regionAdvisor.getBucket(anyInt())).thenReturn(bucket);
+    when(bucket.getBucketAdvisor()).thenReturn(bucketAdvisor);
+    when(internalCache.isCacheAtShutdownAll())
+        .thenReturn(Boolean.FALSE)
+        .thenThrow(diskAccessException);
+    when(cancelCriterion.isCancelInProgress()).thenReturn(Boolean.FALSE);
+    when(partitionedRegion.getRegionAdvisor().adviseFixedPartitionDataStores(partitionName))
+        .thenReturn(memberSet);
+
+    assertThatThrownBy(
+        () -> prHaRedundancyProvider.createBucketAtomically(1, 5000, false, partitionName))
+            .isEqualTo(diskAccessException);
+  }
+
+  @Test
   @Parameters({"RUNTIME", "CANCEL", "REGION_DESTROYED"})
   @TestCaseName("{method}[{index}]: {params}")
   public void startTaskCompletesExceptionallyIfExceptionIsThrown(