PHOENIX-5494 Batched, mutable Index updates are unnecessarily run one-by-one
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
index a909cf2..5571512 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
@@ -133,4 +133,8 @@
     public ReplayWrite getReplayWrite(Mutation m) {
         return null;
     }
+
+    public RegionCoprocessorEnvironment getEnv() {
+        return this.env;
+    }
 }
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
index 165f0c9..43cafa0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
@@ -21,7 +21,6 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-
 import com.google.common.collect.ListMultimap;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
@@ -37,6 +36,7 @@
 import org.apache.phoenix.hbase.index.covered.IndexMetaData;
 import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.hbase.index.covered.data.CachedLocalTable;
 import org.apache.phoenix.index.PhoenixIndexMetaData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,6 +49,7 @@
   private static final Logger LOGGER = LoggerFactory.getLogger(IndexBuildManager.class);
   private final IndexBuilder delegate;
   private boolean stopped;
+  private RegionCoprocessorEnvironment regionCoprocessorEnvironment;
 
   /**
    * @param env environment in which <tt>this</tt> is running. Used to setup the
@@ -59,6 +60,7 @@
     // Prevent deadlock by using single thread for all reads so that we know
     // we can get the ReentrantRWLock. See PHOENIX-2671 for more details.
     this.delegate = getIndexBuilder(env);
+    this.regionCoprocessorEnvironment = env;
   }
   
   private static IndexBuilder getIndexBuilder(RegionCoprocessorEnvironment e) throws IOException {
@@ -88,10 +90,14 @@
       IndexMetaData indexMetaData) throws Throwable {
     // notify the delegate that we have started processing a batch
     this.delegate.batchStarted(miniBatchOp, indexMetaData);
-
+    CachedLocalTable cachedLocalTable =
+            CachedLocalTable.build(
+                    mutations,
+                    (PhoenixIndexMetaData)indexMetaData,
+                    this.regionCoprocessorEnvironment.getRegion());
     // Avoid the Object overhead of the executor when it's not actually parallelizing anything.
     for (Mutation m : mutations) {
-      Collection<Pair<Mutation, byte[]>> updates = delegate.getIndexUpdate(m, indexMetaData);
+      Collection<Pair<Mutation, byte[]>> updates = delegate.getIndexUpdate(m, indexMetaData, cachedLocalTable);
       for (Pair<Mutation, byte[]> update : updates) {
         indexUpdates.put(new HTableInterfaceReference(new ImmutableBytesPtr(update.getSecond())), new Pair<>(update.getFirst(), m.getRow()));
       }
@@ -105,10 +111,15 @@
     final IndexMetaData indexMetaData = this.delegate.getIndexMetaData(miniBatchOp);
     this.delegate.batchStarted(miniBatchOp, indexMetaData);
 
+    CachedLocalTable cachedLocalTable =
+            CachedLocalTable.build(
+                    mutations,
+                    (PhoenixIndexMetaData)indexMetaData,
+                    this.regionCoprocessorEnvironment.getRegion());
     // Avoid the Object overhead of the executor when it's not actually parallelizing anything.
     ArrayList<Pair<Mutation, byte[]>> results = new ArrayList<>(mutations.size());
     for (Mutation m : mutations) {
-      Collection<Pair<Mutation, byte[]>> updates = delegate.getIndexUpdate(m, indexMetaData);
+      Collection<Pair<Mutation, byte[]>> updates = delegate.getIndexUpdate(m, indexMetaData, cachedLocalTable);
       if (PhoenixIndexMetaData.isIndexRebuild(m.getAttributesMap())) {
         for (Pair<Mutation, byte[]> update : updates) {
           update.getFirst().setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
index f561e79..7bce22e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
@@ -34,6 +34,7 @@
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite;
 import org.apache.phoenix.hbase.index.Indexer;
 import org.apache.phoenix.hbase.index.covered.IndexMetaData;
+import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState;
 
 /**
  * Interface to build updates ({@link Mutation}s) to the index tables, based on the primary table
@@ -72,7 +73,7 @@
    * @return a Map of the mutations to make -> target index table name
    * @throws IOException on failure
    */
