/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.cache.query.dunit;

import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.apache.geode.test.dunit.Assert.assertEquals;
import static org.apache.geode.test.dunit.Assert.assertFalse;
import static org.apache.geode.test.dunit.Assert.assertNotNull;
import static org.apache.geode.test.dunit.Assert.assertTrue;
import static org.apache.geode.test.dunit.Assert.fail;

import java.util.Iterator;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.logging.log4j.Logger;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.Scope;
import org.apache.geode.cache.client.ClientCache;
import org.apache.geode.cache.client.ClientCacheFactory;
import org.apache.geode.cache.client.PoolFactory;
import org.apache.geode.cache.client.PoolManager;
import org.apache.geode.cache.client.ServerOperationException;
import org.apache.geode.cache.query.FunctionDomainException;
import org.apache.geode.cache.query.Index;
import org.apache.geode.cache.query.IndexInvalidException;
import org.apache.geode.cache.query.NameResolutionException;
import org.apache.geode.cache.query.Query;
import org.apache.geode.cache.query.QueryException;
import org.apache.geode.cache.query.QueryExecutionLowMemoryException;
import org.apache.geode.cache.query.QueryExecutionTimeoutException;
import org.apache.geode.cache.query.QueryInvocationTargetException;
import org.apache.geode.cache.query.QueryService;
import org.apache.geode.cache.query.SelectResults;
import org.apache.geode.cache.query.TypeMismatchException;
import org.apache.geode.cache.query.data.Portfolio;
import org.apache.geode.cache.query.internal.DefaultQuery;
import org.apache.geode.cache.query.internal.ExecutionContext;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.cache30.CacheSerializableRunnable;
import org.apache.geode.cache30.ClientServerTestCase;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.internal.AvailablePortHelper;
import org.apache.geode.internal.cache.DistributedRegion;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.control.HeapMemoryMonitor;
import org.apache.geode.internal.cache.control.InternalResourceManager;
import org.apache.geode.internal.cache.control.InternalResourceManager.ResourceType;
import org.apache.geode.internal.cache.control.MemoryEvent;
import org.apache.geode.internal.cache.control.ResourceListener;
import org.apache.geode.internal.cache.control.TestMemoryThresholdListener;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.DistributedTestUtils;
import org.apache.geode.test.dunit.IgnoredException;
import org.apache.geode.test.dunit.Invoke;
import org.apache.geode.test.dunit.NetworkUtils;
import org.apache.geode.test.dunit.SerializableCallable;
import org.apache.geode.test.dunit.ThreadUtils;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.junit.categories.OQLQueryTest;

@Category({OQLQueryTest.class})
public class ResourceManagerWithQueryMonitorDUnitTest extends ClientServerTestCase {

  private static final Logger logger = LogService.getLogger();

  private static int MAX_TEST_QUERY_TIMEOUT = 4000;
  private static int TEST_QUERY_TIMEOUT = 1000;
  private static final int CRITICAL_HEAP_USED = 950;
  private static final int NORMAL_HEAP_USED = 500;

  @Override
  public final void postSetUpClientServerTestCase() throws Exception {
    Invoke.invokeInEveryVM(this.setHeapMemoryMonitorTestMode);
    IgnoredException.addIgnoredException("above heap critical threshold");
    IgnoredException.addIgnoredException("below heap critical threshold");
  }

  @Override
  protected void preTearDownClientServerTestCase() throws Exception {
    Invoke.invokeInEveryVM(resetQueryMonitor);
    Invoke.invokeInEveryVM(resetResourceManager);
  }

  private SerializableCallable setHeapMemoryMonitorTestMode = new SerializableCallable() {
    @Override
    public Object call() throws Exception {
      HeapMemoryMonitor.setTestDisableMemoryUpdates(true);
      return null;
    }
  };

  private SerializableCallable resetResourceManager = new SerializableCallable() {
    @Override
    public Object call() throws Exception {
      InternalResourceManager irm = ((GemFireCacheImpl) getCache()).getInternalResourceManager();
      // Reset CRITICAL_UP by informing all that heap usage is now 1 byte (0 would disable).
      irm.getHeapMonitor().updateStateAndSendEvent(NORMAL_HEAP_USED, "test");
      Set<ResourceListener> listeners = irm.getResourceListeners(ResourceType.HEAP_MEMORY);
      Iterator<ResourceListener> it = listeners.iterator();
      while (it.hasNext()) {
        ResourceListener<MemoryEvent> l = it.next();
        if (l instanceof TestMemoryThresholdListener) {
          ((TestMemoryThresholdListener) l).resetThresholdCalls();
        }
      }
      irm.setCriticalHeapPercentage(0f);
      irm.setEvictionHeapPercentage(0f);
      irm.getHeapMonitor().setTestMaxMemoryBytes(0);
      HeapMemoryMonitor.setTestDisableMemoryUpdates(false);
      return null;
    }
  };

  private SerializableCallable resetQueryMonitor = new SerializableCallable() {
    @Override
    public Object call() throws Exception {
      InternalCache cache = getCache();
      if (cache.getQueryMonitor() != null) {
        cache.getQueryMonitor().setLowMemory(false, 0);
      }
      DefaultQuery.testHook = null;
      return null;
    }
  };

  @Test
  public void testRMAndNoTimeoutSet() throws Exception {
    doCriticalMemoryHitTest("portfolios", false, 85/* crit threshold */, false, -1, true);
  }

  @Test
  public void testRMAndNoTimeoutSetParReg() throws Exception {
    doCriticalMemoryHitTest("portfolios", true, 85/* crit threshold */, false, -1, true);
  }

  @Test
  public void testRMButDisabledQueryMonitorForLowMemAndNoTimeoutSet() throws Exception {
    // verify that timeout is not set and that a query can execute properly
    doCriticalMemoryHitTest("portfolios", false, 85/* crit threshold */, true, -1, true);
  }

  @Test
  public void testRMAndTimeoutSet() throws Exception {
    // verify that we still receive critical heap cancelation
    doCriticalMemoryHitTest("portfolios", false, 85/* crit threshold */, true, TEST_QUERY_TIMEOUT,
        true);
  }

