/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.cache.query.cq.dunit;

import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.Matchers.equalTo;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

import org.apache.logging.log4j.Logger;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.query.CqAttributes;
import org.apache.geode.cache.query.CqAttributesFactory;
import org.apache.geode.cache.query.CqListener;
import org.apache.geode.cache.query.CqQuery;
import org.apache.geode.cache.query.QueryService;
import org.apache.geode.cache.query.cq.internal.CqServiceImpl;
import org.apache.geode.cache.query.cq.internal.ServerCQImpl;
import org.apache.geode.cache.query.internal.DefaultQueryService;
import org.apache.geode.cache.query.internal.cq.CqService;
import org.apache.geode.cache.query.internal.cq.InternalCqQuery;
import org.apache.geode.cache30.CacheSerializableRunnable;
import org.apache.geode.internal.AvailablePortHelper;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.test.dunit.Invoke;
import org.apache.geode.test.dunit.LogWriterUtils;
import org.apache.geode.test.dunit.NetworkUtils;
import org.apache.geode.test.dunit.SerializableRunnable;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.Wait;
import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
import org.apache.geode.test.junit.categories.ClientSubscriptionTest;

/**
 * This class tests the ContinuousQuery mechanism in GemFire. This includes the test with different
 * data activities.
 */
@SuppressWarnings("SpellCheckingInspection")
@Category({ClientSubscriptionTest.class})
public class CqPerfDUnitTest extends JUnit4CacheTestCase {
  private final Logger logger = LogService.getLogger();
  @SuppressWarnings("CanBeFinal")
  private CqQueryDUnitTest cqDUnitTest = new CqQueryDUnitTest();

  public CqPerfDUnitTest() {
    super();
  }

  @Override
  public final void postSetUp() {
    // avoid IllegalStateException from HandShake by connecting all vms tor
    // system before creating connection pools
    getSystem();
    Invoke.invokeInEveryVM(new SerializableRunnable("getSystem") {
      @Override
      public void run() {
        getSystem();
      }
    });
  }

  /**
   * Tests the cq performance.
   */
  @Ignore("perf")
  @Test
  public void testCQPerf() {

    VM server = VM.getVM(0);
    VM client = VM.getVM(1);

    cqDUnitTest.createServer(server);

    final int port = server.invoke(CqQueryDUnitTest::getCacheServerPort);
    final String host0 = NetworkUtils.getServerHostName();

    // Create client.
    cqDUnitTest.createClient(client, port, host0);
    final String cqName = "testCQPerf_0";

    client.invoke(() -> {
      logger.info("### Create CQ. ###" + cqName);
      // Get CQ Service.
      QueryService cqService =
          getCache().getQueryService();

      // Create CQ Attributes.
      CqAttributesFactory cqf = new CqAttributesFactory();
      CqListener[] cqListeners = {new CqTimeTestListener(LogWriterUtils.getLogWriter())};
      ((CqTimeTestListener) cqListeners[0]).cqName = cqName;

      cqf.initCqListeners(cqListeners);
      CqAttributes cqa = cqf.create();

      // Create and Execute CQ.
      CqQuery cq1 = cqService.newCq(cqName, cqDUnitTest.cqs[0], cqa);
      assertThat(cq1.getState().isStopped()).describedAs("newCq() state mismatch").isTrue();
      cq1.execute();
    });

    final int size = 50;

    // Create values.
    cqDUnitTest.createValuesWithTime(client, cqDUnitTest.regions[0], size);

    Wait.pause(5000);

    // Update values
    cqDUnitTest.createValuesWithTime(client, cqDUnitTest.regions[0], size);

    client.invoke(new CacheSerializableRunnable("Validate CQs") {
      @Override
      public void run2() throws CacheException {
        logger.info("### Validating CQ. ### " + cqName);
        // Get CQ Service.
        QueryService cqService = getCache().getQueryService();

        CqQuery cQuery = cqService.getCq(cqName);
        assertThat(cQuery).isNotNull();

      }
    });

    Wait.pause(10 * 60 * 1000);

    // Close.
    cqDUnitTest.closeClient(client);
    cqDUnitTest.closeServer(server);

  }