-  public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation, IndexMetaData context) throws IOException;
+  public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation, IndexMetaData context, LocalHBaseState localHBaseState) throws IOException;
 
     /**
      * Build an index update to cleanup the index when we remove {@link KeyValue}s via the normal flush or compaction
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
index 54d7f87..bf592ba 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
@@ -161,7 +161,7 @@
             // needing to lookup the prior row values.
             if (requiresPriorRowState) {
                 // add the current state of the row. Uses listCells() to avoid a new array creation.
-                this.addUpdateCells(this.table.getCurrentRowState(update, toCover, ignoreNewerMutations).listCells(), false);
+                this.addUpdateCells(this.table.getCurrentRowState(update, toCover, ignoreNewerMutations), false);
             }
         }
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
index 645f2c4..63d2751 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
@@ -20,7 +20,6 @@
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.hbase.index.builder.BaseIndexBuilder;
 import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState;
-import org.apache.phoenix.hbase.index.covered.data.LocalTable;
 import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
 import org.apache.phoenix.hbase.index.covered.update.IndexUpdateManager;
 import org.slf4j.Logger;
@@ -38,18 +37,15 @@
 public class NonTxIndexBuilder extends BaseIndexBuilder {
     private static final Logger LOGGER = LoggerFactory.getLogger(NonTxIndexBuilder.class);
 
-    protected LocalHBaseState localTable;
-
     @Override
     public void setup(RegionCoprocessorEnvironment env) throws IOException {
         super.setup(env);
-        this.localTable = new LocalTable(env);
     }
 
     @Override
-    public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation, IndexMetaData indexMetaData) throws IOException {
+    public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation, IndexMetaData indexMetaData, LocalHBaseState localHBaseState) throws IOException {
     	// create a state manager, so we can manage each batch
-        LocalTableState state = new LocalTableState(localTable, mutation);
+        LocalTableState state = new LocalTableState(localHBaseState, mutation);
         // build the index updates for each group
         IndexUpdateManager manager = new IndexUpdateManager(indexMetaData);
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/CachedLocalTable.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/CachedLocalTable.java
new file mode 100644
index 0000000..2fd91f7
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/CachedLocalTable.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.covered.data;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.filter.SkipScanFilter;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
+import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.index.PhoenixIndexMetaData;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.schema.types.PVarbinary;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
+
+import java.util.HashMap;
+
+public class CachedLocalTable implements LocalHBaseState {
+
+    private final HashMap<ImmutableBytesPtr, List<Cell>> rowKeyPtrToCells;
+    private final Region region;
+
+    private CachedLocalTable(HashMap<ImmutableBytesPtr, List<Cell>> rowKeyPtrToCells, Region region) {
+        this.rowKeyPtrToCells = rowKeyPtrToCells;
+        this.region = region;
+    }
+
+    @Override
+    public List<Cell> getCurrentRowState(
+            Mutation mutation,
+            Collection<? extends ColumnReference> columnReferences,
+            boolean ignoreNewerMutations) throws IOException {
+
+        if(ignoreNewerMutations) {
+            return doScan(mutation, columnReferences);
+        }
+
+        byte[] rowKey = mutation.getRow();
+        return this.rowKeyPtrToCells.get(new ImmutableBytesPtr(rowKey));
+    }
+
+    private List<Cell> doScan(Mutation mutation, Collection<? extends ColumnReference> columnReferences) throws IOException {
+        byte[] rowKey = mutation.getRow();
+        // need to use a scan here so we can get raw state, which Get doesn't provide.
+        Scan scan = IndexManagementUtil.newLocalStateScan(Collections.singletonList(columnReferences));
+        scan.setStartRow(rowKey);
+        scan.setStopRow(rowKey);
+
+        // Provides a means of client indicating that newer cells should not be considered,
+        // enabling mutations to be replayed to partially rebuild the index when a write fails.
+        // When replaying mutations we want the oldest timestamp (as anything newer we be replayed)
+        //long ts = getOldestTimestamp(m.getFamilyCellMap().values());
+        long ts = getMutationTimestampWhenAllCellTimestampIsSame(mutation);
+        scan.setTimeRange(0,ts);
+
+        try (RegionScanner regionScanner = region.getScanner(scan)) {
+            List<Cell> cells = new ArrayList<Cell>(1);
+            boolean more = regionScanner.next(cells);
+            assert !more : "Got more than one result when scanning"
+                + " a single row in the primary table!";
+
+            return cells;
+         }
+    }
+
+    @VisibleForTesting
+    public static CachedLocalTable build(HashMap<ImmutableBytesPtr, List<Cell>> rowKeyPtrToCells) {
+        return new CachedLocalTable(rowKeyPtrToCells, null);
+    }
+
+    public static CachedLocalTable build(
+            Collection<? extends Mutation> dataTableMutationsWithSameRowKeyAndTimestamp,
+            final PhoenixIndexMetaData indexMetaData,
+            Region region) throws IOException {
+        if(indexMetaData.getReplayWrite() != null)
+        {
+            return new CachedLocalTable(new HashMap<ImmutableBytesPtr,List<Cell>>(), region);
+        }
+        return preScanAllRequiredRows(dataTableMutationsWithSameRowKeyAndTimestamp, indexMetaData, region);
+    }
+
+    /**
+     * Pre-scan all the required rows before we building the indexes for the dataTableMutationsWithSameRowKeyAndTimestamp
+     * parameter.
+     * Note: When we calling this method, for single mutation in the dataTableMutationsWithSameRowKeyAndTimestamp
+     * parameter, all cells in the mutation have the same rowKey and timestamp.
+     * @param dataTableMutationsWithSameRowKeyAndTimestamp
+     * @param indexMetaData
+     * @param region
+     * @throws IOException
+     */
+    public static CachedLocalTable preScanAllRequiredRows(
+            Collection<? extends Mutation> dataTableMutationsWithSameRowKeyAndTimestamp,
+            PhoenixIndexMetaData indexMetaData,
+            Region region) throws IOException {
+        List<IndexMaintainer> indexTableMaintainers = indexMetaData.getIndexMaintainers();
+        Set<KeyRange> keys = new HashSet<KeyRange>(dataTableMutationsWithSameRowKeyAndTimestamp.size());
+        for (Mutation mutation : dataTableMutationsWithSameRowKeyAndTimestamp) {
+            keys.add(PVarbinary.INSTANCE.getKeyRange(mutation.getRow()));
+        }
+
+        Set<ColumnReference> getterColumnReferences = Sets.newHashSet();
+        for (IndexMaintainer indexTableMaintainer : indexTableMaintainers) {
+            getterColumnReferences.addAll(
+                    indexTableMaintainer.getAllColumns());
+        }
+
+        getterColumnReferences.add(new ColumnReference(
+                indexTableMaintainers.get(0).getDataEmptyKeyValueCF(),
+                indexTableMaintainers.get(0).getEmptyKeyValueQualifier()));
+
+        Scan scan = IndexManagementUtil.newLocalStateScan(
+                Collections.singletonList(getterColumnReferences));
+        ScanRanges scanRanges = ScanRanges.createPointLookup(new ArrayList<KeyRange>(keys));
+        scanRanges.initializeScan(scan);
+        SkipScanFilter skipScanFilter = scanRanges.getSkipScanFilter();
+
+        if(indexMetaData.getReplayWrite() != null) {
+            /**
+             * Because of previous {@link IndexManagementUtil#flattenMutationsByTimestamp}(which is called
+             * in {@link IndexRegionObserver#groupMutations} or {@link Indexer#preBatchMutateWithExceptions}),
+             * for single mutation in the dataTableMutationsWithSameRowKeyAndTimestamp, all cells in the mutation
+             * have the same rowKey and timestamp.
+             */
+            long timestamp = getMaxTimestamp(dataTableMutationsWithSameRowKeyAndTimestamp);
+            scan.setTimeRange(0, timestamp);
+            scan.setFilter(new SkipScanFilter(skipScanFilter, true));
+        } else {
+            assert scan.isRaw();
+            scan.setMaxVersions(1);
+            scan.setFilter(skipScanFilter);
+        }
+
+        HashMap<ImmutableBytesPtr, List<Cell>> rowKeyPtrToCells =
+                new HashMap<ImmutableBytesPtr, List<Cell>>();
+        try (RegionScanner scanner = region.getScanner(scan)) {
+            boolean more = true;
+            while(more) {
+                List<Cell> cells = new ArrayList<Cell>();
+                more = scanner.next(cells);
+                if (cells.isEmpty()) {
+                    continue;
+                }
+                Cell cell = cells.get(0);
+                byte[] rowKey = CellUtil.cloneRow(cell);
+                rowKeyPtrToCells.put(new ImmutableBytesPtr(rowKey), cells);
+            }
+        }
+
+        return new CachedLocalTable(rowKeyPtrToCells, region);
+    }
+
+    private static long getMaxTimestamp(Collection<? extends Mutation> dataTableMutationsWithSameRowKeyAndTimestamp) {
+        long maxTimestamp = Long.MIN_VALUE;
+        for(Mutation mutation : dataTableMutationsWithSameRowKeyAndTimestamp) {
+            /**
+             * all the cells in this mutation have the same timestamp.
+             */
+            long timestamp = getMutationTimestampWhenAllCellTimestampIsSame(mutation);
+            if(timestamp > maxTimestamp) {
+                maxTimestamp = timestamp;
+            }
+        }
+        return maxTimestamp;
+    }
+
+    private static long getMutationTimestampWhenAllCellTimestampIsSame(Mutation mutation) {
+        return mutation.getFamilyCellMap().values().iterator().next().get(0).getTimestamp();
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalHBaseState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalHBaseState.java
index 5b06910..aae0763 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalHBaseState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalHBaseState.java
@@ -19,6 +19,7 @@
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.List;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.client.Mutation;
@@ -43,7 +44,7 @@
    *         {@link Result} with no stored {@link Cell}s.
    * @throws IOException if there is an issue reading the row
    */
-  public Result getCurrentRowState(Mutation m, Collection<? extends ColumnReference> toCover, boolean ignoreNewerMutations)
+  public List<Cell> getCurrentRowState(Mutation m, Collection<? extends ColumnReference> toCover, boolean ignoreNewerMutations)
       throws IOException;
 
 }
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java
deleted file mode 100644
index 402620f..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.hbase.index.covered.data;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.regionserver.Region;
-import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
-import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Ordering;
-import com.google.common.primitives.Longs;
-
-/**
- * Wrapper around a lazily instantiated, local HTable.
- * <p>
- * Previously, we had used various row and batch caches. However, this ends up being very
- * complicated when attempting manage updating and invalidating the cache with no real gain as any
- * row accessed multiple times will likely be in HBase's block cache, invalidating any extra caching
- * we are doing here. In the end, its simpler and about as efficient to just get the current state
- * of the row from HBase and let HBase manage caching the row from disk on its own.
- */
-public class LocalTable implements LocalHBaseState {
-
-  private RegionCoprocessorEnvironment env;
-
-  public LocalTable(RegionCoprocessorEnvironment env) {
-    this.env = env;
-  }
-
-  @Override
-  public Result getCurrentRowState(Mutation m, Collection<? extends ColumnReference> columns, boolean ignoreNewerMutations)
-      throws IOException {
-    byte[] row = m.getRow();
-    // need to use a scan here so we can get raw state, which Get doesn't provide.
-    Scan s = IndexManagementUtil.newLocalStateScan(Collections.singletonList(columns));
-    s.setStartRow(row);
-    s.setStopRow(row);
-    if (ignoreNewerMutations) {
-        // Provides a means of client indicating that newer cells should not be considered,
-        // enabling mutations to be replayed to partially rebuild the index when a write fails.
-        // When replaying mutations we want the oldest timestamp (as anything newer we be replayed)
-        long ts = getOldestTimestamp(m.getFamilyCellMap().values());
-        s.setTimeRange(0,ts);
-    }
-    Region region = this.env.getRegion();
-    try (RegionScanner scanner = region.getScanner(s)) {
-      List<Cell> kvs = new ArrayList<Cell>(1);
-      boolean more = scanner.next(kvs);
-      assert !more : "Got more than one result when scanning"
-          + " a single row in the primary table!";
-
-      Result r = Result.create(kvs);
-      return r;
-    }
-  }
-
-    // Returns the smallest timestamp in the given cell lists.
-    // It is assumed that the lists have cells ordered from largest to smallest timestamp
-    protected long getOldestTimestamp(Collection<List<Cell>> cellLists) {
-        Ordering<List<Cell>> cellListOrdering = new Ordering<List<Cell>>() {
-            @Override
-            public int compare(List<Cell> left, List<Cell> right) {
-                // compare the last element of each list, since that is the smallest in that list
-                return Longs.compare(Iterables.getLast(left).getTimestamp(),
-                    Iterables.getLast(right).getTimestamp());
-            }
-        };
-        List<Cell> minList = cellListOrdering.min(cellLists);
-        return Iterables.getLast(minList).getTimestamp();
-    }
-}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java
index f07caf7..5a2b467 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java
@@ -20,36 +20,32 @@
 import static org.junit.Assert.assertEquals;
 
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.Type;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.regionserver.Region;
-import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite;
-import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState;
-import org.apache.phoenix.hbase.index.covered.data.LocalTable;
+import org.apache.phoenix.hbase.index.covered.data.CachedLocalTable;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.scanner.Scanner;
 import org.apache.phoenix.hbase.index.scanner.ScannerBuilder.CoveredDeleteScanner;
