/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.tephra.hbase;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.OperationWithAttributes;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.LongComparator;
import org.apache.hadoop.hbase.filter.ValueFilter;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.tephra.Transaction;
import org.apache.tephra.TransactionConflictException;
import org.apache.tephra.TransactionContext;
import org.apache.tephra.TransactionManager;
import org.apache.tephra.TransactionSystemClient;
import org.apache.tephra.TxConstants;
import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
import org.apache.tephra.inmemory.InMemoryTxSystemClient;
import org.apache.tephra.metrics.TxMetricsCollector;
import org.apache.tephra.persist.InMemoryTransactionStateStorage;
import org.apache.tephra.persist.TransactionStateStorage;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

/**
 * Tests for TransactionAwareHTables.
 */
public class TransactionAwareHTableTest {
  private static final Logger LOG = LoggerFactory.getLogger(TransactionAwareHTableTest.class);

  private static HBaseTestingUtility testUtil;
  private static HBaseAdmin hBaseAdmin;
  private static TransactionStateStorage txStateStorage;
  private static TransactionManager txManager;
  private static Configuration conf;
  private TransactionContext transactionContext;
  private TransactionAwareHTable transactionAwareHTable;
  private HTable hTable;

  private static final class TestBytes {
    private static final byte[] table = Bytes.toBytes("testtable");
    private static final byte[] family = Bytes.toBytes("f1");
    private static final byte[] family2 = Bytes.toBytes("f2");
    private static final byte[] qualifier = Bytes.toBytes("col1");
    private static final byte[] qualifier2 = Bytes.toBytes("col2");
    private static final byte[] row = Bytes.toBytes("row");
    private static final byte[] row2 = Bytes.toBytes("row2");
    private static final byte[] row3 = Bytes.toBytes("row3");
    private static final byte[] row4 = Bytes.toBytes("row4");
    private static final byte[] value = Bytes.toBytes("value");
    private static final byte[] value2 = Bytes.toBytes("value2");
    private static final byte[] value3 = Bytes.toBytes("value3");
  }

  private static final String TEST_ATTRIBUTE = "TEST_ATTRIBUTE";

  public static class TestRegionObserver extends BaseRegionObserver {
    @Override
    public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c,
                       final Put put, final WALEdit edit,
                       final Durability durability) throws IOException {
      if (put.getAttribute(TEST_ATTRIBUTE) == null) {
        throw new DoNotRetryIOException("Put should preserve attributes");
      }
      if (put.getDurability() != Durability.USE_DEFAULT) {
        throw new DoNotRetryIOException("Durability is not propagated correctly");
      }
    }