  /**
   * Test for maintaining keys for update optimization.
   */
  @Test
  public void testKeyMaintenance() {

    VM server = VM.getVM(0);
    VM client = VM.getVM(1);

    cqDUnitTest.createServer(server);
    final int port = server.invoke(CqQueryDUnitTest::getCacheServerPort);
    final String host0 = NetworkUtils.getServerHostName();
    cqDUnitTest.createClient(client, port, host0);

    // Cq1
    cqDUnitTest.createCQ(client, "testKeyMaintenance_0", cqDUnitTest.cqs[0]);
    cqDUnitTest.executeCQ(client, "testKeyMaintenance_0", false, null);

    // Cq2
    cqDUnitTest.createCQ(client, "testKeyMaintenance_1", cqDUnitTest.cqs[10]);
    cqDUnitTest.executeCQ(client, "testKeyMaintenance_1", false, null);

    cqDUnitTest.createValues(server, cqDUnitTest.regions[0], 1);
    cqDUnitTest.waitForCreated(client, "testKeyMaintenance_0", CqQueryDUnitTest.KEY + 1);

    // Entry is made into the CQs cache hashSet.
    // testKeyMaintenance_0 with 1 entry and testKeyMaintenance_1 with 0
    server.invoke(() -> {
      CqService cqService = ((DefaultQueryService) getCache().getQueryService()).getCqService();

      Collection<? extends InternalCqQuery> cqs = cqService.getAllCqs();
      for (InternalCqQuery cq : cqs) {
        ServerCQImpl cqQuery = (ServerCQImpl) cq;
        String serverCqName = cqQuery.getServerCqName();
        if (serverCqName.startsWith("testKeyMaintenance_0")) {
          assertThat(cqQuery.getCqResultKeysSize())
              .describedAs("The number of keys cached for cq testKeyMaintenance_0 is wrong.")
              .isEqualTo(1);
        } else if (serverCqName.startsWith("testKeyMaintenance_1")) {
          assertThat(cqQuery.getCqResultKeysSize())
              .describedAs("The number of keys cached for cq testKeyMaintenance_1 is wrong.")
              .isEqualTo(0);
        }
      }
    });

    // Update 1.
    cqDUnitTest.createValues(server, cqDUnitTest.regions[0], 10);
    cqDUnitTest.waitForCreated(client, "testKeyMaintenance_0", CqQueryDUnitTest.KEY + 10);

    // Entry/check is made into the CQs cache hashSet.
    // testKeyMaintenance_0 with 1 entry and testKeyMaintenance_1 with 1
    server.invoke(() -> {
      CqService cqService = ((DefaultQueryService) getCache().getQueryService()).getCqService();

      Collection<? extends InternalCqQuery> cqs = cqService.getAllCqs();
      for (InternalCqQuery cq : cqs) {
        ServerCQImpl cqQuery = (ServerCQImpl) cq;

        String serverCqName = cqQuery.getServerCqName();
        if (serverCqName.startsWith("testKeyMaintenance_0")) {
          assertThat(cqQuery.getCqResultKeysSize())
              .describedAs("The number of keys cached for cq testKeyMaintenance_0 is wrong.")
              .isEqualTo(10);
        } else if (serverCqName.startsWith("testKeyMaintenance_1")) {
          assertThat(cqQuery.getCqResultKeysSize())
              .describedAs("The number of keys cached for cq testKeyMaintenance_1 is wrong.")
              .isEqualTo(5);
        }
      }
    });

    // Update.
    cqDUnitTest.createValues(server, cqDUnitTest.regions[0], 12);
    cqDUnitTest.waitForCreated(client, "testKeyMaintenance_0", CqQueryDUnitTest.KEY + 12);

    // Entry/check is made into the CQs cache hashSet.
    // testKeyMaintenance_0 with 1 entry and testKeyMaintenance_1 with 1
    server.invoke(() -> {
      CqService cqService = ((DefaultQueryService) getCache().getQueryService()).getCqService();

      Collection<? extends InternalCqQuery> cqs = cqService.getAllCqs();
      for (InternalCqQuery cq : cqs) {
        ServerCQImpl cqQuery = (ServerCQImpl) cq;
        String serverCqName = cqQuery.getServerCqName();
        if (serverCqName.startsWith("testKeyMaintenance_0")) {
          assertThat(cqQuery.getCqResultKeysSize())
              .describedAs("The number of keys cached for cq testKeyMaintenance_0 is wrong.")
              .isEqualTo(12);
        } else if (serverCqName.startsWith("testKeyMaintenance_1")) {
          assertThat(cqQuery.getCqResultKeysSize())
              .describedAs("The number of keys cached for cq testKeyMaintenance_1 is wrong.")
              .isEqualTo(6);
        }
      }
    });

    // Delete.
    cqDUnitTest.deleteValues(server, cqDUnitTest.regions[0], 6);
    cqDUnitTest.waitForDestroyed(client, "testKeyMaintenance_0", CqQueryDUnitTest.KEY + 6);

    // Entry/check is made into the CQs cache hashSet.
    // testKeyMaintenance_0 with 1 entry and testKeyMaintenance_1 with 1
    server.invoke(() -> {
      CqService cqService = ((DefaultQueryService) getCache().getQueryService()).getCqService();

      Collection<? extends InternalCqQuery> cqs = cqService.getAllCqs();
      for (InternalCqQuery cq : cqs) {
        ServerCQImpl cqQuery = (ServerCQImpl) cq;

        String serverCqName = cqQuery.getServerCqName();
        if (serverCqName.startsWith("testKeyMaintenance_0")) {
          assertThat(cqQuery.getCqResultKeysSize())
              .describedAs("The number of keys cached for cq testKeyMaintenance_0 is wrong.")
              .isEqualTo(6);
        } else if (serverCqName.startsWith("testKeyMaintenance_1")) {
          assertThat(cqQuery.getCqResultKeysSize())
              .describedAs("The number of keys cached for cq testKeyMaintenance_1 is wrong.")
              .isEqualTo(3);
        }
      }
    });

    // Stop CQ.
    // This should still needs to process the events so that Results are up-to-date.
    cqDUnitTest.stopCQ(client, "testKeyMaintenance_1");
    cqDUnitTest.createValues(server, cqDUnitTest.regions[0], 12);

    server.invoke(() -> {
      CqService cqService = ((DefaultQueryService) getCache().getQueryService()).getCqService();

      Collection<? extends InternalCqQuery> cqs = cqService.getAllCqs();
      for (InternalCqQuery cq : cqs) {
        ServerCQImpl cqQuery = (ServerCQImpl) cq;
        String serverCqName = cqQuery.getServerCqName();
        if (serverCqName.startsWith("testKeyMaintenance_0")) {
          assertThat(cqQuery.getCqResultKeysSize())
              .describedAs("The number of keys cached for cq testKeyMaintenance_0 is wrong.")
              .isEqualTo(12);
        } else if (serverCqName.startsWith("testKeyMaintenance_1")) {
          assertThat(cqQuery.getCqResultKeysSize())
              .describedAs("The number of keys cached for cq testKeyMaintenance_1 is wrong.")
              .isEqualTo(6);
        }
      }
    });

    // re-start the CQ.
    cqDUnitTest.executeCQ(client, "testKeyMaintenance_1", false, null);

    // This will remove the caching for this CQ.
    cqDUnitTest.closeCQ(client, "testKeyMaintenance_1");
    server.invoke(() -> {
      CqService cqService = ((DefaultQueryService) getCache().getQueryService()).getCqService();

      Collection<? extends InternalCqQuery> cqs = cqService.getAllCqs();
      for (InternalCqQuery cq : cqs) {
        ServerCQImpl cqQuery = (ServerCQImpl) cq;

        String serverCqName = cqQuery.getServerCqName();
        assertThat(serverCqName.startsWith("testKeyMaintenance_1")).isFalse();

        if (serverCqName.startsWith("testKeyMaintenance_0")) {
          assertThat(cqQuery.getCqResultKeysSize())
              .describedAs("The number of keys cached for cq testKeyMaintenance_0 is wrong.")
              .isEqualTo(12);
        }
      }
    });

    // Close.
    cqDUnitTest.closeClient(client);
    cqDUnitTest.closeServer(server);
  }