-import org.apache.phoenix.util.PhoenixKeyValueUtil;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.util.ScanUtil;
 import org.junit.Test;
 import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 
-/**
- *
- */
+
 public class LocalTableStateTest {
 
   private static final byte[] row = Bytes.toBytes("row");
@@ -88,23 +84,16 @@
 
     Region region = Mockito.mock(Region.class);
     Mockito.when(env.getRegion()).thenReturn(region);
-    RegionScanner scanner = Mockito.mock(RegionScanner.class);
-    Mockito.when(region.getScanner(Mockito.any(Scan.class))).thenReturn(scanner);
     final byte[] stored = Bytes.toBytes("stored-value");
-    Mockito.when(scanner.next(Mockito.any(List.class))).thenAnswer(new Answer<Boolean>() {
-      @Override
-      public Boolean answer(InvocationOnMock invocation) throws Throwable {
-        List<KeyValue> list = (List<KeyValue>) invocation.getArguments()[0];
-        KeyValue kv = new KeyValue(row, fam, qual, ts, Type.Put, stored);
-        kv.setSequenceId(0);
-        list.add(kv);
-        return false;
-      }
-    });
 
 
-    LocalHBaseState state = new LocalTable(env);
-    LocalTableState table = new LocalTableState(state, m);
+    KeyValue kv = new KeyValue(row, fam, qual, ts, Type.Put, stored);
+    kv.setSequenceId(0);
+    HashMap<ImmutableBytesPtr, List<Cell>> rowKeyPtrToCells =
+            new  HashMap<ImmutableBytesPtr, List<Cell>>();
+    rowKeyPtrToCells.put(new ImmutableBytesPtr(row), Collections.singletonList((Cell)kv));
+    CachedLocalTable cachedLocalTable = CachedLocalTable.build(rowKeyPtrToCells);
+    LocalTableState table = new LocalTableState(cachedLocalTable, m);
     //add the kvs from the mutation
     table.addPendingUpdates(m.get(fam, qual));
 
@@ -122,48 +111,6 @@
           super(msg);
       }
   }
