/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.cache.query.dunit;

import static org.apache.geode.internal.Assert.fail;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.util.Map;
import java.util.Properties;

import org.apache.logging.log4j.Logger;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.Scope;
import org.apache.geode.cache.client.ClientCache;
import org.apache.geode.cache.client.ClientCacheFactory;
import org.apache.geode.cache.client.ClientRegionShortcut;
import org.apache.geode.cache.client.PoolManager;
import org.apache.geode.cache.query.CacheUtils;
import org.apache.geode.cache.query.IndexType;
import org.apache.geode.cache.query.Query;
import org.apache.geode.cache.query.QueryService;
import org.apache.geode.cache.query.SelectResults;
import org.apache.geode.cache.query.Struct;
import org.apache.geode.cache.query.data.PortfolioPdx;
import org.apache.geode.cache.query.data.PositionPdx;
import org.apache.geode.cache.query.internal.Undefined;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.cache30.CacheSerializableRunnable;
import org.apache.geode.cache30.ClientServerTestCase;
import org.apache.geode.internal.AvailablePortHelper;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.geode.pdx.FieldType;
import org.apache.geode.pdx.JSONFormatter;
import org.apache.geode.pdx.PdxInstance;
import org.apache.geode.pdx.PdxInstanceFactory;
import org.apache.geode.pdx.internal.ClientTypeRegistration;
import org.apache.geode.pdx.internal.PdxInstanceFactoryImpl;
import org.apache.geode.pdx.internal.PdxType;
import org.apache.geode.pdx.internal.PeerTypeRegistration;
import org.apache.geode.pdx.internal.TypeRegistration;
import org.apache.geode.test.dunit.Assert;
import org.apache.geode.test.dunit.DistributedTestCase;
import org.apache.geode.test.dunit.DistributedTestUtils;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.Invoke;
import org.apache.geode.test.dunit.NetworkUtils;
import org.apache.geode.test.dunit.SerializableCallable;
import org.apache.geode.test.dunit.SerializableRunnable;
import org.apache.geode.test.dunit.SerializableRunnableIF;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.junit.categories.OQLQueryTest;

@Category({OQLQueryTest.class})
public class PdxQueryDUnitTest extends PDXQueryTestBase {
  /** Logger used by the test methods and by the query runnables they send to other VMs. */
  public static final Logger logger = LogService.getLogger();

  /**
   * Creates the test case. Nothing is initialized here beyond what the superclass constructor
   * does.
   */
  public PdxQueryDUnitTest() {}