  /**
   * Test for common CQs. To test the changes relating to, executing CQ only once for all similar
   * CQs.
   */
  @Test
  public void testMatchingCqs() {

    VM server = VM.getVM(0);
    VM client = VM.getVM(1);

    cqDUnitTest.createServer(server);
    final int port = server.invoke(CqQueryDUnitTest::getCacheServerPort);
    final String host0 = NetworkUtils.getServerHostName();
    cqDUnitTest.createClient(client, port, host0);

    // Create and Execute same kind of CQs.
    for (int i = 0; i < 4; i++) {
      cqDUnitTest.createCQ(client, "testMatchingCqs_" + i, cqDUnitTest.cqs[0]);
      cqDUnitTest.executeCQ(client, "testMatchingCqs_" + i, false, null);
    }

    validateMatchingCqs(server, 1, cqDUnitTest.cqs[0], 4);

    int size = 1;

    // Create.
    cqDUnitTest.createValues(server, cqDUnitTest.regions[0], size);
    cqDUnitTest.waitForCreated(client, "testMatchingCqs_0", CqQueryDUnitTest.KEY + size);
    cqDUnitTest.waitForCreated(client, "testMatchingCqs_3", CqQueryDUnitTest.KEY + size);

    // Close one of the CQ.
    cqDUnitTest.closeCQ(client, "testMatchingCqs_0");
    validateMatchingCqs(server, 1, cqDUnitTest.cqs[0], 3);

    // Update.
    cqDUnitTest.createValues(server, cqDUnitTest.regions[0], size);
    cqDUnitTest.waitForUpdated(client, "testMatchingCqs_3", CqQueryDUnitTest.KEY + size);

    // Stop one of the CQ.
    cqDUnitTest.stopCQ(client, "testMatchingCqs_1");

    validateMatchingCqs(server, 1, cqDUnitTest.cqs[0], 2);

    // Update - 2.
    cqDUnitTest.clearCQListenerEvents(client, "testMatchingCqs_3");
    cqDUnitTest.createValues(server, cqDUnitTest.regions[0], size);
    cqDUnitTest.waitForUpdated(client, "testMatchingCqs_3", CqQueryDUnitTest.KEY + size);

    // stopped CQ should not receive 2nd/previous updates.
    cqDUnitTest.validateCQ(client, "testMatchingCqs_1", /* resultSize: */ CqQueryDUnitTest.noTest,
        /* creates: */ size, /* updates: once */ size, /* deletes; */ 0, /* queryInserts: */ size,
        /* queryUpdates: */ size, /* queryDeletes: */ 0, /* totalEvents: */ size * 2);

    // Execute the stopped CQ.
    cqDUnitTest.executeCQ(client, "testMatchingCqs_1", false, null);

    // Update - 3.
    cqDUnitTest.clearCQListenerEvents(client, "testMatchingCqs_3");
    cqDUnitTest.createValues(server, cqDUnitTest.regions[0], size);
    cqDUnitTest.waitForUpdated(client, "testMatchingCqs_3", CqQueryDUnitTest.KEY + size);

    cqDUnitTest.validateCQ(client, "testMatchingCqs_1", /* resultSize: */ CqQueryDUnitTest.noTest,
        /* creates: */ size, /* updates: 2 */ size * 2, /* deletes; */ 0, /* queryInserts: */ size,
        /* queryUpdates: */ size * 2, /* queryDeletes: */ 0, /* totalEvents: */ size * 3);

    // Create different kind of CQs.
    cqDUnitTest.createCQ(client, "testMatchingCqs_4", cqDUnitTest.cqs[1]);
    cqDUnitTest.executeCQ(client, "testMatchingCqs_4", false, null);

    cqDUnitTest.createCQ(client, "testMatchingCqs_5", cqDUnitTest.cqs[1]);
    cqDUnitTest.executeCQ(client, "testMatchingCqs_5", false, null);

    cqDUnitTest.createCQ(client, "testMatchingCqs_6", cqDUnitTest.cqs[2]);
    cqDUnitTest.executeCQ(client, "testMatchingCqs_6", false, null);

    validateMatchingCqs(server, 3, cqDUnitTest.cqs[1], 2);

    cqDUnitTest.closeCQ(client, "testMatchingCqs_6");
    validateMatchingCqs(server, 2, cqDUnitTest.cqs[1], 2);

    cqDUnitTest.closeCQ(client, "testMatchingCqs_5");
    cqDUnitTest.closeCQ(client, "testMatchingCqs_4");
    cqDUnitTest.closeCQ(client, "testMatchingCqs_3");
    cqDUnitTest.closeCQ(client, "testMatchingCqs_2");
    cqDUnitTest.closeCQ(client, "testMatchingCqs_1");

    validateMatchingCqs(server, 0, null, 0);

    // update 4
    cqDUnitTest.createValues(server, cqDUnitTest.regions[0], size);

    // Close.
    cqDUnitTest.closeClient(client);
    cqDUnitTest.closeServer(server);
  }