-  
-  @Test(expected = ScannerCreatedException.class)
-  public void testScannerForMutableRows() throws Exception {
-      IndexMetaData indexMetaData = new IndexMetaData() {
-
-          @Override
-          public ReplayWrite getReplayWrite() {
-              return null;
-          }
-
-        @Override
-        public boolean requiresPriorRowState(Mutation m) {
-            return true;
-        }
-            
-        @Override
-        public int getClientVersion() {
-            return ScanUtil.UNKNOWN_CLIENT_VERSION;
-        }
-
-    };
-    Put m = new Put(row);
-    m.addColumn(fam, qual, ts, val);
-    // setup mocks
-    Configuration conf = new Configuration(false);
-    RegionCoprocessorEnvironment env = Mockito.mock(RegionCoprocessorEnvironment.class);
-    Mockito.when(env.getConfiguration()).thenReturn(conf);
-
-    Region region = Mockito.mock(Region.class);
-    Mockito.when(env.getRegion()).thenReturn(region);
-    Mockito.when(region.getScanner(Mockito.any(Scan.class))).thenThrow(new ScannerCreatedException("Should not open scanner when data is immutable"));
-
-    LocalHBaseState state = new LocalTable(env);
-    LocalTableState table = new LocalTableState(state, m);
-    //add the kvs from the mutation
-    table.addPendingUpdates(m.get(fam, qual));
-
-    // setup the lookup
-    ColumnReference col = new ColumnReference(fam, qual);
-    table.setCurrentTimestamp(ts);
-    table.getIndexedColumnsTableState(Arrays.asList(col), false, false, indexMetaData);
-  }
 
   @Test
   public void testNoScannerForImmutableRows() throws Exception {
@@ -194,10 +141,9 @@
 
     Region region = Mockito.mock(Region.class);
     Mockito.when(env.getRegion()).thenReturn(region);
-    Mockito.when(region.getScanner(Mockito.any(Scan.class))).thenThrow(new ScannerCreatedException("Should not open scanner when data is immutable"));
 
-    LocalHBaseState state = new LocalTable(env);
-    LocalTableState table = new LocalTableState(state, m);
+    CachedLocalTable cachedLocalTable = CachedLocalTable.build(null);
+    LocalTableState table = new LocalTableState(cachedLocalTable, m);
     //add the kvs from the mutation
     table.addPendingUpdates(m.get(fam, qual));
 
@@ -224,24 +170,18 @@
 
     Region region = Mockito.mock(Region.class);
     Mockito.when(env.getRegion()).thenReturn(region);
-    RegionScanner scanner = Mockito.mock(RegionScanner.class);
-    Mockito.when(region.getScanner(Mockito.any(Scan.class))).thenReturn(scanner);
     final byte[] stored = Bytes.toBytes("stored-value");
     final KeyValue storedKv = new KeyValue(row, fam, qual, ts, Type.Put, stored);
     storedKv.setSequenceId(2);
-    Mockito.when(scanner.next(Mockito.any(List.class))).thenAnswer(new Answer<Boolean>() {
-      @Override
-      public Boolean answer(InvocationOnMock invocation) throws Throwable {
-        List<KeyValue> list = (List<KeyValue>) invocation.getArguments()[0];
 
-        list.add(storedKv);
-        return false;
-      }
-    });
-    LocalHBaseState state = new LocalTable(env);
-    LocalTableState table = new LocalTableState(state, m);
+    HashMap<ImmutableBytesPtr, List<Cell>> rowKeyPtrToCells =
+            new  HashMap<ImmutableBytesPtr, List<Cell>>();
+    rowKeyPtrToCells.put(new ImmutableBytesPtr(row), Collections.singletonList((Cell)storedKv));
+    CachedLocalTable cachedLocalTable = CachedLocalTable.build(rowKeyPtrToCells);
+    LocalTableState table = new LocalTableState(cachedLocalTable, m);
+
     // add the kvs from the mutation
-    KeyValue kv = PhoenixKeyValueUtil.maybeCopyCell(m.get(fam, qual).get(0));
+    KeyValue kv = KeyValueUtil.ensureKeyValue(m.get(fam, qual).get(0));
     kv.setSequenceId(0);
     table.addPendingUpdates(kv);
 
@@ -258,8 +198,6 @@
     p = table.getIndexedColumnsTableState(Arrays.asList(col), false, false, indexMetaData);
     s = p.getFirst();
     assertEquals("Didn't correctly rollback the row - still found it!", null, s.next());
-    Mockito.verify(env, Mockito.times(1)).getRegion();
-    Mockito.verify(region, Mockito.times(1)).getScanner(Mockito.any(Scan.class));
   }
 
   @SuppressWarnings("unchecked")
