/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.internal.cache.versions;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

import org.apache.geode.DataSerializer;
import org.apache.geode.InternalGemFireError;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.HeapDataOutputStream;
import org.apache.geode.internal.Version;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.persistence.DiskStoreID;
import org.apache.geode.internal.net.SocketCreator;
import org.apache.geode.test.junit.rules.ExecutorServiceRule;

public class RegionVersionVectorTest {

  @Rule
  public ExecutorServiceRule executorServiceRule = new ExecutorServiceRule();

  @Rule
  public ExpectedException expectedException = ExpectedException.none();

  /**
   * server1 will simulate doing a sync with another server for operations performed
   * by server2. server3 is another server in the cluster that we don't care about
   */
  @Test
  public void testSynchronizationVectorContainsAllVersionsForOwnerAndNonTarget() {
    final String local = getIPLiteral();
    InternalDistributedMember server1 = new InternalDistributedMember(local, 101);
    InternalDistributedMember server2 = new InternalDistributedMember(local, 102);
    InternalDistributedMember server3 = new InternalDistributedMember(local, 103);

    RegionVersionVector rv1 = new VMRegionVersionVector(server1);
    rv1.updateLocalVersion(10);
    rv1.recordVersion(server2, 1);
    rv1.recordVersion(server2, 5);
    rv1.recordVersion(server2, 8);
    rv1.recordVersion(server3, 1);
    rv1.recordVersion(server3, 3);
    RegionVersionVector singletonRVV = rv1.getCloneForTransmission(server2);
    assertTrue(singletonRVV.isForSynchronization());
    assertEquals(singletonRVV.getOwnerId(), server1);
    assertTrue(singletonRVV.getMemberToVersion().containsKey(server2));
    assertFalse(singletonRVV.getMemberToVersion().containsKey(server3));

    assertTrue(singletonRVV.contains(server1, 1));
    assertTrue(singletonRVV.contains(server1, 11));

    assertTrue(singletonRVV.contains(server3, 1));
    assertTrue(singletonRVV.contains(server3, 11));

    assertTrue(singletonRVV.contains(server2, 1));
    assertTrue(singletonRVV.contains(server2, 5));
    assertTrue(singletonRVV.contains(server2, 8));

    assertFalse(singletonRVV.contains(server2, 2));
    assertFalse(singletonRVV.contains(server2, 3));
    assertFalse(singletonRVV.contains(server2, 4));
    assertFalse(singletonRVV.contains(server2, 6));
    assertFalse(singletonRVV.contains(server2, 7));
    assertFalse(singletonRVV.contains(server2, 9));
  }

  @Test
  public void testSynchronizationVectorContainsAllVersionsForSameOwnerAsTargetAndNonTarget() {
    final String local = getIPLiteral();
    InternalDistributedMember server1 = new InternalDistributedMember(local, 101);
    InternalDistributedMember server2 = new InternalDistributedMember(local, 102);
    InternalDistributedMember server3 = new InternalDistributedMember(local, 103);

    RegionVersionVector rv1 = new VMRegionVersionVector(server1);
    rv1.updateLocalVersion(10);
    rv1.recordVersion(server2, 1);
    rv1.recordVersion(server2, 5);
    rv1.recordVersion(server2, 8);
    rv1.recordVersion(server3, 1);
    rv1.recordVersion(server3, 3);
    RegionVersionVector singletonRVV = rv1.getCloneForTransmission(server1);
    assertTrue(singletonRVV.isForSynchronization());
    assertEquals(singletonRVV.getOwnerId(), server1);
    assertTrue(singletonRVV.getMemberToVersion().containsKey(server1));
    assertFalse(singletonRVV.getMemberToVersion().containsKey(server2));
    assertFalse(singletonRVV.getMemberToVersion().containsKey(server3));

    assertTrue(singletonRVV.contains(server1, 1));
    assertTrue(singletonRVV.contains(server1, 11));

    assertTrue(singletonRVV.contains(server3, 1));
    assertTrue(singletonRVV.contains(server3, 11));

    assertTrue(singletonRVV.contains(server2, 1));
    assertTrue(singletonRVV.contains(server2, 5));
    assertTrue(singletonRVV.contains(server2, 8));
    assertTrue(singletonRVV.contains(server2, 2));
    assertTrue(singletonRVV.contains(server2, 3));
    assertTrue(singletonRVV.contains(server2, 4));
    assertTrue(singletonRVV.contains(server2, 6));
    assertTrue(singletonRVV.contains(server2, 7));
    assertTrue(singletonRVV.contains(server2, 9));
  }

  /**
   * server1 will simulate doing a sync with another server for operations performed
   * by server2. server3 is another server in the cluster that we don't care about
   * servers have version source as dist store id
   */
  @Test
  public void testSynchronizationVectorWithDiskStoreIdContainsAllVersionsForNonTarget() {
    DiskStoreID server1 = new DiskStoreID(0, 0);
    DiskStoreID server2 = new DiskStoreID(0, 1);
    DiskStoreID server3 = new DiskStoreID(1, 0);

    RegionVersionVector rv1 = new DiskRegionVersionVector(server1);
    rv1.updateLocalVersion(10);
    rv1.recordVersion(server2, 1);
    rv1.recordVersion(server2, 5);
    rv1.recordVersion(server2, 8);
    rv1.recordVersion(server3, 1);
    rv1.recordVersion(server3, 3);
    RegionVersionVector singletonRVV = rv1.getCloneForTransmission(server2);
    assertTrue(singletonRVV.isForSynchronization());
    assertEquals(singletonRVV.getOwnerId(), server1);
    assertTrue(singletonRVV.getMemberToVersion().containsKey(server2));
    assertFalse(singletonRVV.getMemberToVersion().containsKey(server3));

    assertTrue(singletonRVV.contains(server1, 1));
    assertTrue(singletonRVV.contains(server1, 11));

    assertTrue(singletonRVV.contains(server3, 1));
    assertTrue(singletonRVV.contains(server3, 11));
  }

