/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.cache.query.cq.dunit;

import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import org.apache.logging.log4j.Logger;
import org.hamcrest.Matchers;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.EvictionAction;
import org.apache.geode.cache.EvictionAttributes;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.cache.RegionFactory;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.Scope;
import org.apache.geode.cache.query.CqAttributes;
import org.apache.geode.cache.query.CqAttributesFactory;
import org.apache.geode.cache.query.CqAttributesMutator;
import org.apache.geode.cache.query.CqClosedException;
import org.apache.geode.cache.query.CqExistsException;
import org.apache.geode.cache.query.CqListener;
import org.apache.geode.cache.query.CqQuery;
import org.apache.geode.cache.query.Query;
import org.apache.geode.cache.query.QueryService;
import org.apache.geode.cache.query.RegionNotFoundException;
import org.apache.geode.cache.query.SelectResults;
import org.apache.geode.cache.query.data.Portfolio;
import org.apache.geode.cache.query.internal.CqStateImpl;
import org.apache.geode.cache.query.internal.DefaultQueryService;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.cache30.CacheSerializableRunnable;
import org.apache.geode.cache30.CertifiableTestCacheListener;
import org.apache.geode.cache30.ClientServerTestCase;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.internal.AvailablePortHelper;
import org.apache.geode.internal.cache.DistributedRegion;
import org.apache.geode.internal.cache.DistributedTombstoneOperation;
import org.apache.geode.internal.cache.EventID;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.geode.test.dunit.Invoke;
import org.apache.geode.test.dunit.LogWriterUtils;
import org.apache.geode.test.dunit.NetworkUtils;
import org.apache.geode.test.dunit.SerializableRunnable;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.Wait;
import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
import org.apache.geode.test.junit.categories.ClientSubscriptionTest;

/**
 * This class tests the ContinuousQuery mechanism in GemFire. It does so by creating a cache server
 * with a cache and a pre-defined region and a data loader. The client creates the same region and
 * attaches the connection pool.
 */
@Category({ClientSubscriptionTest.class})
@SuppressWarnings({"serial", "Convert2MethodRef"})
public class CqQueryDUnitTest extends JUnit4CacheTestCase {
  private static final Logger logger = LogService.getLogger();
  /**
   * The port on which the cache server was started in this VM
   */
  private static int bridgeServerPort;

  private static final int port = 0;
  protected static int port2 = 0;

  public static final int noTest = -1;

  public final String[] regions = new String[] {"regionA", "regionB"};

  private static final int CREATE = 0;
  private static final int UPDATE = 1;
  private static final int DESTROY = 2;
  private static final int INVALIDATE = 3;
  private static final int CLOSE = 4;
  private static final int REGION_CLEAR = 5;
  private static final int REGION_INVALIDATE = 6;

  public static final String KEY = "key-";

  private static final String WAIT_PROPERTY = "CqQueryTest.maxWaitTime";

  private static final int WAIT_DEFAULT = (20 * 1000);

  private static final long MAX_TIME = Integer.getInteger(WAIT_PROPERTY, WAIT_DEFAULT);

  public final String[] cqs = new String[] {
      // 0 - Test for ">"
      "SELECT ALL * FROM /root/" + regions[0] + " p where p.ID > 0",

      // 1 - Test for "=" and "and".
      "SELECT ALL * FROM /root/" + regions[0] + " p where p.ID = 2 and p.status='active'",

      // 2 - Test for "<" and "and".
      "SELECT ALL * FROM /root/" + regions[1] + " p where p.ID < 5 and p.status='active'",

      // FOLLOWING CQS ARE NOT TESTED WITH VALUES; THEY ARE USED TO TEST PARSING LOGIC WITHIN CQ.
      // 3
      "SELECT * FROM /root/" + regions[0] + " ;",
      // 4
      "SELECT ALL * FROM /root/" + regions[0],
      // 5
      "import org.apache.geode.cache.\"query\".data.Portfolio; " + "SELECT ALL * FROM /root/"
          + regions[0] + " TYPE Portfolio",
      // 6
      "import org.apache.geode.cache.\"query\".data.Portfolio; " + "SELECT ALL * FROM /root/"
          + regions[0] + " p TYPE Portfolio",
      // 7
      "SELECT ALL * FROM /root/" + regions[1] + " p where p.ID < 5 and p.status='active';",
      // 8
      "SELECT ALL * FROM /root/" + regions[0] + "  ;",
      // 9
      "SELECT ALL * FROM /root/" + regions[0] + " p where p.description = NULL",
      // 10
      "SELECT ALL * FROM /root/" + regions[0] + " p where p.ID > 0 and p.status='active'",
      // 11 test for region 0
      "SELECT ALL * FROM /root/" + regions[0] + " p where p.ID > 0",
      // 12 test for region 1
      "SELECT ALL * FROM /root/" + regions[1] + " p where p.ID > 0",};

  public final String[] prCqs = new String[] {
      // 0 - Test for ">"
      "SELECT ALL * FROM /" + regions[0] + " p where p.ID > 0",

      // 1 - Test for "=" and "and".
      "SELECT ALL * FROM /" + regions[0] + " p where p.ID = 2 and p.status='active'"};

  private final String[] invalidCQs = new String[] {
      // Test for ">"
      "SELECT ALL * FROM /root/invalidRegion p where p.ID > 0"};


  private final String[] shortTypeCQs = new String[] {
      // 11 - Test for "short" number type
      "SELECT ALL * FROM /root/" + regions[0] + " p where p.shortID IN SET(1,2,3,4,5)"};

  @Override
  public final void postSetUp() throws Exception {
    // avoid IllegalStateException from HandShake by connecting all vms tor
    // system before creating connection pools
    getSystem();
    Invoke.invokeInEveryVM(new SerializableRunnable("getSystem") {
      @Override
      public void run() {
        getSystem();
      }
    });
    postSetUpCqQueryDUnitTest();
  }

  protected void postSetUpCqQueryDUnitTest() throws Exception {}

  /* Returns Cache Server Port */
  public static int getCacheServerPort() {
    return bridgeServerPort;
  }

  /* Create Cache Server */
  public void createServer(VM server) {
    createServer(server, 0);
  }

  public void createServer(VM server, final int p) {
    createServer(server, p, false);
  }

  public void createServer(VM server, final int thePort, final boolean eviction) {
    createServer(server, thePort, eviction, DataPolicy.REPLICATE);
  }

  public void createServer(VM server, final int thePort, final boolean eviction,
      final DataPolicy dataPolicy) {
    server.invoke(() -> {
      logger.info("### Create Cache Server. ###");
      AttributesFactory factory = new AttributesFactory();
      factory.setScope(Scope.DISTRIBUTED_ACK);
      factory.setDataPolicy(dataPolicy);

      // setting the eviction attributes.
      if (eviction) {
        EvictionAttributes evictAttrs =
            EvictionAttributes.createLRUEntryAttributes(100000, EvictionAction.OVERFLOW_TO_DISK);
        factory.setEvictionAttributes(evictAttrs);
      }

      for (String region : regions) {
        createRegion(region, factory.createRegionAttributes());
        if (getRootRegion("root").getSubregion(region).isEmpty()) {
          logger.info("### CreateServer: Region is empty ###");
        }
      }

      startBridgeServer(thePort, true);
    });
  }

  public void createServerOnly(VM server, final int thePort) {
    server.invoke(() -> {
      logger.info("### Create Cache Server. ###");
      startBridgeServer(thePort, true);
    });
  }

  public void createPartitionRegion(final VM server, final String[] regionNames) {
    SerializableRunnable createRegion = new CacheSerializableRunnable("Create Region") {

      @Override
      public void run2() throws CacheException {
        RegionFactory rf = getCache().createRegionFactory(RegionShortcut.PARTITION);
        for (String regionName : regionNames) {
          rf.create(regionName);
        }
      }
    };

    server.invoke(createRegion);
  }

  private void createReplicateRegionWithLocalDestroy(final VM server, final String[] regionNames) {
    SerializableRunnable createRegion = new CacheSerializableRunnable("Create Region") {

      @Override
      public void run2() throws CacheException {
        RegionFactory rf =
            getCache().createRegionFactory(RegionShortcut.REPLICATE).setEvictionAttributes(
                EvictionAttributes.createLIFOEntryAttributes(10, EvictionAction.LOCAL_DESTROY));
        for (String regionName : regionNames) {
          rf.create(regionName);
        }
      }
    };

    server.invoke(createRegion);
  }


  /* Close Cache Server */
  public void closeServer(VM server) {
    server.invoke(new SerializableRunnable("Close CacheServer") {
      @Override
      public void run() {
        logger.info("### Close CacheServer. ###");
        stopBridgeServer(getCache());
      }
    });

  }

  private void crashServer(VM server) {
    server.invoke(new SerializableRunnable("Crash CacheServer") {
      @Override
      public void run() {
        org.apache.geode.cache.client.internal.ConnectionImpl.setTEST_DURABLE_CLIENT_CRASH(true);
        logger.info("### Crashing CacheServer. ###");
        stopBridgeServer(getCache());
      }
    });
    Wait.pause(2 * 1000);
  }

  private void closeCrashServer(VM server) {
    server.invoke(new SerializableRunnable("Close CacheServer") {
      @Override
      public void run() {
        org.apache.geode.cache.client.internal.ConnectionImpl.setTEST_DURABLE_CLIENT_CRASH(false);
        logger.info("### Crashing CacheServer. ###");
        stopBridgeServer(getCache());
      }
    });
    Wait.pause(2 * 1000);
  }

  /* Create Client */
  public void createClient(VM client, final int serverPort, final String serverHost) {
    int[] serverPorts = new int[] {serverPort};
    createClient(client, serverPorts, serverHost, null);
  }

  /* Create Client */
  public void createClient(VM client, final int[] serverPorts, final String serverHost,
      final String redundancyLevel) {
    client.invoke(() -> {
      logger.info("### Create Client. ###");

      // Initialize CQ Service.
      getCache().getQueryService();

      AttributesFactory regionFactory = new AttributesFactory();
      regionFactory.setScope(Scope.LOCAL);
      if (redundancyLevel != null) {
        ClientServerTestCase.configureConnectionPool(regionFactory, serverHost, serverPorts, true,
            Integer.parseInt(redundancyLevel), -1, null);
      } else {
        ClientServerTestCase.configureConnectionPool(regionFactory, serverHost, serverPorts, true,
            -1, -1, null);
      }
      for (String region : regions) {
        createRegion(region, regionFactory.createRegionAttributes());
        logger.info("### Successfully Created Region on Client :" + region);
      }
    });
  }

  /* Create Local Region */
  public void createLocalRegion(VM client, final int[] serverPorts, final String serverHost,
      final String redundancyLevel, final String[] regionNames) {
    SerializableRunnable createQService = new CacheSerializableRunnable("Create Local Region") {
      @Override
      public void run2() throws CacheException {
        logger.info("### Create Local Region. ###");
        AttributesFactory af = new AttributesFactory();
        af.setScope(Scope.LOCAL);

        if (redundancyLevel != null) {
          ClientServerTestCase.configureConnectionPool(af, serverHost, serverPorts, true,
              Integer.parseInt(redundancyLevel), -1, null);
        } else {
          ClientServerTestCase.configureConnectionPool(af, serverHost, serverPorts, true, -1, -1,
              null);
        }

        RegionFactory rf = getCache().createRegionFactory(af.create());
        for (int i = 0; i < regionNames.length; i++) {
          rf.create(regionNames[i]);
          logger.info("### Successfully Created Region on Client :" + regions[i]);
        }
      }
    };

    client.invoke(createQService);
  }

  public void createClientWith2Pools(VM client, final int[] serverPorts1, final int[] serverPorts2,
      final String serverHost, final String redundancyLevel) {
    client.invoke(() -> {
      logger.info("### Create Client. ###");

      // Initialize CQ Service.
      getCache().getQueryService();

      AttributesFactory regionFactory0 = new AttributesFactory();
      AttributesFactory regionFactory1 = new AttributesFactory();
      regionFactory0.setScope(Scope.LOCAL);
      regionFactory1.setScope(Scope.LOCAL);
      if (redundancyLevel != null) {
        ClientServerTestCase.configureConnectionPoolWithName(regionFactory0, serverHost,
            serverPorts1, true,
            Integer.parseInt(redundancyLevel), -1,
            null, "testPoolA");
        ClientServerTestCase.configureConnectionPoolWithName(regionFactory1, serverHost,
            serverPorts2, true,
            Integer.parseInt(redundancyLevel), -1,
            null, "testPoolB");
      } else {
        ClientServerTestCase.configureConnectionPoolWithName(regionFactory0, serverHost,
            serverPorts1, true, -1, -1, null,
            "testPoolA");
        ClientServerTestCase.configureConnectionPoolWithName(regionFactory1, serverHost,
            serverPorts2, true, -1, -1, null,
            "testPoolB");
      }
      createRegion(regions[0], regionFactory0.createRegionAttributes());
      createRegion(regions[1], regionFactory1.createRegionAttributes());
      logger.info("### Successfully Created Region on Client :" + regions[0]);
      logger.info("### Successfully Created Region on Client :" + regions[1]);

    });
  }


  /* Close Client */
  public void closeClient(VM client) {
    SerializableRunnable closeCQService = new CacheSerializableRunnable("Close Client") {
      @Override
      public void run2() throws CacheException {
        logger.info("### Close Client. ###");
        try {
          ((DefaultQueryService) getCache().getQueryService()).closeCqService();
        } catch (Exception ex) {
          logger.info("### Failed to get CqService during ClientClose() ###");
        }

      }
    };

    client.invoke(closeCQService);
    Wait.pause(2 * 1000);
  }