  /**
   * Test for common CQs. To test the changes relating to, executing CQ only once for all similar
   * CQs.
   */
  @Test
  public void testMatchingCQWithMultipleClients() {

    VM server = VM.getVM(0);
    VM client1 = VM.getVM(1);
    VM client2 = VM.getVM(2);
    VM client3 = VM.getVM(3);

    VM clients[] = new VM[] {client1, client2, client3};

    cqDUnitTest.createServer(server);
    final int port = server.invoke(CqQueryDUnitTest::getCacheServerPort);
    final String host0 = NetworkUtils.getServerHostName();

    for (int clientIndex = 0; clientIndex < 3; clientIndex++) {
      cqDUnitTest.createClient(clients[clientIndex], port, host0);
      // Create and Execute same kind of CQs.
      for (int i = 0; i < 4; i++) {
        cqDUnitTest.createCQ(clients[clientIndex], "testMatchingCQWithMultipleClients_" + i,
            cqDUnitTest.cqs[0]);
        cqDUnitTest.executeCQ(clients[clientIndex], "testMatchingCQWithMultipleClients_" + i, false,
            null);
      }
    }

    validateMatchingCqs(server, 1, cqDUnitTest.cqs[0], clients.length * 4);

    int size = 1;

    // Create.
    cqDUnitTest.createValues(server, cqDUnitTest.regions[0], size);
    for (int clientIndex = 0; clientIndex < 3; clientIndex++) {
      cqDUnitTest.waitForCreated(clients[clientIndex], "testMatchingCQWithMultipleClients_0",
          CqQueryDUnitTest.KEY + size);
      cqDUnitTest.waitForCreated(clients[clientIndex], "testMatchingCQWithMultipleClients_3",
          CqQueryDUnitTest.KEY + size);
    }

    // Close one of the CQ.
    cqDUnitTest.closeCQ(client1, "testMatchingCQWithMultipleClients_0");
    validateMatchingCqs(server, 1, cqDUnitTest.cqs[0], (clients.length * 4) - 1);

    // Update.
    cqDUnitTest.createValues(server, cqDUnitTest.regions[0], size);

    for (int clientIndex = 0; clientIndex < 3; clientIndex++) {
      cqDUnitTest.waitForUpdated(clients[clientIndex], "testMatchingCQWithMultipleClients_3",
          CqQueryDUnitTest.KEY + size);
    }

    // Stop one of the CQ.
    cqDUnitTest.stopCQ(client2, "testMatchingCQWithMultipleClients_1");

    validateMatchingCqs(server, 1, cqDUnitTest.cqs[0], (clients.length * 4) - 2);

    // Update - 2.
    for (int clientIndex = 0; clientIndex < 3; clientIndex++) {
      cqDUnitTest.clearCQListenerEvents(clients[clientIndex],
          "testMatchingCQWithMultipleClients_3");
    }

    cqDUnitTest.createValues(server, cqDUnitTest.regions[0], size);

    for (int clientIndex = 0; clientIndex < 3; clientIndex++) {
      cqDUnitTest.waitForUpdated(clients[clientIndex], "testMatchingCQWithMultipleClients_3",
          CqQueryDUnitTest.KEY + size);
    }

    // stopped CQ should not receive 2nd/previous updates.
    cqDUnitTest.validateCQ(client2, "testMatchingCQWithMultipleClients_1",
        /* resultSize: */ CqQueryDUnitTest.noTest, /* creates: */ size, /* updates: once */ size,
        /* deletes; */ 0, /* queryInserts: */ size, /* queryUpdates: */ size, /* queryDeletes: */ 0,
        /* totalEvents: */ size * 2);

    // Execute the stopped CQ.
    cqDUnitTest.executeCQ(client2, "testMatchingCQWithMultipleClients_1", false, null);

    // Update - 3.
    for (int clientIndex = 0; clientIndex < 3; clientIndex++) {
      cqDUnitTest.clearCQListenerEvents(clients[clientIndex],
          "testMatchingCQWithMultipleClients_3");
    }

    validateMatchingCqs(server, 1, cqDUnitTest.cqs[0], (clients.length * 4) - 1);

    cqDUnitTest.createValues(server, cqDUnitTest.regions[0], size);

    for (int clientIndex = 0; clientIndex < 3; clientIndex++) {
      cqDUnitTest.waitForUpdated(clients[clientIndex], "testMatchingCQWithMultipleClients_3",
          CqQueryDUnitTest.KEY + size);
    }

    cqDUnitTest.validateCQ(client2, "testMatchingCQWithMultipleClients_1",
        /* resultSize: */ CqQueryDUnitTest.noTest, /* creates: */ size, /* updates: 2 */ size * 2,
        /* deletes; */ 0, /* queryInserts: */ size, /* queryUpdates: */ size * 2,
        /* queryDeletes: */ 0, /* totalEvents: */ size * 3);

    // Create different kind of CQs.
    for (int clientIndex = 0; clientIndex < 3; clientIndex++) {
      cqDUnitTest.createCQ(clients[clientIndex], "testMatchingCQWithMultipleClients_4",
          cqDUnitTest.cqs[1]);
      cqDUnitTest.executeCQ(clients[clientIndex], "testMatchingCQWithMultipleClients_4", false,
          null);

      cqDUnitTest.createCQ(clients[clientIndex], "testMatchingCQWithMultipleClients_5",
          cqDUnitTest.cqs[1]);
      cqDUnitTest.executeCQ(clients[clientIndex], "testMatchingCQWithMultipleClients_5", false,
          null);

      cqDUnitTest.createCQ(clients[clientIndex], "testMatchingCQWithMultipleClients_6",
          cqDUnitTest.cqs[2]);
      cqDUnitTest.executeCQ(clients[clientIndex], "testMatchingCQWithMultipleClients_6", false,
          null);
    }

    validateMatchingCqs(server, 3, cqDUnitTest.cqs[1], 2 * clients.length);

    for (int clientIndex = 0; clientIndex < 3; clientIndex++) {
      cqDUnitTest.closeCQ(clients[clientIndex], "testMatchingCQWithMultipleClients_6");
    }

    validateMatchingCqs(server, 2, cqDUnitTest.cqs[1], 2 * clients.length);

    // update 4
    cqDUnitTest.createValues(server, cqDUnitTest.regions[0], size);

    // Close.
    cqDUnitTest.closeClient(client3);
    validateMatchingCqs(server, 2, cqDUnitTest.cqs[1], 2 * (clients.length - 1));

    for (int clientIndex = 0; clientIndex < 2; clientIndex++) {
      cqDUnitTest.closeCQ(clients[clientIndex], "testMatchingCQWithMultipleClients_5");
      cqDUnitTest.closeCQ(clients[clientIndex], "testMatchingCQWithMultipleClients_4");
      cqDUnitTest.closeCQ(clients[clientIndex], "testMatchingCQWithMultipleClients_3");
      cqDUnitTest.closeCQ(clients[clientIndex], "testMatchingCQWithMultipleClients_2");
      cqDUnitTest.closeCQ(clients[clientIndex], "testMatchingCQWithMultipleClients_1");
      if (clientIndex != 0) {
        cqDUnitTest.closeCQ(clients[clientIndex], "testMatchingCQWithMultipleClients_0");
      }
    }

    validateMatchingCqs(server, 0, null, 0);

    cqDUnitTest.closeClient(client2);
    cqDUnitTest.closeClient(client1);

    cqDUnitTest.closeServer(server);
  }

