/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.internal.cache.tier.sockets;

import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.geode.GemFireConfigException;
import org.apache.geode.GemFireIOException;
import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.CacheWriterException;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.EntryEvent;
import org.apache.geode.cache.InterestResultPolicy;
import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.Scope;
import org.apache.geode.cache.client.ClientCache;
import org.apache.geode.cache.client.ClientCacheFactory;
import org.apache.geode.cache.client.ClientRegionShortcut;
import org.apache.geode.cache.client.NoAvailableServersException;
import org.apache.geode.cache.client.Pool;
import org.apache.geode.cache.client.PoolFactory;
import org.apache.geode.cache.client.PoolManager;
import org.apache.geode.cache.client.internal.Connection;
import org.apache.geode.cache.client.internal.Op;
import org.apache.geode.cache.client.internal.PoolImpl;
import org.apache.geode.cache.client.internal.QueueConnectionImpl;
import org.apache.geode.cache.client.internal.RegisterInterestTracker;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.cache.util.CacheListenerAdapter;
import org.apache.geode.cache30.CacheSerializableRunnable;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.distributed.DistributedSystemDisconnectedException;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.AvailablePort;
import org.apache.geode.internal.OSProcess;
import org.apache.geode.internal.cache.CachePerfStats;
import org.apache.geode.internal.cache.CacheServerImpl;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.ha.ThreadIdentifier;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.test.dunit.DistributedTestUtils;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.IgnoredException;
import org.apache.geode.test.dunit.NetworkUtils;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.Wait;
import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
import org.apache.geode.test.dunit.internal.DUnitLauncher;
import org.apache.geode.test.junit.categories.ClientServerTest;
import org.apache.geode.test.version.VersionManager;

/**
 * Tests client server corner cases between Region and Pool
 */
@Category({ClientServerTest.class})
public class ClientServerMiscDUnitTestBase extends JUnit4CacheTestCase {

  // Pool kept in static state; tests such as testRegionClose assign it.
  protected static PoolImpl pool = null;

  // Connection acquired from the pool; assigned by testInvalidatesPropagateOnRegionHavingNoPool.
  protected static Connection conn = null;

  // Cache created by _createClientCache(); tests look their regions up through it.
  static Cache static_cache;

  // Cache server port returned by initServerCache(); assigned by the individual tests.
  static int PORT1;

  // Keys the tests operate on.
  private static final String k1 = "k1";

  private static final String k2 = "k2";

  // NOTE(review): presumably the values written on the server side by put() - not visible here.
  static final String server_k1 = "server-k1";

  static final String server_k2 = "server-k2";

  // Names of the regions the tests create and look up.
  static final String REGION_NAME1 = "ClientServerMiscDUnitTest_region1";

  static final String REGION_NAME2 = "ClientServerMiscDUnitTest_region2";

  private static final String PR_REGION_NAME = "ClientServerMiscDUnitTest_PRregion";

  // DUnit host, resolved in postSetUp().
  private static Host host;

  // Server VMs; postSetUp() binds them to VM 2 and VM 3 at the current version.
  protected static VM server1;

  protected static VM server2;

  // Region attributes shared between tests; assigned where client regions are created.
  private static RegionAttributes attrs;

  // variables for concurrent map API test
  Properties props = new Properties();
  // Key range 1..5 is populated by concurrentMapTest(); range 6..10 is never populated and is
  // used for the operations that are expected to fail.
  private final int putRange_1Start = 1;
  private final int putRange_1End = 5;
  private final int putRange_2Start = 6;
  private final int putRange_2End = 10;


  String testVersion; // version for client caches for backward-compatibility
                      // testing

  /**
   * Creates the test with client caches running at the current product version.
   */
  public ClientServerMiscDUnitTestBase() {
    this.testVersion = VersionManager.CURRENT_VERSION;
  }

  /**
   * Resolves the DUnit host and binds the two server VMs (VM 2 and VM 3, both at the current
   * version) into the static fields used by the tests.
   */
  @Override
  public final void postSetUp() {
    final Host dunitHost = Host.getHost(0);
    host = dunitHost;
    server1 = dunitHost.getVM(VersionManager.CURRENT_VERSION, 2);
    server2 = dunitHost.getVM(VersionManager.CURRENT_VERSION, 3);
  }

  /**
   * Starts a non-HA cache server in {@code server1}.
   *
   * @param notifyBySub whether the server notifies clients by subscription
   * @return the port the cache server listens on
   */
  int initServerCache(boolean notifyBySub) {
    final boolean isHA = false;
    return initServerCache(notifyBySub, isHA);
  }

  /**
   * Starts a non-HA cache server in {@code server2}.
   *
   * @return the port the cache server listens on
   */
  int initServerCache2() {
    final boolean isHA = false;
    return initServerCache2(isHA);
  }

  /**
   * Starts a cache server in {@code server1}.
   *
   * @param notifyBySub whether the server notifies clients by subscription
   * @param isHA passed through to the server cache creation
   * @return the port the cache server listens on
   */
  private int initServerCache(boolean notifyBySub, boolean isHA) {
    final VM targetVM = server1;
    return initServerCache(notifyBySub, targetVM, isHA);
  }

  /**
   * Starts a cache server in {@code server2} with notify-by-subscription enabled.
   *
   * @param isHA passed through to the server cache creation
   * @return the port the cache server listens on
   */
  private int initServerCache2(boolean isHA) {
    final boolean notifyBySubscription = true;
    return initServerCache(notifyBySubscription, server2, isHA);
  }

  /**
   * Creates the server cache inside the given VM.
   *
   * @param notifyBySub whether the server notifies clients by subscription
   * @param vm the VM in which the server cache is created
   * @param isHA passed through to the server cache creation
   * @return the port the cache server listens on
   */
  int initServerCache(boolean notifyBySub, VM vm, boolean isHA) {
    // getMaxThreads() stays inside the lambda so it is evaluated where the lambda runs
    final int serverPort =
        vm.invoke(() -> createServerCache(notifyBySub, getMaxThreads(), isHA));
    return serverPort;
  }

  /**
   * Runs the concurrent-map operations from the client in VM 0, first against the distributed
   * region and then against the partitioned region. A cache server runs in each server VM and a
   * second client is created in VM 1.
   */
  @Test
  public void testConcurrentOperationsWithDRandPR() {
    final int firstServerPort = initServerCache(true); // server1
    final int secondServerPort = initServerCache2(); // server2
    final String serverHostName = NetworkUtils.getServerHostName();

    final VM firstClient = host.getVM(testVersion, 0);
    firstClient.invoke(() -> createClientCacheV(serverHostName, firstServerPort));
    final VM secondClient = host.getVM(testVersion, 1);
    secondClient.invoke(() -> createClientCacheV(serverHostName, secondServerPort));

    LogService.getLogger()
        .info("Testing concurrent map operations from a client with a distributed region");
    concurrentMapTest(firstClient, "/" + REGION_NAME1);
    // TODO add verification in the second client

    LogService.getLogger()
        .info("Testing concurrent map operations from a client with a partitioned region");
    concurrentMapTest(firstClient, "/" + PR_REGION_NAME);
    // TODO add verification in the second client
  }

  /**
   * A client's subscription thread should receive the server's pingInterval setting when it
   * connects. The client uses it to set a read-timeout so that it does not hang should the
   * server's machine crash.
   */
  @Test
  public void testClientReceivesPingIntervalSetting() {
    final VM client = Host.getHost(0).getVM(testVersion, 0);

    final int serverPort = initServerCache(true);
    final String serverHostName = NetworkUtils.getServerHostName();

    client.invoke("create client cache and verify",
        () -> createClientCacheAndVerifyPingIntervalIsSet(serverHostName, serverPort));
  }

