/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.disttx;

import static org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPort;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;

import org.junit.Ignore;
import org.junit.Test;

import org.apache.geode.LogWriter;
import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheTransactionManager;
import org.apache.geode.cache.CommitConflictException;
import org.apache.geode.cache.CommitIncompleteException;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.DiskStore;
import org.apache.geode.cache.InterestPolicy;
import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.cache.Scope;
import org.apache.geode.cache.SubscriptionAttributes;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.internal.cache.BucketRegion;
import org.apache.geode.internal.cache.DistTXState;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.RegionEntry;
import org.apache.geode.internal.cache.TXManagerImpl;
import org.apache.geode.internal.cache.TXStateInterface;
import org.apache.geode.internal.cache.TXStateProxyImpl;
import org.apache.geode.internal.cache.execute.CustomerIDPartitionResolver;
import org.apache.geode.internal.cache.execute.data.CustId;
import org.apache.geode.internal.cache.execute.data.Customer;
import org.apache.geode.internal.cache.execute.data.Order;
import org.apache.geode.internal.cache.execute.data.OrderId;
import org.apache.geode.test.dunit.Assert;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.Invoke;
import org.apache.geode.test.dunit.LogWriterUtils;
import org.apache.geode.test.dunit.SerializableCallable;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
import org.apache.geode.util.internal.GeodeGlossary;

@SuppressWarnings("deprecation")

public class DistributedTransactionDUnitTest extends JUnit4CacheTestCase {

  protected final String CUSTOMER_PR = "customerPRRegion";
  protected final String ORDER_PR = "orderPRRegion";
  protected final String D_REFERENCE = "distrReference";
  protected final String PERSISTENT_CUSTOMER_PR = "persistentCustomerPRRegion";

  protected final String CUSTOMER_RR = "customerRRRegion";

  @Override
  public final void postSetUp() throws Exception {
    Invoke.invokeInEveryVM(new SerializableCallable() {
      @Override
      public Object call() throws Exception {
        System.setProperty(GeodeGlossary.GEMFIRE_PREFIX + "sync-commits", "true");
        return null;
      }
    });

    Invoke.invokeInEveryVM(new SerializableCallable() {
      @Override
      public Object call() throws Exception {
        // System.setProperty("gemfire.ALLOW_PERSISTENT_TRANSACTIONS", "true");
        TXManagerImpl.ALLOW_PERSISTENT_TRANSACTIONS = true;
        return null;
      }
    });
  }

  @Override
  public final void preTearDownCacheTestCase() throws Exception {
    Invoke.invokeInEveryVM(new SerializableCallable() {
      @Override
      public Object call() throws Exception {
        System.setProperty(GeodeGlossary.GEMFIRE_PREFIX + "sync-commits", "false");
        return null;
      }
    });
    Invoke.invokeInEveryVM(new SerializableCallable() {
      @Override
      public Object call() throws Exception {
        // System.setProperty("gemfire.ALLOW_PERSISTENT_TRANSACTIONS", "false");
        TXManagerImpl.ALLOW_PERSISTENT_TRANSACTIONS = false;
        return null;
      }
    });
  }

  public DistributedTransactionDUnitTest() {
    super();
  }

  public Object execute(VM vm, SerializableCallable c) {
    return vm.invoke(c);
  }

  public void execute(VM[] vms, SerializableCallable c) {
    for (VM vm : vms) {
      execute(vm, c);
    }
  }

  public int startServer(VM vm) {
    return (Integer) vm.invoke(new SerializableCallable() {
      @Override
      public Object call() throws Exception {
        int port = getRandomAvailableTCPPort();
        CacheServer s = getCache().addCacheServer();
        s.setPort(port);
        s.start();
        TXManagerImpl txMgr = (TXManagerImpl) getCache().getCacheTransactionManager();
        txMgr.setTransactionTimeToLiveForTest(10);
        return port;
      }
    });
  }

  protected boolean getConcurrencyChecksEnabled() {
    return true;
  }