  @Test
  public void testSynchronizationVectorWithDiskStoreIdContainsVersionsForTarget() {
    DiskStoreID server1 = new DiskStoreID(0, 0);
    DiskStoreID server2 = new DiskStoreID(0, 1);
    DiskStoreID server3 = new DiskStoreID(1, 0);

    RegionVersionVector rv1 = new DiskRegionVersionVector(server1);
    rv1.updateLocalVersion(10);
    rv1.recordVersion(server2, 1);
    rv1.recordVersion(server2, 5);
    rv1.recordVersion(server2, 8);
    rv1.recordVersion(server3, 1);
    rv1.recordVersion(server3, 3);
    RegionVersionVector singletonRVV = rv1.getCloneForTransmission(server2);
    assertTrue(singletonRVV.isForSynchronization());
    assertEquals(singletonRVV.getOwnerId(), server1);
    assertTrue(singletonRVV.getMemberToVersion().containsKey(server2));
    assertFalse(singletonRVV.getMemberToVersion().containsKey(server3));

    assertTrue(singletonRVV.contains(server2, 1));
    assertTrue(singletonRVV.contains(server2, 5));
    assertTrue(singletonRVV.contains(server2, 8));

    assertFalse(singletonRVV.contains(server2, 2));
    assertFalse(singletonRVV.contains(server2, 3));
    assertFalse(singletonRVV.contains(server2, 4));
    assertFalse(singletonRVV.contains(server2, 6));
    assertFalse(singletonRVV.contains(server2, 7));
    assertFalse(singletonRVV.contains(server2, 9));
  }

  @Test
  public void testSynchronizationVectorWithDiskStoreIdContainsVersionsForTargetAsOriginator() {
    DiskStoreID server1 = new DiskStoreID(0, 0);
    DiskStoreID server2 = new DiskStoreID(0, 1);
    DiskStoreID server3 = new DiskStoreID(1, 0);

    RegionVersionVector rv1 = new DiskRegionVersionVector(server1);
    RegionVersionHolder localExceptions = rv1.getLocalExceptions();
    localExceptions.addException(2, 5);
    localExceptions.addException(7, 9);
    rv1.updateLocalVersion(10);
    rv1.recordVersion(server2, 1);
    rv1.recordVersion(server2, 5);
    rv1.recordVersion(server2, 8);
    rv1.recordVersion(server3, 1);
    rv1.recordVersion(server3, 3);
    RegionVersionVector singletonRVV = rv1.getCloneForTransmission(server1);
    assertTrue(singletonRVV.isForSynchronization());
    assertEquals(singletonRVV.getOwnerId(), server1);
    assertTrue(singletonRVV.getMemberToVersion().containsKey(server1));
    assertFalse(singletonRVV.getMemberToVersion().containsKey(server2));
    assertFalse(singletonRVV.getMemberToVersion().containsKey(server3));

    assertTrue(singletonRVV.contains(server1, 1));
    assertTrue(singletonRVV.contains(server1, 2));
    assertTrue(singletonRVV.contains(server1, 5));
    assertTrue(singletonRVV.contains(server1, 6));
    assertTrue(singletonRVV.contains(server1, 7));
    assertTrue(singletonRVV.contains(server1, 9));
    assertTrue(singletonRVV.contains(server1, 10));

    assertFalse(singletonRVV.contains(server1, 3));
    assertFalse(singletonRVV.contains(server1, 4));
    assertFalse(singletonRVV.contains(server1, 8));
  }

  @Test
  public void testExceptionsWithContains() {
    DiskStoreID ownerId = new DiskStoreID(0, 0);
    DiskStoreID id1 = new DiskStoreID(0, 1);
    DiskStoreID id2 = new DiskStoreID(1, 0);

    DiskRegionVersionVector rvv = new DiskRegionVersionVector(ownerId);

    doExceptionsWithContains(ownerId, rvv);
    doExceptionsWithContains(id1, rvv);
  }