  @Test
  public void testRMAndTimeoutSetAndQueryTimesoutInstead() throws Exception {
    // verify that timeout is set correctly and cancel query
    doCriticalMemoryHitTest("portfolios", false, 85/* crit threshold */, true, TEST_QUERY_TIMEOUT,
        false);
  }

  @Test
  public void testRMButDisabledQueryMonitorForLowMemAndTimeoutSet() throws Exception {
    // verify that timeout is still working properly
    doCriticalMemoryHitTest("portfolios", false, 85/* crit threshold */, true, TEST_QUERY_TIMEOUT,
        true);
  }

  // Query directly on member with RM and QM set
  @Test
  public void testRMAndNoTimeoutSetOnServer() throws Exception {
    doCriticalMemoryHitTestOnServer("portfolios", false, 85/* crit threshold */, false, -1, true);
  }

  // Query directly on member with RM and QM set
  @Test
  public void whenTimeoutIsSetAndAQueryIsExecutedThenTimeoutMustStopTheQueryBeforeCriticalMemory()
      throws Exception {
    // Timeout is set along with critical heap but it be called after the timeout expires
    // Timeout is set to 1ms which is very unrealistic time period for a query to be able to fetch
    // 200 entries from the region successfully, hence a timeout is expected.
    executeQueryWithTimeoutSetAndCriticalThreshold("portfolios", false, 85/* crit threshold */,
        false, 1, true);
  }

  @Test
  public void whenTimeoutIsSetAndAQueryIsExecutedFromClientThenTimeoutMustStopTheQueryBeforeCriticalMemory()
      throws Exception {
    // Timeout is set along with critical heap but it be called after the timeout expires
    // Timeout is set to 1ms which is very unrealistic time period for a query to be able to fetch
    // 200 entries from the region successfully, hence a timeout is expected.
    executeQueryFromClientWithTimeoutSetAndCriticalThreshold("portfolios", false,
        85/* crit threshold */, false, 1, true);
  }

  @Test
  public void testRMAndNoTimeoutSetParRegOnServer() throws Exception {
    doCriticalMemoryHitTestOnServer("portfolios", true, 85/* crit threshold */, false, -1, true);
  }

  @Test
  public void testRMButDisabledQueryMonitorForLowMemAndNoTimeoutSetOnServer() throws Exception {
    // verify that timeout is not set and that a query can execute properly
    doCriticalMemoryHitTestOnServer("portfolios", false, 85/* crit threshold */, true, -1, true);
  }

  @Test
  public void testRMAndTimeoutSetOnServer() throws Exception {
    // verify that we still receive critical heap cancelation
    doCriticalMemoryHitTestOnServer("portfolios", false, 85/* crit threshold */, true,
        TEST_QUERY_TIMEOUT, true);
  }

  @Test
  public void testRMAndTimeoutSetAndQueryTimesoutInsteadOnServer() throws Exception {
    // verify that timeout is set correctly and cancel query
    doCriticalMemoryHitTestOnServer("portfolios", false, 85/* crit threshold */, true,
        TEST_QUERY_TIMEOUT, false);
  }

  @Test
  public void testRMButDisabledQueryMonitorForLowMemAndTimeoutSetOnServer() throws Exception {
    // verify that timeout is still working properly
    doCriticalMemoryHitTestOnServer("portfolios", false, 85/* crit threshold */, true,
        TEST_QUERY_TIMEOUT, true);
  }

  @Test
  public void testPRGatherCancellation() throws Exception {
    doCriticalMemoryHitTestWithMultipleServers("portfolios", true, 85/* crit threshold */, false,
        -1, true);
  }

  @Test
  public void testPRGatherCancellationWhileGatheringResults() throws Exception {
    doCriticalMemoryHitDuringGatherTestWithMultipleServers("portfolios", true,
        85/* crit threshold */, false, -1, true);
  }

  @Test
  public void testPRGatherCancellationWhileAddingResults() throws Exception {
    doCriticalMemoryHitAddResultsTestWithMultipleServers("portfolios", true, 85/* crit threshold */,
        false, -1, true);
  }

  @Test
  public void testIndexCreationCancellationPR() throws Exception {
    doCriticalMemoryHitWithIndexTest("portfolios", true, 85/* crit threshold */, false, -1, true,
        "compact");
  }

  @Test
  public void testIndexCreationCancellation() throws Exception {
    doCriticalMemoryHitWithIndexTest("portfolios", false, 85/* crit threshold */, false, -1, true,
        "compact");
  }

  @Test
  public void testIndexCreationNoCancellationPR() throws Exception {
    doCriticalMemoryHitWithIndexTest("portfolios", true, 85/* crit threshold */, true, -1, true,
        "compact");
  }

  @Test
  public void testHashIndexCreationCancellationPR() throws Exception {
    doCriticalMemoryHitWithIndexTest("portfolios", true, 85/* crit threshold */, false, -1, true,
        "hash");
  }

  @Test
  public void testHashIndexCreationCancellation() throws Exception {
    // need to add hook to canceled result set and very it is triggered for multiple servers
    doCriticalMemoryHitWithIndexTest("portfolios", false, 85/* crit threshold */, false, -1, true,
        "hash");
  }

  @Test
  public void testHashIndexCreationNoCancellationPR() throws Exception {
    // need to add hook to canceled result set and very it is triggered for multiple servers
    doCriticalMemoryHitWithIndexTest("portfolios", true, 85/* crit threshold */, true, -1, true,
        "hash");
  }