  /* Create/Init values */
  public void createValues(VM vm, final String regionName, final int size) {
    vm.invoke(new CacheSerializableRunnable("Create values") {
      @Override
      public void run2() throws CacheException {
        Region region1 = getRootRegion().getSubregion(regionName);
        for (int i = 1; i <= size; i++) {
          region1.put(KEY + i, new Portfolio(i));
        }
        logger.info("### Number of Entries in Region :" + region1.keySet().size());
      }
    });
  }

  /* Create/Init values */
  public void createValuesWithTime(VM vm, final String regionName, final int size) {
    vm.invoke(new CacheSerializableRunnable("Create values") {
      @Override
      public void run2() throws CacheException {
        Region region1 = getRootRegion().getSubregion(regionName);
        for (int i = 1; i <= size; i++) {
          Portfolio portfolio = new Portfolio(i);
          portfolio.createTime = System.currentTimeMillis();
          region1.put(KEY + i, portfolio);
        }
        logger.info("### Number of Entries in Region :" + region1.keySet().size());
      }
    });
  }

  private void createValuesWithShort(VM vm, final String regionName, final int size) {
    vm.invoke(new CacheSerializableRunnable("Create values") {
      @Override
      public void run2() throws CacheException {
        Region region1 = getRootRegion().getSubregion(regionName);
        for (int i = 1; i <= size; i++) {
          Portfolio portfolio = new Portfolio(i);
          portfolio.shortID = new Short("" + i);
          region1.put(KEY + i, portfolio);
        }
        logger.info("### Number of Entries in Region :" + region1.keySet().size());
      }
    });
  }

  private void createValuesAsPrimitives(VM vm, final String regionName, final int size) {
    vm.invoke(new CacheSerializableRunnable("Create values") {
      @Override
      public void run2() throws CacheException {
        Region region1 = getRootRegion().getSubregion(regionName);
        for (int i = 1; i <= size; i++) {
          switch (i % 5) {
            case 0:
              region1.put("key" + i, "seeded");
              break;
            case 1:
              region1.put("key" + i, "seeding");
              break;
            case 2:
              region1.put("key" + i, (double) i);
              break;
            case 3:
              region1.put("key" + i, i);
              break;
            case 4:
              region1.put("key" + i, new Portfolio(i));
              break;
            default:
              region1.put("key" + i, i);
              break;

          }
        }
        logger.info("### Number of Entries in Region :" + region1.keySet().size());
      }
    });
  }

  public void updateValuesAsPrimitives(VM vm, final String regionName, final int size) {
    vm.invoke(new CacheSerializableRunnable("Create values") {
      @Override
      public void run2() throws CacheException {
        Region region1 = getRootRegion().getSubregion(regionName);
        for (int i = 1; i <= size; i++) {
          switch (i % 5) {
            case 0:
              region1.put("key" + i, "seeding");
              break;
            case 1:
              region1.put("key" + i, "seeded");
              break;
            case 2:
              region1.put("key" + i, i);
              break;
            case 3:
              region1.put("key" + i, new Portfolio(i));
              break;
            case 4:
              region1.put("key" + i, (double) i);
              break;
            default:
              region1.put("key" + i, i);
              break;

          }
        }
        logger.info("### Number of Entries in Region :" + region1.keySet().size());
      }
    });
  }

  public void createValuesAsPortfolios(VM vm, final String regionName, final int size) {
    vm.invoke(new CacheSerializableRunnable("Create values") {
      @Override
      public void run2() throws CacheException {
        Region region1 = getRootRegion().getSubregion(regionName);
        for (int i = 1; i <= size; i++) {
          region1.put("key" + i, new Portfolio(i));
        }
        logger.info("### Number of Entries in Region :" + region1.keySet().size());
      }
    });
  }

  private void createIndex(VM vm, final String indexName, final String indexedExpression,
      final String regionPath) {
    vm.invoke(new CacheSerializableRunnable("Create values") {
      @Override
      public void run2() throws CacheException {
        try {
          QueryService qs = getCache().getQueryService();
          qs.createIndex(indexName, indexedExpression, regionPath);
        } catch (Exception e) {
          throw new CacheException(e) {};
        }
      }
    });
  }


  /* delete values */
  public void deleteValues(VM vm, final String regionName, final int size) {
    vm.invoke(new CacheSerializableRunnable("Delete values") {
      @Override
      public void run2() throws CacheException {
        Region region1 = getRootRegion().getSubregion(regionName);
        for (int i = 1; i <= size; i++) {
          region1.destroy(KEY + i);
        }
        logger.info("### Number of Entries In Region after Delete :" + region1.keySet().size());
      }

    });
  }

  /**
   * support for invalidating values.
   */
  public void invalidateValues(VM vm, final String regionName, final int size) {
    vm.invoke(new CacheSerializableRunnable("Create values") {
      @Override
      public void run2() throws CacheException {
        Region region1 = getRootRegion().getSubregion(regionName);
        for (int i = 1; i <= size; i++) {
          region1.invalidate(KEY + i);
        }
        logger.info("### Number of Entries In Region after Delete :" + region1.keySet().size());
      }

    });
  }

  /* Register CQs */
  public void createCQ(VM vm, final String cqName, final String queryStr) {
    createCQ(vm, cqName, queryStr, false);
  }

  public void createCQ(VM vm, final String cqName, final String queryStr,
      final boolean isBridgeMemberTest) {
    vm.invoke(() -> {

      logger.info("### Create CQ. ###" + cqName);
      // Get CQ Service.
      QueryService cqService = getCache().getQueryService();

      // Create CQ Attributes.
      CqAttributesFactory cqf = new CqAttributesFactory();
      CqListener[] cqListeners = {new CqQueryTestListener(LogWriterUtils.getLogWriter())};

      cqf.initCqListeners(cqListeners);
      CqAttributes cqa = cqf.create();

      // Create CQ.
      CqQuery cq1 = cqService.newCq(cqName, queryStr, cqa);
      assertThat(cq1.getState().isStopped()).describedAs("newCq() state mismatch").isTrue();

    });
  }

  /* Register CQs with no name, execute, and close */
  public void createAndExecCQNoName(VM vm, final String queryStr) {
    vm.invoke(() -> {
      logger.info("### DEBUG CREATE CQ START ####");
      logger.info("### Create CQ with no name. ###");
      String cqName = null;
      QueryService cqService = getCache().getQueryService();

      SelectResults cqResults;
      for (int i = 0; i < 20; ++i) {
        // Create CQ Attributes.
        CqAttributesFactory cqf = new CqAttributesFactory();
        CqListener[] cqListeners = {new CqQueryTestListener(LogWriterUtils.getLogWriter())};

        cqf.initCqListeners(cqListeners);
        CqAttributes cqa = cqf.create();

        // Create CQ with no name and execute with initial results.

        CqQuery cq1 = cqService.newCq(queryStr, cqa);
        ((CqQueryTestListener) cqListeners[0]).cqName = cq1.getName();

        if (cq1 == null) {
          logger.info("Failed to get CqQuery object for CQ with no name.");
        } else {
          cqName = cq1.getName();
          logger.info("Created CQ with no name, generated CQ name: "
              + cqName + " CQ state:" + cq1.getState());
          assertThat(cq1.getState().isStopped()).describedAs("Create CQ with no name illegal state")
              .isTrue();
        }
        if (i % 2 == 0) {
          cqResults = cq1.executeWithInitialResults();

          logger.info("initial result size = " + cqResults.size());
          logger.info("CQ state after execute with initial results = " + cq1.getState());
          assertThat(cq1.getState().isRunning())
              .describedAs("executeWithInitialResults() state mismatch").isTrue();
        } else {
          try {
            cq1.execute();
          } catch (Exception ex) {
            logger.info("CQService is :" + cqService);
            ex.printStackTrace();
            fail("Failed to execute CQ " + cqName + " . " + ex.getMessage());
          }
          logger.info("CQ state after execute = " + cq1.getState());
          assertThat(cq1.getState().isRunning()).describedAs("execute() state mismatch").isTrue();
        }

        // Close the CQ
        try {
          cq1.close();
        } catch (Exception ex) {
          logger.info("CqService is :" + cqService, ex);
          fail("Failed to close CQ " + cqName + " . " + ex.getMessage());
        }
        assertThat(cq1.getState().isClosed()).describedAs("closeCq() state mismatch").isTrue();
      }
    });
  }

  public void executeCQ(VM vm, final String cqName, final boolean initialResults,
      String expectedErr) {
    executeCQ(vm, cqName, initialResults, noTest, expectedErr);
  }

  /**
   * Execute/register CQ as running.
   *
   * @param initialResults true if initialResults are requested
   * @param expectedResultsSize if >= 0, validate results against this size
   * @param expectedErr if not null, an error we expect
   */
  private void executeCQ(VM vm, final String cqName, final boolean initialResults,
      final int expectedResultsSize, final String expectedErr) {
    vm.invoke(() -> {
      if (expectedErr != null) {
        getCache().getLogger()
            .info("<ExpectedException action=add>" + expectedErr + "</ExpectedException>");
      }
      try {
        QueryService cqService = getCache().getQueryService();
        CqQuery cq1 = cqService.getCq(cqName);
        assertThat(cq1).isNotNull();
        assertThat(cq1.getState().isStopped()).describedAs("newCq() state mismatch").isTrue();

        if (initialResults) {
          SelectResults cqResults;
          cqResults = cq1.executeWithInitialResults();
          assertThat(cq1.getState().isRunning())
              .describedAs("executeWithInitialResults() state mismatch").isTrue();
          if (expectedResultsSize >= 0) {
            assertThat(cqResults.size()).describedAs("unexpected results size")
                .isEqualTo(expectedResultsSize);
          }
        } else {
          cq1.execute();
          assertThat(cq1.getState().isRunning()).describedAs("execute() state mismatch").isTrue();
        }
      } finally {
        if (expectedErr != null) {
          getCache().getLogger()
              .info("<ExpectedException action=remove>" + expectedErr + "</ExpectedException>");
        }
      }
    });
  }

  /* Stop/pause CQ */
  public void stopCQ(VM vm, final String cqName) {
    vm.invoke(() -> {
      logger.info("### Stop CQ. ###" + cqName);
      QueryService cqService = getCache().getQueryService();

      CqQuery cq1 = cqService.getCq(cqName);
      cq1.stop();

      assertThat(cq1.getState().isStopped()).describedAs("Stop CQ state mismatch").isTrue();
    });
  }

  // Stop and execute CQ repeatedly
  /* Stop/pause CQ */
  private void stopExecCQ(VM vm) {
    vm.invoke(() -> {
      logger.info("### Stop and Exec CQ. ###" + "testCQStopExecute_0");
      QueryService cqService = getCache().getQueryService();
      CqQuery cq1 = cqService.getCq("testCQStopExecute_0");

      for (int i = 0; i < 20; ++i) {

        cq1.stop();

        assertThat(cq1.getState().isStopped()).describedAs("Stop CQ state mismatch, count = " + i)
            .isTrue();
        logger.info("After stop in Stop and Execute loop, ran successfully, loop count: " + i);
        logger.info("CQ state: " + cq1.getState());

        cq1.execute();

        assertThat(cq1.getState().isRunning())
            .describedAs("Execute CQ state mismatch, count = " + i).isTrue();
        logger.info("After execute in Stop and Execute loop, ran successfully, loop count: " + i);
        logger.info("CQ state: " + cq1.getState());
      }
    });
  }


  /* UnRegister CQs */
  public void closeCQ(VM vm, final String cqName) {
    vm.invoke(() -> {
      // Get CQ Service.
      QueryService cqService = getCache().getQueryService();

      // Close CQ.
      CqQuery cq1 = cqService.getCq(cqName);
      cq1.close();

      assertThat(cq1.getState().isClosed()).describedAs("Close CQ state mismatch").isTrue();
    });
  }

  /* Register CQs */
  private void registerInterestListCQ(VM vm, final String regionName) {
    vm.invoke(() -> {
      Region region = getRootRegion().getSubregion(regionName);
      region.getAttributesMutator()
          .addCacheListener(new CertifiableTestCacheListener(LogWriterUtils.getLogWriter()));

      List list = new ArrayList();
      for (int i = 1; i <= 10; i++) {
        list.add(KEY + i);
      }
      region.registerInterest(list);
    });
  }

  // helps test case where executeIR is called multiple times as well as after close
  private void executeAndCloseAndExecuteIRMultipleTimes(VM vm,
      final String queryStr) {
    vm.invoke(() -> {
      QueryService cqService = getCache().getQueryService();
      CqAttributesFactory cqf = new CqAttributesFactory();
      CqListener[] cqListeners = {new CqQueryTestListener(LogWriterUtils.getLogWriter())};

      cqf.initCqListeners(cqListeners);
      CqAttributes cqa = cqf.create();

      CqQuery cq1 = cqService.newCq("testCQResultSet_0", queryStr, cqa);
      assertThat(cq1.getState().isStopped()).describedAs("newCq() state mismatch").isTrue();

      cq1.executeWithInitialResults();
      try {
        cq1.executeWithInitialResults();
      } catch (IllegalStateException e) {
        // expected
      }
      cq1.close();

      try {
        cq1.executeWithInitialResults();
      } catch (CqClosedException e) {
        // expected
        return;
      }
      fail("should have received cqClosedException");
    });
  }


  /* Validate CQ Count */
  public void validateCQCount(VM vm, final int cqCnt) {
    vm.invoke(() -> {
      // Get CQ Service.
      QueryService cqService = getCache().getQueryService();

      int numCqs = cqService.getCqs().length;
      assertThat(numCqs).describedAs("Number of cqs mismatch.").isEqualTo(cqCnt);
    });
  }