  @SuppressWarnings({"unchecked", "rawtypes"})
  @Test
  public void testRegionVersionVectors() throws Exception {
    // this is just a quick set of unit tests for basic RVV functionality

    final String local = getIPLiteral();
    InternalDistributedMember server1 = new InternalDistributedMember(local, 101);
    InternalDistributedMember server2 = new InternalDistributedMember(local, 102);
    InternalDistributedMember server3 = new InternalDistributedMember(local, 103);
    InternalDistributedMember server4 = new InternalDistributedMember(local, 104);

    RegionVersionVector rv1 = null;

    // (a) Test that an exception is mended when the versions that are missing are
    // added
    rv1 = new VMRegionVersionVector(server1);
    rv1.recordVersion(server2, 1);
    rv1.recordVersion(server2, 5);
    rv1.recordVersion(server2, 8);
    System.out.println("for test (a) formed this RVV: " + rv1.fullToString());
    // there should now be two exceptions: 1-5 and 5-8
    assertEquals(8, rv1.getVersionForMember(server2));
    assertEquals(2, rv1.getExceptionCount(server2));
    rv1.recordVersion(server2, 3);
    System.out.println("for test (a) RVV is now: " + rv1.fullToString());
    assertEquals(8, rv1.getVersionForMember(server2));
    assertEquals(2, rv1.getExceptionCount(server2));
    rv1.recordVersion(server2, 4);
    rv1.recordVersion(server2, 2);
    System.out.println("for test (a) RVV is now: " + rv1.fullToString());
    assertEquals(1, rv1.getExceptionCount(server2));
    rv1.recordVersion(server2, 6);
    rv1.recordVersion(server2, 7);
    System.out.println("for test (a) RVV is now: " + rv1.fullToString());
    assertEquals(0, rv1.getExceptionCount(server2));

    // (b) Test the contains() operation
    rv1 = new VMRegionVersionVector(server1);
    rv1.recordVersion(server2, 1);
    rv1.recordVersion(server2, 5);
    rv1.recordVersion(server2, 8);
    rv1.recordVersion(server2, 10);
    System.out.println("for test (b) formed this RVV: " + rv1.fullToString());
    assertTrue(rv1.contains(server2, 1));
    assertTrue(rv1.contains(server2, 5));
    assertTrue(rv1.contains(server2, 8));
    assertTrue(rv1.contains(server2, 10));
    assertFalse(rv1.contains(server2, 2));
    assertFalse(rv1.contains(server2, 3));
    assertFalse(rv1.contains(server2, 4));
    assertFalse(rv1.contains(server2, 9));
    assertFalse(rv1.contains(server2, 11));
    rv1.recordVersion(server2, 3);
    System.out.println("for test (b) RVV is now: " + rv1.fullToString());
    assertTrue(rv1.contains(server2, 3));
    rv1.recordVersion(server2, 2);
    System.out.println("for test (b) RVV is now: " + rv1.fullToString());
    assertTrue(rv1.contains(server2, 2));
    assertTrue(rv1.contains(server2, 3));
    rv1.recordVersion(server2, 4);
    System.out.println("for test (b) RVV is now: " + rv1.fullToString());
    assertTrue(rv1.contains(server2, 1));
    assertTrue(rv1.contains(server2, 2));
    assertTrue(rv1.contains(server2, 3));
    assertTrue(rv1.contains(server2, 4));
    assertTrue(rv1.contains(server2, 5));
    rv1.recordVersion(server2, 11);
    System.out.println("for test (b) RVV is now: " + rv1.fullToString());
    assertTrue(rv1.contains(server2, 11));
    assertTrue(rv1.contains(server2, 10));
    System.out.println("for test (b) RVV is now: " + rv1.fullToString());
    rv1.recordVersion(server2, 6);
    assertTrue(rv1.contains(server2, 2));
    assertTrue(rv1.contains(server2, 5));
    assertTrue(rv1.contains(server2, 6));
    assertFalse(rv1.contains(server2, 7));
    assertTrue(rv1.contains(server2, 8));
    rv1.recordVersion(server2, 7);
    System.out.println("for test (b) RVV is now: " + rv1.fullToString());
    assertTrue(rv1.contains(server2, 7));
    rv1.recordVersion(server2, 9);
    System.out.println("for test (b) RVV is now: " + rv1.fullToString());
    assertTrue(rv1.contains(server2, 9));
    assertTrue(rv1.getExceptionCount(server2) == 0);
    assertTrue(rv1.contains(server2, 8));

    // Test RVV comparisons for GII Delta
    rv1 = new VMRegionVersionVector(server1);
    rv1.recordVersion(server2, 1);
    rv1.recordVersion(server2, 4);
    rv1.recordVersion(server2, 8);
    rv1.recordVersion(server2, 9);
    rv1.recordVersion(server2, 10);
    rv1.recordVersion(server2, 11);
    rv1.recordVersion(server2, 12);

    rv1.recordVersion(server3, 2);
    rv1.recordVersion(server3, 3);
    rv1.recordVersion(server3, 4);
    rv1.recordVersion(server3, 6);
    rv1.recordVersion(server3, 7);

    RegionVersionVector rv2 = rv1.getCloneForTransmission();
    System.out.println("rv1 is " + rv1.fullToString());
    System.out.println("rv2 is " + rv2.fullToString());
    assertFalse(rv1.isNewerThanOrCanFillExceptionsFor(rv2));
    assertFalse(rv2.isNewerThanOrCanFillExceptionsFor(rv1));

    rv1.recordVersion(server2, 6);
    assertTrue(rv1.isNewerThanOrCanFillExceptionsFor(rv2));
    rv2.recordVersion(server2, 6);
    assertFalse(rv1.isNewerThanOrCanFillExceptionsFor(rv2));

    // fill an exception gap
    rv1.recordVersion(server2, 5);
    assertTrue(rv1.isNewerThanOrCanFillExceptionsFor(rv2));
    rv2.recordVersion(server2, 5);
    assertFalse(rv1.isNewerThanOrCanFillExceptionsFor(rv2));
    rv1.recordVersion(server2, 7);
    assertTrue(rv1.isNewerThanOrCanFillExceptionsFor(rv2));
    rv2.recordVersion(server2, 7);
    assertFalse(rv1.isNewerThanOrCanFillExceptionsFor(rv2));

    // add a more recent revision
    rv1.recordVersion(server3, 8);
    assertTrue(rv1.isNewerThanOrCanFillExceptionsFor(rv2));
    rv2.recordVersion(server3, 8);
    assertFalse(rv1.isNewerThanOrCanFillExceptionsFor(rv2));

    // fill another exception gap
    rv1.recordVersion(server3, 5);
    assertTrue(rv1.isNewerThanOrCanFillExceptionsFor(rv2));
    rv2.recordVersion(server3, 5);
    assertFalse(rv1.isNewerThanOrCanFillExceptionsFor(rv2));

    // test that old members are removed from the vector
    InternalDistributedMember server5 = new InternalDistributedMember(local, 105);
    rv1 = new VMRegionVersionVector(server1);
    rv1.recordVersion(server2, 1);
    rv1.recordVersion(server3, 1);
    rv1.recordVersion(server4, 1);
    rv1.recordVersion(server5, 1);
    rv1.memberDeparted(null, server2, false);
    rv1.memberDeparted(null, server4, true);
    assertTrue(rv1.containsMember(server2));
    assertTrue(rv1.containsMember(server3));
    assertTrue(rv1.containsMember(server4));
    Set retain = new HashSet();
    retain.add(server2); // still have data from server2
    retain.add(server3); // still have data from server3
    // no data found from server4 in region
    retain.add(server5); // still have data from server5
    rv1.removeOldMembers(retain);
    assertFalse(rv1.containsMember(server4));

    rv1.memberDeparted(null, server3, false); // {server2, server3(departed), server5}

    // Now test that departed members are transferred with GII. We simulate
    // a new server, server6, doing a GII from server1

    InternalDistributedMember server6 = new InternalDistributedMember(local, 106);
    RegionVersionVector giiReceiverRVV = new VMRegionVersionVector(server6);
    // the gii request will cause server1 to clone its RVV and send it to server6
    rv2 = rv1.getCloneForTransmission();
    // serialize/deserialize to mimic sending the rvv in a message
    ByteArrayOutputStream baos = new ByteArrayOutputStream(1000);
    DataOutputStream out = new DataOutputStream(baos);
    DataSerializer.writeObject(rv2, out);
    ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
    DataInputStream in = new DataInputStream(bais);
    RegionVersionVector transmittedVector = (RegionVersionVector) DataSerializer.readObject(in);
    // record the provider's rvv in the receiver of the image
    giiReceiverRVV.recordVersions(transmittedVector);
    // removedMembers in the receiver should hold {server4, server3}. Simulate
    // another member departure to kick out server4
    assertTrue(giiReceiverRVV.containsMember(server2));
    assertTrue(giiReceiverRVV.containsMember(server5));
    assertTrue(giiReceiverRVV.containsMember(server3));
    assertTrue(giiReceiverRVV.isDepartedMember(server3));

    // unit test for bit-set boundary. First boundary is 3/4 of bitset width,
    // which is the amount dumped to the exceptions list when the bitset becomes full
    rv1 = new VMRegionVersionVector(server1);
    long bitSetRollPoint = RegionVersionHolder.BIT_SET_WIDTH + 1;
    long boundary = RegionVersionHolder.BIT_SET_WIDTH * 3 / 4;
    for (long i = 1; i < boundary; i++) {
      rv1.recordVersion(server2, i);
      assertTrue(rv1.contains(server2, i));
    }
    assertFalse(rv1.contains(server2, boundary + 1));

    rv1.recordVersion(server2, bitSetRollPoint);
    rv1.recordVersion(server2, bitSetRollPoint + 1); // bitSet should be rolled at this point
    RegionVersionHolder h = (RegionVersionHolder) rv1.getMemberToVersion().get(server2);
    long versionBoundary = h.getBitSetVersionForTesting();
    assertEquals("expected holder bitset version to roll to this value", boundary - 1,
        versionBoundary);
    assertFalse(rv1.contains(server2, bitSetRollPoint - 1));
    assertTrue(rv1.contains(server2, bitSetRollPoint));
    assertTrue(rv1.contains(server2, bitSetRollPoint + 1));
    assertFalse(rv1.contains(server2, bitSetRollPoint + 2));

    assertTrue(rv1.contains(server2, boundary - 1));
    assertFalse(rv1.contains(server2, boundary));
    assertFalse(rv1.contains(server2, boundary + 1));

    // now test the merge
    System.out.println("testing merge for " + rv1.fullToString());
    assertEquals(1, rv1.getExceptionCount(server2)); // one exception from boundary-1 to
    // bitSetRollPoint
    assertFalse(rv1.contains(server2, bitSetRollPoint - 1));
    assertTrue(rv1.contains(server2, bitSetRollPoint));
    assertTrue(rv1.contains(server2, bitSetRollPoint + 1));
    assertFalse(rv1.contains(server2, bitSetRollPoint + 2));
  }