  private void doCriticalMemoryHitTest(final String regionName, boolean createPR,
      final int criticalThreshold,
      final boolean disabledQueryMonitorForLowMem,
      final int queryTimeout, final boolean hitCriticalThreshold)
      throws Exception {
    // create region on the server
    final VM server = VM.getVM(0);
    final VM client = VM.getVM(1);
    final int numObjects = 200;
    try {
      final int port = AvailablePortHelper.getRandomAvailableTCPPort();
      startCacheServer(server, port, criticalThreshold, disabledQueryMonitorForLowMem, queryTimeout,
          regionName, createPR, 0);

      startClient(client, server, port, regionName);
      populateData(server, regionName, numObjects);

      doTestCriticalHeapAndQueryTimeout(server, client, regionName, disabledQueryMonitorForLowMem,
          queryTimeout, hitCriticalThreshold);

      // Pause for a second and then let's recover
      try {
        Thread.sleep(1000);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }

      // Recover from critical heap
      if (hitCriticalThreshold) {
        vmRecoversFromCriticalHeap(server);
      }

      // Check to see if query execution is ok under "normal" or "healthy" conditions
      client.invoke(new CacheSerializableRunnable("Executing query when system is 'Normal'") {
        @Override
        public void run2() {
          try {
            QueryService qs = getCache().getQueryService();
            Query query = qs.newQuery("Select * From /" + regionName);
            SelectResults results = (SelectResults) query.execute();
            assertEquals(numObjects, results.size());
          } catch (QueryInvocationTargetException e) {
            assertFalse(true);
          } catch (NameResolutionException e) {
            assertFalse(true);
          } catch (TypeMismatchException e) {
            assertFalse(true);
          } catch (FunctionDomainException e) {
            assertFalse(true);
          }
        }
      });

      // Execute a critical heap event/ query timeout test again
      doTestCriticalHeapAndQueryTimeout(server, client, regionName, disabledQueryMonitorForLowMem,
          queryTimeout, hitCriticalThreshold);
      // Recover from critical heap
      if (hitCriticalThreshold) {
        vmRecoversFromCriticalHeap(server);
      }
    } finally {
      stopServer(server);
    }
  }

  // test to verify what happens during index creation if memory threshold is hit
  private void doCriticalMemoryHitWithIndexTest(final String regionName, boolean createPR,
      final int criticalThreshold,
      final boolean disabledQueryMonitorForLowMem,
      final int queryTimeout,
      final boolean hitCriticalThreshold,
      final String indexType)
      throws Exception {
    // create region on the server
    final VM server1 = VM.getVM(0);
    final VM server2 = VM.getVM(2);
    final VM client = VM.getVM(1);
    final int numObjects = 200;
    try {
      final int[] port = AvailablePortHelper.getRandomAvailableTCPPorts(2);
      startCacheServer(server1, port[0], criticalThreshold, disabledQueryMonitorForLowMem,
          queryTimeout, regionName, createPR, 0);
      startCacheServer(server2, port[1], criticalThreshold, true, -1, regionName, createPR, 0);

      startClient(client, server1, port[0], regionName);
      populateData(server1, regionName, numObjects);

      createCancelDuringGatherTestHook(server1);
      server1.invoke(new SerializableCallable("create index") {
        @Override
        public Object call() {
          QueryService qs = null;
          try {
            qs = getCache().getQueryService();
            Index index = null;
            if (indexType.equals("compact")) {
              index = qs.createIndex("newIndex", "ID", "/" + regionName);
            } else if (indexType.equals("hash")) {
              index = qs.createHashIndex("newIndex", "ID", "/" + regionName);
            }
            assertNotNull(index);
            assertTrue(((CancelDuringGatherHook) DefaultQuery.testHook).triggeredOOME);

            if (hitCriticalThreshold && !disabledQueryMonitorForLowMem) {
              throw new CacheException("Should have hit low memory") {};
            }
            assertEquals(1, qs.getIndexes().size());
          } catch (Exception e) {
            if (e instanceof IndexInvalidException) {
              if (!hitCriticalThreshold || disabledQueryMonitorForLowMem) {
                throw new CacheException("Should not have run into low memory exception") {};
              }
            } else {
              throw new CacheException(e) {};
            }
          }
          return 0;
        }
      });
    } finally {
      stopServer(server1);
      stopServer(server2);
    }
  }

  private void doCriticalMemoryHitAddResultsTestWithMultipleServers(final String regionName,
      boolean createPR,
      final int criticalThreshold,
      final boolean disabledQueryMonitorForLowMem,
      final int queryTimeout,
      final boolean hitCriticalThreshold)
      throws Exception {
    // create region on the server
    final VM server1 = VM.getVM(0);
    final VM server2 = VM.getVM(1);
    final VM client = VM.getVM(2);
    final int numObjects = 200;
    try {
      final int[] port = AvailablePortHelper.getRandomAvailableTCPPorts(2);
      startCacheServer(server1, port[0], criticalThreshold, disabledQueryMonitorForLowMem,
          queryTimeout, regionName, createPR, 0);
      startCacheServer(server2, port[1], criticalThreshold, true, -1, regionName, createPR, 0);

      startClient(client, server1, port[0], regionName);
      populateData(server2, regionName, numObjects);

      createCancelDuringAddResultsTestHook(server1);
      client.invoke(new SerializableCallable("executing query to be canceled during add results") {
        @Override
        public Object call() {
          QueryService qs = null;
          try {
            qs = getCache().getQueryService();
            Query query = qs.newQuery("Select * From /" + regionName);
            SelectResults results = (SelectResults) query.execute();
            if (hitCriticalThreshold && disabledQueryMonitorForLowMem == false) {
              throw new CacheException("should have hit low memory") {};
            }
          } catch (Exception e) {
            handleException(e, hitCriticalThreshold, disabledQueryMonitorForLowMem, queryTimeout);
          }
          return 0;
        }
      });

      verifyRejectedObjects(server1, disabledQueryMonitorForLowMem, queryTimeout,
          hitCriticalThreshold);
      // Pause for a second and then let's recover
      try {
        Thread.sleep(1000);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }

      // Recover from critical heap
      if (hitCriticalThreshold) {
        vmRecoversFromCriticalHeap(server1);
      }

      // Check to see if query execution is ok under "normal" or "healthy" conditions
      client.invoke(new CacheSerializableRunnable("Executing query when system is 'Normal'") {
        @Override
        public void run2() {
          try {
            QueryService qs = getCache().getQueryService();
            Query query = qs.newQuery("Select * From /" + regionName);
            SelectResults results = (SelectResults) query.execute();
            assertEquals(numObjects, results.size());
          } catch (QueryInvocationTargetException e) {
            assertFalse(true);
          } catch (NameResolutionException e) {
            assertFalse(true);
          } catch (TypeMismatchException e) {
            assertFalse(true);
          } catch (FunctionDomainException e) {
            assertFalse(true);
          }
        }
      });

      // Recover from critical heap
      if (hitCriticalThreshold) {
        vmRecoversFromCriticalHeap(server1);
      }
    } finally {
      stopServer(server1);
      stopServer(server2);
    }
  }

