/*
 * Copyright © 2014 Cask Data, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package co.cask.tephra.hbase98;

import co.cask.tephra.Transaction;
import co.cask.tephra.TransactionConflictException;
import co.cask.tephra.TransactionContext;
import co.cask.tephra.TransactionManager;
import co.cask.tephra.TransactionSystemClient;
import co.cask.tephra.TxConstants;
import co.cask.tephra.hbase98.coprocessor.TransactionProcessor;
import co.cask.tephra.inmemory.InMemoryTxSystemClient;
import co.cask.tephra.metrics.TxMetricsCollector;
import co.cask.tephra.persist.InMemoryTransactionStateStorage;
import co.cask.tephra.persist.TransactionStateStorage;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

/**
 * Tests for TransactionAwareHTables.
 */
public class TransactionAwareHTableTest {
  private static final Logger LOG = LoggerFactory.getLogger(TransactionAwareHTableTest.class);

  @ClassRule
  public static TemporaryFolder tmpFolder = new TemporaryFolder();

  private static HBaseTestingUtility testUtil;
  private static HBaseAdmin hBaseAdmin;
  private static TransactionStateStorage txStateStorage;
  private static TransactionManager txManager;
  private static Configuration conf;
  private TransactionContext transactionContext;
  private TransactionAwareHTable transactionAwareHTable;

  private static final class TestBytes {
    private static final byte[] table = Bytes.toBytes("testtable");
    private static final byte[] family = Bytes.toBytes("f1");
    private static final byte[] family2 = Bytes.toBytes("f2");
    private static final byte[] qualifier = Bytes.toBytes("col1");
    private static final byte[] qualifier2 = Bytes.toBytes("col2");
    private static final byte[] row = Bytes.toBytes("row");
    private static final byte[] row2 = Bytes.toBytes("row2");
    private static final byte[] row3 = Bytes.toBytes("row3");
    private static final byte[] row4 = Bytes.toBytes("row4");
    private static final byte[] value = Bytes.toBytes("value");
    private static final byte[] value2 = Bytes.toBytes("value2");
  }