  @Test
  public void testMatchingCQsWithMultipleServers() {

    VM server1 = VM.getVM(0);
    VM server2 = VM.getVM(1);
    VM client1 = VM.getVM(2);
    VM client2 = VM.getVM(3);

    cqDUnitTest.createServer(server1);

    VM clients[] = new VM[] {client1, client2};

    final int port1 = server1.invoke(CqQueryDUnitTest::getCacheServerPort);
    final String host0 = NetworkUtils.getServerHostName();
    // Create client.

    // Create client with redundancyLevel -1

    final int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(1);

    cqDUnitTest.createClient(client1, new int[] {port1, ports[0]}, host0, "-1");
    cqDUnitTest.createClient(client2, new int[] {port1, ports[0]}, host0, "-1");

    int numCQs = 3;

    for (int i = 0; i < numCQs; i++) {
      cqDUnitTest.createCQ(client1, "testMatchingCQsWithMultipleServers_" + i, cqDUnitTest.cqs[i]);
      cqDUnitTest.executeCQ(client1, "testMatchingCQsWithMultipleServers_" + i, false, null);

      cqDUnitTest.createCQ(client2, "testMatchingCQsWithMultipleServers_" + i, cqDUnitTest.cqs[i]);
      cqDUnitTest.executeCQ(client2, "testMatchingCQsWithMultipleServers_" + i, false, null);
    }

    validateMatchingCqs(server1, numCQs, cqDUnitTest.cqs[0], clients.length);
    validateMatchingCqs(server1, numCQs, cqDUnitTest.cqs[1], clients.length);

    Wait.pause(1000);

    // CREATE.
    cqDUnitTest.createValues(server1, cqDUnitTest.regions[0], 10);
    cqDUnitTest.createValues(server1, cqDUnitTest.regions[1], 10);

    for (int i = 1; i <= 10; i++) {
      cqDUnitTest.waitForCreated(client1, "testMatchingCQsWithMultipleServers_0",
          CqQueryDUnitTest.KEY + i);
    }

    cqDUnitTest.createServer(server2, ports[0]);

    final int port2 = server2.invoke(CqQueryDUnitTest::getCacheServerPort);
    System.out
        .println("### Port on which server1 running : " + port1 + " Server2 running : " + port2);

    // UPDATE - 1.
    for (int k = 0; k < numCQs; k++) {
      cqDUnitTest.clearCQListenerEvents(client1, "testMatchingCQsWithMultipleServers_" + k);
      cqDUnitTest.clearCQListenerEvents(client2, "testMatchingCQsWithMultipleServers_" + k);
    }

    cqDUnitTest.createValues(server1, cqDUnitTest.regions[0], 10);
    cqDUnitTest.createValues(server1, cqDUnitTest.regions[1], 10);

    // Wait for updates on regions[0]
    for (int i = 1; i <= 10; i++) {
      cqDUnitTest.waitForUpdated(client1, "testMatchingCQsWithMultipleServers_0",
          CqQueryDUnitTest.KEY + i);
      cqDUnitTest.waitForUpdated(client2, "testMatchingCQsWithMultipleServers_0",
          CqQueryDUnitTest.KEY + i);
    }

    // Wait for updates on regions[1] - Waiting for last key is good enough.
    cqDUnitTest.waitForUpdated(client1, "testMatchingCQsWithMultipleServers_2",
        CqQueryDUnitTest.KEY + 4);
    cqDUnitTest.waitForUpdated(client2, "testMatchingCQsWithMultipleServers_2",
        CqQueryDUnitTest.KEY + 4);

    int[] resultsCnt = new int[] {10, 1, 2};

    for (int i = 0; i < numCQs; i++) {
      cqDUnitTest.validateCQ(client1, "testMatchingCQsWithMultipleServers_" + i,
          CqQueryDUnitTest.noTest, resultsCnt[i], resultsCnt[i], CqQueryDUnitTest.noTest);

      cqDUnitTest.validateCQ(client2, "testMatchingCQsWithMultipleServers_" + i,
          CqQueryDUnitTest.noTest, resultsCnt[i], resultsCnt[i], CqQueryDUnitTest.noTest);
    }

    cqDUnitTest.closeClient(client2);
    cqDUnitTest.closeServer(server2);
  }