@@ -270,24 +208,19 @@
 
     Region region = Mockito.mock(Region.class);
     Mockito.when(env.getRegion()).thenReturn(region);
-    RegionScanner scanner = Mockito.mock(RegionScanner.class);
-    Mockito.when(region.getScanner(Mockito.any(Scan.class))).thenReturn(scanner);
     final KeyValue storedKv =
         new KeyValue(row, fam, qual, ts, Type.Put, Bytes.toBytes("stored-value"));
     storedKv.setSequenceId(2);
-    Mockito.when(scanner.next(Mockito.any(List.class))).thenAnswer(new Answer<Boolean>() {
-      @Override
-      public Boolean answer(InvocationOnMock invocation) throws Throwable {
-        List<KeyValue> list = (List<KeyValue>) invocation.getArguments()[0];
 
-        list.add(storedKv);
-        return false;
-      }
-    });
-    LocalHBaseState state = new LocalTable(env);
+
     Put pendingUpdate = new Put(row);
     pendingUpdate.addColumn(fam, qual, ts, val);
-    LocalTableState table = new LocalTableState(state, pendingUpdate);
+    HashMap<ImmutableBytesPtr, List<Cell>> rowKeyPtrToCells =
+            new  HashMap<ImmutableBytesPtr, List<Cell>>();
+    rowKeyPtrToCells.put(new ImmutableBytesPtr(row), Collections.singletonList((Cell)storedKv));
+    CachedLocalTable cachedLocalTable = CachedLocalTable.build(rowKeyPtrToCells);
+    LocalTableState table = new LocalTableState(cachedLocalTable, pendingUpdate);
+
 
     // do the lookup for the given column
     ColumnReference col = new ColumnReference(fam, qual);