  @Test
  public void testRVVSerialization() throws Exception {
    DiskStoreID ownerId = new DiskStoreID(0, 0);
    DiskStoreID id1 = new DiskStoreID(0, 1);
    DiskStoreID id2 = new DiskStoreID(1, 0);

    DiskRegionVersionVector rvv = new DiskRegionVersionVector(ownerId);
    rvv.recordVersion(id1, 5);
    rvv.recordVersion(id1, 6);
    rvv.recordVersion(id1, 7);
    rvv.recordVersion(id1, 9);
    rvv.recordVersion(id1, 20);
    rvv.recordVersion(id1, 11);
    rvv.recordVersion(id1, 12);
    rvv.recordGCVersion(id2, 5);
    rvv.recordGCVersion(id1, 3);

    assertTrue(rvv.sameAs(rvv.getCloneForTransmission()));

    HeapDataOutputStream out = new HeapDataOutputStream(Version.CURRENT);
    DataSerializer.writeObject(rvv.getCloneForTransmission(), out);
    byte[] bytes = out.toByteArray();

    DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bytes));
    DiskRegionVersionVector rvv2 = DataSerializer.readObject(dis);

    assertTrue(rvv.sameAs(rvv2));
  }

  /**
   * Test that we can copy the member to version map correctly.
   */
  @Test
  public void testCopyMemberToVersion() {
    DiskStoreID id0 = new DiskStoreID(0, 0);
    DiskStoreID id1 = new DiskStoreID(0, 1);
    DiskStoreID id2 = new DiskStoreID(1, 0);

    DiskRegionVersionVector rvv0 = new DiskRegionVersionVector(id0);
    rvv0.getNextVersion();
    rvv0.getNextVersion();
    rvv0.getNextVersion();

    rvv0.recordVersion(id1, 1);
    rvv0.recordVersion(id1, 3);

    DiskRegionVersionVector rvv1 = new DiskRegionVersionVector(id1);
    rvv1.recordVersions(rvv0);

    assertEquals(3, rvv1.getCurrentVersion());
    assertFalse(rvv1.contains(id1, 2));
    assertTrue(rvv1.contains(id1, 1));
    assertTrue(rvv1.contains(id1, 3));

    assertTrue(rvv1.contains(id0, 3));

    assertTrue(rvv0.sameAs(rvv1));

    rvv1.recordVersion(id1, 2);
    assertTrue(rvv1.isNewerThanOrCanFillExceptionsFor(rvv0));
    assertFalse(rvv0.isNewerThanOrCanFillExceptionsFor(rvv1));
    assertTrue(rvv1.dominates(rvv0));
    assertFalse(rvv0.dominates(rvv1));
  }

  @Test
  public void testSpecialException() {
    DiskStoreID id0 = new DiskStoreID(0, 0);
    DiskStoreID id1 = new DiskStoreID(0, 1);
    DiskStoreID id2 = new DiskStoreID(1, 0);

    DiskRegionVersionVector rvv0 = new DiskRegionVersionVector(id0);
    rvv0.getNextVersion();
    rvv0.getNextVersion();
    rvv0.getNextVersion();

    rvv0.recordVersion(id1, 1);
    rvv0.recordVersion(id1, 2);

    DiskRegionVersionVector rvv1 = new DiskRegionVersionVector(id1);
    rvv1.recordVersions(rvv0);

    rvv1.recordVersion(id1, 3);
    RegionVersionHolder holder_at_rvv1 = rvv1.getLocalExceptions();
    RegionVersionHolder holder_at_rvv0 = rvv0.getMemberToVersion().get(id1);
    holder_at_rvv1.addException(2, 4);
    assertFalse(rvv1.isNewerThanOrCanFillExceptionsFor(rvv0));
    assertFalse(rvv0.isNewerThanOrCanFillExceptionsFor(rvv1));
    assertTrue(rvv1.dominates(rvv0));
    assertTrue(rvv0.dominates(rvv1));
  }

  @Test
  public void test48066_1() {
    DiskStoreID id0 = new DiskStoreID(0, 0);
    DiskRegionVersionVector rvv0 = new DiskRegionVersionVector(id0);
    for (int i = 1; i <= 3; i++) {
      rvv0.recordVersion(id0, i);
    }
    System.out.println("rvv0=" + rvv0.fullToString());

    DiskRegionVersionVector rvv1 = (DiskRegionVersionVector) rvv0.getCloneForTransmission();
    System.out.println("after clone, rvv1=" + rvv1.fullToString());

    DiskRegionVersionVector rvv2 = new DiskRegionVersionVector(id0);
    for (int i = 1; i <= 10; i++) {
      rvv2.recordVersion(id0, i);
    }
    rvv2.recordVersions(rvv1);
    System.out.println("after init, rvv2=" + rvv2.fullToString());

    rvv2.recordVersion(id0, 4);
    System.out.println("after record 4, rvv2=" + rvv2.fullToString());
    assertEquals(4, rvv2.getCurrentVersion());

    rvv2.recordVersion(id0, 7);
    System.out.println("after record 7, rvv2=" + rvv2.fullToString());
    assertEquals(7, rvv2.getCurrentVersion());
  }

  @Test
  public void test48066_2() {
    DiskStoreID id0 = new DiskStoreID(0, 0);
    DiskRegionVersionVector rvv0 = new DiskRegionVersionVector(id0);
    for (int i = 1; i <= 10; i++) {
      rvv0.recordVersion(id0, i);
    }

    DiskRegionVersionVector rvv1 = new DiskRegionVersionVector(id0);
    rvv0.recordVersions(rvv1);
    System.out.println("rvv0=" + rvv0.fullToString());

    rvv0.recordVersion(id0, 4);
    System.out.println("after record 4, rvv0=" + rvv0.fullToString());
    assertEquals(4, rvv0.getCurrentVersion());

    rvv0.recordVersion(id0, 7);
    System.out.println("after record 7, rvv0=" + rvv0.fullToString());
    assertEquals(7, rvv0.getCurrentVersion());
    assertFalse(rvv0.contains(id0, 5));

    DiskRegionVersionVector rvv2 = (DiskRegionVersionVector) rvv0.getCloneForTransmission();
    System.out.println("after clone, rvv2=" + rvv2.fullToString());
    assertEquals(11, rvv0.getNextVersion());
    assertFalse(rvv2.contains(id0, 5));
    assertEquals(11, rvv2.getNextVersion());
  }

  /**
   * Test for bug 47023. Make sure recordGCVersion works correctly and doesn't generate exceptions
   * for the local member.
   */
  @Test
  public void testRecordGCVersion() {
    DiskStoreID id0 = new DiskStoreID(0, 0);
    DiskStoreID id1 = new DiskStoreID(0, 1);

    DiskRegionVersionVector rvv0 = new DiskRegionVersionVector(id0);

    // generate 3 local versions
    rvv0.getNextVersion();
    rvv0.getNextVersion();
    rvv0.getNextVersion();

    // record some version from a remote member
    rvv0.recordVersion(id1, 1);
    rvv0.recordVersion(id1, 3);
    rvv0.recordVersion(id1, 5);

    // Assert that the exceptions are present
    {
      Map<DiskStoreID, RegionVersionHolder<DiskStoreID>> memberToVersion =
          rvv0.getMemberToVersion();
      RegionVersionHolder<DiskStoreID> holder1 = memberToVersion.get(id1);

      // Make sure the exceptions are present
      assertFalse(holder1.contains(2));
      assertFalse(holder1.contains(4));
    }

    // Record some GC versions
    rvv0.recordGCVersion(id0, 2);
    rvv0.recordGCVersion(id1, 3);

    {
      Map<DiskStoreID, RegionVersionHolder<DiskStoreID>> memberToVersion =
          rvv0.getMemberToVersion();
      RegionVersionHolder<DiskStoreID> holder0 = memberToVersion.get(id0);
      // Make sure we didn't generate a bogus exception for
      // the local member by calling record GC version - bug 47023
      assertTrue(holder0.getExceptionForTest().isEmpty());
    }

    // Clean up old exceptions
    rvv0.pruneOldExceptions();

    // Make assertions about what exceptions are still present
    Map<DiskStoreID, RegionVersionHolder<DiskStoreID>> memberToVersion = rvv0.getMemberToVersion();
    RegionVersionHolder<DiskStoreID> holder0 = memberToVersion.get(id0);
    RegionVersionHolder<DiskStoreID> holder1 = memberToVersion.get(id1);
    assertTrue(holder0.getExceptionForTest().isEmpty());

    // exceptions less than the GC version should have been removed
    assertTrue(holder1.contains(2));

    // exceptions greater than the GC version should still be there.
    assertFalse(holder1.contains(4));
  }


  @Test
  public void recordLargeGCVersionShouldRecordSuccessfully() {
    DiskStoreID id0 = new DiskStoreID(0, 0);
    DiskStoreID id1 = new DiskStoreID(0, 1);

    DiskRegionVersionVector rvv0 = new DiskRegionVersionVector(id0);

    rvv0.recordGCVersion(id1, ((long) Integer.MAX_VALUE) - 10L);
  }

  @Test
  public void incrementingTheLocalVersionShouldNotCreateExceptions() {
    DiskStoreID id0 = new DiskStoreID(0, 0);
    DiskStoreID id1 = new DiskStoreID(0, 1);

    DiskRegionVersionVector rvv0 = new DiskRegionVersionVector(id0);

    // Increment the version a couple of times
    rvv0.getNextVersion();
    rvv0.getNextVersion();

    RegionVersionVector<DiskStoreID> clone = rvv0.getCloneForTransmission();

    assertThat(clone.getHolderForMember(id0).getExceptionForTest()).isEmpty();
    assertThat(rvv0.getHolderForMember(id0).getExceptionForTest()).isEmpty();
  }

  @Test
  public void testRemoveOldVersions() {
    DiskStoreID id0 = new DiskStoreID(0, 0);
    DiskStoreID id1 = new DiskStoreID(0, 1);
    DiskStoreID id2 = new DiskStoreID(0, 2);

    DiskRegionVersionVector rvv = new DiskRegionVersionVector(id0);

    // generate 3 local versions
    rvv.getNextVersion();
    rvv.getNextVersion();
    rvv.getNextVersion();

    // record some version from a remote member
    rvv.recordVersion(id1, 1);
    rvv.recordVersion(id1, 3);
    rvv.recordVersion(id1, 5);
    // record a GC version for that member that is older than its version
    rvv.recordGCVersion(id1, 3);

    rvv.recordGCVersion(id2, 4950);
    rvv.recordVersion(id2, 5000);
    rvv.recordVersion(id2, 5001);
    rvv.recordVersion(id2, 5005);

    rvv.removeOldVersions();

    assertEquals("expected gc version to be set to current version for " + rvv.fullToString(),
        rvv.getCurrentVersion(), rvv.getGCVersion(null));
    assertEquals("expected gc version to be set to current version for " + rvv.fullToString(),
        rvv.getVersionForMember(id1), rvv.getGCVersion(id1));
    assertEquals("expected gc version to be set to current version for " + rvv.fullToString(),
        rvv.getVersionForMember(id2), rvv.getGCVersion(id2));

    assertEquals("expected exceptions to be erased for " + rvv.fullToString(),
        rvv.getExceptionCount(id1), 0);
    assertEquals("expected exceptions to be erased for " + rvv.fullToString(),
        rvv.getExceptionCount(id2), 0);
  }

  @Test
  public void testRegionVersionInTags() {
    VMVersionTag tag = new VMVersionTag();
    long version = 0x8080000000L;
    tag.setRegionVersion(version);
    assertEquals("failed test for bug #48576", version, tag.getRegionVersion());
  }

  @Test
  public void testRecordVersionDuringRegionInit() {
    LocalRegion mockRegion = mock(LocalRegion.class);
    when(mockRegion.isInitialized()).thenReturn(false);
    final String local = getIPLiteral();
    InternalDistributedMember ownerId = new InternalDistributedMember(local, 101);
    VMVersionTag tag = new VMVersionTag();
    tag.setRegionVersion(1L);

    RegionVersionVector rvv = createRegionVersionVector(ownerId, mockRegion);
    rvv.recordVersion(ownerId, tag);
    assertEquals(1, rvv.getVersionForMember(ownerId));
  }

  @Test
  public void recordVersionIntoLocalMemberShouldFailIfRegionIsPersistent() {
    LocalRegion mockRegion = mock(LocalRegion.class);
    when(mockRegion.isInitialized()).thenReturn(true);
    when(mockRegion.getDataPolicy()).thenReturn(DataPolicy.PERSISTENT_REPLICATE);
    final String local = getIPLiteral();
    DiskStoreID ownerId = new DiskStoreID();

    DiskRegionVersionVector rvv = new DiskRegionVersionVector(ownerId, mockRegion);

    DiskVersionTag tag = new DiskVersionTag();
    tag.setRegionVersion(1L);
    tag.setMemberID(ownerId);

    expectedException.expect(InternalGemFireError.class);
    rvv.recordVersion(ownerId, tag);
  }

  @Test
  public void recordVersionIntoLocalMemberShouldPassfRegionIsNonPersistent() {
    LocalRegion mockRegion = mock(LocalRegion.class);
    when(mockRegion.isInitialized()).thenReturn(true);
    when(mockRegion.getDataPolicy()).thenReturn(DataPolicy.REPLICATE);
    final String local = getIPLiteral();
    InternalDistributedMember ownerId = new InternalDistributedMember(local, 101);
    RegionVersionVector rvv = createRegionVersionVector(ownerId, mockRegion);

    VMVersionTag tag = new VMVersionTag();
    tag.setRegionVersion(1);
    tag.setMemberID(ownerId);

    rvv.recordVersion(ownerId, tag);
    assertEquals(1, rvv.getLocalExceptions().version);
    assertEquals(2, rvv.getNextVersion());
  }

  @Test
  public void usesNewVersionIfGreaterThanOldVersion() throws Exception {
    VersionSource<InternalDistributedMember> ownerId = mock(VersionSource.class);
    long oldVersion = 1;
    long newVersion = 2;

    RegionVersionVector rvv = new TestableRegionVersionVector(ownerId, oldVersion);
    rvv.updateLocalVersion(newVersion);
    assertThat(rvv.getVersionForMember(ownerId)).isEqualTo(newVersion);
  }

  @Test
  public void usesOldVersionIfGreaterThanNewVersion() throws Exception {
    VersionSource<InternalDistributedMember> ownerId = mock(VersionSource.class);
    long oldVersion = 2;
    long newVersion = 1;

    RegionVersionVector rvv = new TestableRegionVersionVector(ownerId, oldVersion);
    rvv.updateLocalVersion(newVersion);
    assertThat(rvv.getVersionForMember(ownerId)).isEqualTo(oldVersion);
  }

  @Test
  public void doesNothingIfVersionsAreSame() throws Exception {
    VersionSource<InternalDistributedMember> ownerId = mock(VersionSource.class);
    long oldVersion = 2;
    long sameVersion = 2;

    RegionVersionVector rvv = new TestableRegionVersionVector(ownerId, oldVersion);
    rvv.updateLocalVersion(sameVersion);
    assertThat(rvv.getVersionForMember(ownerId)).isEqualTo(oldVersion);
  }

  @Test
  public void doesNotHangIfOtherThreadChangedVersion() throws Exception {
    VersionSource<InternalDistributedMember> ownerId = mock(VersionSource.class);
    long oldVersion = 1;
    long newVersion = 2;

    RegionVersionVector rvv = new VersionRaceConditionRegionVersionVector(ownerId, oldVersion);
    Future<Void> result = executorServiceRule.runAsync(() -> rvv.updateLocalVersion(newVersion));

    assertThatCode(() -> result.get(2, SECONDS)).doesNotThrowAnyException();
    assertThat(rvv.getVersionForMember(ownerId)).isEqualTo(newVersion);
  }

  @Test
  public void isRvvGcDominatedByRequesterRvvReturnsTrueIfRequesterRvvForLostMemberDominates()
      throws Exception {
    InternalDistributedMember lostMember = mock(InternalDistributedMember.class);
    ConcurrentHashMap<InternalDistributedMember, Long> memberToGcVersion =
        new ConcurrentHashMap<>();
    memberToGcVersion.put(lostMember, new Long(1) /* lostMemberGcVersion */);
    RegionVersionVector providerRvv = new VMRegionVersionVector(lostMember, null,
        0, memberToGcVersion, 0, true, null);

    ConcurrentHashMap<InternalDistributedMember, RegionVersionHolder<InternalDistributedMember>> memberToRegionVersionHolder =
        new ConcurrentHashMap<>();
    RegionVersionHolder regionVersionHolder = new RegionVersionHolder(lostMember);
    regionVersionHolder.recordVersion(1);
    regionVersionHolder.recordVersion(2);
    memberToRegionVersionHolder.put(lostMember, regionVersionHolder);
    RegionVersionVector requesterRvv =
        new VMRegionVersionVector(lostMember, memberToRegionVersionHolder,
            0, null, 0, true, null);

    assertThat(providerRvv.isRVVGCDominatedBy(requesterRvv)).isTrue();
  }

  @Test
  public void isRvvGcDominatedByRequesterRvvReturnsFalseIfRequesterRvvForLostMemberDominates()
      throws Exception {
    InternalDistributedMember lostMember = mock(InternalDistributedMember.class);
    ConcurrentHashMap<InternalDistributedMember, Long> memberToGcVersion =
        new ConcurrentHashMap<>();
    memberToGcVersion.put(lostMember, new Long(1) /* lostMemberGcVersion */);
    RegionVersionVector providerRvv = new VMRegionVersionVector(lostMember, null,
        0, memberToGcVersion, 0, true, null);

    ConcurrentHashMap<InternalDistributedMember, RegionVersionHolder<InternalDistributedMember>> memberToRegionVersionHolder =
        new ConcurrentHashMap<>();
    RegionVersionHolder regionVersionHolder = new RegionVersionHolder(lostMember);
    memberToRegionVersionHolder.put(lostMember, regionVersionHolder);
    RegionVersionVector requesterRvv =
        new VMRegionVersionVector(lostMember, memberToRegionVersionHolder,
            0, null, 0, true, null);

    assertThat(providerRvv.isRVVGCDominatedBy(requesterRvv)).isFalse();
  }

  @Test
  public void isRvvGcDominatedByRequesterRvvReturnsFalseIfProviderRvvIsNotPresent()
      throws Exception {
    final String local = getIPLiteral();
    InternalDistributedMember provider = new InternalDistributedMember(local, 101);
    InternalDistributedMember requester = new InternalDistributedMember(local, 102);

    RegionVersionVector providerRvv = new VMRegionVersionVector(provider, null,
        1, null, 1, false, null);

    ConcurrentHashMap<InternalDistributedMember, RegionVersionHolder<InternalDistributedMember>> memberToRegionVersionHolder =
        new ConcurrentHashMap<>();
    RegionVersionHolder regionVersionHolder = new RegionVersionHolder(provider);
    // memberToRegionVersionHolder.put(provider, regionVersionHolder);
    RegionVersionVector requesterRvv =
        new VMRegionVersionVector(requester, memberToRegionVersionHolder,
            0, null, 0, false, null);

    assertThat(providerRvv.isRVVGCDominatedBy(requesterRvv)).isFalse();
  }

  @Test
  public void isRvvGcDominatedByRequesterRvvReturnsFalseIfRequesterRvvDominatesProvider()
      throws Exception {
    final String local = getIPLiteral();
    InternalDistributedMember provider = new InternalDistributedMember(local, 101);
    InternalDistributedMember requester = new InternalDistributedMember(local, 102);

    RegionVersionVector providerRvv = new VMRegionVersionVector(provider, null,
        1, null, 1, false, null);

    ConcurrentHashMap<InternalDistributedMember, RegionVersionHolder<InternalDistributedMember>> memberToRegionVersionHolder =
        new ConcurrentHashMap<>();
    RegionVersionHolder regionVersionHolder = new RegionVersionHolder(provider);
    memberToRegionVersionHolder.put(provider, regionVersionHolder);
    RegionVersionVector requesterRvv =
        new VMRegionVersionVector(requester, memberToRegionVersionHolder,
            0, null, 0, false, null);

    assertThat(providerRvv.isRVVGCDominatedBy(requesterRvv)).isFalse();
  }

  @Test
  public void isRvvGcDominatedByRequesterRvvReturnsTrueIfRequesterRvvDominatesWithNoGcVersion()
      throws Exception {
    final String local = getIPLiteral();
    InternalDistributedMember provider = new InternalDistributedMember(local, 101);
    InternalDistributedMember requester = new InternalDistributedMember(local, 102);

    ConcurrentHashMap<InternalDistributedMember, Long> memberToGcVersion =
        new ConcurrentHashMap<>();
    RegionVersionVector providerRvv = new VMRegionVersionVector(provider, null,
        1, memberToGcVersion, 1, false, null);

    ConcurrentHashMap<InternalDistributedMember, RegionVersionHolder<InternalDistributedMember>> memberToRegionVersionHolder =
        new ConcurrentHashMap<>();
    RegionVersionHolder regionVersionHolder = new RegionVersionHolder(provider);
    regionVersionHolder.recordVersion(1);
    regionVersionHolder.recordVersion(2);
    memberToRegionVersionHolder.put(provider, regionVersionHolder);
    RegionVersionVector requesterRvv =
        new VMRegionVersionVector(requester, memberToRegionVersionHolder,
            0, null, 0, false, null);

    assertThat(providerRvv.isRVVGCDominatedBy(requesterRvv)).isTrue();
  }

  @Test
  public void isRvvGcDominatedByRequesterRvvReturnsTrueIfRequesterRvvDominates() throws Exception {
    final String local = getIPLiteral();
    InternalDistributedMember provider = new InternalDistributedMember(local, 101);
    InternalDistributedMember requester = new InternalDistributedMember(local, 102);
    ConcurrentHashMap<InternalDistributedMember, Long> memberToGcVersion =
        new ConcurrentHashMap<>();
    memberToGcVersion.put(requester, new Long(1));
    RegionVersionVector providerRvv = new VMRegionVersionVector(provider, null,
        1, memberToGcVersion, 1, false, null);

    ConcurrentHashMap<InternalDistributedMember, RegionVersionHolder<InternalDistributedMember>> memberToRegionVersionHolder =
        new ConcurrentHashMap<>();
    RegionVersionHolder regionVersionHolder = new RegionVersionHolder(provider);
    regionVersionHolder.recordVersion(1);
    regionVersionHolder.recordVersion(2);
    memberToRegionVersionHolder.put(provider, regionVersionHolder);
    RegionVersionVector requesterRvv =
        new VMRegionVersionVector(requester, memberToRegionVersionHolder,
            2, null, 0, false, regionVersionHolder);

    assertThat(providerRvv.isRVVGCDominatedBy(requesterRvv)).isTrue();
  }

  @Test
  public void isRvvGcDominatedByRequesterRvvReturnsFalseIfRequesterRvvDominates() throws Exception {
    final String local = getIPLiteral();
    InternalDistributedMember provider = new InternalDistributedMember(local, 101);
    InternalDistributedMember requester = new InternalDistributedMember(local, 102);
    ConcurrentHashMap<InternalDistributedMember, Long> memberToGcVersion =
        new ConcurrentHashMap<>();
    memberToGcVersion.put(requester, new Long(3));
    RegionVersionHolder pRegionVersionHolder = new RegionVersionHolder(provider);
    pRegionVersionHolder.recordVersion(1);
    pRegionVersionHolder.recordVersion(2);
    pRegionVersionHolder.recordVersion(3);
    pRegionVersionHolder.recordVersion(4);

    RegionVersionVector providerRvv = new VMRegionVersionVector(provider, null,
        1, memberToGcVersion, 1, false, pRegionVersionHolder);

    ConcurrentHashMap<InternalDistributedMember, RegionVersionHolder<InternalDistributedMember>> memberToRegionVersionHolder =
        new ConcurrentHashMap<>();
    RegionVersionHolder regionVersionHolder = new RegionVersionHolder(provider);
    regionVersionHolder.recordVersion(1);
    regionVersionHolder.recordVersion(2);
    memberToRegionVersionHolder.put(provider, regionVersionHolder);
    RegionVersionVector requesterRvv =
        new VMRegionVersionVector(requester, memberToRegionVersionHolder,
            2, null, 0, false, regionVersionHolder);

    assertThat(providerRvv.isRVVGCDominatedBy(requesterRvv)).isFalse();
  }

  private RegionVersionVector createRegionVersionVector(InternalDistributedMember ownerId,
      LocalRegion owner) {
    @SuppressWarnings({"unchecked", "rawtypes"})
    RegionVersionVector rvv = new RegionVersionVector(ownerId, owner) {
      @Override
      public int getDSFID() {
        return 0;
      }

      @Override
      protected RegionVersionVector createCopy(VersionSource ownerId, ConcurrentHashMap vector,
          long version, ConcurrentHashMap gcVersions, long gcVersion, boolean singleMember,
          RegionVersionHolder clonedLocalHolder) {
        return null;
      }

      @Override
      protected void writeMember(VersionSource member, DataOutput out) throws IOException {}

      @Override
      protected VersionSource readMember(DataInput in) throws IOException, ClassNotFoundException {
        return null;
      }
    };
    return rvv;
  }

  private void doExceptionsWithContains(DiskStoreID id, DiskRegionVersionVector rvv) {
    rvv.recordVersion(id, 10);

    // Make sure we have exceptions from 0-10
    assertFalse(rvv.contains(id, 5));

    rvv.recordVersion(id, 5);
    assertTrue(rvv.contains(id, 5));
    assertFalse(rvv.contains(id, 4));
    assertFalse(rvv.contains(id, 6));

    for (int i = 0; i < 10; i++) {
      rvv.recordVersion(id, i);
    }

    for (int i = 0; i < 10; i++) {
      assertTrue(rvv.contains(id, i));
    }
    assertEquals(0, rvv.getExceptionCount(id));
  }

  private static class TestableRegionVersionVector
      extends RegionVersionVector<VersionSource<InternalDistributedMember>> {

    TestableRegionVersionVector(VersionSource<InternalDistributedMember> ownerId, long version) {
      super(ownerId, null, version);
    }

    @Override
    protected RegionVersionVector createCopy(VersionSource ownerId, ConcurrentHashMap vector,
        long version, ConcurrentHashMap gcVersions, long gcVersion, boolean singleMember,
        RegionVersionHolder clonedLocalHolder) {
      return null;
    }

    @Override
    protected VersionSource<InternalDistributedMember> readMember(DataInput in)
        throws IOException, ClassNotFoundException {
      return null;
    }

    @Override
    protected void writeMember(VersionSource member, DataOutput out) throws IOException {

    }

    @Override
    public int getDSFID() {
      return 0;
    }
  }

  private static String getIPLiteral() {
    try {
      return SocketCreator.getLocalHost().getHostAddress();
    } catch (UnknownHostException e) {
      throw new Error("Problem determining host IP address", e);
    }
  }

  private static class VersionRaceConditionRegionVersionVector extends TestableRegionVersionVector {

    VersionRaceConditionRegionVersionVector(VersionSource<InternalDistributedMember> ownerId,
        long version) {
      super(ownerId, version);
    }

    @Override
    boolean compareAndSetVersion(long currentVersion, long newVersion) {
      super.compareAndSetVersion(currentVersion, newVersion);
      return false;
    }

  }
}
