/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.omid.transaction;

import static org.apache.omid.committable.CommitTable.CommitTimestamp.Location.CACHE;
import static org.apache.omid.committable.CommitTable.CommitTimestamp.Location.COMMIT_TABLE;
import static org.apache.omid.committable.CommitTable.CommitTimestamp.Location.NOT_PRESENT;
import static org.apache.omid.committable.CommitTable.CommitTimestamp.Location.SHADOW_CELL;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

import java.util.Map;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.omid.committable.CommitTable;
import org.apache.omid.committable.CommitTable.CommitTimestamp;
import org.apache.omid.metrics.NullMetricsProvider;
import org.apache.omid.transaction.HBaseTransactionManager.CommitTimestampLocatorImpl;
import org.testng.ITestContext;
import org.testng.annotations.Test;

import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.SettableFuture;

@Test(groups = "sharedHBase")
public class TestHBaseTransactionClient extends OmidTestBase {

    // Row keys used by the tests in this class
    private static final byte[] row1 = Bytes.toBytes("test-is-committed1");
    private static final byte[] row2 = Bytes.toBytes("test-is-committed2");
    // Column family built from TEST_FAMILY, a constant not declared in this class
    // (presumably inherited from OmidTestBase)
    private static final byte[] family = Bytes.toBytes(TEST_FAMILY);
    // Column qualifier used by every Put in this class
    private static final byte[] qualifier = Bytes.toBytes("testdata");
    // Cell value used by every Put in this class
    private static final byte[] data1 = Bytes.toBytes("testWrite-1");

    // Checks SnapshotFilterImpl.isCommitted() against three cells:
    //  - row1 written by t1, which is committed
    //  - row2 written by t2, which is flushed but never committed
    //  - row2 written by t3, which is committed
    @Test(timeOut = 30_000)
    public void testIsCommitted(ITestContext context) throws Exception {
        TransactionManager tm = newTransactionManager(context);
        Table htable = connection.getTable(TableName.valueOf(TEST_TABLE));
        SnapshotFilterImpl snapshotFilter = new SnapshotFilterImpl(new HTableAccessWrapper(htable, htable),
                ((AbstractTransactionManager) tm).getCommitTableClient());

        // try-with-resources closes the transactional table even when an assertion fails,
        // in line with how the other tests in this class handle their TTable
        try (TTable table = spy(new TTable(htable, snapshotFilter, false))) {

            HBaseTransaction t1 = (HBaseTransaction) tm.begin();
            Put put = new Put(row1);
            put.addColumn(family, qualifier, data1);
            table.put(t1, put);
            tm.commit(t1);

            // t2 is deliberately left uncommitted; its write is only flushed
            HBaseTransaction t2 = (HBaseTransaction) tm.begin();
            put = new Put(row2);
            put.addColumn(family, qualifier, data1);
            table.put(t2, put);
            table.flushCommits();

            HBaseTransaction t3 = (HBaseTransaction) tm.begin();
            put = new Put(row2);
            put.addColumn(family, qualifier, data1);
            table.put(t3, put);
            tm.commit(t3);

            // Each cell id is keyed by the start timestamp of the transaction that wrote it
            HBaseCellId hBaseCellId1 = new HBaseCellId(table, row1, family, qualifier, t1.getStartTimestamp());
            HBaseCellId hBaseCellId2 = new HBaseCellId(table, row2, family, qualifier, t2.getStartTimestamp());
            HBaseCellId hBaseCellId3 = new HBaseCellId(table, row2, family, qualifier, t3.getStartTimestamp());

            assertTrue(snapshotFilter.isCommitted(hBaseCellId1, 0, false), "row1 should be committed");
            assertFalse(snapshotFilter.isCommitted(hBaseCellId2, 0, false), "row2 should not be committed for kv2");
            assertTrue(snapshotFilter.isCommitted(hBaseCellId3, 0, false), "row2 should be committed for kv3");
        }
    }