  /**
   * Creates a client cache with a subscription-enabled pool, registers interest in all keys and
   * verifies that the ping interval reported by the subscription connection's server queue status
   * is non-zero and equal to {@code CacheClientNotifier.getClientPingInterval()}. The cache is
   * closed afterwards.
   *
   * @param host host name of the cache server
   * @param port port of the cache server
   * @throws Exception if cache, pool or region set-up fails
   */
  void createClientCacheAndVerifyPingIntervalIsSet(String host, int port) throws Exception {
    try {
      Properties props = new Properties();
      props.setProperty(MCAST_PORT, "0");
      props.setProperty(LOCATORS, "");

      createCache(props);

      PoolImpl pool = (PoolImpl) PoolManager.createFactory().addServer(host, port)
          .setSubscriptionEnabled(true).setReadTimeout(1000)
          .setSocketBufferSize(32768).setMinConnections(1).setSubscriptionRedundancy(-1)
          .setPingInterval(2000).create("test pool");

      Region<Object, Object> region = cache.createRegionFactory(RegionShortcut.LOCAL)
          .setPoolName("test pool").create(REGION_NAME1);
      region.registerInterest(".*");

      // get the subscription connection and verify that it has the correct timeout setting
      QueueConnectionImpl primaryConnection = (QueueConnectionImpl) pool.getPrimaryConnection();
      int pingInterval = ((CacheClientUpdater) primaryConnection.getUpdater())
          .getServerQueueStatus().getPingInterval();
      assertNotEquals(0, pingInterval);
      assertEquals(CacheClientNotifier.getClientPingInterval(), pingInterval);
    } finally {
      // If set-up failed before a cache existed, closing a null cache would raise a
      // NullPointerException that hides the original failure.
      if (cache != null) {
        cache.close();
      }
    }
  }

  /**
   * Same scenario as the plain DR/PR concurrent-operations test, except that the client in VM 0
   * is created through {@code createEmptyClientCache}.
   */
  @Test
  public void testConcurrentOperationsWithDRandPRandEmptyClient() {
    final int firstServerPort = initServerCache(true); // server1
    final int secondServerPort = initServerCache2(); // server2
    final String serverHostName = NetworkUtils.getServerHostName();

    final VM emptyClient = host.getVM(testVersion, 0);
    emptyClient.invoke(() -> createEmptyClientCache(serverHostName, firstServerPort));
    final VM secondClient = host.getVM(testVersion, 1);
    secondClient.invoke(() -> createClientCacheV(serverHostName, secondServerPort));

    LogService.getLogger()
        .info("Testing concurrent map operations from a client with a distributed region");
    concurrentMapTest(emptyClient, "/" + REGION_NAME1);
    // TODO add verification in the second client

    LogService.getLogger()
        .info("Testing concurrent map operations from a client with a partitioned region");
    concurrentMapTest(emptyClient, "/" + PR_REGION_NAME);
    // TODO add verification in the second client
  }