  /**
   * Tests local querying of PDX-serialized values on a peer cache server; no client takes part.
   * Server 1 loads the region with {@code TestObject} values. Server 2 then executes every query
   * in {@code queryString}, checks the element type returned by the first three queries, and
   * verifies the value of {@code TestObject.numInstance} in its VM before and after querying.
   */
  @Test
  public void testServerQuery() throws CacheException {

    final Host host = Host.getHost(0);
    VM vm0 = host.getVM(0);
    VM vm1 = host.getVM(1);
    final int numberOfEntries = 5;

    // Start server1 and populate the region.
    vm0.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        configAndStartBridgeServer();
        Region region = getRootRegion().getSubregion(regionName);
        for (int i = 0; i < numberOfEntries; i++) {
          region.put("key-" + i, new TestObject(i, "vmware"));
        }
      }
    });


    // Start server2 and run the queries there.
    vm1.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        configAndStartBridgeServer();
        Region region = getRootRegion().getSubregion(regionName);
        // No TestObject instance is expected in this VM before any query has run.
        assertEquals(0, TestObject.numInstance);

        // Execute query with different type of Results.
        QueryService qs = getCache().getQueryService();
        for (int i = 0; i < queryString.length; i++) {
          SelectResults sr = null;
          try {
            Query query = qs.newQuery(queryString[i]);
            sr = (SelectResults) query.execute();
          } catch (Exception ex) {
            // Pass the exception along so the failure report keeps the original stack trace.
            Assert.fail("Failed to execute query, " + ex.getMessage(), ex);
          }

          // Only the first three queries have a fixed expected element type.
          for (Object o : sr.asSet()) {
            if (i == 0 && !(o instanceof Integer)) {
              fail("Expected type Integer, not found in result set. Found type :" + o.getClass());
            } else if (i == 1 && !(o instanceof TestObject)) {
              fail(
                  "Expected type TestObject, not found in result set. Found type :" + o.getClass());
            } else if (i == 2 && !(o instanceof String)) {
              fail("Expected type String, not found in result set. Found type :" + o.getClass());
            }
          }
        }
        // Pdx objects for local queries now get deserialized when results are iterated.
        // So the deserialized objects are no longer cached in VMCachedDeserializable.
        assertEquals(numberOfEntries * 2, TestObject.numInstance);
      }
    });

    this.closeClient(vm1);
    this.closeClient(vm0);
  }


  /**
   * Tests a client-server query over PDX-serialized entries where the client receives projected
   * values. Server 1 stores instances of a local {@code TestObject} subclass, a client in vm3 runs
   * {@code queryString[0]} through a pool connected to server 2, and server 2 is then checked to
   * confirm that {@code TestObject.numInstance} is still 0.
   */
  @Test
  public void testClientServerQueryWithProjections() throws CacheException {
    final int numberOfEntries = 10;
    final Host host = Host.getHost(0);
    VM vm0 = host.getVM(0);
    VM vm1 = host.getVM(1);
    VM vm2 = host.getVM(2);
    VM vm3 = host.getVM(3);

    // Server 1: start it and load the data.
    vm0.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        // Values are stored as instances of this local subclass of TestObject.
        class PdxObject extends TestObject {
          PdxObject() {}

          PdxObject(int id, String ticker) {
            super(id, ticker);
          }
        }

        configAndStartBridgeServer();
        Region serverRegion = getRootRegion().getSubregion(regionName);
        int index = 0;
        while (index < numberOfEntries) {
          serverRegion.put("key-" + index, new PdxObject(index, "vmware"));
          index++;
        }
      }
    });

    // Server 2: start it and confirm no TestObject has been created there.
    vm1.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        configAndStartBridgeServer();
        Region serverRegion = getRootRegion().getSubregion(regionName);
        System.out.println("##### Region size is: " + serverRegion.size());
        assertEquals(0, TestObject.numInstance);
      }
    });

    // Look up where the servers are listening.
    final int server1Port = vm0.invoke(() -> PdxQueryDUnitTest.getCacheServerPort());
    final int server2Port = vm1.invoke(() -> PdxQueryDUnitTest.getCacheServerPort());
    final String serverHost = NetworkUtils.getServerHostName(vm0.getHost());

    // One pool per client VM; vm2 talks to server 1, vm3 to server 2.
    final String poolName = "testClientServerQueriesWithParamsPool";
    createPool(vm2, poolName, new String[] {serverHost}, new int[] {server1Port}, true);
    createPool(vm3, poolName, new String[] {serverHost}, new int[] {server2Port}, true);

    // Run the query from the client in vm3.
    vm3.invoke(new CacheSerializableRunnable("Execute queries") {
      @Override
      public void run2() throws CacheException {
        QueryService poolQueryService = null;
        try {
          poolQueryService = PoolManager.find(poolName).getQueryService();
        } catch (Exception e) {
          Assert.fail("Failed to get QueryService.", e);
        }

        SelectResults projected = null;
        try {
          logger.info("### Executing Query :" + queryString[0]);
          projected = (SelectResults) poolQueryService.newQuery(queryString[0]).execute();
        } catch (Exception e) {
          Assert.fail("Failed executing " + queryString[0], e);
        }
        assertEquals(numberOfEntries, projected.size());
      }
    });

    // Server 2 served the query; TestObject.numInstance there must still be 0.
    vm1.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        assertEquals(0, TestObject.numInstance);
      }
    });

    // Shut down clients first, then servers.
    for (VM vm : new VM[] {vm2, vm3, vm1, vm0}) {
      closeClient(vm);
    }
  }

  /**
   * Tests client-server query on compressed PdxInstance. The client receives uncompressed value.
   *
   * <p>
   * Server 1 is started with the {@code compressor} passed to its region configuration and loads
   * {@code TestObject} values whose string field is a long repeated text. A client in vm3 runs
   * {@code queryString[2]} through a pool connected to server 2 and expects every returned string
   * to equal that text. Server 2 is checked before and after the query to confirm that
   * {@code TestObject.numInstance} is 0.
   */
  @Test
  public void testClientServerQueryWithCompression() throws CacheException {
    final String randomString =
        "asddfjkhaskkfdjhzjc0943509328kvnhfjkldsg09q3485ibjafdp9q8y43p9u7hgavpiuaha48uy9afliasdnuaiuqa498qa4"
            + "asddfjkhaskkfdjhzjc0943509328kvnhfjkldsg09q3485ibjafdp9q8y43p9u7hgavpiuaha48uy9afliasdnuaiuqa498qa4"
            + "asddfjkhaskkfdjhzjc0943509328kvnhfjkldsg09q3485ibjafdp9q8y43p9u7hgavpiuaha48uy9afliasdnuaiuqa498qa4"
            + "asddfjkhaskkfdjhzjc0943509328kvnhfjkldsg09q3485ibjafdp9q8y43p9u7hgavpiuaha48uy9afliasdnuaiuqa498qa4";

    final Host host = Host.getHost(0);
    VM vm0 = host.getVM(0);
    VM vm1 = host.getVM(1);
    VM vm2 = host.getVM(2);
    VM vm3 = host.getVM(3);
    final int numberOfEntries = 10;

    // Start server1
    vm0.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        configAndStartBridgeServer(false, false, false, compressor);
        Region region = getRootRegion().getSubregion(regionName);
        // Use a real assertion: the Java 'assert' keyword is skipped when the JVM runs without
        // -ea, which would let a region without a compressor go unnoticed.
        assertThat(region.getAttributes().getCompressor()).isNotNull();
        for (int i = 0; i < numberOfEntries; i++) {
          region.put("key-" + i, new TestObject(i, randomString));
        }
      }
    });

    // Start server2
    vm1.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        configAndStartBridgeServer();
        Region region = getRootRegion().getSubregion(regionName);
        System.out.println("##### Region size is: " + region.size());
        assertEquals(0, TestObject.numInstance);
      }
    });

    // Look up the server ports and host name for the client pools.
    final int port0 = vm0.invoke(() -> PdxQueryDUnitTest.getCacheServerPort());
    final int port1 = vm1.invoke(() -> PdxQueryDUnitTest.getCacheServerPort());

    final String host0 = NetworkUtils.getServerHostName(vm0.getHost());

    // Create client pool.
    final String poolName = "testClientServerQueriesWithParamsPool";
    createPool(vm2, poolName, new String[] {host0}, new int[] {port0}, true);
    createPool(vm3, poolName, new String[] {host0}, new int[] {port1}, true);

    // Execute client queries
    SerializableRunnable executeQueries = new CacheSerializableRunnable("Execute queries") {
      @Override
      @SuppressWarnings("unchecked")
      public void run2() throws CacheException {
        SelectResults<String> results = null;
        QueryService qService = null;
        try {
          qService = (PoolManager.find(poolName)).getQueryService();
        } catch (Exception e) {
          Assert.fail("Failed to get QueryService.", e);
        }
        try {
          logger.info("### Executing Query :" + queryString[2]);
          Query query = qService.newQuery(queryString[2]);
          results = (SelectResults<String>) query.execute();
        } catch (Exception e) {
          Assert.fail("Failed executing " + queryString[2], e);
        }
        assertEquals(numberOfEntries, results.size());
        // Every projected string must match the text that was stored on server 1.
        for (String result : results) {
          assertEquals(randomString, result);
        }
      }
    };

    vm3.invoke(executeQueries);

    // Check for TestObject instances on Server2.
    // It should be 0
    vm1.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        assertEquals(0, TestObject.numInstance);
      }
    });

    this.closeClient(vm2);
    this.closeClient(vm3);
    this.closeClient(vm1);
    this.closeClient(vm0);
  }

  /**
   * Tests a client-server query over entries that were built as {@code PdxInstance}s with a
   * {@code PdxInstanceFactory} under the type name "PdxTestObject". A client in vm3 runs
   * {@code queryString[0]} through a pool connected to server 2, and server 2 is checked before
   * and after the query to confirm that {@code TestObject.numInstance} is 0.
   */
  @Test
  public void testVersionedClass() throws CacheException {
    final int numberOfEntries = 10;
    final Host host = Host.getHost(0);
    VM vm0 = host.getVM(0);
    VM vm1 = host.getVM(1);
    VM vm2 = host.getVM(2);
    VM vm3 = host.getVM(3);

    // Server 1: start it and store factory-built PdxInstances.
    vm0.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        configAndStartBridgeServer();
        Region serverRegion = getRootRegion().getSubregion(regionName);
        try {
          int index = 0;
          while (index < numberOfEntries) {
            PdxInstance value =
                PdxInstanceFactoryImpl.newCreator("PdxTestObject", false, getCache())
                    .writeInt("id", index)
                    .writeString("ticker", "vmware")
                    .writeString("idTickers", index + "vmware")
                    .create();
            serverRegion.put("key-" + index, value);
            index++;
          }
        } catch (Exception ex) {
          Assert.fail("Failed to load the class.", ex);
        }
      }
    });

    // Server 2: start it and confirm no TestObject has been created there.
    vm1.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        configAndStartBridgeServer();
        Region serverRegion = getRootRegion().getSubregion(regionName);
        System.out.println("##### Region size is: " + serverRegion.size());
        assertEquals(0, TestObject.numInstance);
      }
    });

    // Look up where the servers are listening.
    final int server1Port = vm0.invoke(() -> PdxQueryDUnitTest.getCacheServerPort());
    final int server2Port = vm1.invoke(() -> PdxQueryDUnitTest.getCacheServerPort());
    final String serverHost = NetworkUtils.getServerHostName(vm0.getHost());

    // One pool per client VM; vm2 talks to server 1, vm3 to server 2.
    final String poolName = "testClientServerQueriesWithParamsPool";
    createPool(vm2, poolName, new String[] {serverHost}, new int[] {server1Port}, true);
    createPool(vm3, poolName, new String[] {serverHost}, new int[] {server2Port}, true);

    // Run the query from the client in vm3.
    vm3.invoke(new CacheSerializableRunnable("Execute queries") {
      @Override
      public void run2() throws CacheException {
        QueryService poolQueryService = null;
        try {
          poolQueryService = PoolManager.find(poolName).getQueryService();
        } catch (Exception e) {
          Assert.fail("Failed to get QueryService.", e);
        }

        SelectResults queryResults = null;
        try {
          logger.info("### Executing Query :" + queryString[0]);
          queryResults = (SelectResults) poolQueryService.newQuery(queryString[0]).execute();
        } catch (Exception e) {
          Assert.fail("Failed executing " + queryString[0], e);
        }

        assertEquals(numberOfEntries, queryResults.size());
      }
    });

    // Server 2 served the query; TestObject.numInstance there must still be 0.
    vm1.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        assertEquals(0, TestObject.numInstance);
      }
    });

    // Shut down clients first, then servers.
    for (VM vm : new VM[] {vm2, vm3, vm1, vm0}) {
      closeClient(vm);
    }
  }

  /**
   * Tests that a query executed on the server through a pool and the same query executed against
   * the client's local region return matching results. The client in vm3 also verifies its own
   * {@code TestObject.numInstance} count after the queries, and server 2 is checked to confirm
   * that its count is 0.
   */
  @Test
  public void testClientServerQuery() throws CacheException {
    final int numberOfEntries = 10;
    final Host host = Host.getHost(0);
    VM vm0 = host.getVM(0);
    VM vm1 = host.getVM(1);
    VM vm2 = host.getVM(2);
    VM vm3 = host.getVM(3);

    // Server 1: start it and load the data.
    vm0.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        configAndStartBridgeServer();
        Region serverRegion = getRootRegion().getSubregion(regionName);
        int index = 0;
        while (index < numberOfEntries) {
          serverRegion.put("key-" + index, new TestObject(index, "vmware"));
          index++;
        }
      }
    });

    // Server 2: start it; it loads nothing itself.
    vm1.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        configAndStartBridgeServer();
        Region serverRegion = getRootRegion().getSubregion(regionName);
      }
    });

    // Look up where the servers are listening.
    final int port0 = vm0.invoke(() -> PdxQueryDUnitTest.getCacheServerPort());
    final int port1 = vm1.invoke(() -> PdxQueryDUnitTest.getCacheServerPort());
    final String host0 = NetworkUtils.getServerHostName(vm0.getHost());

    // One pool per client VM; vm2 talks to server 1, vm3 to server 2.
    final String poolName = "testClientServerQueryPool";
    createPool(vm2, poolName, new String[] {host0}, new int[] {port0}, true);
    createPool(vm3, poolName, new String[] {host0}, new int[] {port1}, true);

    // Client in vm3: create and fill a local region, then query it locally.
    vm3.invoke(new CacheSerializableRunnable("Create region") {
      @Override
      public void run2() throws CacheException {
        AttributesFactory clientAttributes = new AttributesFactory();
        clientAttributes.setScope(Scope.LOCAL);
        ClientServerTestCase.configureConnectionPool(clientAttributes, host0, port1, -1, true, -1,
            -1, null);
        Region clientRegion = createRegion(regionName, rootRegionName, clientAttributes.create());
        int index = 0;
        while (index < numberOfEntries) {
          clientRegion.put("key-" + index, new TestObject(index, "vmware"));
          index++;
        }

        QueryService localService = null;
        try {
          localService = getCache().getQueryService();
        } catch (Exception e) {
          Assert.fail("Failed to get QueryService.", e);
        }

        // The first three queries must each see every entry.
        for (int q = 0; q < 3; q++) {
          try {
            SelectResults localResults =
                (SelectResults) localService.newQuery(queryString[q]).execute();
            assertEquals(numberOfEntries, localResults.size());
          } catch (Exception e) {
            Assert.fail("Failed executing " + queryString[q], e);
          }
        }
      }
    });

    // Client in vm3: run queries 1 and 2 remotely and locally and compare the outcomes.
    vm3.invoke(new CacheSerializableRunnable("Execute queries") {
      @Override
      public void run2() throws CacheException {
        // Slot [0][0] holds the server result, slot [0][1] the local one.
        SelectResults[][] remoteAndLocal = new SelectResults[1][2];
        QueryService serverService = null;
        QueryService localService = null;
        try {
          serverService = PoolManager.find(poolName).getQueryService();
          localService = getCache().getQueryService();
        } catch (Exception e) {
          Assert.fail("Failed to get QueryService.", e);
        }

        for (int q = 1; q < 3; q++) {
          try {
            logger.info("### Executing Query on server:" + queryString[q]);
            remoteAndLocal[0][0] =
                (SelectResults) serverService.newQuery(queryString[q]).execute();
            assertEquals(numberOfEntries, remoteAndLocal[0][0].size());

            logger.info("### Executing Query locally:" + queryString[q]);
            remoteAndLocal[0][1] = (SelectResults) localService.newQuery(queryString[q]).execute();
            assertEquals(numberOfEntries, remoteAndLocal[0][1].size());

            // Compare local and remote query results.
            boolean sameResults = CacheUtils.compareResultsOfWithAndWithoutIndex(remoteAndLocal);
            if (!sameResults) {
              fail("Local and Remote Query Results are not matching for query :" + queryString[q]);
            }
          } catch (Exception e) {
            Assert.fail("Failed executing " + queryString[q], e);
          }
        }
        assertEquals(2 * numberOfEntries, TestObject.numInstance);
      }
    });

    // Server 2 served the remote queries; TestObject.numInstance there must be 0.
    vm1.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        assertEquals(0, TestObject.numInstance);
      }
    });

    // Shut down clients first, then servers.
    for (VM vm : new VM[] {vm2, vm3, vm1, vm0}) {
      closeClient(vm);
    }
  }

  /**
   * Tests server-side queries over PDX-serialized {@code PortfolioPdx} values when indexes exist
   * on the queried fields.
   *
   * <p>
   * Server 1 creates four indexes (on {@code ID}, {@code status}, {@code pos.secId} and
   * {@code position1.secId}); a client in vm3 loads 100 entries through server 2. Server 1 then
   * executes every query twice: once with default settings, expecting no {@code PdxInstance} in
   * any result or struct field, and once with read-serialized switched on for the test, expecting
   * every result and struct field to be a {@code PdxInstance}.
   */
  @Test
  public void testClientServerQueryWithRangeIndex() throws CacheException {

    final Host host = Host.getHost(0);
    VM vm0 = host.getVM(0);
    VM vm1 = host.getVM(1);
    VM vm2 = host.getVM(2);
    VM vm3 = host.getVM(3);

    final String[] qs = new String[] {"SELECT * FROM " + regName + " p WHERE p.ID > 0",
        "SELECT p FROM " + regName + " p WHERE p.ID > 0",
        "SELECT * FROM " + regName + " p WHERE p.ID = 1",
        "SELECT * FROM " + regName + " p WHERE p.ID < 10",
        "SELECT * FROM " + regName + " p WHERE p.ID != 10",
        "SELECT * FROM " + regName + " p, p.positions.values pos WHERE p.ID > 0",
        "SELECT * FROM " + regName + " p, p.positions.values pos WHERE p.ID = 10",
        "SELECT p, pos FROM " + regName + " p, p.positions.values pos WHERE p.ID > 0",
        "SELECT p, pos FROM " + regName + " p, p.positions.values pos WHERE p.ID = 10",
        "SELECT pos FROM " + regName + " p, p.positions.values pos WHERE p.ID > 0",
        "SELECT p, pos FROM " + regName + " p, p.positions.values pos WHERE pos.secId != 'XXX'",
        "SELECT pos FROM " + regName + " p, p.positions.values pos WHERE pos.secId != 'XXX'",
        "SELECT pos FROM " + regName + " p, p.positions.values pos WHERE pos.secId = 'SUN'",
        "SELECT p, pos FROM " + regName + " p, p.positions.values pos WHERE pos.secId = 'SUN'",
        "SELECT p, pos FROM " + regName + " p, p.positions.values pos WHERE pos.secId = 'DELL'",
        "SELECT * FROM " + regName + " p, p.positions.values pos WHERE pos.secId = 'SUN'",
        "SELECT * FROM " + regName + " p, p.positions.values pos WHERE pos.secId = 'DELL'",
        "SELECT p, p.position1 FROM " + regName + " p where p.position1.secId != 'XXX'",
        "SELECT p, p.position1 FROM " + regName + " p where p.position1.secId = 'SUN'",
        "SELECT p.position1 FROM " + regName + " p WHERE p.ID > 0",
        "SELECT * FROM " + regName + " p WHERE p.status = 'active'",
        "SELECT p FROM " + regName + " p WHERE p.status != 'active'",};

    // Start server1
    vm0.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        configAndStartBridgeServer(false, false, true, null); // Async index
        Region region = getRootRegion().getSubregion(regionName);
        // Create Range index.
        QueryService qs = getCache().getQueryService();
        try {
          qs.createIndex("idIndex", "p.ID", regName + " p");
          qs.createIndex("statusIndex", "p.status", regName + " p");
          qs.createIndex("secIdIndex", "pos.secId", regName + " p, p.positions.values pos");
          qs.createIndex("pSecIdIdIndex", "p.position1.secId", regName + " p");
        } catch (Exception ex) {
          // Pass the exception along so the failure report keeps the original stack trace.
          Assert.fail("Failed to create index." + ex.getMessage(), ex);
        }
      }
    });

    // Start server2
    vm1.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        configAndStartBridgeServer(false, false, true, null); // Async index
        Region region = getRootRegion().getSubregion(regionName);
      }
    });

    // Client pool.
    final int port0 = vm0.invoke(() -> PdxQueryDUnitTest.getCacheServerPort());
    final int port1 = vm1.invoke(() -> PdxQueryDUnitTest.getCacheServerPort());

    final String host0 = NetworkUtils.getServerHostName(vm0.getHost());

    // Create client pool.
    final String poolName = "testClientServerQueryPool";
    createPool(vm2, poolName, new String[] {host0}, new int[] {port0}, true);
    createPool(vm3, poolName, new String[] {host0}, new int[] {port1}, true);

    // Create client region
    vm3.invoke(new CacheSerializableRunnable("Create region") {
      @Override
      public void run2() throws CacheException {

        AttributesFactory factory = new AttributesFactory();
        factory.setScope(Scope.LOCAL);
        ClientServerTestCase.configureConnectionPool(factory, host0, port1, -1, true, -1, -1, null);
        Region region = createRegion(regionName, rootRegionName, factory.create());
        // j feeds both PortfolioPdx constructor arguments and is incremented after each put.
        int j = 0;
        for (int i = 0; i < 100; i++) {
          region.put("key-" + i, new PortfolioPdx(j, j++));
          // To add duplicate:
          if (i % 24 == 0) {
            j = 0; // reset
          }
        }
      }
    });


    // Execute query and make sure there is no PdxInstance in the results.
    vm0.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        // Execute query locally.
        QueryService queryService = getCache().getQueryService();
        for (int i = 0; i < qs.length; i++) {
          try {
            Query query = queryService.newQuery(qs[i]);
            SelectResults results = (SelectResults) query.execute();
            for (Object o : results.asList()) {
              if (o instanceof Struct) {
                // Multi-column results: check every field of the struct.
                Object[] values = ((Struct) o).getFieldValues();
                for (int c = 0; c < values.length; c++) {
                  if (values[c] instanceof PdxInstance) {
                    fail("Found unexpected PdxInstance in the query results. At struct field [" + c
                        + "] query :" + qs[i] + " Object is: " + values[c]);
                  }
                }
              } else {
                if (o instanceof PdxInstance) {
                  fail("Found unexpected PdxInstance in the query results. " + qs[i]);
                }
              }
            }
          } catch (Exception e) {
            Assert.fail("Failed executing " + qs[i], e);
          }

        }
      }
    });

    // Re-execute query to fetch PdxInstance in the results.
    vm0.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        // Execute query locally.
        GemFireCacheImpl cache = (GemFireCacheImpl) getCache();
        cache.setReadSerializedForTest(true);
        try {
          QueryService queryService = getCache().getQueryService();
          for (int i = 0; i < qs.length; i++) {
            try {
              Query query = queryService.newQuery(qs[i]);
              SelectResults results = (SelectResults) query.execute();
              for (Object o : results.asList()) {
                if (o instanceof Struct) {
                  // Multi-column results: check every field of the struct.
                  Object[] values = ((Struct) o).getFieldValues();
                  for (int c = 0; c < values.length; c++) {
                    if (!(values[c] instanceof PdxInstance)) {
                      fail(
                          "Didn't found expected PdxInstance in the query results. At struct field ["
                              + c + "] query :" + qs[i] + " Object is: " + values[c]);
                    }
                  }
                } else {
                  if (!(o instanceof PdxInstance)) {
                    fail("Didn't found expected PdxInstance in the query results. " + qs[i]
                        + " Object is: " + o);
                  }
                }
              }
            } catch (Exception e) {
              Assert.fail("Failed executing " + qs[i], e);
            }
          }
        } finally {
          // Always switch the flag back off, even when a query or check fails.
          cache.setReadSerializedForTest(false);
        }
      }
    });

    this.closeClient(vm2);
    this.closeClient(vm3);
    this.closeClient(vm1);
    this.closeClient(vm0);
  }

  /**
   * Tests a {@code COUNT(*)} query over PDX-serialized entries, executed both on the server
   * through a pool and against the client's local region. Both counts must equal the number of
   * entries and the two results must match. The client in vm3 also verifies its own
   * {@code TestObject.numInstance} count, and server 2 is checked to confirm that its count is 0.
   */
  @Test
  public void testClientServerCountQuery() throws CacheException {
    final int numberOfEntries = 10;
    final String queryStr = "SELECT COUNT(*) FROM " + regName + " WHERE id >= 0";
    final Host host = Host.getHost(0);
    VM vm0 = host.getVM(0);
    VM vm1 = host.getVM(1);
    VM vm2 = host.getVM(2);
    VM vm3 = host.getVM(3);

    // Server 1: start it and load the data.
    vm0.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        configAndStartBridgeServer();
        Region serverRegion = getRootRegion().getSubregion(regionName);
        int index = 0;
        while (index < numberOfEntries) {
          serverRegion.put("key-" + index, new TestObject(index, "vmware"));
          index++;
        }
      }
    });

    // Server 2: start it; it loads nothing itself.
    vm1.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        configAndStartBridgeServer();
        Region serverRegion = getRootRegion().getSubregion(regionName);
      }
    });

    // Look up where the servers are listening.
    final int port0 = vm0.invoke(() -> PdxQueryDUnitTest.getCacheServerPort());
    final int port1 = vm1.invoke(() -> PdxQueryDUnitTest.getCacheServerPort());
    final String host0 = NetworkUtils.getServerHostName(vm0.getHost());

    // One pool per client VM; vm2 talks to server 1, vm3 to server 2.
    final String poolName = "testClientServerQueryPool";
    createPool(vm2, poolName, new String[] {host0}, new int[] {port0}, true);
    createPool(vm3, poolName, new String[] {host0}, new int[] {port1}, true);

    // Client in vm3: create and fill a local region, then count its entries locally.
    vm3.invoke(new CacheSerializableRunnable("Create region") {
      @Override
      public void run2() throws CacheException {
        AttributesFactory clientAttributes = new AttributesFactory();
        clientAttributes.setScope(Scope.LOCAL);
        ClientServerTestCase.configureConnectionPool(clientAttributes, host0, port1, -1, true, -1,
            -1, null);
        Region clientRegion = createRegion(regionName, rootRegionName, clientAttributes.create());
        int index = 0;
        while (index < numberOfEntries) {
          clientRegion.put("key-" + index, new TestObject(index, "vmware"));
          index++;
        }

        QueryService localService = null;
        try {
          localService = getCache().getQueryService();
        } catch (Exception e) {
          Assert.fail("Failed to get QueryService.", e);
        }
        try {
          SelectResults countResults = (SelectResults) localService.newQuery(queryStr).execute();
          // The count is the first element of the result.
          Integer localCount = (Integer) countResults.asList().get(0);
          assertEquals(numberOfEntries, localCount.intValue());
        } catch (Exception e) {
          Assert.fail("Failed executing " + queryStr, e);
        }
      }
    });

    // Client in vm3: run the count remotely and locally and compare the outcomes.
    vm3.invoke(new CacheSerializableRunnable("Execute queries") {
      @Override
      public void run2() throws CacheException {
        // Slot [0][0] holds the server result, slot [0][1] the local one.
        SelectResults[][] remoteAndLocal = new SelectResults[1][2];
        QueryService serverService = null;
        QueryService localService = null;
        try {
          serverService = PoolManager.find(poolName).getQueryService();
          localService = getCache().getQueryService();
        } catch (Exception e) {
          Assert.fail("Failed to get QueryService.", e);
        }

        try {
          logger.info("### Executing Query on server:" + queryStr);
          remoteAndLocal[0][0] = (SelectResults) serverService.newQuery(queryStr).execute();
          Integer remoteCount = (Integer) remoteAndLocal[0][0].asList().get(0);
          assertEquals(numberOfEntries, remoteCount.intValue());

          logger.info("### Executing Query locally:" + queryStr);
          remoteAndLocal[0][1] = (SelectResults) localService.newQuery(queryStr).execute();
          Integer localCount = (Integer) remoteAndLocal[0][1].asList().get(0);
          assertEquals(numberOfEntries, localCount.intValue());

          // Compare local and remote query results.
          boolean sameResults = CacheUtils.compareResultsOfWithAndWithoutIndex(remoteAndLocal);
          if (!sameResults) {
            fail("Local and Remote Query Results are not matching for query :" + queryStr);
          }
        } catch (Exception e) {
          Assert.fail("Failed executing " + queryStr, e);
        }

        assertEquals(numberOfEntries, TestObject.numInstance);
      }
    });

    // Server 2 served the remote query; TestObject.numInstance there must be 0.
    vm1.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        assertEquals(0, TestObject.numInstance);
      }
    });

    // Shut down clients first, then servers.
    for (VM vm : new VM[] {vm2, vm3, vm1, vm0}) {
      closeClient(vm);
    }
  }

  /**
   * Tests client-server queries over a region populated with {@code PdxInstance}s created from
   * {@code PortfolioPdxVersion}. Each query is executed both through the pool's (remote) query
   * service and through the client's local query service, and the two results are compared.
   */
  @Test
  public void testVersionedClientServerQuery() throws CacheException {

    final Host host = Host.getHost(0);
    VM vm0 = host.getVM(0);
    VM vm1 = host.getVM(1);
    VM vm2 = host.getVM(2);
    VM vm3 = host.getVM(3);
    final int numberOfEntries = 10;

    final String[] queryStr = new String[] {"SELECT DISTINCT ID FROM " + regName, // 0
        "SELECT * FROM " + regName, // 1
        "SELECT pkid FROM " + regName, // 2
        "SELECT * FROM " + regName + " WHERE ID > 5", // 3
        "SELECT p FROM " + regName + " p, p.positions pos WHERE p.pkid != 'vmware'", // 4
        "SELECT entry.value FROM " + this.regName + ".entries entry WHERE entry.value.ID > 0",
        "SELECT entry.value FROM  " + this.regName + ".entries entry WHERE entry.key = 'key-1'",
        "SELECT e.value FROM " + this.regName + ".entrySet e where  e.value.pkid >= '0'",
        "SELECT * FROM " + this.regName + ".values p WHERE p.pkid in SET('1', '2','3')",
        "SELECT * FROM " + this.regName + " pf where pf.position1.secId > '2'",
        "SELECT * FROM " + this.regName + " p where p.position3[1].portfolioId = 2",
        "SELECT * FROM " + this.regName + " p, p.positions.values AS pos WHERE pos.secId != '1'",
        "SELECT key, positions FROM " + this.regName + ".entrySet, value.positions.values "
            + "positions WHERE positions.mktValue >= 25.00",
        "SELECT * FROM " + this.regName + " portfolio1, " + this.regName + " portfolio2 WHERE "
            + "portfolio1.status = portfolio2.status",
        "SELECT portfolio1.ID, portfolio2.status FROM " + this.regName + " portfolio1, "
            + this.regName + " portfolio2  WHERE portfolio1.status = portfolio2.status",
        // NOTE(review): the predicate below compares positions1.secId with itself;
        // positions2.secId may have been intended - confirm before changing the query.
        "SELECT * FROM " + this.regName + " portfolio1, portfolio1.positions.values positions1, "
            + this.regName + " portfolio2,  portfolio2.positions.values positions2 WHERE "
            + "positions1.secId = positions1.secId ",
        "SELECT * FROM " + this.regName + " portfolio, portfolio.positions.values positions WHERE "
            + "portfolio.Pk IN SET ('1', '2') AND positions.secId = '1'",
        "SELECT DISTINCT pf1, pf2 FROM " + this.regName
            + "  pf1, pf1.collectionHolderMap.values coll1," + " pf1.positions.values posit1, "
            + this.regName + "  pf2, pf2.collectionHolderMap.values "
            + " coll2, pf2.positions.values posit2 WHERE pf1.ID = pf2.ID",};



    // Start server1
    vm0.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        configAndStartBridgeServer();
        Region region = getRootRegion().getSubregion(regionName);
      }
    });

    // Start server2
    vm1.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        configAndStartBridgeServer();
        Region region = getRootRegion().getSubregion(regionName);
      }
    });


    // Client pool.
    final int port0 = vm0.invoke(() -> PdxQueryDUnitTest.getCacheServerPort());
    final int port1 = vm1.invoke(() -> PdxQueryDUnitTest.getCacheServerPort());

    final String host0 = NetworkUtils.getServerHostName(vm0.getHost());

    // Create client pool.
    final String poolName = "testClientServerQueryPool";
    createPool(vm2, poolName, new String[] {host0}, new int[] {port0}, true);
    createPool(vm3, poolName, new String[] {host0}, new int[] {port1}, true);

    // Create the client region on vm3, load it and run every query remotely and locally.
    vm3.invoke(new CacheSerializableRunnable("Create region") {
      @Override
      public void run2() throws CacheException {

        // Load client/server region.
        AttributesFactory factory = new AttributesFactory();
        factory.setScope(Scope.LOCAL);
        ClientServerTestCase.configureConnectionPool(factory, host0, port1, -1, true, -1, -1, null);
        Region region = createRegion(regionName, rootRegionName, factory.create());

        try {
          // Put one PdxInstance per key, each built from a PortfolioPdxVersion.
          for (int i = 0; i < numberOfEntries; i++) {
            PortfolioPdxVersion portfolioPdxVersion =
                new PortfolioPdxVersion(Integer.valueOf(i), Integer.valueOf(i));
            PdxInstanceFactory pdxFactory =
                PdxInstanceFactoryImpl.newCreator("PortfolioPdxVersion", false, getCache());
            PdxInstance pdxInstance = portfolioPdxVersion.createPdxInstance(pdxFactory);
            region.put("key-" + i, pdxInstance);
          }
        } catch (Exception ex) {
          // Keep the original exception as the cause so the failure can be diagnosed.
          Assert.fail("Failed to load the class.", ex);
        }

        // Execute query:
        QueryService remoteQueryService = null;
        QueryService localQueryService = null;
        // rs[0][0] holds the remote result and rs[0][1] the local one for the current query.
        SelectResults[][] rs = new SelectResults[1][2];

        try {
          remoteQueryService = (PoolManager.find(poolName)).getQueryService();
          localQueryService = getCache().getQueryService();
        } catch (Exception e) {
          Assert.fail("Failed to get QueryService.", e);
        }

        for (int i = 0; i < queryStr.length; i++) {
          try {
            logger.info("### Executing Query on server:" + queryStr[i]);
            Query query = remoteQueryService.newQuery(queryStr[i]);
            rs[0][0] = (SelectResults) query.execute();
            logger.info("### Executing Query locally:" + queryStr[i]);
            query = localQueryService.newQuery(queryStr[i]);
            rs[0][1] = (SelectResults) query.execute();
            logger.info("### Remote Query rs size: " + (rs[0][0]).size()
                + ", Local Query rs size: " + (rs[0][1]).size());
            // Compare local and remote query results.
            if (!CacheUtils.compareResultsOfWithAndWithoutIndex(rs)) {
              fail("Local and Remote Query Results are not matching for query :" + queryStr[i]);
            }
          } catch (Exception e) {
            Assert.fail("Failed executing " + queryStr[i], e);
          }

        }

      }
    });

    this.closeClient(vm2);
    this.closeClient(vm3);
    this.closeClient(vm1);
    this.closeClient(vm0);
  }

  /**
   * Tests client-server queries on a region that holds two value types: {@code TestObject} entries
   * created with the ticker "vmware", and {@code TestObject2} entries. Queries on {@code ticker}
   * are run on the client locally and remotely, and the result sizes are checked.
   */
  @Test
  public void testClientServerQueryMixedTypes() throws CacheException {

    final String[] testQueries = new String[] {"select ticker from /root/" + regionName,
        "select ticker from /root/" + regionName + " p where IS_DEFINED(p.ticker)",
        "select ticker from /root/" + regionName + " where ticker = 'vmware'",};
    final Host host = Host.getHost(0);
    VM vm0 = host.getVM(0);
    VM vm1 = host.getVM(1);
    VM vm2 = host.getVM(2);
    VM vm3 = host.getVM(3);
    // Number of TestObject entries (keys 0 .. numberOfEntries - 1).
    final int numberOfEntries = 10;
    // Number of TestObject2 entries (keys numberOfEntries .. totalEntries - 1).
    final int numberOfType2Entries = 10;
    final int totalEntries = numberOfEntries + numberOfType2Entries;

    // Start server1
    vm0.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        configAndStartBridgeServer();
        Region region = getRootRegion().getSubregion(regionName);
        for (int i = 0; i < numberOfEntries; i++) {
          region.put("key-" + i, new TestObject(i, "vmware"));
        }
        for (int i = numberOfEntries; i < totalEntries; i++) {
          region.put("key-" + i, new TestObject2(i));
        }
      }
    });

    // Start server2
    vm1.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        configAndStartBridgeServer();
        Region region = getRootRegion().getSubregion(regionName);
      }
    });

    // Client pool.
    final int port0 = vm0.invoke(() -> PdxQueryDUnitTest.getCacheServerPort());
    final int port1 = vm1.invoke(() -> PdxQueryDUnitTest.getCacheServerPort());

    final String host0 = NetworkUtils.getServerHostName(vm0.getHost());

    // Create client pool.
    final String poolName = "testClientServerQueryPool";
    createPool(vm2, poolName, new String[] {host0}, new int[] {port0}, true);
    createPool(vm3, poolName, new String[] {host0}, new int[] {port1}, true);

    // Create client region
    vm3.invoke(new CacheSerializableRunnable("Create region") {
      @Override
      public void run2() throws CacheException {
        QueryService localQueryService = null;

        AttributesFactory factory = new AttributesFactory();
        factory.setScope(Scope.LOCAL);
        ClientServerTestCase.configureConnectionPool(factory, host0, port1, -1, true, -1, -1, null);
        Region region = createRegion(regionName, rootRegionName, factory.create());
        for (int i = 0; i < numberOfEntries; i++) {
          region.put("key-" + i, new TestObject(i, "vmware"));
        }
        for (int i = numberOfEntries; i < totalEntries; i++) {
          region.put("key-" + i, new TestObject2(i));
        }
        // Execute query locally.
        try {
          localQueryService = getCache().getQueryService();
        } catch (Exception e) {
          Assert.fail("Failed to get QueryService.", e);
        }
        // Iterate over every query so that a newly added one cannot be skipped silently.
        for (int i = 0; i < testQueries.length; i++) {
          // The first query is expected to return a row for every entry; the others only for
          // the TestObject entries.
          final int expectedSize = (i == 0) ? totalEntries : numberOfEntries;
          try {
            Query query = localQueryService.newQuery(testQueries[i]);
            SelectResults results = (SelectResults) query.execute();
            assertEquals(expectedSize, results.size());
          } catch (Exception e) {
            Assert.fail("Failed executing " + testQueries[i], e);
          }
        }
      }
    });

    // Execute client queries
    SerializableRunnable executeQueries = new CacheSerializableRunnable("Execute queries") {
      @Override
      public void run2() throws CacheException {
        QueryService remoteQueryService = null;
        QueryService localQueryService = null;
        // rs[0][0] holds the remote result and rs[0][1] the local one for the current query.
        SelectResults[][] rs = new SelectResults[1][2];
        try {
          remoteQueryService = (PoolManager.find(poolName)).getQueryService();
          localQueryService = getCache().getQueryService();
        } catch (Exception e) {
          Assert.fail("Failed to get QueryService.", e);
        }

        for (int i = 0; i < testQueries.length; i++) {
          // Query 0: defined and undefined values returned; others: only defined ticker values.
          final int expectedSize = (i == 0) ? totalEntries : numberOfEntries;
          try {
            logger.info("### Executing Query on server:" + testQueries[i]);
            Query query = remoteQueryService.newQuery(testQueries[i]);
            rs[0][0] = (SelectResults) query.execute();
            assertEquals(expectedSize, rs[0][0].size());

            logger.info("### Executing Query locally:" + testQueries[i]);
            query = localQueryService.newQuery(testQueries[i]);
            rs[0][1] = (SelectResults) query.execute();
            assertEquals(expectedSize, rs[0][1].size());

            assertEquals(rs[0][0].size(), rs[0][1].size());
          } catch (Exception e) {
            Assert.fail("Failed executing " + testQueries[i], e);
          }

        }
        // NOTE(review): the expected instance count below is kept exactly as originally written;
        // its derivation is not visible from this method - confirm before changing.
        assertEquals(2 * (numberOfEntries + 5), (TestObject.numInstance + TestObject2.numInstance));
      }
    };

    vm3.invoke(executeQueries);

    // Check for TestObject instances on Server2.
    // It should be 0
    vm1.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        assertEquals(0, TestObject.numInstance);
      }
    });

    this.closeClient(vm2);
    this.closeClient(vm3);
    this.closeClient(vm1);
    this.closeClient(vm0);
  }

  /**
   * Tests queries on a PR: a client queries through server2 (vm1), the number of
   * {@code TestObject} instances on each server is checked, and then the queries are executed
   * directly on server2, where the type of every result is verified.
   */
  @Test
  public void testQueryOnPR() throws CacheException {

    final Host host = Host.getHost(0);
    VM vm0 = host.getVM(0);
    VM vm1 = host.getVM(1);
    VM vm2 = host.getVM(2);
    VM vm3 = host.getVM(3);
    final int numberOfEntries = 100;

    vm0.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        configAndStartBridgeServer(true, true);
      }
    });

    vm1.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        configAndStartBridgeServer(true, false);
      }
    });

    vm2.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        configAndStartBridgeServer(true, false);
      }
    });

    // Load region.
    vm0.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        Region region = getRootRegion().getSubregion(regionName);
        for (int i = 0; i < numberOfEntries; i++) {
          region.put("key-" + i, new TestObject(i, "vmware"));
        }
      }
    });

    // Client pool.
    final int port1 = vm1.invoke(() -> PdxQueryDUnitTest.getCacheServerPort());

    final String host0 = NetworkUtils.getServerHostName(vm0.getHost());

    // Create client pool.
    final String poolName = "testClientServerQueryPool";
    createPool(vm3, poolName, new String[] {host0}, new int[] {port1}, true);

    // Execute client queries
    vm3.invoke(new CacheSerializableRunnable("Execute queries") {
      @Override
      public void run2() throws CacheException {
        QueryService remoteQueryService = null;

        try {
          remoteQueryService = (PoolManager.find(poolName)).getQueryService();
        } catch (Exception e) {
          Assert.fail("Failed to get QueryService.", e);
        }

        // Only queryString[1] and queryString[2] are run from the client.
        for (int i = 1; i < 3; i++) {
          try {
            logger.info("### Executing Query on server:" + queryString[i]);
            Query query = remoteQueryService.newQuery(queryString[i]);
            SelectResults rs = (SelectResults) query.execute();
            assertEquals(numberOfEntries, rs.size());
          } catch (Exception e) {
            Assert.fail("Failed executing " + queryString[i], e);
          }
        }
        assertEquals(numberOfEntries, TestObject.numInstance);
      }
    });

    // Check for TestObject instances.
    vm0.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        assertEquals(numberOfEntries, TestObject.numInstance);
      }
    });

    // Check for TestObject instances.
    // It should be 0
    vm1.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        assertEquals(0, TestObject.numInstance);
      }
    });

    // Check for TestObject instances.
    // It should be 0
    vm2.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        assertEquals(0, TestObject.numInstance);
      }
    });

    // Execute Query on Server2.
    vm1.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        QueryService qs = getCache().getQueryService();
        Query query = null;
        SelectResults sr = null;
        for (int i = 0; i < queryString.length; i++) {
          try {
            query = qs.newQuery(queryString[i]);
            sr = (SelectResults) query.execute();
          } catch (Exception ex) {
            // Keep the original exception as the cause so the failure can be diagnosed.
            Assert.fail("Failed to execute query, " + ex.getMessage(), ex);
          }

          // The first three queries must yield Integer, TestObject and String values
          // respectively; results of the remaining queries are not type-checked.
          for (Object o : sr.asSet()) {
            if (i == 0 && !(o instanceof Integer)) {
              fail("Expected type Integer, not found in result set. Found type :" + o.getClass());
            } else if (i == 1 && !(o instanceof TestObject)) {
              fail(
                  "Expected type TestObject, not found in result set. Found type :" + o.getClass());
            } else if (i == 2 && !(o instanceof String)) {
              fail("Expected type String, not found in result set. Found type :" + o.getClass());
            }
          }
        }
        // At least one TestObject must have been instantiated in this VM by now.
        assertTrue("Expected TestObject instance count to be > 0.", TestObject.numInstance > 0);
      }
    });

    // Check for TestObject instances.
    // It should be 0
    vm2.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        assertEquals(0, TestObject.numInstance);
      }
    });


    this.closeClient(vm2);
    this.closeClient(vm3);
    this.closeClient(vm1);
    this.closeClient(vm0);
  }

  /**
   * Tests local queries on a PR: vm0 loads the region with {@code PdxInstance}s built from
   * {@code PortfolioPdxVersion} and then runs {@code queryString[1]} and {@code queryString[2]}
   * through its own query service, expecting one result per entry.
   */
  @Test
  public void testLocalPRQuery() throws CacheException {

    final Host host = Host.getHost(0);
    VM vm0 = host.getVM(0);
    VM vm1 = host.getVM(1);
    VM vm2 = host.getVM(2);
    VM vm3 = host.getVM(3);
    final int numberOfEntries = 100;

    vm0.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        configAndStartBridgeServer(true, true);
      }
    });

    // vm1, vm2 and vm3 are all started with the same configuration, so share one runnable.
    SerializableRunnable startServer = new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        configAndStartBridgeServer(true, false);
      }
    };
    vm1.invoke(startServer);
    vm2.invoke(startServer);
    vm3.invoke(startServer);


    // Load the region and execute the queries within the same invocation.
    vm0.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        Region region = getRootRegion().getSubregion(regionName);
        try {
          // Put one PdxInstance per key, each built from a PortfolioPdxVersion.
          for (int i = 0; i < numberOfEntries; i++) {
            PdxInstanceFactory pdxInstanceFactory =
                PdxInstanceFactoryImpl.newCreator("PortfolioPdxVersion", false, getCache());
            PortfolioPdxVersion portfolioPdxVersion =
                new PortfolioPdxVersion(Integer.valueOf(i), Integer.valueOf(i));
            PdxInstance pdxInstance = portfolioPdxVersion.createPdxInstance(pdxInstanceFactory);
            region.put("key-" + i, pdxInstance);
          }
        } catch (Exception ex) {
          Assert.fail("Failed to load the class.", ex);
        }

        QueryService localQueryService = null;

        try {
          localQueryService = region.getCache().getQueryService();
        } catch (Exception e) {
          Assert.fail("Failed to get QueryService.", e);
        }

        // Only queryString[1] and queryString[2] are executed here.
        for (int i = 1; i < 3; i++) {
          try {
            logger.info("### Executing Query on server:" + queryString[i]);
            Query query = localQueryService.newQuery(queryString[i]);
            SelectResults rs = (SelectResults) query.execute();
            assertEquals(numberOfEntries, rs.size());
          } catch (Exception e) {
            Assert.fail("Failed executing " + queryString[i], e);
          }
        }
      }
    });

    this.closeClient(vm2);
    this.closeClient(vm3);
    this.closeClient(vm1);
    this.closeClient(vm0);
  }

  /**
   * Tests queries on a PR with read-serialized enabled: vm0 loads {@code PdxInstance}s and queries
   * them locally; vm1 then switches its cache to read-serialized and verifies that its query
   * results are {@code PdxInstance}s. The flag is reset on vm1 afterwards.
   */
  @Test
  public void testPdxReadSerializedForPRQuery() throws CacheException {

    final Host host = Host.getHost(0);
    VM vm0 = host.getVM(0);
    VM vm1 = host.getVM(1);
    VM vm2 = host.getVM(2);
    VM vm3 = host.getVM(3);
    final int numberOfEntries = 100;

    vm0.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        configAndStartBridgeServer(true, true);
      }
    });

    // vm1, vm2 and vm3 are all started with the same configuration, so share one runnable.
    SerializableRunnable startServer = new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        configAndStartBridgeServer(true, false);
      }
    };
    vm1.invoke(startServer);
    vm2.invoke(startServer);
    vm3.invoke(startServer);


    // Load the region and execute the queries within the same invocation.
    vm0.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        Region region = getRootRegion().getSubregion(regionName);
        try {
          // Put one PdxInstance per key, each built from a PortfolioPdxVersion.
          for (int i = 0; i < numberOfEntries; i++) {
            PortfolioPdxVersion portfolioPdxVersion =
                new PortfolioPdxVersion(Integer.valueOf(i), Integer.valueOf(i));
            PdxInstanceFactory pdxInstanceFactory =
                PdxInstanceFactoryImpl.newCreator("PortfolioPdxVersion", false, getCache());
            PdxInstance pdxInstance = portfolioPdxVersion.createPdxInstance(pdxInstanceFactory);
            region.put("key-" + i, pdxInstance);
          }
        } catch (Exception ex) {
          // Keep the original exception as the cause so the failure can be diagnosed.
          Assert.fail("Failed to load the class.", ex);
        }

        QueryService localQueryService = null;

        try {
          localQueryService = region.getCache().getQueryService();
        } catch (Exception e) {
          Assert.fail("Failed to get QueryService.", e);
        }

        // Only queryString[1] and queryString[2] are executed here.
        for (int i = 1; i < 3; i++) {
          try {
            logger.info("### Executing Query on server:" + queryString[i]);
            Query query = localQueryService.newQuery(queryString[i]);
            SelectResults rs = (SelectResults) query.execute();
            assertEquals(numberOfEntries, rs.size());
          } catch (Exception e) {
            Assert.fail("Failed executing " + queryString[i], e);
          }
        }
      }
    });

    final String[] qs = new String[] {"SELECT * FROM " + regName,
        "SELECT * FROM " + regName + " WHERE ID > 5", "SELECT p FROM " + regName
            + " p, p.positions.values pos WHERE p.ID > 2 or pos.secId = 'vmware'",};

    // Execute query on node without class and with pdxReadSerialized.
    vm1.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        Region region = getRootRegion().getSubregion(regionName);
        GemFireCacheImpl c = (GemFireCacheImpl) region.getCache();
        try {
          // Set read serialized.
          c.setReadSerializedForTest(true);

          QueryService localQueryService = null;

          try {
            localQueryService = region.getCache().getQueryService();
          } catch (Exception e) {
            Assert.fail("Failed to get QueryService.", e);
          }

          // This should not throw class not found exception.
          // NOTE(review): the loop starts at 1, so qs[0] (the plain select-all) is never
          // executed here - confirm whether that is intentional.
          for (int i = 1; i < qs.length; i++) {
            try {
              logger.info("### Executing Query on server:" + qs[i]);
              Query query = localQueryService.newQuery(qs[i]);
              SelectResults rs = (SelectResults) query.execute();
              for (Object o : rs.asSet()) {
                if (!(o instanceof PdxInstance)) {
                  fail("Expected type PdxInstance, not found in result set. Found type :"
                      + o.getClass());
                }
              }
            } catch (Exception e) {
              Assert.fail("Failed executing " + qs[i], e);
            }
          }
        } finally {
          // Always restore the flag, even when a query or an assertion fails.
          c.setReadSerializedForTest(false);
        }
      }
    });

    this.closeClient(vm2);
    this.closeClient(vm3);
    this.closeClient(vm1);
    this.closeClient(vm0);
  }

  /**
   * Starts two servers whose caches are switched to read-serialized, loads {@code TestObject}
   * values from vm0 and then runs {@code queryString[1]} on vm0, expecting one
   * {@code PdxInstance} per entry.
   */
  @Test
  public void testPdxReadSerializedForPRSelectAllQuery() throws CacheException {
    final int numPuts = 10;

    VM vm0 = VM.getVM(0);
    VM vm1 = VM.getVM(1);

    // Both servers are started identically, so one runnable is invoked on each VM in turn.
    SerializableRunnable startReadSerializedServer =
        new CacheSerializableRunnable("Create Bridge Server") {
          @Override
          public void run2() throws CacheException {
            configAndStartBridgeServer(true, false);
            ((GemFireCacheImpl) getCache()).setReadSerializedForTest(true);
          }
        };
    vm0.invoke(startReadSerializedServer);
    vm1.invoke(startReadSerializedServer);

    // Load the region from vm0.
    vm0.invoke(() -> {
      Region testRegion = getRootRegion().getSubregion(regionName);
      int index = 0;
      while (index < numPuts) {
        testRegion.put("key-" + index, new TestObject(index, "val-" + index));
        index++;
      }
    });

    // Query on vm0 and verify both the number and the type of the results.
    vm0.invoke(() -> {
      Query selectAll = getCache().getQueryService().newQuery(queryString[1]);
      SelectResults selected = (SelectResults) selectAll.execute();
      assertThat(selected.size()).isEqualTo(numPuts);
      for (Object row : selected) {
        assertThat(row).isInstanceOf(PdxInstance.class);
      }
    });
  }

  /**
   * Tests queries on a region that has indexes defined on both servers. Two clients load the
   * region and run every query in {@code queryString} remotely and locally, comparing the results;
   * the number of {@code TestObject} instances on each server is checked along the way.
   */
  @Test
  public void testIndex() throws CacheException {

    final Host host = Host.getHost(0);
    VM vm0 = host.getVM(0);
    VM vm1 = host.getVM(1);
    VM vm2 = host.getVM(2);
    VM vm3 = host.getVM(3);
    final int numberOfEntries = 10;

    // Start server1
    vm0.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        configAndStartBridgeServer();
        Region region = getRootRegion().getSubregion(regionName);
        // Even keys get the ticker "vmware", odd keys get "vmware" + i.
        for (int i = 0; i < numberOfEntries; i++) {
          if (i % 2 == 0) {
            region.put("key-" + i, new TestObject(i, "vmware"));
          } else {
            region.put("key-" + i, new TestObject(i, "vmware" + i));
          }
        }

        try {
          QueryService qs = getCache().getQueryService();
          qs.createIndex("idIndex", IndexType.FUNCTIONAL, "id", regName);
          qs.createIndex("tickerIndex", IndexType.FUNCTIONAL, "p.ticker", regName + " p");
          qs.createIndex("tickerIdTickerMapIndex", IndexType.FUNCTIONAL, "p.ticker",
              regName + " p, p.idTickers idTickers");
        } catch (Exception ex) {
          // Keep the original exception as the cause so the failure can be diagnosed.
          Assert.fail("Unable to create index. " + ex.getMessage(), ex);
        }
        assertEquals(numberOfEntries, TestObject.numInstance);
      }
    });

    // Start server2
    vm1.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        configAndStartBridgeServer();
        Region region = getRootRegion().getSubregion(regionName);

        try {
          QueryService qs = getCache().getQueryService();
          qs.createIndex("idIndex", IndexType.FUNCTIONAL, "id", regName);
          qs.createIndex("tickerIndex", IndexType.FUNCTIONAL, "p.ticker", regName + " p");
          qs.createIndex("tickerIdTickerMapIndex", IndexType.FUNCTIONAL, "p.ticker",
              regName + " p, p.idTickers idTickers");
        } catch (Exception ex) {
          // Keep the original exception as the cause so the failure can be diagnosed.
          Assert.fail("Unable to create index. " + ex.getMessage(), ex);
        }
        assertEquals(0, TestObject.numInstance);
      }
    });

    // Client pool.
    final int port0 = vm0.invoke(() -> PdxQueryDUnitTest.getCacheServerPort());
    final int port1 = vm1.invoke(() -> PdxQueryDUnitTest.getCacheServerPort());

    final String host0 = NetworkUtils.getServerHostName(vm0.getHost());

    // Create client pool.
    final String poolName = "testClientServerQueryPool";
    createPool(vm2, poolName, new String[] {host0}, new int[] {port0}, true);
    createPool(vm3, poolName, new String[] {host0}, new int[] {port1}, true);

    // Create client region
    SerializableRunnable createClientRegions = new CacheSerializableRunnable("Create region") {
      @Override
      public void run2() throws CacheException {

        AttributesFactory factory = new AttributesFactory();
        factory.setScope(Scope.LOCAL);
        ClientServerTestCase.configureConnectionPool(factory, host0, port1, -1, true, -1, -1, null);
        Region region = createRegion(regionName, rootRegionName, factory.create());
        // Clients load twice as many entries as server1 did.
        for (int i = 0; i < numberOfEntries * 2; i++) {
          if (i % 2 == 0) {
            region.put("key-" + i, new TestObject(i, "vmware"));
          } else {
            region.put("key-" + i, new TestObject(i, "vmware" + i));
          }
        }
      }
    };

    vm2.invoke(createClientRegions);
    vm3.invoke(createClientRegions);

    // The client puts must not have changed the instance counts on either server.
    vm0.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        assertEquals(numberOfEntries, TestObject.numInstance);
      }
    });

    vm1.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        assertEquals(0, TestObject.numInstance);
      }
    });


    // Execute client queries
    SerializableRunnable executeQueries = new CacheSerializableRunnable("Execute queries") {
      @Override
      public void run2() throws CacheException {
        QueryService remoteQueryService = null;
        QueryService localQueryService = null;
        // rs[0][0] holds the remote result and rs[0][1] the local one for the current query.
        SelectResults[][] rs = new SelectResults[1][2];
        try {
          remoteQueryService = (PoolManager.find(poolName)).getQueryService();
          localQueryService = getCache().getQueryService();
        } catch (Exception e) {
          Assert.fail("Failed to get QueryService.", e);
        }

        // The first three queries are additionally checked for their result size.
        for (int i = 0; i < 3; i++) {
          try {
            logger.info("### Executing Query on server:" + queryString[i]);
            Query query = remoteQueryService.newQuery(queryString[i]);
            rs[0][0] = (SelectResults) query.execute();
            assertEquals(numberOfEntries * 2, rs[0][0].size());

            logger.info("### Executing Query locally:" + queryString[i]);
            query = localQueryService.newQuery(queryString[i]);
            rs[0][1] = (SelectResults) query.execute();
            assertEquals(numberOfEntries * 2, rs[0][1].size());

            // Compare local and remote query results.
            if (!CacheUtils.compareResultsOfWithAndWithoutIndex(rs)) {
              fail("Local and Remote Query Results are not matching for query :" + queryString[i]);
            }

          } catch (Exception e) {
            Assert.fail("Failed executing " + queryString[i], e);
          }

        }
        assertEquals(4 * numberOfEntries, TestObject.numInstance);

        // The remaining queries are only compared remote versus local.
        for (int i = 3; i < queryString.length; i++) {
          try {
            logger.info("### Executing Query on server:" + queryString[i]);
            Query query = remoteQueryService.newQuery(queryString[i]);
            rs[0][0] = (SelectResults) query.execute();

            logger.info("### Executing Query locally:" + queryString[i]);
            query = localQueryService.newQuery(queryString[i]);
            rs[0][1] = (SelectResults) query.execute();

            // Compare local and remote query results.
            if (!CacheUtils.compareResultsOfWithAndWithoutIndex(rs)) {
              fail("Local and Remote Query Results are not matching for query :" + queryString[i]);
            }

          } catch (Exception e) {
            Assert.fail("Failed executing " + queryString[i], e);
          }

        }

      }
    };

    vm2.invoke(executeQueries);
    vm3.invoke(executeQueries);

    // Check for TestObject instances on Server1; the count must be unchanged by the queries.
    vm0.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        assertEquals(numberOfEntries, TestObject.numInstance);
      }
    });

    // Check for TestObject instances on Server2.
    // It should be 0
    vm1.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        assertEquals(0, TestObject.numInstance);
      }
    });

    this.closeClient(vm2);
    this.closeClient(vm3);
    this.closeClient(vm1);
    this.closeClient(vm0);
  }

  /**
   * Tests client-server queries whose FROM clause iterates over region views ({@code .entries},
   * {@code .entrySet} and {@code .values}).
   *
   * <p>
   * Server1 (vm0) is started and populated directly; server2 (vm1) is started without putting any
   * data. The client in vm3 uses a pool pointing at server2, creates a LOCAL-scope region
   * configured with a connection pool to the same server, and puts the same entries. Each query is
   * executed through the pool's query service and through the client cache's query service, and
   * the two result sets are compared. The queries run once before and once after functional
   * indexes are created on server2; after each run server2 must report zero {@code TestObject}
   * instances.
   *
   * @throws CacheException if a cache operation in one of the VMs fails
   */
  @Test
  public void testRegionIterators() throws CacheException {

    final Host host = Host.getHost(0);
    VM vm0 = host.getVM(0);
    VM vm1 = host.getVM(1);
    VM vm2 = host.getVM(2);
    VM vm3 = host.getVM(3);
    final int numberOfEntries = 10;

    // Each query iterates over a view of the region rather than the region itself.
    final String[] queries = new String[] {
        "SELECT entry.value FROM " + this.regName + ".entries entry WHERE entry.value.id > 0",
        "SELECT entry.value FROM  " + this.regName + ".entries entry WHERE entry.key = 'key-1'",
        "SELECT e.value FROM " + this.regName + ".entrySet e where  e.value.id >= 0",
        "SELECT * FROM " + this.regName + ".values p WHERE p.ticker = 'vmware'",};

    // Start server1 and populate its region.
    vm0.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        configAndStartBridgeServer();
        Region region = getRootRegion().getSubregion(regionName);
        for (int i = 0; i < numberOfEntries; i++) {
          region.put("key-" + i, new TestObject(i, "vmware"));
        }
      }
    });

    // Start server2; no data is put from this VM.
    vm1.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        configAndStartBridgeServer();
        Region region = getRootRegion().getSubregion(regionName);
      }
    });


    // Server ports used by the client pools.
    final int port0 = vm0.invoke(() -> PdxQueryDUnitTest.getCacheServerPort());
    final int port1 = vm1.invoke(() -> PdxQueryDUnitTest.getCacheServerPort());

    final String host0 = NetworkUtils.getServerHostName(vm0.getHost());

    // Create client pools: vm2 -> server1, vm3 -> server2.
    final String poolName = "testClientServerQueryPool";
    createPool(vm2, poolName, new String[] {host0}, new int[] {port0}, true);
    createPool(vm3, poolName, new String[] {host0}, new int[] {port1}, true);

    // Create the client region in vm3 and put the entries through it.
    vm3.invoke(new CacheSerializableRunnable("Create region") {
      @Override
      public void run2() throws CacheException {
        AttributesFactory factory = new AttributesFactory();
        factory.setScope(Scope.LOCAL);
        ClientServerTestCase.configureConnectionPool(factory, host0, port1, -1, true, -1, -1, null);
        Region region = createRegion(regionName, rootRegionName, factory.create());
        for (int i = 0; i < numberOfEntries; i++) {
          region.put("key-" + i, new TestObject(i, "vmware"));
        }
      }
    });

    // Runs every query remotely (through the pool) and locally, then compares the results.
    SerializableRunnable executeQueries = new CacheSerializableRunnable("Execute queries") {
      @Override
      public void run2() throws CacheException {
        QueryService remoteQueryService = null;
        QueryService localQueryService = null;
        // rs[0][0] holds the remote result, rs[0][1] the local one.
        SelectResults[][] rs = new SelectResults[1][2];
        try {
          remoteQueryService = (PoolManager.find(poolName)).getQueryService();
          localQueryService = getCache().getQueryService();
        } catch (Exception e) {
          Assert.fail("Failed to get QueryService.", e);
        }

        for (int i = 0; i < queries.length; i++) {
          try {
            logger.info("### Executing Query on server:" + queries[i]);
            Query query = remoteQueryService.newQuery(queries[i]);
            rs[0][0] = (SelectResults) query.execute();

            logger.info("### Executing Query locally:" + queries[i]);
            query = localQueryService.newQuery(queries[i]);
            rs[0][1] = (SelectResults) query.execute();

            // Compare local and remote query results.
            if (!CacheUtils.compareResultsOfWithAndWithoutIndex(rs)) {
              fail("Local and Remote Query Results are not matching for query :" + queries[i]);
            }

          } catch (Exception e) {
            Assert.fail("Failed executing " + queries[i], e);
          }

        }
      }
    };

    vm3.invoke(executeQueries);

    // Without indexes: server2 must report no TestObject instances.
    vm1.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        assertEquals(0, TestObject.numInstance);
      }
    });


    // Create functional indexes on server2.
    vm1.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        Region region = getRootRegion().getSubregion(regionName);
        QueryService qs = getCache().getQueryService();
        try {
          qs.createIndex("idIndex", IndexType.FUNCTIONAL, "entry.value.id",
              regName + ".entries entry");
          qs.createIndex("tickerIndex", IndexType.FUNCTIONAL, "p.ticker", regName + ".values p");
        } catch (Exception ex) {
          // Attach the original exception as the cause so its stack trace is not lost.
          Assert.fail("Unable to create index. " + ex.getMessage(), ex);
        }
      }
    });

    vm3.invoke(executeQueries);

    // With indexes: server2 must still report no TestObject instances.
    vm1.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        assertEquals(0, TestObject.numInstance);
      }
    });

    this.closeClient(vm2);
    this.closeClient(vm3);
    this.closeClient(vm1);
    this.closeClient(vm0);
  }

  /**
   * Tests client-server queries over nested PDX objects and collections of PDX objects.
   *
   * <p>
   * Server1 (vm0) is populated with {@code PortfolioPdx} values in two regions; server2 (vm1) is
   * started without putting any data. The client in vm3 uses a pool pointing at server2, creates
   * two LOCAL-scope regions configured with a connection pool to that server, and puts the same
   * entries. Each query is executed through the pool's query service and through the client
   * cache's query service, and the two result sets are compared.
   *
   * <p>
   * Before indexes exist, server2 must report zero {@code PortfolioPdx} and {@code PositionPdx}
   * instances. After the indexes are created and the queries re-run, server2 is expected to report
   * one {@code PortfolioPdx} per entry and 325 {@code PositionPdx} instances.
   *
   * @throws CacheException if a cache operation in one of the VMs fails
   */
  @Test
  public void testNestedAndCollectionPdx() throws CacheException {

    final Host host = Host.getHost(0);
    VM vm0 = host.getVM(0);
    VM vm1 = host.getVM(1);
    VM vm2 = host.getVM(2);
    VM vm3 = host.getVM(3);
    final int numberOfEntries = 50;

    final String[] queries = new String[] {
        "SELECT * FROM " + this.regName + " pf where pf.position1.secId > '2'",
        "SELECT * FROM " + this.regName + " p where p.position3[1].portfolioId = 2",
        "SELECT * FROM " + this.regName + " p, p.positions.values AS pos WHERE pos.secId != '1'",
        "SELECT key, positions FROM " + this.regName + ".entrySet, value.positions.values "
            + "positions WHERE positions.mktValue >= 25.00",
        "SELECT * FROM " + this.regName + " portfolio1, " + this.regName2 + " portfolio2 WHERE "
            + "portfolio1.status = portfolio2.status",
        "SELECT portfolio1.ID, portfolio2.status FROM " + this.regName + " portfolio1, "
            + this.regName + " portfolio2  WHERE portfolio1.status = portfolio2.status",
        "SELECT * FROM " + this.regName + " portfolio1, portfolio1.positions.values positions1, "
            + this.regName + " portfolio2,  portfolio2.positions.values positions2 WHERE "
            + "positions1.secId = positions2.secId ",
        "SELECT * FROM " + this.regName + " portfolio, portfolio.positions.values positions WHERE "
            + "portfolio.Pk IN SET ('1', '2') AND positions.secId = '1'",
        "SELECT DISTINCT * FROM " + this.regName + "  pf1, pf1.collectionHolderMap.values coll1,"
            + " pf1.positions.values posit1, " + this.regName2
            + "  pf2, pf2.collectionHolderMap.values "
            + " coll2, pf2.positions.values posit2 WHERE posit1.secId='IBM' AND posit2.secId='IBM'",};

    // Start server1 and populate both regions.
    vm0.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        configAndStartBridgeServer();
        Region region1 = getRootRegion().getSubregion(regionName);
        Region region2 = getRootRegion().getSubregion(regionName2);

        for (int i = 0; i < numberOfEntries; i++) {
          region1.put("key-" + i, new PortfolioPdx(i, i));
          region2.put("key-" + i, new PortfolioPdx(i, i));
        }
      }
    });

    // Start server2; no data is put from this VM.
    vm1.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        configAndStartBridgeServer();
        Region region = getRootRegion().getSubregion(regionName);
        Region region2 = getRootRegion().getSubregion(regionName2);
      }
    });


    // Server ports used by the client pools.
    final int port0 = vm0.invoke(() -> PdxQueryDUnitTest.getCacheServerPort());
    final int port1 = vm1.invoke(() -> PdxQueryDUnitTest.getCacheServerPort());

    final String host0 = NetworkUtils.getServerHostName(vm0.getHost());

    // Create client pools: vm2 -> server1, vm3 -> server2.
    final String poolName = "testClientServerQueryPool";
    createPool(vm2, poolName, new String[] {host0}, new int[] {port0}, true);
    createPool(vm3, poolName, new String[] {host0}, new int[] {port1}, true);

    // Create the client regions in vm3 and put the entries through them.
    vm3.invoke(new CacheSerializableRunnable("Create region") {
      @Override
      public void run2() throws CacheException {

        AttributesFactory factory = new AttributesFactory();
        factory.setScope(Scope.LOCAL);
        ClientServerTestCase.configureConnectionPool(factory, host0, port1, -1, true, -1, -1, null);
        Region region1 = createRegion(regionName, rootRegionName, factory.create());
        Region region2 = createRegion(regionName2, rootRegionName, factory.create());

        for (int i = 0; i < numberOfEntries; i++) {
          region1.put("key-" + i, new PortfolioPdx(i, i));
          region2.put("key-" + i, new PortfolioPdx(i, i));
        }
      }
    });

    // Runs every query remotely (through the pool) and locally, then compares the results.
    SerializableRunnable executeQueries = new CacheSerializableRunnable("Execute queries") {
      @Override
      public void run2() throws CacheException {
        QueryService remoteQueryService = null;
        QueryService localQueryService = null;
        // rs[0][0] holds the remote result, rs[0][1] the local one.
        SelectResults[][] rs = new SelectResults[1][2];
        try {
          remoteQueryService = (PoolManager.find(poolName)).getQueryService();
          localQueryService = getCache().getQueryService();
        } catch (Exception e) {
          Assert.fail("Failed to get QueryService.", e);
        }

        for (int i = 0; i < queries.length; i++) {
          try {
            logger.info("### Executing Query on server:" + queries[i]);
            Query query = remoteQueryService.newQuery(queries[i]);
            rs[0][0] = (SelectResults) query.execute();

            logger.info("### Executing Query locally:" + queries[i]);
            query = localQueryService.newQuery(queries[i]);
            rs[0][1] = (SelectResults) query.execute();

            // Compare local and remote query results.
            if (!CacheUtils.compareResultsOfWithAndWithoutIndex(rs)) {
              fail("Local and Remote Query Results are not matching for query :" + queries[i]);
            }

          } catch (Exception e) {
            Assert.fail("Failed executing " + queries[i], e);
          }

        }
      }
    };

    vm3.invoke(executeQueries);

    // Without indexes: server2 must report no PortfolioPdx or PositionPdx instances.
    vm1.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        assertEquals(0, PortfolioPdx.numInstance);
        assertEquals(0, PositionPdx.numInstance);
      }
    });


    // Create functional indexes on server2.
    vm1.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        Region region = getRootRegion().getSubregion(regionName);
        QueryService qs = getCache().getQueryService();
        try {
          qs.createIndex("pkIndex", IndexType.FUNCTIONAL, "portfolio.Pk", regName + " portfolio");
          qs.createIndex("secIdIndex", IndexType.FUNCTIONAL, "pos.secId",
              regName + " p, p.positions.values AS pos");
          qs.createIndex("tickerIndex", IndexType.FUNCTIONAL, "pf.position1.secId",
              regName + " pf");
          qs.createIndex("secIdIndexPf1", IndexType.FUNCTIONAL, "pos11.secId",
              regName + " pf1, pf1.collectionHolderMap.values coll1, pf1.positions.values pos11");
          qs.createIndex("secIdIndexPf2", IndexType.FUNCTIONAL, "pos22.secId",
              regName2 + " pf2, pf2.collectionHolderMap.values coll2, pf2.positions.values pos22");
        } catch (Exception ex) {
          // Attach the original exception as the cause so its stack trace is not lost.
          Assert.fail("Unable to create index. " + ex.getMessage(), ex);
        }
      }
    });

    vm3.invoke(executeQueries);

    // The index is created on portfolio.Pk, a field which does not exist in the
    // PortfolioPdx object, but there is a method getPk(); so for #44436 the objects
    // are deserialized in vm1 to get the value.
    vm1.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        assertEquals(numberOfEntries, PortfolioPdx.numInstance);
        // 50 PortfolioPdx objects create (50*3)+50+50+50+25 = 325 PositionPdx
        // objects when deserialized.
        assertEquals(325, PositionPdx.numInstance);
      }
    });

    this.closeClient(vm2);
    this.closeClient(vm3);
    this.closeClient(vm1);
    this.closeClient(vm0);
  }

  /**
   * Tests client-server queries over nested PDX objects and collections of PDX objects with three
   * servers (vm0, vm1, vm2) started through {@code configAndStartBridgeServer(false, true)}.
   * NOTE(review): per the test name this presumably configures partitioned regions; confirm
   * against that helper.
   *
   * <p>
   * No server puts data itself. The client in vm3 uses a pool pointing at server2 (vm1), creates
   * two LOCAL-scope regions configured with a connection pool to that server, and puts
   * {@code PortfolioPdx} values. Each query is executed through the pool's query service and
   * through the client cache's query service, and the two result sets are compared.
   *
   * <p>
   * Before indexes exist, server2 must report zero {@code PortfolioPdx} and {@code PositionPdx}
   * instances. After indexes are created on server2 and the queries re-run, server1 and server3
   * must still report zero, while server2 is expected to report one {@code PortfolioPdx} per entry
   * and 325 {@code PositionPdx} instances.
   *
   * @throws CacheException if a cache operation in one of the VMs fails
   */
  @Test
  public void testNestedAndCollectionPdxWithPR() throws CacheException {

    final Host host = Host.getHost(0);
    VM vm0 = host.getVM(0);
    VM vm1 = host.getVM(1);
    VM vm2 = host.getVM(2);
    VM vm3 = host.getVM(3);
    final int numberOfEntries = 50;

    final String[] queries = new String[] {
        "SELECT * FROM " + this.regName + " pf where pf.position1.secId > '2'",
        "SELECT * FROM " + this.regName + " p where p.position3[1].portfolioId = 2",
        "SELECT * FROM " + this.regName + " p, p.positions.values AS pos WHERE pos.secId != '1'",
        "SELECT key, positions FROM " + this.regName + ".entrySet, value.positions.values "
            + "positions WHERE positions.mktValue >= 25.00",
        "SELECT * FROM " + this.regName + " portfolio1, " + this.regName2 + " portfolio2 WHERE "
            + "portfolio1.status = portfolio2.status",
        "SELECT portfolio1.ID, portfolio2.status FROM " + this.regName + " portfolio1, "
            + this.regName + " portfolio2  WHERE portfolio1.status = portfolio2.status",
        "SELECT * FROM " + this.regName + " portfolio1, portfolio1.positions.values positions1, "
            + this.regName + " portfolio2,  portfolio2.positions.values positions2 WHERE "
            + "positions1.secId = positions2.secId ",
        "SELECT * FROM " + this.regName + " portfolio, portfolio.positions.values positions WHERE "
            + "portfolio.Pk IN SET ('1', '2') AND positions.secId = '1'",
        "SELECT DISTINCT * FROM " + this.regName + "  pf1, pf1.collectionHolderMap.values coll1,"
            + " pf1.positions.values posit1, " + this.regName2
            + "  pf2, pf2.collectionHolderMap.values "
            + " coll2, pf2.positions.values posit2 WHERE posit1.secId='IBM' AND posit2.secId='IBM'",};

    // Start server1
    vm0.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        configAndStartBridgeServer(false, true);
        Region region1 = getRootRegion().getSubregion(regionName);
        Region region2 = getRootRegion().getSubregion(regionName2);
      }
    });

    // Start server2
    vm1.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        configAndStartBridgeServer(false, true);
        Region region = getRootRegion().getSubregion(regionName);
        Region region2 = getRootRegion().getSubregion(regionName2);
      }
    });

    // Start server3
    vm2.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        configAndStartBridgeServer(false, true);
        Region region = getRootRegion().getSubregion(regionName);
        Region region2 = getRootRegion().getSubregion(regionName2);
      }
    });

    // Server ports used by the client pools.
    final int port0 = vm0.invoke(() -> PdxQueryDUnitTest.getCacheServerPort());
    final int port1 = vm1.invoke(() -> PdxQueryDUnitTest.getCacheServerPort());

    final String host0 = NetworkUtils.getServerHostName(vm0.getHost());

    // Create client pools: vm2 -> server1, vm3 -> server2.
    final String poolName = "testClientServerQueryPool";
    createPool(vm2, poolName, new String[] {host0}, new int[] {port0}, true);
    createPool(vm3, poolName, new String[] {host0}, new int[] {port1}, true);

    // Create the client regions in vm3 and put the entries through them.
    vm3.invoke(new CacheSerializableRunnable("Create region") {
      @Override
      public void run2() throws CacheException {
        AttributesFactory factory = new AttributesFactory();
        factory.setScope(Scope.LOCAL);
        ClientServerTestCase.configureConnectionPool(factory, host0, port1, -1, true, -1, -1, null);
        Region region1 = createRegion(regionName, rootRegionName, factory.create());
        Region region2 = createRegion(regionName2, rootRegionName, factory.create());

        for (int i = 0; i < numberOfEntries; i++) {
          region1.put("key-" + i, new PortfolioPdx(i, i));
          region2.put("key-" + i, new PortfolioPdx(i, i));
        }
      }
    });

    // Runs every query remotely (through the pool) and locally, then compares the results.
    SerializableRunnable executeQueries = new CacheSerializableRunnable("Execute queries") {
      @Override
      public void run2() throws CacheException {
        QueryService remoteQueryService = null;
        QueryService localQueryService = null;
        // rs[0][0] holds the remote result, rs[0][1] the local one.
        SelectResults[][] rs = new SelectResults[1][2];
        try {
          remoteQueryService = (PoolManager.find(poolName)).getQueryService();
          localQueryService = getCache().getQueryService();
        } catch (Exception e) {
          Assert.fail("Failed to get QueryService.", e);
        }

        for (int i = 0; i < queries.length; i++) {
          try {
            logger.info("### Executing Query on server:" + queries[i]);
            Query query = remoteQueryService.newQuery(queries[i]);
            rs[0][0] = (SelectResults) query.execute();

            logger.info("### Executing Query locally:" + queries[i]);
            query = localQueryService.newQuery(queries[i]);
            rs[0][1] = (SelectResults) query.execute();

            // Compare local and remote query results.
            if (!CacheUtils.compareResultsOfWithAndWithoutIndex(rs)) {
              fail("Local and Remote Query Results are not matching for query :" + queries[i]);
            }

          } catch (Exception e) {
            Assert.fail("Failed executing " + queries[i], e);
          }

        }
      }
    };

    vm3.invoke(executeQueries);

    // Without indexes: server2 must report no PortfolioPdx or PositionPdx instances.
    vm1.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        assertEquals(0, PortfolioPdx.numInstance);
        assertEquals(0, PositionPdx.numInstance);
      }
    });


    // Create functional indexes on server2.
    vm1.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        Region region = getRootRegion().getSubregion(regionName);
        QueryService qs = getCache().getQueryService();
        try {
          qs.createIndex("pkIndex", IndexType.FUNCTIONAL, "portfolio.Pk", regName + " portfolio");
          qs.createIndex("idIndex", IndexType.FUNCTIONAL, "pos.secId",
              regName + " p, p.positions.values AS pos");
          qs.createIndex("tickerIndex", IndexType.FUNCTIONAL, "pf.position1.secId",
              regName + " pf");
          qs.createIndex("secIdIndexPf1", IndexType.FUNCTIONAL, "pos11.secId",
              regName + " pf1, pf1.collectionHolderMap.values coll1, pf1.positions.values pos11");
          qs.createIndex("secIdIndexPf2", IndexType.FUNCTIONAL, "pos22.secId",
              regName2 + " pf2, pf2.collectionHolderMap.values coll2, pf2.positions.values pos22");
        } catch (Exception ex) {
          // Attach the original exception as the cause so its stack trace is not lost.
          Assert.fail("Unable to create index. " + ex.getMessage(), ex);
        }
      }
    });

    vm3.invoke(executeQueries);

    // Server1 has no indexes: its PortfolioPdx and PositionPdx instance counters must be 0.
    vm0.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        assertEquals(0, PortfolioPdx.numInstance);
        assertEquals(0, PositionPdx.numInstance);
      }
    });

    // The index is created on portfolio.Pk, a field which does not exist in the
    // PortfolioPdx object, but there is a method getPk(); so for #44436 the objects
    // are deserialized in vm1 to get the value.
    vm1.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        assertEquals(numberOfEntries, PortfolioPdx.numInstance);
        // 50 PortfolioPdx objects create (50*3)+50+50+50+25 = 325 PositionPdx
        // objects when deserialized.
        assertEquals(325, PositionPdx.numInstance);
      }
    });

    // Server3 has no indexes either: its instance counters must be 0.
    vm2.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        assertEquals(0, PortfolioPdx.numInstance);
        assertEquals(0, PositionPdx.numInstance);
      }
    });

    this.closeClient(vm2);
    this.closeClient(vm3);
    this.closeClient(vm1);
    this.closeClient(vm0);
  }



  /**
   * Tests identity of Pdx values: a {@code SELECT DISTINCT} query over a region holding duplicate
   * {@code PortfolioPdx} values must return the same number of results remotely and locally.
   *
   * <p>
   * The client in vm3 puts {@code numberOfEntries * 2} entries, constructing two consecutive
   * entries from the same argument. The query is executed through the pool's query service and
   * through the client cache's query service; both results must have the expected size and match
   * each other. Afterwards server2 (vm1) must report zero {@code PortfolioPdx} instances.
   *
   * @throws CacheException if a cache operation in one of the VMs fails
   */
  @Test
  public void testPdxIdentity() throws CacheException {

    final Host host = Host.getHost(0);
    VM vm0 = host.getVM(0);
    VM vm1 = host.getVM(1);
    VM vm2 = host.getVM(2);
    VM vm3 = host.getVM(3);
    final int numberOfEntries = 10;
    final String queryStr =
        "SELECT DISTINCT * FROM " + this.regName + " pf where pf.ID > 2 and pf.ID < 10";
    // Start server1
    vm0.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        configAndStartBridgeServer();
        Region region = getRootRegion().getSubregion(regionName);
      }
    });

    // Start server2
    vm1.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        configAndStartBridgeServer();
        Region region = getRootRegion().getSubregion(regionName);
      }
    });


    // Server ports used by the client pools.
    final int port0 = vm0.invoke(() -> PdxQueryDUnitTest.getCacheServerPort());
    final int port1 = vm1.invoke(() -> PdxQueryDUnitTest.getCacheServerPort());

    final String host0 = NetworkUtils.getServerHostName(vm0.getHost());

    // Create client pools: vm2 -> server1, vm3 -> server2.
    final String poolName = "testClientServerQueryPool";
    createPool(vm2, poolName, new String[] {host0}, new int[] {port0}, true);
    createPool(vm3, poolName, new String[] {host0}, new int[] {port1}, true);

    // Every dupIndex-th entry advances the constructor argument, so values repeat in pairs.
    final int dupIndex = 2;

    // Create the client region in vm3 and put the duplicated values through it.
    vm3.invoke(new CacheSerializableRunnable("Create region") {
      @Override
      public void run2() throws CacheException {

        AttributesFactory factory = new AttributesFactory();
        factory.setScope(Scope.LOCAL);
        ClientServerTestCase.configureConnectionPool(factory, host0, port1, -1, true, -1, -1, null);
        Region region = createRegion(regionName, rootRegionName, factory.create());
        int j = 0;
        for (int i = 0; i < numberOfEntries * 2; i++) {
          // insert duplicate values.
          if (i % dupIndex == 0) {
            j++;
          }
          region.put("key-" + i, new PortfolioPdx(j));
        }
      }
    });

    // Runs the query remotely (through the pool) and locally, then compares the results.
    SerializableRunnable executeQueries = new CacheSerializableRunnable("Execute queries") {
      @Override
      public void run2() throws CacheException {
        QueryService remoteQueryService = null;
        QueryService localQueryService = null;
        // rs[0][0] holds the remote result, rs[0][1] the local one.
        SelectResults[][] rs = new SelectResults[1][2];
        try {
          remoteQueryService = (PoolManager.find(poolName)).getQueryService();
          localQueryService = getCache().getQueryService();
        } catch (Exception e) {
          Assert.fail("Failed to get QueryService.", e);
        }

        // NOTE(review): presumably the constructor argument becomes ID, giving ids 1..10 twice
        // each, of which 3..9 (seven distinct values) satisfy the predicate - confirm against
        // PortfolioPdx.
        int expectedResultSize = 7;

        try {
          logger.info("### Executing Query on server:" + queryStr);
          Query query = remoteQueryService.newQuery(queryStr);
          rs[0][0] = (SelectResults) query.execute();
          assertEquals(expectedResultSize, rs[0][0].size());

          logger.info("### Executing Query locally:" + queryStr);
          query = localQueryService.newQuery(queryStr);
          rs[0][1] = (SelectResults) query.execute();
          assertEquals(expectedResultSize, rs[0][1].size());
          // Separator added so the two sizes do not run together in the log line.
          logger.info("### Remote Query rs size: " + (rs[0][0]).size() + ", Local Query rs size: "
              + (rs[0][1]).size());

          // Compare local and remote query results.
          if (!CacheUtils.compareResultsOfWithAndWithoutIndex(rs)) {
            fail("Local and Remote Query Results are not matching for query :" + queryStr);
          }
        } catch (Exception e) {
          Assert.fail("Failed executing " + queryStr, e);
        }
      }
    };

    // Only vm3 has a client region, so the queries run there.
    vm3.invoke(executeQueries);

    // Check for PortfolioPdx instances on Server2.
    // It should be 0
    vm1.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        assertEquals(0, PortfolioPdx.numInstance);
      }
    });

    this.closeClient(vm2);
    this.closeClient(vm3);
    this.closeClient(vm1);
    this.closeClient(vm0);
  }

  /**
   * Runs queries that invoke methods on the queried values and checks, per query, how many
   * {@code TestObject}, {@code TestObject2} and {@code PositionPdx} instances are counted on
   * server2 (vm1).
   *
   * <p>
   * Only the client in vm3 puts data, through a LOCAL-scope region configured with a connection
   * pool to server2. After each query the instance counters on server2 are compared against the
   * expected values for that query and then reset to zero for the next one.
   *
   * @throws CacheException if a cache operation in one of the VMs fails
   */
  @Test
  public void testFunctionCalls() throws CacheException {
    final int entryCount = 10;
    final int positionsPerObject = 2;
    // Number of consecutive entries that share the same TestObject id.
    final int duplicateStride = 2;
    // PositionPdx count when the positions of every entry are counted exactly once.
    final int allPositions = entryCount * positionsPerObject;

    final Host dunitHost = Host.getHost(0);
    final VM server1 = dunitHost.getVM(0);
    final VM server2 = dunitHost.getVM(1);
    final VM client1 = dunitHost.getVM(2);
    final VM client2 = dunitHost.getVM(3);

    // Shared query fragments; the assembled strings equal the fully spelled-out queries.
    final String selectPf = "SELECT * FROM " + this.regName + " pf";
    final String positionsJoin = ", pf.positions.values pos where ";
    final String getPositionsJoin = ", pf.getPositions('test').values pos where ";
    final String[] functionQueries = {
        selectPf + " where pf.getIdValue() > 0", // 0
        selectPf + " where pf.test.getId() > 0", // 1
        selectPf + positionsJoin + "pos.getSecId() != 'VMWARE'", // 2
        selectPf + positionsJoin + "pf.getIdValue() > 0 and pos.getSecId() != 'VMWARE'", // 3
        selectPf + getPositionsJoin + "pos.getSecId() != 'VMWARE'", // 4
        selectPf + getPositionsJoin + "pf.id > 0 and pos.getSecId() != 'IBM'", // 5
        selectPf + getPositionsJoin + "pf.getIdValue() > 0 and pos.secId != 'IBM'", // 6
    };

    // Expected instance counts on server2, one slot per query in functionQueries.
    final int[] expectedTestObjects =
        {entryCount, 0, 0, entryCount, entryCount, entryCount, entryCount};
    final int[] expectedTestObject2s =
        {entryCount, entryCount, 0, entryCount, entryCount, entryCount, entryCount};
    final int[] expectedPositions =
        {allPositions, 0, allPositions, allPositions * 2, allPositions, allPositions, allPositions};

    // Both servers are started the same way, so one runnable is sent to each of them.
    final CacheSerializableRunnable startServer =
        new CacheSerializableRunnable("Create cache server") {
          @Override
          public void run2() throws CacheException {
            configAndStartBridgeServer();
            Region serverRegion = getRootRegion().getSubregion(regionName);
          }
        };
    server1.invoke(startServer);
    server2.invoke(startServer);

    final int server1Port = server1.invoke(() -> PdxQueryDUnitTest.getCacheServerPort());
    final int server2Port = server2.invoke(() -> PdxQueryDUnitTest.getCacheServerPort());
    final String serverHost = NetworkUtils.getServerHostName(server1.getHost());

    // One pool per client VM: client1 -> server1, client2 -> server2.
    final String clientPoolName = "testClientServerQueryPool";
    createPool(client1, clientPoolName, new String[] {serverHost}, new int[] {server1Port}, true);
    createPool(client2, clientPoolName, new String[] {serverHost}, new int[] {server2Port}, true);

    // Populate through the client region in client2.
    client2.invoke(new CacheSerializableRunnable("Create region") {
      @Override
      public void run2() throws CacheException {
        AttributesFactory attributes = new AttributesFactory();
        attributes.setScope(Scope.LOCAL);
        ClientServerTestCase.configureConnectionPool(attributes, serverHost, server2Port, -1, true,
            -1, -1, null);
        Region clientRegion = createRegion(regionName, rootRegionName, attributes.create());
        for (int i = 0; i < entryCount; i++) {
          // Ids start at 1 and each one is used for duplicateStride consecutive keys.
          int duplicatedId = i / duplicateStride + 1;
          clientRegion.put("key-" + i, new TestObject(duplicatedId, "vmware", positionsPerObject));
        }
      }
    });

    for (int q = 0; q < functionQueries.length; q++) {
      final int wantedTestObjects = expectedTestObjects[q];
      final int wantedPositions = expectedPositions[q];
      final int wantedTestObject2s = expectedTestObject2s[q];

      executeClientQueries(client2, clientPoolName, functionQueries[q]);

      // Verify the counters on server2, then clear them so the next query starts from zero.
      server2.invoke(new CacheSerializableRunnable("validate") {
        @Override
        public void run2() throws CacheException {
          assertEquals(wantedTestObjects, TestObject.numInstance);
          assertEquals(wantedPositions, PositionPdx.numInstance);
          assertEquals(wantedTestObject2s, TestObject2.numInstance);

          TestObject.numInstance = 0;
          PositionPdx.numInstance = 0;
          TestObject2.numInstance = 0;
        }
      });
    }

    // Tear down clients first, then the servers.
    for (VM vm : new VM[] {client1, client2, server2, server1}) {
      this.closeClient(vm);
    }
  }

  /**
   * Creates three cache servers (vm0-vm2) hosting a partitioned region and one client (vm3) that
   * puts PDX values into the region and runs a query. An index is created in vm0 while the region
   * is still empty. This was failing randomly in a POC.
   */
  @Test
  public void testPutAllWithIndexes() {
    final String name = "testRegion";
    final Host host = Host.getHost(0);
    VM vm0 = host.getVM(0);
    VM vm1 = host.getVM(1);
    VM vm2 = host.getVM(2);
    VM vm3 = host.getVM(3);

    final Properties config = new Properties();
    config.setProperty("locators", "localhost[" + DistributedTestUtils.getDUnitLocatorPort() + "]");

    // Start the first server and create the index before any other server or data exists.
    vm0.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        Cache cache = new CacheFactory(config).create();
        AttributesFactory factory = new AttributesFactory();
        PartitionAttributesFactory prfactory = new PartitionAttributesFactory();
        prfactory.setRedundantCopies(0);
        factory.setPartitionAttributes(prfactory.create());
        cache.createRegionFactory(factory.create()).create(name);
        try {
          startCacheServer(0, false);
        } catch (Exception ex) {
          Assert.fail("While starting CacheServer", ex);
        }
        // Create Index on empty region
        try {
          cache.getQueryService().createIndex("myFuncIndex", "intId", "/" + name);
        } catch (Exception e) {
          Assert.fail("index creation failed", e);
        }
      }
    });

    // The two remaining servers are set up identically, so a single runnable serves both.
    final CacheSerializableRunnable createServer =
        new CacheSerializableRunnable("Create cache server") {
          @Override
          public void run2() throws CacheException {
            Cache cache = new CacheFactory(config).create();
            AttributesFactory factory = new AttributesFactory();
            PartitionAttributesFactory prfactory = new PartitionAttributesFactory();
            prfactory.setRedundantCopies(0);
            factory.setPartitionAttributes(prfactory.create());
            cache.createRegionFactory(factory.create()).create(name);
            try {
              startCacheServer(0, false);
            } catch (Exception ex) {
              Assert.fail("While starting CacheServer", ex);
            }
          }
        };
    vm1.invoke(createServer);
    vm2.invoke(createServer);

    // Create client region; the pool points at the server running in vm0, so both the port and
    // the host name are taken from vm0.
    final int port = vm0.invoke(() -> PdxQueryDUnitTest.getCacheServerPort());
    final String host0 = NetworkUtils.getServerHostName(vm0.getHost());
    vm3.invoke(new CacheSerializableRunnable("Create region") {
      @Override
      public void run2() throws CacheException {
        Properties clientConfig = new Properties();
        clientConfig.setProperty("mcast-port", "0");
        ClientCache cache = new ClientCacheFactory(clientConfig).addPoolServer(host0, port)
            .setPoolPRSingleHopEnabled(true).setPoolSubscriptionEnabled(true).create();
        cache.createClientRegionFactory(ClientRegionShortcut.PROXY).create(name);
      }
    });

    // Put the values from the client and query them; any exception fails the test.
    vm3.invoke(new CacheSerializableRunnable("putAll() test") {

      @Override
      public void run2() throws CacheException {
        try {
          // NOTE(review): presumably this returns the client cache created by the previous
          // runnable rather than a new one - confirm against ClientCacheFactory.
          ClientCache cache = new ClientCacheFactory().create();
          Region region = cache.getRegion(name);
          QueryService queryService = cache.getQueryService();
          for (int x = 0; x < 285; x++) {
            region.put(Integer.toString(x), new PortfolioPdx(x, x));
          }
          Query q = queryService.newQuery("SELECT DISTINCT * from /" + name + " WHERE ID = 2");
          SelectResults qResult = (SelectResults) q.execute();
          for (Object o : qResult.asList()) {
            System.out.println("o = " + o);
          }
        } catch (Exception e) {
          Assert.fail("Querying failed: ", e);
        }
      }
    });

    Invoke.invokeInEveryVM(DistributedTestCase.class, "disconnectFromDS");
  }

  /**
   * In PeerTypeRegistration when a PdxType is updated, a local map of class => PdxTypes is
   * populated. This map is used to search a field for a class in different versions (PdxTypes).
   * This test verifies that the map is being updated by the cacheListener.
   *
   * Two clients each put a differently shaped "PdxVersionedNewPortfolio" instance through a
   * different server (both servers host a replicated region). Afterwards each server must report
   * exactly two PdxTypes, both carrying that class name.
   */
  @Test
  public void testLocalMapInPeerTypePdxRegistry() throws CacheException {

    final Host host = Host.getHost(0);
    final VM vm0 = host.getVM(0);
    final VM vm1 = host.getVM(1);
    final VM vm2 = host.getVM(2);
    final VM vm3 = host.getVM(3);
    final int numberOfEntries = 10;

    // Start server1
    final int port1 = (Integer) vm0.invoke(new SerializableCallable("Create Server1") {
      @Override
      public Object call() throws Exception {
        getCache().createRegionFactory(RegionShortcut.REPLICATE).create(regionName);

        CacheServer server = getCache().addCacheServer();
        int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
        server.setPort(port);
        server.start();
        return port;
      }
    });

    // Start server2
    final int port2 = (Integer) vm1.invoke(new SerializableCallable("Create Server2") {
      @Override
      public Object call() throws Exception {
        getCache().createRegionFactory(RegionShortcut.REPLICATE).create(regionName);

        CacheServer server = getCache().addCacheServer();
        int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
        server.setPort(port);
        server.start();
        return port;
      }
    });

    // client1 loads version 1 objects (fields "id" and "pdxStatus") on server1
    vm2.invoke(new SerializableCallable("Create client") {
      @Override
      public Object call() throws Exception {
        ClientCacheFactory cf = new ClientCacheFactory();
        cf.addPoolServer(NetworkUtils.getServerHostName(vm0.getHost()), port1);
        ClientCache cache = getClientCache(cf);
        Region region =
            cache.createClientRegionFactory(ClientRegionShortcut.PROXY).create(regionName);

        // Load version 1 objects
        for (int i = 0; i < numberOfEntries; i++) {
          PdxInstanceFactory pdxInstanceFactory =
              PdxInstanceFactoryImpl.newCreator("PdxVersionedNewPortfolio", false, getCache());
          pdxInstanceFactory.writeInt("id", i);
          pdxInstanceFactory.writeString("pdxStatus", (i % 2 == 0 ? "active" : "inactive"));
          PdxInstance pdxInstance = pdxInstanceFactory.create();
          region.put("key-" + i, pdxInstance);
        }

        return null;
      }
    });

    // client 2 loads version 2 objects (fields "id" and "status") on server2
    vm3.invoke(new SerializableCallable("Create client") {
      @Override
      public Object call() throws Exception {
        ClientCacheFactory cf = new ClientCacheFactory();
        cf.addPoolServer(NetworkUtils.getServerHostName(vm1.getHost()), port2);
        ClientCache cache = getClientCache(cf);
        Region region =
            cache.createClientRegionFactory(ClientRegionShortcut.PROXY).create(regionName);
        // Load version 2 objects
        for (int i = numberOfEntries; i < numberOfEntries * 2; i++) {
          PdxInstanceFactory pdxInstanceFactory =
              PdxInstanceFactoryImpl.newCreator("PdxVersionedNewPortfolio", false, getCache());
          pdxInstanceFactory.writeInt("id", i);
          pdxInstanceFactory.writeString("status", (i % 2 == 0 ? "active" : "inactive"));
          PdxInstance pdxInstance = pdxInstanceFactory.create();
          region.put("key-" + i, pdxInstance);
        }
        return null;
      }
    });

    // Both servers must know both versions of the type, whichever server received the puts.
    final SerializableRunnableIF verifyTwoPdxTypesArePresent = () -> {
      TypeRegistration registration = getCache().getPdxRegistry().getTypeRegistration();
      assertTrue(registration instanceof PeerTypeRegistration);

      final Map<Integer, PdxType> types = registration.types();
      assertEquals(2, types.size());
      for (PdxType type : types.values()) {
        assertEquals("PdxVersionedNewPortfolio", type.getClassName());
      }
    };

    vm0.invoke(verifyTwoPdxTypesArePresent);
    vm1.invoke(verifyTwoPdxTypesArePresent);
  }

  /**
   * Test to query a field that is not present in the Pdx object but has a get method.
   *
   * Three servers host a partitioned region; a client puts ten TestObject values and runs the
   * queries, each of which must return five results. An index on "status" is then created in vm0
   * and the same queries are run again with the same expectation.
   */
  @Test
  public void testPdxInstanceWithMethodButNoField() throws CacheException {

    final Host host = Host.getHost(0);
    final VM vm0 = host.getVM(0);
    final VM vm1 = host.getVM(1);
    final VM vm2 = host.getVM(2);
    final VM vm3 = host.getVM(3);
    final int numberOfEntries = 10;
    final String name = "/" + regionName;
    final String[] qs = {"select * from " + name + " where status = 'active'",
        "select status from " + name + " where id >= 5"};

    // Start server1
    final int port1 = (Integer) vm0.invoke(new SerializableCallable("Create Server1") {
      @Override
      public Object call() throws Exception {
        getCache().createRegionFactory(RegionShortcut.PARTITION).create(regionName);

        CacheServer server = getCache().addCacheServer();
        int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
        server.setPort(port);
        server.start();
        return port;
      }
    });

    // Start server2
    final int port2 = (Integer) vm1.invoke(new SerializableCallable("Create Server2") {
      @Override
      public Object call() throws Exception {
        getCache().createRegionFactory(RegionShortcut.PARTITION).create(regionName);

        CacheServer server = getCache().addCacheServer();
        int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
        server.setPort(port);
        server.start();
        return port;
      }
    });

    // Start server3
    final int port3 = (Integer) vm2.invoke(new SerializableCallable("Create Server3") {
      @Override
      public Object call() throws Exception {
        getCache().createRegionFactory(RegionShortcut.PARTITION).create(regionName);

        CacheServer server = getCache().addCacheServer();
        int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
        server.setPort(port);
        server.start();
        return port;
      }
    });

    // create client connected to all three servers and load the data
    vm3.invoke(new SerializableCallable("Create client") {
      @Override
      public Object call() throws Exception {
        ClientCacheFactory cf = new ClientCacheFactory();
        cf.addPoolServer(NetworkUtils.getServerHostName(vm0.getHost()), port1);
        cf.addPoolServer(NetworkUtils.getServerHostName(vm1.getHost()), port2);
        cf.addPoolServer(NetworkUtils.getServerHostName(vm2.getHost()), port3);
        ClientCache cache = getClientCache(cf);
        Region region =
            cache.createClientRegionFactory(ClientRegionShortcut.PROXY).create(regionName);

        for (int i = 0; i < numberOfEntries; i++) {
          region.put("key-" + i, new TestObject(i, "vmware"));
        }
        return null;
      }
    });

    // Runs every query from the client and expects five results from each. The same callable is
    // invoked before and after the index exists.
    final SerializableCallable executeQueries = new SerializableCallable("Create client") {
      @Override
      public Object call() throws Exception {
        QueryService remoteQueryService = null;
        // Execute query remotely
        try {
          remoteQueryService = getCache().getQueryService();
        } catch (Exception e) {
          Assert.fail("Failed to get QueryService.", e);
        }

        for (String query : qs) {
          try {
            SelectResults sr = (SelectResults) remoteQueryService.newQuery(query).execute();
            assertEquals(5, sr.size());
          } catch (Exception e) {
            Assert.fail("Failed executing " + query, e);
          }
        }
        return null;
      }
    };

    // query without an index
    vm3.invoke(executeQueries);

    // create index
    vm0.invoke(new SerializableCallable("Query") {
      @Override
      public Object call() throws Exception {
        try {
          getCache().getQueryService().createIndex("status", "status", name);
        } catch (Exception e) {
          // Covers both obtaining the query service and creating the index.
          Assert.fail("Failed to create index on " + name, e);
        }

        return null;
      }
    });

    // query again now that the index exists
    vm3.invoke(executeQueries);
    Invoke.invokeInEveryVM(DistributedTestCase.class, "disconnectFromDS");
  }


  /**
   * Test to query a field that is not present in the Pdx object but is present in some other
   * version of the pdx instance.
   *
   * Two servers host a replicated region. Client 1 loads version 1 objects (fields "id" and
   * "pdxStatus") through server1 and client 2 loads version 2 objects (fields "id" and "status")
   * through server2. The queries are run from both clients and locally on server1, then once more
   * from client 1 after an index on "status" has been created. Every query must return five
   * results.
   */
  @Test
  public void testPdxInstanceFieldInOtherVersion() throws CacheException {

    final Host host = Host.getHost(0);
    final VM vm0 = host.getVM(0);
    final VM vm1 = host.getVM(1);
    final VM vm2 = host.getVM(2);
    final VM vm3 = host.getVM(3);
    final int numberOfEntries = 10;
    final String name = "/" + regionName;
    final String[] qs = {"select pdxStatus from " + name + " where pdxStatus = 'active'",
        "select pdxStatus from " + name + " where id > 8 and id < 14"};

    // Start server1
    final int port1 = (Integer) vm0.invoke(new SerializableCallable("Create Server1") {
      @Override
      public Object call() throws Exception {
        getCache().createRegionFactory(RegionShortcut.REPLICATE).create(regionName);

        CacheServer server = getCache().addCacheServer();
        int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
        server.setPort(port);
        server.start();
        return port;
      }
    });

    // Start server2
    final int port2 = (Integer) vm1.invoke(new SerializableCallable("Create Server2") {
      @Override
      public Object call() throws Exception {
        getCache().createRegionFactory(RegionShortcut.REPLICATE).create(regionName);

        CacheServer server = getCache().addCacheServer();
        int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
        server.setPort(port);
        server.start();
        return port;
      }
    });

    // client1 loads version 1 objects on server1
    vm2.invoke(new SerializableCallable("Create client") {
      @Override
      public Object call() throws Exception {
        ClientCacheFactory cf = new ClientCacheFactory();
        cf.addPoolServer(NetworkUtils.getServerHostName(vm0.getHost()), port1);
        ClientCache cache = getClientCache(cf);
        Region region =
            cache.createClientRegionFactory(ClientRegionShortcut.PROXY).create(regionName);

        // Load version 1 objects
        for (int i = 0; i < numberOfEntries; i++) {
          PdxInstanceFactory pdxInstanceFactory =
              PdxInstanceFactoryImpl.newCreator("PdxVersionedNewPortfolio", false, getCache());
          pdxInstanceFactory.writeInt("id", i);
          pdxInstanceFactory.writeString("pdxStatus", (i % 2 == 0 ? "active" : "inactive"));
          PdxInstance pdxInstance = pdxInstanceFactory.create();
          region.put("key-" + i, pdxInstance);
        }

        return null;
      }
    });

    // client 2 loads version 2 objects on server2
    vm3.invoke(new SerializableCallable("Create client") {
      @Override
      public Object call() throws Exception {
        ClientCacheFactory cf = new ClientCacheFactory();
        cf.addPoolServer(NetworkUtils.getServerHostName(vm1.getHost()), port2);
        ClientCache cache = getClientCache(cf);
        Region region =
            cache.createClientRegionFactory(ClientRegionShortcut.PROXY).create(regionName);
        // Load version 2 objects
        for (int i = numberOfEntries; i < numberOfEntries * 2; i++) {
          PdxInstanceFactory pdxInstanceFactory =
              PdxInstanceFactoryImpl.newCreator("PdxVersionedNewPortfolio", false, getCache());
          pdxInstanceFactory.writeInt("id", i);
          pdxInstanceFactory.writeString("status", (i % 2 == 0 ? "active" : "inactive"));
          PdxInstance pdxInstance = pdxInstanceFactory.create();
          region.put("key-" + i, pdxInstance);
        }
        return null;
      }
    });

    // Remote query shared by both clients: each query must return five results, and the second
    // query (which spans both versions) may only yield null or String values.
    final SerializableCallable queryFromClient = new SerializableCallable("Create client") {
      @Override
      public Object call() throws Exception {
        QueryService remoteQueryService = null;
        // Execute query remotely
        try {
          remoteQueryService = getCache().getQueryService();
        } catch (Exception e) {
          Assert.fail("Failed to get QueryService.", e);
        }

        for (int i = 0; i < qs.length; i++) {
          try {
            SelectResults sr = (SelectResults) remoteQueryService.newQuery(qs[i]).execute();
            assertEquals(5, sr.size());
            if (i == 1) {
              for (Object o : sr) {
                if (o != null && !(o instanceof String)) {
                  fail("Result should be either null or String and not " + o.getClass());
                }
              }
            }
          } catch (Exception e) {
            Assert.fail("Failed executing " + qs[i], e);
          }
        }
        return null;
      }
    };

    // query remotely from client 1 (the one that loaded the version 1 objects)
    vm2.invoke(queryFromClient);

    // query remotely from client 2 (the one that loaded the version 2 objects)
    vm3.invoke(queryFromClient);

    // query locally on server1, reading values in serialized form
    vm0.invoke(new SerializableCallable("Create client") {
      @Override
      public Object call() throws Exception {
        GemFireCacheImpl cache = (GemFireCacheImpl) getCache();
        cache.setReadSerializedForTest(true);
        QueryService queryService = null;
        try {
          queryService = getCache().getQueryService();
        } catch (Exception e) {
          Assert.fail("Failed to get QueryService.", e);
        }

        for (int i = 0; i < qs.length; i++) {
          try {
            SelectResults sr = (SelectResults) queryService.newQuery(qs[i]).execute();
            assertEquals(5, sr.size());
            if (i == 1) {
              for (Object o : sr) {
                if (o != null && !(o instanceof String)) {
                  fail("Result should be either null or String and not " + o.getClass());
                }
              }
            }
          } catch (Exception e) {
            Assert.fail("Failed executing " + qs[i], e);
          }
        }
        return null;
      }
    });

    // create index
    vm0.invoke(new SerializableCallable("Query") {
      @Override
      public Object call() throws Exception {
        try {
          getCache().getQueryService().createIndex("status", "status", name);
        } catch (Exception e) {
          // Covers both obtaining the query service and creating the index.
          Assert.fail("Failed to create index on " + name, e);
        }

        return null;
      }
    });

    // query from client 1 again now that the index exists; only the result count is checked here
    vm2.invoke(new SerializableCallable("Create client") {
      @Override
      public Object call() throws Exception {

        QueryService remoteQueryService = null;
        // Execute query remotely
        try {
          remoteQueryService = getCache().getQueryService();
        } catch (Exception e) {
          Assert.fail("Failed to get QueryService.", e);
        }

        for (String query : qs) {
          try {
            SelectResults sr = (SelectResults) remoteQueryService.newQuery(query).execute();
            assertEquals(5, sr.size());
          } catch (Exception e) {
            Assert.fail("Failed executing " + query, e);
          }
        }
        return null;
      }
    });

    Invoke.invokeInEveryVM(DistributedTestCase.class, "disconnectFromDS");
  }

  /**
   * 2 servers (replicated) and 2 clients. Client 2 puts version 1 and version 2 objects on
   * server1. Client 1 has registered interest on server2, hence gets the pdx objects for both
   * versions. The test runs local queries on client 1 and checks that client 1 fetched the
   * pdxtypes from the server.
   */
  @Test
  public void testClientForFieldInOtherVersion() throws CacheException {

    final Host host = Host.getHost(0);
    final VM vm0 = host.getVM(0);
    final VM vm1 = host.getVM(1);
    final VM vm2 = host.getVM(2);
    final VM vm3 = host.getVM(3);
    final int numberOfEntries = 10;
    final String name = "/" + regionName;
    final String[] qs = {"select pdxStatus from " + name + " where pdxStatus = 'active'",
        "select status from " + name + " where id > 8 and id < 14"};

    // Start server1
    final int port1 = (Integer) vm0.invoke(new SerializableCallable("Create Server1") {
      @Override
      public Object call() throws Exception {
        getCache().createRegionFactory(RegionShortcut.REPLICATE).create(regionName);

        CacheServer server = getCache().addCacheServer();
        int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
        server.setPort(port);
        server.start();
        return port;
      }
    });

    // Start server2
    final int port2 = (Integer) vm1.invoke(new SerializableCallable("Create Server2") {
      @Override
      public Object call() throws Exception {
        getCache().createRegionFactory(RegionShortcut.REPLICATE).create(regionName);

        CacheServer server = getCache().addCacheServer();
        int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
        server.setPort(port);
        server.start();
        return port;
      }
    });

    // client 1 registers interest for server2
    vm2.invoke(new SerializableCallable("Create client") {
      @Override
      public Object call() throws Exception {
        ClientCacheFactory cf = new ClientCacheFactory();
        cf.setPoolSubscriptionEnabled(true);
        cf.addPoolServer(NetworkUtils.getServerHostName(vm1.getHost()), port2);
        ClientCache cache = getClientCache(cf);
        Region region =
            cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create(regionName);
        region.registerInterest("ALL_KEYS");
        return null;
      }
    });

    // client2 loads both version objects on server1
    vm3.invoke(new SerializableCallable("Create client") {
      @Override
      public Object call() throws Exception {
        ClientCacheFactory cf = new ClientCacheFactory();
        cf.addPoolServer(NetworkUtils.getServerHostName(vm0.getHost()), port1);
        ClientCache cache = getClientCache(cf);
        Region region =
            cache.createClientRegionFactory(ClientRegionShortcut.PROXY).create(regionName);

        // Load version 1 objects (fields "pdxStatus" and "id")
        for (int i = 0; i < numberOfEntries; i++) {
          PdxInstanceFactory pdxFactory = cache.createPdxInstanceFactory("PdxPortfolio");
          pdxFactory.writeString("pdxStatus", (i % 2 == 0 ? "active" : "inactive"));
          pdxFactory.writeInt("id", i);
          PdxInstance pdxInstance = pdxFactory.create();
          region.put("key-" + i, pdxInstance);
        }
        // Load version 2 objects (fields "status" and "id")
        for (int i = numberOfEntries; i < numberOfEntries * 2; i++) {
          PdxInstanceFactory pdxFactory = cache.createPdxInstanceFactory("PdxPortfolio");
          pdxFactory.writeString("status", i % 2 == 0 ? "active" : "inactive");
          pdxFactory.writeInt("id", i);
          PdxInstance pdxInstance = pdxFactory.create();
          region.put("key-" + i, pdxInstance);
        }

        return null;
      }
    });

    // query locally on client 1 which has registered interest
    vm2.invoke(new SerializableCallable("Create client") {
      @Override
      public Object call() throws Exception {
        GemFireCacheImpl cache = (GemFireCacheImpl) getCache();
        cache.setReadSerializedForTest(true);
        QueryService localQueryService = null;
        // Execute query locally against the client's own cache
        try {
          localQueryService = ((ClientCache) getCache()).getLocalQueryService();
        } catch (Exception e) {
          Assert.fail("Failed to get QueryService.", e);
        }

        for (int i = 0; i < qs.length; i++) {
          try {
            SelectResults sr = (SelectResults) localQueryService.newQuery(qs[i]).execute();
            assertEquals(5, sr.size());
            if (i == 1) {
              // The second query spans both versions; only null or String values are acceptable.
              for (Object o : sr) {
                if (o != null && !(o instanceof String)) {
                  fail("Result should be either null or String and not " + o.getClass());
                }
              }
            }
          } catch (Exception e) {
            Assert.fail("Failed executing " + qs[i], e);
          }
        }
        // check if the types registered on server are fetched by the client
        TypeRegistration registration = getCache().getPdxRegistry().getTypeRegistration();
        assertTrue(registration instanceof ClientTypeRegistration);
        Map<Integer, PdxType> m = registration.types();
        assertEquals(2, m.size());
        for (PdxType type : m.values()) {
          assertEquals("PdxPortfolio", type.getClassName());
        }
        return null;
      }
    });
    Invoke.invokeInEveryVM("Disconnecting from the Distributed system", () -> disconnectFromDS());
  }

  /**
   * Test to query a field that is not present in the Pdx object. Also the implicit method is
   * absent in the class.
   *
   * The client loads ten objects that carry "id" and "status" but no "pdxStatus". Filtering on
   * pdxStatus must return nothing, and projecting pdxStatus for ids greater than 4 must return
   * five Undefined values.
   */
  @Test
  public void testPdxInstanceNoFieldNoMethod() throws CacheException {

    final Host host = Host.getHost(0);
    final VM vm0 = host.getVM(0);
    final VM vm3 = host.getVM(3);
    final int numberOfEntries = 10;
    final String name = "/" + regionName;
    final String[] qs = {"select * from " + name + " where pdxStatus = 'active'",
        "select pdxStatus from " + name + " where id > 4"};

    // Start server1
    final int port1 = (Integer) vm0.invoke(new SerializableCallable("Create Server1") {
      @Override
      public Object call() throws Exception {
        getCache().createRegionFactory(RegionShortcut.REPLICATE).create(regionName);

        CacheServer server = getCache().addCacheServer();
        int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
        server.setPort(port);
        server.start();
        return port;
      }
    });

    // create client and load only version 1 objects with no pdxStatus field
    vm3.invoke(new SerializableCallable("Create client") {
      @Override
      public Object call() throws Exception {
        ClientCacheFactory cf = new ClientCacheFactory();
        cf.addPoolServer(NetworkUtils.getServerHostName(vm0.getHost()), port1);
        ClientCache cache = getClientCache(cf);
        Region region =
            cache.createClientRegionFactory(ClientRegionShortcut.PROXY).create(regionName);

        // Load version 1 objects
        for (int i = 0; i < numberOfEntries; i++) {
          PdxInstanceFactory pdxInstanceFactory =
              PdxInstanceFactoryImpl.newCreator("PdxVersionedNewPortfolio", false, getCache());
          pdxInstanceFactory.writeInt("id", i);
          pdxInstanceFactory.writeString("status", (i % 2 == 0 ? "active" : "inactive"));
          PdxInstance pdxInstance = pdxInstanceFactory.create();
          region.put("key-" + i, pdxInstance);
        }
        return null;
      }
    });

    // query from the client for the field that no stored object has
    vm3.invoke(new SerializableCallable("Create client") {
      @Override
      public Object call() throws Exception {
        QueryService remoteQueryService = null;
        // Execute query remotely
        try {
          remoteQueryService = getCache().getQueryService();
        } catch (Exception e) {
          Assert.fail("Failed to get QueryService.", e);
        }

        for (int i = 0; i < qs.length; i++) {
          try {
            SelectResults sr = (SelectResults) remoteQueryService.newQuery(qs[i]).execute();
            if (i == 1) {
              assertEquals(5, sr.size());
              for (Object o : sr) {
                if (!(o instanceof Undefined)) {
                  // Guard against null so a null result is reported as such instead of
                  // surfacing as a NullPointerException from o.getClass().
                  fail("Result should be Undefined and not "
                      + (o == null ? "null" : o.getClass()));
                }
              }
            } else {
              assertEquals(0, sr.size());
            }
          } catch (Exception e) {
            Assert.fail("Failed executing " + qs[i], e);
          }
        }
        return null;
      }
    });

    Invoke.invokeInEveryVM(DistributedTestCase.class, "disconnectFromDS");
  }

  /**
   * Test query execution when default values of {@link FieldType} are used. This happens when one
   * version of a Pdx object does not have a field but another version has.
   *
   * <p>
   * Two versions of the Pdx type {@code PdxVersionedFieldType} are stored in the same region:
   * version 1 carries only the string/boolean/char/short fields and version 2 carries only the
   * int/long/float/double fields. The projection query names all eight fields, so every entry is
   * missing half of them. The query is verified on the server with and without indexes, the
   * entries are then rewritten with the opposite version, and finally the client compares the
   * results of its remote and local query services.
   */
  @Test
  public void testDefaultValuesInPdxFieldTypes() throws Exception {
    final Host host = Host.getHost(0);
    final VM vm0 = host.getVM(0);
    final VM vm1 = host.getVM(1);
    final int numberOfEntries = 10;
    final String name = "/" + regionName;
    final String query =
        "select stringField, booleanField, charField, shortField, intField, longField, floatField, doubleField from "
            + name;

    // Start server1
    final int port1 = (Integer) vm0.invoke(new SerializableCallable("Create Server1") {
      @Override
      public Object call() throws Exception {
        getCache().createRegionFactory(RegionShortcut.REPLICATE).create(regionName);
        CacheServer server = getCache().addCacheServer();
        int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
        server.setPort(port);
        server.start();
        return port;
      }
    });

    // client loads version1 and version2 objects on server
    vm1.invoke(new SerializableCallable("Create client") {
      @Override
      public Object call() throws Exception {
        ClientCacheFactory cf = new ClientCacheFactory();
        cf.addPoolServer(NetworkUtils.getServerHostName(vm0.getHost()), port1);
        ClientCache cache = getClientCache(cf);
        Region<Object, Object> region =
            cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create(regionName);

        // key-0 .. key-9 hold version 1 objects, key-10 .. key-19 hold version 2 objects
        putPdxVersion1FieldTypes(region, 0, numberOfEntries);
        putPdxVersion2FieldTypes(region, numberOfEntries, numberOfEntries * 2);

        return null;
      }
    });

    // query locally on server, create index, verify results with and without index
    vm0.invoke(new SerializableCallable("Create index") {
      @Override
      public Object call() throws Exception {
        GemFireCacheImpl cache = (GemFireCacheImpl) getCache();
        cache.setReadSerializedForTest(true);

        QueryService qs = null;
        // sr[0][0]: results before index creation, sr[0][1]: results after index creation
        SelectResults[][] sr = new SelectResults[1][2];
        try {
          qs = getCache().getQueryService();
        } catch (Exception e) {
          Assert.fail("Failed to get QueryService.", e);
        }

        // query before index creation
        try {
          sr[0][0] = (SelectResults) qs.newQuery(query).execute();
          assertEquals(20, sr[0][0].size());

        } catch (Exception e) {
          Assert.fail("Failed executing " + query, e);
        }
        // create one index per projected field
        try {
          qs.createIndex("stringIndex", "stringField", name);
          qs.createIndex("booleanIndex", "booleanField", name);
          qs.createIndex("shortIndex", "shortField", name);
          qs.createIndex("charIndex", "charField", name);
          qs.createIndex("intIndex", "intField", name);
          qs.createIndex("longIndex", "longField", name);
          qs.createIndex("floatIndex", "floatField", name);
          qs.createIndex("doubleIndex", "doubleField", name);
        } catch (Exception e) {
          Assert.fail("Exception creating index ", e);
        }

        // query after index creation
        try {
          sr[0][1] = (SelectResults) qs.newQuery(query).execute();
          assertEquals(20, sr[0][1].size());

        } catch (Exception e) {
          Assert.fail("Failed executing " + query, e);
        }

        CacheUtils.compareResultsOfWithAndWithoutIndex(sr);
        return null;
      }
    });

    // Update index: rewrite every key with the version it did not hold before
    vm1.invoke(new SerializableCallable("update index") {
      @Override
      public Object call() throws Exception {

        Region<Object, Object> region = getCache().getRegion(regionName);

        // key-10 .. key-19 now hold version 1 objects, key-0 .. key-9 now hold version 2 objects
        putPdxVersion1FieldTypes(region, numberOfEntries, numberOfEntries * 2);
        putPdxVersion2FieldTypes(region, 0, numberOfEntries);
        return null;
      }
    });

    // query from client, both through the server and against the client's local cache
    vm1.invoke(new SerializableCallable("query") {
      @Override
      public Object call() throws Exception {
        QueryService remoteQueryService = null;
        QueryService localQueryService = null;
        // sr[0][0]: remote query results, sr[0][1]: local query results
        SelectResults[][] sr = new SelectResults[1][2];
        try {
          remoteQueryService = getCache().getQueryService();
          localQueryService = ((ClientCache) getCache()).getLocalQueryService();
        } catch (Exception e) {
          Assert.fail("Failed to get QueryService.", e);
        }
        try {
          sr[0][0] = (SelectResults) remoteQueryService.newQuery(query).execute();
          assertEquals(20, sr[0][0].size());
          sr[0][1] = (SelectResults) localQueryService.newQuery(query).execute();
          assertEquals(20, sr[0][1].size());
        } catch (Exception e) {
          // keep the cause attached so the failing stack trace is not lost
          Assert.fail("Failed executing " + query, e);
        }

        CacheUtils.compareResultsOfWithAndWithoutIndex(sr);

        return null;
      }
    });

    Invoke.invokeInEveryVM(DistributedTestCase.class, "disconnectFromDS");
  }

  /**
   * Puts version 1 {@code PdxVersionedFieldType} instances, which carry only the string, boolean,
   * char and short fields, under the keys {@code "key-" + i} for {@code from <= i < to}.
   *
   * @param region region receiving the entries
   * @param from first key index, inclusive
   * @param to last key index, exclusive
   */
  private void putPdxVersion1FieldTypes(Region<Object, Object> region, int from, int to) {
    for (int i = from; i < to; i++) {
      PdxInstanceFactory pdxInstanceFactory =
          PdxInstanceFactoryImpl.newCreator("PdxVersionedFieldType", false, getCache());
      pdxInstanceFactory.writeString("stringField", "" + i);
      pdxInstanceFactory.writeBoolean("booleanField", i % 2 == 0);
      pdxInstanceFactory.writeChar("charField", (char) i);
      pdxInstanceFactory.writeShort("shortField", (short) i);
      PdxInstance pdxInstance = pdxInstanceFactory.create();
      logger.info("Putting object: " + pdxInstance);
      region.put("key-" + i, pdxInstance);
    }
  }

  /**
   * Puts version 2 {@code PdxVersionedFieldType} instances, which carry only the int, long, float
   * and double fields, under the keys {@code "key-" + i} for {@code from <= i < to}.
   *
   * @param region region receiving the entries
   * @param from first key index, inclusive
   * @param to last key index, exclusive
   */
  private void putPdxVersion2FieldTypes(Region<Object, Object> region, int from, int to) {
    for (int i = from; i < to; i++) {
      PdxInstanceFactory pdxInstanceFactory =
          PdxInstanceFactoryImpl.newCreator("PdxVersionedFieldType", false, getCache());
      pdxInstanceFactory.writeInt("intField", i);
      pdxInstanceFactory.writeLong("longField", (long) (i + 1));
      pdxInstanceFactory.writeFloat("floatField", (float) (i + 2));
      pdxInstanceFactory.writeDouble("doubleField", (double) (i + 3));
      PdxInstance pdxInstance = pdxInstanceFactory.create();
      logger.info("Putting object: " + pdxInstance);
      region.put("key-" + i, pdxInstance);
    }
  }

  /**
   * Tests querying JSON documents whose nested elements do not all carry the same fields.
   *
   * <p>
   * Two documents are converted with {@link JSONFormatter#fromJSON(String)} and stored on the
   * first server. In the first document every {@code Address} element has a {@code phones} array;
   * in the second document the "NJ" address has no {@code phones} field at all. A query that
   * iterates {@code Address} and {@code phones} is then executed on the second server and must
   * return only the document that contains the phone number '412'.
   */
  @Test
  public void testJSONWithHeterogenousObjectsDifferingFields() throws CacheException {

    final Host host = Host.getHost(0);
    VM vm0 = host.getVM(0);
    VM vm1 = host.getVM(1);

    // Start server1 and load the two JSON documents
    vm0.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        configAndStartBridgeServer();
        Region region = getCache().createRegionFactory(RegionShortcut.REPLICATE).create("testJson");
        // obj1: both addresses have phones; the "NJ" address holds the number '412'
        String obj1 =
            "{ \"FirstName\": \"Vinay\", \"LastName\": \"Upadhya\", \"Address\": [ { \"Line1\": \"NYC\", \"phones\": [ { \"number\": \"212\" }, { \"number\": \"313\" } ] }, { \"Line1\": \"NJ\", \"phones\": [ { \"number\": \"412\" }, { \"number\": \"513\" } ] } ] }";
        // obj2: the "NJ" address has no phones field
        String obj2 =
            "{ \"FirstName\": \"Vinay\", \"LastName\": \"Upadhya\", \"Address\": [ { \"Line1\": \"NYC\", \"phones\": [ { \"number\": \"212\" }, { \"number\": \"313\" } ] }, { \"Line1\": \"NJ\" } ] }";
        region.put("value1", JSONFormatter.fromJSON(obj1));
        region.put("value2", JSONFormatter.fromJSON(obj2));
      }
    });


    // Start server2 and run the query there
    // NOTE(review): server2 presumably receives the entries through the REPLICATE region - confirm
    vm1.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        configAndStartBridgeServer();
        getCache().createRegionFactory(RegionShortcut.REPLICATE).create("testJson");

        QueryService qs = getCache().getQueryService();

        try {
          SelectResults results = (SelectResults) qs
              .newQuery(
                  "select r from /testJson r, r.Address a, a.phones pn where pn.number = '412'")
              .execute();
          // only obj1 contains a phone with number '412'
          assertEquals(1, results.size());
        } catch (Exception e) {
          // the cause is carried by the rethrown exception, so no separate stack trace dump
          throw new CacheException(e) {};
        }
      }
    });

    this.closeClient(vm1);
    this.closeClient(vm0);
  }

  /**
   * Tests querying JSON documents in which the same field has different types.
   *
   * <p>
   * Two documents are converted with {@link JSONFormatter#fromJSON(String)} and stored on the
   * first server: in one of them {@code Address} is an array of objects, in the other it is a
   * plain string. Two queries iterating {@code Address} are executed on the second server; one
   * filters on the nested {@code Line1} field and the other compares the element itself with a
   * string, and each must match exactly one document.
   */
  @Test
  public void testJSONWithHeterogenousObjectsDifferingFieldTypes() throws CacheException {

    final Host host = Host.getHost(0);
    VM vm0 = host.getVM(0);
    VM vm1 = host.getVM(1);

    // Start server1 and load the two JSON documents
    vm0.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        configAndStartBridgeServer();
        Region region = getCache().createRegionFactory(RegionShortcut.REPLICATE).create("testJson");
        // obj1: Address is an array of objects
        String obj1 =
            "{\"FirstName\": \"Vinay\", \"LastName\": \"Upadhya\", \"Address\": [ { \"Line1\": \"NYC\" }, { \"Line1\": \"NJ\" } ] }";
        // obj2: Address is a plain string
        String obj2 =
            "{ \"FirstName\": \"Vinay\", \"LastName\": \"Upadhya\", \"Address\": \"my address\" }";
        // the string-typed document is stored first, the array-typed document second
        region.put("value1", JSONFormatter.fromJSON(obj2));
        region.put("value2", JSONFormatter.fromJSON(obj1));
      }
    });


    // Start server2 and run the queries there
    // NOTE(review): server2 presumably receives the entries through the REPLICATE region - confirm
    vm1.invoke(new CacheSerializableRunnable("Create cache server") {
      @Override
      public void run2() throws CacheException {
        configAndStartBridgeServer();
        getCache().createRegionFactory(RegionShortcut.REPLICATE).create("testJson");
        QueryService qs = getCache().getQueryService();
        try {
          // matches the document whose Address is an array of objects
          SelectResults result = (SelectResults) qs
              .newQuery("select r from /testJson r, r.Address a where a.Line1 = 'NYC'").execute();
          assertEquals(1, result.size());
          // matches the document whose Address is a plain string
          result = (SelectResults) qs
              .newQuery("select r from /testJson r, r.Address a where a = 'my address'").execute();
          assertEquals(1, result.size());
        } catch (Exception e) {
          // the cause is carried by the rethrown exception, so no separate stack trace dump
          throw new CacheException(e) {};
        }
      }
    });

    this.closeClient(vm1);
    this.closeClient(vm0);
  }

  /**
   * Verifies aggregate (SUM) queries on Pdx instances when they are executed through a client
   * pool.
   *
   * <p>
   * The first server is loaded with ten {@code TestObject} entries, a second server is started,
   * and a client pool that is given only the second server's port runs two queries: the
   * {@code id > 0} query must yield exactly one row with the value 45, and the {@code id > 9}
   * query must yield no rows.
   */
  @Test
  public void testAggregateQueries() {
    final VM loadingServer = VM.getVM(0);
    final VM queriedServer = VM.getVM(1);
    final VM client = VM.getVM(2);

    // The returned port is not used below; the client pool only targets the second server.
    final int loadingServerPort = loadingServer.invoke(() -> {
      configAndStartBridgeServer();
      final Region entries = getRootRegion().getSubregion(regionName);
      final int entryCount = 10;
      for (int id = 0; id < entryCount; id++) {
        final String key = "key-" + id;
        entries.put(key, new TestObject(id, "val-" + id));
      }
      return getCacheServerPort();
    });

    final int queriedServerPort = queriedServer.invoke(() -> {
      configAndStartBridgeServer();
      return getCacheServerPort();
    });

    final String[] hosts = {NetworkUtils.getServerHostName()};
    final int[] ports = {queriedServerPort};
    final String poolName = "testClientServerQueryPool";
    createPool(client, poolName, hosts, ports, true);

    // SUM over the matching entries comes back as a single row.
    // NOTE(review): 45 presumably is 1 + ... + 9 because price tracks id - confirm in TestObject
    client.invoke(() -> {
      final QueryService poolQueryService = PoolManager.find(poolName).getQueryService();
      final String oql = "select SUM(price) from " + regName + " where id > 0";
      final Query sumQuery = poolQueryService.newQuery(oql);
      final SelectResults<Object> rows = (SelectResults) sumQuery.execute();
      assertEquals(1, rows.size());
      final Object sum = rows.asList().get(0);
      assertEquals(45, sum);
    });

    // SUM over a predicate that matches nothing comes back empty.
    client.invoke(() -> {
      final QueryService poolQueryService = PoolManager.find(poolName).getQueryService();
      final String oql = "select SUM(price) from " + regName + " where id > 9";
      final Query sumQuery = poolQueryService.newQuery(oql);
      final SelectResults<Long> rows = (SelectResults) sumQuery.execute();
      assertEquals(0, rows.size());
      assertEquals(0, rows.asList().size());
    });

    for (VM vm : new VM[] {loadingServer, queriedServer, client}) {
      closeClient(vm);
    }
  }

}