  // tests low memory hit while gathering partition region results
  private void doCriticalMemoryHitDuringGatherTestWithMultipleServers(final String regionName,
      boolean createPR,
      final int criticalThreshold,
      final boolean disabledQueryMonitorForLowMem,
      final int queryTimeout,
      final boolean hitCriticalThreshold)
      throws Exception {
    // create region on the server
    final VM server1 = VM.getVM(0);
    final VM server2 = VM.getVM(1);
    final VM client = VM.getVM(2);
    final int numObjects = 200;
    try {
      final int[] port = AvailablePortHelper.getRandomAvailableTCPPorts(2);
      startCacheServer(server1, port[0], criticalThreshold, disabledQueryMonitorForLowMem,
          queryTimeout, regionName, createPR, 0);
      startCacheServer(server2, port[1], criticalThreshold, true, -1, regionName, createPR, 0);

      startClient(client, server1, port[0], regionName);
      populateData(server2, regionName, numObjects);

      createCancelDuringGatherTestHook(server1);
      client.invoke(new SerializableCallable("executing query to be canceled by gather") {
        @Override
        public Object call() {
          QueryService qs = null;
          try {
            qs = getCache().getQueryService();
            Query query = qs.newQuery("Select * From /" + regionName);
            query.execute();
          } catch (ServerOperationException soe) {
            if (soe.getRootCause() instanceof QueryException) {
              QueryException e = (QueryException) soe.getRootCause();
              if (!isExceptionDueToLowMemory(e, CRITICAL_HEAP_USED)) {
                throw new CacheException(soe) {};
              } else {
                return 0;
              }
            }
          } catch (Exception e) {
            throw new CacheException(e) {};
          }
          // assertTrue(((CancelDuringGatherHook)DefaultQuery.testHook).triggeredOOME);
          throw new CacheException("should have hit low memory") {};
        }
      });

      verifyRejectedObjects(server1, disabledQueryMonitorForLowMem, queryTimeout,
          hitCriticalThreshold);
      // Pause for a second and then let's recover
      try {
        Thread.sleep(1000);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }

      // Recover from critical heap
      if (hitCriticalThreshold) {
        vmRecoversFromCriticalHeap(server1);
      }

      // Check to see if query execution is ok under "normal" or "healthy" conditions
      client.invoke(new CacheSerializableRunnable("Executing query when system is 'Normal'") {
        @Override
        public void run2() {
          try {
            QueryService qs = getCache().getQueryService();
            Query query = qs.newQuery("Select * From /" + regionName);
            SelectResults results = (SelectResults) query.execute();
            assertEquals(numObjects, results.size());
          } catch (QueryInvocationTargetException e) {
            assertFalse(true);
          } catch (NameResolutionException e) {
            assertFalse(true);
          } catch (TypeMismatchException e) {
            assertFalse(true);
          } catch (FunctionDomainException e) {
            assertFalse(true);
          }
        }
      });

      // Recover from critical heap
      if (hitCriticalThreshold) {
        vmRecoversFromCriticalHeap(server1);
      }
    } finally {
      stopServer(server1);
      stopServer(server2);
    }
  }

  // Executes on client cache with multiple configured servers
  private void doCriticalMemoryHitTestWithMultipleServers(final String regionName, boolean createPR,
      final int criticalThreshold,
      final boolean disabledQueryMonitorForLowMem,
      final int queryTimeout,
      final boolean hitCriticalThreshold)
      throws Exception {
    // create region on the server
    final VM server1 = VM.getVM(0);
    final VM server2 = VM.getVM(1);
    final VM client = VM.getVM(2);
    final int numObjects = 200;

    try {
      final int[] port = AvailablePortHelper.getRandomAvailableTCPPorts(2);
      startCacheServer(server1, port[0], criticalThreshold, disabledQueryMonitorForLowMem,
          queryTimeout, regionName, createPR, 0);
      startCacheServer(server2, port[1], criticalThreshold, true, -1, regionName, createPR, 0);

      startClient(client, server1, port[0], regionName);
      populateData(server2, regionName, numObjects);

      doTestCriticalHeapAndQueryTimeout(server1, client, regionName, disabledQueryMonitorForLowMem,
          queryTimeout, hitCriticalThreshold);
      verifyRejectedObjects(server1, disabledQueryMonitorForLowMem, queryTimeout,
          hitCriticalThreshold);
      // Pause for a second and then let's recover
      try {
        Thread.sleep(1000);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }

      // Recover from critical heap
      if (hitCriticalThreshold) {
        vmRecoversFromCriticalHeap(server1);
      }

      // Check to see if query execution is ok under "normal" or "healthy" conditions
      client.invoke(new CacheSerializableRunnable("Executing query when system is 'Normal'") {
        @Override
        public void run2() {
          try {
            QueryService qs = getCache().getQueryService();
            Query query = qs.newQuery("Select * From /" + regionName);
            SelectResults results = (SelectResults) query.execute();
            assertEquals(numObjects, results.size());
          } catch (QueryInvocationTargetException e) {
            assertFalse(true);
          } catch (NameResolutionException e) {
            assertFalse(true);
          } catch (TypeMismatchException e) {
            assertFalse(true);
          } catch (FunctionDomainException e) {
            assertFalse(true);
          }
        }
      });

      // Execute a critical heap event/ query timeout test again
      doTestCriticalHeapAndQueryTimeout(server1, client, regionName, disabledQueryMonitorForLowMem,
          queryTimeout, hitCriticalThreshold);
      verifyRejectedObjects(server1, disabledQueryMonitorForLowMem, queryTimeout,
          hitCriticalThreshold);
      // Recover from critical heap
      if (hitCriticalThreshold) {
        vmRecoversFromCriticalHeap(server1);
      }
    } finally {
      stopServer(server1);
      stopServer(server2);
    }
  }