  /**
   * Runs putIfAbsent(), replace(Object, Object), replace(Object, Object, Object) and
   * remove(Object, Object) in the given client VM against the named region and checks the outcome
   * of every call. Size and emptiness checks are skipped when the client region's data policy is
   * EMPTY.
   *
   * @param clientVM the VM whose cache contains the client region
   * @param rName path of the region to operate on
   */
  private void concurrentMapTest(final VM clientVM, final String rName) {
    clientVM.invoke(new CacheSerializableRunnable("doConcurrentMapOperations") {
      @Override
      public void run2() throws CacheException {
        Cache cache = getCache();
        final Region pr = cache.getRegion(rName);
        assertNotNull(rName + " not created", pr);
        final boolean isEmpty = pr.getAttributes().getDataPolicy() == DataPolicy.EMPTY;

        // test successful putIfAbsent
        for (int i = putRange_1Start; i <= putRange_1End; i++) {
          Object putResult = pr.putIfAbsent(Integer.toString(i), Integer.toString(i));
          assertNull("Expected null, but got " + putResult + " for key " + i, putResult);
        }
        assertFirstRangeStillPresent(pr, isEmpty);

        // test unsuccessful putIfAbsent
        for (int i = putRange_1Start; i <= putRange_1End; i++) {
          Object putResult = pr.putIfAbsent(Integer.toString(i), Integer.toString(i + 1));
          assertEquals("for i=" + i, Integer.toString(i), putResult);
          assertEquals("for i=" + i, Integer.toString(i), pr.get(Integer.toString(i)));
        }
        assertFirstRangeStillPresent(pr, isEmpty);

        // test successful replace(key, oldValue, newValue)
        for (int i = putRange_1Start; i <= putRange_1End; i++) {
          boolean replaceSucceeded =
              pr.replace(Integer.toString(i), Integer.toString(i), "replaced" + i);
          assertTrue("for i=" + i, replaceSucceeded);
          assertEquals("for i=" + i, "replaced" + i, pr.get(Integer.toString(i)));
        }
        assertFirstRangeStillPresent(pr, isEmpty);

        // test unsuccessful replace(key, oldValue, newValue)
        for (int i = putRange_1Start; i <= putRange_2End; i++) {
          // Integer.toString(i) is deliberately the wrong expected old value
          boolean replaceSucceeded =
              pr.replace(Integer.toString(i), Integer.toString(i), "not" + i);
          assertFalse("for i=" + i, replaceSucceeded);
          assertEquals("for i=" + i, i <= putRange_1End ? "replaced" + i : null,
              pr.get(Integer.toString(i)));
        }
        assertFirstRangeStillPresent(pr, isEmpty);

        // test successful replace(key, value)
        for (int i = putRange_1Start; i <= putRange_1End; i++) {
          Object replaceResult = pr.replace(Integer.toString(i), "twice replaced" + i);
          assertEquals("for i=" + i, "replaced" + i, replaceResult);
          assertEquals("for i=" + i, "twice replaced" + i, pr.get(Integer.toString(i)));
        }
        assertFirstRangeStillPresent(pr, isEmpty);

        // test unsuccessful replace(key, value)
        for (int i = putRange_2Start; i <= putRange_2End; i++) {
          Object replaceResult = pr.replace(Integer.toString(i), "thrice replaced" + i);
          assertNull("for i=" + i, replaceResult);
          assertNull("for i=" + i, pr.get(Integer.toString(i)));
        }
        assertFirstRangeStillPresent(pr, isEmpty);

        // test unsuccessful remove(key, value)
        for (int i = putRange_1Start; i <= putRange_2End; i++) {
          boolean removeResult = pr.remove(Integer.toString(i), Integer.toString(-i));
          assertFalse("for i=" + i, removeResult);
          assertEquals("for i=" + i, i <= putRange_1End ? "twice replaced" + i : null,
              pr.get(Integer.toString(i)));
        }
        assertFirstRangeStillPresent(pr, isEmpty);

        // test successful remove(key, value)
        for (int i = putRange_1Start; i <= putRange_1End; i++) {
          boolean removeResult = pr.remove(Integer.toString(i), "twice replaced" + i);
          assertTrue("for i=" + i, removeResult);
          assertNull("for i=" + i, pr.get(Integer.toString(i)));
        }
        if (!isEmpty) {
          assertEquals("Size doesn't return expected value", 0, pr.size());
          pr.localClear();
          assertTrue("isEmpty doesnt return proper state of the PartitionedRegion", pr.isEmpty());
        }

        if (!isEmpty) {
          // bug #42169 - entry not updated on server when locally destroyed on client
          String key42169 = "key42169";
          pr.put(key42169, "initialValue42169");
          pr.localDestroy(key42169);
          boolean success = pr.replace(key42169, "initialValue42169", "newValue42169");
          assertTrue("expected replace to succeed", success);
          pr.destroy(key42169);
          pr.put(key42169, "secondRound");
          pr.localDestroy(key42169);
          Object result = pr.putIfAbsent(key42169, null);
          // expected value first, so a failure reports expected/actual the right way round
          assertEquals("expected putIfAbsent to fail", "secondRound", result);
          pr.destroy(key42169);
        }

        if (isEmpty) {
          String key41265 = "key41265";
          boolean success = pr.remove(key41265, null);
          assertFalse("expected remove to fail because key does not exist", success);
        }

        // test null values

        // putIfAbsent with null value creates invalid entry
        Object oldValue = pr.putIfAbsent("keyForNull", null);
        assertNull(oldValue);
        if (!isEmpty) {
          assertTrue(pr.containsKey("keyForNull"));
          assertFalse(pr.containsValueForKey("keyForNull"));
        }

        // replace allows null value for oldValue, meaning invalidated entry
        assertTrue(pr.replace("keyForNull", null, "no longer invalid"));

        // replace does not allow null value for new value
        assertThatThrownBy(() -> pr.replace("keyForNull", "no longer invalid", null))
            .isExactlyInstanceOf(NullPointerException.class);

        // other variant of replace does not allow null value for new value
        assertThatThrownBy(() -> pr.replace("keyForNull", null))
            .isExactlyInstanceOf(NullPointerException.class);

        // replace with null oldvalue matches invalidated entry
        pr.putIfAbsent("otherKeyForNull", null);
        CachePerfStats stats = ((GemFireCacheImpl) pr.getCache()).getCachePerfStats();

        Number puts = getNumPuts(stats);
        boolean success = pr.replace("otherKeyForNull", null, "no longer invalid");
        assertTrue(success);

        Number newputs = getNumPuts(stats);
        // exactly one additional put is expected for the successful replace
        assertEquals("stats not updated properly or replace malfunctioned", puts.longValue() + 1,
            newputs.longValue());
      }

      /**
       * For a non-empty client region, asserts that the region still holds exactly the first key
       * range. Does nothing when {@code isEmpty} is true.
       */
      private void assertFirstRangeStillPresent(Region region, boolean isEmpty) {
        if (isEmpty) {
          return;
        }
        assertEquals("Size doesn't return expected value", putRange_1End, region.size());
        assertFalse("isEmpty doesnt return proper state of the PartitionedRegion",
            region.isEmpty());
      }

      /**
       * Reads the puts statistic by invoking {@code getPuts} reflectively.
       * NOTE(review): reflection is presumably used because the return type of getPuts differs
       * between product versions - confirm.
       *
       * @param stats the statistics object to query
       * @return the current number of puts
       */
      public Number getNumPuts(CachePerfStats stats) {
        try {
          Method getPutsMethod = stats.getClass().getMethod("getPuts");
          return (Number) getPutsMethod.invoke(stats);
        } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
          // keep the reflective failure as the cause; its message alone is often null
          throw new AssertionError("unable to read the puts statistic: " + e, e);
        }
      }
    });
  }

  /**
   * Two regions with notify-by-subscription enabled. Region1 has an empty interest list and
   * region2 is interested in all keys. An update/create on region1 must not reach the client,
   * whereas an update/create on region2 must.
   */
  @Test
  public void testForTwoRegionHavingDifferentInterestList() {
    // the server has to be running before the client connects
    final int serverPort = initServerCache(true);
    PORT1 = serverPort;

    final VM client1 = Host.getHost(0).getVM(testVersion, 1);
    final String serverHostName = NetworkUtils.getServerHostName();
    client1.invoke("create client1 cache", () -> {
      createClientCache(serverHostName, serverPort);
      populateCache();
      registerInterest();
    });

    server1.invoke("putting entries in server1", () -> ClientServerMiscDUnitTestBase.put());

    client1.invoke(() -> verifyUpdates());
  }

  /**
   * Two regions with notify-by-subscription enabled, both with interest registered in all keys.
   * Region1 is then closed on the client, which should remove it from the interest list of the
   * CCP on the server. An update on region1 on the server must not be pushed to the client: the
   * related message must not be added to the client's queue at all (which is different from the
   * client not receiving a callback). An update on region2 on the server must still produce a
   * callback on the client.
   */
  @Test
  public void testForTwoRegionHavingALLKEYSInterest() throws Exception {
    // the server has to be running before the client connects
    PORT1 = initServerCache(true);
    final String serverHostName = NetworkUtils.getServerHostName();
    createClientCache(serverHostName, PORT1);

    populateCache();
    registerInterestInBothTheRegions();
    closeRegion1();
    Wait.pause(6000);

    server1.invoke(() -> verifyInterestListOnServer());
    server1.invoke(() -> put());

    verifyUpdatesOnRegion2();
  }

  /**
   * Two regions with notify-by-subscription enabled, both with interest registered in all keys.
   * Both regions are closed. Closing the last region should close the ConnectionProxy on the
   * client, close all server connection threads on the server and remove the CacheClientProxy
   * from the CacheClientNotifier.
   */
  @Test
  public void testRegionClose() throws Exception {
    // the server has to be running before the client connects
    PORT1 = initServerCache(true);
    final String serverHostName = NetworkUtils.getServerHostName();
    pool = (PoolImpl) createClientCache(serverHostName, PORT1);

    populateCache();
    registerInterestInBothTheRegions();
    closeBothRegions();

    // closing the regions alone leaves the pool alive; it is destroyed explicitly
    assertFalse(pool.isDestroyed());
    pool.destroy();
    assertTrue(pool.isDestroyed());

    server1.invoke(() -> verifyNoCacheClientProxyOnServer());
  }

  /**
   * Two regions with notify-by-subscription enabled, both with interest registered in all keys.
   * Destroying region1 on the client should reach the server, destroy the region there and be
   * propagated to interested clients while keeping the CacheClientProxy alive. The remaining
   * regions are then destroyed and the pool is destroyed explicitly, after which no
   * CacheClientProxy may remain on the server. Finally, creating another region with the old
   * attributes must fail.
   */
  @Test
  public void testCCPDestroyOnLastDestroyRegion() throws Exception {
    PORT1 = initServerCache(true);
    final String serverHostName = NetworkUtils.getServerHostName();
    final PoolImpl clientPool = (PoolImpl) createClientCache(serverHostName, PORT1);

    destroyRegion1();
    server1.invoke(() -> verifyCacheClientProxyOnServer(REGION_NAME1));

    final Connection acquired = clientPool.acquireConnection();
    assertNotNull(acquired);
    assertEquals(1, clientPool.getConnectedServerCount());
    assertFalse(clientPool.isDestroyed());

    // destroying the remaining regions must not destroy the pool
    destroyRegion2();
    assertFalse(clientPool.isDestroyed());
    destroyPRRegion();
    assertFalse(clientPool.isDestroyed());

    clientPool.destroy();
    assertTrue(clientPool.isDestroyed());
    server1.invoke(() -> verifyNoCacheClientProxyOnServer());

    assertThatThrownBy(() -> getCache().createRegion(REGION_NAME2, attrs))
        .isExactlyInstanceOf(IllegalStateException.class);
  }

  /**
   * Two regions with notify-by-subscription disabled: both regions should receive invalidates
   * for the updates made on the server in their respective regions.
   */
  @Test
  public void testInvalidatesPropagateOnTwoRegions() throws Exception {
    // the server has to be running before the client connects
    PORT1 = initServerCache(false);
    final String serverHostName = NetworkUtils.getServerHostName();
    createClientCache(serverHostName, PORT1);

    registerInterestForInvalidatesInBothTheRegions();
    populateCache();
    server1.invoke(() -> put());

    verifyInvalidatesOnBothRegions();
  }

  /**
   * Test for bug 43407, where LRU in the client caused an entry to be evicted with DESTROY(), then
   * the client invalidated the entry and did a get(). After the get() the entry was not seen to be
   * in the client's cache. This turned out to be expected behavior, but we now have this test to
   * guarantee that the product behaves as expected.
   */
  @Test
  public void testGetInClientCreatesEntry() throws Exception {
    // start server first
    PORT1 = initServerCache(false);
    createClientCache(NetworkUtils.getServerHostName(), PORT1);
    registerInterestForInvalidatesInBothTheRegions();
    Region region = static_cache.getRegion(REGION_NAME1);
    populateCache();
    region.put("invalidationKey", "invalidationValue");

    // stands in for the LRU eviction: the entry disappears from the client only
    region.localDestroy("invalidationKey");
    assertThat(region.containsKey("invalidationKey")).isFalse();

    // invalidating a key that is absent locally must not create a visible entry in the client
    region.invalidate("invalidationKey");
    assertThat(region.containsKey("invalidationKey")).isFalse();

    // the invalidated server entry has no value, yet the key still exists on the server
    Object value = region.get("invalidationKey");
    assertThat(value).isNull();
    assertThat(region.containsKeyOnServer("invalidationKey")).isTrue();
  }

  /**
   * GEODE-478 - large payloads are rejected by client->server. The operation executed on the pool
   * throws MessageTooLargeException, and the test expects it to surface as the cause of a
   * GemFireIOException.
   */
  @Test
  public void testLargeMessageIsRejected() throws Exception {
    PORT1 = initServerCache(false);
    createClientCache(NetworkUtils.getServerHostName(), PORT1);
    final Region clientRegion = static_cache.getRegion(REGION_NAME1);

    final Op tooLargeOp = new Op() {
      @Override
      public Object attempt(Connection cnx) throws Exception {
        throw new MessageTooLargeException("message is too big");
      }
    };

    GemFireIOException thrown = null;
    try {
      ((LocalRegion) clientRegion).getServerProxy().getPool().execute(tooLargeOp);
    } catch (GemFireIOException e) {
      thrown = e;
    }

    if (thrown == null) {
      fail("expected an exception to be thrown");
    }
    assertTrue(thrown.getCause() instanceof MessageTooLargeException);
  }

  /**
   * Creates a cache and a pool against a server with notify-by-subscription=false, then creates
   * two regions that are not attached to the pool. Entries are populated on the client and then
   * put on the server. Because the regions have no pool, the client entries must keep their
   * original values.
   */
  @Test
  public void testInvalidatesPropagateOnRegionHavingNoPool() {
    // the server has to be running before the client connects
    PORT1 = initServerCache(false);

    final Properties clientProps = new Properties();
    clientProps.setProperty(MCAST_PORT, "0");
    clientProps.setProperty(LOCATORS, "");
    new ClientServerMiscDUnitTestBase().createCache(clientProps);

    final String serverHostName = NetworkUtils.getServerHostName();
    final PoolImpl unattachedPool = (PoolImpl) PoolManager.createFactory()
        .addServer(serverHostName, PORT1).setSubscriptionEnabled(true).setReadTimeout(1000)
        .setSocketBufferSize(32768).setMinConnections(3).setSubscriptionRedundancy(-1)
        .setPingInterval(2000).create("testInvalidatesPropagateOnRegionHavingNoPool");

    // the pool name is deliberately not set on the region attributes
    final AttributesFactory factory = new AttributesFactory();
    factory.setScope(Scope.DISTRIBUTED_ACK);
    attrs = factory.create();

    final Region region1 = getCache().createRegion(REGION_NAME1, attrs);
    final Region region2 = getCache().createRegion(REGION_NAME2, attrs);
    assertNotNull(region1);
    assertNotNull(region2);

    pool = unattachedPool;
    conn = pool.acquireConnection();
    assertNotNull(conn);

    populateCache();
    server1.invoke(() -> put());

    // every entry in both regions must still map its key to the key itself
    for (Region region : Arrays.asList(region1, region2)) {
      for (String key : Arrays.asList(k1, k2)) {
        await().until(() -> key.equals(region.getEntry(key).getValue()));
      }
    }
  }

  /**
   * Creates the pool before the cache exists, then creates the cache and two regions attached to
   * that pool. Interest in ALL_KEYS is registered on region2 with notify-by-subscription=true,
   * and the interest list is then verified on the server.
   */
  @Test
  public void testProxyCreationBeforeCacheCreation() {
    final Properties systemProps = new Properties();
    systemProps.setProperty(MCAST_PORT, "0");
    systemProps.setProperty(LOCATORS, "");

    // cycle the distributed system once before anything else is created
    final DistributedSystem firstSystem = getSystem(systemProps);
    assertNotNull(firstSystem);
    firstSystem.disconnect();
    getSystem(systemProps);

    PORT1 = initServerCache(true);
    final String serverHostName = NetworkUtils.getServerHostName();
    final Pool earlyPool = PoolManager.createFactory().addServer(serverHostName, PORT1)
        .setSubscriptionEnabled(true).setSubscriptionRedundancy(-1)
        .create("testProxyCreationBeforeCacheCreationPool");

    final Cache createdCache = getCache();
    assertNotNull(createdCache);

    final AttributesFactory factory = new AttributesFactory();
    factory.setScope(Scope.DISTRIBUTED_ACK);
    factory.setPoolName(earlyPool.getName());
    final RegionAttributes pooledAttrs = factory.create();

    final Region region1 = createdCache.createRegion(REGION_NAME1, pooledAttrs);
    final Region region2 = createdCache.createRegion(REGION_NAME2, pooledAttrs);
    assertNotNull(region1);
    assertNotNull(region2);

    // interest is registered on region2 only
    region2.registerInterest("ALL_KEYS");
    Wait.pause(6000);
    server1.invoke(() -> verifyInterestListOnServer());
  }

  /**
   * Bug 35380: cycling a DistributedSystem with an initialized pool causes an interest
   * registration NPE.
   *
   * Scenario: create a DistributedSystem (DS1), create and initialize a pool (which creates a
   * proxy with the DS1 member id), then disconnect DS1. Create a second DistributedSystem (DS2)
   * and try to create a region that uses the old pool. The creation is expected to fail with an
   * IllegalStateException or a DistributedSystemDisconnectedException.
   */
  @Test
  public void testSystemCanBeCycledWithAnInitializedPool() {
    // work around GEODE-477
    IgnoredException.addIgnoredException("Connection reset");

    final Properties firstSystemProps = new Properties();
    firstSystemProps.setProperty(MCAST_PORT, "0");
    firstSystemProps.setProperty(LOCATORS, "");
    final DistributedSystem firstSystem = getSystem(firstSystemProps);
    assertNotNull(firstSystem);

    PORT1 = initServerCache(true);
    final String serverHostName = NetworkUtils.getServerHostName();
    final Pool initializedPool = PoolManager.createFactory().addServer(serverHostName, PORT1)
        .setSubscriptionEnabled(true).setSubscriptionRedundancy(-1).create("testBug35380Pool");

    final Cache firstCache = getCache();

    final AttributesFactory firstFactory = new AttributesFactory();
    firstFactory.setScope(Scope.DISTRIBUTED_ACK);
    firstFactory.setPoolName(initializedPool.getName());
    final RegionAttributes firstAttrs = firstFactory.create();

    final Region firstRegion1 = firstCache.createRegion(REGION_NAME1, firstAttrs);
    final Region firstRegion2 = firstCache.createRegion(REGION_NAME2, firstAttrs);
    assertNotNull(firstRegion1);
    assertNotNull(firstRegion2);

    firstRegion2.registerInterest("ALL_KEYS");

    // cycle the distributed system while the pool is still initialized
    firstSystem.disconnect();
    final Properties secondSystemProps = new Properties();
    secondSystemProps.setProperty(MCAST_PORT, "0");
    secondSystemProps.setProperty(LOCATORS, "");
    getSystem(secondSystemProps);

    final Cache secondCache = getCache();
    assertNotNull(secondCache);

    // the pool from the previous distributed system is reused on purpose
    final AttributesFactory secondFactory = new AttributesFactory();
    secondFactory.setScope(Scope.DISTRIBUTED_ACK);
    secondFactory.setPoolName(initializedPool.getName());
    final RegionAttributes secondAttrs = secondFactory.create();

    assertThatThrownBy(() -> secondCache.createRegion(REGION_NAME1, secondAttrs))
        .isInstanceOfAny(IllegalStateException.class, DistributedSystemDisconnectedException.class);
  }

  /**
   * Points a client pool at the DUnit locator port as if it were a cache server and registers
   * interest. The test expects a GemFireConfigException.
   */
  @Test(expected = GemFireConfigException.class)
  public void clientIsPreventedFromConnectingToLocatorAsServer() {
    IgnoredException.addIgnoredException("Improperly configured client detected");

    final ClientCacheFactory factory = new ClientCacheFactory();
    final int locatorPort = DistributedTestUtils.getDUnitLocatorPort();
    factory.addPoolServer("localhost", locatorPort);
    factory.setPoolSubscriptionEnabled(true);
    getClientCache(factory);

    final ClientCache clientCache = (ClientCache) cache;
    final Region proxyRegion =
        clientCache.createClientRegionFactory(ClientRegionShortcut.PROXY).create(REGION_NAME1);
    proxyRegion.registerInterest(k1);
  }


  /**
   * Convenience wrapper around {@code createCacheV} for callers that do not need the resulting
   * cache handed back.
   *
   * @param props distributed-system properties forwarded unchanged to {@code createCacheV}
   */
  private void createCache(Properties props) {
    this.createCacheV(props);
  }

  /**
   * Connects with the supplied properties, disconnects that distributed system, connects again
   * with the same properties and returns the cache obtained from {@code getCache()}.
   *
   * @param props distributed-system properties passed to both {@code getSystem} calls
   * @return the cache returned by {@code getCache()}, asserted to be non-null
   */
  private Cache createCacheV(Properties props) {
    DistributedSystem initialSystem = getSystem(props);
    assertNotNull(initialSystem);
    initialSystem.disconnect();

    // Connect a second time; the returned handle itself is not needed here.
    getSystem(props);

    Cache freshCache = getCache();
    assertNotNull(freshCache);
    return freshCache;
  }

  /**
   * Creates a client cache connected to a single server, discarding the resulting pool.
   *
   * @param h server host name
   * @param port server port
   */
  public static void createClientCacheV(String h, int port) {
    final boolean emptyDataPolicy = false;
    // A non-positive interval makes _createClientCache leave the pool's ack interval untouched.
    final int ackIntervalNotSet = -1;
    _createClientCache(h, emptyDataPolicy, ackIntervalNotSet, port);
  }

  /**
   * Creates a client cache whose regions use {@code DataPolicy.EMPTY}, i.e. the {@code empty}
   * flag of {@code _createClientCache} is set. The resulting pool is discarded.
   *
   * <p>
   * This previously passed {@code false}, which made it behave exactly like
   * {@code createClientCache(String, int...)} and meant the "proxy client" tests never ran
   * against an empty-data-policy client.
   *
   * @param h server host name
   * @param ports ports of the servers to add to the pool
   */
  public static void createEmptyClientCache(String h, int... ports) {
    // -1: leave the pool's subscription ack interval untouched (only values > 0 are applied).
    _createClientCache(h, true, -1, ports);
  }

  /**
   * Creates a client cache with the default (non-empty) data policy and no explicit
   * subscription ack interval.
   *
   * @param h server host name
   * @param ports ports of the servers to add to the pool
   * @return the pool returned by {@code _createClientCache}
   */
  public static Pool createClientCache(String h, int... ports) {
    final boolean emptyDataPolicy = false;
    // A non-positive interval makes _createClientCache leave the pool's ack interval untouched.
    final int ackIntervalNotSet = -1;
    return _createClientCache(h, emptyDataPolicy, ackIntervalNotSet, ports);
  }

  /**
   * Creates a client cache with an explicit subscription ack interval and data-policy choice.
   *
   * @param h server host name
   * @param subscriptionAckInterval ack interval forwarded to {@code _createClientCache}, which
   *        applies it only when it is greater than zero
   * @param empty whether the client regions should use {@code DataPolicy.EMPTY}
   * @param ports ports of the servers to add to the pool
   * @return the pool returned by {@code _createClientCache}
   */
  public static Pool createClientCache(String h, int subscriptionAckInterval, boolean empty,
      int... ports) {
    // The delegate takes the flag before the interval, the reverse of this signature.
    Pool createdPool = _createClientCache(h, empty, subscriptionAckInterval, ports);
    return createdPool;
  }

  /**
   * Registers one server endpoint per port on the given pool factory.
   *
   * @param factory pool factory to add the servers to
   * @param h host name shared by all servers
   * @param ports server ports, added in the order given
   * @return the same {@code factory} instance that was passed in
   */
  private static PoolFactory addServers(PoolFactory factory, String h, int... ports) {
    Arrays.stream(ports).forEach(serverPort -> factory.addServer(h, serverPort));
    return factory;
  }

  /**
   * Builds the client side used by these tests: a cache, a subscription-enabled pool named
   * {@code ClientServerMiscDUnitTestPool}, and the three test regions. The created cache, region
   * attributes, pool and the first acquired connection are stored in the static fields
   * {@code static_cache}, {@code attrs}, {@code pool} and {@code conn}.
   *
   * @param h host name of every server added to the pool
   * @param empty when {@code true} the regions are created with {@code DataPolicy.EMPTY}
   * @param subscriptionAckInterval applied to the pool only when greater than zero
   * @param ports ports of the servers to add to the pool
   * @return the pool that was created
   */
  public static Pool _createClientCache(String h, boolean empty, int subscriptionAckInterval,
      int... ports) {
    Properties clientProps = new Properties();
    clientProps.setProperty(MCAST_PORT, "0");
    clientProps.setProperty(LOCATORS, "");
    clientProps.setProperty(LOG_LEVEL, DUnitLauncher.logLevel);

    Cache clientCache = new ClientServerMiscDUnitTestBase().createCacheV(clientProps);
    ClientServerMiscDUnitTestBase.static_cache = clientCache;

    // The shuffling switch is only set while the pool and region attributes are being built.
    final String disableShufflingKey =
        DistributionConfig.GEMFIRE_PREFIX + "bridge.disableShufflingOfEndpoints";
    System.setProperty(disableShufflingKey, "true");

    PoolImpl createdPool;
    try {
      PoolFactory poolFactory = PoolManager.createFactory();
      addServers(poolFactory, h, ports)
          .setSubscriptionEnabled(true)
          .setReadTimeout(5000)
          .setSocketBufferSize(32768)
          .setMinConnections(3)
          .setSubscriptionRedundancy(1)
          .setPingInterval(2000);
      if (subscriptionAckInterval > 0) {
        poolFactory.setSubscriptionAckInterval(subscriptionAckInterval);
      }
      createdPool = (PoolImpl) poolFactory.create("ClientServerMiscDUnitTestPool");

      AttributesFactory attributesFactory = new AttributesFactory();
      attributesFactory.setScope(Scope.DISTRIBUTED_ACK);
      if (empty) {
        attributesFactory.setDataPolicy(DataPolicy.EMPTY);
      }
      attributesFactory.setPoolName(createdPool.getName());

      attrs = attributesFactory.create();
    } finally {
      System.getProperties().remove(disableShufflingKey);
    }

    Region firstRegion = clientCache.createRegion(REGION_NAME1, attrs);
    Region secondRegion = clientCache.createRegion(REGION_NAME2, attrs);
    Region partitionedRegion = clientCache.createRegion(PR_REGION_NAME, attrs);
    assertNotNull(firstRegion);
    assertNotNull(secondRegion);
    assertNotNull(partitionedRegion);
    pool = createdPool;

    // Keep polling until the pool hands out a connection; "no servers yet" just means retry.
    await().until(() -> {
      try {
        conn = pool.acquireConnection();
        return conn != null;
      } catch (NoAvailableServersException notReadyYet) {
        return false;
      }
    });

    return createdPool;
  }

  /**
   * Cache listener that remembers whether any create/update event arrived and whether the most
   * recent one carried a distributed member.
   */
  static class MemberIDVerifier extends CacheListenerAdapter {
    /** {@code true} until an event with a non-null distributed member has been the latest one. */
    boolean memberIDNotReceived = true;
    /** {@code true} once at least one create or update event has been observed. */
    boolean eventReceived;

    @Override
    public void afterCreate(EntryEvent event) {
      recordEvent(event);
    }

    @Override
    public void afterUpdate(EntryEvent event) {
      recordEvent(event);
    }

    /** Records the arrival of {@code event} and whether it lacked a distributed member. */
    private void recordEvent(EntryEvent event) {
      eventReceived = true;
      memberIDNotReceived = event.getDistributedMember() == null;
    }

    /** Restores both flags to their initial values. */
    public void reset() {
      eventReceived = false;
      memberIDNotReceived = true;
    }
  }

  /**
   * Debugging aid for duplicate-event failures: prints, via {@code dumpMemberId}, the membership
   * id of every thread identifier tracked by the {@code ClientServerMiscDUnitTestPool} pool.
   *
   * @throws Exception if decoding one of the membership ids fails
   */
  public static void dumpPoolIdentifiers() throws Exception {
    PoolImpl testPool = (PoolImpl) PoolManager.find("ClientServerMiscDUnitTestPool");

    Map sequenceIdsByThread = testPool.getThreadIdToSequenceIdMap();
    for (Object key : sequenceIdsByThread.keySet()) {
      ThreadIdentifier threadId = (ThreadIdentifier) key;
      dumpMemberId(threadId, threadId.getMembershipID());
    }
  }

  /**
   * Decodes {@code memberBytes} into an {@code InternalDistributedMember} and prints one line to
   * standard output containing the current thread name, {@code holder}, the decoded member, and
   * the raw bytes.
   *
   * @param holder object the bytes were taken from; only used in the printed line
   * @param memberBytes serialized membership id
   * @throws Exception if the member cannot be read from the bytes
   */
  public static void dumpMemberId(Object holder, byte[] memberBytes) throws Exception {
    // Zero-pad with 17 extra bytes before decoding.
    // NOTE(review): presumably so readEssentialData does not run out of input - confirm.
    byte[] padded = Arrays.copyOf(memberBytes, memberBytes.length + 17);
    DataInputStream paddedIn = new DataInputStream(new ByteArrayInputStream(padded));
    InternalDistributedMember decoded = InternalDistributedMember.readEssentialData(paddedIn);

    StringBuilder line = new StringBuilder();
    line.append("<").append(Thread.currentThread().getName()).append("> ")
        .append(holder)
        .append(" is ").append(decoded)
        .append(" byte count = ").append(memberBytes.length)
        .append(" bytes = ").append(Arrays.toString(memberBytes));
    System.out.println(line.toString());
  }


  /**
   * Creates a server-side cache with two replicated regions and one partitioned region, then
   * starts a cache server on a randomly chosen available port.
   *
   * @param notifyBySubscription value passed to {@code CacheServer.setNotifyBySubscription}
   * @param maxThreads value passed to {@code CacheServer.setMaxThreads}
   * @param isHA when {@code true} the partitioned region is given one redundant copy
   * @return the port reported by the started server
   * @throws Exception if the server cannot be started
   */
  public static Integer createServerCache(Boolean notifyBySubscription, Integer maxThreads,
      boolean isHA) throws Exception {
    Cache serverCache = new ClientServerMiscDUnitTestBase().createCacheV(new Properties());
    unsetSlowDispatcherFlag();

    // Replicated regions: distributed-ack, conflation and concurrency checks enabled.
    AttributesFactory replicateFactory = new AttributesFactory();
    replicateFactory.setScope(Scope.DISTRIBUTED_ACK);
    replicateFactory.setEnableConflation(true);
    replicateFactory.setDataPolicy(DataPolicy.REPLICATE);
    replicateFactory.setConcurrencyChecksEnabled(true);
    RegionAttributes replicateAttrs = replicateFactory.create();
    Region region1 = serverCache.createRegion(REGION_NAME1, replicateAttrs);
    Region region2 = serverCache.createRegion(REGION_NAME2, replicateAttrs);

    // Partitioned region, with a redundant copy only in the HA variant.
    AttributesFactory partitionFactory = new AttributesFactory();
    partitionFactory.setDataPolicy(DataPolicy.PARTITION);
    if (isHA) {
      PartitionAttributesFactory redundancy =
          new PartitionAttributesFactory().setRedundantCopies(1);
      partitionFactory.setPartitionAttributes(redundancy.create());
    }
    RegionAttributes partitionAttrs = partitionFactory.create();
    Region partitioned = serverCache.createRegion(PR_REGION_NAME, partitionAttrs);

    assertNotNull(region1);
    assertNotNull(region2);
    assertNotNull(partitioned);

    CacheServer cacheServer = serverCache.addCacheServer();
    int chosenPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
    region1.getCache().getDistributedSystem().getLogWriter()
        .info("Starting server on port " + chosenPort);
    cacheServer.setPort(chosenPort);
    cacheServer.setMaxThreads(maxThreads);
    cacheServer.setNotifyBySubscription(notifyBySubscription);
    cacheServer.start();
    region1.getCache().getDistributedSystem().getLogWriter()
        .info("Started server on port " + cacheServer.getPort());
    return cacheServer.getPort();
  }

  /**
   * Returns the max-threads setting used by this test class; this implementation returns 0.
   * Being {@code protected}, it can be overridden by subclasses.
   * NOTE(review): presumably the value ends up in {@code CacheServer.setMaxThreads} via the
   * server-creation helpers - confirm against the callers.
   */
  protected int getMaxThreads() {
    return 0;
  }

  /**
   * Registers interest in {@code "ALL_KEYS"} on the second test region and then attaches a new
   * {@link MemberIDVerifier} listener to it.
   */
  public static void registerInterest() {
    Cache clientCache = new ClientServerMiscDUnitTestBase().getCache();
    Region region2 = clientCache.getRegion(Region.SEPARATOR + REGION_NAME2);
    assertNotNull(region2);

    region2.registerInterest("ALL_KEYS");

    MemberIDVerifier verifier = new MemberIDVerifier();
    region2.getAttributesMutator().addCacheListener(verifier);
  }

  /**
   * Registers interest for all keys on both test regions, calling
   * {@code registerInterestForAllKeys(InterestResultPolicy.KEYS, false, false)} on each. A
   * {@code CacheWriterException} is reported as a test failure.
   */
  private static void registerInterestForInvalidatesInBothTheRegions() {
    try {
      Cache clientCache = new ClientServerMiscDUnitTestBase().getCache();

      Region region1 = clientCache.getRegion(Region.SEPARATOR + REGION_NAME1);
      assertNotNull(region1);
      Region region2 = clientCache.getRegion(Region.SEPARATOR + REGION_NAME2);
      assertNotNull(region2);

      final InterestResultPolicy policy = InterestResultPolicy.KEYS;
      region1.registerInterestForAllKeys(policy, false, false);
      region2.registerInterestForAllKeys(policy, false, false);
    } catch (CacheWriterException writerFailure) {
      writerFailure.printStackTrace();
      fail("Test failed due to CacheWriterException during registerInterestnBothRegions"
          + writerFailure);
    }
  }

  /**
   * Registers interest for all keys, using the no-argument
   * {@code registerInterestForAllKeys()}, on both test regions. A {@code CacheWriterException}
   * is reported as a test failure.
   */
  private static void registerInterestInBothTheRegions() {
    try {
      Cache clientCache = new ClientServerMiscDUnitTestBase().getCache();

      Region region1 = clientCache.getRegion(Region.SEPARATOR + REGION_NAME1);
      assertNotNull(region1);
      Region region2 = clientCache.getRegion(Region.SEPARATOR + REGION_NAME2);
      assertNotNull(region2);

      region1.registerInterestForAllKeys();
      region2.registerInterestForAllKeys();
    } catch (CacheWriterException writerFailure) {
      writerFailure.printStackTrace();
      fail("Test failed due to CacheWriterException during registerInterestnBothRegions"
          + writerFailure);
    }
  }

  /**
   * Closes the first test region; any exception raised while doing so fails the test.
   */
  private static void closeRegion1() {
    try {
      Cache localCache = new ClientServerMiscDUnitTestBase().getCache();
      final String region1Path = Region.SEPARATOR + REGION_NAME1;
      Region<String, String> region1 = localCache.getRegion(region1Path);
      assertNotNull(region1);
      region1.close();
    } catch (Exception failure) {
      failure.printStackTrace();
      fail("Test failed due to Exception during closeRegion1" + failure);
    }
  }

  /**
   * Closes the two replicated test regions and the partitioned test region, in that order, after
   * asserting that all three exist. Any exception fails the test.
   */
  private static void closeBothRegions() {
    try {
      Cache localCache = new ClientServerMiscDUnitTestBase().getCache();
      Region region1 = localCache.getRegion(Region.SEPARATOR + REGION_NAME1);
      Region region2 = localCache.getRegion(Region.SEPARATOR + REGION_NAME2);
      Region partitioned = localCache.getRegion(Region.SEPARATOR + PR_REGION_NAME);

      // All three lookups are checked before the first close, as in the original flow.
      assertNotNull(region1);
      assertNotNull(region2);
      assertNotNull(partitioned);

      for (Region toClose : new Region[] {region1, region2, partitioned}) {
        toClose.close();
      }
    } catch (Exception failure) {
      failure.printStackTrace();
      fail("Test failed due to Exception during closeBothRegions" + failure);
    }
  }

  /**
   * Destroys the first test region ({@code REGION_NAME1}); any exception raised while doing so
   * fails the test. The failure message now names this method instead of the copy-pasted
   * {@code closeBothRegions}.
   */
  private static void destroyRegion1() {
    try {
      Cache cache = new ClientServerMiscDUnitTestBase().getCache();
      Region r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME1);
      assertNotNull(r1);
      r1.destroyRegion();
    } catch (Exception e) {
      e.printStackTrace();
      fail("Test failed due to Exception during destroyRegion1: " + e);
    }
  }

  /**
   * Destroys the second test region ({@code REGION_NAME2}); any exception raised while doing so
   * fails the test. The failure message now names this method instead of the copy-pasted
   * {@code closeBothRegions}.
   */
  private static void destroyRegion2() {
    try {
      Cache cache = new ClientServerMiscDUnitTestBase().getCache();
      Region r2 = cache.getRegion(Region.SEPARATOR + REGION_NAME2);
      assertNotNull(r2);
      r2.destroyRegion();
    } catch (Exception e) {
      e.printStackTrace();
      fail("Test failed due to Exception during destroyRegion2: " + e);
    }
  }

  /**
   * Destroys the partitioned test region ({@code PR_REGION_NAME}); any exception raised while
   * doing so fails the test. The failure message now names this method instead of the
   * copy-pasted {@code closeBothRegions}.
   */
  private static void destroyPRRegion() {
    try {
      Cache cache = new ClientServerMiscDUnitTestBase().getCache();
      Region pr = cache.getRegion(Region.SEPARATOR + PR_REGION_NAME);
      assertNotNull(pr);
      pr.destroyRegion();
    } catch (Exception e) {
      fail("Test failed due to Exception during destroyPRRegion: " + e);
    }
  }

  /**
   * Asserts that this VM hosts exactly one cache server and that, for every client proxy on it,
   * the interest-list region set contains the second test region and nothing else (in
   * particular not the first test region). Any exception fails the test.
   */
  private static void verifyInterestListOnServer() {
    try {
      Cache serverCache = new ClientServerMiscDUnitTestBase().getCache();
      assertEquals("More than one BridgeServer", 1, serverCache.getCacheServers().size());

      CacheServerImpl cacheServer =
          (CacheServerImpl) serverCache.getCacheServers().iterator().next();
      assertNotNull(cacheServer);
      assertNotNull(cacheServer.getAcceptor());
      assertNotNull(cacheServer.getAcceptor().getCacheClientNotifier());

      for (CacheClientProxy proxy : cacheServer.getAcceptor().getCacheClientNotifier()
          .getClientProxies()) {
        Set<String> interestRegions =
            proxy.cils[RegisterInterestTracker.interestListIndex].regions;
        assertNotNull(interestRegions);
        // region1 must be absent, region2 must be the single entry
        assertFalse(interestRegions.contains(Region.SEPARATOR + REGION_NAME1));
        assertTrue(interestRegions.contains(Region.SEPARATOR + REGION_NAME2));
        assertEquals(1, interestRegions.size());
      }
    } catch (Exception failure) {
      failure.printStackTrace();
      fail("while setting verifyInterestListOnServer  " + failure);
    }
  }

  /**
   * Asserts that this VM hosts exactly one cache server and waits until its client notifier
   * reports zero client proxies. If an exception escapes (for example the wait giving up), a
   * diagnostic line and thread stacks are printed before the exception is rethrown.
   */
  private static void verifyNoCacheClientProxyOnServer() {
    try {
      Cache serverCache = new ClientServerMiscDUnitTestBase().getCache();
      assertEquals("More than one BridgeServer", 1, serverCache.getCacheServers().size());

      CacheServerImpl onlyServer =
          (CacheServerImpl) serverCache.getCacheServers().iterator().next();
      assertNotNull(onlyServer);
      assertNotNull(onlyServer.getAcceptor());

      final CacheClientNotifier notifier = onlyServer.getAcceptor().getCacheClientNotifier();
      assertNotNull(notifier);

      await().until(() -> {
        int proxyCount = notifier.getClientProxies().size();
        return proxyCount == 0;
      });
    } catch (Exception failure) {
      System.out.println("The size of the client proxies != 0");
      OSProcess.printStacks(0);
      throw failure;
    }
  }

  /**
   * Asserts that the named region does not exist in this VM's cache and then, via the
   * no-argument overload, that exactly one client proxy is present on the server. Any exception
   * fails the test; the failure message now names this method rather than
   * {@code verifyNoCacheClientProxyOnServer}.
   *
   * @param regionName region name without the leading separator
   */
  private static void verifyCacheClientProxyOnServer(String regionName) {
    try {
      Cache cache = new ClientServerMiscDUnitTestBase().getCache();
      assertNull(cache.getRegion(Region.SEPARATOR + regionName));
      verifyCacheClientProxyOnServer();
    } catch (Exception ex) {
      ex.printStackTrace();
      fail("while running verifyCacheClientProxyOnServer  " + ex);
    }
  }

  /**
   * Asserts that this VM hosts exactly one cache server and waits until its client notifier
   * reports exactly one client proxy.
   */
  private static void verifyCacheClientProxyOnServer() {
    Cache serverCache = new ClientServerMiscDUnitTestBase().getCache();
    assertEquals("More than one BridgeServer", 1, serverCache.getCacheServers().size());

    CacheServerImpl onlyServer =
        (CacheServerImpl) serverCache.getCacheServers().iterator().next();
    assertNotNull(onlyServer);
    assertNotNull(onlyServer.getAcceptor());

    final CacheClientNotifier notifier = onlyServer.getAcceptor().getCacheClientNotifier();
    assertNotNull(notifier);

    await().until(() -> {
      int proxyCount = notifier.getClientProxies().size();
      return proxyCount == 1;
    });
  }

  /**
   * Ensures keys {@code k1} and {@code k2} exist in both test regions, creating each missing
   * entry with the key itself as its value, and then asserts that every entry holds its key as
   * value.
   *
   * <p>
   * The assertions use JUnit's {@code (expected, actual)} argument order so that a failure
   * reports the right value as "expected".
   */
  public static void populateCache() {
    Cache cache = new ClientServerMiscDUnitTestBase().getCache();
    Region r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME1);
    Region r2 = cache.getRegion(Region.SEPARATOR + REGION_NAME2);
    assertNotNull(r1);
    assertNotNull(r2);

    if (!r1.containsKey(k1)) {
      r1.create(k1, k1);
    }
    if (!r1.containsKey(k2)) {
      r1.create(k2, k2);
    }
    if (!r2.containsKey(k1)) {
      r2.create(k1, k1);
    }
    if (!r2.containsKey(k2)) {
      r2.create(k2, k2);
    }

    assertEquals(k1, r1.getEntry(k1).getValue());
    assertEquals(k2, r1.getEntry(k2).getValue());
    assertEquals(k1, r2.getEntry(k1).getValue());
    assertEquals(k2, r2.getEntry(k2).getValue());
  }

  /**
   * Puts {@code server_k1} / {@code server_k2} under keys {@code k1} / {@code k2} in both test
   * regions and asserts that the entries then hold those values.
   *
   * <p>
   * The assertions use JUnit's {@code (expected, actual)} argument order so that a failure
   * reports the right value as "expected".
   */
  public static void put() {
    Cache cache = new ClientServerMiscDUnitTestBase().getCache();
    Region r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME1);
    Region r2 = cache.getRegion(Region.SEPARATOR + REGION_NAME2);
    assertNotNull(r1);
    assertNotNull(r2);

    r1.put(k1, server_k1);
    r1.put(k2, server_k2);

    r2.put(k1, server_k1);
    r2.put(k2, server_k2);

    assertEquals(server_k1, r1.getEntry(k1).getValue());
    assertEquals(server_k2, r1.getEntry(k2).getValue());
    assertEquals(server_k1, r2.getEntry(k1).getValue());
    assertEquals(server_k2, r2.getEntry(k2).getValue());
  }

  /**
   * Puts {@code "client2_k1"} and {@code "client2_k2"} under keys {@code k1} and {@code k2} in
   * the second test region, first creating that region as REPLICATE if it does not exist yet.
   */
  static void putForClient() {
    Cache localCache = new ClientServerMiscDUnitTestBase().getCache();
    Region existing = localCache.getRegion(Region.SEPARATOR + REGION_NAME2);
    Region target = existing != null
        ? existing
        : localCache.createRegionFactory(RegionShortcut.REPLICATE).create(REGION_NAME2);

    target.put(k1, "client2_k1");
    target.put(k2, "client2_k2");
  }

  /**
   * Waits until the first test region holds the keys themselves as values and the second test
   * region holds the server values, then checks that the {@link MemberIDVerifier} listener on
   * the second region saw an event carrying a member id, and resets the listener.
   */
  private static void verifyUpdates() {
    Cache clientCache = new ClientServerMiscDUnitTestBase().getCache();
    final Region region1 = clientCache.getRegion(Region.SEPARATOR + REGION_NAME1);
    final Region region2 = clientCache.getRegion(Region.SEPARATOR + REGION_NAME2);
    assertNotNull(region1);
    assertNotNull(region2);

    // region1 had no interest registered, so it should keep the client values (value == key)
    await().until(() -> k1.equals(region1.getEntry(k1).getValue()));
    await().until(() -> k2.equals(region1.getEntry(k2).getValue()));

    // region2 had interest registered, so it should have picked up the server values
    await().until(() -> server_k1.equals(region2.getEntry(k1).getValue()));
    await().until(() -> server_k2.equals(region2.getEntry(k2).getValue()));

    // the events delivered to region2 should have carried a member id
    LocalRegion localRegion2 = (LocalRegion) region2;
    MemberIDVerifier verifier = (MemberIDVerifier) localRegion2.getCacheListener();
    assertTrue("client should have received a listener event", verifier.eventReceived);
    assertFalse("client received an update but the event had no member id",
        verifier.memberIDNotReceived);
    verifier.reset();
  }

  /**
   * Waits until the entries for {@code k1} and {@code k2} in both test regions have a
   * {@code null} value. Any exception (including the wait giving up) fails the test.
   */
  private static void verifyInvalidatesOnBothRegions() {
    try {
      Cache clientCache = new ClientServerMiscDUnitTestBase().getCache();
      final Region region1 = clientCache.getRegion(Region.SEPARATOR + REGION_NAME1);
      final Region region2 = clientCache.getRegion(Region.SEPARATOR + REGION_NAME2);
      assertNotNull(region1);
      assertNotNull(region2);

      // Same order as before: region1/k1, region1/k2, region2/k1, region2/k2.
      for (Region region : new Region[] {region1, region2}) {
        for (Object key : new Object[] {k1, k2}) {
          await().until(() -> region.getEntry(key).getValue() == null);
        }
      }
    } catch (Exception failure) {
      fail("failed while verifyInvalidatesOnBothRegions()" + failure);
    }
  }

  /**
   * Waits until the second test region holds {@code server_k1} under {@code k1} and
   * {@code server_k2} under {@code k2}. Any exception (including the wait giving up) fails the
   * test.
   */
  private static void verifyUpdatesOnRegion2() {
    try {
      Cache clientCache = new ClientServerMiscDUnitTestBase().getCache();
      final Region region2 = clientCache.getRegion(Region.SEPARATOR + REGION_NAME2);
      assertNotNull(region2);

      await().until(() -> {
        Object current = region2.getEntry(k1).getValue();
        return server_k1.equals(current);
      });

      await().until(() -> {
        Object current = region2.getEntry(k2).getValue();
        return server_k2.equals(current);
      });
    } catch (Exception failure) {
      fail("failed while verifyUpdatesOnRegion2()" + failure);
    }
  }

  /**
   * Resets {@code CacheClientProxy.isSlowStartForTesting} to {@code false}, i.e. the testing
   * flag for starting the dispatcher thread a bit later. This is just a precaution in case an
   * earlier test set the flag to {@code true} and did not unset it on completion.
   */
  private static void unsetSlowDispatcherFlag() {
    CacheClientProxy.isSlowStartForTesting = false;
  }

  /** Runs the on-server method checks with a non-caching client and no HA. */
  @Test
  public void testOnSeverMethodsWithProxyClient() throws Exception {
    final boolean cachingProxy = false;
    final boolean highAvailability = false;
    testOnServerMethods(cachingProxy, highAvailability);
  }

  /** Runs the on-server method checks with a caching client and no HA. */
  @Test
  public void testOnSeverMethodsWithCachingProxyClient() throws Exception {
    final boolean cachingProxy = true;
    final boolean highAvailability = false;
    testOnServerMethods(cachingProxy, highAvailability);
  }

  /** Runs the on-server method checks with a non-caching client in the HA variant. */
  @Test
  public void testOnSeverMethodsWithProxyClientHA() throws Exception {
    final boolean cachingProxy = false;
    final boolean highAvailability = true;
    testOnServerMethods(cachingProxy, highAvailability);
  }

  /** Runs the on-server method checks with a caching client in the HA variant. */
  @Test
  public void testOnSeverMethodsWithCachingProxyClientHA() throws Exception {
    final boolean cachingProxy = true;
    final boolean highAvailability = true;
    testOnServerMethods(cachingProxy, highAvailability);
  }

  /**
   * Shared scenario for the {@code sizeOnServer} / {@code isEmptyOnServer} tests: starts two
   * servers, connects a client, and checks both calls against the replicated and partitioned
   * regions before inserts, after inserts and after destroying the entries again.
   *
   * @param isCachingProxy selects {@code createClientCache} over {@code createEmptyClientCache}
   *        and makes {@code putIntoRegion} locally destroy what it put
   * @param isHA starts an additional server and closes the cache on {@code server1} after the
   *        partitioned-region puts
   * @throws Exception if any setup or verification step fails
   */
  private void testOnServerMethods(boolean isCachingProxy, boolean isHA) throws Exception {
    int firstServerPort = initServerCache(true, isHA); // vm0
    int secondServerPort = initServerCache2(isHA); // vm1
    String serverHost = NetworkUtils.getServerHostName();

    if (isCachingProxy) {
      createClientCache(serverHost, firstServerPort, secondServerPort);
    } else {
      createEmptyClientCache(serverHost, firstServerPort, secondServerPort);
    }

    if (isHA) {
      // the HA scenario gets one more server
      initServerCache(true, host.getVM(VersionManager.CURRENT_VERSION, 1), true);
    }

    final String replicatePath = "/" + REGION_NAME1;
    final String partitionedPath = "/" + PR_REGION_NAME;

    // nothing has been written yet
    verifyIsEmptyOnServer(replicatePath, true);
    verifyIsEmptyOnServer(partitionedPath, true);

    final int entryCount = 10;
    putIntoRegion(replicatePath, entryCount, isCachingProxy);
    verifySizeOnServer(replicatePath, entryCount);

    putIntoRegion(partitionedPath, entryCount, isCachingProxy);
    if (isHA) {
      server1.invoke(() -> closeMyCache());
    }
    verifySizeOnServer(partitionedPath, entryCount);

    verifyIsEmptyOnServer(replicatePath, false);
    verifyIsEmptyOnServer(partitionedPath, false);

    destroyEntries(replicatePath, entryCount);
    destroyEntries(partitionedPath, entryCount);

    // after the destroys both regions must be empty again
    verifyIsEmptyOnServer(replicatePath, true);
    verifyIsEmptyOnServer(partitionedPath, true);
    verifySizeOnServer(replicatePath, 0);
    verifySizeOnServer(partitionedPath, 0);
  }

  /**
   * Puts the entries {@code 0..size-1} (key equal to value) into the named region and, for a
   * caching-proxy client, then calls {@code localDestroy} for each of those keys.
   *
   * @param regionName full path of the region
   * @param size number of entries to put
   * @param isCachingProxy whether to locally destroy the entries after putting them
   */
  private void putIntoRegion(String regionName, int size, boolean isCachingProxy) {
    final Region target = getCache().getRegion(regionName);

    for (int key = 0; key < size; key++) {
      target.put(key, key);
    }

    if (!isCachingProxy) {
      return;
    }

    for (int key = 0; key < size; key++) {
      // NOTE(review): the second argument is presumably the callback argument - confirm.
      target.localDestroy(key, key);
    }
  }

  /**
   * Destroys the entries with keys {@code 0..size-1} in the named region.
   *
   * @param regionName full path of the region
   * @param size number of consecutive integer keys to destroy, starting at 0
   */
  private void destroyEntries(String regionName, int size) {
    final Region target = getCache().getRegion(regionName);

    int key = 0;
    while (key < size) {
      target.destroy(key);
      key++;
    }
  }

  /**
   * Asserts that {@code sizeOnServer()} of the named region equals the expected size.
   *
   * @param regionName full path of the region
   * @param expectedSize size the server is expected to report
   */
  private void verifySizeOnServer(String regionName, int expectedSize) {
    final Region target = getCache().getRegion(regionName);
    int reportedSize = target.sizeOnServer();

    String mismatch =
        "sizeOnServer returns unexpected " + reportedSize + " instead of expected " + expectedSize;
    assertEquals(mismatch, expectedSize, reportedSize);
  }

  /**
   * Asserts that {@code isEmptyOnServer()} of the named region equals the expected flag.
   *
   * @param regionName full path of the region
   * @param expected emptiness the server is expected to report
   */
  private void verifyIsEmptyOnServer(String regionName, boolean expected) {
    final Region target = getCache().getRegion(regionName);
    boolean reportedEmpty = target.isEmptyOnServer();

    String mismatch =
        "isEmptyOnServer returns unexpected " + reportedEmpty + " instead of expected " + expected;
    assertEquals(mismatch, expected, reportedEmpty);
  }

  /** Closes the cache returned by {@code getCache()} in the VM this runs in. */
  private void closeMyCache() {
    getCache().close();
  }

}