    @Override
    public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
                          final Delete delete, final WALEdit edit,
                          final Durability durability) throws IOException {
      if (delete.getAttribute(TEST_ATTRIBUTE) == null) {
        throw new DoNotRetryIOException("Delete should preserve attributes");
      }
      if (delete.getDurability() != Durability.USE_DEFAULT) {
        throw new DoNotRetryIOException("Durability is not propagated correctly");
      }
    }
  }
  
  @BeforeClass
  public static void setupBeforeClass() throws Exception {
    testUtil = new HBaseTestingUtility();
    conf = testUtil.getConfiguration();

    // Tune down the connection thread pool size
    conf.setInt("hbase.hconnection.threads.core", 5);
    conf.setInt("hbase.hconnection.threads.max", 10);
    // Tunn down handler threads in regionserver
    conf.setInt("hbase.regionserver.handler.count", 10);

    // Set to random port
    conf.setInt("hbase.master.port", 0);
    conf.setInt("hbase.master.info.port", 0);
    conf.setInt("hbase.regionserver.port", 0);
    conf.setInt("hbase.regionserver.info.port", 0);

    testUtil.startMiniCluster();
    hBaseAdmin = testUtil.getHBaseAdmin();
    txStateStorage = new InMemoryTransactionStateStorage();
    txManager = new TransactionManager(conf, txStateStorage, new TxMetricsCollector());
    txManager.startAndWait();
  }

  @AfterClass
  public static void shutdownAfterClass() throws Exception {
    testUtil.shutdownMiniCluster();
    hBaseAdmin.close();
  }

  @Before
  public void setupBeforeTest() throws Exception {
    hTable = createTable(TestBytes.table, new byte[][]{TestBytes.family});
    transactionAwareHTable = new TransactionAwareHTable(hTable);
    transactionContext = new TransactionContext(new InMemoryTxSystemClient(txManager), transactionAwareHTable);
  }

  @After
  public void shutdownAfterTest() throws IOException {
    hBaseAdmin.disableTable(TestBytes.table);
    hBaseAdmin.deleteTable(TestBytes.table);
  }

  private HTable createTable(byte[] tableName, byte[][] columnFamilies) throws Exception {
    return createTable(tableName, columnFamilies, false, Collections.<String>emptyList());
  }

  private HTable createTable(byte[] tableName, byte[][] columnFamilies, boolean existingData, 
    List<String> coprocessors) throws Exception {
    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
    for (byte[] family : columnFamilies) {
      HColumnDescriptor columnDesc = new HColumnDescriptor(family);
      columnDesc.setMaxVersions(Integer.MAX_VALUE);
      columnDesc.setValue(TxConstants.PROPERTY_TTL, String.valueOf(100000)); // in millis
      desc.addFamily(columnDesc);
    }
    if (existingData) {
      desc.setValue(TxConstants.READ_NON_TX_DATA, "true");
    }
    // Divide individually to prevent any overflow
    int priority  = Coprocessor.PRIORITY_USER; 
    desc.addCoprocessor(TransactionProcessor.class.getName(), null, priority, null);
    // order in list is the same order that coprocessors will be invoked  
    for (String coprocessor : coprocessors) {
      desc.addCoprocessor(coprocessor, null, ++priority, null);
    }
    hBaseAdmin.createTable(desc);
    testUtil.waitTableAvailable(tableName, 5000);
    return new HTable(testUtil.getConfiguration(), tableName);
   }

  /**
   * Test transactional put and get requests.
   *
   * @throws Exception
   */
  @Test
  public void testValidTransactionalPutAndGet() throws Exception {
    transactionContext.start();
    Put put = new Put(TestBytes.row);
    put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
    transactionAwareHTable.put(put);
    transactionContext.finish();

    transactionContext.start();
    Result result = transactionAwareHTable.get(new Get(TestBytes.row));
    transactionContext.finish();

    byte[] value = result.getValue(TestBytes.family, TestBytes.qualifier);
    assertArrayEquals(TestBytes.value, value);
  }

  /**
   * Test aborted put requests, that must be rolled back.
   *
   * @throws Exception
   */
  @Test
  public void testAbortedTransactionPutAndGet() throws Exception {
    transactionContext.start();
    Put put = new Put(TestBytes.row);
    put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
    transactionAwareHTable.put(put);

    transactionContext.abort();

    transactionContext.start();
    Result result = transactionAwareHTable.get(new Get(TestBytes.row));
    transactionContext.finish();
    byte[] value = result.getValue(TestBytes.family, TestBytes.qualifier);
    assertArrayEquals(value, null);
  }

  /**
   * Test transactional delete operations.
   *
   * @throws Exception
   */
  @Test
  public void testValidTransactionalDelete() throws Exception {
    HTable hTable = createTable(Bytes.toBytes("TestValidTransactionalDelete"),
        new byte[][]{TestBytes.family, TestBytes.family2});
    try {
      TransactionAwareHTable txTable = new TransactionAwareHTable(hTable);
      TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);

      txContext.start();
      Put put = new Put(TestBytes.row);
      put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
      put.add(TestBytes.family2, TestBytes.qualifier, TestBytes.value2);
      txTable.put(put);
      txContext.finish();

      txContext.start();
      Result result = txTable.get(new Get(TestBytes.row));
      txContext.finish();
      byte[] value = result.getValue(TestBytes.family, TestBytes.qualifier);
      assertArrayEquals(TestBytes.value, value);
      value = result.getValue(TestBytes.family2, TestBytes.qualifier);
      assertArrayEquals(TestBytes.value2, value);

      // test full row delete
      txContext.start();
      Delete delete = new Delete(TestBytes.row);
      txTable.delete(delete);
      txContext.finish();

      txContext.start();
      result = txTable.get(new Get(TestBytes.row));
      txContext.finish();
      assertTrue(result.isEmpty());

      // test column delete
      // load 10 rows
      txContext.start();
      int rowCount = 10;
      for (int i = 0; i < rowCount; i++) {
        Put p = new Put(Bytes.toBytes("row" + i));
        for (int j = 0; j < 10; j++) {
          p.add(TestBytes.family, Bytes.toBytes(j), TestBytes.value);
        }
        txTable.put(p);
      }
      txContext.finish();

      // verify loaded rows
      txContext.start();
      for (int i = 0; i < rowCount; i++) {
        Get g = new Get(Bytes.toBytes("row" + i));
        Result r = txTable.get(g);
        assertFalse(r.isEmpty());
        for (int j = 0; j < 10; j++) {
          assertArrayEquals(TestBytes.value, r.getValue(TestBytes.family, Bytes.toBytes(j)));
        }
      }
      txContext.finish();

      // delete odds columns from odd rows and even columns from even rows
      txContext.start();
      for (int i = 0; i < rowCount; i++) {
        Delete d = new Delete(Bytes.toBytes("row" + i));
        for (int j = 0; j < 10; j++) {
          if (i % 2 == j % 2) {
            LOG.info("Deleting row={}, column={}", i, j);
            d.deleteColumns(TestBytes.family, Bytes.toBytes(j));
          }
        }
        txTable.delete(d);
      }
      txContext.finish();

      // verify deleted columns
      txContext.start();
      for (int i = 0; i < rowCount; i++) {
        Get g = new Get(Bytes.toBytes("row" + i));
        Result r = txTable.get(g);
        assertEquals(5, r.size());
        for (Map.Entry<byte[], byte[]> entry : r.getFamilyMap(TestBytes.family).entrySet()) {
          int col = Bytes.toInt(entry.getKey());
          LOG.info("Got row={}, col={}", i, col);
          // each row should only have the opposite mod (odd=even, even=odd)
          assertNotEquals(i % 2, col % 2);
          assertArrayEquals(TestBytes.value, entry.getValue());
        }
      }
      txContext.finish();

      // test family delete
      // load 10 rows
      txContext.start();
      for (int i = 0; i < rowCount; i++) {
        Put p = new Put(Bytes.toBytes("famrow" + i));
        p.add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
        p.add(TestBytes.family2, TestBytes.qualifier2, TestBytes.value2);
        txTable.put(p);
      }
      txContext.finish();

      // verify all loaded rows
      txContext.start();
      for (int i = 0; i < rowCount; i++) {
        Get g = new Get(Bytes.toBytes("famrow" + i));
        Result r = txTable.get(g);
        assertEquals(2, r.size());
        assertArrayEquals(TestBytes.value, r.getValue(TestBytes.family, TestBytes.qualifier));
        assertArrayEquals(TestBytes.value2, r.getValue(TestBytes.family2, TestBytes.qualifier2));
      }
      txContext.finish();

      // delete family1 for even rows, family2 for odd rows
      txContext.start();
      for (int i = 0; i < rowCount; i++) {
        Delete d = new Delete(Bytes.toBytes("famrow" + i));
        d.deleteFamily((i % 2 == 0) ? TestBytes.family : TestBytes.family2);
        txTable.delete(d);
      }
      txContext.finish();

      // verify deleted families
      txContext.start();
      for (int i = 0; i < rowCount; i++) {
        Get g = new Get(Bytes.toBytes("famrow" + i));
        Result r = txTable.get(g);
        assertEquals(1, r.size());
        if (i % 2 == 0) {
          assertNull(r.getValue(TestBytes.family, TestBytes.qualifier));
          assertArrayEquals(TestBytes.value2, r.getValue(TestBytes.family2, TestBytes.qualifier2));
        } else {
          assertArrayEquals(TestBytes.value, r.getValue(TestBytes.family, TestBytes.qualifier));
          assertNull(r.getValue(TestBytes.family2, TestBytes.qualifier2));
        }
      }
      txContext.finish();
    } finally {
      hTable.close();
    }
  }

  /**
   * Test that put and delete attributes are preserved
   *
   * @throws Exception
   */
  @Test
  public void testAttributesPreserved() throws Exception {
    HTable hTable = createTable(Bytes.toBytes("TestAttributesPreserved"),
        new byte[][]{TestBytes.family, TestBytes.family2}, false,
        Lists.newArrayList(TestRegionObserver.class.getName()));
    try {
      TransactionAwareHTable txTable = new TransactionAwareHTable(hTable);
      TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);

      txContext.start();
      Put put = new Put(TestBytes.row);
      put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
      put.add(TestBytes.family2, TestBytes.qualifier, TestBytes.value2);
      // set an attribute on the put, TestRegionObserver will verify it still exists
      put.setAttribute(TEST_ATTRIBUTE, new byte[]{});
      txTable.put(put);
      txContext.finish();

      txContext.start();
      Result result = txTable.get(new Get(TestBytes.row));
      txContext.finish();
      byte[] value = result.getValue(TestBytes.family, TestBytes.qualifier);
      assertArrayEquals(TestBytes.value, value);
      value = result.getValue(TestBytes.family2, TestBytes.qualifier);
      assertArrayEquals(TestBytes.value2, value);

      // test full row delete, TestRegionObserver will verify it still exists
      txContext.start();
      Delete delete = new Delete(TestBytes.row);
      delete.setAttribute(TEST_ATTRIBUTE, new byte[]{});
      txTable.delete(delete);
      txContext.finish();

      txContext.start();
      result = txTable.get(new Get(TestBytes.row));
      txContext.finish();
      assertTrue(result.isEmpty());
    } finally {
        hTable.close();
      }
    }
  
  /**
   * Test aborted transactional delete requests, that must be rolled back.
   *
   * @throws Exception
   */
  @Test
  public void testAbortedTransactionalDelete() throws Exception {
    transactionContext.start();
    Put put = new Put(TestBytes.row);
    put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
    transactionAwareHTable.put(put);
    transactionContext.finish();

    transactionContext.start();
    Result result = transactionAwareHTable.get(new Get(TestBytes.row));
    transactionContext.finish();
    byte[] value = result.getValue(TestBytes.family, TestBytes.qualifier);
    assertArrayEquals(TestBytes.value, value);

    transactionContext.start();
    Delete delete = new Delete(TestBytes.row);
    transactionAwareHTable.delete(delete);
    transactionContext.abort();

    transactionContext.start();
    result = transactionAwareHTable.get(new Get(TestBytes.row));
    transactionContext.finish();
    value = result.getValue(TestBytes.family, TestBytes.qualifier);
    assertArrayEquals(TestBytes.value, value);
  }

  private void testDeleteRollback(TxConstants.ConflictDetection conflictDetection) throws Exception {
    String tableName = String.format("%s%s", "TestColFamilyDelete", conflictDetection);
    HTable hTable = createTable(Bytes.toBytes(tableName), new byte[][]{TestBytes.family});
    try (TransactionAwareHTable txTable = new TransactionAwareHTable(hTable, conflictDetection)) {
      TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
      txContext.start();
      txTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
      txContext.finish();

      // Start a tx, delete the row and then abort the tx
      txContext.start();
      txTable.delete(new Delete(TestBytes.row));
      txContext.abort();

      // Start a tx, delete a column family and then abort the tx
      txContext.start();
      txTable.delete(new Delete(TestBytes.row).deleteFamily(TestBytes.family));
      txContext.abort();

      // Above operations should have no effect on the row, since they were aborted
      txContext.start();
      Get get = new Get(TestBytes.row);
      Result result = txTable.get(get);
      assertFalse(result.isEmpty());
      assertArrayEquals(TestBytes.value, result.getValue(TestBytes.family, TestBytes.qualifier));
      txContext.finish();
    }
  }

  @Test
  public void testDeleteRollback() throws Exception {
    testDeleteRollback(TxConstants.ConflictDetection.ROW);
    testDeleteRollback(TxConstants.ConflictDetection.COLUMN);
    testDeleteRollback(TxConstants.ConflictDetection.NONE);
  }

  @Test
  public void testMultiColumnFamilyRowDeleteRollback() throws Exception {
    HTable hTable = createTable(Bytes.toBytes("TestMultColFam"), new byte[][] {TestBytes.family, TestBytes.family2});
    try (TransactionAwareHTable txTable = new TransactionAwareHTable(hTable, TxConstants.ConflictDetection.ROW)) {
      TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
      txContext.start();
      txTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
      txContext.finish();

      txContext.start();
      //noinspection ConstantConditions
      txContext.getCurrentTransaction().setVisibility(Transaction.VisibilityLevel.SNAPSHOT_ALL);
      Result result = txTable.get(new Get(TestBytes.row));
      Assert.assertEquals(1, result.getFamilyMap(TestBytes.family).size());
      Assert.assertEquals(0, result.getFamilyMap(TestBytes.family2).size());
      txContext.finish();

      //Start a tx, delete the row and then abort the tx
      txContext.start();
      txTable.delete(new Delete(TestBytes.row));
      txContext.abort();

      //Start a tx and scan all the col families to make sure none of them have delete markers
      txContext.start();
      txContext.getCurrentTransaction().setVisibility(Transaction.VisibilityLevel.SNAPSHOT_ALL);
      result = txTable.get(new Get(TestBytes.row));
      Assert.assertEquals(1, result.getFamilyMap(TestBytes.family).size());
      Assert.assertEquals(0, result.getFamilyMap(TestBytes.family2).size());
      txContext.finish();
    }
  }

  @Test
  public void testRowDelete() throws Exception {
    HTable hTable = createTable(Bytes.toBytes("TestRowDelete"), new byte[][]{TestBytes.family, TestBytes.family2});
    TransactionAwareHTable txTable = new TransactionAwareHTable(hTable, TxConstants.ConflictDetection.ROW);
    try {
      TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);

      // Test 1: full row delete
      txContext.start();
      txTable.put(new Put(TestBytes.row)
          .add(TestBytes.family, TestBytes.qualifier, TestBytes.value)
          .add(TestBytes.family, TestBytes.qualifier2, TestBytes.value2)
          .add(TestBytes.family2, TestBytes.qualifier, TestBytes.value)
          .add(TestBytes.family2, TestBytes.qualifier2, TestBytes.value2));
      txContext.finish();

      txContext.start();
      Get get = new Get(TestBytes.row);
      Result result = txTable.get(get);
      assertFalse(result.isEmpty());
      assertArrayEquals(TestBytes.value, result.getValue(TestBytes.family, TestBytes.qualifier));
      assertArrayEquals(TestBytes.value2, result.getValue(TestBytes.family, TestBytes.qualifier2));
      assertArrayEquals(TestBytes.value, result.getValue(TestBytes.family2, TestBytes.qualifier));
      assertArrayEquals(TestBytes.value2, result.getValue(TestBytes.family2, TestBytes.qualifier2));
      txContext.finish();

      // delete entire row
      txContext.start();
      txTable.delete(new Delete(TestBytes.row));
      txContext.finish();

      // verify row is now empty
      txContext.start();
      result = txTable.get(new Get(TestBytes.row));
      assertTrue(result.isEmpty());

      // verify row is empty for explicit column retrieval
      result = txTable.get(new Get(TestBytes.row)
          .addColumn(TestBytes.family, TestBytes.qualifier)
          .addFamily(TestBytes.family2));
      assertTrue(result.isEmpty());

      // verify row is empty for scan
      ResultScanner scanner = txTable.getScanner(new Scan(TestBytes.row));
      assertNull(scanner.next());
      scanner.close();

      // verify row is empty for scan with explicit column
      scanner = txTable.getScanner(new Scan(TestBytes.row).addColumn(TestBytes.family2, TestBytes.qualifier2));
      assertNull(scanner.next());
      scanner.close();
      txContext.finish();

      // write swapped values to one column per family
      txContext.start();
      txTable.put(new Put(TestBytes.row)
          .add(TestBytes.family, TestBytes.qualifier, TestBytes.value2)
          .add(TestBytes.family2, TestBytes.qualifier2, TestBytes.value));
      txContext.finish();

      // verify new values appear
      txContext.start();
      result = txTable.get(new Get(TestBytes.row));
      assertFalse(result.isEmpty());
      assertEquals(2, result.size());
      assertArrayEquals(TestBytes.value2, result.getValue(TestBytes.family, TestBytes.qualifier));
      assertArrayEquals(TestBytes.value, result.getValue(TestBytes.family2, TestBytes.qualifier2));

      scanner = txTable.getScanner(new Scan(TestBytes.row));
      Result result1 = scanner.next();
      assertNotNull(result1);
      assertFalse(result1.isEmpty());
      assertEquals(2, result1.size());
      assertArrayEquals(TestBytes.value2, result.getValue(TestBytes.family, TestBytes.qualifier));
      assertArrayEquals(TestBytes.value, result.getValue(TestBytes.family2, TestBytes.qualifier2));
      scanner.close();
      txContext.finish();

      // Test 2: delete of first column family
      txContext.start();
      txTable.put(new Put(TestBytes.row2)
          .add(TestBytes.family, TestBytes.qualifier, TestBytes.value)
          .add(TestBytes.family, TestBytes.qualifier2, TestBytes.value2)
          .add(TestBytes.family2, TestBytes.qualifier, TestBytes.value)
          .add(TestBytes.family2, TestBytes.qualifier2, TestBytes.value2));
      txContext.finish();

      txContext.start();
      txTable.delete(new Delete(TestBytes.row2).deleteFamily(TestBytes.family));
      txContext.finish();

      txContext.start();
      Result fam1Result = txTable.get(new Get(TestBytes.row2));
      assertFalse(fam1Result.isEmpty());
      assertEquals(2, fam1Result.size());
      assertArrayEquals(TestBytes.value, fam1Result.getValue(TestBytes.family2, TestBytes.qualifier));
      assertArrayEquals(TestBytes.value2, fam1Result.getValue(TestBytes.family2, TestBytes.qualifier2));
      txContext.finish();

      // Test 3: delete of second column family
      txContext.start();
      txTable.put(new Put(TestBytes.row3)
          .add(TestBytes.family, TestBytes.qualifier, TestBytes.value)
          .add(TestBytes.family, TestBytes.qualifier2, TestBytes.value2)
          .add(TestBytes.family2, TestBytes.qualifier, TestBytes.value)
          .add(TestBytes.family2, TestBytes.qualifier2, TestBytes.value2));
      txContext.finish();

      txContext.start();
      txTable.delete(new Delete(TestBytes.row3).deleteFamily(TestBytes.family2));
      txContext.finish();

      txContext.start();
      Result fam2Result = txTable.get(new Get(TestBytes.row3));
      assertFalse(fam2Result.isEmpty());
      assertEquals(2, fam2Result.size());
      assertArrayEquals(TestBytes.value, fam2Result.getValue(TestBytes.family, TestBytes.qualifier));
      assertArrayEquals(TestBytes.value2, fam2Result.getValue(TestBytes.family, TestBytes.qualifier2));
      txContext.finish();

      // Test 4: delete specific rows in a range
      txContext.start();
      for (int i = 0; i < 10; i++) {
        txTable.put(new Put(Bytes.toBytes("z" + i))
            .add(TestBytes.family, TestBytes.qualifier, Bytes.toBytes(i))
            .add(TestBytes.family2, TestBytes.qualifier2, Bytes.toBytes(i)));
      }
      txContext.finish();

      txContext.start();
      // delete odd rows
      for (int i = 1; i < 10; i += 2) {
        txTable.delete(new Delete(Bytes.toBytes("z" + i)));
      }
      txContext.finish();

      txContext.start();
      int cnt = 0;
      ResultScanner zScanner = txTable.getScanner(new Scan(Bytes.toBytes("z0")));
      Result res;
      while ((res = zScanner.next()) != null) {
        assertFalse(res.isEmpty());
        assertArrayEquals(Bytes.toBytes("z" + cnt), res.getRow());
        assertArrayEquals(Bytes.toBytes(cnt), res.getValue(TestBytes.family, TestBytes.qualifier));
        assertArrayEquals(Bytes.toBytes(cnt), res.getValue(TestBytes.family2, TestBytes.qualifier2));
        cnt += 2;
      }

      // Test 5: delete prior writes in the same transaction
      txContext.start();
      txTable.put(new Put(TestBytes.row4)
          .add(TestBytes.family, TestBytes.qualifier, TestBytes.value)
          .add(TestBytes.family2, TestBytes.qualifier2, TestBytes.value2));
      txTable.delete(new Delete(TestBytes.row4));
      txContext.finish();

      txContext.start();
      Result row4Result = txTable.get(new Get(TestBytes.row4));
      assertTrue(row4Result.isEmpty());
      txContext.finish();
    } finally {
      txTable.close();
    }
  }

  /**
   * Expect an exception since a transaction hasn't been started.
   *
   * @throws Exception
   */
  @Test(expected = IOException.class)
  public void testTransactionlessFailure() throws Exception {
    transactionAwareHTable.get(new Get(TestBytes.row));
  }

  /**
   * Tests that each transaction can see its own persisted writes, while not seeing writes from other
   * in-progress transactions.
   */
  @Test
  public void testReadYourWrites() throws Exception {
    // In-progress tx1: started before our main transaction
    HTable hTable1 = new HTable(testUtil.getConfiguration(), TestBytes.table);
    TransactionAwareHTable txHTable1 = new TransactionAwareHTable(hTable1);
    TransactionContext inprogressTxContext1 = new TransactionContext(new InMemoryTxSystemClient(txManager), txHTable1);

    // In-progress tx2: started while our main transaction is running
    HTable hTable2 = new HTable(testUtil.getConfiguration(), TestBytes.table);
    TransactionAwareHTable txHTable2 = new TransactionAwareHTable(hTable2);
    TransactionContext inprogressTxContext2 = new TransactionContext(new InMemoryTxSystemClient(txManager), txHTable2);

    // create an in-progress write that should be ignored
    byte[] col2 = Bytes.toBytes("col2");
    inprogressTxContext1.start();
    Put putCol2 = new Put(TestBytes.row);
    byte[] valueCol2 = Bytes.toBytes("writing in progress");
    putCol2.add(TestBytes.family, col2, valueCol2);
    txHTable1.put(putCol2);

    // start a tx and write a value to test reading in same tx
    transactionContext.start();
    Put put = new Put(TestBytes.row);
    byte[] value = Bytes.toBytes("writing");
    put.add(TestBytes.family, TestBytes.qualifier, value);
    transactionAwareHTable.put(put);

    // test that a write from a tx started after the first is not visible
    inprogressTxContext2.start();
    Put put2 = new Put(TestBytes.row);
    byte[] value2 = Bytes.toBytes("writing2");
    put2.add(TestBytes.family, TestBytes.qualifier, value2);
    txHTable2.put(put2);

    Get get = new Get(TestBytes.row);
    Result row = transactionAwareHTable.get(get);
    assertFalse(row.isEmpty());
    byte[] col1Value = row.getValue(TestBytes.family, TestBytes.qualifier);
    Assert.assertNotNull(col1Value);
    Assert.assertArrayEquals(value, col1Value);
    // write from in-progress transaction should not be visible
    byte[] col2Value = row.getValue(TestBytes.family, col2);
    assertNull(col2Value);

    // commit in-progress transaction, should still not be visible
    inprogressTxContext1.finish();

    get = new Get(TestBytes.row);
    row = transactionAwareHTable.get(get);
    assertFalse(row.isEmpty());
    col2Value = row.getValue(TestBytes.family, col2);
    assertNull(col2Value);

    transactionContext.finish();

    inprogressTxContext2.abort();
  }

  @Test
  public void testRowLevelConflictDetection() throws Exception {
    TransactionAwareHTable txTable1 = new TransactionAwareHTable(new HTable(conf, TestBytes.table),
        TxConstants.ConflictDetection.ROW);
    TransactionContext txContext1 = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable1);

    TransactionAwareHTable txTable2 = new TransactionAwareHTable(new HTable(conf, TestBytes.table),
        TxConstants.ConflictDetection.ROW);
    TransactionContext txContext2 = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable2);

    byte[] row1 = Bytes.toBytes("row1");
    byte[] row2 = Bytes.toBytes("row2");
    byte[] col1 = Bytes.toBytes("c1");
    byte[] col2 = Bytes.toBytes("c2");
    byte[] val1 = Bytes.toBytes("val1");
    byte[] val2 = Bytes.toBytes("val2");

    // test that concurrent writing to different rows succeeds
    txContext1.start();
    txTable1.put(new Put(row1).add(TestBytes.family, col1, val1));

    txContext2.start();
    txTable2.put(new Put(row2).add(TestBytes.family, col1, val2));

    // should be no conflicts
    txContext1.finish();
    txContext2.finish();

    transactionContext.start();
    Result res = transactionAwareHTable.get(new Get(row1));
    assertFalse(res.isEmpty());
    Cell cell = res.getColumnLatestCell(TestBytes.family, col1);
    assertNotNull(cell);
    assertArrayEquals(val1, CellUtil.cloneValue(cell));

    res = transactionAwareHTable.get(new Get(row2));
    assertFalse(res.isEmpty());
    cell = res.getColumnLatestCell(TestBytes.family, col1);
    assertNotNull(cell);
    assertArrayEquals(val2, CellUtil.cloneValue(cell));
    transactionContext.finish();

    // test that writing to different columns in the same row fails
    txContext1.start();
    txTable1.put(new Put(row1).add(TestBytes.family, col1, val2));

    txContext2.start();
    txTable2.put(new Put(row1).add(TestBytes.family, col2, val2));

    txContext1.finish();
    try {
      txContext2.finish();
      fail("txContext2 should have encountered a row-level conflict during commit");
    } catch (TransactionConflictException tce) {
      txContext2.abort();
    }

    transactionContext.start();
    res = transactionAwareHTable.get(new Get(row1));
    assertFalse(res.isEmpty());
    cell = res.getColumnLatestCell(TestBytes.family, col1);
    assertNotNull(cell);
    // should now be val2
    assertArrayEquals(val2, CellUtil.cloneValue(cell));

    cell = res.getColumnLatestCell(TestBytes.family, col2);
    // col2 should not be visible due to conflict
    assertNull(cell);
    transactionContext.finish();

    // test that writing to the same column in the same row fails
    txContext1.start();
    txTable1.put(new Put(row2).add(TestBytes.family, col2, val1));

    txContext2.start();
    txTable2.put(new Put(row2).add(TestBytes.family, col2, val2));

    txContext1.finish();
    try {
      txContext2.finish();
      fail("txContext2 should have encountered a row and column level conflict during commit");
    } catch (TransactionConflictException tce) {
      txContext2.abort();
    }

    transactionContext.start();
    res = transactionAwareHTable.get(new Get(row2));
    assertFalse(res.isEmpty());
    cell = res.getColumnLatestCell(TestBytes.family, col2);
    assertNotNull(cell);
    // should now be val1
    assertArrayEquals(val1, CellUtil.cloneValue(cell));
    transactionContext.finish();

    // verify change set that is being reported only on rows
    txContext1.start();
    txTable1.put(new Put(row1).add(TestBytes.family, col1, val1));
    txTable1.put(new Put(row2).add(TestBytes.family, col2, val2));

    Collection<byte[]> changeSet = txTable1.getTxChanges();
    assertNotNull(changeSet);
    assertEquals(2, changeSet.size());
    assertTrue(changeSet.contains(txTable1.getChangeKey(row1, null, null)));
    assertTrue(changeSet.contains(txTable1.getChangeKey(row2, null, null)));
    txContext1.finish();
  }

  @Test
  public void testNoneLevelConflictDetection() throws Exception {
    InMemoryTxSystemClient txClient = new InMemoryTxSystemClient(txManager);
    TransactionAwareHTable txTable1 = new TransactionAwareHTable(new HTable(conf, TestBytes.table),
        TxConstants.ConflictDetection.NONE);
    TransactionContext txContext1 = new TransactionContext(txClient, txTable1);

    TransactionAwareHTable txTable2 = new TransactionAwareHTable(new HTable(conf, TestBytes.table),
        TxConstants.ConflictDetection.NONE);
    TransactionContext txContext2 = new TransactionContext(txClient, txTable2);

    // overlapping writes to the same row + column should not conflict

    txContext1.start();
    txTable1.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));

    // changes should not be visible yet
    txContext2.start();
    Result row = txTable2.get(new Get(TestBytes.row));
    assertTrue(row.isEmpty());

    txTable2.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, TestBytes.value2));

    // both commits should succeed
    txContext1.finish();
    txContext2.finish();

    txContext1.start();
    row = txTable1.get(new Get(TestBytes.row));
    assertFalse(row.isEmpty());
    assertArrayEquals(TestBytes.value2, row.getValue(TestBytes.family, TestBytes.qualifier));
    txContext1.finish();

    // transaction abort should still rollback changes

    txContext1.start();
    txTable1.put(new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
    txContext1.abort();

    // changes to row2 should be rolled back
    txContext2.start();
    Result row2 = txTable2.get(new Get(TestBytes.row2));
    assertTrue(row2.isEmpty());
    txContext2.finish();

    // transaction invalidate should still make changes invisible

    txContext1.start();
    Transaction tx1 = txContext1.getCurrentTransaction();
    txTable1.put(new Put(TestBytes.row3).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
    assertNotNull(tx1);
    txClient.invalidate(tx1.getWritePointer());

    // changes to row2 should be rolled back
    txContext2.start();
    Result row3 = txTable2.get(new Get(TestBytes.row3));
    assertTrue(row3.isEmpty());
    txContext2.finish();
  }

  @Test
  public void testCheckpoint() throws Exception {
    // start a transaction, using checkpoints between writes
    transactionContext.start();
    transactionAwareHTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
    Transaction origTx = transactionContext.getCurrentTransaction();
    transactionContext.checkpoint();
    Transaction postCheckpointTx = transactionContext.getCurrentTransaction();

    assertEquals(origTx.getTransactionId(), postCheckpointTx.getTransactionId());
    assertNotEquals(origTx.getWritePointer(), postCheckpointTx.getWritePointer());
    long[] checkpointPtrs = postCheckpointTx.getCheckpointWritePointers();
    assertEquals(1, checkpointPtrs.length);
    assertEquals(postCheckpointTx.getWritePointer(), checkpointPtrs[0]);

    transactionAwareHTable.put(new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier, TestBytes.value2));
    transactionContext.checkpoint();
    Transaction postCheckpointTx2 = transactionContext.getCurrentTransaction();

    assertEquals(origTx.getTransactionId(), postCheckpointTx2.getTransactionId());
    assertNotEquals(postCheckpointTx.getWritePointer(), postCheckpointTx2.getWritePointer());
    long[] checkpointPtrs2 = postCheckpointTx2.getCheckpointWritePointers();
    assertEquals(2, checkpointPtrs2.length);
    assertEquals(postCheckpointTx.getWritePointer(), checkpointPtrs2[0]);
    assertEquals(postCheckpointTx2.getWritePointer(), checkpointPtrs2[1]);

    transactionAwareHTable.put(new Put(TestBytes.row3).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));

    // by default, all rows should be visible with Read-Your-Writes
    verifyRow(transactionAwareHTable, TestBytes.row, TestBytes.value);
    verifyRow(transactionAwareHTable, TestBytes.row2, TestBytes.value2);
    verifyRow(transactionAwareHTable, TestBytes.row3, TestBytes.value);

    // when disabling current write pointer, only the previous checkpoints should be visible
    transactionContext.getCurrentTransaction().setVisibility(Transaction.VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT);
    Get get = new Get(TestBytes.row);
    verifyRow(transactionAwareHTable, get, TestBytes.value);
    get = new Get(TestBytes.row2);
    verifyRow(transactionAwareHTable, get, TestBytes.value2);
    get = new Get(TestBytes.row3);
    verifyRow(transactionAwareHTable, get, null);

    // test scan results excluding current write pointer
    Scan scan = new Scan();
    ResultScanner scanner = transactionAwareHTable.getScanner(scan);

    Result row = scanner.next();
    assertNotNull(row);
    assertArrayEquals(TestBytes.row, row.getRow());
    assertEquals(1, row.size());
    assertArrayEquals(TestBytes.value, row.getValue(TestBytes.family, TestBytes.qualifier));

    row = scanner.next();
    assertNotNull(row);
    assertArrayEquals(TestBytes.row2, row.getRow());
    assertEquals(1, row.size());
    assertArrayEquals(TestBytes.value2, row.getValue(TestBytes.family, TestBytes.qualifier));

    row = scanner.next();
    assertNull(row);
    scanner.close();
    transactionContext.getCurrentTransaction().setVisibility(Transaction.VisibilityLevel.SNAPSHOT);

    // commit transaction, verify writes are visible
    transactionContext.finish();

    transactionContext.start();
    verifyRow(transactionAwareHTable, TestBytes.row, TestBytes.value);
    verifyRow(transactionAwareHTable, TestBytes.row2, TestBytes.value2);
    verifyRow(transactionAwareHTable, TestBytes.row3, TestBytes.value);
    transactionContext.finish();
  }

  @Test
  public void testInProgressCheckpoint() throws Exception {
    // start a transaction, using checkpoints between writes
    transactionContext.start();
    transactionAwareHTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
    transactionContext.checkpoint();
    transactionAwareHTable.put(new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier, TestBytes.value2));

    // check that writes are still not visible to other clients
    TransactionAwareHTable txTable2 = new TransactionAwareHTable(new HTable(conf, TestBytes.table));
    TransactionContext txContext2 = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable2);

    txContext2.start();
    verifyRow(txTable2, TestBytes.row, null);
    verifyRow(txTable2, TestBytes.row2, null);
    txContext2.finish();
    txTable2.close();

    transactionContext.finish();

    // verify writes are visible after commit
    transactionContext.start();
    verifyRow(transactionAwareHTable, TestBytes.row, TestBytes.value);
    verifyRow(transactionAwareHTable, TestBytes.row2, TestBytes.value2);
    transactionContext.finish();
  }

  @Test
  public void testCheckpointRollback() throws Exception {
    // start a transaction, using checkpoints between writes
    transactionContext.start();
    transactionAwareHTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
    transactionContext.checkpoint();
    transactionAwareHTable.put(new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier, TestBytes.value2));
    transactionContext.checkpoint();
    transactionAwareHTable.put(new Put(TestBytes.row3).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));

    transactionContext.abort();

    transactionContext.start();
    verifyRow(transactionAwareHTable, TestBytes.row, null);
    verifyRow(transactionAwareHTable, TestBytes.row2, null);
    verifyRow(transactionAwareHTable, TestBytes.row3, null);

    Scan scan = new Scan();
    ResultScanner scanner = transactionAwareHTable.getScanner(scan);
    assertNull(scanner.next());
    scanner.close();
    transactionContext.finish();
  }

  @Test
  public void testCheckpointInvalidate() throws Exception {
    // start a transaction, using checkpoints between writes
    transactionContext.start();
    Transaction origTx = transactionContext.getCurrentTransaction();
    transactionAwareHTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
    transactionContext.checkpoint();
    Transaction checkpointTx1 = transactionContext.getCurrentTransaction();
    transactionAwareHTable.put(new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier, TestBytes.value2));
    transactionContext.checkpoint();
    Transaction checkpointTx2 = transactionContext.getCurrentTransaction();
    transactionAwareHTable.put(new Put(TestBytes.row3).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));

    TransactionSystemClient txClient = new InMemoryTxSystemClient(txManager);
    txClient.invalidate(transactionContext.getCurrentTransaction().getTransactionId());

    // check that writes are not visible
    TransactionAwareHTable txTable2 = new TransactionAwareHTable(new HTable(conf, TestBytes.table));
    TransactionContext txContext2 = new TransactionContext(txClient, txTable2);
    txContext2.start();
    Transaction newTx = txContext2.getCurrentTransaction();

    // all 3 writes pointers from the previous transaction should now be excluded
    assertTrue(newTx.isExcluded(origTx.getWritePointer()));
    assertTrue(newTx.isExcluded(checkpointTx1.getWritePointer()));
    assertTrue(newTx.isExcluded(checkpointTx2.getWritePointer()));

    verifyRow(txTable2, TestBytes.row, null);
    verifyRow(txTable2, TestBytes.row2, null);
    verifyRow(txTable2, TestBytes.row3, null);

    Scan scan = new Scan();
    ResultScanner scanner = txTable2.getScanner(scan);
    assertNull(scanner.next());
    scanner.close();
    txContext2.finish();
  }

  @Test
  public void testExistingData() throws Exception {
    byte[] val11 = Bytes.toBytes("val11");
    byte[] val12 = Bytes.toBytes("val12");
    byte[] val21 = Bytes.toBytes("val21");
    byte[] val22 = Bytes.toBytes("val22");
    byte[] val31 = Bytes.toBytes("val31");
    byte[] val111 = Bytes.toBytes("val111");

    TransactionAwareHTable txTable =
      new TransactionAwareHTable(createTable(Bytes.toBytes("testExistingData"), new byte[][]{TestBytes.family}, true, 
      Collections.<String>emptyList()));
    TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);

    // Add some pre-existing, non-transactional data
    HTable nonTxTable = new HTable(testUtil.getConfiguration(), txTable.getTableName());
    nonTxTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, val11));
    nonTxTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier2, val12));
    nonTxTable.put(new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier, val21));
    nonTxTable.put(new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier2, val22));
    nonTxTable.put(new Put(TestBytes.row4).add(TestBytes.family, TxConstants.FAMILY_DELETE_QUALIFIER,
                                               HConstants.EMPTY_BYTE_ARRAY));
    nonTxTable.put(new Put(TestBytes.row4).add(TestBytes.family, TestBytes.qualifier, HConstants.EMPTY_BYTE_ARRAY));
    nonTxTable.flushCommits();

    // Add transactional data
    txContext.start();
    txTable.put(new Put(TestBytes.row3).add(TestBytes.family, TestBytes.qualifier, val31));
    txContext.finish();

    txContext.start();
    // test get
    verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier), val11);
    verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier2), val12);
    verifyRow(txTable, new Get(TestBytes.row2).addColumn(TestBytes.family, TestBytes.qualifier), val21);
    verifyRow(txTable, new Get(TestBytes.row2).addColumn(TestBytes.family, TestBytes.qualifier2), val22);
    verifyRow(txTable, new Get(TestBytes.row3).addColumn(TestBytes.family, TestBytes.qualifier), val31);
    verifyRow(txTable, new Get(TestBytes.row4).addColumn(TestBytes.family, TxConstants.FAMILY_DELETE_QUALIFIER),
              HConstants.EMPTY_BYTE_ARRAY);
    verifyRow(txTable, new Get(TestBytes.row4).addColumn(TestBytes.family, TestBytes.qualifier),
              HConstants.EMPTY_BYTE_ARRAY);

    // test scan
    try (ResultScanner scanner = txTable.getScanner(new Scan())) {
      Result result = scanner.next();
      assertNotNull(result);
      assertArrayEquals(TestBytes.row, result.getRow());
      assertArrayEquals(val11, result.getValue(TestBytes.family, TestBytes.qualifier));
      assertArrayEquals(val12, result.getValue(TestBytes.family, TestBytes.qualifier2));
      result = scanner.next();
      assertNotNull(result);
      assertArrayEquals(TestBytes.row2, result.getRow());
      assertArrayEquals(val21, result.getValue(TestBytes.family, TestBytes.qualifier));
      assertArrayEquals(val22, result.getValue(TestBytes.family, TestBytes.qualifier2));
      result = scanner.next();
      assertNotNull(result);
      assertArrayEquals(TestBytes.row3, result.getRow());
      assertArrayEquals(val31, result.getValue(TestBytes.family, TestBytes.qualifier));
      result = scanner.next();
      assertNotNull(result);
      assertArrayEquals(TestBytes.row4, result.getRow());
      assertArrayEquals(HConstants.EMPTY_BYTE_ARRAY, result.getValue(TestBytes.family,
                                                                     TxConstants.FAMILY_DELETE_QUALIFIER));
      assertArrayEquals(HConstants.EMPTY_BYTE_ARRAY, result.getValue(TestBytes.family, TestBytes.qualifier));
      assertNull(scanner.next());
    }
    txContext.finish();

    // test update and delete
    txContext.start();
    txTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, val111));
    txTable.delete(new Delete(TestBytes.row2).deleteColumns(TestBytes.family, TestBytes.qualifier));
    txContext.finish();

    txContext.start();
    verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier), val111);
    verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier2), val12);
    verifyRow(txTable, new Get(TestBytes.row2).addColumn(TestBytes.family, TestBytes.qualifier), null);
    verifyRow(txTable, new Get(TestBytes.row2).addColumn(TestBytes.family, TestBytes.qualifier2), val22);
    verifyRow(txTable, new Get(TestBytes.row3).addColumn(TestBytes.family, TestBytes.qualifier), val31);
    verifyRow(txTable, new Get(TestBytes.row4).addColumn(TestBytes.family, TxConstants.FAMILY_DELETE_QUALIFIER),
              HConstants.EMPTY_BYTE_ARRAY);
    verifyRow(txTable, new Get(TestBytes.row4).addColumn(TestBytes.family, TestBytes.qualifier),
              HConstants.EMPTY_BYTE_ARRAY);
    txContext.finish();

    // test scan
    txContext.start();
    try (ResultScanner scanner = txTable.getScanner(new Scan())) {
      Result result = scanner.next();
      assertNotNull(result);
      assertArrayEquals(TestBytes.row, result.getRow());
      assertArrayEquals(val111, result.getValue(TestBytes.family, TestBytes.qualifier));
      assertArrayEquals(val12, result.getValue(TestBytes.family, TestBytes.qualifier2));
      result = scanner.next();
      assertNotNull(result);
      assertArrayEquals(TestBytes.row2, result.getRow());
      assertArrayEquals(null, result.getValue(TestBytes.family, TestBytes.qualifier));
      assertArrayEquals(val22, result.getValue(TestBytes.family, TestBytes.qualifier2));
      result = scanner.next();
      assertNotNull(result);
      assertArrayEquals(TestBytes.row3, result.getRow());
      assertArrayEquals(val31, result.getValue(TestBytes.family, TestBytes.qualifier));
      result = scanner.next();
      assertNotNull(result);
      assertArrayEquals(TestBytes.row4, result.getRow());
      assertArrayEquals(HConstants.EMPTY_BYTE_ARRAY, result.getValue(TestBytes.family,
                                                                     TxConstants.FAMILY_DELETE_QUALIFIER));
      assertArrayEquals(HConstants.EMPTY_BYTE_ARRAY, result.getValue(TestBytes.family, TestBytes.qualifier));
      assertNull(scanner.next());
    }
    txContext.finish();
  }

  private void verifyRow(HTableInterface table, byte[] rowkey, byte[] expectedValue) throws Exception {
    verifyRow(table, new Get(rowkey), expectedValue);
  }

  private void verifyRow(HTableInterface table, Get get, byte[] expectedValue) throws Exception {
    verifyRows(table, get, expectedValue == null ? null : ImmutableList.of(expectedValue));
  }

  private void verifyRows(HTableInterface table, Get get, List<byte[]> expectedValues) throws Exception {
    Result result = table.get(get);
    if (expectedValues == null) {
      assertTrue(result.isEmpty());
    } else {
      assertFalse(result.isEmpty());
      byte[] family = TestBytes.family;
      byte[] col = TestBytes.qualifier;
      if (get.hasFamilies()) {
        family = get.getFamilyMap().keySet().iterator().next();
        col = get.getFamilyMap().get(family).first();
      }
      Iterator<Cell> it = result.getColumnCells(family, col).iterator();
      for (byte[] expectedValue : expectedValues) {
        Assert.assertTrue(it.hasNext());
        assertArrayEquals(expectedValue, CellUtil.cloneValue(it.next()));
      }
    }
  }

  private Cell[] getRow(HTableInterface table, Get get) throws Exception {
    Result result = table.get(get);
    return result.rawCells();
  }

  private void verifyScan(HTableInterface table, Scan scan, List<KeyValue> expectedCells) throws Exception {
    List<Cell> actualCells = new ArrayList<>();
    try (ResultScanner scanner = table.getScanner(scan)) {
      Result[] results = scanner.next(expectedCells.size() + 1);
      for (Result result : results) {
        actualCells.addAll(Lists.newArrayList(result.rawCells()));
      }
      Assert.assertEquals(expectedCells, actualCells);
    }
  }

  @Test
  public void testVisibilityAll() throws Exception {
    HTable nonTxTable = createTable(Bytes.toBytes("testVisibilityAll"),
      new byte[][]{TestBytes.family, TestBytes.family2}, true, Collections.<String>emptyList());
    TransactionAwareHTable txTable =
      new TransactionAwareHTable(nonTxTable,
                                 TxConstants.ConflictDetection.ROW); // ROW conflict detection to verify family deletes
    TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);

    // start a transaction and create a delete marker
    txContext.start();
    //noinspection ConstantConditions
    long txWp0 = txContext.getCurrentTransaction().getWritePointer();
    txTable.delete(new Delete(TestBytes.row).deleteColumn(TestBytes.family, TestBytes.qualifier2));
    txContext.finish();

    // start a new transaction and write some values
    txContext.start();
    @SuppressWarnings("ConstantConditions")
    long txWp1 = txContext.getCurrentTransaction().getWritePointer();
    txTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
    txTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier2, TestBytes.value2));
    txTable.put(new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
    txTable.put(new Put(TestBytes.row).add(TestBytes.family2, TestBytes.qualifier, TestBytes.value));
    txTable.put(new Put(TestBytes.row).add(TestBytes.family2, TestBytes.qualifier2, TestBytes.value2));

    // verify written data
    verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier),
              TestBytes.value);
    verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier2),
              TestBytes.value2);
    verifyRow(txTable, new Get(TestBytes.row2).addColumn(TestBytes.family, TestBytes.qualifier),
              TestBytes.value);
    verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family2, TestBytes.qualifier),
              TestBytes.value);
    verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family2, TestBytes.qualifier2),
              TestBytes.value2);

    // checkpoint and make changes to written data now
    txContext.checkpoint();
    long txWp2 = txContext.getCurrentTransaction().getWritePointer();
    // delete a column
    txTable.delete(new Delete(TestBytes.row).deleteColumn(TestBytes.family, TestBytes.qualifier));
    // no change to a column
    txTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier2, TestBytes.value2));
    // update a column
    txTable.put(new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier, TestBytes.value3));
    // delete column family
    txTable.delete(new Delete(TestBytes.row).deleteFamily(TestBytes.family2));

    // verify changed values
    verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier),
              null);
    verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier2),
              TestBytes.value2);
    verifyRow(txTable, new Get(TestBytes.row2).addColumn(TestBytes.family, TestBytes.qualifier),
              TestBytes.value3);
    verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family2, TestBytes.qualifier),
              null);
    verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family2, TestBytes.qualifier2),
              null);

    // run a scan with VisibilityLevel.ALL, this should return all raw changes by this transaction,
    // and the raw change by prior transaction
    //noinspection ConstantConditions
    txContext.getCurrentTransaction().setVisibility(Transaction.VisibilityLevel.SNAPSHOT_ALL);
    List<KeyValue> expected = ImmutableList.of(
      new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier, txWp2, KeyValue.Type.DeleteColumn),
      new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier, txWp1, TestBytes.value),
      new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier2, txWp2, TestBytes.value2),
      new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier2, txWp1, TestBytes.value2),
      new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier2, txWp0, KeyValue.Type.DeleteColumn),
      new KeyValue(TestBytes.row, TestBytes.family2, null, txWp2, KeyValue.Type.DeleteFamily),
      new KeyValue(TestBytes.row, TestBytes.family2, TestBytes.qualifier, txWp1, TestBytes.value),
      new KeyValue(TestBytes.row, TestBytes.family2, TestBytes.qualifier2, txWp1, TestBytes.value2),
      new KeyValue(TestBytes.row2, TestBytes.family, TestBytes.qualifier, txWp2, TestBytes.value3),
      new KeyValue(TestBytes.row2, TestBytes.family, TestBytes.qualifier, txWp1, TestBytes.value)
    );
    verifyScan(txTable, new Scan(), expected);

    // verify a Get is also able to return all snapshot versions
    Get get = new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier);
    Cell[] cells = getRow(txTable, get);
    Assert.assertEquals(2, cells.length);
    Assert.assertTrue(CellUtil.isDelete(cells[0]));
    Assert.assertArrayEquals(TestBytes.value, CellUtil.cloneValue(cells[1]));

    get = new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier2);
    cells = getRow(txTable, get);
    Assert.assertEquals(3, cells.length);
    Assert.assertArrayEquals(TestBytes.value2, CellUtil.cloneValue(cells[0]));
    Assert.assertArrayEquals(TestBytes.value2, CellUtil.cloneValue(cells[1]));
    Assert.assertTrue(CellUtil.isDeleteColumns(cells[2]));

    verifyRows(txTable, new Get(TestBytes.row2).addColumn(TestBytes.family, TestBytes.qualifier),
               ImmutableList.of(TestBytes.value3, TestBytes.value));

    get = new Get(TestBytes.row).addColumn(TestBytes.family2, TestBytes.qualifier);
    cells = getRow(txTable, get);
    Assert.assertEquals(2, cells.length);
    Assert.assertTrue(CellUtil.isDelete(cells[0]));
    Assert.assertArrayEquals(TestBytes.value, CellUtil.cloneValue(cells[1]));

    get = new Get(TestBytes.row).addColumn(TestBytes.family2, TestBytes.qualifier2);
    cells = getRow(txTable, get);
    Assert.assertEquals(2, cells.length);
    Assert.assertTrue(CellUtil.isDelete(cells[0]));
    Assert.assertArrayEquals(TestBytes.value2, CellUtil.cloneValue(cells[1]));

    // Verify VisibilityLevel.SNAPSHOT
    txContext.getCurrentTransaction().setVisibility(Transaction.VisibilityLevel.SNAPSHOT);
    expected = ImmutableList.of(
      new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier2, txWp2, TestBytes.value2),
      new KeyValue(TestBytes.row2, TestBytes.family, TestBytes.qualifier, txWp2, TestBytes.value3)
    );
    verifyScan(txTable, new Scan(), expected);

    // Verify VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT
    txContext.getCurrentTransaction().setVisibility(Transaction.VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT);
    expected = ImmutableList.of(
      new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier, txWp1, TestBytes.value),
      new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier2, txWp1, TestBytes.value2),
      new KeyValue(TestBytes.row, TestBytes.family2, TestBytes.qualifier, txWp1, TestBytes.value),
      new KeyValue(TestBytes.row, TestBytes.family2, TestBytes.qualifier2, txWp1, TestBytes.value2),
      new KeyValue(TestBytes.row2, TestBytes.family, TestBytes.qualifier, txWp1, TestBytes.value)
    );
    verifyScan(txTable, new Scan(), expected);
    txContext.finish();

    // finally verify values once more after commit, this time we should get only committed raw values for
    // all visibility levels
    txContext.start();
    txContext.getCurrentTransaction().setVisibility(Transaction.VisibilityLevel.SNAPSHOT_ALL);
    expected = ImmutableList.of(
      new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier, txWp2, KeyValue.Type.DeleteColumn),
      new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier2, txWp2, TestBytes.value2),
      new KeyValue(TestBytes.row, TestBytes.family2, null, txWp2, KeyValue.Type.DeleteFamily),
      new KeyValue(TestBytes.row2, TestBytes.family, TestBytes.qualifier, txWp2, TestBytes.value3)
    );
    verifyScan(txTable, new Scan(), expected);

    txContext.getCurrentTransaction().setVisibility(Transaction.VisibilityLevel.SNAPSHOT);
    expected = ImmutableList.of(
      new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier2, txWp2, TestBytes.value2),
      new KeyValue(TestBytes.row2, TestBytes.family, TestBytes.qualifier, txWp2, TestBytes.value3)
    );
    verifyScan(txTable, new Scan(), expected);

    txContext.getCurrentTransaction().setVisibility(Transaction.VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT);
    expected = ImmutableList.of(
      new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier2, txWp2, TestBytes.value2),
      new KeyValue(TestBytes.row2, TestBytes.family, TestBytes.qualifier, txWp2, TestBytes.value3)
    );
    verifyScan(txTable, new Scan(), expected);

    verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier),
              null);
    verifyRow(txTable, new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier2),
              TestBytes.value2);
    verifyRow(txTable, new Get(TestBytes.row2).addColumn(TestBytes.family, TestBytes.qualifier),
              TestBytes.value3);
    txContext.finish();

    // Test with regular HBase deletes in pre-existing data
    long now = System.currentTimeMillis();
    Delete deleteColumn = new Delete(TestBytes.row3).deleteColumn(TestBytes.family, TestBytes.qualifier, now - 1);
    // to prevent Tephra from replacing delete with delete marker
    deleteColumn.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
    nonTxTable.delete(deleteColumn);
    Delete deleteFamily = new Delete(TestBytes.row3).deleteFamily(TestBytes.family2, now);
    // to prevent Tephra from replacing delete with delete marker
    deleteFamily.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
    nonTxTable.delete(deleteFamily);
    nonTxTable.flushCommits();

    txContext.start();
    txContext.getCurrentTransaction().setVisibility(Transaction.VisibilityLevel.SNAPSHOT_ALL);
    expected = ImmutableList.of(
      new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier, txWp2, KeyValue.Type.DeleteColumn),
      new KeyValue(TestBytes.row, TestBytes.family, TestBytes.qualifier2, txWp2, TestBytes.value2),
      new KeyValue(TestBytes.row, TestBytes.family2, null, txWp2, KeyValue.Type.DeleteFamily),
      new KeyValue(TestBytes.row2, TestBytes.family, TestBytes.qualifier, txWp2, TestBytes.value3),
      new KeyValue(TestBytes.row3, TestBytes.family, TestBytes.qualifier, now - 1, KeyValue.Type.Delete),
      new KeyValue(TestBytes.row3, TestBytes.family2, null, now, KeyValue.Type.DeleteFamily)
    );
    // test scan
    Scan scan = new Scan();
    scan.setRaw(true);
    verifyScan(txTable, scan, expected);
    txContext.finish();
  }

  @Test
  public void testFilters() throws Exception {
    // Add some values to table
    transactionContext.start();
    Put put = new Put(TestBytes.row);
    byte[] val1 = Bytes.toBytes(1L);
    put.add(TestBytes.family, TestBytes.qualifier, val1);
    transactionAwareHTable.put(put);
    put = new Put(TestBytes.row2);
    byte[] val2 = Bytes.toBytes(2L);
    put.add(TestBytes.family, TestBytes.qualifier, val2);
    transactionAwareHTable.put(put);
    put = new Put(TestBytes.row3);
    byte[] val3 = Bytes.toBytes(3L);
    put.add(TestBytes.family, TestBytes.qualifier, val3);
    transactionAwareHTable.put(put);
    put = new Put(TestBytes.row4);
    byte[] val4 = Bytes.toBytes(4L);
    put.add(TestBytes.family, TestBytes.qualifier, val4);
    transactionAwareHTable.put(put);
    transactionContext.finish();

    // Delete cell with value 2
    transactionContext.start();
    Delete delete = new Delete(TestBytes.row2);
    delete.addColumn(TestBytes.family, TestBytes.qualifier);
    transactionAwareHTable.delete(delete);
    transactionContext.finish();

    // Scan for values less than 4, should get only values 1 and 3
    transactionContext.start();
    Scan scan = new Scan(TestBytes.row, new ValueFilter(CompareFilter.CompareOp.LESS, new LongComparator(4)));
    try (ResultScanner scanner = transactionAwareHTable.getScanner(scan)) {
      Result result = scanner.next();
      assertNotNull(result);
      assertArrayEquals(TestBytes.row, result.getRow());
      assertArrayEquals(val1, result.getValue(TestBytes.family, TestBytes.qualifier));
      result = scanner.next();
      assertNotNull(result);
      assertArrayEquals(TestBytes.row3, result.getRow());
      assertArrayEquals(val3, result.getValue(TestBytes.family, TestBytes.qualifier));
      result = scanner.next();
      assertNull(result);
    }
    transactionContext.finish();

    // Run a Get with a filter for less than 10 on row4, should get value 4
    transactionContext.start();
    Get get = new Get(TestBytes.row4);
    get.setFilter(new ValueFilter(CompareFilter.CompareOp.LESS, new LongComparator(10)));
    Result result = transactionAwareHTable.get(get);
    assertFalse(result.isEmpty());
    assertArrayEquals(val4, result.getValue(TestBytes.family, TestBytes.qualifier));
    transactionContext.finish();

    // Change value of row4 to 40
    transactionContext.start();
    put = new Put(TestBytes.row4);
    byte[] val40 = Bytes.toBytes(40L);
    put.add(TestBytes.family, TestBytes.qualifier, val40);
    transactionAwareHTable.put(put);
    transactionContext.finish();

    // Scan for values less than 10, should get only values 1 and 3
    transactionContext.start();
    scan = new Scan(TestBytes.row, new ValueFilter(CompareFilter.CompareOp.LESS, new LongComparator(10)));
    try (ResultScanner scanner = transactionAwareHTable.getScanner(scan)) {
      result = scanner.next();
      assertNotNull(result);
      assertArrayEquals(TestBytes.row, result.getRow());
      assertArrayEquals(val1, result.getValue(TestBytes.family, TestBytes.qualifier));
      result = scanner.next();
      assertNotNull(result);
      assertArrayEquals(TestBytes.row3, result.getRow());
      assertArrayEquals(val3, result.getValue(TestBytes.family, TestBytes.qualifier));
      result = scanner.next();
      assertNull(result);
    }
    transactionContext.finish();

    // Run the Get again with a filter for less than 10 on row4, this time should not get any results
    transactionContext.start();
    result = transactionAwareHTable.get(get);
    assertTrue(result.isEmpty());
    transactionContext.finish();
  }

  /**
   * Tests that transaction co-processor works with older clients
   *
   * @throws Exception
   */
  @Test
  public void testOlderClientOperations() throws Exception {
    // Use old HTable to test
    TransactionAwareHTable oldTxAware = new OldTransactionAwareHTable(hTable);
    transactionContext.addTransactionAware(oldTxAware);

    transactionContext.start();
    Put put = new Put(TestBytes.row);
    put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
    oldTxAware.put(put);
    transactionContext.finish();

    transactionContext.start();
    long txId = transactionContext.getCurrentTransaction().getTransactionId();
    put = new Put(TestBytes.row);
    put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value2);
    oldTxAware.put(put);
    // Invalidate the second Put
    TransactionSystemClient txClient = new InMemoryTxSystemClient(txManager);
    txClient.invalidate(txId);

    transactionContext.start();
    put = new Put(TestBytes.row);
    put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value3);
    oldTxAware.put(put);
    // Abort the third Put
    transactionContext.abort();

    // Get should now return the first value
    transactionContext.start();
    Result result = oldTxAware.get(new Get(TestBytes.row));
    transactionContext.finish();

    byte[] value = result.getValue(TestBytes.family, TestBytes.qualifier);
    assertArrayEquals(TestBytes.value, value);
  }

  /**
   * Represents older transaction clients
   */
  private static class OldTransactionAwareHTable extends TransactionAwareHTable {
    public OldTransactionAwareHTable(HTableInterface hTable) {
      super(hTable);
    }

    @Override
    public void addToOperation(OperationWithAttributes op, Transaction tx) throws IOException {
      op.setAttribute(TxConstants.OLD_TX_OPERATION_ATTRIBUTE_KEY, txCodec.encode(tx));
    }

    @Override
    protected void makeRollbackOperation(Delete delete) {
      delete.setAttribute(TxConstants.OLD_TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
    }
  }
}