@@ -303,8 +236,6 @@
     p = table.getIndexedColumnsTableState(Arrays.asList(col), false, false, indexMetaData);
     s = p.getFirst();
     assertEquals("Lost already loaded update!", storedKv, s.next());
-    Mockito.verify(env, Mockito.times(1)).getRegion();
-    Mockito.verify(region, Mockito.times(1)).getScanner(Mockito.any(Scan.class));
   }
 
   // TODO add test here for making sure multiple column references with the same column family don't
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilderTest.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilderTest.java
index f9c6798..dd67f90 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilderTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilderTest.java
@@ -51,7 +51,7 @@
 import org.apache.phoenix.coprocessor.BaseRegionScanner;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite;
 import org.apache.phoenix.hbase.index.MultiMutation;
-import org.apache.phoenix.hbase.index.covered.data.LocalTable;
+import org.apache.phoenix.hbase.index.covered.data.CachedLocalTable;
 import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
 import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
@@ -149,6 +149,7 @@
 
         mockIndexMetaData = Mockito.mock(PhoenixIndexMetaData.class);
         Mockito.when(mockIndexMetaData.requiresPriorRowState((Mutation)Mockito.any())).thenReturn(true);
+        Mockito.when(mockIndexMetaData.getReplayWrite()).thenReturn(null);
         Mockito.when(mockIndexMetaData.getIndexMaintainers())
                 .thenReturn(Collections.singletonList(getTestIndexMaintainer()));
 