  /**
   * Throws AssertionError if the CQ can be found or if any other error occurs
   */
  private void failIfCQExists(VM vm, final String cqName) {
    vm.invoke(() -> {
      // Get CQ Service.
      QueryService cqService = getCache().getQueryService();

      CqQuery cQuery = cqService.getCq(cqName);
      assertThat(cQuery).describedAs("Unexpectedly found CqQuery for CQ : " + cqName).isNull();
    });
  }

  private void validateCQError(VM vm, final String cqName, final int numError) {
    vm.invoke(() -> {
      QueryService cqService = getCache().getQueryService();
      CqQuery cQuery = cqService.getCq(cqName);
      assertThat(cQuery).describedAs("Failed to get CqQuery for CQ : " + cqName).isNotNull();

      CqAttributes cqAttr = cQuery.getCqAttributes();
      CqListener cqListener = cqAttr.getCqListener();
      CqQueryTestListener listener = (CqQueryTestListener) cqListener;
      listener.printInfo(false);

      // Check for totalEvents count.
      if (numError != noTest) {
        // Result size validation.
        listener.printInfo(true);
        assertThat(listener.getErrorEventCount()).describedAs("Total Event Count mismatch")
            .isEqualTo(numError);
      }
    });
  }

  public void validateCQ(VM vm, final String cqName, final int resultSize, final int creates,
      final int updates, final int deletes) {
    validateCQ(vm, cqName, resultSize, creates, updates, deletes, noTest, noTest, noTest, noTest);
  }

  public void validateCQ(VM vm, final String cqName, final int resultSize, final int creates,
      final int updates, final int deletes, final int queryInserts, final int queryUpdates,
      final int queryDeletes, final int totalEvents) {
    vm.invoke(() -> {
      QueryService cqService = getCache().getQueryService();
      CqQuery cQuery = cqService.getCq(cqName);
      assertThat(cQuery).describedAs("Failed to get CqQuery for CQ : " + cqName).isNotNull();

      CqAttributes cqAttr = cQuery.getCqAttributes();
      CqListener cqListeners[] = cqAttr.getCqListeners();
      CqQueryTestListener listener = (CqQueryTestListener) cqListeners[0];
      listener.printInfo(false);

      // Check for totalEvents count.
      if (totalEvents != noTest) {
        // Result size validation.
        listener.printInfo(true);
        assertThat(listener.getTotalEventCount()).describedAs("Total Event Count mismatch")
            .isEqualTo(totalEvents);
      }

      assertThat(resultSize).describedAs("test for event counts instead of results size")
          .isEqualTo(noTest);

      // Check for create count.
      if (creates != noTest) {
        // Result size validation.
        listener.printInfo(true);
        assertThat(listener.getCreateEventCount()).describedAs("Create Event mismatch")
            .isEqualTo(creates);
      }

      // Check for update count.
      if (updates != noTest) {
        // Result size validation.
        listener.printInfo(true);
        assertThat(listener.getUpdateEventCount()).describedAs("Update Event mismatch")
            .isEqualTo(updates);
      }

      // Check for delete count.
      if (deletes != noTest) {
        // Result size validation.
        listener.printInfo(true);
        assertThat(listener.getDeleteEventCount()).describedAs("Delete Event mismatch")
            .isEqualTo(deletes);
      }

      // Check for queryInsert count.
      if (queryInserts != noTest) {
        // Result size validation.
        listener.printInfo(true);
        assertThat(listener.getQueryInsertEventCount()).describedAs("Query Insert Event mismatch")
            .isEqualTo(queryInserts);
      }

      // Check for queryUpdate count.
      if (queryUpdates != noTest) {
        // Result size validation.
        listener.printInfo(true);
        assertThat(listener.getQueryUpdateEventCount()).describedAs("Query Update Event mismatch")
            .isEqualTo(queryUpdates);
      }

      // Check for queryDelete count.
      if (queryDeletes != noTest) {
        // Result size validation.
        listener.printInfo(true);
        assertThat(listener.getQueryDeleteEventCount()).describedAs("Query Delete Event mismatch")
            .isEqualTo(queryDeletes);
      }
    });
  }

  public void waitForCreated(VM vm, final String cqName, final String key) {
    waitForEvent(vm, 0, cqName, key);
  }

  public void waitForUpdated(VM vm, final String cqName, final String key) {
    waitForEvent(vm, 1, cqName, key);
  }

  public void waitForDestroyed(VM vm, final String cqName, final String key) {
    waitForEvent(vm, 2, cqName, key);
  }

  public void waitForInvalidated(VM vm, final String cqName, final String key) {
    waitForEvent(vm, 3, cqName, key);
  }

  public void waitForClose(VM vm, final String cqName) {
    waitForEvent(vm, 4, cqName, null);
  }

  public void waitForRegionClear(VM vm, final String cqName) {
    waitForEvent(vm, 5, cqName, null);
  }

  public void waitForRegionInvalidate(VM vm, final String cqName) {
    waitForEvent(vm, 6, cqName, null);
  }

  private void waitForError(VM vm, final String cqName, final String errorMessage) {
    vm.invoke(() -> {
      // Get CQ Service.
      QueryService cqService = getCache().getQueryService();

      CqQuery cQuery = cqService.getCq(cqName);
      assertThat(cQuery).describedAs("Failed to get CqQuery for CQ : " + cqName).isNotNull();

      CqAttributes cqAttr = cQuery.getCqAttributes();
      CqListener[] cqListener = cqAttr.getCqListeners();
      CqQueryTestListener listener = (CqQueryTestListener) cqListener[0];
      listener.waitForError(errorMessage);
    });
  }

  protected void waitForCqsDisconnected(VM vm, final String cqName, final int count) {
    vm.invoke(() -> {
      // Get CQ Service.
      QueryService cqService = getCache().getQueryService();

      CqQuery cQuery = cqService.getCq(cqName);
      assertThat(cQuery).describedAs("Failed to get CqQuery for CQ : " + cqName).isNotNull();

      CqAttributes cqAttr = cQuery.getCqAttributes();
      CqListener[] cqListener = cqAttr.getCqListeners();
      CqQueryTestListener listener = (CqQueryTestListener) cqListener[0];
      listener.waitForCqsDisconnectedEvents(count);
    });
  }

  protected void waitForCqsConnected(VM vm, final String cqName, final int count) {
    vm.invoke(() -> {
      // Get CQ Service.
      QueryService cqService = getCache().getQueryService();

      CqQuery cQuery = cqService.getCq(cqName);
      assertThat(cQuery).describedAs("Failed to get CqQuery for CQ : " + cqName).isNotNull();

      CqAttributes cqAttr = cQuery.getCqAttributes();
      CqListener[] cqListener = cqAttr.getCqListeners();
      CqQueryTestListener listener = (CqQueryTestListener) cqListener[0];
      listener.waitForCqsConnectedEvents(count);
    });
  }

  private void waitForEvent(VM vm, final int event, final String cqName, final String key) {
    vm.invoke(() -> {
      QueryService cqService = getCache().getQueryService();

      CqQuery cQuery = cqService.getCq(cqName);
      assertThat(cQuery).describedAs("Failed to get CqQuery for CQ : " + cqName).isNotNull();

      CqAttributes cqAttr = cQuery.getCqAttributes();
      CqListener[] cqListener = cqAttr.getCqListeners();
      CqQueryTestListener listener = (CqQueryTestListener) cqListener[0];

      switch (event) {
        case CREATE:
          listener.waitForCreated(key);
          break;

        case UPDATE:
          listener.waitForUpdated(key);
          break;

        case DESTROY:
          listener.waitForDestroyed(key);
          break;

        case INVALIDATE:
          listener.waitForInvalidated(key);
          break;

        case CLOSE:
          listener.waitForClose();
          break;

        case REGION_CLEAR:
          listener.waitForRegionClear();
          break;

        case REGION_INVALIDATE:
          listener.waitForRegionInvalidate();
          break;

      }
    });
  }

  /**
   * Waits till the CQ state is same as the expected. Waits for max time, if the CQ state is not
   * same as expected throws exception.
   */
  public void waitForCqState(VM vm, final String cqName, final int state) {
    vm.invoke(() -> {
      QueryService cqService = getCache().getQueryService();

      CqQuery cQuery = cqService.getCq(cqName);
      assertThat(cQuery).describedAs("Failed to get CqQuery for CQ : " + cqName).isNotNull();

      final CqStateImpl cqState = (CqStateImpl) cQuery.getState();
      // Wait max time, till the CQ state is as expected.
      await("cqState never became " + state)
          .until(() -> cqState.getState(), Matchers.equalTo(state));
    });
  }

  public void clearCQListenerEvents(VM vm, final String cqName) {
    vm.invoke(() -> {
      QueryService cqService = getCache().getQueryService();

      CqQuery cQuery = cqService.getCq(cqName);
      assertThat(cQuery).describedAs("Failed to get CqQuery for CQ : " + cqName).isNotNull();

      CqAttributes cqAttr = cQuery.getCqAttributes();
      CqListener cqListener = cqAttr.getCqListener();
      CqQueryTestListener listener = (CqQueryTestListener) cqListener;
      listener.getEventHistory();
    });
  }

  private void validateQuery(VM vm, final String query, final int resultSize) {
    vm.invoke(() -> {
      QueryService cqService = getCache().getQueryService();

      Query q = cqService.newQuery(query);

      Object r = q.execute();
      if (r instanceof Collection) {
        int rSize = ((Collection) r).size();
        logger.info("### Result Size is :" + rSize);
        assertThat(rSize).isEqualTo(rSize);
      }
    });
  }

  private Properties getConnectionProps(String[] hosts, int[] ports, Properties newProps) {

    Properties props = new Properties();
    StringBuilder endPoints = new StringBuilder();
    String host = hosts[0];
    for (int i = 0; i < ports.length; i++) {
      if (hosts.length > 1) {
        host = hosts[i];
      }
      endPoints.append("server").append(i).append("=").append(host).append(":").append(ports[i]);
      if (ports.length > (i + 1)) {
        endPoints.append(",");
      }
    }

    props.setProperty("endpoints", endPoints.toString());
    props.setProperty("retryAttempts", "1");

    // Add other property elements.
    if (newProps != null) {
      Enumeration e = newProps.keys();
      while (e.hasMoreElements()) {
        String key = (String) e.nextElement();
        props.setProperty(key, newProps.getProperty(key));
      }
    }
    return props;
  }


  // Exercise CQ attributes mutator functions
  private void mutateCQAttributes(VM vm, final String cqName, final int mutator_function) {
    vm.invoke(() -> {
      CqQuery cq1;
      QueryService cqService = getCache().getQueryService();

      CqQuery cQuery = cqService.getCq(cqName);
      assertThat(cQuery).describedAs("Failed to get CqQuery for CQ : " + cqName).isNotNull();

      cq1 = cqService.getCq(cqName);

      CqAttributesMutator cqAttrMutator = cq1.getCqAttributesMutator();
      CqAttributes cqAttr = cq1.getCqAttributes();
      CqListener cqListeners[];
      switch (mutator_function) {
        case CREATE:
          // Reinitialize with 2 CQ Listeners
          CqListener cqListenersArray[] = {new CqQueryTestListener(getCache().getLogger()),
              new CqQueryTestListener(getCache().getLogger())};
          cqAttrMutator.initCqListeners(cqListenersArray);
          cqListeners = cqAttr.getCqListeners();
          assertThat(2).describedAs("CqListener count mismatch").isEqualTo(cqListeners.length);
          break;

        case UPDATE:
          // Add 2 new CQ Listeners
          CqListener newListener1 = new CqQueryTestListener(getCache().getLogger());
          CqListener newListener2 = new CqQueryTestListener(getCache().getLogger());
          cqAttrMutator.addCqListener(newListener1);
          cqAttrMutator.addCqListener(newListener2);

          cqListeners = cqAttr.getCqListeners();
          assertThat(3).describedAs("CqListener count mismatch").isEqualTo(cqListeners.length);
          break;

        case DESTROY:
          cqListeners = cqAttr.getCqListeners();
          cqAttrMutator.removeCqListener(cqListeners[0]);
          cqListeners = cqAttr.getCqListeners();
          assertThat(2).describedAs("CqListener count mismatch").isEqualTo(cqListeners.length);

          // Remove a listener and validate
          cqAttrMutator.removeCqListener(cqListeners[0]);
          cqListeners = cqAttr.getCqListeners();
          assertThat(1).describedAs("CqListener count mismatch").isEqualTo(cqListeners.length);
          break;
      }
    });
  }


  private void performGC(VM server, final String regionName) {
    SerializableRunnable task = new CacheSerializableRunnable("perform GC") {
      @Override
      public void run2() throws CacheException {
        Region subregion = getCache().getRegion("root/" + regionName);
        DistributedTombstoneOperation gc = DistributedTombstoneOperation
            .gc((DistributedRegion) subregion, new EventID(getCache().getDistributedSystem()));
        gc.distribute();
      }
    };
    server.invoke(task);
  }

  private void ensureCQExists(VM server, final String regionName) {
    SerializableRunnable task = new CacheSerializableRunnable("check CQs") {
      @Override
      public void run2() throws CacheException {
        CqQuery queries[] = getCache().getQueryService().getCqs();
        assertThat(queries.length > 0).describedAs("expected to find a CQ but found none").isTrue();
        System.out.println("found query " + queries[0]);
        assertThat(queries[0].getName().startsWith(
            "testCQResultSet_0")).describedAs("Couldn't find query " + "testCQResultSet_0")
                .isTrue();
        assertThat(!queries[0].isClosed()).describedAs("expected the CQ to be open: " + queries[0])
            .isTrue();
      }
    };
    server.invoke(task);
  }