  @BeforeClass
  public static void setupBeforeClass() throws Exception {
    conf = HBaseConfiguration.create();
    conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder.newFolder().getAbsolutePath());
    testUtil = new HBaseTestingUtility(conf);
    testUtil.startMiniCluster();
    conf = testUtil.getConfiguration();
    hBaseAdmin = testUtil.getHBaseAdmin();
    txStateStorage = new InMemoryTransactionStateStorage();
    txManager = new TransactionManager(conf, txStateStorage, new TxMetricsCollector());
    txManager.startAndWait();
  }

  @AfterClass
  public static void shutdownAfterClass() throws Exception {
    testUtil.shutdownMiniCluster();
    hBaseAdmin.close();
  }

  @Before
  public void setupBeforeTest() throws Exception {
    HTable hTable = createTable(TestBytes.table, new byte[][]{TestBytes.family});
    transactionAwareHTable = new TransactionAwareHTable(hTable);
    transactionContext = new TransactionContext(new InMemoryTxSystemClient(txManager), transactionAwareHTable);
  }

  @After
  public void shutdownAfterTest() throws IOException {
    hBaseAdmin.disableTable(TestBytes.table);
    hBaseAdmin.deleteTable(TestBytes.table);
  }

  private HTable createTable(byte[] tableName, byte[][] columnFamilies) throws Exception {
    HTableDescriptor desc = getTableDescriptor(tableName, columnFamilies);
    desc.addCoprocessor(TransactionProcessor.class.getName());
    return doCreateTable(tableName, desc);
  }

  private HTable createNonTxTable(byte[] tableName, byte[][] columnFamilies) throws Exception {
    HTableDescriptor desc = getTableDescriptor(tableName, columnFamilies);
    return doCreateTable(tableName, desc);
  }

  private HTable doCreateTable(byte[] tableName, HTableDescriptor desc) throws IOException, InterruptedException {
    hBaseAdmin.createTable(desc);
    testUtil.waitTableAvailable(tableName, 5000);
    return new HTable(testUtil.getConfiguration(), tableName);
  }

  private HTableDescriptor getTableDescriptor(byte[] tableName, byte[][] columnFamilies) {
    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
    for (byte[] family : columnFamilies) {
      HColumnDescriptor columnDesc = new HColumnDescriptor(family);
      columnDesc.setMaxVersions(Integer.MAX_VALUE);
      desc.addFamily(columnDesc);
    }
    return desc;
  }

  /**
   * Test transactional put and get requests.
   *
   * @throws Exception
   */
  @Test
  public void testValidTransactionalPutAndGet() throws Exception {
    transactionContext.start();
    Put put = new Put(TestBytes.row);
    put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
    transactionAwareHTable.put(put);
    transactionContext.finish();

    transactionContext.start();
    Result result = transactionAwareHTable.get(new Get(TestBytes.row));
    transactionContext.finish();

    byte[] value = result.getValue(TestBytes.family, TestBytes.qualifier);
    assertArrayEquals(TestBytes.value, value);
  }

  /**
   * Test aborted put requests, that must be rolled back.
   *
   * @throws Exception
   */
  @Test
  public void testAbortedTransactionPutAndGet() throws Exception {
    transactionContext.start();
    Put put = new Put(TestBytes.row);
    put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
    transactionAwareHTable.put(put);

    transactionContext.abort();

    transactionContext.start();
    Result result = transactionAwareHTable.get(new Get(TestBytes.row));
    transactionContext.finish();
    byte[] value = result.getValue(TestBytes.family, TestBytes.qualifier);
    assertArrayEquals(value, null);
  }

  /**
   * Test transactional delete operations.
   *
   * @throws Exception
   */
  @Test
  public void testValidTransactionalDelete() throws Exception {
    HTable hTable = createTable(Bytes.toBytes("TestValidTransactionalDelete"),
        new byte[][]{TestBytes.family, TestBytes.family2});
    try {
      TransactionAwareHTable txTable = new TransactionAwareHTable(hTable);
      TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);

      txContext.start();
      Put put = new Put(TestBytes.row);
      put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
      put.add(TestBytes.family2, TestBytes.qualifier, TestBytes.value2);
      txTable.put(put);
      txContext.finish();

      txContext.start();
      Result result = txTable.get(new Get(TestBytes.row));
      txContext.finish();
      byte[] value = result.getValue(TestBytes.family, TestBytes.qualifier);
      assertArrayEquals(TestBytes.value, value);
      value = result.getValue(TestBytes.family2, TestBytes.qualifier);
      assertArrayEquals(TestBytes.value2, value);

      // test full row delete
      txContext.start();
      Delete delete = new Delete(TestBytes.row);
      txTable.delete(delete);
      txContext.finish();

      txContext.start();
      result = txTable.get(new Get(TestBytes.row));
      txContext.finish();
      assertTrue(result.isEmpty());

      // test column delete
      // load 10 rows
      txContext.start();
      int rowCount = 10;
      for (int i = 0; i < rowCount; i++) {
        Put p = new Put(Bytes.toBytes("row" + i));
        for (int j = 0; j < 10; j++) {
          p.add(TestBytes.family, Bytes.toBytes(j), TestBytes.value);
        }
        txTable.put(p);
      }
      txContext.finish();

      // verify loaded rows
      txContext.start();
      for (int i = 0; i < rowCount; i++) {
        Get g = new Get(Bytes.toBytes("row" + i));
        Result r = txTable.get(g);
        assertFalse(r.isEmpty());
        for (int j = 0; j < 10; j++) {
          assertArrayEquals(TestBytes.value, r.getValue(TestBytes.family, Bytes.toBytes(j)));
        }
      }
      txContext.finish();

      // delete odds columns from odd rows and even columns from even rows
      txContext.start();
      for (int i = 0; i < rowCount; i++) {
        Delete d = new Delete(Bytes.toBytes("row" + i));
        for (int j = 0; j < 10; j++) {
          if (i % 2 == j % 2) {
            LOG.info("Deleting row={}, column={}", i, j);
            d.deleteColumns(TestBytes.family, Bytes.toBytes(j));
          }
        }
        txTable.delete(d);
      }
      txContext.finish();

      // verify deleted columns
      txContext.start();
      for (int i = 0; i < rowCount; i++) {
        Get g = new Get(Bytes.toBytes("row" + i));
        Result r = txTable.get(g);
        assertEquals(5, r.size());
        for (Map.Entry<byte[], byte[]> entry : r.getFamilyMap(TestBytes.family).entrySet()) {
          int col = Bytes.toInt(entry.getKey());
          LOG.info("Got row={}, col={}", i, col);
          // each row should only have the opposite mod (odd=even, even=odd)
          assertNotEquals(i % 2, col % 2);
          assertArrayEquals(TestBytes.value, entry.getValue());
        }
      }
      txContext.finish();

      // test family delete
      // load 10 rows
      txContext.start();
      for (int i = 0; i < rowCount; i++) {
        Put p = new Put(Bytes.toBytes("famrow" + i));
        p.add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
        p.add(TestBytes.family2, TestBytes.qualifier2, TestBytes.value2);
        txTable.put(p);
      }
      txContext.finish();

      // verify all loaded rows
      txContext.start();
      for (int i = 0; i < rowCount; i++) {
        Get g = new Get(Bytes.toBytes("famrow" + i));
        Result r = txTable.get(g);
        assertEquals(2, r.size());
        assertArrayEquals(TestBytes.value, r.getValue(TestBytes.family, TestBytes.qualifier));
        assertArrayEquals(TestBytes.value2, r.getValue(TestBytes.family2, TestBytes.qualifier2));
      }
      txContext.finish();

      // delete family1 for even rows, family2 for odd rows
      txContext.start();
      for (int i = 0; i < rowCount; i++) {
        Delete d = new Delete(Bytes.toBytes("famrow" + i));
        d.deleteFamily((i % 2 == 0) ? TestBytes.family : TestBytes.family2);
        txTable.delete(d);
      }
      txContext.finish();

      // verify deleted families
      txContext.start();
      for (int i = 0; i < rowCount; i++) {
        Get g = new Get(Bytes.toBytes("famrow" + i));
        Result r = txTable.get(g);
        assertEquals(1, r.size());
        if (i % 2 == 0) {
          assertNull(r.getValue(TestBytes.family, TestBytes.qualifier));
          assertArrayEquals(TestBytes.value2, r.getValue(TestBytes.family2, TestBytes.qualifier2));
        } else {
          assertArrayEquals(TestBytes.value, r.getValue(TestBytes.family, TestBytes.qualifier));
          assertNull(r.getValue(TestBytes.family2, TestBytes.qualifier2));
        }
      }
      txContext.finish();
    } finally {
      hTable.close();
    }
  }

  /**
   * Test aborted transactional delete requests, that must be rolled back.
   *
   * @throws Exception
   */
  @Test
  public void testAbortedTransactionalDelete() throws Exception {
    transactionContext.start();
    Put put = new Put(TestBytes.row);
    put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
    transactionAwareHTable.put(put);
    transactionContext.finish();

    transactionContext.start();
    Result result = transactionAwareHTable.get(new Get(TestBytes.row));
    transactionContext.finish();
    byte[] value = result.getValue(TestBytes.family, TestBytes.qualifier);
    assertArrayEquals(TestBytes.value, value);

    transactionContext.start();
    Delete delete = new Delete(TestBytes.row);
    transactionAwareHTable.delete(delete);
    transactionContext.abort();

    transactionContext.start();
    result = transactionAwareHTable.get(new Get(TestBytes.row));
    transactionContext.finish();
    value = result.getValue(TestBytes.family, TestBytes.qualifier);
    assertArrayEquals(TestBytes.value, value);
  }

  @Test
  public void testRowDelete() throws Exception {
    HTable hTable = createTable(Bytes.toBytes("TestRowDelete"), new byte[][]{TestBytes.family, TestBytes.family2});
    TransactionAwareHTable txTable = new TransactionAwareHTable(hTable, TxConstants.ConflictDetection.ROW);
    try {
      TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);

      // Test 1: full row delete
      txContext.start();
      txTable.put(new Put(TestBytes.row)
          .add(TestBytes.family, TestBytes.qualifier, TestBytes.value)
          .add(TestBytes.family, TestBytes.qualifier2, TestBytes.value2)
          .add(TestBytes.family2, TestBytes.qualifier, TestBytes.value)
          .add(TestBytes.family2, TestBytes.qualifier2, TestBytes.value2));
      txContext.finish();

      txContext.start();
      Get get = new Get(TestBytes.row);
      Result result = txTable.get(get);
      assertFalse(result.isEmpty());
      assertArrayEquals(TestBytes.value, result.getValue(TestBytes.family, TestBytes.qualifier));
      assertArrayEquals(TestBytes.value2, result.getValue(TestBytes.family, TestBytes.qualifier2));
      assertArrayEquals(TestBytes.value, result.getValue(TestBytes.family2, TestBytes.qualifier));
      assertArrayEquals(TestBytes.value2, result.getValue(TestBytes.family2, TestBytes.qualifier2));
      txContext.finish();

      // delete entire row
      txContext.start();
      txTable.delete(new Delete(TestBytes.row));
      txContext.finish();

      // verify row is now empty
      txContext.start();
      result = txTable.get(new Get(TestBytes.row));
      assertTrue(result.isEmpty());

      // verify row is empty for explicit column retrieval
      result = txTable.get(new Get(TestBytes.row)
          .addColumn(TestBytes.family, TestBytes.qualifier)
          .addFamily(TestBytes.family2));
      assertTrue(result.isEmpty());

      // verify row is empty for scan
      ResultScanner scanner = txTable.getScanner(new Scan(TestBytes.row));
      assertNull(scanner.next());
      scanner.close();

      // verify row is empty for scan with explicit column
      scanner = txTable.getScanner(new Scan(TestBytes.row).addColumn(TestBytes.family2, TestBytes.qualifier2));
      assertNull(scanner.next());
      scanner.close();
      txContext.finish();

      // write swapped values to one column per family
      txContext.start();
      txTable.put(new Put(TestBytes.row)
          .add(TestBytes.family, TestBytes.qualifier, TestBytes.value2)
          .add(TestBytes.family2, TestBytes.qualifier2, TestBytes.value));
      txContext.finish();

      // verify new values appear
      txContext.start();
      result = txTable.get(new Get(TestBytes.row));
      assertFalse(result.isEmpty());
      assertEquals(2, result.size());
      assertArrayEquals(TestBytes.value2, result.getValue(TestBytes.family, TestBytes.qualifier));
      assertArrayEquals(TestBytes.value, result.getValue(TestBytes.family2, TestBytes.qualifier2));

      scanner = txTable.getScanner(new Scan(TestBytes.row));
      Result result1 = scanner.next();
      assertNotNull(result1);
      assertFalse(result1.isEmpty());
      assertEquals(2, result1.size());
      assertArrayEquals(TestBytes.value2, result.getValue(TestBytes.family, TestBytes.qualifier));
      assertArrayEquals(TestBytes.value, result.getValue(TestBytes.family2, TestBytes.qualifier2));
      scanner.close();
      txContext.finish();

      // Test 2: delete of first column family
      txContext.start();
      txTable.put(new Put(TestBytes.row2)
          .add(TestBytes.family, TestBytes.qualifier, TestBytes.value)
          .add(TestBytes.family, TestBytes.qualifier2, TestBytes.value2)
          .add(TestBytes.family2, TestBytes.qualifier, TestBytes.value)
          .add(TestBytes.family2, TestBytes.qualifier2, TestBytes.value2));
      txContext.finish();

      txContext.start();
      txTable.delete(new Delete(TestBytes.row2).deleteFamily(TestBytes.family));
      txContext.finish();

      txContext.start();
      Result fam1Result = txTable.get(new Get(TestBytes.row2));
      assertFalse(fam1Result.isEmpty());
      assertEquals(2, fam1Result.size());
      assertArrayEquals(TestBytes.value, fam1Result.getValue(TestBytes.family2, TestBytes.qualifier));
      assertArrayEquals(TestBytes.value2, fam1Result.getValue(TestBytes.family2, TestBytes.qualifier2));
      txContext.finish();

      // Test 3: delete of second column family
      txContext.start();
      txTable.put(new Put(TestBytes.row3)
          .add(TestBytes.family, TestBytes.qualifier, TestBytes.value)
          .add(TestBytes.family, TestBytes.qualifier2, TestBytes.value2)
          .add(TestBytes.family2, TestBytes.qualifier, TestBytes.value)
          .add(TestBytes.family2, TestBytes.qualifier2, TestBytes.value2));
      txContext.finish();

      txContext.start();
      txTable.delete(new Delete(TestBytes.row3).deleteFamily(TestBytes.family2));
      txContext.finish();

      txContext.start();
      Result fam2Result = txTable.get(new Get(TestBytes.row3));
      assertFalse(fam2Result.isEmpty());
      assertEquals(2, fam2Result.size());
      assertArrayEquals(TestBytes.value, fam2Result.getValue(TestBytes.family, TestBytes.qualifier));
      assertArrayEquals(TestBytes.value2, fam2Result.getValue(TestBytes.family, TestBytes.qualifier2));
      txContext.finish();

      // Test 4: delete specific rows in a range
      txContext.start();
      for (int i = 0; i < 10; i++) {
        txTable.put(new Put(Bytes.toBytes("z" + i))
            .add(TestBytes.family, TestBytes.qualifier, Bytes.toBytes(i))
            .add(TestBytes.family2, TestBytes.qualifier2, Bytes.toBytes(i)));
      }
      txContext.finish();

      txContext.start();
      // delete odd rows
      for (int i = 1; i < 10; i += 2) {
        txTable.delete(new Delete(Bytes.toBytes("z" + i)));
      }
      txContext.finish();

      txContext.start();
      int cnt = 0;
      ResultScanner zScanner = txTable.getScanner(new Scan(Bytes.toBytes("z0")));
      Result res;
      while ((res = zScanner.next()) != null) {
        assertFalse(res.isEmpty());
        assertArrayEquals(Bytes.toBytes("z" + cnt), res.getRow());
        assertArrayEquals(Bytes.toBytes(cnt), res.getValue(TestBytes.family, TestBytes.qualifier));
        assertArrayEquals(Bytes.toBytes(cnt), res.getValue(TestBytes.family2, TestBytes.qualifier2));
        cnt += 2;
      }

      // Test 5: delete prior writes in the same transaction
      txContext.start();
      txTable.put(new Put(TestBytes.row4)
          .add(TestBytes.family, TestBytes.qualifier, TestBytes.value)
          .add(TestBytes.family2, TestBytes.qualifier2, TestBytes.value2));
      txTable.delete(new Delete(TestBytes.row4));
      txContext.finish();

      txContext.start();
      Result row4Result = txTable.get(new Get(TestBytes.row4));
      assertTrue(row4Result.isEmpty());
      txContext.finish();
    } finally {
      txTable.close();
    }
  }

  /**
   * Expect an exception since a transaction hasn't been started.
   *
   * @throws Exception
   */
  @Test(expected = IOException.class)
  public void testTransactionlessFailure() throws Exception {
    transactionAwareHTable.get(new Get(TestBytes.row));
  }

  /**
   * Tests that each transaction can see its own persisted writes, while not seeing writes from other
   * in-progress transactions.
   */
  @Test
  public void testReadYourWrites() throws Exception {
    // In-progress tx1: started before our main transaction
    HTable hTable1 = new HTable(testUtil.getConfiguration(), TestBytes.table);
    TransactionAwareHTable txHTable1 = new TransactionAwareHTable(hTable1);
    TransactionContext inprogressTxContext1 = new TransactionContext(new InMemoryTxSystemClient(txManager), txHTable1);

    // In-progress tx2: started while our main transaction is running
    HTable hTable2 = new HTable(testUtil.getConfiguration(), TestBytes.table);
    TransactionAwareHTable txHTable2 = new TransactionAwareHTable(hTable2);
    TransactionContext inprogressTxContext2 = new TransactionContext(new InMemoryTxSystemClient(txManager), txHTable2);

    // create an in-progress write that should be ignored
    byte[] col2 = Bytes.toBytes("col2");
    inprogressTxContext1.start();
    Put putCol2 = new Put(TestBytes.row);
    byte[] valueCol2 = Bytes.toBytes("writing in progress");
    putCol2.add(TestBytes.family, col2, valueCol2);
    txHTable1.put(putCol2);

    // start a tx and write a value to test reading in same tx
    transactionContext.start();
    Put put = new Put(TestBytes.row);
    byte[] value = Bytes.toBytes("writing");
    put.add(TestBytes.family, TestBytes.qualifier, value);
    transactionAwareHTable.put(put);

    // test that a write from a tx started after the first is not visible
    inprogressTxContext2.start();
    Put put2 = new Put(TestBytes.row);
    byte[] value2 = Bytes.toBytes("writing2");
    put2.add(TestBytes.family, TestBytes.qualifier, value2);
    txHTable2.put(put2);

    Get get = new Get(TestBytes.row);
    Result row = transactionAwareHTable.get(get);
    assertFalse(row.isEmpty());
    byte[] col1Value = row.getValue(TestBytes.family, TestBytes.qualifier);
    Assert.assertNotNull(col1Value);
    Assert.assertArrayEquals(value, col1Value);
    // write from in-progress transaction should not be visible
    byte[] col2Value = row.getValue(TestBytes.family, col2);
    assertNull(col2Value);

    // commit in-progress transaction, should still not be visible
    inprogressTxContext1.finish();

    get = new Get(TestBytes.row);
    row = transactionAwareHTable.get(get);
    assertFalse(row.isEmpty());
    col2Value = row.getValue(TestBytes.family, col2);
    assertNull(col2Value);

    transactionContext.finish();

    inprogressTxContext2.abort();
  }

  @Test
  public void testRowLevelConflictDetection() throws Exception {
    TransactionAwareHTable txTable1 = new TransactionAwareHTable(new HTable(conf, TestBytes.table),
        TxConstants.ConflictDetection.ROW);
    TransactionContext txContext1 = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable1);

    TransactionAwareHTable txTable2 = new TransactionAwareHTable(new HTable(conf, TestBytes.table),
        TxConstants.ConflictDetection.ROW);
    TransactionContext txContext2 = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable2);

    byte[] row1 = Bytes.toBytes("row1");
    byte[] row2 = Bytes.toBytes("row2");
    byte[] col1 = Bytes.toBytes("c1");
    byte[] col2 = Bytes.toBytes("c2");
    byte[] val1 = Bytes.toBytes("val1");
    byte[] val2 = Bytes.toBytes("val2");

    // test that concurrent writing to different rows succeeds
    txContext1.start();
    txTable1.put(new Put(row1).add(TestBytes.family, col1, val1));

    txContext2.start();
    txTable2.put(new Put(row2).add(TestBytes.family, col1, val2));

    // should be no conflicts
    txContext1.finish();
    txContext2.finish();

    transactionContext.start();
    Result res = transactionAwareHTable.get(new Get(row1));
    assertFalse(res.isEmpty());
    Cell cell = res.getColumnLatestCell(TestBytes.family, col1);
    assertNotNull(cell);
    assertArrayEquals(val1, CellUtil.cloneValue(cell));

    res = transactionAwareHTable.get(new Get(row2));
    assertFalse(res.isEmpty());
    cell = res.getColumnLatestCell(TestBytes.family, col1);
    assertNotNull(cell);
    assertArrayEquals(val2, CellUtil.cloneValue(cell));
    transactionContext.finish();

    // test that writing to different columns in the same row fails
    txContext1.start();
    txTable1.put(new Put(row1).add(TestBytes.family, col1, val2));

    txContext2.start();
    txTable2.put(new Put(row1).add(TestBytes.family, col2, val2));

    txContext1.finish();
    try {
      txContext2.finish();
      fail("txContext2 should have encountered a row-level conflict during commit");
    } catch (TransactionConflictException tce) {
      txContext2.abort();
    }

    transactionContext.start();
    res = transactionAwareHTable.get(new Get(row1));
    assertFalse(res.isEmpty());
    cell = res.getColumnLatestCell(TestBytes.family, col1);
    assertNotNull(cell);
    // should now be val2
    assertArrayEquals(val2, CellUtil.cloneValue(cell));

    cell = res.getColumnLatestCell(TestBytes.family, col2);
    // col2 should not be visible due to conflict
    assertNull(cell);
    transactionContext.finish();

    // test that writing to the same column in the same row fails
    txContext1.start();
    txTable1.put(new Put(row2).add(TestBytes.family, col2, val1));

    txContext2.start();
    txTable2.put(new Put(row2).add(TestBytes.family, col2, val2));

    txContext1.finish();
    try {
      txContext2.finish();
      fail("txContext2 should have encountered a row and column level conflict during commit");
    } catch (TransactionConflictException tce) {
      txContext2.abort();
    }

    transactionContext.start();
    res = transactionAwareHTable.get(new Get(row2));
    assertFalse(res.isEmpty());
    cell = res.getColumnLatestCell(TestBytes.family, col2);
    assertNotNull(cell);
    // should now be val1
    assertArrayEquals(val1, CellUtil.cloneValue(cell));
    transactionContext.finish();

    // verify change set that is being reported only on rows
    txContext1.start();
    txTable1.put(new Put(row1).add(TestBytes.family, col1, val1));
    txTable1.put(new Put(row2).add(TestBytes.family, col2, val2));

    Collection<byte[]> changeSet = txTable1.getTxChanges();
    assertNotNull(changeSet);
    assertEquals(2, changeSet.size());
    assertTrue(changeSet.contains(txTable1.getChangeKey(row1, null, null)));
    assertTrue(changeSet.contains(txTable1.getChangeKey(row2, null, null)));
    txContext1.finish();
  }

  @Test
  public void testNoneLevelConflictDetection() throws Exception {
    InMemoryTxSystemClient txClient = new InMemoryTxSystemClient(txManager);
    TransactionAwareHTable txTable1 = new TransactionAwareHTable(new HTable(conf, TestBytes.table),
        TxConstants.ConflictDetection.NONE);
    TransactionContext txContext1 = new TransactionContext(txClient, txTable1);

    TransactionAwareHTable txTable2 = new TransactionAwareHTable(new HTable(conf, TestBytes.table),
        TxConstants.ConflictDetection.NONE);
    TransactionContext txContext2 = new TransactionContext(txClient, txTable2);

    // overlapping writes to the same row + column should not conflict

    txContext1.start();
    txTable1.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));

    // changes should not be visible yet
    txContext2.start();
    Result row = txTable2.get(new Get(TestBytes.row));
    assertTrue(row.isEmpty());

    txTable2.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, TestBytes.value2));

    // both commits should succeed
    txContext1.finish();
    txContext2.finish();

    txContext1.start();
    row = txTable1.get(new Get(TestBytes.row));
    assertFalse(row.isEmpty());
    assertArrayEquals(TestBytes.value2, row.getValue(TestBytes.family, TestBytes.qualifier));
    txContext1.finish();

    // transaction abort should still rollback changes

    txContext1.start();
    txTable1.put(new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
    txContext1.abort();

    // changes to row2 should be rolled back
    txContext2.start();
    Result row2 = txTable2.get(new Get(TestBytes.row2));
    assertTrue(row2.isEmpty());
    txContext2.finish();

    // transaction invalidate should still make changes invisible

    txContext1.start();
    Transaction tx1 = txContext1.getCurrentTransaction();
    txTable1.put(new Put(TestBytes.row3).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
    assertNotNull(tx1);
    txClient.invalidate(tx1.getWritePointer());

    // changes to row2 should be rolled back
    txContext2.start();
    Result row3 = txTable2.get(new Get(TestBytes.row3));
    assertTrue(row3.isEmpty());
    txContext2.finish();
  }

  @Test
  public void testCheckpoint() throws Exception {
    // start a transaction, using checkpoints between writes
    transactionContext.start();
    transactionAwareHTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
    Transaction origTx = transactionContext.getCurrentTransaction();
    transactionContext.checkpoint();
    Transaction postCheckpointTx = transactionContext.getCurrentTransaction();

    assertEquals(origTx.getTransactionId(), postCheckpointTx.getTransactionId());
    assertNotEquals(origTx.getWritePointer(), postCheckpointTx.getWritePointer());
    long[] checkpointPtrs = postCheckpointTx.getCheckpointWritePointers();
    assertEquals(1, checkpointPtrs.length);
    assertEquals(postCheckpointTx.getWritePointer(), checkpointPtrs[0]);

    transactionAwareHTable.put(new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier, TestBytes.value2));
    transactionContext.checkpoint();
    Transaction postCheckpointTx2 = transactionContext.getCurrentTransaction();

    assertEquals(origTx.getTransactionId(), postCheckpointTx2.getTransactionId());
    assertNotEquals(postCheckpointTx.getWritePointer(), postCheckpointTx2.getWritePointer());
    long[] checkpointPtrs2 = postCheckpointTx2.getCheckpointWritePointers();
    assertEquals(2, checkpointPtrs2.length);
    assertEquals(postCheckpointTx.getWritePointer(), checkpointPtrs2[0]);
    assertEquals(postCheckpointTx2.getWritePointer(), checkpointPtrs2[1]);

    transactionAwareHTable.put(new Put(TestBytes.row3).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));

    // by default, all rows should be visible with Read-Your-Writes
    verifyRow(transactionAwareHTable, TestBytes.row, TestBytes.value);
    verifyRow(transactionAwareHTable, TestBytes.row2, TestBytes.value2);
    verifyRow(transactionAwareHTable, TestBytes.row3, TestBytes.value);

    // when disabling current write pointer, only the previous checkpoints should be visible
    transactionContext.getCurrentTransaction().setVisibility(Transaction.VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT);
    Get get = new Get(TestBytes.row);
    verifyRow(transactionAwareHTable, get, TestBytes.value);
    get = new Get(TestBytes.row2);
    verifyRow(transactionAwareHTable, get, TestBytes.value2);
    get = new Get(TestBytes.row3);
    verifyRow(transactionAwareHTable, get, null);

    // test scan results excluding current write pointer
    Scan scan = new Scan();
    ResultScanner scanner = transactionAwareHTable.getScanner(scan);

    Result row = scanner.next();
    assertNotNull(row);
    assertArrayEquals(TestBytes.row, row.getRow());
    assertEquals(1, row.size());
    assertArrayEquals(TestBytes.value, row.getValue(TestBytes.family, TestBytes.qualifier));

    row = scanner.next();
    assertNotNull(row);
    assertArrayEquals(TestBytes.row2, row.getRow());
    assertEquals(1, row.size());
    assertArrayEquals(TestBytes.value2, row.getValue(TestBytes.family, TestBytes.qualifier));

    row = scanner.next();
    assertNull(row);
    scanner.close();
    transactionContext.getCurrentTransaction().setVisibility(Transaction.VisibilityLevel.SNAPSHOT);

    // check that writes are still not visible to other clients
    TransactionAwareHTable txTable2 = new TransactionAwareHTable(new HTable(conf, TestBytes.table));
    TransactionContext txContext2 = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable2);

    txContext2.start();
    verifyRow(txTable2, TestBytes.row, null);
    verifyRow(txTable2, TestBytes.row2, null);
    verifyRow(txTable2, TestBytes.row3, null);
    txContext2.finish();

    // commit transaction, verify writes are visible
    transactionContext.finish();

    txContext2.start();
    verifyRow(txTable2, TestBytes.row, TestBytes.value);
    verifyRow(txTable2, TestBytes.row2, TestBytes.value2);
    verifyRow(txTable2, TestBytes.row3, TestBytes.value);
    txContext2.finish();
    txTable2.close();
  }

  @Test
  public void testCheckpointRollback() throws Exception {
    // start a transaction, using checkpoints between writes
    transactionContext.start();
    transactionAwareHTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
    transactionContext.checkpoint();
    transactionAwareHTable.put(new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier, TestBytes.value2));
    transactionContext.checkpoint();
    transactionAwareHTable.put(new Put(TestBytes.row3).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));

    transactionContext.abort();

    transactionContext.start();
    verifyRow(transactionAwareHTable, TestBytes.row, null);
    verifyRow(transactionAwareHTable, TestBytes.row2, null);
    verifyRow(transactionAwareHTable, TestBytes.row3, null);

    Scan scan = new Scan();
    ResultScanner scanner = transactionAwareHTable.getScanner(scan);
    assertNull(scanner.next());
    scanner.close();
    transactionContext.finish();
  }

  @Test
  public void testCheckpointInvalidate() throws Exception {
    // start a transaction, using checkpoints between writes
    transactionContext.start();
    Transaction origTx = transactionContext.getCurrentTransaction();
    transactionAwareHTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
    transactionContext.checkpoint();
    Transaction checkpointTx1 = transactionContext.getCurrentTransaction();
    transactionAwareHTable.put(new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier, TestBytes.value2));
    transactionContext.checkpoint();
    Transaction checkpointTx2 = transactionContext.getCurrentTransaction();
    transactionAwareHTable.put(new Put(TestBytes.row3).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));

    TransactionSystemClient txClient = new InMemoryTxSystemClient(txManager);
    txClient.invalidate(transactionContext.getCurrentTransaction().getTransactionId());

    // check that writes are not visible
    TransactionAwareHTable txTable2 = new TransactionAwareHTable(new HTable(conf, TestBytes.table));
    TransactionContext txContext2 = new TransactionContext(txClient, txTable2);
    txContext2.start();
    Transaction newTx = txContext2.getCurrentTransaction();

    // all 3 writes pointers from the previous transaction should now be excluded
    assertTrue(newTx.isExcluded(origTx.getWritePointer()));
    assertTrue(newTx.isExcluded(checkpointTx1.getWritePointer()));
    assertTrue(newTx.isExcluded(checkpointTx2.getWritePointer()));

    verifyRow(txTable2, TestBytes.row, null);
    verifyRow(txTable2, TestBytes.row2, null);
    verifyRow(txTable2, TestBytes.row3, null);

    Scan scan = new Scan();
    ResultScanner scanner = txTable2.getScanner(scan);
    assertNull(scanner.next());
    scanner.close();
    txContext2.finish();
  }

  private void verifyRow(HTableInterface table, byte[] rowkey, byte[] expectedValue) throws Exception {
    verifyRow(table, new Get(rowkey), expectedValue);
  }

  private void verifyRow(HTableInterface table, Get get, byte[] expectedValue) throws Exception {
    Result result = table.get(get);
    if (expectedValue == null) {
      assertTrue(result.isEmpty());
    } else {
      assertFalse(result.isEmpty());
      if (get.hasFamilies()) {
        byte[] family = get.getFamilyMap().keySet().iterator().next();
        byte[] col = get.getFamilyMap().get(family).first();
        assertArrayEquals(expectedValue, result.getValue(family, col));
      } else {
        assertArrayEquals(expectedValue, result.getValue(TestBytes.family, TestBytes.qualifier));
      }
    }
  }

  @Test
  public void testExistingData() throws Exception {
    byte[] row1 = Bytes.toBytes("row1");
    byte[] row2 = Bytes.toBytes("row2");
    byte[] row3 = Bytes.toBytes("row3");
    byte[] col1 = Bytes.toBytes("c1");
    byte[] col2 = Bytes.toBytes("c2");
    byte[] val11 = Bytes.toBytes("val11");
    byte[] val12 = Bytes.toBytes("val12");
    byte[] val21 = Bytes.toBytes("val21");
    byte[] val22 = Bytes.toBytes("val22");
    byte[] val31 = Bytes.toBytes("val31");

    long ttl = TimeUnit.DAYS.toMillis(6);

    // Write some data to a non-transactional table
    byte[] tableName = Bytes.toBytes("testExistingData");
    try {
      try (HTable noTxTable = createNonTxTable(tableName, new byte[][]{TestBytes.family})) {
        long existingDataTimestamp1 = System.currentTimeMillis() - ttl / 2;
        long existingDataTimestamp2 = System.currentTimeMillis() - ttl / 3;
        noTxTable.put(new Put(row1).add(TestBytes.family, col1, existingDataTimestamp1, val11));
        noTxTable.put(new Put(row1).add(TestBytes.family, col2, existingDataTimestamp2, val12));
        noTxTable.put(new Put(row2).add(TestBytes.family, col1, existingDataTimestamp1, val21));
        noTxTable.put(new Put(row2).add(TestBytes.family, col2, existingDataTimestamp2, val22));
        noTxTable.flushCommits();

        verifyRow(noTxTable, new Get(row1).addColumn(TestBytes.family, col1), val11);
        verifyRow(noTxTable, new Get(row1).addColumn(TestBytes.family, col2), val12);
        verifyRow(noTxTable, new Get(row2).addColumn(TestBytes.family, col1), val21);
        verifyRow(noTxTable, new Get(row2).addColumn(TestBytes.family, col2), val22);
      }

      // Now try reading the data as a transactional table
      // Attach co-processor to the table
      HTableDescriptor tableDescriptor = hBaseAdmin.getTableDescriptor(tableName);
      tableDescriptor.addCoprocessor(TransactionProcessor.class.getName());
      hBaseAdmin.modifyTable(tableName, tableDescriptor);
      setTtl(tableDescriptor, TestBytes.family, ttl);

      InMemoryTxSystemClient txClient = new InMemoryTxSystemClient(txManager);
      try (TransactionAwareHTable txTable = new TransactionAwareHTable(new HTable(conf, tableName))) {
        TransactionContext txContext = new TransactionContext(txClient, txTable);
        txContext.start();
        txTable.put(new Put(row3).add(TestBytes.family, col1, val31));
        txContext.finish();
        txTable.flushCommits();

        // Should be able to read existing data after major compaction
        LOG.error("111111111 running major compaction");
        majorCompact(tableDescriptor, TestBytes.family);
        LOG.error("111111111 done running major compaction");

        txContext = new TransactionContext(txClient, txTable);
        txContext.start();
        verifyRow(txTable, new Get(row3).addColumn(TestBytes.family, col1), val31);
        verifyRow(txTable, new Get(row1).addColumn(TestBytes.family, col1), val11);
        verifyRow(txTable, new Get(row1).addColumn(TestBytes.family, col2), val12);
        verifyRow(txTable, new Get(row2).addColumn(TestBytes.family, col1), val21);
        verifyRow(txTable, new Get(row2).addColumn(TestBytes.family, col2), val22);
        txContext.finish();

        setTtl(tableDescriptor, TestBytes.family, ttl / 2);

        LOG.error("111111111 running major compaction");
        majorCompact(tableDescriptor, TestBytes.family);
        LOG.error("111111111 done running major compaction");

        txContext = new TransactionContext(txClient, txTable);
        txContext.start();
        verifyRow(txTable, new Get(row3).addColumn(TestBytes.family, col1), val31);
        verifyRow(txTable, new Get(row1).addColumn(TestBytes.family, col1), null);
        verifyRow(txTable, new Get(row1).addColumn(TestBytes.family, col2), val12);
        verifyRow(txTable, new Get(row2).addColumn(TestBytes.family, col1), null);
        verifyRow(txTable, new Get(row2).addColumn(TestBytes.family, col2), val22);
        txContext.finish();

        setTtl(tableDescriptor, TestBytes.family, 1);

        LOG.error("111111111 running major compaction");
        majorCompact(tableDescriptor, TestBytes.family);
        LOG.error("111111111 done running major compaction");

        txContext = new TransactionContext(txClient, txTable);
        txContext.start();
        verifyRow(txTable, new Get(row3).addColumn(TestBytes.family, col1), null);
        verifyRow(txTable, new Get(row1).addColumn(TestBytes.family, col1), null);
        verifyRow(txTable, new Get(row1).addColumn(TestBytes.family, col2), null);
        verifyRow(txTable, new Get(row2).addColumn(TestBytes.family, col1), null);
        verifyRow(txTable, new Get(row2).addColumn(TestBytes.family, col2), null);
        txContext.finish();
      }
    } finally {
      if (hBaseAdmin.tableExists(tableName)) {
        hBaseAdmin.disableTable(tableName);
        hBaseAdmin.deleteTable(tableName);
      }
    }
  }

  private void setTtl(HTableDescriptor tableDescriptor, byte[] family, long ttl) throws Exception {
    HColumnDescriptor familyDescriptor = new HColumnDescriptor(family);
    familyDescriptor.setValue(TxConstants.PROPERTY_TTL, String.valueOf(ttl));
    tableDescriptor.addFamily(familyDescriptor);
    hBaseAdmin.modifyTable(tableDescriptor.getTableName(), tableDescriptor);
  }

  private void majorCompact(HTableDescriptor tableDescriptor, byte[] family) throws Exception {
    long before = System.currentTimeMillis();
    testUtil.compact(tableDescriptor.getTableName(), true);
    while (true) {
      for (StoreFileInfo storeFileInfo : getStoreFiles(tableDescriptor, family)) {
        LOG.error("Store file = {}, start time = {}, modification time = {}",
                  storeFileInfo.getPath().toUri(), before, storeFileInfo.getModificationTime());
        if (storeFileInfo.getModificationTime() > before) {
          return;
        }
      }
      TimeUnit.MILLISECONDS.sleep(10);
    }
  }

  private Set<StoreFileInfo> getStoreFiles(HTableDescriptor tableDescriptor, byte[] family) throws Exception {
    Set<StoreFileInfo> storeFiles = new HashSet<>();
    Path tableDir = FSUtils.getTableDir(testUtil.getDefaultRootDirPath(), tableDescriptor.getTableName());
    for (HRegionInfo regionInfo : hBaseAdmin.getTableRegions(tableDescriptor.getTableName())) {
      HRegionFileSystem hRegionFileSystem =
        HRegionFileSystem.openRegionFromFileSystem(conf, testUtil.getTestFileSystem(), tableDir, regionInfo, true);
      storeFiles.addAll(hRegionFileSystem.getStoreFiles(family));
    }
    return storeFiles;
  }
}