  @Test
  public void testFailOverMatchingCQsWithMultipleServers() {

    VM server1 = VM.getVM(0);
    VM server2 = VM.getVM(1);
    VM client1 = VM.getVM(2);
    VM client2 = VM.getVM(3);

    logger.info("Ready to create server 1");
    cqDUnitTest.createServer(server1);
    logger.info("Ready to create server 1");

    VM clients[] = new VM[] {client1, client2};

    final int port1 = server1.invoke(CqQueryDUnitTest::getCacheServerPort);
    final String host0 = NetworkUtils.getServerHostName();
    // Create client.

    // Create client with redundancyLevel -1

    final int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(1);

    cqDUnitTest.createClient(client1, new int[] {port1, ports[0]}, host0, "-1");
    cqDUnitTest.createClient(client2, new int[] {port1, ports[0]}, host0, "-1");

    int numCQs = 3;

    for (int i = 0; i < numCQs; i++) {
      cqDUnitTest.createCQ(client1, "testMatchingCQsWithMultipleServers_" + i, cqDUnitTest.cqs[i]);
      cqDUnitTest.executeCQ(client1, "testMatchingCQsWithMultipleServers_" + i, false, null);

      cqDUnitTest.createCQ(client2, "testMatchingCQsWithMultipleServers_" + i, cqDUnitTest.cqs[i]);
      cqDUnitTest.executeCQ(client2, "testMatchingCQsWithMultipleServers_" + i, false, null);
    }

    validateMatchingCqs(server1, numCQs, cqDUnitTest.cqs[0], clients.length);
    validateMatchingCqs(server1, numCQs, cqDUnitTest.cqs[1], clients.length);

    cqDUnitTest.createServer(server2, ports[0]);

    // Close server1.
    cqDUnitTest.closeServer(server1);

    validateMatchingCqs(server2, numCQs, cqDUnitTest.cqs[0], clients.length);

    // Close.
    cqDUnitTest.closeClient(client1);

    validateMatchingCqs(server2, numCQs, cqDUnitTest.cqs[0], (clients.length - 1));

    cqDUnitTest.closeClient(client2);
    cqDUnitTest.closeServer(server2);
  }