@@ -212,8 +213,13 @@
         MultiMutation mutation = new MultiMutation(new ImmutableBytesPtr(ROW));
         mutation.addAll(put);
 
+        CachedLocalTable cachedLocalTable = CachedLocalTable.build(
+                Collections.singletonList(mutation),
+                this.mockIndexMetaData,
+                this.indexBuilder.getEnv().getRegion());
+
         Collection<Pair<Mutation, byte[]>> indexUpdates =
-                indexBuilder.getIndexUpdate(mutation, mockIndexMetaData);
+                indexBuilder.getIndexUpdate(mutation, mockIndexMetaData, cachedLocalTable);
         assertEquals(2, indexUpdates.size());
         assertContains(indexUpdates, 2, ROW, KeyValue.Type.DeleteFamily, FAM,
             new byte[0] /* qual not needed */, 2);
@@ -254,8 +260,16 @@
         mutation.addAll(put);
 
         Collection<Pair<Mutation, byte[]>> indexUpdates = Lists.newArrayList();
-        for (Mutation m : IndexManagementUtil.flattenMutationsByTimestamp(Collections.singletonList(mutation))) {
-            indexUpdates.addAll(indexBuilder.getIndexUpdate(m, mockIndexMetaData));
+        Collection<? extends Mutation> mutations =
+                IndexManagementUtil.flattenMutationsByTimestamp(Collections.singletonList(mutation));
+
+        CachedLocalTable cachedLocalTable = CachedLocalTable.build(
+                mutations,
+                this.mockIndexMetaData,
+                this.indexBuilder.getEnv().getRegion());
+
+        for (Mutation m : mutations) {
+            indexUpdates.addAll(indexBuilder.getIndexUpdate(m, mockIndexMetaData, cachedLocalTable));
         }
         // 3 puts and 3 deletes (one to hide existing index row for VALUE_1, and two to hide index
         // rows for VALUE_2, VALUE_3)
@@ -287,9 +301,17 @@
         MultiMutation mutation = getMultipleVersionMutation(200);
         currentRowCells = mutation.getFamilyCellMap().get(FAM);
 
+        Collection<? extends Mutation> mutations =
+                IndexManagementUtil.flattenMutationsByTimestamp(Collections.singletonList(mutation));
+
+        CachedLocalTable cachedLocalTable = CachedLocalTable.build(
+                mutations,
+                this.mockIndexMetaData,
+                this.indexBuilder.getEnv().getRegion());
+
         Collection<Pair<Mutation, byte[]>> indexUpdates = Lists.newArrayList();
         for (Mutation m : IndexManagementUtil.flattenMutationsByTimestamp(Collections.singletonList(mutation))) {
-            indexUpdates.addAll(indexBuilder.getIndexUpdate(m, mockIndexMetaData));
+            indexUpdates.addAll(indexBuilder.getIndexUpdate(m, mockIndexMetaData, cachedLocalTable));
         }
         assertNotEquals(0, indexUpdates.size());
     }
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestCoveredColumnIndexCodec.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestCoveredColumnIndexCodec.java
index 571ed85..8ed61e8 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestCoveredColumnIndexCodec.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestCoveredColumnIndexCodec.java
@@ -134,9 +134,9 @@
     }
 
     @Override