  // Executes the query on the server with the RM and QM configured
  private void doCriticalMemoryHitTestOnServer(final String regionName, boolean createPR,
      final int criticalThreshold,
      final boolean disabledQueryMonitorForLowMem,
      final int queryTimeout,
      final boolean hitCriticalThreshold)
      throws Exception {
    // create region on the server
    final VM server = VM.getVM(0);
    final int numObjects = 200;
    try {
      final int port = AvailablePortHelper.getRandomAvailableTCPPort();
      startCacheServer(server, port, criticalThreshold, disabledQueryMonitorForLowMem, queryTimeout,
          regionName, createPR, 0);

      // startPeerClient(client, server, port, regionName);
      populateData(server, regionName, numObjects);

      doTestCriticalHeapAndQueryTimeout(server, server, regionName, disabledQueryMonitorForLowMem,
          queryTimeout, hitCriticalThreshold);

      // Pause for a second and then let's recover
      try {
        Thread.sleep(1000);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }

      // Recover from critical heap
      if (hitCriticalThreshold) {
        vmRecoversFromCriticalHeap(server);
      }

      // Check to see if query execution is ok under "normal" or "healthy" conditions
      server.invoke(new CacheSerializableRunnable("Executing query when system is 'Normal'") {
        @Override
        public void run2() {
          try {
            QueryService qs = getCache().getQueryService();
            Query query = qs.newQuery("Select * From /" + regionName);
            SelectResults results = (SelectResults) query.execute();
            assertEquals(numObjects, results.size());
          } catch (QueryInvocationTargetException e) {
            assertFalse(true);
          } catch (NameResolutionException e) {
            assertFalse(true);
          } catch (TypeMismatchException e) {
            assertFalse(true);
          } catch (FunctionDomainException e) {
            assertFalse(true);
          }
        }
      });

      // Execute a critical heap event/ query timeout test again
      doTestCriticalHeapAndQueryTimeout(server, server, regionName, disabledQueryMonitorForLowMem,
          queryTimeout, hitCriticalThreshold);

      // Recover from critical heap
      if (hitCriticalThreshold) {
        vmRecoversFromCriticalHeap(server);
      }
    } finally {
      stopServer(server);
    }
  }


  private void executeQueryFromClientWithTimeoutSetAndCriticalThreshold(final String regionName,
      boolean createPR,
      final int criticalThreshold,
      final boolean disabledQueryMonitorForLowMem,
      final int queryTimeout,
      final boolean hitCriticalThreshold)
      throws Exception {
    // create region on the server
    final VM server = VM.getVM(0);
    final VM client = VM.getVM(1);
    final int numObjects = 200;
    try {
      final int port = AvailablePortHelper.getRandomAvailableTCPPort();
      startCacheServer(server, port, criticalThreshold, disabledQueryMonitorForLowMem, queryTimeout,
          regionName, createPR, 0);
      startClient(client, server, port, regionName);
      populateData(server, regionName, numObjects);
      executeQueryWithCriticalHeapCalledAfterTimeout(server, client, regionName, queryTimeout,
          hitCriticalThreshold);
      if (hitCriticalThreshold) {
        vmRecoversFromCriticalHeap(server);
      }

    } finally {
      stopServer(server);
    }
  }

  private void executeQueryWithTimeoutSetAndCriticalThreshold(final String regionName,
      boolean createPR,
      final int criticalThreshold,
      final boolean disabledQueryMonitorForLowMem,
      final int queryTimeout,
      final boolean hitCriticalThreshold)
      throws Exception {
    // create region on the server
    final VM server = VM.getVM(0);
    final int numObjects = 200;
    try {
      final int port = AvailablePortHelper.getRandomAvailableTCPPort();
      startCacheServer(server, port, criticalThreshold, disabledQueryMonitorForLowMem, queryTimeout,
          regionName, createPR, 0);
      populateData(server, regionName, numObjects);
      executeQueryWithCriticalHeapCalledAfterTimeout(server, server, regionName, queryTimeout,
          hitCriticalThreshold);
      if (hitCriticalThreshold) {
        vmRecoversFromCriticalHeap(server);
      }

    } finally {
      stopServer(server);
    }
  }