  /**
   * Test for CQ Fail over.
   */
  @Test
  public void testMatchingCQsOnDataNodeWithMultipleServers() {

    VM server1 = VM.getVM(0);
    VM server2 = VM.getVM(1);
    VM client1 = VM.getVM(2);
    VM client2 = VM.getVM(3);

    cqDUnitTest.createServerOnly(server1, 0);
    cqDUnitTest.createServerOnly(server2, 0);
    cqDUnitTest.createPartitionRegion(server1, cqDUnitTest.regions);
    cqDUnitTest.createPartitionRegion(server2, cqDUnitTest.regions);

    VM clients[] = new VM[] {client1, client2};

    final int port1 = server1.invoke(CqQueryDUnitTest::getCacheServerPort);
    final String host0 = NetworkUtils.getServerHostName();
    final int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(1);

    cqDUnitTest.createLocalRegion(client1, new int[] {port1, ports[0]}, host0, "-1",
        cqDUnitTest.regions);
    cqDUnitTest.createLocalRegion(client2, new int[] {port1, ports[0]}, host0, "-1",
        cqDUnitTest.regions);

    int numCQs = cqDUnitTest.prCqs.length;

    for (int i = 0; i < numCQs; i++) {
      cqDUnitTest.createCQ(client1, "testMatchingCQsWithMultipleServers_" + i,
          cqDUnitTest.prCqs[i]);
      cqDUnitTest.executeCQ(client1, "testMatchingCQsWithMultipleServers_" + i, false, null);

      cqDUnitTest.createCQ(client2, "testMatchingCQsWithMultipleServers_" + i,
          cqDUnitTest.prCqs[i]);
      cqDUnitTest.executeCQ(client2, "testMatchingCQsWithMultipleServers_" + i, false, null);
    }

    validateMatchingCqs(server1, numCQs, cqDUnitTest.prCqs[0], clients.length);
    validateMatchingCqs(server1, numCQs, cqDUnitTest.prCqs[1], clients.length);

    validateMatchingCqs(server2, numCQs, cqDUnitTest.prCqs[0], clients.length);
    validateMatchingCqs(server2, numCQs, cqDUnitTest.prCqs[1], clients.length);

    // Close.
    cqDUnitTest.closeClient(client1);
    cqDUnitTest.closeClient(client2);
    cqDUnitTest.closeServer(server1);
    cqDUnitTest.closeServer(server2);
  }