  /**
   * bug #47494 - CQs were destroyed when a server did a tombstone GC
   */
  @Test
  public void testCQRemainsWhenServerGCs() {

    VM server = VM.getVM(0);
    VM client = VM.getVM(1);
    VM server2 = VM.getVM(2);

    createServer(server);
    createServer(server2);

    final int thePort = server.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
    final String host0 = NetworkUtils.getServerHostName();

    // Create client.
    createClient(client, thePort, host0);
    try {
      /* CQ Test with initial Values. */
      int size = 5;
      createValuesWithShort(server, regions[0], size);
      Wait.pause(500);

      final String cqName = "testCQResultSet_0";

      // Create CQs.
      createCQ(client, cqName, shortTypeCQs[0]);

      // Check resultSet Size.
      executeCQ(client, cqName, true, 5, null);

      // Simulate a tombstone GC on the server
      performGC(server, regions[0]);

      // Check the CQs
      ensureCQExists(server, regions[0]);
    } finally {
      closeClient(client);
      closeServer(server);
      closeServer(server2);
    }
  }


  /**
   * Test for InterestList and CQ registered from same clients.
   */
  @Test
  public void testInterestListAndCQs() {
    VM server = VM.getVM(0);
    VM client = VM.getVM(1);

    /* Init Server and Client */
    createServer(server);
    final int thePort = server.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
    final String host0 = NetworkUtils.getServerHostName();
    createClient(client, thePort, host0);


    /* Create CQs. */
    createCQ(client, "testInterestListAndCQs_0", cqs[0]);
    validateCQCount(client, 1);

    /* Init values at server. */
    final int size = 10;

    executeCQ(client, "testInterestListAndCQs_0", false, null);
    registerInterestListCQ(client, regions[0]);

    createValues(server, regions[0], size);
    // Wait for client to Sync.

    for (int i = 1; i <= 10; i++) {
      waitForCreated(client, "testInterestListAndCQs_0", KEY + i);
    }

    // validate CQs.
    validateCQ(client, "testInterestListAndCQs_0", /* resultSize: */ noTest, /* creates: */ size,
        /* updates: */ noTest, /* deletes; */ noTest, /* queryInserts: */ size,
        /* queryUpdates: */ 0, /* queryDeletes: */ 0, /* totalEvents: */ size);

    // Validate InterestList.
    // CREATE
    client.invoke(new CacheSerializableRunnable("validate updates") {
      @Override
      public void run2() throws CacheException {
        Region region = getRootRegion().getSubregion(regions[0]);
        assertThat(region).isNotNull();

        Set keys = region.entrySet();
        assertThat(keys.size())
            .describedAs(
                "Mismatch, number of keys in local region is not equal to the interest list size")
            .isEqualTo(size);

        CertifiableTestCacheListener ctl =
            (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
        for (int i = 1; i <= 10; i++) {
          ctl.waitForCreated(KEY + i);
          assertThat(region.getEntry(KEY + i)).isNotNull();
        }
      }
    });

    // UPDATE
    createValues(server, regions[0], size);
    // Wait for client to sync.
    for (int i = 1; i <= 10; i++) {
      waitForUpdated(client, "testInterestListAndCQs_0", KEY + i);
    }

    client.invoke(new CacheSerializableRunnable("validate updates") {
      @Override
      public void run2() throws CacheException {
        Region region = getRootRegion().getSubregion(regions[0]);
        assertThat(region).isNotNull();

        Set keys = region.entrySet();
        assertThat(keys.size())
            .describedAs(
                "Mismatch, number of keys in local region is not equal to the interest list size")
            .isEqualTo(size);

        CertifiableTestCacheListener ctl =
            (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
        for (int i = 1; i <= 10; i++) {
          ctl.waitForUpdated(KEY + i);
          assertThat(region.getEntry(KEY + i)).isNotNull();
        }
      }
    });

    // INVALIDATE
    server.invoke(new CacheSerializableRunnable("Invalidate values") {
      @Override
      public void run2() throws CacheException {
        Region region1 = getRootRegion().getSubregion(regions[0]);
        for (int i = 1; i <= size; i++) {
          region1.invalidate(KEY + i);
        }
      }
    });

    waitForInvalidated(client, "testInterestListAndCQs_0", KEY + 10);

    client.invoke(new CacheSerializableRunnable("validate invalidates") {
      @Override
      public void run2() throws CacheException {
        Region region = getRootRegion().getSubregion(regions[0]);
        assertThat(region).isNotNull();

        Set keys = region.entrySet();
        assertThat(keys.size())
            .describedAs(
                "Mismatch, number of keys in local region is not equal to the interest list size")
            .isEqualTo(size);

        CertifiableTestCacheListener ctl =
            (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
        for (int i = 1; i <= 10; i++) {
          ctl.waitForInvalidated(KEY + i);
          assertThat(region.getEntry(KEY + i)).isNotNull();
        }
      }
    });

    validateCQ(client, "testInterestListAndCQs_0", /* resultSize: */ noTest, /* creates: */ size,
        /* updates: */ size, /* deletes; */ noTest, /* queryInserts: */ size,
        /* queryUpdates: */ size, /* queryDeletes: */ size, /* totalEvents: */ size * 3);

    // DESTROY - this should not have any effect on CQ, as the events are
    // already destroyed from invalidate events.
    server.invoke(new CacheSerializableRunnable("Invalidate values") {
      @Override
      public void run2() throws CacheException {
        Region region1 = getRootRegion().getSubregion(regions[0]);
        for (int i = 1; i <= size; i++) {
          region1.destroy(KEY + i);
        }
      }
    });

    // Wait for destroyed.
    client.invoke(new CacheSerializableRunnable("validate destroys") {
      @Override
      public void run2() throws CacheException {
        Region region = getRootRegion().getSubregion(regions[0]);
        assertThat(region).isNotNull();

        CertifiableTestCacheListener ctl =
            (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
        for (int i = 1; i <= 10; i++) {
          ctl.waitForDestroyed(KEY + i);
        }
      }
    });

    validateCQ(client, "testInterestListAndCQs_0", /* resultSize: */ noTest, /* creates: */ size,
        /* updates: */ size, /* deletes; */ noTest, /* queryInserts: */ size,
        /* queryUpdates: */ size, /* queryDeletes: */ size, /* totalEvents: */ size * 3);

    closeClient(client);
    closeServer(server);
  }


  /**
   * Test for CQ register and UnRegister.
   */
  @Test
  public void testCQStopExecute() {

    VM server = VM.getVM(0);
    VM client = VM.getVM(1);

    /* Init Server and Client */
    createServer(server);
    final int thePort = server.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
    final String host0 = NetworkUtils.getServerHostName();
    createClient(client, thePort, host0);

    /* Create CQs. */
    createCQ(client, "testCQStopExecute_0", cqs[0]);
    validateCQCount(client, 1);

    executeCQ(client, "testCQStopExecute_0", false, null);

    /* Init values at server. */
    int size = 10;
    createValues(server, regions[0], size);
    // Wait for client to sync.

    waitForCreated(client, "testCQStopExecute_0", KEY + size);

    // Check if Client and Server in sync.
    // validateServerClientRegionEntries(server, client, regions[0]);
    validateQuery(server, cqs[0], 10);
    // validate CQs.
    // validateCQ(client, "testCQStopExecute_0", size, noTest, noTest, noTest);
    validateCQ(client, "testCQStopExecute_0", /* resultSize: */ noTest, /* creates: */ size,
        /* updates: */ 0, /* deletes; */ 0, /* queryInserts: */ size, /* queryUpdates: */ 0,
        /* queryDeletes: */ 0, /* totalEvents: */ size);

    // Test CQ stop
    stopCQ(client, "testCQStopExecute_0");

    // Test CQ re-enable
    executeCQ(client, "testCQStopExecute_0", false, null);

    /* Init values at server. */
    createValues(server, regions[0], 20);
    // Wait for client to sync.
    waitForCreated(client, "testCQStopExecute_0", KEY + 20);
    size = 30;

    // Check if Client and Server in sync.
    // validateServerClientRegionEntries(server, client, regions[0]);
    validateQuery(server, cqs[0], 20);
    // validate CQs.
    // validateCQ(client, "testCQStopExecute_0", size, noTest, noTest, noTest);
    validateCQ(client, "testCQStopExecute_0", /* resultSize: */ noTest, /* creates: */ 20,
        /* updates: */ 10, /* deletes; */ 0, /* queryInserts: */ 20, /* queryUpdates: */ 10,
        /* queryDeletes: */ 0, /* totalEvents: */ size);

    // Stop and execute CQ 20 times
    stopExecCQ(client);

    // Test CQ Close
    closeCQ(client, "testCQStopExecute_0");

    // Close.
    closeClient(client);
    closeServer(server);
  }

  /**
   * Test for CQ Attributes Mutator functions
   */
  @Test
  public void testCQAttributesMutator() {
    VM server = VM.getVM(0);
    VM client = VM.getVM(1);

    /* Init Server and Client */
    createServer(server);
    final int thePort = server.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
    final String host0 = NetworkUtils.getServerHostName();
    createClient(client, thePort, host0);

    /* Create CQs. */
    String cqName = "testCQAttributesMutator_0";
    createCQ(client, cqName, cqs[0]);
    validateCQCount(client, 1);
    executeCQ(client, cqName, false, null);

    /* Init values at server. */
    int size = 10;
    createValues(server, regions[0], size);
    // Wait for client to sync.
    waitForCreated(client, cqName, KEY + size);

    // validate CQs.
    validateCQ(client, cqName, /* resultSize: */ noTest, /* creates: */ size, /* updates: */ 0,
        /* deletes; */ 0, /* queryInserts: */ size, /* queryUpdates: */ 0, /* queryDeletes: */ 0,
        /* totalEvents: */ size);

    // Add 2 new CQ Listeners
    mutateCQAttributes(client, cqName, UPDATE);

    /* Init values at server. */
    createValues(server, regions[0], size * 2);
    waitForCreated(client, cqName, KEY + (size * 2));

    validateCQ(client, cqName, /* resultSize: */ noTest, /* creates: */ 20, /* updates: */ 10,
        /* deletes; */ 0, /* queryInserts: */ 20, /* queryUpdates: */ 10, /* queryDeletes: */ 0,
        /* totalEvents: */ 30);

    // Remove 2 listeners and validate
    mutateCQAttributes(client, cqName, DESTROY);

    validateCQ(client, cqName, /* resultSize: */ noTest, /* creates: */ 10, /* updates: */ 10,
        /* deletes; */ 0, /* queryInserts: */ 10, /* queryUpdates: */ 10, /* queryDeletes: */ 0,
        /* totalEvents: */ 20);

    // Reinitialize with 2 CQ Listeners
    mutateCQAttributes(client, cqName, CREATE);

    /* Delete values at server. */
    deleteValues(server, regions[0], 20);
    // Wait for client to sync.
    waitForDestroyed(client, cqName, KEY + (size * 2));

    validateCQ(client, cqName, /* resultSize: */ noTest, /* creates: */ 0, /* updates: */ 0,
        /* deletes; */ 20, /* queryInserts: */ 0, /* queryUpdates: */ 0, /* queryDeletes: */ 20,
        /* totalEvents: */ 20);

    // Close CQ
    closeCQ(client, cqName);

    // Close.
    closeClient(client);
    closeServer(server);
  }

  /**
   * Test for CQ register and UnRegister.
   */
  @Test
  public void testCQCreateClose() {
    VM server = VM.getVM(0);
    VM client = VM.getVM(1);

    /* Init Server and Client */
    createServer(server);
    final int thePort = server.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
    final String host0 = NetworkUtils.getServerHostName();
    createClient(client, thePort, host0);

    /* Create CQs. */
    createCQ(client, "testCQCreateClose_0", cqs[0]);
    validateCQCount(client, 1);

    executeCQ(client, "testCQCreateClose_0", false, null);

    /* Init values at server. */
    int size = 10;
    createValues(server, regions[0], size);
    // Wait for client to sync.
    waitForCreated(client, "testCQCreateClose_0", KEY + size);

    // Check if Client and Server in sync.
    validateQuery(server, cqs[0], 10);
    // validate CQs.
    validateCQ(client, "testCQCreateClose_0", /* resultSize: */ noTest, /* creates: */ size,
        /* updates: */ 0, /* deletes; */ 0, /* queryInserts: */ size, /* queryUpdates: */ 0,
        /* queryDeletes: */ 0, /* totalEvents: */ size);

    // Test CQ stop
    stopCQ(client, "testCQCreateClose_0");

    // Test CQ re-enable
    executeCQ(client, "testCQCreateClose_0", false, null);

    // Test CQ Close
    closeCQ(client, "testCQCreateClose_0");

    // Create CQs with no name, execute, and close.
    createAndExecCQNoName(client, cqs[0]);

    // Accessing the closed CQ.
    failIfCQExists(client, "testCQCreateClose_0");

    // re-Create the cq which is closed.
    createCQ(client, "testCQCreateClose_0", cqs[0]);

    /* Test CQ Count */
    validateCQCount(client, 1);

    // Registering CQ with same name from same client.
    try {
      createCQ(client, "testCQCreateClose_0", cqs[0]);
      fail("Trying to create CQ with same name. Should have thrown CQExistsException");
    } catch (org.apache.geode.test.dunit.RMIException rmiExc) {

      Throwable cause = rmiExc.getCause(); // should be a CQExistsException
      assertThat(cause).describedAs("Got wrong exception: " + cause.getClass().getName())
          .isInstanceOf(CqExistsException.class);
    }

    // Getting values from non-existent CQ.
    failIfCQExists(client, "testCQCreateClose_NO");

    // Server Registering CQ.
    try {
      createCQ(server, "testCQCreateClose_1", cqs[0]);
      fail("Trying to create CQ on Cache Server. Should have thrown Exception.");
    } catch (org.apache.geode.test.dunit.RMIException rmiExc) {

      Throwable cause = rmiExc.getCause(); // should be a IllegalStateException
      assertThat(cause).describedAs("Got wrong exception: " + cause.getClass().getName())
          .isInstanceOf(IllegalStateException.class);
    }

    // Trying to execute CQ on non-existing region.
    createCQ(client, "testCQCreateClose_2", invalidCQs[0]);
    try {
      executeCQ(client, "testCQCreateClose_2", false, "RegionNotFoundException");
      fail("Trying to create CQ on non-existing Region. Should have thrown Exception.");
    } catch (org.apache.geode.test.dunit.RMIException rmiExc) {

      Throwable cause = rmiExc.getCause(); // should be a RegionNotFoundException
      assertThat(cause).describedAs("Expected cause to be RegionNotFoundException")
          .isInstanceOf(RegionNotFoundException.class);
    }

    /* Test CQ Count - Above failed create should not increment the CQ cnt. */
    validateCQCount(client, 2);

    createCQ(client, "testCQCreateClose_3", cqs[2]);

    validateCQCount(client, 3);

    /* Test for closeAllCQs() */

    client.invoke(() -> {
      logger.info("### Close All CQ. ###");
      QueryService cqService = getCache().getQueryService();
      cqService.closeCqs();
    });

    validateCQCount(client, 0);

    // Initialize.
    createCQ(client, "testCQCreateClose_2", cqs[1]);
    createCQ(client, "testCQCreateClose_4", cqs[1]);
    createCQ(client, "testCQCreateClose_5", cqs[1]);

    // Execute few of the initialized cqs
    executeCQ(client, "testCQCreateClose_4", false, null);
    executeCQ(client, "testCQCreateClose_5", false, null);

    // Call close all CQ.
    client.invoke(() -> {
      logger.info("### Close All CQ 2. ###");
      // Get CQ Service.
      QueryService cqService = getCache().getQueryService();
      cqService.closeCqs();
    });

    // Close.
    closeClient(client);
    closeServer(server);
  }

  /**
   * This will test the events after region destroy. The CQs on the destroy region needs to be
   * closed.
   */
  @Test
  public void testRegionDestroy() {
    VM server = VM.getVM(0);
    VM client = VM.getVM(1);

    /* Init Server and Client */
    createServer(server);
    final int thePort = server.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
    final String host0 = NetworkUtils.getServerHostName();
    createClient(client, thePort, host0);


    /* Create CQs. */
    createCQ(client, "testRegionDestroy_0", cqs[0]);
    createCQ(client, "testRegionDestroy_1", cqs[0]);
    createCQ(client, "testRegionDestroy_2", cqs[0]);

    executeCQ(client, "testRegionDestroy_0", false, null);
    executeCQ(client, "testRegionDestroy_1", false, null);
    executeCQ(client, "testRegionDestroy_2", false, null);

    /* Init values at server. */
    final int size = 10;
    registerInterestListCQ(client, regions[0]);
    createValues(server, regions[0], size);

    // Wait for client to sync.

    waitForCreated(client, "testRegionDestroy_0", KEY + 10);

    // validate CQs.
    validateCQ(client, "testRegionDestroy_0", /* resultSize: */ noTest, /* creates: */ size,
        /* updates: */ noTest, /* deletes; */ noTest, /* queryInserts: */ size,
        /* queryUpdates: */ 0, /* queryDeletes: */ 0, /* totalEvents: */ size);

    // Validate InterestList.
    // CREATE
    client.invoke(new CacheSerializableRunnable("validate updates") {
      @Override
      public void run2() throws CacheException {
        Region region = getRootRegion().getSubregion(regions[0]);
        assertThat(region).isNotNull();

        Set keys = region.entrySet();
        assertThat(keys.size())
            .describedAs(
                "Mismatch, number of keys in local region is not equal to the interest list size")
            .isEqualTo(size);

        CertifiableTestCacheListener ctl =
            (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
        for (int i = 1; i <= 10; i++) {
          ctl.waitForCreated(KEY + i);
          assertThat(region.getEntry(KEY + i)).isNotNull();
        }
      }
    });

    // Destroy Region.
    server.invoke(new CacheSerializableRunnable("Destroy Region") {
      @Override
      public void run2() throws CacheException {
        Region region1 = getRootRegion().getSubregion(regions[0]);
        region1.destroyRegion();
      }
    });

    Wait.pause(4 * 1000);
    validateCQCount(client, 0);

    closeClient(client);
    closeServer(server);

  }

  /**
   * Test for CQ with multiple clients.
   */
  @Test
  public void testCQWithMultipleClients() {

    VM server = VM.getVM(0);
    VM client1 = VM.getVM(1);
    VM client2 = VM.getVM(2);
    VM client3 = VM.getVM(3);

    /* Create Server and Client */
    createServer(server);
    final int thePort = server.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
    final String host0 = NetworkUtils.getServerHostName();
    createClient(client1, thePort, host0);
    createClient(client2, thePort, host0);

    /* Create CQs. and initialize the region */
    createCQ(client1, "testCQWithMultipleClients_0", cqs[0]);
    executeCQ(client1, "testCQWithMultipleClients_0", false, null);
    createCQ(client2, "testCQWithMultipleClients_0", cqs[0]);
    executeCQ(client2, "testCQWithMultipleClients_0", false, null);

    int size = 10;

    // Create Values on Server.
    createValues(server, regions[0], size);

    waitForCreated(client1, "testCQWithMultipleClients_0", KEY + 10);


    /* Validate the CQs */
    validateCQ(client1, "testCQWithMultipleClients_0", /* resultSize: */ noTest,
        /* creates: */ size, /* updates: */ 0, /* deletes; */ 0, /* queryInserts: */ size,
        /* queryUpdates: */ 0, /* queryDeletes: */ 0, /* totalEvents: */ size);

    waitForCreated(client2, "testCQWithMultipleClients_0", KEY + 10);

    validateCQ(client2, "testCQWithMultipleClients_0", /* resultSize: */ noTest,
        /* creates: */ size, /* updates: */ 0, /* deletes; */ 0, /* queryInserts: */ size,
        /* queryUpdates: */ 0, /* queryDeletes: */ 0, /* totalEvents: */ size);


    /* Close test */
    closeCQ(client1, "testCQWithMultipleClients_0");

    validateCQ(client2, "testCQWithMultipleClients_0", /* resultSize: */ noTest,
        /* creates: */ size, /* updates: */ 0, /* deletes; */ 0, /* queryInserts: */ size,
        /* queryUpdates: */ 0, /* queryDeletes: */ 0, /* totalEvents: */ size);

    /* Init new client and create cq */
    createClient(client3, thePort, host0);
    createCQ(client3, "testCQWithMultipleClients_0", cqs[0]);
    createCQ(client3, "testCQWithMultipleClients_1", cqs[1]);
    executeCQ(client3, "testCQWithMultipleClients_0", false, null);
    executeCQ(client3, "testCQWithMultipleClients_1", false, null);

    // Update values on Server. This will be updated on new Client CQs.
    createValues(server, regions[0], size);

    waitForUpdated(client3, "testCQWithMultipleClients_0", KEY + 10);

    validateCQ(client3, "testCQWithMultipleClients_0", /* resultSize: */ noTest, /* creates: */ 0,
        /* updates: */ size, /* deletes; */ 0, /* queryInserts: */ 0, /* queryUpdates: */ size,
        /* queryDeletes: */ 0, /* totalEvents: */ size);

    validateCQ(client3, "testCQWithMultipleClients_1", /* resultSize: */ noTest, /* creates: */ 0,
        /* updates: */ 1, /* deletes; */ 0, /* queryInserts: */ 0, /* queryUpdates: */ 1,
        /* queryDeletes: */ 0, /* totalEvents: */ 1);

    /* Validate the CQ count */
    validateCQCount(client1, 0);
    validateCQCount(client2, 1);
    validateCQCount(client3, 2);

    /* Close Client Test */
    closeClient(client1);

    clearCQListenerEvents(client2, "testCQWithMultipleClients_0");
    clearCQListenerEvents(client3, "testCQWithMultipleClients_1");

    // Update values on server, update again.
    createValues(server, regions[0], size);

    waitForUpdated(client2, "testCQWithMultipleClients_0", KEY + 10);

    validateCQ(client2, "testCQWithMultipleClients_0", /* resultSize: */ noTest,
        /* creates: */ size, /* updates: */ size * 2, /* deletes; */ 0, /* queryInserts: */ size,
        /* queryUpdates: */ size * 2, /* queryDeletes: */ 0, /* totalEvents: */ size * 3);

    waitForUpdated(client3, "testCQWithMultipleClients_1", KEY + 2);

    validateCQ(client3, "testCQWithMultipleClients_1", /* resultSize: */ noTest, /* creates: */ 0,
        /* updates: */ 2, /* deletes; */ 0, /* queryInserts: */ 0, /* queryUpdates: */ 2,
        /* queryDeletes: */ 0, /* totalEvents: */ 2);

    /* Close Server and Client */
    closeClient(client2);
    closeClient(client3);
    closeServer(server);
  }

  /**
   * Test for CQ ResultSet.
   */
  @Test
  public void testCQResultSet() {

    VM server = VM.getVM(0);
    VM client = VM.getVM(1);

    createServer(server);

    final int thePort = server.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
    final String host0 = NetworkUtils.getServerHostName();

    // Create client.
    createClient(client, thePort, host0);

    /* CQ Test with initial Values. */
    int size = 10;
    createValues(server, regions[0], size);
    Wait.pause(500);

    // Create CQs.
    createCQ(client, "testCQResultSet_0", cqs[0]);

    // Check resultSet Size.
    executeCQ(client, "testCQResultSet_0", true, 10, null);

    /* CQ Test with no Values on Region */
    createCQ(client, "testCQResultSet_1", cqs[2]);
    // Check resultSet Size.
    executeCQ(client, "testCQResultSet_1", true, 0, null);
    stopCQ(client, "testCQResultSet_1");

    // Init values.
    createValues(server, regions[1], 5);
    validateQuery(server, cqs[2], 2);

    executeCQ(client, "testCQResultSet_1", true, 2, null);

    /*
     * compare values... Disabled since we don't currently maintain results on the client
     *
     * validateCQ(client, "testCQResultSet_1", 2, noTest, noTest, noTest); Portfolio[] values = new
     * Portfolio[] {new Portfolio(2), new Portfolio(4)}; HashTable t = new HashTable(); String[]
     * keys = new String[] {"key-2", "key-4"}; t.put(keys[0], values[0]); t.put(keys[1], values[1]);
     *
     * compareValues(client, "testCQResultSet_1", t);
     *
     * deleteValues(server, regions[1], 3); t.remove("key-4"); pause(2 * 1000);
     *
     * try { compareValues(client, "testCQResultSet_1", t);
     * fail("Should have thrown Exception. The value should not be present in cq results region"); }
     * catch (Exception ex) { // @todo check for specific exception type }
     *
     */

    // Close.
    closeClient(client);
    closeServer(server);
  }

  /**
   * Test for CQ Listener events.
   */
  @Test
  public void testCQEvents() {

    VM server = VM.getVM(0);
    VM client = VM.getVM(1);

    createServer(server);

    final int thePort = server.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
    final String host0 = NetworkUtils.getServerHostName();

    // Create client.
    createClient(client, thePort, host0);

    // Create CQs.
    createCQ(client, "testCQEvents_0", cqs[0]);

    executeCQ(client, "testCQEvents_0", false, null);

    // Init values at server.
    int size = 10;
    createValues(server, regions[0], size);

    waitForCreated(client, "testCQEvents_0", KEY + size);

    // validate Create events.
    validateCQ(client, "testCQEvents_0", /* resultSize: */ noTest, /* creates: */ size,
        /* updates: */ 0, /* deletes; */ 0, /* queryInserts: */ size, /* queryUpdates: */ 0,
        /* queryDeletes: */ 0, /* totalEvents: */ size);

    // Update values.
    createValues(server, regions[0], 5);
    createValues(server, regions[0], 10);

    waitForUpdated(client, "testCQEvents_0", KEY + size);

    // validate Update events.
    validateCQ(client, "testCQEvents_0", /* resultSize: */ noTest, /* creates: */ size,
        /* updates: */ 15, /* deletes; */ 0, /* queryInserts: */ size, /* queryUpdates: */ 15,
        /* queryDeletes: */ 0, /* totalEvents: */ size + 15);

    // Validate delete events.
    deleteValues(server, regions[0], 5);
    waitForDestroyed(client, "testCQEvents_0", KEY + 5);

    validateCQ(client, "testCQEvents_0", /* resultSize: */ noTest, /* creates: */ size,
        /* updates: */ 15, /* deletes; */5, /* queryInserts: */ size, /* queryUpdates: */ 15,
        /* queryDeletes: */ 5, /* totalEvents: */ size + 15 + 5);

    // Insert invalid Events.
    server.invoke(new CacheSerializableRunnable("Create values") {
      @Override
      public void run2() throws CacheException {
        Region region1 = getRootRegion().getSubregion(regions[0]);
        for (int i = -1; i >= -5; i--) {

          region1.put(KEY + i, KEY + i);
        }
      }
    });

    Wait.pause(1000);
    // cqs should not get any creates, deletes or updates.
    validateCQ(client, "testCQEvents_0", /* resultSize: */ noTest, /* creates: */ size,
        /* updates: */ 15, /* deletes; */5, /* queryInserts: */ size, /* queryUpdates: */ 15,
        /* queryDeletes: */ 5, /* totalEvents: */ size + 15 + 5);

    // Close.
    closeClient(client);
    closeServer(server);
  }

  @Test
  public void testCQMapValues() {

    VM server = VM.getVM(0);
    VM client = VM.getVM(1);
    String mapQuery = "select * from /root/" + regions[0] + " er where er['field1'] > 'value2'";
    int size = 10;

    createServer(server);

    createMapValues(server, regions[0], size / 2);

    final int thePort = server.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
    final String host0 = NetworkUtils.getServerHostName();

    // Create client.
    createClient(client, thePort, host0);

    // Create CQs.
    createCQ(client, "testCQEvents_0", mapQuery);

    // execute with initial results
    executeCQ(client, "testCQEvents_0", true, 3, null);

    // updates and creates
    createMapValues(server, regions[0], size);

    // wait for last event which is key-9 as the query
    // > 'value2' returns 'value3' to 'value9'
    waitForCreated(client, "testCQEvents_0", KEY + (size - 1));

    // validate events.
    validateCQ(client, "testCQEvents_0", /* resultSize: */ noTest, /* creates: */ 4,
        /* updates: */ 3, /* deletes; */ 0, /* queryInserts: */ 4, /* queryUpdates: */ 3,
        /* queryDeletes: */ 0, /* totalEvents: */ 7);

    closeCQ(client, "testCQEvents_0");

    createIndex(server, "index1", "er[*]", "/root/" + regions[0] + " er");

    // Create CQs.
    createCQ(client, "testCQEvents_0", mapQuery);

    // execute with initial results
    executeCQ(client, "testCQEvents_0", true, 7, null);

    closeCQ(client, "testCQEvents_0");

    createIndex(server, "index2", "er['field1']", "/root/" + regions[0] + " er");

    // Create CQs.
    createCQ(client, "testCQEvents_0", mapQuery);

    // execute with initial results
    executeCQ(client, "testCQEvents_0", true, 7, null);

    // Close.
    closeClient(client);
    closeServer(server);
  }


  private void createMapValues(VM vm, final String regionName, final int size) {
    vm.invoke(new CacheSerializableRunnable("Create map values") {
      @Override
      public void run2() throws CacheException {
        Region exampleRegion = getRootRegion().getSubregion(regionName);
        for (int i = 1; i <= size; i++) {
          Map<String, String> value = new HashMap<>();
          value.put("field1", "value" + i);
          value.put("field2", "key" + i);
          exampleRegion.put(KEY + i, value);
        }
        logger.info("### Number of Entries in Region :" + exampleRegion.keySet().size());
      }
    });
  }


  /**
   * Test for stopping and restarting CQs.
   */
  @Test
  public void testEnableDisableCQ() {
    VM server = VM.getVM(0);
    VM client = VM.getVM(1);

    createServer(server);

    final int thePort = server.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
    final String host0 = NetworkUtils.getServerHostName();

    // Create client.
    createClient(client, thePort, host0);

    // Create CQs.
    createCQ(client, "testEnableDisable_0", cqs[0]);
    executeCQ(client, "testEnableDisable_0", false, null);

    /* Test for disableCQ */
    client.invoke(new CacheSerializableRunnable("Client disableCQs()") {
      @Override
      public void run2() throws CacheException {
        // Get CQ Service.
        QueryService cqService;
        try {
          cqService = getCache().getQueryService();
          cqService.stopCqs();
        } catch (Exception cqe) {
          cqe.printStackTrace();
          fail("Failed to getCQService.");
        }
      }
    });

    Wait.pause(1000);
    // Init values at server.
    int size = 10;
    createValues(server, regions[0], size);
    Wait.pause(500);
    // There should not be any creates.
    validateCQ(client, "testEnableDisable_0", /* resultSize: */ noTest, /* creates: */ 0,
        /* updates: */ 0, /* deletes; */ 0, /* queryInserts: */ 0, /* queryUpdates: */ 0,
        /* queryDeletes: */ 0, /* totalEvents: */ 0);

    /* Test for enable CQ */
    client.invoke(new CacheSerializableRunnable("Client enableCQs()") {
      @Override
      public void run2() throws CacheException {
        // Get CQ Service.
        QueryService cqService;
        try {
          cqService = getCache().getQueryService();
          cqService.executeCqs();
        } catch (Exception cqe) {
          cqe.printStackTrace();
          fail("Failed to getCQService.");
        }
      }
    });
    Wait.pause(1000);
    createValues(server, regions[0], size);
    waitForUpdated(client, "testEnableDisable_0", KEY + size);
    // It gets created on the CQs
    validateCQ(client, "testEnableDisable_0", /* resultSize: */ noTest, /* creates: */ 0,
        /* updates: */ size, /* deletes; */ 0, /* queryInserts: */ 0, /* queryUpdates: */ size,
        /* queryDeletes: */ 0, /* totalEvents: */ size);

    /* Test for disableCQ on Region */
    client.invoke(new CacheSerializableRunnable("Client disableCQs()") {
      @Override
      public void run2() throws CacheException {
        // Get CQ Service.
        QueryService cqService;
        try {
          cqService = getCache().getQueryService();
          cqService.stopCqs("/root/" + regions[0]);
        } catch (Exception cqe) {
          cqe.printStackTrace();
          fail("Failed to getCQService.");
        }
      }
    });

    Wait.pause(2 * 1000);
    deleteValues(server, regions[0], size / 2);
    Wait.pause(500);
    // There should not be any deletes.
    validateCQ(client, "testEnableDisable_0", /* resultSize: */ noTest, /* creates: */ 0,
        /* updates: */ size, /* deletes; */ 0, /* queryInserts: */ 0, /* queryUpdates: */ size,
        /* queryDeletes: */ 0, /* totalEvents: */ size);

    /* Test for enable CQ on region */
    client.invoke(new CacheSerializableRunnable("Client enableCQs()") {
      @Override
      public void run2() throws CacheException {
        // Get CQ Service.
        QueryService cqService;
        try {
          cqService = getCache().getQueryService();
          cqService.executeCqs("/root/" + regions[0]);
        } catch (Exception cqe) {
          cqe.printStackTrace();
          fail("Failed to getCQService.");
        }
      }
    });
    Wait.pause(1000);
    createValues(server, regions[0], size / 2);
    waitForCreated(client, "testEnableDisable_0", KEY + (size / 2));
    // Gets updated on the CQ.
    validateCQ(client, "testEnableDisable_0", /* resultSize: */ noTest, /* creates: */ size / 2,
        /* updates: */ size, /* deletes; */ 0, /* queryInserts: */ size / 2,
        /* queryUpdates: */ size, /* queryDeletes: */ 0, /* totalEvents: */ size * 3 / 2);

    // Close.
    closeClient(client);
    closeServer(server);
  }

  /**
   * Test for Complex queries.
   */
  @Test
  public void testQuery() {
    VM server = VM.getVM(0);
    VM client = VM.getVM(1);

    createServer(server);

    final int thePort = server.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
    final String host0 = NetworkUtils.getServerHostName();

    // Create client.
    createClient(client, thePort, host0);

    // Create CQs.
    createCQ(client, "testQuery_3", cqs[3]);
    executeCQ(client, "testQuery_3", true, null);

    createCQ(client, "testQuery_4", cqs[4]);
    executeCQ(client, "testQuery_4", true, null);

    createCQ(client, "testQuery_5", cqs[5]);
    executeCQ(client, "testQuery_5", true, null);

    createCQ(client, "testQuery_6", cqs[6]);
    executeCQ(client, "testQuery_6", true, null);

    createCQ(client, "testQuery_7", cqs[7]);
    executeCQ(client, "testQuery_7", true, null);

    createCQ(client, "testQuery_8", cqs[8]);
    executeCQ(client, "testQuery_8", true, null);

    // Close.
    closeClient(client);
    closeServer(server);
  }

  /**
   * Test for CQ Fail over.
   */
  @Test
  public void testCQFailOver() {
    VM server1 = VM.getVM(0);
    VM server2 = VM.getVM(1);
    VM client = VM.getVM(2);

    createServer(server1);

    final int port1 = server1.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
    final String host0 = NetworkUtils.getServerHostName();
    // Create client.
    // Properties props = new Properties();
    // Create client with redundancyLevel -1

    final int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(1);

    createClient(client, new int[] {port1, ports[0]}, host0, "-1");

    int numCQs = 1;
    for (int i = 0; i < numCQs; i++) {
      // Create CQs.
      createCQ(client, "testCQFailOver_" + i, cqs[i]);
      executeCQ(client, "testCQFailOver_" + i, false, null);
    }
    Wait.pause(1000);

    // CREATE.
    createValues(server1, regions[0], 10);
    createValues(server1, regions[1], 10);
    waitForCreated(client, "testCQFailOver_0", KEY + 10);

    Wait.pause(1000);

    createServer(server2, ports[0]);
    final int thePort2 = server2.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
    System.out
        .println("### Port on which server1 running : " + port1 + " Server2 running : " + thePort2);

    Wait.pause(8 * 1000);

    // UPDATE - 1.
    createValues(server1, regions[0], 10);
    createValues(server1, regions[1], 10);

    waitForUpdated(client, "testCQFailOver_0", KEY + 10);

    int[] resultsCnt = new int[] {10, 1, 2};

    for (int i = 0; i < numCQs; i++) {
      validateCQ(client, "testCQFailOver_" + i, noTest, resultsCnt[i], resultsCnt[i], noTest);
    }

    // Close server1.
    closeServer(server1);

    // Fail over should happen.
    Wait.pause(3 * 1000);

    for (int i = 0; i < numCQs; i++) {
      validateCQ(client, "testCQFailOver_" + i, noTest, resultsCnt[i], resultsCnt[i], noTest);
    }

    // UPDATE - 2
    this.clearCQListenerEvents(client, "testCQFailOver_0");
    createValues(server2, regions[0], 10);
    createValues(server2, regions[1], 10);

    for (int i = 1; i <= 10; i++) {
      waitForUpdated(client, "testCQFailOver_0", KEY + i);
    }

    for (int i = 0; i < numCQs; i++) {
      validateCQ(client, "testCQFailOver_" + i, noTest, resultsCnt[i], resultsCnt[i] * 2, noTest);
    }

    // Close.
    closeClient(client);
    closeServer(server2);
  }

  /**
   * Test for CQ Fail over/HA with redundancy level set.
   */
  @Test
  public void testCQHA() {
    VM server1 = VM.getVM(0);
    VM server2 = VM.getVM(1);
    VM server3 = VM.getVM(2);

    VM client = VM.getVM(3);

    createServer(server1);

    final int port1 = server1.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
    final String host0 = NetworkUtils.getServerHostName();

    final int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(2);

    createServer(server2, ports[0]);
    final int thePort2 = server2.invoke(() -> CqQueryDUnitTest.getCacheServerPort());

    createServer(server3, ports[1]);
    final int port3 = server3.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
    System.out.println("### Port on which server1 running : " + port1 + " server2 running : "
        + thePort2 + " Server3 running : " + port3);

    // Create client - With 3 server endpoints and redundancy level set to 2.

    // Create client with redundancyLevel 1
    createClient(client, new int[] {port1, thePort2, port3}, host0, "1");

    // Create CQs.
    int numCQs = 1;
    for (int i = 0; i < numCQs; i++) {
      // Create CQs.
      createCQ(client, "testCQHA_" + i, cqs[i]);
      executeCQ(client, "testCQHA_" + i, false, null);
    }

    Wait.pause(1000);

    // CREATE.
    createValues(server1, regions[0], 10);
    createValues(server1, regions[1], 10);

    waitForCreated(client, "testCQHA_0", KEY + 10);

    // Clients expected initial result.
    int[] resultsCnt = new int[] {10, 1, 2};

    // Close server1.
    // To maintain the redundancy; it will make connection to endpoint-3.
    closeServer(server1);
    Wait.pause(3 * 1000);

    // UPDATE-1.
    createValues(server2, regions[0], 10);
    createValues(server2, regions[1], 10);

    waitForUpdated(client, "testCQHA_0", KEY + 10);

    // Validate CQ.
    for (int i = 0; i < numCQs; i++) {
      validateCQ(client, "testCQHA_" + i, noTest, resultsCnt[i], resultsCnt[i], noTest);
    }

    // Close server-2
    closeServer(server2);
    Wait.pause(2 * 1000);

    // UPDATE - 2.
    clearCQListenerEvents(client, "testCQHA_0");

    createValues(server3, regions[0], 10);
    createValues(server3, regions[1], 10);

    // Wait for events at client.

    waitForUpdated(client, "testCQHA_0", KEY + 10);

    for (int i = 0; i < numCQs; i++) {
      validateCQ(client, "testCQHA_" + i, noTest, resultsCnt[i], resultsCnt[i] * 2, noTest);
    }

    // Close.
    closeClient(client);
    closeServer(server3);
  }

  /**
   * Test without CQs. This was added after an exception encountered with CQService, when there was
   * no CQService initiated.
   */
  @Test
  public void testWithoutCQs() {
    VM server1 = VM.getVM(0);
    VM server2 = VM.getVM(1);
    VM client = VM.getVM(2);

    createServer(server1);
    createServer(server2);

    final int port1 = server1.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
    final String host0 = NetworkUtils.getServerHostName();

    final int thePort2 = server2.invoke(() -> CqQueryDUnitTest.getCacheServerPort());

    SerializableRunnable createConnectionPool = new CacheSerializableRunnable("Create region") {
      @Override
      public void run2() throws CacheException {
        getCache();

        AttributesFactory regionFactory = new AttributesFactory();
        regionFactory.setScope(Scope.LOCAL);

        ClientServerTestCase.configureConnectionPool(regionFactory, host0, port1, thePort2, true,
            -1, -1, null);

        createRegion(regions[0], regionFactory.createRegionAttributes());
      }
    };

    // Create client.
    client.invoke(createConnectionPool);

    server1.invoke(new CacheSerializableRunnable("Create values") {
      @Override
      public void run2() throws CacheException {
        Region region1 = getRootRegion().getSubregion(regions[0]);
        for (int i = 0; i < 20; i++) {
          region1.put("key-string-" + i, "value-" + i);
        }
      }
    });

    // Put some values on the client.
    client.invoke(new CacheSerializableRunnable("Put values client") {
      @Override
      public void run2() throws CacheException {
        Region region1 = getRootRegion().getSubregion(regions[0]);

        for (int i = 0; i < 10; i++) {
          region1.put("key-string-" + i, "client-value-" + i);
        }
      }
    });

    Wait.pause(2 * 1000);
    closeServer(server1);
    closeServer(server2);
  }


  /**
   * Test getCQs for a regions
   */
  @Test
  public void testGetCQsForARegionName() {
    VM server = VM.getVM(0);
    VM client = VM.getVM(1);

    createServer(server);

    final int thePort = server.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
    final String host0 = NetworkUtils.getServerHostName();

    // Create client.
    createClient(client, thePort, host0);

    // Create CQs.
    createCQ(client, "testQuery_3", cqs[3]);
    executeCQ(client, "testQuery_3", true, null);

    createCQ(client, "testQuery_4", cqs[4]);
    executeCQ(client, "testQuery_4", true, null);

    createCQ(client, "testQuery_5", cqs[5]);
    executeCQ(client, "testQuery_5", true, null);

    createCQ(client, "testQuery_6", cqs[6]);
    executeCQ(client, "testQuery_6", true, null);
    // with regions[1]
    createCQ(client, "testQuery_7", cqs[7]);
    executeCQ(client, "testQuery_7", true, null);

    createCQ(client, "testQuery_8", cqs[8]);
    executeCQ(client, "testQuery_8", true, null);

    client.invoke(new CacheSerializableRunnable("Client disableCQs()") {
      @Override
      public void run2() throws CacheException {
        // Get CQ Service.
        QueryService cqService;
        try {
          cqService = getCache().getQueryService();
          CqQuery[] cq = cqService.getCqs("/root/" + regions[0]);
          assertThat(cq)
              .describedAs(
                  "CQService should not return null for cqs on this region : /root/" + regions[0])
              .isNotNull();
          getCache().getLogger().info("cqs for region: /root/" + regions[0] + " : " + cq.length);
          // closing on of the cqs.

          cq[0].close();
          cq = cqService.getCqs("/root/" + regions[0]);
          assertThat(cq)
              .describedAs(
                  "CQService should not return null for cqs on this region : /root/" + regions[0])
              .isNotNull();
          getCache().getLogger().info("cqs for region: /root/" + regions[0]
              + " after closing one of the cqs : " + cq.length);

          cq = cqService.getCqs("/root/" + regions[1]);
          getCache().getLogger().info("cqs for region: /root/" + regions[1] + " : " + cq.length);
          assertThat(cq)
              .describedAs(
                  "CQService should not return null for cqs on this region : /root/" + regions[1])
              .isNotNull();
        } catch (Exception cqe) {
          fail("Failed to getCQService", cqe);
        }
      }
    });

    // Close.
    closeClient(client);
    closeServer(server);

  }


  /**
   * Test exception message thrown when replicate region with local destroy is used
   */
  @Test
  public void testCqExceptionForReplicateRegionWithEvictionLocalDestroy() {
    VM server = VM.getVM(0);
    VM client = VM.getVM(1);

    createServerOnly(server, 0);
    createReplicateRegionWithLocalDestroy(server, new String[] {regions[0]});

    final int thePort = server.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
    final String host0 = NetworkUtils.getServerHostName();

    // Create client.
    createLocalRegion(client, new int[] {thePort}, host0, "-1", new String[] {regions[0]});
    // Create CQs.
    createCQ(client, "testQuery_3", "select * from /" + regions[0]);
    String expectedError =
        String.format("CQ is not supported for replicated region: %s with eviction action: %s",
            "/" + regions[0], EvictionAction.LOCAL_DESTROY);
    try {
      executeCQ(client, "testQuery_3", false, expectedError);
    } catch (Exception e) {
      assertThat(e.getCause().getCause().getMessage().contains(expectedError)).isTrue();

    }
    // Close.
    closeClient(client);
    closeServer(server);

  }

  /**
   * Tests execution of queries with NULL in where clause like where ID = NULL etc.
   */
  @Test
  public void testQueryWithNULLInWhereClause() {
    VM server = VM.getVM(0);
    VM client = VM.getVM(1);
    VM producer = VM.getVM(2);

    createServer(server);

    final int thePort = server.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
    final String host0 = NetworkUtils.getServerHostName();

    // Create client.
    createClient(client, thePort, host0);
    // producer is not doing any thing.
    createClient(producer, thePort, host0);

    final int size = 50;
    createValues(producer, regions[0], size);

    createCQ(client, "testQuery_9", cqs[9]);
    executeCQ(client, "testQuery_9", true, null);

    createValues(producer, regions[0], (2 * size));

    for (int i = 1; i <= size; i++) {
      if (i % 2 == 0) {
        waitForUpdated(client, "testQuery_9", KEY + i);
      }
    }

    for (int i = (size + 1); i <= 2 * size; i++) {
      if (i % 2 == 0) {
        waitForCreated(client, "testQuery_9", KEY + i);
      }
    }

    validateCQ(client, "testQuery_9", noTest, 25, 25, noTest);

    // Close.
    closeClient(client);
    closeServer(server);

  }

  /**
   * Tests execution of queries with NULL in where clause like where ID = NULL etc.
   */
  @Test
  public void testForSupportedRegionAttributes() {
    VM server1 = VM.getVM(0);
    VM server2 = VM.getVM(1);
    VM client = VM.getVM(2);

    // Create server with Global scope.
    SerializableRunnable createServer = new CacheSerializableRunnable("Create Cache Server") {
      @Override
      public void run2() throws CacheException {
        logger.info("### Create Cache Server. ###");

        // Create region with Global scope
        AttributesFactory factory1 = new AttributesFactory();
        factory1.setScope(Scope.GLOBAL);
        factory1.setDataPolicy(DataPolicy.REPLICATE);
        createRegion(regions[0], factory1.createRegionAttributes());

        // Create region with non Global, distributed_ack scope
        AttributesFactory factory2 = new AttributesFactory();
        factory2.setScope(Scope.DISTRIBUTED_NO_ACK);
        factory2.setDataPolicy(DataPolicy.REPLICATE);
        createRegion(regions[1], factory2.createRegionAttributes());

        Wait.pause(2000);

        try {
          startBridgeServer(port, true);
        } catch (Exception ex) {
          fail("While starting CacheServer", ex);
        }
        Wait.pause(2000);

      }
    };

    server1.invoke(createServer);
    server2.invoke(createServer);

    final int port1 = server1.invoke(() -> CqQueryDUnitTest.getCacheServerPort());

    final String host0 = NetworkUtils.getServerHostName();

    final int thePort2 = server2.invoke(() -> CqQueryDUnitTest.getCacheServerPort());

    // Create client.
    createClient(client, new int[] {port1, thePort2}, host0, "-1");

    // Create CQ on region with GLOBAL SCOPE.
    createCQ(client, "testForSupportedRegionAttributes_0", cqs[0]);
    executeCQ(client, "testForSupportedRegionAttributes_0", false, null);

    int size = 5;

    createValues(server1, regions[0], size);

    for (int i = 1; i <= size; i++) {
      waitForCreated(client, "testForSupportedRegionAttributes_0", KEY + i);
    }

    // Create CQ on region with non GLOBAL, DISTRIBUTED_ACK SCOPE.
    createCQ(client, "testForSupportedRegionAttributes_1", cqs[2]);

    String errMsg =
        "The replicated region " + " specified in CQ creation does not have scope supported by CQ."
            + " The CQ supported scopes are DISTRIBUTED_ACK and GLOBAL.";
    final String expectedErr = "Cq not registered on primary";
    client.invoke(new CacheSerializableRunnable("Set expect") {
      @Override
      public void run2() {
        getCache().getLogger()
            .info("<ExpectedException action=add>" + expectedErr + "</ExpectedException>");
      }
    });

    try {
      executeCQ(client, "testForSupportedRegionAttributes_1", false, "CqException");
      fail("The test should have failed with exception, " + errMsg);
    } catch (Exception ex) {
      // Expected.
    } finally {
      client.invoke(new CacheSerializableRunnable("Remove expect") {
        @Override
        public void run2() {
          getCache().getLogger()
              .info("<ExpectedException action=remove>" + expectedErr + "</ExpectedException>");
        }
      });
    }

    // Close.
    closeClient(client);
    closeServer(server1);
    closeServer(server2);

  }

  @Test
  public void testCQWhereCondOnShort() {

    VM server = VM.getVM(0);
    VM client = VM.getVM(1);

    createServer(server);

    final int thePort = server.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
    final String host0 = NetworkUtils.getServerHostName();

    // Create client.
    createClient(client, thePort, host0);

    /* CQ Test with initial Values. */
    int size = 5;
    createValuesWithShort(server, regions[0], size);
    Wait.pause(500);

    // Create CQs.
    createCQ(client, "testCQResultSet_0", shortTypeCQs[0]);

    // Check resultSet Size.
    executeCQ(client, "testCQResultSet_0", true, 5, null);

    closeClient(client);
    closeServer(server);
  }

  @Test
  public void testCQEquals() {

    VM server = VM.getVM(0);
    VM client = VM.getVM(1);

    createServer(server);

    final int thePort = server.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
    final String host0 = NetworkUtils.getServerHostName();

    // Create client.
    createClient(client, thePort, host0);

    /* CQ Test with initial Values. */
    int size = 10;
    // create values
    createValuesAsPrimitives(server, regions[0], size);
    Wait.pause(500);

    // Create CQs.
    createCQ(client, "equalsQuery1", "select * from /root/regionA p where p.equals('seeded')");

    // Check resultSet Size.
    executeCQ(client, "equalsQuery1", true, 2, null);

    // Create CQs.
    createCQ(client, "equalsQuery2", "select * from /root/regionA p where p='seeded'");

    // Check resultSet Size.
    executeCQ(client, "equalsQuery2", true, 2, null);

    // Create CQs.
    createCQ(client, "equalsStatusQuery1",
        "select * from /root/regionA p where p.status.equals('inactive')");

    // Check resultSet Size.
    executeCQ(client, "equalsStatusQuery1", true, 1, null);

    // Create CQs.
    createCQ(client, "equalsStatusQuery2",
        "select * from /root/regionA p where p.status='inactive'");

    // Check resultSet Size.
    executeCQ(client, "equalsStatusQuery2", true, 1, null);

    closeClient(client);
    closeServer(server);
  }

  @Test
  public void testCQEqualsWithIndex() {

    VM server = VM.getVM(0);
    VM client = VM.getVM(1);

    createServer(server);

    final int thePort = server.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
    final String host0 = NetworkUtils.getServerHostName();

    // Create client.
    createClient(client, thePort, host0);

    /* CQ Test with initial Values. */
    int size = 10;
    // create values
    createIndex(server, "index1", "p.status", "/root/regionA p");
    createValuesAsPrimitives(server, regions[0], size);
    Wait.pause(500);

    // Create CQs.
    createCQ(client, "equalsQuery1", "select * from /root/regionA p where p.equals('seeded')");

    // Check resultSet Size.
    executeCQ(client, "equalsQuery1", true, 2, null);

    // Create CQs.
    createCQ(client, "equalsQuery2", "select * from /root/regionA p where p='seeded'");

    // Check resultSet Size.
    executeCQ(client, "equalsQuery2", true, 2, null);

    // Create CQs.
    createCQ(client, "equalsStatusQuery1",
        "select * from /root/regionA p where p.status.equals('inactive')");

    // Check resultSet Size.
    executeCQ(client, "equalsStatusQuery1", true, 1, null);

    // Create CQs.
    createCQ(client, "equalsStatusQuery2",
        "select * from /root/regionA p where p.status='inactive'");

    // Check resultSet Size.
    executeCQ(client, "equalsStatusQuery2", true, 1, null);

    closeClient(client);
    closeServer(server);
  }


  // Tests that cqs get an onCqDisconnect and onCqConnect
  @Test
  public void testCQAllServersCrash() {
    VM server1 = VM.getVM(0);
    VM server2 = VM.getVM(1);
    VM client = VM.getVM(2);

    createServer(server1);

    final int port1 = server1.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
    final String host0 = NetworkUtils.getServerHostName();

    final int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(1);
    createClient(client, new int[] {port1, ports[0]}, host0, "-1");

    // Create CQs.
    createCQ(client, "testCQAllServersLeave_" + 11, cqs[11], true);
    executeCQ(client, "testCQAllServersLeave_" + 11, false, null);

    Wait.pause(5 * 1000);
    waitForCqsConnected(client, "testCQAllServersLeave_11", 1);

    // CREATE.
    createValues(server1, regions[0], 10);
    waitForCreated(client, "testCQAllServersLeave_11", KEY + 10);

    createServer(server2, ports[0]);
    server2.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
    Wait.pause(8 * 1000);

    // Close server1.
    crashServer(server1);

    Wait.pause(3 * 1000);

    crashServer(server2);

    Wait.pause(3 * 1000);
    waitForCqsDisconnected(client, "testCQAllServersLeave_11", 1);

    // Close.
    closeClient(client);
    closeCrashServer(server1);
    closeCrashServer(server2);
  }

  // Tests that we receive both an onCqConnected and a onCqDisconnected message
  @Test
  public void testCQAllServersLeave() {
    VM server1 = VM.getVM(0);
    VM server2 = VM.getVM(1);
    VM client = VM.getVM(2);

    createServer(server1);
    final int port1 = server1.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
    final String host0 = NetworkUtils.getServerHostName();
    final int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(1);

    createClient(client, new int[] {port1, ports[0]}, host0, "-1");

    Wait.pause(5 * 1000);
    // Create CQs.
    createCQ(client, "testCQAllServersLeave_" + 11, cqs[11], true);
    executeCQ(client, "testCQAllServersLeave_" + 11, false, null);

    Wait.pause(5 * 1000);
    waitForCqsConnected(client, "testCQAllServersLeave_11", 1);
    // CREATE.
    createValues(server1, regions[0], 10);
    waitForCreated(client, "testCQAllServersLeave_11", KEY + 10);

    createServer(server2, ports[0]);
    final int thePort2 = server2.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
    Wait.pause(10 * 1000);

    // Close server1 and pause so server has chance to close
    closeServer(server1);
    Wait.pause(10 * 1000);
    waitForCqsDisconnected(client, "testCQAllServersLeave_11", 0);

    // Close server 2 and pause so server has a chance to close
    closeServer(server2);
    Wait.pause(10 * 1000);
    waitForCqsDisconnected(client, "testCQAllServersLeave_11", 1);

    // Close.
    closeClient(client);
  }

  // Test CQStatus listeners, onCqDisconnect should trigger when All servers leave
  // and onCqConnect should trigger when a cq is first connected and when the pool
  // goes from no primary queue to having a primary
  @Test
  public void testCQAllServersLeaveAndRejoin() {
    VM server1 = VM.getVM(0);
    VM server2 = VM.getVM(1);
    VM client = VM.getVM(2);

    createServer(server1);
    final int port1 = server1.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
    final String host0 = NetworkUtils.getServerHostName();

    final int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(1);

    createClient(client, new int[] {port1, ports[0]}, host0, "-1");

    // Create CQs.
    createCQ(client, "testCQAllServersLeave_" + 11, cqs[11], true);
    executeCQ(client, "testCQAllServersLeave_" + 11, false, null);
    Wait.pause(5 * 1000);
    // listener should have had onCqConnected invoked
    waitForCqsConnected(client, "testCQAllServersLeave_11", 1);

    // create entries
    createValues(server1, regions[0], 10);
    waitForCreated(client, "testCQAllServersLeave_11", KEY + 10);

    // start server 2
    createServer(server2, ports[0]);
    server2.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
    Wait.pause(8 * 1000);

    // Close server1.
    closeServer(server1);
    // Give the server time to shut down
    Wait.pause(10 * 1000);
    // We should not yet get a disconnect because we still have server2
    waitForCqsDisconnected(client, "testCQAllServersLeave_11", 0);

    // Close the server2
    closeServer(server2);
    Wait.pause(10 * 1000);
    waitForCqsDisconnected(client, "testCQAllServersLeave_11", 1);

    // reconnect server1. Our total connects for this test run are now 2
    restartBridgeServer(server1, port1);
    Wait.pause(10 * 1000);
    waitForCqsConnected(client, "testCQAllServersLeave_11", 2);

    // Disconnect again and now our total disconnects should be 2
    Wait.pause(10 * 1000);
    closeServer(server1);
    waitForCqsDisconnected(client, "testCQAllServersLeave_11", 2);

    // Close.
    closeClient(client);
  }


  /*
   * Tests that the cqs do not get notified if primary leaves and a new primary is elected
   */
  @Test
  public void testCQPrimaryLeaves() {
    VM server1 = VM.getVM(0);
    VM server2 = VM.getVM(1);
    VM client = VM.getVM(2);

    createServer(server1);
    final int port1 = server1.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
    final String host0 = NetworkUtils.getServerHostName();

    final int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(1);
    createClient(client, new int[] {port1, ports[0]}, host0, "-1");

    // Create CQs.
    createCQ(client, "testCQAllServersLeave_" + 11, cqs[11], true);
    executeCQ(client, "testCQAllServersLeave_" + 11, false, null);

    Wait.pause(5 * 1000);
    waitForCqsConnected(client, "testCQAllServersLeave_11", 1);
    // CREATE.
    createValues(server1, regions[0], 10);
    waitForCreated(client, "testCQAllServersLeave_11", KEY + 10);

    createServer(server2, ports[0]);
    final int thePort2 = server2.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
    Wait.pause(8 * 1000);

    // Close server1 and give time for server1 to actually shutdown
    closeServer(server1);
    Wait.pause(10 * 1000);
    waitForCqsDisconnected(client, "testCQAllServersLeave_11", 0);

    // Close server2 and give time for server2 to shutdown before checking disconnected count
    closeServer(server2);
    Wait.pause(10 * 1000);
    waitForCqsDisconnected(client, "testCQAllServersLeave_11", 1);

    // Close.
    closeClient(client);
    closeServer(server1);
  }

  // Tests when two pools each are configured to a different server
  // Each cq uses a different pool and the servers are shutdown.
  // The listeners for each cq should receive a connect and disconnect
  // when their respective servers are shutdown
  @Test
  public void testCQAllServersLeaveMultiplePool() throws Exception {
    VM server1 = VM.getVM(0);
    VM server2 = VM.getVM(1);
    VM client = VM.getVM(2);

    createServer(server1);

    final int port1 = server1.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
    final String host0 = NetworkUtils.getServerHostName();
    final int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(1);

    createServer(server2, ports[0]);
    final int thePort2 = server2.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
    Wait.pause(8 * 1000);

    // Create client
    createClientWith2Pools(client, new int[] {port1}, new int[] {thePort2}, host0, "-1");

    // Create CQs.
    createCQ(client, "testCQAllServersLeave_" + 11, cqs[11], true);
    executeCQ(client, "testCQAllServersLeave_" + 11, false, null);

    createCQ(client, "testCQAllServersLeave_" + 12, cqs[12], true);
    executeCQ(client, "testCQAllServersLeave_" + 12, false, null);

    Wait.pause(5 * 1000);
    waitForCqsConnected(client, "testCQAllServersLeave_11", 1);
    waitForCqsConnected(client, "testCQAllServersLeave_12", 1);
    // CREATE.
    createValues(server2, regions[0], 10);
    createValues(server2, regions[1], 10);
    waitForCreated(client, "testCQAllServersLeave_11", KEY + 10);

    // Close server1 pause is unnecessary here since each cq has it's own pool
    // and each pool is configured only to 1 server.
    closeServer(server1);
    waitForCqsDisconnected(client, "testCQAllServersLeave_11", 1);

    createValues(server2, regions[1], 20);
    waitForCreated(client, "testCQAllServersLeave_12", KEY + 19);

    // Close server 2 pause unnecessary here since each cq has it's own pool
    closeServer(server2);
    waitForCqsDisconnected(client, "testCQAllServersLeave_12", 1);

    // Close.
    closeClient(client);
  }


  @Test
  public void testCqCloseAndExecuteWithInitialResults() {

    VM server = VM.getVM(0);
    VM client = VM.getVM(1);

    createServer(server);

    final int thePort = server.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
    final String host0 = NetworkUtils.getServerHostName();

    // Create client.
    createClient(client, thePort, host0);

    /* CQ Test with initial Values. */
    int size = 5;
    createValuesWithShort(server, regions[0], size);
    Wait.pause(500);

    // Create CQs.
    executeAndCloseAndExecuteIRMultipleTimes(client, shortTypeCQs[0]);

    closeClient(client);

    closeServer(server);
  }

  @Test
  public void testCQEventsWithNotEqualsUndefined() {

    VM server = VM.getVM(0);
    VM client = VM.getVM(1);

    createServer(server);

    final int thePort = server.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
    final String host0 = NetworkUtils.getServerHostName();

    // Create client.
    createClient(client, thePort, host0);

    // Create CQs.
    createCQ(client, "testCQEventsWithUndefined_0",
        "SELECT ALL * FROM /root/" + regions[0] + " p where p.position2.secId <> 'ABC'");

    executeCQ(client, "testCQEventsWithUndefined_0", false, null);

    // Init values at server.
    int size = 10;
    createValues(server, regions[0], size);

    waitForCreated(client, "testCQEventsWithUndefined_0", KEY + size);

    // validate Create events.
    validateCQ(client, "testCQEventsWithUndefined_0", /* resultSize: */ noTest, /* creates: */ size,
        /* updates: */ 0, /* deletes; */ 0, /* queryInserts: */ size, /* queryUpdates: */ 0,
        /* queryDeletes: */ 0, /* totalEvents: */ size);

    // Update values.
    createValues(server, regions[0], 5);
    createValues(server, regions[0], 10);

    waitForUpdated(client, "testCQEventsWithUndefined_0", KEY + size);

    // validate Update events.
    validateCQ(client, "testCQEventsWithUndefined_0", /* resultSize: */ noTest, /* creates: */ size,
        /* updates: */ 15, /* deletes; */ 0, /* queryInserts: */ size, /* queryUpdates: */ 15,
        /* queryDeletes: */ 0, /* totalEvents: */ size + 15);

    // Validate delete events.
    deleteValues(server, regions[0], 5);
    waitForDestroyed(client, "testCQEventsWithUndefined_0", KEY + 5);

    validateCQ(client, "testCQEventsWithUndefined_0", /* resultSize: */ noTest, /* creates: */ size,
        /* updates: */ 15, /* deletes; */5, /* queryInserts: */ size, /* queryUpdates: */ 15,
        /* queryDeletes: */ 5, /* totalEvents: */ size + 15 + 5);

    // Insert invalid Events.
    server.invoke(new CacheSerializableRunnable("Create values") {
      @Override
      public void run2() throws CacheException {
        Region region1 = getRootRegion().getSubregion(regions[0]);
        for (int i = -1; i >= -5; i--) {
          region1.put(KEY + i, KEY + i);
        }
      }
    });

    Wait.pause(1000);
    // cqs should get any creates and inserts even for invalid
    // since this is a NOT EQUALS query which adds Undefined to
    // results
    validateCQ(client, "testCQEventsWithUndefined_0", /* resultSize: */ noTest,
        /* creates: */ size + 5, /* updates: */ 15, /* deletes; */5, /* queryInserts: */ size + 5,
        /* queryUpdates: */ 15, /* queryDeletes: */ 5, /* totalEvents: */ size + 15 + 5 + 5);

    // Close.
    closeClient(client);
    closeServer(server);
  }

  // HELPER METHODS....

  /* For debug purpose - Compares entries in the region */
  private void validateServerClientRegionEntries(VM server, VM client, final String regionName) {

    server.invoke(new CacheSerializableRunnable("Server Region Entries") {
      @Override
      public void run2() throws CacheException {
        Region region = getRootRegion().getSubregion(regionName);
        logger.info("### Entries in Server :" + region.keySet().size());
      }
    });

    client.invoke(new CacheSerializableRunnable("Client Region Entries") {
      @Override
      public void run2() throws CacheException {
        Region region = getRootRegion().getSubregion(regionName);
        logger.info("### Entries in Client :" + region.keySet().size());
      }
    });
  }

  /*
   * Used only by tests that start and stop a server only to need to start the cache server again
   */
  private void restartBridgeServer(VM server, final int port) {
    server.invoke(new CacheSerializableRunnable("Start cache server") {
      @Override
      public void run2() {
        try {
          restartBridgeServers(getCache());
        } catch (IOException e) {
          throw new CacheException(e) {};
        }
      }
    });
  }


  /**
   * Starts a cache server on the given port to serve up the given region.
   *
   * @since GemFire 4.0
   */
  public void startBridgeServer(int port) throws IOException {
    startBridgeServer(port, CacheServer.DEFAULT_NOTIFY_BY_SUBSCRIPTION);
  }

  /**
   * Starts a cache server on the given port, using the given deserializeValues and
   * notifyBySubscription to serve up the given region.
   *
   * @since GemFire 4.0
   */
  public void startBridgeServer(int port, boolean notifyBySubscription) throws IOException {

    Cache cache = getCache();
    CacheServer bridge = cache.addCacheServer();
    bridge.setPort(port);
    bridge.setNotifyBySubscription(notifyBySubscription);
    bridge.start();
    bridgeServerPort = bridge.getPort();
  }

  /**
   * Stops the cache server that serves up the given cache.
   *
   * @since GemFire 4.0
   */
  private void stopBridgeServer(Cache cache) {
    CacheServer bridge = cache.getCacheServers().iterator().next();
    bridge.stop();
    assertThat(bridge.isRunning()).isFalse();
  }

  private void stopBridgeServers(Cache cache) {
    CacheServer bridge;
    for (CacheServer cacheServer : cache.getCacheServers()) {
      bridge = cacheServer;
      bridge.stop();
      assertThat(bridge.isRunning()).isFalse();
    }
  }

  private void restartBridgeServers(Cache cache) throws IOException {
    CacheServer bridge;
    for (CacheServer cacheServer : cache.getCacheServers()) {
      bridge = cacheServer;
      bridge.start();
      assertThat(bridge.isRunning()).isTrue();
    }
  }

  private InternalDistributedSystem createLonerDS() {
    disconnectFromDS();
    Properties lonerProps = new Properties();
    lonerProps.setProperty(MCAST_PORT, "0");
    lonerProps.setProperty(LOCATORS, "");
    InternalDistributedSystem ds = getSystem(lonerProps);
    assertThat(ds.getDistributionManager().getOtherDistributionManagerIds().size()).isEqualTo(0);
    return ds;
  }

  /**
   * Returns region attributes for a <code>LOCAL</code> region
   */
  protected RegionAttributes getRegionAttributes() {
    AttributesFactory factory = new AttributesFactory();
    factory.setScope(Scope.LOCAL);
    return factory.createRegionAttributes();
  }


}