  // This helper method will set up a test hook
  // Execute a query on the server, pause due to the test hook
  // Execute a critical heap event
  // release the test hook
  // Check to see that the query either failed due to critical heap if query monitor is not disabled
  // or it will fail due to time out, due to the sleeps we put in
  // If timeout is disabled/not set, then the query should execute just fine
  // The last part of the test is to execute another query with the system under duress and have it
  // be rejected/cancelled if rm and qm are in use
  private void doTestCriticalHeapAndQueryTimeout(VM server, VM client, final String regionName,
      final boolean disabledQueryMonitorForLowMem,
      final int queryTimeout,
      final boolean hitCriticalThreshold) {
    createLatchTestHook(server);

    AsyncInvocation queryExecution = invokeClientQuery(client, regionName,
        disabledQueryMonitorForLowMem, queryTimeout, hitCriticalThreshold);

    try {
      Thread.sleep(1000);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    // We simulate a low memory/critical heap percentage hit
    if (hitCriticalThreshold) {
      vmHitsCriticalHeap(server);
    }

    // Pause until query would time out if low memory was ignored
    try {
      Thread.sleep(MAX_TEST_QUERY_TIMEOUT);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }

    // release the hook to have the query throw either a low memory or query timeout
    // unless otherwise configured
    releaseHook(server);

    ThreadUtils.join(queryExecution, 60000);
    // Make sure no exceptions were thrown during query testing
    try {
      assertEquals(0, queryExecution.getResult());
    } catch (Throwable e) {
      e.printStackTrace();
      fail("queryExecution.getResult() threw Exception " + e.toString());
    }
  }

  private void executeQueryWithCriticalHeapCalledAfterTimeout(VM server, VM client,
      final String regionName,
      final int queryTimeout,
      final boolean hitCriticalThreshold) {
    createLatchTestHook(server);
    AsyncInvocation queryExecution = executeQueryWithTimeout(client, regionName, queryTimeout);

    // Wait till the timeout expires on the query
    try {
      Thread.sleep(queryTimeout + 1000);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    // We simulate a low memory/critical heap percentage hit
    // But by design of this test the query must have been already terminated because of a 1ms
    // timeout
    if (hitCriticalThreshold) {
      vmHitsCriticalHeap(server);
    }

    releaseHook(server);

    ThreadUtils.join(queryExecution, 60000);
    // Make sure no exceptions were thrown during query testing
    try {
      assertEquals(0, queryExecution.getResult());
    } catch (Throwable e) {
      e.printStackTrace();
      fail("queryExecution.getResult() threw Exception " + e.toString());
    }
  }

  private AsyncInvocation executeQueryWithTimeout(VM client, final String regionName,
      final int queryTimeout) {
    return client.invokeAsync(new SerializableCallable("execute query from client") {
      @Override
      public Object call() throws CacheException {
        QueryService qs = null;
        try {
          qs = getCache().getQueryService();
          Query query = qs.newQuery("Select * From /" + regionName);
          query.execute();

        } catch (Exception e) {
          e.printStackTrace();
          if (e instanceof QueryExecutionTimeoutException) {
            logger.info("Query Execution must be terminated by a timeout.");
            return 0;
          }
          if (e instanceof ServerOperationException) {
            ServerOperationException soe = (ServerOperationException) e;
            if (soe.getRootCause() instanceof QueryException) {
              QueryException qe = (QueryException) soe.getRootCause();
              if (isExceptionDueToTimeout(qe)) {
                logger.info("Query Execution must be terminated by a timeout. Expected behavior");
                return 0;
              }
            } else if (soe.getRootCause() instanceof QueryExecutionTimeoutException) {
              logger.info("Query Execution must be terminated by a timeout.");
              return 0;
            }
          }
          e.printStackTrace();
          throw new CacheException(
              "The query should have been terminated by a timeout exception but instead hit a different exception :"
                  + e) {};
        }
        return -1;
      }
    });

  }

  private AsyncInvocation invokeClientQuery(VM client, final String regionName,
      final boolean disabledQueryMonitorForLowMem,
      final int queryTimeout,
      final boolean hitCriticalThreshold) {
    return client.invokeAsync(new SerializableCallable("execute query from client") {
      @Override
      public Object call() throws CacheException {
        QueryService qs = null;
        try {
          qs = getCache().getQueryService();
          Query query = qs.newQuery("Select * From /" + regionName);
          query.execute();

          if (disabledQueryMonitorForLowMem) {
            if (queryTimeout != -1) {
              // we should have timed out due to the way the test is written
              // the query should have hit the configured timeouts
              throw new CacheException("Should have reached the query timeout") {};
            }
          } else {
            if (hitCriticalThreshold) {
              throw new CacheException("Exception should have been thrown due to low memory") {};
            }
          }
        } catch (Exception e) {
          handleException(e, hitCriticalThreshold, disabledQueryMonitorForLowMem, queryTimeout);
        }

        try {
          Query query = qs.newQuery("Select * From /" + regionName);
          query.execute();
          if (hitCriticalThreshold && disabledQueryMonitorForLowMem == false) {
            throw new CacheException("Low memory should still be cancelling queries") {};
          }
        } catch (Exception e) {
          handleException(e, hitCriticalThreshold, disabledQueryMonitorForLowMem, queryTimeout);
        }
        return 0;
      }
    });

  }

  private void handleException(Exception e, boolean hitCriticalThreshold,
      boolean disabledQueryMonitorForLowMem, long queryTimeout)
      throws CacheException {
    if (e instanceof QueryExecutionLowMemoryException) {
      if (!(hitCriticalThreshold && disabledQueryMonitorForLowMem == false)) {
        // meaning the query should not be canceled due to low memory
        throw new CacheException("Query should not have been canceled due to memory") {};
      }
    } else if (e instanceof QueryExecutionTimeoutException) {
      // if we have a queryTimeout set
      if (queryTimeout == -1) {
        // no time out set, this should not be thrown
        throw new CacheException(
            "Query failed due to unexplained reason, should not have been a time out or low memory "
                + DefaultQuery.testHook.getClass().getName() + " " + e) {};
      }
    } else if (e instanceof QueryException) {
      if (isExceptionDueToLowMemory((QueryException) e, CRITICAL_HEAP_USED)) {
        if (!(hitCriticalThreshold && disabledQueryMonitorForLowMem == false)) {
          // meaning the query should not be canceled due to low memory
          throw new CacheException("Query should not have been canceled due to memory") {};
        }
      } else if (isExceptionDueToTimeout((QueryException) e)) {

        if (queryTimeout == -1) {
          // no time out set, this should not be thrown
          throw new CacheException(
              "Query failed due to unexplained reason, should not have been a time out or low memory") {};
        }
      } else {
        throw new CacheException(e) {};
      }
    } else if (e instanceof ServerOperationException) {
      ServerOperationException soe = (ServerOperationException) e;
      if (soe.getRootCause() instanceof QueryExecutionLowMemoryException) {
        if (!(hitCriticalThreshold && disabledQueryMonitorForLowMem == false)) {
          // meaning the query should not be canceled due to low memory
          throw new CacheException("Query should not have been canceled due to memory") {};
        }
      } else if (soe.getRootCause() instanceof QueryException) {
        QueryException qe = (QueryException) soe.getRootCause();
        if (isExceptionDueToLowMemory(qe, CRITICAL_HEAP_USED)) {
          if (!(hitCriticalThreshold && disabledQueryMonitorForLowMem == false)) {
            // meaning the query should not be canceled due to low memory
            throw new CacheException("Query should not have been canceled due to memory") {};
          }
        } else if (isExceptionDueToTimeout(qe)) {
          if (queryTimeout == -1) {
            e.printStackTrace();
            // no time out set, this should not be thrown
            throw new CacheException(
                "Query failed due to unexplained reason, should not have been a time out or low memory") {};
          }
        } else {
          throw new CacheException(soe) {};
        }
      } else if (soe.getRootCause() instanceof QueryExecutionTimeoutException) {
        // if we have a queryTimeout set
        if (queryTimeout == -1) {
          // no time out set, this should not be thrown
          throw new CacheException(
              "Query failed due to unexplained reason, should not have been a time out or low memory "
                  + DefaultQuery.testHook.getClass().getName() + " " + soe.getRootCause()) {};
        }
      } else {
        throw new CacheException(soe) {};
      }
    } else {
      throw new CacheException(e) {};
    }
  }


  private void vmHitsCriticalHeap(VM vm) {
    vm.invoke(new CacheSerializableRunnable("vm hits critical heap") {
      @Override
      public void run2() {
        InternalResourceManager resourceManager =
            (InternalResourceManager) getCache().getResourceManager();
        resourceManager.getHeapMonitor().updateStateAndSendEvent(CRITICAL_HEAP_USED, "test");
      }
    });
  }

  private void vmRecoversFromCriticalHeap(VM vm) {
    vm.invoke(new CacheSerializableRunnable("vm hits critical heap") {
      @Override
      public void run2() {
        InternalResourceManager resourceManager =
            (InternalResourceManager) getCache().getResourceManager();
        resourceManager.getHeapMonitor().updateStateAndSendEvent(NORMAL_HEAP_USED, "test");
      }
    });
  }

  private void createLatchTestHook(VM vm) {
    vm.invoke(new CacheSerializableRunnable("create latch test Hook") {
      @Override
      public void run2() {
        DefaultQuery.TestHook hook = getPauseHook();
        DefaultQuery.testHook = hook;
      }
    });
  }

  private void createCancelDuringGatherTestHook(VM vm) {
    vm.invoke(new CacheSerializableRunnable("create cancel during gather test Hook") {
      @Override
      public void run2() {
        DefaultQuery.TestHook hook = getCancelDuringGatherHook();
        DefaultQuery.testHook = hook;
      }
    });
  }

  private void createCancelDuringAddResultsTestHook(VM vm) {
    vm.invoke(new CacheSerializableRunnable("create cancel during gather test Hook") {
      @Override
      public void run2() {
        DefaultQuery.TestHook hook = getCancelDuringAddResultsHook();
        DefaultQuery.testHook = hook;
      }
    });
  }


  private void releaseHook(VM vm) {
    vm.invoke(new CacheSerializableRunnable("release latch Hook") {
      @Override
      public void run2() {
        PauseTestHook hook = (PauseTestHook) DefaultQuery.testHook;
        hook.countDown();
      }
    });
  }

  // Verify that PRQueryEvaluator dropped objects if low memory
  private void verifyRejectedObjects(VM vm, final boolean disabledQueryMonitorForLowMem,
      final int queryTimeout, final boolean hitCriticalThreshold) {
    vm.invoke(new CacheSerializableRunnable("verify dropped objects") {
      @Override
      public void run2() {
        if ((disabledQueryMonitorForLowMem == false && hitCriticalThreshold)) {
          if (DefaultQuery.testHook instanceof PauseTestHook) {
            PauseTestHook hook = (PauseTestHook) DefaultQuery.testHook;
            assertTrue(hook.rejectedObjects);
          } else if (DefaultQuery.testHook instanceof CancelDuringGatherHook) {
            CancelDuringGatherHook hook = (CancelDuringGatherHook) DefaultQuery.testHook;
            assertTrue(hook.rejectedObjects);
          } else if (DefaultQuery.testHook instanceof CancelDuringAddResultsHook) {
            CancelDuringAddResultsHook hook = (CancelDuringAddResultsHook) DefaultQuery.testHook;
            assertTrue(hook.rejectedObjects);
          }
        }
      }

    });
  }

  private void populateData(VM vm, final String regionName, final int numObjects) {
    vm.invoke(new CacheSerializableRunnable("populate data for " + regionName) {
      @Override
      public void run2() {
        Region region = getCache().getRegion(regionName);
        for (int i = 0; i < numObjects; i++) {
          region.put("key_" + i, new Portfolio(i));
        }
      }
    });
  }

  private void stopServer(VM server) {
    server.invoke(new SerializableCallable() {
      @Override
      public Object call() throws Exception {
        GemFireCacheImpl cache = (GemFireCacheImpl) getCache();
        cache.MAX_QUERY_EXECUTION_TIME = -1;
        return null;
      }
    });
  }

  private void startCacheServer(VM server, final int port, final int criticalThreshold,
      final boolean disableQueryMonitorForLowMemory,
      final int queryTimeout,
      final String regionName, final boolean createPR,
      final int prRedundancy) throws Exception {

    server.invoke(new SerializableCallable() {
      @Override
      public Object call() throws Exception {
        getSystem(getServerProperties(disableQueryMonitorForLowMemory, queryTimeout));
        if (disableQueryMonitorForLowMemory == true) {
          System.setProperty(
              DistributionConfig.GEMFIRE_PREFIX + "Cache.DISABLE_QUERY_MONITOR_FOR_LOW_MEMORY",
              "true");
        } else {
          System.clearProperty(
              DistributionConfig.GEMFIRE_PREFIX + "Cache.DISABLE_QUERY_MONITOR_FOR_LOW_MEMORY");
        }

        GemFireCacheImpl cache = (GemFireCacheImpl) getCache();

        if (queryTimeout != -1) {
          cache.MAX_QUERY_EXECUTION_TIME = queryTimeout;
        } else {
          cache.MAX_QUERY_EXECUTION_TIME = -1;
        }

        if (criticalThreshold != 0) {
          InternalResourceManager resourceManager =
              (InternalResourceManager) cache.getResourceManager();
          HeapMemoryMonitor heapMonitor = resourceManager.getHeapMonitor();
          heapMonitor.setTestMaxMemoryBytes(1000);
          HeapMemoryMonitor.setTestBytesUsedForThresholdSet(NORMAL_HEAP_USED);
          resourceManager.setCriticalHeapPercentage(criticalThreshold);
        }

        AttributesFactory factory = new AttributesFactory();
        if (createPR) {
          PartitionAttributesFactory paf = new PartitionAttributesFactory();
          paf.setRedundantCopies(prRedundancy);
          paf.setTotalNumBuckets(11);
          factory.setPartitionAttributes(paf.create());
        } else {
          factory.setScope(Scope.DISTRIBUTED_ACK);
          factory.setDataPolicy(DataPolicy.REPLICATE);
        }
        Region region = createRootRegion(regionName, factory.create());
        if (createPR) {
          assertTrue(region instanceof PartitionedRegion);
        } else {
          assertTrue(region instanceof DistributedRegion);
        }
        CacheServer cacheServer = getCache().addCacheServer();
        cacheServer.setPort(port);
        cacheServer.start();

        return null;
      }
    });
  }

  private void startClient(VM client, final VM server, final int port, final String regionName) {

    client.invoke(new CacheSerializableRunnable("Start client") {
      @Override
      public void run2() throws CacheException {
        Properties props = getClientProps();
        getSystem(props);

        final ClientCacheFactory ccf = new ClientCacheFactory(props);
        ccf.addPoolServer(NetworkUtils.getServerHostName(server.getHost()), port);
        ClientCache cache = (ClientCache) getClientCache(ccf);
      }
    });
  }

  private void startPeerClient(VM client, final VM server, final int port,
      final String regionName) {

    client.invoke(new CacheSerializableRunnable("Start peer client") {
      @Override
      public void run2() throws CacheException {
        Properties props = getClientProps();
        getSystem(props);

        PoolFactory pf = PoolManager.createFactory();
        pf.addServer(NetworkUtils.getServerHostName(server.getHost()), port);
        pf.create("pool1");

        AttributesFactory af = new AttributesFactory();
        af.setScope(Scope.LOCAL);
        af.setPoolName("pool1");
        Region region = createRootRegion(regionName, af.create());

        getCache();
      }
    });
  }

  protected Properties getClientProps() {
    Properties p = new Properties();
    p.setProperty(MCAST_PORT, "0");
    p.setProperty(LOCATORS, "");
    return p;
  }

  protected Properties getServerProperties(boolean disableQueryMonitorForMemory, int queryTimeout) {
    Properties p = new Properties();
    p.setProperty(LOCATORS, "localhost[" + DistributedTestUtils.getDUnitLocatorPort() + "]");
    return p;
  }

  private boolean isExceptionDueToLowMemory(QueryException e, int HEAP_USED) {
    String message = e.getMessage();
    return (message.contains(
        String.format(
            "Query execution canceled due to memory threshold crossed in system, memory used: %s bytes.",
            HEAP_USED))
        || message.contains(
            "Query execution canceled due to low memory while gathering results from partitioned regions"));
  }

  private boolean isExceptionDueToTimeout(QueryException e) {
    String message = e.getMessage();
    // -1 needs to be matched due to client/server set up, BaseCommand uses the
    // MAX_QUERY_EXECUTION_TIME and not the testMaxQueryExecutionTime
    return (message.contains("The QueryMonitor thread may be sleeping longer than")
        || message.contains("Query execution canceled after exceeding max execution time")
        || message.contains(
            String.format("Query execution canceled after exceeding max execution time %sms.",
                -1)));
  }

  private DefaultQuery.TestHook getPauseHook() {
    return new PauseTestHook();
  }

  private DefaultQuery.TestHook getCancelDuringGatherHook() {
    return new CancelDuringGatherHook();
  }

  private DefaultQuery.TestHook getCancelDuringAddResultsHook() {
    return new CancelDuringAddResultsHook();
  }

  private static class PauseTestHook implements DefaultQuery.TestHook {
    private CountDownLatch latch = new CountDownLatch(1);
    public boolean rejectedObjects = false;

    @Override
    public void doTestHook(final SPOTS spot, final DefaultQuery _ignored,
        final ExecutionContext executionContext) {
      switch (spot) {
        case BEFORE_QUERY_EXECUTION:
          try {
            if (!latch.await(8, TimeUnit.SECONDS)) {
              fail("query was never unlatched");
            }
          } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
          }
          break;
        case LOW_MEMORY_WHEN_DESERIALIZING_STREAMINGOPERATION:
          rejectedObjects = true;
          break;
      }
    }

    public void countDown() {
      latch.countDown();
    }
  }

  // non-static class because it needs to call getCache()
  private class CancelDuringGatherHook implements DefaultQuery.TestHook {
    public boolean rejectedObjects = false;
    public boolean triggeredOOME = false;
    private int count = 0;
    private int numObjectsBeforeCancel = 5;

    @Override
    public void doTestHook(final SPOTS spot, final DefaultQuery _ignored,
        final ExecutionContext executionContext) {
      switch (spot) {
        case LOW_MEMORY_WHEN_DESERIALIZING_STREAMINGOPERATION:
          rejectedObjects = true;
          break;
        case BEFORE_ADD_OR_UPDATE_MAPPING_OR_DESERIALIZING_NTH_STREAMINGOPERATION:
          if (count++ == numObjectsBeforeCancel) {
            InternalResourceManager resourceManager =
                (InternalResourceManager) getCache().getResourceManager();
            resourceManager.getHeapMonitor().updateStateAndSendEvent(CRITICAL_HEAP_USED, "test");
            triggeredOOME = true;
          }
          break;
      }
    }
  }

  // non-static class because it needs to call getCache()
  private class CancelDuringAddResultsHook implements DefaultQuery.TestHook {
    public boolean triggeredOOME = false;
    public boolean rejectedObjects = false;

    @Override
    public void doTestHook(final SPOTS spot, final DefaultQuery _ignored,
        final ExecutionContext executionContext) {
      switch (spot) {
        case BEFORE_BUILD_CUMULATIVE_RESULT:
          if (triggeredOOME == false) {
            InternalResourceManager resourceManager =
                (InternalResourceManager) getCache().getResourceManager();
            resourceManager.getHeapMonitor().updateStateAndSendEvent(CRITICAL_HEAP_USED, "test");
            triggeredOOME = true;
            try {
              Thread.sleep(1000);
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
            }
          }
          break;
        case BEFORE_THROW_QUERY_CANCELED_EXCEPTION:
          rejectedObjects = true;
          break;
      }
    }
  }
}