-    public Result getCurrentRowState(Mutation m, Collection<? extends ColumnReference> toCover, boolean preMutationStateOnly)
+    public List<Cell> getCurrentRowState(Mutation m, Collection<? extends ColumnReference> toCover, boolean preMutationStateOnly)
         throws IOException {
-      return r;
+      return r.listCells();
     }
 
   }
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/data/TestLocalTable.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/data/TestLocalTable.java
deleted file mode 100644
index b11ac8d..0000000
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/data/TestLocalTable.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.hbase.index.covered.data;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.Test;
-
-public class TestLocalTable {
-    private static final byte[] ROW = Bytes.toBytes("test_row");
-
-    @Test
-    public void testGetOldestTimestamp() {
-        LocalTable localTable = new LocalTable(null);
-
-        List<Cell> cellList1 = getCellList(new KeyValue(ROW, 5L), new KeyValue(ROW, 4L));
-        assertEquals(4L, localTable.getOldestTimestamp(Collections.singletonList(cellList1)));
-
-        List<Cell> cellList2 = getCellList(new KeyValue(ROW, 5L), new KeyValue(ROW, 2L));
-        List<List<Cell>> set1 = new ArrayList<>(Arrays.asList(cellList1, cellList2));
-        assertEquals(2L, localTable.getOldestTimestamp(set1));
-
-        List<Cell> cellList3 = getCellList(new KeyValue(ROW, 1L));
-        set1.add(cellList3);
-        assertEquals(1L, localTable.getOldestTimestamp(set1));
-
-        List<Cell> cellList4 =
-                getCellList(new KeyValue(ROW, 3L), new KeyValue(ROW, 1L), new KeyValue(ROW, 0L));
-        set1.add(cellList4);
-        assertEquals(0L, localTable.getOldestTimestamp(set1));
-    }
-
-    private List<Cell> getCellList(KeyValue... kvs) {
-        List<Cell> cellList = new ArrayList<>();
-        for (KeyValue kv : kvs) {
-            cellList.add(kv);
-        }
-        return cellList;
-    }
-}