    // Emulates a client crash right after commit (shadow cells are never written) and checks
    // that the cell exists, its shadow cell does not, and the cell is still reported as committed.
    @Test(timeOut = 30_000)
    public void testCrashAfterCommit(ITestContext context) throws Exception {
        PostCommitActions syncPostCommitter =
                spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), getCommitTable(context).getClient(), connection));
        AbstractTransactionManager tm = (AbstractTransactionManager) newTransactionManager(context, syncPostCommitter);
        // The following line emulates a crash after commit that is observed in (*) below
        doThrow(new RuntimeException()).when(syncPostCommitter).updateShadowCells(any(HBaseTransaction.class));

        Table htable = connection.getTable(TableName.valueOf(TEST_TABLE));
        SnapshotFilterImpl snapshotFilter = new SnapshotFilterImpl(new HTableAccessWrapper(htable, htable),
                tm.getCommitTableClient());

        // try-with-resources closes the transactional table even when an assertion fails,
        // in line with how the other tests in this class handle their TTable
        try (TTable table = spy(new TTable(htable, snapshotFilter, false))) {

            HBaseTransaction t1 = (HBaseTransaction) tm.begin();

            Put put = new Put(row1);
            put.addColumn(family, qualifier, data1);
            table.put(t1, put);
            try {
                tm.commit(t1);
            } catch (Exception ignored) { // (*) crash
                // Intentionally ignored: this is where the emulated crash is observed
            }

            // The data cell was written, but the stubbed post-commit step left no shadow cell
            assertTrue(CellUtils.hasCell(row1, family, qualifier, t1.getStartTimestamp(), new TTableCellGetterAdapter(table)),
                       "Cell should be there");
            assertFalse(CellUtils.hasShadowCell(row1, family, qualifier, t1.getStartTimestamp(), new TTableCellGetterAdapter(table)),
                        "Shadow cell should not be there");

            HBaseCellId hBaseCellId = new HBaseCellId(table, row1, family, qualifier, t1.getStartTimestamp());

            assertTrue(snapshotFilter.isCommitted(hBaseCellId, 0, false), "row1 should be committed");
        }
    }

    // Queries the commit table client directly for three start timestamps:
    //  - one that was never used, expecting no entry
    //  - one belonging to an invalidated transaction, expecting the invalidation marker
    //  - one belonging to a transaction whose post-commit step "crashed", expecting its real
    //    commit timestamp
    @Test(timeOut = 30_000)
    public void testReadCommitTimestampFromCommitTable(ITestContext context) throws Exception {

        final long NON_EXISTING_CELL_TS = 1000L;

        PostCommitActions crashingPostCommitter =
                spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), getCommitTable(context).getClient(), connection));
        AbstractTransactionManager tm =
                (AbstractTransactionManager) newTransactionManager(context, crashingPostCommitter);
        // Emulates a crash after commit; it is observed in (*) below
        doThrow(new RuntimeException()).when(crashingPostCommitter).updateShadowCells(any(HBaseTransaction.class));

        // A start timestamp that was never used must have no entry
        Optional<CommitTimestamp> missingCT = tm.commitTableClient.getCommitTimestamp(NON_EXISTING_CELL_TS).get();
        assertFalse(missingCT.isPresent());

        try (TTable table = spy(new TTable(connection, TEST_TABLE, tm.getCommitTableClient()))) {

            // Start a transaction and invalidate it before committing it
            HBaseTransaction invalidatedTx = (HBaseTransaction) tm.begin();
            Put firstPut = new Put(row1);
            firstPut.addColumn(family, qualifier, data1);
            table.put(invalidatedTx, firstPut);

            final long invalidatedStartTs = invalidatedTx.getStartTimestamp();
            assertTrue(tm.commitTableClient.tryInvalidateTransaction(invalidatedStartTs).get());

            // The invalidated transaction must show up with the invalidation marker
            Optional<CommitTimestamp> invalidatedCT =
                    tm.commitTableClient.getCommitTimestamp(invalidatedTx.getStartTimestamp()).get();
            assertTrue(invalidatedCT.isPresent());
            CommitTimestamp invalidMark = invalidatedCT.get();
            assertFalse(invalidMark.isValid());
            assertEquals(invalidMark.getValue(), CommitTable.INVALID_TRANSACTION_MARKER);
            assertEquals(invalidMark.getLocation(), COMMIT_TABLE);

            // Finally, a transaction whose commit hits the emulated crash must still
            // expose its commit timestamp through the commit table
            HBaseTransaction crashedTx = (HBaseTransaction) tm.begin();
            Put secondPut = new Put(row1);
            secondPut.addColumn(family, qualifier, data1);
            table.put(crashedTx, secondPut);
            try {
                tm.commit(crashedTx);
            } catch (Exception ignored) { // (*) crash
                // Do nothing
            }

            Optional<CommitTimestamp> committedCT =
                    tm.commitTableClient.getCommitTimestamp(crashedTx.getStartTimestamp()).get();
            assertTrue(committedCT.isPresent());
            CommitTimestamp commitMark = committedCT.get();
            assertTrue(commitMark.isValid());
            assertEquals(commitMark.getValue(), crashedTx.getCommitTimestamp());
            assertEquals(commitMark.getLocation(), COMMIT_TABLE);
        }
    }

    // Exercises SnapshotFilterImpl.readCommitTimestampFromShadowCell(): first with a start
    // timestamp that was never written, then with the start timestamp of a committed transaction.
    @Test(timeOut = 30_000)
    public void testReadCommitTimestampFromShadowCell(ITestContext context) throws Exception {

        final long NON_EXISTING_CELL_TS = 1L;

        HBaseTransactionManager tm = (HBaseTransactionManager) newTransactionManager(context);

        Table htable = connection.getTable(TableName.valueOf(TEST_TABLE));
        SnapshotFilterImpl snapshotFilter = new SnapshotFilterImpl(new HTableAccessWrapper(htable, htable),
                tm.getCommitTableClient());

        try (TTable table = spy(new TTable(htable, snapshotFilter, false))) {

            // The locator is built with an empty cache so that the cache lookup cannot answer
            HBaseCellId cellId = new HBaseCellId(table, row1, family, qualifier, NON_EXISTING_CELL_TS);
            Map<Long, Long> emptyCache = Maps.newHashMap();
            CommitTimestampLocator locator = new CommitTimestampLocatorImpl(cellId, emptyCache);

            // A cell timestamp that does not exist must not be found
            Optional<CommitTimestamp> notFound =
                    snapshotFilter.readCommitTimestampFromShadowCell(NON_EXISTING_CELL_TS, locator);
            assertFalse(notFound.isPresent());

            // Commit a transaction writing to the same row/column
            HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
            Put put = new Put(row1);
            put.addColumn(family, qualifier, data1);
            table.put(tx1, put);
            tm.commit(tx1);

            // Upon commit, the commit data should be in the shadow cells, so test it
            Optional<CommitTimestamp> found =
                    snapshotFilter.readCommitTimestampFromShadowCell(tx1.getStartTimestamp(), locator);
            assertTrue(found.isPresent());
            CommitTimestamp ct = found.get();
            assertTrue(ct.isValid());
            assertEquals(ct.getValue(), tx1.getCommitTimestamp());
            assertEquals(ct.getLocation(), SHADOW_CELL);
        }
    }

    // Tests step 1 of locateCellCommitTimestamp() (invoked here on SnapshotFilterImpl):
    // a start timestamp present in the locator's cache is resolved from the cache.
    @Test(timeOut = 30_000)
    public void testCellCommitTimestampIsLocatedInCache(ITestContext context) throws Exception {

        final long CELL_ST = 1L;
        final long CELL_CT = 2L;

        HBaseTransactionManager tm = (HBaseTransactionManager) newTransactionManager(context);

        Table htable = hBaseUtils.getConnection().getTable(TableName.valueOf(TEST_TABLE));
        SnapshotFilterImpl snapshotFilter = new SnapshotFilterImpl(new HTableAccessWrapper(htable, htable),
                tm.getCommitTableClient());

        // try-with-resources closes the transactional table even when an assertion fails,
        // in line with how the other tests in this class handle their TTable
        try (TTable table = new TTable(htable, snapshotFilter, false)) {

            // Pre-load the element to look for in the cache
            HBaseCellId hBaseCellId = new HBaseCellId(table, row1, family, qualifier, CELL_ST);
            Map<Long, Long> fakeCache = Maps.newHashMap();
            fakeCache.put(CELL_ST, CELL_CT);

            // Then test that locator finds it in the cache
            CommitTimestampLocator ctLocator = new CommitTimestampLocatorImpl(hBaseCellId, fakeCache);
            CommitTimestamp ct = snapshotFilter.locateCellCommitTimestamp(CELL_ST, tm.tsoClient.getEpoch(), ctLocator,
                    false);
            assertTrue(ct.isValid());
            assertEquals(ct.getValue(), CELL_CT);
            assertEquals(ct.getLocation(), CACHE);
        }
    }

    // Tests step 2 of locateCellCommitTimestamp() (invoked here on SnapshotFilterImpl):
    // with an empty cache and no shadow cell written, the commit timestamp comes from
    // the commit table.
    // Note: This test is very similar to testCrashAfterCommit() above, so the two could
    // be merged, adding the missing assertions.
    @Test(timeOut = 30_000)
    public void testCellCommitTimestampIsLocatedInCommitTable(ITestContext context) throws Exception {

        PostCommitActions crashingPostCommitter =
                spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), getCommitTable(context).getClient(), connection));
        AbstractTransactionManager tm =
                (AbstractTransactionManager) newTransactionManager(context, crashingPostCommitter);
        // Emulates a crash after commit; it is observed in (*) below
        doThrow(new RuntimeException()).when(crashingPostCommitter).updateShadowCells(any(HBaseTransaction.class));

        Table htable = connection.getTable(TableName.valueOf(TEST_TABLE));
        SnapshotFilterImpl snapshotFilter = new SnapshotFilterImpl(new HTableAccessWrapper(htable, htable),
                tm.getCommitTableClient());

        try (TTable table = spy(new TTable(htable, snapshotFilter, false))) {
            // Commit a transaction that breaks on commit, so that the shadow cells
            // are not written and the commit table is not cleaned
            HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
            Put put = new Put(row1);
            put.addColumn(family, qualifier, data1);
            table.put(tx1, put);
            try {
                tm.commit(tx1);
            } catch (Exception ignored) { // (*) crash
                // Do nothing
            }

            // The locator starts with an empty cache, so the cache cannot answer
            HBaseCellId cellId = new HBaseCellId(table, row1, family, qualifier, tx1.getStartTimestamp());
            Map<Long, Long> emptyCache = Maps.newHashMap();
            CommitTimestampLocator locator = new CommitTimestampLocatorImpl(cellId, emptyCache);

            // Test the locator finds the appropriate data in the commit table
            CommitTimestamp ct = snapshotFilter.locateCellCommitTimestamp(
                    tx1.getStartTimestamp(), tm.tsoClient.getEpoch(), locator, false);
            assertTrue(ct.isValid());
            // Expected value is derived from the start timestamp rather than read from tx1
            long expectedCommitTS = tx1.getStartTimestamp() + CommitTable.MAX_CHECKPOINTS_PER_TXN;
            assertEquals(ct.getValue(), expectedCommitTS);
            assertEquals(ct.getLocation(), COMMIT_TABLE);
        }
    }

    // Tests step 3 of locateCellCommitTimestamp() (invoked here on SnapshotFilterImpl):
    // after a regular commit, the commit timestamp is resolved from the shadow cells.
    @Test(timeOut = 30_000)
    public void testCellCommitTimestampIsLocatedInShadowCells(ITestContext context) throws Exception {

        HBaseTransactionManager tm = (HBaseTransactionManager) newTransactionManager(context);

        Table htable = connection.getTable(TableName.valueOf(TEST_TABLE));
        SnapshotFilterImpl snapshotFilter = new SnapshotFilterImpl(new HTableAccessWrapper(htable, htable),
                tm.getCommitTableClient());

        try (TTable table = spy(new TTable(htable, snapshotFilter, false))) {
            // Write one cell and commit normally.
            // Upon commit, the commit data should be in the shadow cells
            HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
            Put put = new Put(row1);
            put.addColumn(family, qualifier, data1);
            table.put(tx1, put);
            tm.commit(tx1);

            // The locator starts with an empty cache, so the cache cannot answer
            HBaseCellId cellId = new HBaseCellId(table, row1, family, qualifier, tx1.getStartTimestamp());
            Map<Long, Long> emptyCache = Maps.newHashMap();
            CommitTimestampLocator locator = new CommitTimestampLocatorImpl(cellId, emptyCache);

            // Test the locator finds the appropriate data in the shadow cells
            CommitTimestamp ct = snapshotFilter.locateCellCommitTimestamp(
                    tx1.getStartTimestamp(), tm.tsoClient.getEpoch(), locator, false);
            assertTrue(ct.isValid());
            assertEquals(ct.getValue(), tx1.getCommitTimestamp());
            assertEquals(ct.getLocation(), SHADOW_CELL);
        }
    }

    // Tests step 4 of locateCellCommitTimestamp() (invoked here on SnapshotFilterImpl):
    // a cell written by a transaction that never committed, looked up under a faked newer
    // epoch, yields an invalid commit timestamp located in the commit table.
    @Test(timeOut = 30_000)
    public void testCellFromTransactionInPreviousEpochGetsInvalidComitTimestamp(ITestContext context) throws Exception {

        final long CURRENT_EPOCH_FAKE = 1000L * CommitTable.MAX_CHECKPOINTS_PER_TXN;

        CommitTable.Client commitTableClient = spy(getCommitTable(context).getClient());
        AbstractTransactionManager tm =
                spy((AbstractTransactionManager) newTransactionManager(context, commitTableClient));
        // Stub every commit timestamp lookup to come back absent, which allows
        // the locator to reach step 4)
        SettableFuture<Optional<CommitTimestamp>> absentLookup = SettableFuture.create();
        absentLookup.set(Optional.<CommitTimestamp>absent());
        doReturn(absentLookup).when(commitTableClient).getCommitTimestamp(any(Long.class));

        Table htable = connection.getTable(TableName.valueOf(TEST_TABLE));
        SnapshotFilterImpl snapshotFilter = new SnapshotFilterImpl(new HTableAccessWrapper(htable, htable),
                tm.getCommitTableClient());

        try (TTable table = spy(new TTable(htable, snapshotFilter, false))) {

            // Begin a transaction and write a cell; note the transaction is never committed
            HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
            Put put = new Put(row1);
            put.addColumn(family, qualifier, data1);
            table.put(tx1, put);

            // The locator starts with an empty cache, so the cache cannot answer
            HBaseCellId cellId = new HBaseCellId(table, row1, family, qualifier, tx1.getStartTimestamp());
            Map<Long, Long> emptyCache = Maps.newHashMap();
            CommitTimestampLocator locator = new CommitTimestampLocatorImpl(cellId, emptyCache);

            // Fake the current epoch to simulate a newer TSO, so that the transaction
            // looks like it belongs to a previous epoch
            CommitTimestamp ct = snapshotFilter.locateCellCommitTimestamp(
                    tx1.getStartTimestamp(), CURRENT_EPOCH_FAKE, locator, false);
            assertFalse(ct.isValid());
            assertEquals(ct.getValue(), CommitTable.INVALID_TRANSACTION_MARKER);
            assertEquals(ct.getLocation(), COMMIT_TABLE);
        }
    }

    // Tests step 5 of locateCellCommitTimestamp() (invoked here on SnapshotFilterImpl):
    // the first commit table lookup is stubbed to be absent and the following one is real,
    // so the commit timestamp is expected to be found in the commit table on the later lookup.
    @Test(timeOut = 30_000)
    public void testCellCommitTimestampIsLocatedInCommitTableAfterNotBeingInvalidated(ITestContext context) throws Exception {

        CommitTable.Client commitTableClient = spy(getCommitTable(context).getClient());
        PostCommitActions crashingPostCommitter =
                spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient, connection));
        AbstractTransactionManager tm =
                spy((AbstractTransactionManager) newTransactionManager(context, crashingPostCommitter));

        // Emulates a crash after commit; it is observed in (*) below
        doThrow(new RuntimeException()).when(crashingPostCommitter).updateShadowCells(any(HBaseTransaction.class));
        // First lookup returns "absent", subsequent lookups hit the real client; this
        // avoids steps 2) and 3) and goes directly to step 5)
        SettableFuture<Optional<CommitTimestamp>> absentLookup = SettableFuture.create();
        absentLookup.set(Optional.<CommitTimestamp>absent());
        doReturn(absentLookup).doCallRealMethod().when(commitTableClient).getCommitTimestamp(any(Long.class));

        Table htable = connection.getTable(TableName.valueOf(TEST_TABLE));
        SnapshotFilterImpl snapshotFilter = new SnapshotFilterImpl(new HTableAccessWrapper(htable, htable),
                tm.getCommitTableClient());

        try (TTable table = spy(new TTable(htable, snapshotFilter, false))) {

            // Commit a transaction that breaks on commit, so that the shadow cells
            // are not written and the commit table is not cleaned
            HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
            Put put = new Put(row1);
            put.addColumn(family, qualifier, data1);
            table.put(tx1, put);
            try {
                tm.commit(tx1);
            } catch (Exception ignored) { // (*) crash
                // Do nothing
            }

            // The locator starts with an empty cache, so the cache cannot answer
            HBaseCellId cellId = new HBaseCellId(table, row1, family, qualifier, tx1.getStartTimestamp());
            Map<Long, Long> emptyCache = Maps.newHashMap();
            CommitTimestampLocator locator = new CommitTimestampLocatorImpl(cellId, emptyCache);

            // Test the locator finds the appropriate data in the commit table
            CommitTimestamp ct = snapshotFilter.locateCellCommitTimestamp(
                    tx1.getStartTimestamp(), tm.tsoClient.getEpoch(), locator, false);
            assertTrue(ct.isValid());
            assertEquals(ct.getValue(), tx1.getCommitTimestamp());
            assertEquals(ct.getLocation(), COMMIT_TABLE);
        }
    }

    // Tests step 6 of locateCellCommitTimestamp() (invoked here on SnapshotFilterImpl):
    // every commit table lookup is stubbed to be absent, so a committed transaction's
    // timestamp is expected to be resolved from the shadow cells.
    @Test(timeOut = 30_000)
    public void testCellCommitTimestampIsLocatedInShadowCellsAfterNotBeingInvalidated(ITestContext context) throws Exception {

        CommitTable.Client commitTableClient = spy(getCommitTable(context).getClient());
        AbstractTransactionManager tm =
                spy((AbstractTransactionManager) newTransactionManager(context, commitTableClient));
        // Stub every commit timestamp lookup to come back absent; this avoids
        // steps 2), 3) and 5) and goes directly to step 6)
        SettableFuture<Optional<CommitTimestamp>> absentLookup = SettableFuture.create();
        absentLookup.set(Optional.<CommitTimestamp>absent());
        doReturn(absentLookup).when(commitTableClient).getCommitTimestamp(any(Long.class));

        Table htable = connection.getTable(TableName.valueOf(TEST_TABLE));
        SnapshotFilterImpl snapshotFilter = new SnapshotFilterImpl(new HTableAccessWrapper(htable, htable),
                tm.getCommitTableClient());

        try (TTable table = spy(new TTable(htable, snapshotFilter, false))) {

            // Write one cell and commit normally.
            // Upon commit, the commit data should be in the shadow cells
            HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
            Put put = new Put(row1);
            put.addColumn(family, qualifier, data1);
            table.put(tx1, put);
            tm.commit(tx1);

            // The locator starts with an empty cache, so the cache cannot answer
            HBaseCellId cellId = new HBaseCellId(table, row1, family, qualifier, tx1.getStartTimestamp());
            Map<Long, Long> emptyCache = Maps.newHashMap();
            CommitTimestampLocator locator = new CommitTimestampLocatorImpl(cellId, emptyCache);

            // Test the locator finds the appropriate data in the shadow cells
            CommitTimestamp ct = snapshotFilter.locateCellCommitTimestamp(
                    tx1.getStartTimestamp(), tm.tsoClient.getEpoch(), locator, false);
            assertTrue(ct.isValid());
            assertEquals(ct.getValue(), tx1.getCommitTimestamp());
            assertEquals(ct.getLocation(), SHADOW_CELL);
        }
    }

    // Tests the last step of locateCellCommitTimestamp() (invoked here on SnapshotFilterImpl):
    // for a cell timestamp that no transaction in this test wrote, and with commit table
    // lookups stubbed to be absent, the result is a valid commit timestamp with value -1
    // and location NOT_PRESENT.
    @Test(timeOut = 30_000)
    public void testCTLocatorReturnsAValidCTWhenNotPresent(ITestContext context) throws Exception {

        final long CELL_TS = 1L;

        CommitTable.Client commitTableClient = spy(getCommitTable(context).getClient());
        AbstractTransactionManager tm =
                spy((AbstractTransactionManager) newTransactionManager(context, commitTableClient));
        // Stub every commit timestamp lookup to come back absent, which allows
        // the locator to reach its last return statement
        SettableFuture<Optional<CommitTimestamp>> absentLookup = SettableFuture.create();
        absentLookup.set(Optional.<CommitTimestamp>absent());
        doReturn(absentLookup).when(commitTableClient).getCommitTimestamp(any(Long.class));

        Table htable = connection.getTable(TableName.valueOf(TEST_TABLE));
        SnapshotFilterImpl snapshotFilter = new SnapshotFilterImpl(new HTableAccessWrapper(htable, htable),
                tm.getCommitTableClient());

        try (TTable table = spy(new TTable(htable, snapshotFilter, false))) {
            // The locator starts with an empty cache, so the cache cannot answer
            HBaseCellId cellId = new HBaseCellId(table, row1, family, qualifier, CELL_TS);
            Map<Long, Long> emptyCache = Maps.newHashMap();
            CommitTimestampLocator locator = new CommitTimestampLocatorImpl(cellId, emptyCache);

            CommitTimestamp ct = snapshotFilter.locateCellCommitTimestamp(
                    CELL_TS, tm.tsoClient.getEpoch(), locator, false);
            assertTrue(ct.isValid());
            assertEquals(ct.getValue(), -1L);
            assertEquals(ct.getLocation(), NOT_PRESENT);
        }
    }
}