  /**
   * Performance test for Matching CQ optimization changes.
   */
  @Ignore("perf")
  @Test
  public void testPerformanceForMatchingCQs() {

    VM server1 = VM.getVM(0);
    VM server2 = VM.getVM(1);
    VM client1 = VM.getVM(2);
    VM client2 = VM.getVM(3);

    cqDUnitTest.createServer(server1);
    cqDUnitTest.createServer(server2);

    // VM clients[] = new VM[]{client1, client2};

    final int port1 = server1.invoke(CqQueryDUnitTest::getCacheServerPort);
    final int port2 = server2.invoke(CqQueryDUnitTest::getCacheServerPort);
    final String host0 = NetworkUtils.getServerHostName();

    // Create client.
    // final int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(1);

    // Client1 connects to server1.
    cqDUnitTest.createClient(client1, new int[] {port1}, host0, "-1");

    // Client2 connects to server2.
    cqDUnitTest.createClient(client2, new int[] {port2}, host0, "-1");

    // Client1 registers matching CQs on server1.
    String[] matchingCqs = this.generateCqQueries(false);
    for (int i = 0; i < matchingCqs.length; i++) {
      cqDUnitTest.createCQ(client1, "testPerformanceForMatchingCQs_" + i, matchingCqs[i]);
      cqDUnitTest.executeCQ(client1, "testPerformanceForMatchingCQs_" + i, false, null);
    }

    // Client2 registers non-matching CQs on server2.
    matchingCqs = this.generateCqQueries(true);
    for (int i = 0; i < matchingCqs.length; i++) {
      cqDUnitTest.createCQ(client2, "testPerformanceForMatchingCQs_" + i, matchingCqs[i]);
      cqDUnitTest.executeCQ(client2, "testPerformanceForMatchingCQs_" + i, false, null);
    }

    Wait.pause(1000);

    // CREATE.
    int size = 1000;
    cqDUnitTest.createValues(server1, cqDUnitTest.regions[0], size);
    cqDUnitTest.createValues(server2, cqDUnitTest.regions[1], size);

    // Update couple of times;
    for (int j = 0; j < 5; j++) {
      cqDUnitTest.createValues(server1, cqDUnitTest.regions[0], size - 1);
      cqDUnitTest.createValues(server2, cqDUnitTest.regions[1], size - 1);
    }

    for (int j = 0; j < 4; j++) {
      cqDUnitTest.createValues(server2, cqDUnitTest.regions[0], size - 1);
      cqDUnitTest.createValues(server1, cqDUnitTest.regions[1], size - 1);
    }

    // Update the last key.
    cqDUnitTest.createValues(server2, cqDUnitTest.regions[0], size);
    cqDUnitTest.createValues(server1, cqDUnitTest.regions[1], size);

    for (int k = 1; k <= size; k++) {
      cqDUnitTest.waitForUpdated(client1, "testPerformanceForMatchingCQs_0",
          CqQueryDUnitTest.KEY + k);
    }

    Wait.pause(1000);
    printCqQueryExecutionTime(server1);
    printCqQueryExecutionTime(server2);

    // Close.
    cqDUnitTest.closeClient(client1);
    cqDUnitTest.closeClient(client2);
    cqDUnitTest.closeServer(server2);
    cqDUnitTest.closeServer(server1);

  }

  private void validateMatchingCqs(VM server, final int mapSize, final String query,
      final int numCqSize) {
    server.invoke(() -> {
      CqServiceImpl cqService =
          (CqServiceImpl) ((DefaultQueryService) getCache().getQueryService()).getCqService();

      Map matchedCqMap = cqService.getMatchingCqMap();
      await()
          .until(matchedCqMap::size, equalTo(mapSize));

      if (query != null) {
        assertThat(matchedCqMap.containsKey(query)).isTrue();

        Collection cqs = (Collection) matchedCqMap.get(query);
        await()
            .until(cqs::size, equalTo(numCqSize));
      }
    });
  }

  private void printCqQueryExecutionTime(VM server) {
    server.invoke(() -> {
      CqServiceImpl cqService =
          (CqServiceImpl) ((DefaultQueryService) getCache().getQueryService()).getCqService();

      long timeTaken = cqService.getCqServiceVsdStats().getCqQueryExecutionTime();
      logger.info("Total Time taken to Execute CQ Query :" + timeTaken);
    });
  }

  private String[] generateCqQueries(boolean uniqueQueries) {
    List<String> initQueries = new ArrayList<>();
    // From Portfolio object.
    String[] names = {"aaa", "bbb", "ccc", "ddd"};
    int nameIndex;

    // Construct few unique Queries.
    for (int i = 0; i < 3; i++) {
      for (int cnt = 0; cnt < 5; cnt++) {
        String query = cqDUnitTest.cqs[i];
        if (cnt > 0) {
          nameIndex = (cnt % names.length);
          query += " or p.names[" + nameIndex + "] = '" + names[nameIndex] + cnt + "'";
        }
        initQueries.add(query);
      }
    }

    int numMatchedQueries = 10;
    List<String> cqQueries = new ArrayList<>();
    for (String query : initQueries) {
      for (int cnt = 0; cnt < numMatchedQueries; cnt++) {
        if (uniqueQueries) {
          // Append blank string, so that query string is different but the
          // Query constraint remains same.
          query += " ";
        }
        cqQueries.add(query);
      }
    }
    String[] queries = new String[cqQueries.size()];
    cqQueries.toArray(queries);
    return queries;
  }
}