  void createRR(boolean isEmpty) {
    AttributesFactory af = new AttributesFactory();
    af.setScope(Scope.DISTRIBUTED_ACK);
    if (!isEmpty) {
      af.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE);
    } else {
      af.setDataPolicy(DataPolicy.EMPTY); // for accessor
    }
    af.setConcurrencyChecksEnabled(getConcurrencyChecksEnabled());
    getCache().createRegion(CUSTOMER_RR, af.create());
  }

  void createPR(boolean accessor, int redundantCopies, InterestPolicy interestPolicy) {
    AttributesFactory af = new AttributesFactory();
    af.setConcurrencyChecksEnabled(getConcurrencyChecksEnabled());
    if (interestPolicy != null) {
      af.setSubscriptionAttributes(new SubscriptionAttributes(interestPolicy));
    }
    af.setPartitionAttributes(new PartitionAttributesFactory<CustId, Customer>()
        .setTotalNumBuckets(4).setLocalMaxMemory(accessor ? 0 : 1)
        .setPartitionResolver(new CustomerIDPartitionResolver("resolver1"))
        .setRedundantCopies(redundantCopies).create());
    getCache().createRegion(CUSTOMER_PR, af.create());

  }

  @Override
  public Properties getDistributedSystemProperties() {
    Properties props = super.getDistributedSystemProperties();
    return props;
  }


  void createRegions(boolean accessor, int redundantCopies, InterestPolicy interestPolicy) {
    AttributesFactory af = new AttributesFactory();
    af.setScope(Scope.DISTRIBUTED_ACK);
    af.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE);
    // af.setConcurrencyChecksEnabled(getConcurrencyChecksEnabled());
    getCache().createRegion(D_REFERENCE, af.create());

    af = new AttributesFactory();
    // af.setConcurrencyChecksEnabled(getConcurrencyChecksEnabled());
    af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
    if (interestPolicy != null) {
      af.setSubscriptionAttributes(new SubscriptionAttributes(interestPolicy));
    }
    af.setPartitionAttributes(new PartitionAttributesFactory<CustId, Customer>()
        .setTotalNumBuckets(4).setLocalMaxMemory(accessor ? 0 : 1)
        .setPartitionResolver(new CustomerIDPartitionResolver("resolver1"))
        .setRedundantCopies(redundantCopies).create());
    getCache().createRegion(CUSTOMER_PR, af.create());
    af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
    af.setPartitionAttributes(new PartitionAttributesFactory<OrderId, Order>().setTotalNumBuckets(4)
        .setLocalMaxMemory(accessor ? 0 : 1)
        .setPartitionResolver(new CustomerIDPartitionResolver("resolver2"))
        .setRedundantCopies(redundantCopies).setColocatedWith(CUSTOMER_PR).create());
    getCache().createRegion(ORDER_PR, af.create());
  }

  public void createRegions(VM[] vms) {
    for (VM vm : vms) {
      vm.invoke(new SerializableCallable() {
        @Override
        public Object call() throws Exception {
          createRegions(false, 1, null);
          return null;
        }

      });
    }
  }

  public void createRegions(final VM vm, final boolean accessor, final int redundantCopies) {
    vm.invoke(new SerializableCallable() {
      @Override
      public Object call() throws Exception {
        createRegions(accessor, redundantCopies, null);
        return null;
      }
    });
  }

  public void createRR(VM[] vms) {
    for (VM vm : vms) {
      vm.invoke(new SerializableCallable() {
        @Override
        public Object call() throws Exception {
          createRR(false);
          return null;
        }
      });
    }
  }

  public void createRRonAccessor(VM accessor) {
    accessor.invoke(new SerializableCallable() {
      @Override
      public Object call() throws Exception {
        createRR(true);
        return null;
      }
    });
  }

  public void createPR(VM[] vms) {
    for (VM vm : vms) {
      vm.invoke(new SerializableCallable() {
        @Override
        public Object call() throws Exception {
          createPR(false, 0, null);
          return null;
        }
      });
    }
  }

  public void createPRwithRedundanyCopies(VM[] vms, final int redundency) {
    for (VM vm : vms) {
      vm.invoke(new SerializableCallable() {
        @Override
        public Object call() throws Exception {
          createPR(false, redundency, null);
          return null;
        }
      });
    }
  }

  public void createPRonAccessor(VM accessor, final int redundency) {
    accessor.invoke(new SerializableCallable() {
      @Override
      public Object call() throws Exception {
        createPR(true, redundency, null);
        return null;
      }
    });
  }

  public void createPersistentPR(VM[] vms) {
    for (VM vm : vms) {
      vm.invoke(new SerializableCallable() {
        @Override
        public Object call() throws Exception {
          createPersistentPR();
          return null;
        }
      });
    }
  }

  public void createPersistentPR() {
    getCache().createRegion(PERSISTENT_CUSTOMER_PR,
        getPersistentPRAttributes(1, -1, getCache(), 113, true));
  }

  protected RegionAttributes getPersistentPRAttributes(final int redundancy,
      final int recoveryDelay, Cache cache, int numBuckets, boolean synchronous) {
    DiskStore ds = cache.findDiskStore("disk");
    if (ds == null) {
      ds = cache.createDiskStoreFactory().setDiskDirs(getDiskDirs()).create("disk");
    }
    AttributesFactory af = new AttributesFactory();
    PartitionAttributesFactory paf = new PartitionAttributesFactory();
    paf.setRedundantCopies(redundancy);
    paf.setRecoveryDelay(recoveryDelay);
    paf.setTotalNumBuckets(numBuckets);
    paf.setLocalMaxMemory(500);
    af.setPartitionAttributes(paf.create());
    af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
    af.setDiskStoreName("disk");
    af.setDiskSynchronous(synchronous);
    RegionAttributes attr = af.create();
    return attr;
  }


  void populateData() {
    Region custRegion = getCache().getRegion(CUSTOMER_PR);
    Region orderRegion = getCache().getRegion(ORDER_PR);
    Region refRegion = getCache().getRegion(D_REFERENCE);
    for (int i = 0; i < 5; i++) {
      CustId custId = new CustId(i);
      Customer customer = new Customer("customer" + i, "address" + i);
      OrderId orderId = new OrderId(i, custId);
      Order order = new Order("order" + i);
      custRegion.put(custId, customer);
      orderRegion.put(orderId, order);
      refRegion.put(custId, customer);
    }
  }

  void populateRR() {
    Region custRegion = getCache().getRegion(CUSTOMER_RR);
    for (int i = 0; i < 5; i++) {
      CustId custId = new CustId(i);
      Customer customer = new Customer("customer" + i, "address" + i);
      custRegion.put(custId, customer);
    }
  }

  public void populateData(final VM vm) {
    vm.invoke(new SerializableCallable() {
      @Override
      public Object call() throws Exception {
        populateData();
        return null;
      }
    });
  }

  public void populateRR(final VM vm) {
    vm.invoke(new SerializableCallable() {
      @Override
      public Object call() throws Exception {
        populateRR();
        return null;
      }
    });
  }

  @Test
  public void testTransactionalPutOnReplicatedRegion() throws Exception {
    Host host = Host.getHost(0);
    VM server1 = host.getVM(0);
    VM server2 = host.getVM(1);
    VM server3 = host.getVM(2);

    createRR(new VM[] {server1, server2, server3});
    execute(server1, new SerializableCallable() {
      @Override
      public Object call() throws Exception {
        CacheTransactionManager mgr = getGemfireCache().getTxManager();
        mgr.setDistributed(true);
        mgr.begin();
        mgr.commit();
        mgr.begin();

        Region<CustId, Customer> custRegion = getCache().getRegion(CUSTOMER_RR);
        CustId custId = new CustId(1);
        Customer expectedCustomer = custRegion.get(custId);
        assertNull(expectedCustomer);

        // Perform a put
        CustId custIdOne = new CustId(1);
        Customer customerOne = new Customer("name1", "addr1");
        custRegion.put(custIdOne, customerOne);

        // Rollback the transaction
        mgr.rollback();


        mgr.begin();
        // Verify that the entry is rolled back
        expectedCustomer = custRegion.get(custId);
        assertNull(expectedCustomer);

        // Perform two more puts and a commit
        CustId custIdTwo = new CustId(2);
        Customer customerTwo = new Customer("name2", "addr2");
        CustId custIdThree = new CustId(3);
        Customer customerThree = new Customer("name3", "addr3");
        custRegion.put(custIdTwo, customerTwo);
        custRegion.put(custIdThree, customerThree);

        mgr.commit();
        mgr.begin();

        // Verify data
        assertEquals(2, custRegion.size());
        assertTrue(custRegion.containsKey(custIdTwo));
        assertTrue(custRegion.containsKey(custIdThree));
        assertEquals(customerTwo, custRegion.get(custIdTwo));
        assertEquals(customerThree, custRegion.get(custIdThree));

        // Perform one more put but don't commit
        custRegion.put(custIdOne, customerOne);

        // Verify data
        assertEquals(3, custRegion.size());
        assertTrue(custRegion.containsKey(custIdOne));
        assertEquals(customerOne, custRegion.get(custIdOne));

        mgr.commit();

        return null;
      }

    });
  }

  @Test
  public void testTransactionalPutOnPartitionedRegion() throws Exception {
    Host host = Host.getHost(0);
    VM server1 = host.getVM(0);
    VM server2 = host.getVM(1);
    VM server3 = host.getVM(2);

    createPR(new VM[] {server1, server2, server3});
    execute(server1, new SerializableCallable() {

      @Override
      public Object call() throws Exception {
        CacheTransactionManager mgr = getGemfireCache().getTxManager();
        mgr.setDistributed(true);
        LogWriter logger = getGemfireCache().getLogger();

        mgr.begin();
        logger.fine("TEST:Commit-1");
        mgr.commit();

        mgr.begin();
        Region<CustId, Customer> custRegion = getCache().getRegion(CUSTOMER_PR);
        CustId custId = new CustId(1);
        Customer expectedCustomer = custRegion.get(custId);
        assertNull(expectedCustomer);

        // Perform a put
        CustId custIdOne = new CustId(1);
        Customer customerOne = new Customer("name1", "addr1");
        logger.fine("TEST:Put-1");
        custRegion.put(custIdOne, customerOne);

        // Rollback the transaction
        logger.fine("TEST:Rollback-1");
        mgr.rollback();

        mgr.begin();
        // Verify that the entry is rolled back
        expectedCustomer = custRegion.get(custId);
        assertNull(expectedCustomer);

        // Perform two more puts and a commit
        CustId custIdTwo = new CustId(2);
        Customer customerTwo = new Customer("name2", "addr2");
        CustId custIdThree = new CustId(3);
        Customer customerThree = new Customer("name3", "addr3");
        logger.fine("TEST:Put-2");
        custRegion.put(custIdTwo, customerTwo);
        logger.fine("TEST:Put-3");
        custRegion.put(custIdThree, customerThree);

        logger.fine("TEST:Commit-2");
        mgr.commit();
        mgr.begin();

        // Verify data
        assertEquals(2, custRegion.size());
        assertTrue(custRegion.containsKey(custIdTwo));
        assertTrue(custRegion.containsKey(custIdThree));
        assertEquals(customerTwo, custRegion.get(custIdTwo));
        assertEquals(customerThree, custRegion.get(custIdThree));

        // Perform one more put but don't commit
        logger.fine("TEST:Put-4");
        Customer customerOneMod = new Customer("name1", "addr11");

        custRegion.put(custIdOne, customerOneMod);

        // Verify data
        assertEquals(3, custRegion.size());
        assertTrue(custRegion.containsKey(custIdOne));
        assertEquals(customerOneMod, custRegion.get(custIdOne));
        logger.fine("TEST:Commit-3");
        mgr.commit();

        return null;
      }

    });
  }

  @SuppressWarnings("serial")
  @Test
  public void testCommitOnPartitionedAndReplicatedRegions() throws Exception {
    Host host = Host.getHost(0);
    VM server1 = host.getVM(0);
    VM server2 = host.getVM(1);
    VM server3 = host.getVM(2);
    createPR(new VM[] {server1, server2, server3});
    createRR(new VM[] {server1, server2, server3});
    execute(server1, new SerializableCallable() {
      @Override
      public Object call() throws Exception {
        CacheTransactionManager mgr = getGemfireCache().getTxManager();
        mgr.setDistributed(true);
        mgr.begin();
        Region<CustId, Customer> rrRegion = getCache().getRegion(CUSTOMER_RR);
        Region<CustId, Customer> prRegion = getCache().getRegion(CUSTOMER_PR);

        CustId custIdOne = new CustId(1);
        Customer customerOne = new Customer("name1", "addr1");
        prRegion.put(custIdOne, customerOne);
        rrRegion.put(custIdOne, customerOne);

        CustId custIdTwo = new CustId(2);
        Customer customerTwo = new Customer("name2", "addr2");
        rrRegion.put(custIdTwo, customerTwo);

        mgr.commit();

        // Verify
        assertEquals(2, rrRegion.size());
        assertTrue(rrRegion.containsKey(custIdOne));
        assertTrue(rrRegion.containsKey(custIdTwo));
        assertEquals(customerOne, rrRegion.get(custIdOne));
        assertEquals(customerTwo, rrRegion.get(custIdTwo));

        assertEquals(1, prRegion.size());
        assertTrue(prRegion.containsKey(custIdOne));
        assertEquals(customerOne, rrRegion.get(custIdOne));

        return null;
      }

    });
  }

  @Test
  public void testGetIsolated() throws Exception {
    Host host = Host.getHost(0);
    VM server1 = host.getVM(0);
    VM server2 = host.getVM(1);
    VM server3 = host.getVM(2);
    createPR(new VM[] {server1, server2, server3});
    createRR(new VM[] {server1, server2, server3});
    execute(server1, new SerializableCallable() {
      @Override
      public Object call() throws Exception {
        CacheTransactionManager mgr = getGemfireCache().getTxManager();
        mgr.setDistributed(true);
        mgr.begin();
        mgr.commit();
        mgr.begin();
        Region<CustId, Customer> custPR = getCache().getRegion(CUSTOMER_PR);
        CustId custIdOne = new CustId(1);
        Customer customerOne = new Customer("name1", "addr1");
        custPR.put(custIdOne, customerOne);

        Region<CustId, Customer> replicatedRegion = getCache().getRegion(CUSTOMER_RR);
        replicatedRegion.put(custIdOne, customerOne);
        CustId custIdTwo = new CustId(2);
        Customer customerTwo = new Customer("name2", "addr2");
        replicatedRegion.put(custIdTwo, customerTwo);

        // Verify before commit
        assertEquals(2, replicatedRegion.size());
        assertEquals(customerOne, replicatedRegion.get(custIdOne));
        assertEquals(customerTwo, replicatedRegion.get(custIdTwo));

        // Perform commit
        mgr.commit();

        // Verify after commit
        assertEquals(2, replicatedRegion.size());
        assertEquals(customerOne, replicatedRegion.get(custIdOne));
        assertEquals(customerTwo, replicatedRegion.get(custIdTwo));

        return null;
      }
    });
  }

  @Test
  public void testCommitAndRollback() throws Exception {
    Host host = Host.getHost(0);
    VM server1 = host.getVM(0);
    VM server2 = host.getVM(1);
    VM server3 = host.getVM(2);
    createPR(new VM[] {server1, server2, server3});
    execute(server1, new SerializableCallable() {
      @Override
      public Object call() throws Exception {
        CacheTransactionManager mgr = getGemfireCache().getTxManager();
        mgr.setDistributed(true);
        // mgr.begin();
        Region<CustId, Customer> custPR = getCache().getRegion(CUSTOMER_PR);
        for (int i = 0; i < 1000; i++) {
          CustId custId = new CustId(i);
          Customer customer = new Customer("name" + i, "addr" + i);
          mgr.begin();
          custPR.put(custId, customer);
          if (i % 2 == 0) {
            mgr.commit();
          } else {
            mgr.rollback();
          }
        }
        // Verify number of puts
        assertEquals(500, custPR.size());
        Set<Region.Entry<?, ?>> entries = custPR.entrySet(false);
        assertEquals(500, entries.size());

        return null;
      }
    });
  }

  /*
   * We create 2 partitioned regions one on each server and have a third node as accessor and fire
   * transactional operations on it.
   */
  @Test
  public void testNonColocatedPutByPartitioning() {
    Host host = Host.getHost(0);
    VM server1 = host.getVM(0); // datastore
    VM server2 = host.getVM(1); // datastore
    VM server3 = host.getVM(2); // accessor

    final String CUSTOMER_PR1 = "CUSTOMER_PR1";
    final String CUSTOMER_PR2 = "CUSTOMER_PR2";

    // Create CUSTOMER_PR1 on server1
    execute(server1, new SerializableCallable() {
      @Override
      public Object call() throws Exception {
        AttributesFactory af = new AttributesFactory();
        af.setConcurrencyChecksEnabled(getConcurrencyChecksEnabled());
        af.setPartitionAttributes(new PartitionAttributesFactory<CustId, Customer>()
            .setTotalNumBuckets(4).setLocalMaxMemory(1)
            .setPartitionResolver(new CustomerIDPartitionResolver("resolver1"))
            .setRedundantCopies(0).create());
        getCache().createRegion(CUSTOMER_PR1, af.create());
        return null;
      }
    });

    // Create CUSTOMER_PR2 on server2
    execute(server2, new SerializableCallable() {
      @Override
      public Object call() throws Exception {
        AttributesFactory af = new AttributesFactory();
        af.setConcurrencyChecksEnabled(getConcurrencyChecksEnabled());
        af.setPartitionAttributes(new PartitionAttributesFactory<CustId, Customer>()
            .setTotalNumBuckets(4).setLocalMaxMemory(1)
            .setPartitionResolver(new CustomerIDPartitionResolver("resolver2"))
            .setRedundantCopies(0).create());
        getCache().createRegion(CUSTOMER_PR2, af.create());
        return null;
      }
    });

    // Create both the regions on server3 (accessor)
    execute(server3, new SerializableCallable() {
      @Override
      public Object call() throws Exception {
        AttributesFactory af = new AttributesFactory();
        af.setConcurrencyChecksEnabled(getConcurrencyChecksEnabled());
        af.setPartitionAttributes(new PartitionAttributesFactory<CustId, Customer>()
            .setTotalNumBuckets(4).setLocalMaxMemory(0) // since this is an accessor
            .setPartitionResolver(new CustomerIDPartitionResolver("resolver1"))
            .setRedundantCopies(0).create());
        getCache().createRegion(CUSTOMER_PR1, af.create());

        return null;
      }
    });
    execute(server3, new SerializableCallable() {
      @Override
      public Object call() throws Exception {
        AttributesFactory af = new AttributesFactory();
        af.setConcurrencyChecksEnabled(getConcurrencyChecksEnabled());
        af.setPartitionAttributes(new PartitionAttributesFactory<CustId, Customer>()
            .setTotalNumBuckets(4).setLocalMaxMemory(0) // since this is an accessor
            .setPartitionResolver(new CustomerIDPartitionResolver("resolver2"))
            .setRedundantCopies(0).create());
        getCache().createRegion(CUSTOMER_PR2, af.create());
        return null;
      }
    });

    // Now perform tx ops on accessor
    execute(server3, new SerializableCallable() {
      @Override
      public Object call() throws Exception {
        CacheTransactionManager mgr = getGemfireCache().getTxManager();
        mgr.setDistributed(true);
        mgr.begin();

        Region<CustId, Customer> custPR1 = getCache().getRegion(CUSTOMER_PR1);
        CustId custIdOne = new CustId(1);
        Customer customerOne = new Customer("name1", "addr1");
        custPR1.put(custIdOne, customerOne);

        Region<CustId, Customer> custPR2 = getCache().getRegion(CUSTOMER_PR2);
        custPR2.put(custIdOne, customerOne);

        mgr.commit();

        // Verify
        assertEquals(1, custPR1.size());
        assertEquals(1, custPR2.size());
        return null;
      }
    });

    // Verify on one of the servers
    execute(server1, new SerializableCallable() {
      @Override
      public Object call() throws Exception {
        Region<CustId, Customer> custPR1 = getCache().getRegion(CUSTOMER_PR1);
        assertEquals(1, custPR1.size());
        CustId custIdOne = new CustId(1);
        Customer customerOne = new Customer("name1", "addr1");
        assertEquals(customerOne, custPR1.get(custIdOne));
        return null;
      }
    });
  }

  @Test
  public void testTransactionalKeyBasedUpdates() throws Exception {
    Host host = Host.getHost(0);
    VM server1 = host.getVM(0);
    VM server2 = host.getVM(1);
    VM server3 = host.getVM(2);
    createPR(new VM[] {server1, server2, server3});

    execute(server1, new SerializableCallable() {
      @Override
      public Object call() throws Exception {
        CacheTransactionManager mgr = getGemfireCache().getTxManager();
        mgr.setDistributed(true);
        // mgr.begin();
        LogWriter logger = getGemfireCache().getLogger();

        Region<CustId, Customer> custPR = getCache().getRegion(CUSTOMER_PR);
        for (int i = 1; i <= 2; i++) {
          mgr.begin();
          logger.fine("TEST:PUT-" + i);
          custPR.put(new CustId(i), new Customer("name" + i, "addr" + i));
          logger.fine("TEST:COMMIT-" + i);
          mgr.commit();
        }

        // Updates
        for (int i = 1; i <= 2; i++) {
          CustId custId = new CustId(i);
          Customer customer = custPR.get(custId);
          assertNotNull(customer);
          mgr.begin();
          logger.fine("TEST:UPDATE-" + i);
          custPR.put(custId, new Customer("name" + i * 2, "addr" + i * 2));
          logger.fine("TEST:UPDATED-" + i + "=" + custId + "," + custPR.get(custId));
          logger.fine("TEST:UPDATE COMMIT-" + i);
          mgr.commit();
          logger.fine("TEST:POSTCOMMIT-" + i + "=" + custId + "," + custPR.get(custId));
        }
        // Verify
        for (int i = 1; i <= 2; i++) {
          CustId custId = new CustId(i);
          Customer customer = custPR.get(custId);
          assertNotNull(customer);
          logger.fine("TEST:VERIFYING-" + i);
          assertEquals(new Customer("name" + i * 2, "addr" + i * 2), customer);
        }

        return null;
      }
    });
  }

  @Test
  public void testTransactionalKeyBasedDestroys_PR() throws Exception {
    Host host = Host.getHost(0);
    VM server1 = host.getVM(0);
    VM server2 = host.getVM(1);
    VM server3 = host.getVM(2);
    createPR(new VM[] {server1, server2, server3});

    execute(server1, new SerializableCallable() {
      @Override
      public Object call() throws Exception {
        CacheTransactionManager mgr = getGemfireCache().getTxManager();
        mgr.setDistributed(true);
        // mgr.begin();

        Region<CustId, Customer> custPR = getCache().getRegion(CUSTOMER_PR);
        for (int i = 1; i <= 1000; i++) {
          mgr.begin();
          custPR.put(new CustId(i), new Customer("name" + i, "addr" + i));
          mgr.commit();
        }

        // Destroys
        for (int i = 1; i <= 100; i++) {
          CustId custId = new CustId(i);
          mgr.begin();
          Object customerRemoved = custPR.remove(custId);
          // Removing this assertion since in case of distributed destroys the
          // value will not be returned.
          // assertNotNull(customerRemoved);
          mgr.commit();
        }

        // Verify
        for (int i = 1; i <= 100; i++) {
          CustId custId = new CustId(1);
          Customer customer = custPR.get(custId);
          assertNull(customer);
        }

        assertEquals(900, custPR.size());
        return null;
      }
    });
  }

  @Test
  public void testTransactionalKeyBasedDestroys_RR() throws Exception {
    Host host = Host.getHost(0);
    VM server1 = host.getVM(0);
    VM server2 = host.getVM(1);
    VM server3 = host.getVM(2);
    createRR(new VM[] {server1, server2, server3});

    execute(server1, new SerializableCallable() {
      @Override
      public Object call() throws Exception {
        CacheTransactionManager mgr = getGemfireCache().getTxManager();
        mgr.setDistributed(true);
        // mgr.begin();

        Region<CustId, Customer> custRR = getCache().getRegion(CUSTOMER_RR);
        for (int i = 1; i <= 1000; i++) {
          mgr.begin();
          custRR.put(new CustId(i), new Customer("name" + i, "addr" + i));
          mgr.commit();
        }

        // Destroys
        for (int i = 1; i <= 100; i++) {
          CustId custId = new CustId(i);
          mgr.begin();
          Object customerRemoved = custRR.remove(custId);
          assertNotNull(customerRemoved);
          mgr.commit();
        }

        // Verify
        for (int i = 1; i <= 100; i++) {
          CustId custId = new CustId(1);
          Customer customer = custRR.get(custId);
          assertNull(customer);
        }

        assertEquals(900, custRR.size());
        return null;
      }
    });
  }

  @Test
  public void testTransactionalUpdates() throws Exception {
    Host host = Host.getHost(0);
    VM server1 = host.getVM(0);
    VM server2 = host.getVM(1);
    VM server3 = host.getVM(2);

    createPR(new VM[] {server2, server3});
    execute(server1, new SerializableCallable() {
      @Override
      public Object call() throws Exception {
        CacheTransactionManager mgr = getGemfireCache().getTxManager();
        mgr.setDistributed(true);
        mgr.begin();
        createPR(false, 0, null);

        Region<CustId, Customer> custPR = getCache().getRegion(CUSTOMER_PR);
        for (int i = 1; i <= 200; i++) {
          custPR.put(new CustId(i), new Customer("name" + i, "addr" + i));
        }

        assertEquals(200, custPR.size());

        mgr.rollback();
        // mgr.commit();
        mgr.begin();
        assertEquals(0, custPR.size());

        mgr.commit();
        mgr.begin();

        for (int i = 1; i <= 200; i++) {
          custPR.put(new CustId(i), new Customer("name" + i, "addr" + i));
        }

        mgr.commit();
        // mgr.begin();
        for (int i = 1; i <= 200; i++) {
          mgr.begin();
          custPR.put(new CustId(i), new Customer("name" + i * 2, "addr" + i * 2));
          mgr.commit();
        }
        mgr.begin();
        mgr.rollback();

        assertEquals(200, custPR.size());

        for (int i = 1; i <= 200; i++) {
          assertEquals(new Customer("name" + i * 2, "addr" + i * 2), custPR.get(new CustId(i)));
        }

        return null;
      }

    });
  }

  @Test
  public void testPutAllWithTransactions() throws Exception {
    Host host = Host.getHost(0);
    VM server1 = host.getVM(0);
    VM server2 = host.getVM(1);
    VM server3 = host.getVM(2);

    createRegions(new VM[] {server1, server2, server3});

    execute(new VM[] {server1, server2, server3}, new SerializableCallable() {
      @Override
      public Object call() throws Exception {
        AttributesFactory af = new AttributesFactory();
        af.setConcurrencyChecksEnabled(getConcurrencyChecksEnabled());
        af.setPartitionAttributes(new PartitionAttributesFactory<CustId, Customer>()
            .setTotalNumBuckets(4).setLocalMaxMemory(1)
            .setPartitionResolver(new CustomerIDPartitionResolver("resolver1"))
            .setRedundantCopies(0).create());
        getCache().createRegion("NONCOLOCATED_PR", af.create());
        return null;
      }
    });

    execute(server1, new SerializableCallable() {
      @Override
      public Object call() throws Exception {
        Region custRegion = getCache().getRegion(CUSTOMER_PR);
        Region orderRegion = getCache().getRegion(ORDER_PR);

        Map custMap = new HashMap();
        Map orderMap = new HashMap();
        for (int i = 0; i < 5; i++) {
          CustId custId = new CustId(i);
          Customer customer = new Customer("customer" + i, "address" + i);
          OrderId orderId = new OrderId(i, custId);
          Order order = new Order("order" + i);
          custMap.put(custId, customer);
          orderMap.put(orderId, order);
        }

        CacheTransactionManager mgr = getGemfireCache().getTxManager();
        mgr.setDistributed(true);
        mgr.begin();
        custRegion.putAll(custMap);
        orderRegion.putAll(orderMap);
        mgr.commit();

        mgr.begin();
        assertEquals(5, custRegion.size());
        assertEquals(5, orderRegion.size());

        custMap = new HashMap();
        orderMap = new HashMap();
        for (int i = 5; i < 10; i++) {
          CustId custId = new CustId(i);
          Customer customer = new Customer("customer" + i, "address" + i);
          OrderId orderId = new OrderId(i, custId);
          Order order = new Order("order" + i);
          custMap.put(custId, customer);
          orderMap.put(orderId, order);
        }
        custRegion.putAll(custMap);
        orderRegion.putAll(orderMap);
        mgr.rollback();

        mgr.begin();
        assertEquals(5, custRegion.size());
        assertEquals(5, orderRegion.size());

        custRegion.putAll(custMap);
        orderRegion.putAll(orderMap);

        assertEquals(10, custRegion.size());
        assertEquals(10, orderRegion.size());

        // Verify operations involving non colocated PR
        Map map = new HashMap();
        custMap.clear();
        for (int i = 10; i < 15; i++) {
          CustId custId = new CustId(i);
          Customer customer = new Customer("customer" + i, "address" + i);
          OrderId orderId = new OrderId(i, custId);
          Order order = new Order("order" + i);
          custMap.put(custId, customer);
          map.put(custId, customer);
        }
        custRegion.putAll(custMap);
        Region nonColocatedRegion = getCache().getRegion("NONCOLOCATED_PR");
        nonColocatedRegion.putAll(orderMap);


        mgr.commit();
        mgr.begin();
        assertEquals(15, custRegion.size());
        assertEquals(5, nonColocatedRegion.size());

        custMap.clear();
        map.clear();
        for (int i = 15; i < 20; i++) {
          CustId custId = new CustId(i);
          Customer customer = new Customer("customer" + i, "address" + i);
          OrderId orderId = new OrderId(i, custId);
          Order order = new Order("order" + i);
          custMap.put(custId, customer);
          map.put(custId, customer);
        }
        custRegion.putAll(custMap);
        nonColocatedRegion.putAll(orderMap);

        mgr.rollback();
        assertEquals(15, custRegion.size());
        assertEquals(5, nonColocatedRegion.size());

        return null;
      }
    });
  }

  @Test
  public void testRemoveAllWithTransactions() throws Exception {
    Host host = Host.getHost(0);
    VM server1 = host.getVM(0);
    VM server2 = host.getVM(1);
    VM server3 = host.getVM(2);

    createRegions(new VM[] {server1, server2, server3});

    execute(server1, new SerializableCallable() {
      @Override
      public Object call() throws Exception {
        Region custRegion = getCache().getRegion(CUSTOMER_PR);
        Region orderRegion = getCache().getRegion(ORDER_PR);

        Map custMap = new HashMap();
        Map orderMap = new HashMap();
        for (int i = 0; i < 15; i++) {
          CustId custId = new CustId(i);
          Customer customer = new Customer("customer" + i, "address" + i);
          OrderId orderId = new OrderId(i, custId);
          Order order = new Order("order" + i);
          custMap.put(custId, customer);
          orderMap.put(orderId, order);
        }

        CacheTransactionManager mgr = getGemfireCache().getTxManager();
        mgr.setDistributed(true);
        mgr.begin();
        custRegion.putAll(custMap);
        orderRegion.putAll(orderMap);
        mgr.commit();

        mgr.begin();
        assertEquals(15, custRegion.size());
        assertEquals(15, orderRegion.size());

        custMap = new HashMap();
        orderMap = new HashMap();
        for (int i = 5; i < 10; i++) {
          CustId custId = new CustId(i);
          Customer customer = new Customer("customer" + i, "address" + i);
          OrderId orderId = new OrderId(i, custId);
          Order order = new Order("order" + i);
          custMap.put(custId, customer);
          orderMap.put(orderId, order);
        }
        custRegion.removeAll(custMap.keySet());
        orderRegion.removeAll(orderMap.keySet());
        mgr.rollback();

        mgr.begin();
        assertEquals(15, custRegion.size());
        assertEquals(15, orderRegion.size());

        custRegion.removeAll(custMap.keySet());
        orderRegion.removeAll(orderMap.keySet());

        assertEquals(10, custRegion.size());
        assertEquals(10, orderRegion.size());
        mgr.commit();

        assertEquals(10, custRegion.size());
        assertEquals(10, orderRegion.size());

        return null;
      }
    });
  }

  @Test
  public void testTxWithSingleDataStore() throws Exception {
    Host host = Host.getHost(0);
    VM server1 = host.getVM(0); // datastore

    final String CUSTOMER_PR1 = "CUSTOMER_PR1";
    final String CUSTOMER_PR2 = "CUSTOMER_PR2";

    // Create CUSTOMER_PR1 on server1
    execute(server1, new SerializableCallable() {
      @Override
      public Object call() throws Exception {
        AttributesFactory af = new AttributesFactory();
        af.setConcurrencyChecksEnabled(getConcurrencyChecksEnabled());
        af.setPartitionAttributes(new PartitionAttributesFactory<CustId, Customer>()
            .setTotalNumBuckets(4).setLocalMaxMemory(1)
            .setPartitionResolver(new CustomerIDPartitionResolver("resolver1"))
            .setRedundantCopies(0).create());
        getCache().createRegion(CUSTOMER_PR1, af.create());
        return null;
      }
    });

    // Create CUSTOMER_PR2 on server2
    execute(server1, new SerializableCallable() {
      @Override
      public Object call() throws Exception {
        AttributesFactory af = new AttributesFactory();
        af.setConcurrencyChecksEnabled(getConcurrencyChecksEnabled());
        af.setPartitionAttributes(new PartitionAttributesFactory<CustId, Customer>()
            .setTotalNumBuckets(4).setLocalMaxMemory(1)
            .setPartitionResolver(new CustomerIDPartitionResolver("resolver2"))
            .setRedundantCopies(0).create());
        getCache().createRegion(CUSTOMER_PR2, af.create());
        return null;
      }
    });

    // Now perform tx ops on accessor
    execute(server1, new SerializableCallable() {
      @Override
      public Object call() throws Exception {
        CacheTransactionManager mgr = getGemfireCache().getTxManager();
        mgr.setDistributed(true);
        mgr.begin();

        Region<CustId, Customer> custPR1 = getCache().getRegion(CUSTOMER_PR1);
        CustId custIdOne = new CustId(1);
        Customer customerOne = new Customer("name1", "addr1");
        custPR1.put(custIdOne, customerOne);

        Region<CustId, Customer> custPR2 = getCache().getRegion(CUSTOMER_PR2);
        custPR2.put(custIdOne, customerOne);

        mgr.commit();

        // Verify
        assertEquals(1, custPR1.size());
        assertEquals(1, custPR2.size());
        return null;
      }
    });

    execute(server1, new SerializableCallable() {
      @Override
      public Object call() throws Exception {
        Region<CustId, Customer> custPR1 = getCache().getRegion(CUSTOMER_PR1);
        assertEquals(1, custPR1.size());
        CustId custIdOne = new CustId(1);
        Customer customerOne = new Customer("name1", "addr1");
        assertEquals(customerOne, custPR1.get(custIdOne));
        return null;
      }
    });
  }

  @Test
  public void testMultipleOpsOnSameKeyInTx() throws Exception {
    Host host = Host.getHost(0);
    VM server1 = host.getVM(0);
    VM server2 = host.getVM(1);
    VM server3 = host.getVM(2);

    createPR(new VM[] {server1, server2, server3});
    execute(server1, new SerializableCallable() {

      @Override
      public Object call() throws Exception {
        CacheTransactionManager mgr = getGemfireCache().getTxManager();
        mgr.setDistributed(true);
        mgr.begin();
        mgr.commit();

        mgr.begin();
        Region<CustId, Customer> custRegion = getCache().getRegion(CUSTOMER_PR);
        CustId custId = new CustId(1);
        Customer expectedCustomer = custRegion.get(custId);
        assertNull(expectedCustomer);

        // Perform a put
        CustId custIdOne = new CustId(1);
        Customer customerOne = new Customer("name1", "addr1");
        custRegion.put(custIdOne, customerOne);

        // Rollback the transaction
        mgr.rollback();

        mgr.begin();
        // Verify that the entry is rolled back
        expectedCustomer = custRegion.get(custId);
        assertNull(expectedCustomer);

        // Add more data
        CustId custIdTwo = new CustId(2);
        Customer customerTwo = new Customer("name2", "addr2");
        CustId custIdThree = new CustId(3);
        Customer customerThree = new Customer("name3", "addr3");
        custRegion.put(custIdTwo, customerTwo);
        custRegion.put(custIdThree, customerThree);
        mgr.commit();

        mgr.begin();
        // Verify data
        assertEquals(2, custRegion.size());
        assertTrue(custRegion.containsKey(custIdTwo));
        assertTrue(custRegion.containsKey(custIdThree));
        assertEquals(customerTwo, custRegion.get(custIdTwo));
        assertEquals(customerThree, custRegion.get(custIdThree));

        // Update the values for the same keys multiple times
        custRegion.put(custIdOne, new Customer("name1_mod1", "addr1_mod1"));
        custRegion.put(custIdTwo, new Customer("name2_mod1", "addr2_mod1"));
        custRegion.put(custIdOne, new Customer("name1_mod2", "addr1_mod2"));
        custRegion.put(custIdOne, new Customer("name1_mod3", "addr1_mod3"));
        custRegion.put(custIdTwo, new Customer("name2_mod2", "addr2_mod2"));

        assertEquals(3, custRegion.size());
        mgr.commit();

        assertEquals(3, custRegion.size());
        Customer c = custRegion.get(custIdOne);
        return null;
      }
    });

  }


  /*
   * Test to reproduce a scenario where: 1. On primary, the tx op is applied first followed by
   * non-tx 2. On secondary, non-tx op is applied first followed by tx.
   */
  @Ignore
  @Test
  public void testConcurrentTXAndNonTXOperations() throws Exception {
    Host host = Host.getHost(0);
    final VM server1 = host.getVM(0);
    final VM server2 = host.getVM(1);

    createPersistentPR(new VM[] {server1});

    execute(server1, new SerializableCallable() {
      @Override
      public Object call() throws Exception {
        Region<CustId, Customer> prRegion = getCache().getRegion(PERSISTENT_CUSTOMER_PR);

        CustId custIdOne = new CustId(1);
        Customer customerOne = new Customer("name1", "addr1");
        prRegion.put(custIdOne, customerOne);

        BucketRegion br = ((PartitionedRegion) prRegion).getBucketRegion(custIdOne);

        String primaryMember = br.getBucketAdvisor().getPrimary().toString();
        getGemfireCache().getLogger().fine("TEST:PRIMARY:" + primaryMember);

        String memberId = getGemfireCache().getDistributedSystem().getMemberId();
        getGemfireCache().getLogger().fine("TEST:MEMBERID:" + memberId);

        return null;
      }
    });

    createPersistentPR(new VM[] {server2});

    Boolean isPrimary = (Boolean) execute(server1, new SerializableCallable() {
      @Override
      public Object call() throws Exception {
        Region<CustId, Customer> prRegion = getCache().getRegion(PERSISTENT_CUSTOMER_PR);
        CustId custIdOne = new CustId(1);
        BucketRegion br = ((PartitionedRegion) prRegion).getBucketRegion(custIdOne);

        String primaryMember = br.getBucketAdvisor().getPrimary().toString();
        getGemfireCache().getLogger().fine("TEST:PRIMARY:" + primaryMember);

        String memberId = getGemfireCache().getDistributedSystem().getMemberId();
        getGemfireCache().getLogger().fine("TEST:MEMBERID:" + memberId);

        return memberId.equals(primaryMember);
      }
    });

    final VM primary = isPrimary.booleanValue() ? server1 : server2;
    final VM secondary = !isPrimary.booleanValue() ? server1 : server2;

    System.out.println("TEST:SERVER-1:VM-" + server1.getId());
    System.out.println("TEST:SERVER-2:VM-" + server2.getId());
    System.out.println("TEST:PRIMARY=VM-" + primary.getId());
    System.out.println("TEST:SECONDARY=VM-" + secondary.getId());

    class WaitRelease implements Runnable {
      CountDownLatch cdl;
      String op;

      public WaitRelease(CountDownLatch cdl, String member) {
        this.cdl = cdl;
      }

      @Override
      public void run() {
        try {
          GemFireCacheImpl.getExisting().getLogger().fine("TEST:TX WAITING - " + op);
          cdl.await();
          GemFireCacheImpl.getExisting().getLogger().fine("TEST:TX END WAITING");
        } catch (InterruptedException e) {
        }
      }

      public void release() {
        GemFireCacheImpl.getExisting().getLogger().fine("TEST:TX COUNTDOWN - " + op);
        cdl.countDown();
      }
    }

    // Install TX hook
    SerializableCallable txHook = new SerializableCallable() {
      @Override
      public Object call() throws Exception {
        CountDownLatch cdl = new CountDownLatch(1);
        DistTXState.internalBeforeApplyChanges = new WaitRelease(cdl, "TX OP");
        return null;
      }
    };

    execute(secondary, txHook);


    // Install non-TX hook
    SerializableCallable nontxHook = new SerializableCallable() {
      @Override
      public Object call() throws Exception {
        CountDownLatch cdl = new CountDownLatch(1);
        DistTXState.internalBeforeNonTXBasicPut = new WaitRelease(cdl, "NON TX OP");
        return null;
      }
    };

    // Install the wait-release hook on the secondary
    execute(secondary, nontxHook);


    // Start a tx operation on primary

    execute(primary, new SerializableCallable() {
      @Override
      public Object call() throws Exception {
        // The reason this is run in a separate thread instead of controller thread
        // is that this is going to block because the secondary is going to wait.
        new Thread() {
          @Override
          public void run() {
            CacheTransactionManager mgr = getGemfireCache().getTxManager();
            mgr.setDistributed(true);
            getGemfireCache().getLogger().fine("TEST:DISTTX=" + mgr.isDistributed());
            mgr.begin();
            Region<CustId, Customer> prRegion = getCache().getRegion(PERSISTENT_CUSTOMER_PR);

            CustId custIdOne = new CustId(1);
            Customer customerOne = new Customer("name1_tx", "addr1");
            getGemfireCache().getLogger().fine("TEST:TX UPDATE");
            prRegion.put(custIdOne, customerOne);
            getGemfireCache().getLogger().fine("TEST:TX COMMIT");
            mgr.commit();
          }
        }.start();
        return null;
      }
    });

    // Let the TX op be applied on primary first
    Thread.currentThread().sleep(200);

    // Perform a non-tx op on the same key on primary
    execute(primary, new SerializableCallable() {
      @Override
      public Object call() throws Exception {
        Region<CustId, Customer> prRegion = getCache().getRegion(PERSISTENT_CUSTOMER_PR);

        CustId custIdOne = new CustId(1);
        Customer customerOne = new Customer("name1_nontx", "addr1");
        getGemfireCache().getLogger().fine("TEST:TX NONTXUPDATE");
        prRegion.put(custIdOne, customerOne);
        return null;
      }
    });


    // Wait for a few milliseconds
    Thread.currentThread().sleep(200);

    // Release the waiting non-tx op first, on secondary
    execute(secondary, new SerializableCallable() {
      @Override
      public Object call() throws Exception {
        Runnable r = DistTXState.internalBeforeNonTXBasicPut;
        assert (r != null && r instanceof WaitRelease);
        WaitRelease e = (WaitRelease) r;
        e.release();
        return null;
      }
    });

    // Now release the waiting commit on secondary
    execute(secondary, new SerializableCallable() {
      @Override
      public Object call() throws Exception {
        Runnable r = DistTXState.internalBeforeApplyChanges;
        assert (r != null && r instanceof WaitRelease);
        WaitRelease e = (WaitRelease) r;
        e.release();
        return null;
      }
    });

    // Verify region and entry versions on primary and secondary
    SerializableCallable verifyPrimary = new SerializableCallable() {
      @Override
      public Object call() throws Exception {
        Region<CustId, Customer> prRegion = getCache().getRegion(PERSISTENT_CUSTOMER_PR);

        CustId custId = new CustId(1);
        Customer customer = prRegion.get(custId);

        BucketRegion br = ((PartitionedRegion) prRegion).getBucketRegion(custId);
        RegionEntry re = br.getRegionEntry(custId);

        getGemfireCache().getLogger().fine("TEST:TX PRIMARY CUSTOMER=" + customer);

        getGemfireCache().getLogger()
            .fine("TEST:TX PRIMARY REGION VERSION=" + re.getVersionStamp().getRegionVersion());
        getGemfireCache().getLogger()
            .fine("TEST:TX PRIMARY ENTRY VERSION=" + re.getVersionStamp().getEntryVersion());
        return null;
      }
    };
    execute(primary, verifyPrimary);
    SerializableCallable verifySecondary = new SerializableCallable() {
      @Override
      public Object call() throws Exception {
        Region<CustId, Customer> prRegion = getCache().getRegion(PERSISTENT_CUSTOMER_PR);

        CustId custId = new CustId(1);
        Customer customer = prRegion.get(custId);

        BucketRegion br = ((PartitionedRegion) prRegion).getBucketRegion(custId);
        RegionEntry re = br.getRegionEntry(custId);

        getGemfireCache().getLogger().fine("TEST:TX SECONDARY CUSTOMER=" + customer);

        getGemfireCache().getLogger()
            .fine("TEST:TX SECONDARY REGION VERSION=" + re.getVersionStamp().getRegionVersion());
        getGemfireCache().getLogger()
            .fine("TEST:TX SECONDARY ENTRY VERSION=" + re.getVersionStamp().getEntryVersion());
        return null;
      }
    };

    execute(secondary, verifySecondary);
  }

  @Test
  public void testBasicDistributedTX() throws Exception {
    Host host = Host.getHost(0);
    VM server1 = host.getVM(0);
    VM server2 = host.getVM(1);

    createPersistentPR(new VM[] {server1, server2});
    execute(server2, new SerializableCallable() {
      @Override
      public Object call() throws Exception {

        CacheTransactionManager mgr = getGemfireCache().getTxManager();
        mgr.setDistributed(true);
        getGemfireCache().getLogger().fine("TEST:DISTTX=" + mgr.isDistributed());
        getGemfireCache().getLogger().fine("TEST:TX BEGIN");
        mgr.begin();
        Region<CustId, Customer> prRegion = getCache().getRegion(PERSISTENT_CUSTOMER_PR);

        CustId custIdOne = new CustId(1);
        Customer customerOne = new Customer("name1", "addr1");
        getGemfireCache().getLogger().fine("TEST:TX PUT 1");
        prRegion.put(custIdOne, customerOne);

        CustId custIdTwo = new CustId(2);
        Customer customerTwo = new Customer("name2", "addr2");
        getGemfireCache().getLogger().fine("TEST:TX PUT 2");
        prRegion.put(custIdTwo, customerTwo);

        getGemfireCache().getLogger().fine("TEST:TX COMMIT");
        mgr.commit();
        return null;
      }
    });

  }

  @Test
  public void testRegionAndEntryVersionsPR() throws Exception {
    Host host = Host.getHost(0);
    VM server1 = host.getVM(0);
    VM server2 = host.getVM(1);

    createPersistentPR(new VM[] {server1, server2});
    execute(server2, new SerializableCallable() {
      @Override
      public Object call() throws Exception {
        CacheTransactionManager mgr = getGemfireCache().getTxManager();
        mgr.setDistributed(true);
        getGemfireCache().getLogger().fine("TEST:DISTTX=" + mgr.isDistributed());
        getGemfireCache().getLogger().fine("TEST:TX BEGIN");
        mgr.begin();

        Region<CustId, Customer> prRegion = getCache().getRegion(PERSISTENT_CUSTOMER_PR);

        CustId custIdOne = new CustId(1);
        Customer customerOne = new Customer("name1", "addr1");
        getGemfireCache().getLogger().fine("TEST:TX PUT 1");
        prRegion.put(custIdOne, customerOne);

        BucketRegion br = ((PartitionedRegion) prRegion).getBucketRegion(custIdOne);

        assertEquals(0L, br.getVersionVector().getCurrentVersion());
        getGemfireCache().getLogger().fine("TEST:TX COMMIT 1");
        mgr.commit();

        // Verify region version on the region
        assertEquals(1L, br.getVersionVector().getCurrentVersion());

        RegionEntry re = br.getRegionEntry(custIdOne);
        getGemfireCache().getLogger().fine("TEST:VERSION-STAMP:" + re.getVersionStamp());

        // Verify region version on the region entry
        assertEquals(1L, re.getVersionStamp().getRegionVersion());

        // Verify entry version
        assertEquals(1, re.getVersionStamp().getEntryVersion());

        mgr.begin();
        prRegion.put(custIdOne, new Customer("name1_1", "addr1"));

        getGemfireCache().getLogger().fine("TEST:TX COMMIT 2");

        assertEquals(1L, br.getVersionVector().getCurrentVersion());
        mgr.commit();

        // Verify region version on the region
        assertEquals(2L, br.getVersionVector().getCurrentVersion());

        re = br.getRegionEntry(custIdOne);
        getGemfireCache().getLogger().fine("TEST:VERSION-STAMP:" + re.getVersionStamp());

        // Verify region version on the region entry
        assertEquals(2L, re.getVersionStamp().getRegionVersion());

        // Verify entry version
        assertEquals(2, re.getVersionStamp().getEntryVersion());
        return null;
      }

    });

    execute(server1, new SerializableCallable() {

      @Override
      public Object call() throws Exception {
        Region<CustId, Customer> prRegion = getCache().getRegion(PERSISTENT_CUSTOMER_PR);
        CustId custIdOne = new CustId(1);
        BucketRegion br = ((PartitionedRegion) prRegion).getBucketRegion(custIdOne);

        // Verify region version on the region
        assertEquals(2L, br.getVersionVector().getCurrentVersion());

        // Verify region version ont the region entry
        RegionEntry re = br.getRegionEntry(custIdOne);
        assertEquals(2L, re.getVersionStamp().getRegionVersion());

        // Verify entry version
        assertEquals(2, re.getVersionStamp().getEntryVersion());
        return null;
      }

    });

  }

  @Test
  public void testRegionAndEntryVersionsRR() throws Exception {
    Host host = Host.getHost(0);
    VM server1 = host.getVM(0);
    VM server2 = host.getVM(1);

    createRR(new VM[] {server1, server2});
    execute(server2, new SerializableCallable() {
      @Override
      public Object call() throws Exception {
        CacheTransactionManager mgr = getGemfireCache().getTxManager();
        mgr.setDistributed(true);
        getGemfireCache().getLogger().fine("TEST:DISTTX=" + mgr.isDistributed());
        getGemfireCache().getLogger().fine("TEST:TX BEGIN");
        mgr.begin();

        Region<CustId, Customer> region = getCache().getRegion(CUSTOMER_RR);

        CustId custIdOne = new CustId(1);
        Customer customerOne = new Customer("name1", "addr1");
        getGemfireCache().getLogger().fine("TEST:TX PUT 1");
        region.put(custIdOne, customerOne);

        LocalRegion lr = (LocalRegion) region;

        assertEquals(0L, lr.getVersionVector().getCurrentVersion());
        getGemfireCache().getLogger().fine("TEST:TX COMMIT 1");
        mgr.commit();

        // Verify region version on the region
        assertEquals(1L, lr.getVersionVector().getCurrentVersion());

        RegionEntry re = lr.getRegionEntry(custIdOne);
        getGemfireCache().getLogger().fine("TEST:VERSION-STAMP:" + re.getVersionStamp());

        // Verify region version on the region entry
        assertEquals(1L, re.getVersionStamp().getRegionVersion());

        // Verify entry version
        assertEquals(1, re.getVersionStamp().getEntryVersion());

        mgr.begin();
        region.put(custIdOne, new Customer("name1_1", "addr1"));

        getGemfireCache().getLogger().fine("TEST:TX COMMIT 2");

        assertEquals(1L, lr.getVersionVector().getCurrentVersion());
        mgr.commit();

        // Verify region version on the region
        assertEquals(2L, lr.getVersionVector().getCurrentVersion());

        re = lr.getRegionEntry(custIdOne);
        getGemfireCache().getLogger().fine("TEST:VERSION-STAMP:" + re.getVersionStamp());

        // Verify region version on the region entry
        assertEquals(2L, re.getVersionStamp().getRegionVersion());

        // Verify entry version
        assertEquals(2, re.getVersionStamp().getEntryVersion());
        return null;
      }

    });

    execute(server1, new SerializableCallable() {

      @Override
      public Object call() throws Exception {
        Region<CustId, Customer> region = getCache().getRegion(CUSTOMER_RR);
        CustId custIdOne = new CustId(1);

        LocalRegion lr = (LocalRegion) region;

        // Verify region version on the region
        assertEquals(2L, lr.getVersionVector().getCurrentVersion());

        // Verify region version ont the region entry
        RegionEntry re = lr.getRegionEntry(custIdOne);
        assertEquals(2L, re.getVersionStamp().getRegionVersion());

        // Verify entry version
        assertEquals(2, re.getVersionStamp().getEntryVersion());
        return null;
      }

    });

  }


  @Test
  public void testTxWorksWithNewNodeJoining() throws Exception {
    Host host = Host.getHost(0);
    VM server1 = host.getVM(0);
    VM server2 = host.getVM(1);
    VM server3 = host.getVM(2);

    createPR(new VM[] {server1, server2});

    class Ops extends SerializableCallable {
      private boolean flag = false;

      public void setWaitFlag(boolean value) {
        flag = value;
      }

      @Override
      public Object call() throws Exception {

        CacheTransactionManager mgr = getGemfireCache().getTxManager();
        mgr.setDistributed(true);
        mgr.begin();
        Region<CustId, Customer> prRegion = getCache().getRegion(CUSTOMER_PR);

        CustId custIdOne = new CustId(1);
        Customer customerOne = new Customer("name1", "addr1");
        prRegion.put(custIdOne, customerOne);


        // Install the hook at this point
        TestObserver o = TestObserver.getInstance();
        o.setFlag(true);

        while (o.getFlag()) {
          Thread.currentThread().sleep(1000);
        }

        mgr.commit();

        // Verify
        assertEquals(1, prRegion.size());
        assertTrue(prRegion.containsKey(custIdOne));

        return null;
      }
    }

    server1.invokeAsync(() -> new Ops().call());

    // Now create cache on the third server and let it join the distributed system.
    createPR(new VM[] {server3});

    // Let the original thread move on by signalling the flag

    execute(server1, new SerializableCallable() {
      @Override
      public Object call() throws Exception {
        TestObserver o = TestObserver.getInstance();
        o.setFlag(false);
        return null;
      }
    });
  }

  public static class TestObserver {
    private static final TestObserver singleInstance = new TestObserver();
    private static boolean flag = false;

    public static TestObserver getInstance() {
      return singleInstance;
    }

    public static void setFlag(boolean val) {
      flag = val;
    }

    public boolean getFlag() {
      return flag;
    }
  }


  private class TxOps_Conflicts extends SerializableCallable {

    final String regionName;

    public TxOps_Conflicts(String regionName) {
      this.regionName = regionName;
    }

    @Override
    public Object call() throws Exception {
      CacheTransactionManager mgr = getGemfireCache().getTxManager();
      mgr.setDistributed(true);
      mgr.begin();

      // Perform a put
      Region<CustId, Customer> custRegion = getCache().getRegion(this.regionName);

      CustId custIdOne = new CustId(1);
      Customer customerOne = new Customer("name1", "addr1");
      CustId custIdTwo = new CustId(2);
      Customer customerTwo = new Customer("name2", "addr2");
      CustId custIdThree = new CustId(3);
      Customer customerThree = new Customer("name3", "addr3");
      custRegion.put(custIdOne, customerOne);
      custRegion.put(custIdTwo, customerTwo);
      custRegion.put(custIdThree, customerThree);

      // spawn a new thread modify and custIdOne in another tx
      // so that outer thread fails
      class TxThread extends Thread {
        @Override
        public void run() {
          CacheTransactionManager mgr = getGemfireCache().getTxManager();
          mgr.setDistributed(true);
          mgr.begin();
          CustId custIdOne = new CustId(1);
          Customer customerOne = new Customer("name1", "addr11");
          Region<CustId, Customer> custRegion = getCache().getRegion(regionName);
          custRegion.put(custIdOne, customerOne);
          mgr.commit();
        }
      }

      TxThread txThread = new TxThread();
      txThread.start();
      txThread.join(); // let the tx commit

      try {
        mgr.commit();
        fail("this test should have failed with CommitConflictException");
        // [DISTTX] TODO after conflict detection either
        // CommitIncompleteException or CommitConflictException is thrown.
        // Should it always be CommitConflictException?
      } catch (CommitIncompleteException cie) {
      } catch (CommitConflictException ce) {
      }

      // verify data
      assertEquals(new Customer("name1", "addr11"), custRegion.get(custIdOne));
      assertEquals(null, custRegion.get(custIdTwo));
      assertEquals(null, custRegion.get(custIdThree));

      // clearing the region
      custRegion.remove(custIdOne);
      return null;
    }
  }


  /*
   * Start two concurrent transactions that put same entries. Make sure that conflict is detected at
   * the commit time.
   */
  @Test
  public void testCommitConflicts_PR() throws Exception {
    Host host = Host.getHost(0);
    VM server1 = host.getVM(0);
    VM server2 = host.getVM(1);
    VM server3 = host.getVM(2);
    VM accessor = host.getVM(3);

    createPRwithRedundanyCopies(new VM[] {server1, server2, server3}, 1);
    createPRonAccessor(accessor, 1);

    server1.invoke(new TxOps_Conflicts(CUSTOMER_PR));

    // test thru accessor as well
    accessor.invoke(new TxOps_Conflicts(CUSTOMER_PR));
  }

  /*
   * Start two concurrent transactions that put same entries. Make sure that conflict is detected at
   * the commit time.
   */
  @Test
  public void testCommitConflicts_RR() throws Exception {
    Host host = Host.getHost(0);
    VM server1 = host.getVM(0);
    VM server2 = host.getVM(1);
    VM server3 = host.getVM(2);
    VM accessor = host.getVM(3);

    createRR(new VM[] {server1, server2, server3});
    createRRonAccessor(accessor);

    server1.invoke(new TxOps_Conflicts(CUSTOMER_RR));

    // test thru accessor as well
    accessor.invoke(new TxOps_Conflicts(CUSTOMER_RR));
  }


  class TxConflictRunnable implements Runnable {
    final String regionName;

    public TxConflictRunnable(String regionName) {
      this.regionName = regionName;
    }

    @Override
    public void run() {
      // spawn a new thread modify and custIdOne in another tx
      // so that outer thread fails
      class TxThread extends Thread {
        public boolean gotConflict = false;
        public boolean gotOtherException = false;
        public Exception ex = new Exception();

        @Override
        public void run() {
          LogWriterUtils.getLogWriter()
              .info("Inside TxConflictRunnable.TxThread after aquiring locks");
          CacheTransactionManager mgr = getGemfireCache().getTxManager();
          mgr.setDistributed(true);
          mgr.begin();
          CustId custIdOne = new CustId(1);
          Customer customerOne = new Customer("name1", "addr11");
          Region<CustId, Customer> custRegion = getCache().getRegion(regionName);
          custRegion.put(custIdOne, customerOne);
          try {
            mgr.commit();
          } catch (CommitConflictException ce) {
            gotConflict = true;
            LogWriterUtils.getLogWriter().info("Received exception ", ce);
          } catch (Exception e) {
            gotOtherException = true;
            LogWriterUtils.getLogWriter().info("Received exception ", e);
            ex.initCause(e);
          }
        }
      }

      TxThread txThread = new TxThread();
      txThread.start();
      try {
        txThread.join(); // let the tx commit
      } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
      }

      assertTrue("This test should fail with CommitConflictException", txThread.gotConflict);
      if (txThread.gotOtherException) {
        Assert.fail("Received unexpected exception ", txThread.ex);
      }
    }
  }

  private class TxOps_conflicts_after_locks_acquired extends SerializableCallable {

    final String regionName;

    public TxOps_conflicts_after_locks_acquired(String regionName) {
      this.regionName = regionName;
    }

    @Override
    public Object call() throws Exception {
      CacheTransactionManager mgr = getGemfireCache().getTxManager();
      mgr.setDistributed(true);
      mgr.begin();

      // set up a callback to be invoked after locks are acquired at commit time
      ((TXStateProxyImpl) ((TXManagerImpl) mgr).getTXState()).forceLocalBootstrap();
      TXStateInterface txp = ((TXManagerImpl) mgr).getTXState();
      DistTXState tx = (DistTXState) ((TXStateProxyImpl) txp).getRealDeal(null, null);
      tx.setAfterReservation(new TxConflictRunnable(this.regionName)); // callback

      // Perform a put
      Region<CustId, Customer> custRegion = getCache().getRegion(this.regionName);

      CustId custIdOne = new CustId(1);
      Customer customerOne = new Customer("name1", "addr1");
      CustId custIdTwo = new CustId(2);
      Customer customerTwo = new Customer("name2", "addr2");
      CustId custIdThree = new CustId(3);
      Customer customerThree = new Customer("name3", "addr3");
      CustId custIdFour = new CustId(4);
      Customer customerFour = new Customer("name4", "addr4");
      custRegion.put(custIdOne, customerOne);
      custRegion.put(custIdTwo, customerTwo);
      custRegion.put(custIdThree, customerThree);
      custRegion.put(custIdFour, customerFour);

      // will invoke the callback that spawns a new thread and another
      // transaction
      mgr.commit();

      // verify data
      assertEquals(new Customer("name1", "addr1"), custRegion.get(custIdOne));
      assertEquals(new Customer("name2", "addr2"), custRegion.get(custIdTwo));
      assertEquals(new Customer("name3", "addr3"), custRegion.get(custIdThree));
      assertEquals(new Customer("name4", "addr4"), custRegion.get(custIdFour));

      return null;
    }
  }

  /*
   * Start a transaction, at commit time after acquiring locks, start another transaction in a new
   * thread that modifies same entries as in the earlier transaction. Make sure that conflict is
   * detected
   */
  @Test
  public void testCommitConflicts_PR_after_locks_acquired() throws Exception {
    Host host = Host.getHost(0);
    VM server1 = host.getVM(0);
    VM server2 = host.getVM(1);

    // createPRwithRedundanyCopies(new VM[] { server1, server2 }, 1);

    createPRwithRedundanyCopies(new VM[] {server1}, 0);

    server1.invoke(new TxOps_conflicts_after_locks_acquired(CUSTOMER_PR));
  }

  /*
   * Start a transaction, at commit time after acquiring locks, start another transaction in a new
   * thread that modifies same entries as in the earlier transaction. Make sure that conflict is
   * detected
   */
  @Test
  public void testCommitConflicts_RR_after_locks_acquired() throws Exception {
    Host host = Host.getHost(0);
    VM server1 = host.getVM(0);
    VM server2 = host.getVM(1);

    createRR(new VM[] {server1, server2});

    server1.invoke(new TxOps_conflicts_after_locks_acquired(CUSTOMER_RR));
  }


  class TxRunnable implements Runnable {
    final String regionName;

    public TxRunnable(String regionName) {
      this.regionName = regionName;
    }

    @Override
    public void run() {
      class TxThread extends Thread {
        public boolean gotException = false;
        public Exception ex = new Exception();

        @Override
        public void run() {
          LogWriterUtils.getLogWriter().info("Inside TxRunnable.TxThread after aquiring locks");
          CacheTransactionManager mgr = getGemfireCache().getTxManager();
          mgr.setDistributed(true);
          mgr.begin();
          Region<CustId, Customer> custRegion = getCache().getRegion(regionName);
          for (int i = 11; i <= 20; i++) {
            custRegion.put(new CustId(i), new Customer("name" + i, "addr" + i));
          }
          try {
            mgr.commit();
          } catch (Exception e) {
            gotException = true;
            LogWriterUtils.getLogWriter().info("Received exception ", e);
            ex.initCause(e);
          }
        }
      }

      TxThread txThread = new TxThread();
      txThread.start();
      try {
        txThread.join(); // let the tx commit
      } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
      }
      if (txThread.gotException) {
        Assert.fail("Received exception ", txThread.ex);
      }
    }
  }

  private class TxOps_no_conflicts extends SerializableCallable {

    final String regionName;

    public TxOps_no_conflicts(String regionName) {
      this.regionName = regionName;
    }

    @Override
    public Object call() throws Exception {
      CacheTransactionManager mgr = getGemfireCache().getTxManager();
      mgr.setDistributed(true);
      mgr.begin();

      // set up a callback to be invoked after locks are acquired at commit time
      ((TXStateProxyImpl) ((TXManagerImpl) mgr).getTXState()).forceLocalBootstrap();
      TXStateInterface txp = ((TXManagerImpl) mgr).getTXState();
      DistTXState tx = (DistTXState) ((TXStateProxyImpl) txp).getRealDeal(null, null);
      tx.setAfterReservation(new TxRunnable(this.regionName)); // callback

      // Perform a put
      Region<CustId, Customer> custRegion = getCache().getRegion(this.regionName);

      CustId custIdOne = new CustId(1);
      Customer customerOne = new Customer("name1", "addr1");
      CustId custIdTwo = new CustId(2);
      Customer customerTwo = new Customer("name2", "addr2");
      CustId custIdThree = new CustId(3);
      Customer customerThree = new Customer("name3", "addr3");
      CustId custIdFour = new CustId(4);
      Customer customerFour = new Customer("name4", "addr4");
      custRegion.put(custIdOne, customerOne);
      custRegion.put(custIdTwo, customerTwo);
      custRegion.put(custIdThree, customerThree);
      custRegion.put(custIdFour, customerFour);

      // will invoke the callback that spawns a new thread and another
      // transaction that does puts of 10 entries
      mgr.commit();

      // verify data
      assertEquals(14, custRegion.size());

      return null;
    }
  }

  /*
   * Start a transaction, at commit time after acquiring locks, start another transaction in a new
   * thread that modifies different entries Make sure that there is no conflict or exception.
   */
  @Test
  public void testCommitNoConflicts_PR() throws Exception {
    Host host = Host.getHost(0);
    VM server1 = host.getVM(0);
    VM server2 = host.getVM(1);

    createPRwithRedundanyCopies(new VM[] {server1, server2}, 1);

    server1.invoke(new TxOps_no_conflicts(CUSTOMER_PR));
  }

  /*
   * Start a transaction, at commit time after acquiring locks, start another transaction in a new
   * thread that modifies different entries Make sure that there is no conflict or exception.
   */
  @Test
  public void testCommitNoConflicts_RR() throws Exception {
    Host host = Host.getHost(0);
    VM server1 = host.getVM(0);
    VM server2 = host.getVM(1);

    createRR(new VM[] {server1, server2});

    server1.invoke(new TxOps_no_conflicts(CUSTOMER_RR));
  }



}
