/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.internal.cache.tier.sockets;

import static org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_ID;
import static org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_TIMEOUT;
import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPort;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;

import java.util.Collections;
import java.util.Iterator;
import java.util.Properties;
import java.util.Set;

import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.InterestResultPolicy;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.client.Pool;
import org.apache.geode.cache.client.PoolFactory;
import org.apache.geode.cache.client.PoolManager;
import org.apache.geode.cache30.CacheSerializableRunnable;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.internal.cache.CacheServerImpl;
import org.apache.geode.internal.cache.FilterProfile;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.PoolFactoryImpl;
import org.apache.geode.test.awaitility.GeodeAwaitility;
import org.apache.geode.test.dunit.Assert;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.LogWriterUtils;
import org.apache.geode.test.dunit.NetworkUtils;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.Wait;
import org.apache.geode.test.dunit.WaitCriterion;
import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
import org.apache.geode.test.junit.categories.ClientSubscriptionTest;

/**
 * We have 2 servers and One client which registers some keys with durable interest and some without
 * it. We maintain queues on only One server as redundancy level is one. Following 2 tests have the
 * two TestCase scenarios
 *
 * There are two Tests First Test does the follows : // Step 1: Starting the servers // Step 2:
 * Bring Up the Client // Step 3: Client registers Interests // Step 4: Update Values on the Server
 * for Keys // Step 5: Verify Updates on the Client // Step 6: Close Cache of the DurableClient //
 * Step 7: Update the Values // Step 8: Re-start the Client // Step 9: Verify Updates on the Client
 * // Step 10 : Stop all VMs
 *
 * For Test 2 the steps are as follows // Step 1: Starting the servers // Step 2: Bring Up the
 * Client // Step 3: Client registers Interests // Step 4: Update Values on the Server for Keys //
 * Step 5: Verify Updates on the Client // Step 6: Close Cache of the DurableClient // Step 7:
 * Update the Values // Step 8: Re-start the Client // Step 9: Send Client Ready Message // Step 10:
 * Register all Keys (K1, K2 as Non-Durable. K3, K4 as Durable) // Step 11: Unregister Some Keys
 * (Here K1, K3) // Step 12: Modify values on the server for all the Keys // Step 13: Check the
 * values for the ones not unregistered and the Unregistered Keys' Values should be null
 */
@Category({ClientSubscriptionTest.class})
public class DurableRegistrationDUnitTest extends JUnit4DistributedTestCase {

  private VM server1VM;

  private VM server2VM;

  private VM durableClientVM;

  private String regionName;

  private int PORT1;

  private int PORT2;

  private static final String K1 = "KEY_STONE1";

  private static final String K2 = "KEY_STONE2";

  private static final String K3 = "KEY_STONE3";

  private static final String K4 = "KEY_STONE4";

  public DurableRegistrationDUnitTest() {
    super();
  }

  @Override
  public final void postSetUp() throws Exception {
    Host host = Host.getHost(0);
    this.server1VM = host.getVM(0);
    this.server2VM = host.getVM(1);
    this.durableClientVM = host.getVM(2);
    regionName = DurableRegistrationDUnitTest.class.getName() + "_region";
    CacheServerTestUtil.disableShufflingOfEndpoints();
  }

  @Test
  public void testSimpleDurableClient() {

    // Step 1: Starting the servers
    PORT1 = ((Integer) this.server1VM
        .invoke(() -> CacheServerTestUtil.createCacheServer(regionName, new Boolean(true))))
            .intValue();
    PORT2 = ((Integer) this.server2VM
        .invoke(() -> CacheServerTestUtil.createCacheServer(regionName, new Boolean(true))))
            .intValue();

    // Step 2: Bring Up the Client
    // Start a durable client that is not kept alive on the server when it
    // stops normally
    final String durableClientId = getName() + "_client";

    final int durableClientTimeout = 600; // keep the client alive for 600
    // seconds
    this.durableClientVM.invoke(() -> CacheServerTestUtil.createCacheClient(
        getClientPool(NetworkUtils.getServerHostName(durableClientVM.getHost()), PORT1, PORT2, true,
            0),
        regionName, getClientDistributedSystemProperties(durableClientId, durableClientTimeout),
        Boolean.TRUE));

    // Send clientReady message
    this.durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
      @Override
      public void run2() throws CacheException {
        CacheServerTestUtil.getCache().readyForEvents();
      }
    });

    // Step 3: Client registers Interests
    // KEY_STONE1, KEY_STONE2 are registered as durableKeys & KEY_STONE3,
    // KEY_STONE4 as non-durableKeys

    this.durableClientVM
        .invoke(() -> DurableRegistrationDUnitTest.registerKey(K1, new Boolean(false)));
    this.durableClientVM
        .invoke(() -> DurableRegistrationDUnitTest.registerKey(K2, new Boolean(false)));
    this.durableClientVM
        .invoke(() -> DurableRegistrationDUnitTest.registerKey(K3, new Boolean(true)));
    this.durableClientVM
        .invoke(() -> DurableRegistrationDUnitTest.registerKey(K4, new Boolean(true)));

    // Step 4: Update Values on the Server for KEY_STONE1, KEY_STONE2,
    // KEY_STONE3, KEY_STONE4

    this.server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K1, "Value1"));
    this.server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K2, "Value2"));
    this.server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K3, "Value3"));
    this.server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K4, "Value4"));

    Wait.pause(1000);
    // Step 5: Verify Updates on the Client

    assertEquals("Value1", this.server2VM.invoke(() -> DurableRegistrationDUnitTest.getValue(K1)));
    assertEquals("Value1", this.server1VM.invoke(() -> DurableRegistrationDUnitTest.getValue(K1)));

    assertEquals("Value1",
        this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.getValue(K1)));
    assertEquals("Value2",
        this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.getValue(K2)));
    assertEquals("Value3",
        this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.getValue(K3)));
    assertEquals("Value4",
        this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.getValue(K4)));

    // Step 6: Close Cache of the DurableClient
    this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.closeCache());
    // pause(5000);
    // Step 7: Update KEY_STONE1,KEY_STONE2,KEY_STONE3,KEY_STONE4 on the
    // Server say with values PingPong1, PingPong2, PingPong3, PingPong4
    this.server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K1, "PingPong1"));
    this.server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K2, "PingPong2"));
    this.server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K3, "PingPong3"));
    this.server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K4, "PingPong4"));

    // Step 8: Re-start the Client
    this.durableClientVM
        .invoke(() -> CacheServerTestUtil.createCacheClient(
            getClientPool(NetworkUtils.getServerHostName(durableClientVM.getHost()), PORT1, PORT2,
                true, 0),
            regionName, getClientDistributedSystemProperties(durableClientId), Boolean.TRUE));

    // Send clientReady message
    this.durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
      @Override
      public void run2() throws CacheException {
        CacheServerTestUtil.getCache().readyForEvents();
      }
    });

    Wait.pause(5000);

    assertNull(this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.getValue(K1)));
    assertNull(this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.getValue(K2)));

    this.durableClientVM
        .invoke(() -> DurableRegistrationDUnitTest.registerKey(K1, new Boolean(false)));

    Wait.pause(5000);
    assertNull(this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.getValue(K1)));
    assertNull(this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.getValue(K2)));

    this.server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K1, "PingPong_updated_1"));
    this.server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K2, "PingPong_updated_2"));
    this.server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K3, "PingPong_updated_3"));
    this.server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K4, "PingPong_updated_4"));

    Wait.pause(5000);

    // Step 9: Verify Updates on the Client
    assertEquals("PingPong_updated_1",
        this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.getValue(K1)));
    assertNull(this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.getValue(K2)));
    assertEquals("PingPong_updated_3",
        this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.getValue(K3)));
    assertEquals("PingPong_updated_4",
        this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.getValue(K4)));

    // Step 10 : Stop all VMs
    // Stop the durable client
    this.durableClientVM.invoke(() -> CacheServerTestUtil.closeCache());

    // Stop server 2
    this.server2VM.invoke(() -> CacheServerTestUtil.closeCache());

    // Stop server 1
    this.server1VM.invoke(() -> CacheServerTestUtil.closeCache());

  }

  @Test
  public void testSimpleDurableClientWithRegistration() {
    // Step 1: Starting the servers
    PORT1 = ((Integer) this.server1VM
        .invoke(() -> CacheServerTestUtil.createCacheServer(regionName, new Boolean(true))))
            .intValue();
    PORT2 = ((Integer) this.server2VM
        .invoke(() -> CacheServerTestUtil.createCacheServer(regionName, new Boolean(true))))
            .intValue();

    // Step 2: Bring Up the Client
    // Start a durable client that is not kept alive on the server when it
    // stops normally
    final String durableClientId = getName() + "_client";
    // keep the client alive for 600 seconds
    final int durableClientTimeout = 600;
    this.durableClientVM.invoke(() -> CacheServerTestUtil.createCacheClient(
        getClientPool(NetworkUtils.getServerHostName(durableClientVM.getHost()), PORT1, PORT2, true,
            0),
        regionName, getClientDistributedSystemProperties(durableClientId, durableClientTimeout)));

    // Send clientReady message
    this.durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
      @Override
      public void run2() throws CacheException {
        CacheServerTestUtil.getCache().readyForEvents();
      }
    });

    // Step 3: Client registers Interests
    // KEY_STONE1, KEY_STONE2 are registered as durableKeys & KEY_STONE3,
    // KEY_STONE4 as non-durableKeys

    this.durableClientVM
        .invoke(() -> DurableRegistrationDUnitTest.registerKey(K1, new Boolean(false)));
    this.durableClientVM
        .invoke(() -> DurableRegistrationDUnitTest.registerKey(K2, new Boolean(false)));
    this.durableClientVM
        .invoke(() -> DurableRegistrationDUnitTest.registerKey(K3, new Boolean(true)));
    this.durableClientVM
        .invoke(() -> DurableRegistrationDUnitTest.registerKey(K4, new Boolean(true)));

    // Step 4: Update Values on the Server for KEY_STONE1, KEY_STONE2,
    // KEY_STONE3, KEY_STONE4

    this.server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K1, "Value1"));
    this.server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K2, "Value2"));
    this.server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K3, "Value3"));
    this.server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K4, "Value4"));

    Wait.pause(1000);
    // Step 5: Verify Updates on the Client

    assertEquals("Value1", this.server2VM.invoke(() -> DurableRegistrationDUnitTest.getValue(K1)));
    assertEquals("Value1", this.server1VM.invoke(() -> DurableRegistrationDUnitTest.getValue(K1)));

    assertEquals("Value1",
        this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.getValue(K1)));
    assertEquals("Value2",
        this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.getValue(K2)));
    assertEquals("Value3",
        this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.getValue(K3)));
    assertEquals("Value4",
        this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.getValue(K4)));

    // Step 6: Close Cache of the DurableClient
    this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.closeCache());
    // pause(5000);
    // Step 7: Update KEY_STONE1,KEY_STONE2,KEY_STONE3,KEY_STONE4 on the
    // Server say with values PingPong1, PingPong2, PingPong3, PingPong4
    this.server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K1, "PingPong1"));
    this.server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K2, "PingPong2"));
    this.server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K3, "PingPong3"));
    this.server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K4, "PingPong4"));

    // Step 8: Re-start the Client
    this.durableClientVM
        .invoke(() -> CacheServerTestUtil.createCacheClient(
            getClientPool(NetworkUtils.getServerHostName(durableClientVM.getHost()), PORT1, PORT2,
                true, 0),
            regionName, getClientDistributedSystemProperties(durableClientId), Boolean.TRUE));

    // Step 9: Send clientReady message
    this.durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
      @Override
      public void run2() throws CacheException {
        CacheServerTestUtil.getCache().readyForEvents();
      }
    });

    // pause(1000);

    // Step 10: Register all Keys
    this.durableClientVM
        .invoke(() -> DurableRegistrationDUnitTest.registerKey(K3, new Boolean(true)));
    this.durableClientVM
        .invoke(() -> DurableRegistrationDUnitTest.registerKey(K4, new Boolean(true)));

    this.durableClientVM
        .invoke(() -> DurableRegistrationDUnitTest.registerKey(K1, new Boolean(false)));
    this.durableClientVM
        .invoke(() -> DurableRegistrationDUnitTest.registerKey(K2, new Boolean(false)));

    // Step 11: Unregister Some Keys (Here K1, K3)
    this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.unregisterKey(K1));
    this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.unregisterKey(K3));

    Wait.pause(5000);

    // Step 12: Modify values on the server for all the Keys
    this.server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K1, "PingPong_updated_1"));
    this.server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K2, "PingPong_updated_2"));
    this.server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K3, "PingPong_updated_3"));
    this.server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K4, "PingPong_updated_4"));

    Wait.pause(5000);

    // Step 13: Check the values for the ones not unregistered and the
    // Unregistered Keys' Values should be null
    try {
      assertEquals("PingPong_updated_2",
          this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.getValue(K2)));
    } catch (Exception e) {
      fail("Prob in KEY_STONE2: "
          + this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.getValue(K2)));
    }

    try {
      assertEquals("PingPong_updated_4",
          this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.getValue(K4)));
    } catch (Exception e) {
      fail("Prob in KEY_STONE4: "
          + this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.getValue(K4)));

    }

    try {
      assertNull(this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.getValue(K1)));
    } catch (Exception e) {
      fail("Prob in KEY_STONE1: "
          + this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.getValue(K1)));

    }

    try {
      assertNull(this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.getValue(K3)));
    } catch (Exception e) {
      fail("Prob in KEY_STONE3: "
          + this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.getValue(K3)));

    }

    // Stop the durable client
    this.durableClientVM.invoke(() -> CacheServerTestUtil.closeCache());

    // Stop server 2
    this.server2VM.invoke(() -> CacheServerTestUtil.closeCache());

    // Stop server 1
    this.server1VM.invoke(() -> CacheServerTestUtil.closeCache());

  }

  @Test
  public void testDurableClientWithRegistrationHA() {

    // Step 1: Start server1
    PORT2 = getRandomAvailableTCPPort();

    PORT1 = ((Integer) this.server1VM
        .invoke(() -> CacheServerTestUtil.createCacheServer(regionName, new Boolean(true))))
            .intValue();


    // Step 2: Bring Up the Client
    final String durableClientId = getName() + "_client";
    // keep the client alive for 600 seconds
    final int durableClientTimeout = 600;
    this.durableClientVM.invoke(() -> CacheServerTestUtil.createCacheClient(
        getClientPool(NetworkUtils.getServerHostName(durableClientVM.getHost()), PORT1, PORT2, true,
            1),
        regionName, getClientDistributedSystemProperties(durableClientId, durableClientTimeout)));

    // Send clientReady message
    this.durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
      @Override
      public void run2() throws CacheException {
        CacheServerTestUtil.getCache().readyForEvents();
      }
    });

    // Step 3: Client registers Interests
    this.durableClientVM
        .invoke(() -> DurableRegistrationDUnitTest.registerKey(K1, new Boolean(false)));
    this.durableClientVM
        .invoke(() -> DurableRegistrationDUnitTest.registerKey(K2, new Boolean(false)));
    this.durableClientVM
        .invoke(() -> DurableRegistrationDUnitTest.registerKey(K3, new Boolean(true)));
    this.durableClientVM
        .invoke(() -> DurableRegistrationDUnitTest.registerKey(K4, new Boolean(true)));

    // Step 4: Bring up the server2

    this.server2VM
        .invoke(() -> CacheServerTestUtil.createCacheServer(regionName, new Boolean(true), PORT2));

    Wait.pause(3000);

    // Check server2 got all the interests registered by the durable client.
    server2VM.invoke(new CacheSerializableRunnable("Verify Interests.") {
      @Override
      public void run2() throws CacheException {
        LogWriterUtils.getLogWriter()
            .info("### Verifying interests registered by DurableClient. ###");
        CacheClientNotifier ccn = CacheClientNotifier.getInstance();
        CacheClientProxy p = null;

        // Get proxy for the client.
        for (int i = 0; i < 60; i++) {
          Iterator ps = ccn.getClientProxies().iterator();
          if (!ps.hasNext()) {
            Wait.pause(1000);
            continue;
          } else {
            p = (CacheClientProxy) ps.next();
            break;
          }
        }

        if (p == null) {
          fail("Proxy initialization taking long time. Increase the wait time.");
        }

        Iterator rs = p.getInterestRegisteredRegions().iterator();
        String rName = (String) rs.next();
        assertNotNull("Null region Name found.", rs);
        LocalRegion r = (LocalRegion) GemFireCacheImpl.getInstance().getRegion(rName);
        assertNotNull("Null region found.", r);
        FilterProfile pf = r.getFilterProfile();
        Set intrests = Collections.EMPTY_SET;

        Set interestKeys = pf.getKeysOfInterest(p.getProxyID().getDurableId());
        assertNotNull("durable Interests not found for the proxy", interestKeys);
        assertEquals("The number of durable keys registered during HARegion GII doesn't match.",
            interestKeys.size(), 2);
        interestKeys = pf.getKeysOfInterest(p.getProxyID());
        assertNotNull("non-durable Interests not found for the proxy", interestKeys);
        assertEquals("The number of non-durable keys registered during HARegion GII doesn't match.",
            interestKeys.size(), 2);
      }
    });


    // Stop the durable client
    this.durableClientVM.invoke(() -> CacheServerTestUtil.closeCache());

    // Stop server 2
    this.server2VM.invoke(() -> CacheServerTestUtil.closeCache());

    // Stop server 1
    this.server1VM.invoke(() -> CacheServerTestUtil.closeCache());

  }

  @Test
  public void testDurableClientDisConnectWithRegistrationHA() {

    // Step 1: Start server1
    PORT2 = getRandomAvailableTCPPort();

    PORT1 = ((Integer) this.server1VM
        .invoke(() -> CacheServerTestUtil.createCacheServer(regionName, new Boolean(true))))
            .intValue();


    // Step 2: Bring Up the Client
    final String durableClientId = getName() + "_client";
    // keep the client alive for 600 seconds
    final int durableClientTimeout = 600;
    this.durableClientVM.invoke(() -> CacheServerTestUtil.createCacheClient(
        getClientPool(NetworkUtils.getServerHostName(durableClientVM.getHost()), PORT1, PORT2, true,
            1),
        regionName, getClientDistributedSystemProperties(durableClientId, durableClientTimeout)));

    // Send clientReady message
    this.durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
      @Override
      public void run2() throws CacheException {
        CacheServerTestUtil.getCache().readyForEvents();
      }
    });

    // Step 3: Client registers Interests
    this.durableClientVM
        .invoke(() -> DurableRegistrationDUnitTest.registerKey(K1, new Boolean(false)));
    this.durableClientVM
        .invoke(() -> DurableRegistrationDUnitTest.registerKey(K2, new Boolean(false)));
    this.durableClientVM
        .invoke(() -> DurableRegistrationDUnitTest.registerKey(K3, new Boolean(true)));
    this.durableClientVM
        .invoke(() -> DurableRegistrationDUnitTest.registerKey(K4, new Boolean(true)));

    // Close Cache of the DurableClient
    this.durableClientVM.invoke(() -> DurableRegistrationDUnitTest.closeCache());

    Wait.pause(2000);

    // Re-start the Client
    this.durableClientVM
        .invoke(() -> CacheServerTestUtil.createCacheClient(
            getClientPool(NetworkUtils.getServerHostName(durableClientVM.getHost()), PORT1, PORT2,
                true, 1),
            regionName, getClientDistributedSystemProperties(durableClientId), Boolean.TRUE));

    // Send clientReady message
    this.durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
      @Override
      public void run2() throws CacheException {
        CacheServerTestUtil.getCache().readyForEvents();
      }
    });

    // Step 4: Bring up the server2
    this.server2VM
        .invoke(() -> CacheServerTestUtil.createCacheServer(regionName, new Boolean(true), PORT2));

    Wait.pause(3000);

    // Check server2 got all the interests registered by the durable client.
    server2VM.invoke(new CacheSerializableRunnable("Verify Interests.") {
      @Override
      public void run2() throws CacheException {
        LogWriterUtils.getLogWriter()
            .info("### Verifying interests registered by DurableClient. ###");
        CacheClientNotifier ccn = CacheClientNotifier.getInstance();
        CacheClientProxy p = null;

        // Get proxy for the client.
        for (int i = 0; i < 60; i++) {
          Iterator ps = ccn.getClientProxies().iterator();
          if (!ps.hasNext()) {
            Wait.pause(1000);
            continue;
          } else {
            p = (CacheClientProxy) ps.next();
            break;
          }
        }

        if (p == null) {
          fail("Proxy initialization taking long time. Increase the wait time.");
        }

        Iterator rs = p.getInterestRegisteredRegions().iterator();
        String rName = (String) rs.next();
        assertNotNull("Null region Name found.", rs);
        LocalRegion r = (LocalRegion) GemFireCacheImpl.getInstance().getRegion(rName);
        assertNotNull("Null region found.", r);
        FilterProfile pf = r.getFilterProfile();
        Set intrests = Collections.EMPTY_SET;

        Set interestKeys = pf.getKeysOfInterest(p.getProxyID().getDurableId());
        assertNotNull("durable Interests not found for the proxy", interestKeys);
        assertEquals("The number of durable keys registered during HARegion GII doesn't match.",
            interestKeys.size(), 2);
        interestKeys = pf.getKeysOfInterest(p.getProxyID());
        assertNull("non-durable Interests found for the proxy", interestKeys);
      }
    });


    // Stop the durable client
    this.durableClientVM.invoke(() -> CacheServerTestUtil.closeCache());

    // Stop server 2
    this.server2VM.invoke(() -> CacheServerTestUtil.closeCache());

    // Stop server 1
    this.server1VM.invoke(() -> CacheServerTestUtil.closeCache());

  }

  private static void unregisterAllKeys() {
    // Get the region
    Region region = CacheServerTestUtil.getCache()
        .getRegion(DurableRegistrationDUnitTest.class.getName() + "_region");
    // Region region =
    // CacheServerTestUtil.getCache().getRegion(DurableClientSampleDUnitTest.regionName);
    assertNotNull(region);
    region.unregisterInterest(K1);
    region.unregisterInterest(K2);
    region.unregisterInterest(K3);
    region.unregisterInterest(K4);

  }

  private static void registerKeys() throws Exception {
    try {
      // Get the region
      Region region = CacheServerTestUtil.getCache()
          .getRegion(DurableRegistrationDUnitTest.class.getName() + "_region");
      // Region region =
      // CacheServerTestUtil.getCache().getRegion(DurableClientSampleDUnitTest.regionName);
      assertNotNull(region);

      region.registerInterest(K1, InterestResultPolicy.KEYS_VALUES, false);
      region.registerInterest(K2, InterestResultPolicy.KEYS_VALUES, false);
      region.registerInterest(K3, InterestResultPolicy.KEYS_VALUES, true);
      region.registerInterest(K4, InterestResultPolicy.KEYS_VALUES, true);

      assertNotNull(region.getInterestList());

    } catch (Exception ex) {
      Assert.fail("failed while registering interest in registerKey function", ex);
    }
  }

  private static String getValue(String key) {
    Region r = CacheServerTestUtil.getCache()
        .getRegion(DurableRegistrationDUnitTest.class.getName() + "_region");
    // Region r = CacheServerTestUtil.getCache().getRegion(regionName);
    assertNotNull(r);
    // String value = (String)r.get(key);
    // String value = (String)r.getEntry(key).getValue();
    Region.Entry re = r.getEntry(key);

    if (re == null) {
      return null;
    } else {
      return (String) re.getValue();
    }
  }

  private static void registerKey(String key, boolean isDurable) throws Exception {
    try {
      // Get the region
      Region region = CacheServerTestUtil.getCache()
          .getRegion(DurableRegistrationDUnitTest.class.getName() + "_region");
      // Region region =
      // CacheServerTestUtil.getCache().getRegion(regionName);
      assertNotNull(region);
      region.registerInterest(key, InterestResultPolicy.NONE, isDurable);
    } catch (Exception ex) {
      Assert.fail("failed while registering interest in registerKey function", ex);
    }
  }

  private static void unregisterKey(String key) throws Exception {
    try {
      // Get the region
      Region region = CacheServerTestUtil.getCache()
          .getRegion(DurableRegistrationDUnitTest.class.getName() + "_region");
      // Region region =
      // CacheServerTestUtil.getCache().getRegion(regionName);
      assertNotNull(region);
      region.unregisterInterest(key);
    } catch (Exception ex) {
      Assert.fail("failed while registering interest in registerKey function", ex);
    }
  }

  private static void putValue(String key, String value) {
    try {
      Region r = CacheServerTestUtil.getCache()
          .getRegion(DurableRegistrationDUnitTest.class.getName() + "_region");
      // Region r = CacheServerTestUtil.getCache().getRegion(regionName);
      assertNotNull(r);
      if (r.getEntry(key) != null) {
        r.put(key, value);
      } else {
        r.create(key, value);
      }
      assertEquals(value, r.getEntry(key).getValue());
    } catch (Exception e) {

      fail("Put in Server has some fight");

    }
  }

  private Pool getClientPool(String host, int server1Port, int server2Port,
      boolean establishCallbackConnection, int redundancyLevel) {
    PoolFactory pf = PoolManager.createFactory();
    pf.addServer(host, server1Port).addServer(host, server2Port)
        .setSubscriptionEnabled(establishCallbackConnection)
        .setSubscriptionRedundancy(redundancyLevel);
    return ((PoolFactoryImpl) pf).getPoolAttributes();
  }

  private Properties getClientDistributedSystemProperties(String durableClientId) {
    return getClientDistributedSystemProperties(durableClientId,
        DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT);
  }

  private static void checkNumberOfClientProxies(final int expected) {
    WaitCriterion ev = new WaitCriterion() {
      @Override
      public boolean done() {
        return expected == getNumberOfClientProxies();
      }

      @Override
      public String description() {
        return null;
      }
    };
    GeodeAwaitility.await().untilAsserted(ev);
  }

  protected static int getNumberOfClientProxies() {
    return getBridgeServer().getAcceptor().getCacheClientNotifier().getClientProxies().size();
  }

  private Properties getClientDistributedSystemProperties(String durableClientId,
      int durableClientTimeout) {
    Properties properties = new Properties();
    properties.setProperty(MCAST_PORT, "0");
    properties.setProperty(LOCATORS, "");
    properties.setProperty(DURABLE_CLIENT_ID, durableClientId);
    properties.setProperty(DURABLE_CLIENT_TIMEOUT, String.valueOf(durableClientTimeout));
    return properties;
  }

  private static CacheClientProxy getClientProxy() {
    // Get the CacheClientNotifier
    CacheClientNotifier notifier = getBridgeServer().getAcceptor().getCacheClientNotifier();

    // Get the CacheClientProxy or not (if proxy set is empty)
    CacheClientProxy proxy = null;
    Iterator i = notifier.getClientProxies().iterator();
    if (i.hasNext()) {
      proxy = (CacheClientProxy) i.next();
    }
    return proxy;
  }

  private static CacheServerImpl getBridgeServer() {
    CacheServerImpl bridgeServer =
        (CacheServerImpl) CacheServerTestUtil.getCache().getCacheServers().iterator().next();
    assertNotNull(bridgeServer);
    return bridgeServer;
  }

  public static void closeCache() {
    Cache cache = CacheServerTestUtil.getCache();
    if (cache != null && !cache.isClosed()) {
      cache.close(true);
      cache.getDistributedSystem().disconnect();
    }
  }

  public String getRegionName() {
    return regionName;
  }

  public void setRegionName(String regionName) {
    this.regionName = regionName;
  }

  @Override
  public final void preTearDown() throws Exception {
    CacheServerTestUtil.resetDisableShufflingOfEndpointsFlag();
  }
}
