HBASE-16225 Refactor ScanQueryMatcher
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java
index fed20c4..2efcde1 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java
@@ -184,12 +184,11 @@
   }
 
   /**
-   * Check if the specified timestamp is within this TimeRange.
+   * Check if the specified timestamp is within or after this TimeRange.
    * <p>
-   * Returns true if within interval [minStamp, maxStamp), false
-   * if not.
+   * Returns true if greater than minStamp, false if not.
    * @param timestamp timestamp to check
-   * @return true if within TimeRange, false if not
+   * @return true if within or after TimeRange, false if not
    */
   public boolean withinOrAfterTimeRange(long timestamp) {
     assert timestamp >= 0;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
index bff727a..748268e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
@@ -45,7 +45,6 @@
 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.hadoop.hbase.io.Reference;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.regionserver.DeleteTracker;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
@@ -57,6 +56,7 @@
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreFileReader;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Pair;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
index 4729954..4c94644 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
@@ -45,7 +45,6 @@
 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.hadoop.hbase.io.Reference;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.regionserver.DeleteTracker;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.Region.Operation;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
@@ -57,6 +56,7 @@
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreFileReader;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index b32b757..c4bd849 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -86,6 +86,7 @@
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
 import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
+import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
 import org.apache.hadoop.hbase.security.EncryptionUtil;
@@ -1749,35 +1750,6 @@
     return wantedVersions > maxVersions ? maxVersions: wantedVersions;
   }
 
-  /**
-   * @param cell
-   * @param oldestTimestamp
-   * @return true if the cell is expired
-   */
-  static boolean isCellTTLExpired(final Cell cell, final long oldestTimestamp, final long now) {
-    // Look for a TTL tag first. Use it instead of the family setting if
-    // found. If a cell has multiple TTLs, resolve the conflict by using the
-    // first tag encountered.
-    Iterator<Tag> i = CellUtil.tagsIterator(cell);
-    while (i.hasNext()) {
-      Tag t = i.next();
-      if (TagType.TTL_TAG_TYPE == t.getType()) {
-        // Unlike in schema cell TTLs are stored in milliseconds, no need
-        // to convert
-        long ts = cell.getTimestamp();
-        assert t.getValueLength() == Bytes.SIZEOF_LONG;
-        long ttl = TagUtil.getValueAsLong(t);
-        if (ts + ttl < now) {
-          return true;
-        }
-        // Per cell TTLs cannot extend lifetime beyond family settings, so
-        // fall through to check that
-        break;
-      }
-    }
-    return false;
-  }
-
   @Override
   public boolean canSplit() {
     this.lock.readLock().lock();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
index acfaa96..567664e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
@@ -77,6 +77,7 @@
 import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.regionserver.Region.Operation;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.wal.WALKey;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java
index 42f7369..349e166 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java
@@ -122,11 +122,11 @@
     return this.parallelSeekEnabled;
   }
 
-  byte[] getFamily() {
+  public byte[] getFamily() {
     return family;
   }
 
-  int getMinVersions() {
+  public int getMinVersions() {
     return minVersions;
   }
 
@@ -138,7 +138,7 @@
     return ttl;
   }
 
-  KeepDeletedCells getKeepDeletedCells() {
+  public KeepDeletedCells getKeepDeletedCells() {
     return keepDeletedCells;
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
deleted file mode 100644
index d2d0ccb..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
+++ /dev/null
@@ -1,699 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.regionserver;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.NavigableSet;
-
-import org.apache.hadoop.hbase.KeyValue.Type;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellComparator;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeepDeletedCells;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.filter.Filter.ReturnCode;
-import org.apache.hadoop.hbase.io.TimeRange;
-import org.apache.hadoop.hbase.regionserver.DeleteTracker.DeleteResult;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-
-import com.google.common.base.Preconditions;
-
-/**
- * A query matcher that is specifically designed for the scan case.
- */
-@InterfaceAudience.Private
-public class ScanQueryMatcher {
-  // Optimization so we can skip lots of compares when we decide to skip
-  // to the next row.
-  private boolean stickyNextRow;
-  private final byte[] stopRow;
-
-  private final TimeRange tr;
-
-  private final Filter filter;
-
-  /** Keeps track of deletes */
-  private final DeleteTracker deletes;
-
-  /*
-   * The following three booleans define how we deal with deletes.
-   * There are three different aspects:
-   * 1. Whether to keep delete markers. This is used in compactions.
-   *    Minor compactions always keep delete markers.
-   * 2. Whether to keep deleted rows. This is also used in compactions,
-   *    if the store is set to keep deleted rows. This implies keeping
-   *    the delete markers as well.
-   *    In this case deleted rows are subject to the normal max version
-   *    and TTL/min version rules just like "normal" rows.
-   * 3. Whether a scan can do time travel queries even before deleted
-   *    marker to reach deleted rows.
-   */
-  /** whether to retain delete markers */
-  private boolean retainDeletesInOutput;
-
-  /** whether to return deleted rows */
-  private final KeepDeletedCells keepDeletedCells;
-  /** whether time range queries can see rows "behind" a delete */
-  private final boolean seePastDeleteMarkers;
-
-
-  /** Keeps track of columns and versions */
-  private final ColumnTracker columns;
-
-  /** Key to seek to in memstore and StoreFiles */
-  private final Cell startKey;
-
-  /** Row comparator for the region this query is for */
-  private final CellComparator rowComparator;
-
-  /* row is not private for tests */
-  /** Row the query is on */
-  Cell curCell;
-
-  /**
-   * Oldest put in any of the involved store files
-   * Used to decide whether it is ok to delete
-   * family delete marker of this store keeps
-   * deleted KVs.
-   */
-  private final long earliestPutTs;
-  private final long ttl;
-
-  /** The oldest timestamp we are interested in, based on TTL */
-  private final long oldestUnexpiredTS;
-  private final long now;
-
-  /** readPoint over which the KVs are unconditionally included */
-  protected long maxReadPointToTrackVersions;
-
-  private byte[] dropDeletesFromRow = null, dropDeletesToRow = null;
-
-  /**
-   * This variable shows whether there is an null column in the query. There
-   * always exists a null column in the wildcard column query.
-   * There maybe exists a null column in the explicit column query based on the
-   * first column.
-   * */
-  private boolean hasNullColumn = true;
-
-  private RegionCoprocessorHost regionCoprocessorHost= null;
-
-  // By default, when hbase.hstore.time.to.purge.deletes is 0ms, a delete
-  // marker is always removed during a major compaction. If set to non-zero
-  // value then major compaction will try to keep a delete marker around for
-  // the given number of milliseconds. We want to keep the delete markers
-  // around a bit longer because old puts might appear out-of-order. For
-  // example, during log replication between two clusters.
-  //
-  // If the delete marker has lived longer than its column-family's TTL then
-  // the delete marker will be removed even if time.to.purge.deletes has not
-  // passed. This is because all the Puts that this delete marker can influence
-  // would have also expired. (Removing of delete markers on col family TTL will
-  // not happen if min-versions is set to non-zero)
-  //
-  // But, if time.to.purge.deletes has not expired then a delete
-  // marker will not be removed just because there are no Puts that it is
-  // currently influencing. This is because Puts, that this delete can
-  // influence.  may appear out of order.
-  private final long timeToPurgeDeletes;
-
-  private final boolean isUserScan;
-
-  private final boolean isReversed;
-
-  /**
-   * True if we are doing a 'Get' Scan. Every Get is actually a one-row Scan.
-   */
-  private final boolean get;
-
-  /**
-   * Construct a QueryMatcher for a scan
-   * @param scanInfo The store's immutable scan info
-   * @param scanType Type of the scan
-   * @param earliestPutTs Earliest put seen in any of the store files.
-   * @param oldestUnexpiredTS the oldest timestamp we are interested in, based on TTL
-   */
-  public ScanQueryMatcher(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
-      ScanType scanType, long readPointToUse, long earliestPutTs, long oldestUnexpiredTS,
-      long now, RegionCoprocessorHost regionCoprocessorHost) throws IOException {
-    TimeRange timeRange = scan.getColumnFamilyTimeRange().get(scanInfo.getFamily());
-    if (timeRange == null) {
-      this.tr = scan.getTimeRange();
-    } else {
-      this.tr = timeRange;
-    }
-    this.get = scan.isGetScan();
-    this.rowComparator = scanInfo.getComparator();
-    this.regionCoprocessorHost = regionCoprocessorHost;
-    this.deletes =  instantiateDeleteTracker();
-    this.stopRow = scan.getStopRow();
-    this.startKey = CellUtil.createFirstDeleteFamilyCellOnRow(scan.getStartRow(),
-        scanInfo.getFamily());
-    this.filter = scan.getFilter();
-    this.earliestPutTs = earliestPutTs;
-    this.oldestUnexpiredTS = oldestUnexpiredTS;
-    this.now = now;
-
-    this.maxReadPointToTrackVersions = readPointToUse;
-    this.timeToPurgeDeletes = scanInfo.getTimeToPurgeDeletes();
-    this.ttl = oldestUnexpiredTS;
-
-    /* how to deal with deletes */
-    this.isUserScan = scanType == ScanType.USER_SCAN;
-    // keep deleted cells: if compaction or raw scan
-    this.keepDeletedCells = scan.isRaw() ? KeepDeletedCells.TRUE :
-      isUserScan ? KeepDeletedCells.FALSE : scanInfo.getKeepDeletedCells();
-    // retain deletes: if minor compaction or raw scanisDone
-    this.retainDeletesInOutput = scanType == ScanType.COMPACT_RETAIN_DELETES || scan.isRaw();
-    // seePastDeleteMarker: user initiated scans
-    this.seePastDeleteMarkers =
-        scanInfo.getKeepDeletedCells() != KeepDeletedCells.FALSE && isUserScan;
-
-    int maxVersions =
-        scan.isRaw() ? scan.getMaxVersions() : Math.min(scan.getMaxVersions(),
-          scanInfo.getMaxVersions());
-
-    // Single branch to deal with two types of reads (columns vs all in family)
-    if (columns == null || columns.size() == 0) {
-      // there is always a null column in the wildcard column query.
-      hasNullColumn = true;
-
-      // use a specialized scan for wildcard column tracker.
-      this.columns = new ScanWildcardColumnTracker(
-          scanInfo.getMinVersions(), maxVersions, oldestUnexpiredTS);
-    } else {
-      // whether there is null column in the explicit column query
-      hasNullColumn = (columns.first().length == 0);
-
-      // We can share the ExplicitColumnTracker, diff is we reset
-      // between rows, not between storefiles.
-      this.columns = new ExplicitColumnTracker(columns, scanInfo.getMinVersions(), maxVersions,
-          oldestUnexpiredTS);
-    }
-    this.isReversed = scan.isReversed();
-  }
-
-  private DeleteTracker instantiateDeleteTracker() throws IOException {
-    DeleteTracker tracker = new ScanDeleteTracker();
-    if (regionCoprocessorHost != null) {
-      tracker = regionCoprocessorHost.postInstantiateDeleteTracker(tracker);
-    }
-    return tracker;
-  }
-
-  /**
-   * Construct a QueryMatcher for a scan that drop deletes from a limited range of rows.
-   * @param scan
-   * @param scanInfo The store's immutable scan info
-   * @param columns
-   * @param earliestPutTs Earliest put seen in any of the store files.
-   * @param oldestUnexpiredTS the oldest timestamp we are interested in, based on TTL
-   * @param now the current server time
-   * @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW.
-   * @param dropDeletesToRow The exclusive right bound of the range; can be EMPTY_END_ROW.
-   * @param regionCoprocessorHost
-   * @throws IOException
-   */
-  public ScanQueryMatcher(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
-      long readPointToUse, long earliestPutTs, long oldestUnexpiredTS, long now,
-      byte[] dropDeletesFromRow, byte[] dropDeletesToRow,
-      RegionCoprocessorHost regionCoprocessorHost) throws IOException {
-    this(scan, scanInfo, columns, ScanType.COMPACT_RETAIN_DELETES, readPointToUse, earliestPutTs,
-        oldestUnexpiredTS, now, regionCoprocessorHost);
-    Preconditions.checkArgument((dropDeletesFromRow != null) && (dropDeletesToRow != null));
-    this.dropDeletesFromRow = dropDeletesFromRow;
-    this.dropDeletesToRow = dropDeletesToRow;
-  }
-
-  /*
-   * Constructor for tests
-   */
-  ScanQueryMatcher(Scan scan, ScanInfo scanInfo,
-      NavigableSet<byte[]> columns, long oldestUnexpiredTS, long now) throws IOException {
-    this(scan, scanInfo, columns, ScanType.USER_SCAN,
-          Long.MAX_VALUE, /* max Readpoint to track versions */
-        HConstants.LATEST_TIMESTAMP, oldestUnexpiredTS, now, null);
-  }
-
-  /**
-   *
-   * @return  whether there is an null column in the query
-   */
-  public boolean hasNullColumnInQuery() {
-    return hasNullColumn;
-  }
-
-  /**
-   * Determines if the caller should do one of several things:
-   * - seek/skip to the next row (MatchCode.SEEK_NEXT_ROW)
-   * - seek/skip to the next column (MatchCode.SEEK_NEXT_COL)
-   * - include the current KeyValue (MatchCode.INCLUDE)
-   * - ignore the current KeyValue (MatchCode.SKIP)
-   * - got to the next row (MatchCode.DONE)
-   *
-   * @param cell KeyValue to check
-   * @return The match code instance.
-   * @throws IOException in case there is an internal consistency problem
-   *      caused by a data corruption.
-   */
-  public MatchCode match(Cell cell) throws IOException {
-    if (filter != null && filter.filterAllRemaining()) {
-      return MatchCode.DONE_SCAN;
-    }
-    if (curCell != null) {
-      int ret = this.rowComparator.compareRows(curCell, cell);
-      if (!this.isReversed) {
-        if (ret <= -1) {
-          return MatchCode.DONE;
-        } else if (ret >= 1) {
-          // could optimize this, if necessary?
-          // Could also be called SEEK_TO_CURRENT_ROW, but this
-          // should be rare/never happens.
-          return MatchCode.SEEK_NEXT_ROW;
-        }
-      } else {
-        if (ret <= -1) {
-          return MatchCode.SEEK_NEXT_ROW;
-        } else if (ret >= 1) {
-          return MatchCode.DONE;
-        }
-      }
-    } else {
-      // Since the curCell is null it means we are already sure that we have moved over to the next row
-      return MatchCode.DONE;
-    }
-
-    // optimize case.
-    if (this.stickyNextRow) {
-      return MatchCode.SEEK_NEXT_ROW;
-    }
-
-    if (this.columns.done()) {
-      stickyNextRow = true;
-      return MatchCode.SEEK_NEXT_ROW;
-    }
-
-    long timestamp = cell.getTimestamp();
-    // check for early out based on timestamp alone
-    if (columns.isDone(timestamp)) {
-      return columns.getNextRowOrNextColumn(cell);
-    }
-    // check if the cell is expired by cell TTL
-    if (HStore.isCellTTLExpired(cell, this.oldestUnexpiredTS, this.now)) {
-      return MatchCode.SKIP;
-    }
-
-    /*
-     * The delete logic is pretty complicated now.
-     * This is corroborated by the following:
-     * 1. The store might be instructed to keep deleted rows around.
-     * 2. A scan can optionally see past a delete marker now.
-     * 3. If deleted rows are kept, we have to find out when we can
-     *    remove the delete markers.
-     * 4. Family delete markers are always first (regardless of their TS)
-     * 5. Delete markers should not be counted as version
-     * 6. Delete markers affect puts of the *same* TS
-     * 7. Delete marker need to be version counted together with puts
-     *    they affect
-     */
-    byte typeByte = cell.getTypeByte();
-    long mvccVersion = cell.getSequenceId();
-    if (CellUtil.isDelete(cell)) {
-      if (keepDeletedCells == KeepDeletedCells.FALSE
-          || (keepDeletedCells == KeepDeletedCells.TTL && timestamp < ttl)) {
-        // first ignore delete markers if the scanner can do so, and the
-        // range does not include the marker
-        //
-        // during flushes and compactions also ignore delete markers newer
-        // than the readpoint of any open scanner, this prevents deleted
-        // rows that could still be seen by a scanner from being collected
-        boolean includeDeleteMarker = seePastDeleteMarkers ?
-            tr.withinTimeRange(timestamp) :
-            tr.withinOrAfterTimeRange(timestamp);
-        if (includeDeleteMarker
-            && mvccVersion <= maxReadPointToTrackVersions) {
-          this.deletes.add(cell);
-        }
-        // Can't early out now, because DelFam come before any other keys
-      }
-
-      if ((!isUserScan)
-          && timeToPurgeDeletes > 0
-          && (EnvironmentEdgeManager.currentTime() - timestamp)
-            <= timeToPurgeDeletes) {
-        return MatchCode.INCLUDE;
-      } else if (retainDeletesInOutput || mvccVersion > maxReadPointToTrackVersions) {
-        // always include or it is not time yet to check whether it is OK
-        // to purge deltes or not
-        if (!isUserScan) {
-          // if this is not a user scan (compaction), we can filter this deletemarker right here
-          // otherwise (i.e. a "raw" scan) we fall through to normal version and timerange checking
-          return MatchCode.INCLUDE;
-        }
-      } else if (keepDeletedCells == KeepDeletedCells.TRUE
-          || (keepDeletedCells == KeepDeletedCells.TTL && timestamp >= ttl)) {
-        if (timestamp < earliestPutTs) {
-          // keeping delete rows, but there are no puts older than
-          // this delete in the store files.
-          return columns.getNextRowOrNextColumn(cell);
-        }
-        // else: fall through and do version counting on the
-        // delete markers
-      } else {
-        return MatchCode.SKIP;
-      }
-      // note the following next else if...
-      // delete marker are not subject to other delete markers
-    } else if (!this.deletes.isEmpty()) {
-      DeleteResult deleteResult = deletes.isDeleted(cell);
-      switch (deleteResult) {
-        case FAMILY_DELETED:
-        case COLUMN_DELETED:
-          return columns.getNextRowOrNextColumn(cell);
-        case VERSION_DELETED:
-        case FAMILY_VERSION_DELETED:
-          return MatchCode.SKIP;
-        case NOT_DELETED:
-          break;
-        default:
-          throw new RuntimeException("UNEXPECTED");
-        }
-    }
-
-    // NOTE: Cryptic stuff!
-    // if the timestamp is HConstants.OLDEST_TIMESTAMP, then this is a fake cell made to prime a
-    // Scanner; See KeyValueUTil#createLastOnRow. This Cell should never end up returning out of
-    // here a matchcode of INCLUDE else we will return to the client a fake Cell. If we call
-    // TimeRange, it will return 0 because it doesn't deal in OLDEST_TIMESTAMP and we will fall
-    // into the later code where we could return a matchcode of INCLUDE. See HBASE-16074 "ITBLL
-    // fails, reports lost big or tiny families" for a horror story. Check here for
-    // OLDEST_TIMESTAMP. TimeRange#compare is about more generic timestamps, between 0L and
-    // Long.MAX_LONG. It doesn't do OLDEST_TIMESTAMP weird handling.
-    int timestampComparison = timestamp == HConstants.OLDEST_TIMESTAMP? -1: tr.compare(timestamp);
-    if (timestampComparison >= 1) {
-      return MatchCode.SKIP;
-    } else if (timestampComparison <= -1) {
-      return columns.getNextRowOrNextColumn(cell);
-    }
-
-    // STEP 1: Check if the column is part of the requested columns
-    MatchCode colChecker = columns.checkColumn(cell, typeByte);
-    if (colChecker == MatchCode.INCLUDE) {
-      ReturnCode filterResponse = ReturnCode.SKIP;
-      // STEP 2: Yes, the column is part of the requested columns. Check if filter is present
-      if (filter != null) {
-        // STEP 3: Filter the key value and return if it filters out
-        filterResponse = filter.filterKeyValue(cell);
-        switch (filterResponse) {
-        case SKIP:
-          return MatchCode.SKIP;
-        case NEXT_COL:
-          return columns.getNextRowOrNextColumn(cell);
-        case NEXT_ROW:
-          stickyNextRow = true;
-          return MatchCode.SEEK_NEXT_ROW;
-        case SEEK_NEXT_USING_HINT:
-          return MatchCode.SEEK_NEXT_USING_HINT;
-        default:
-          //It means it is either include or include and seek next
-          break;
-        }
-      }
-      /*
-       * STEP 4: Reaching this step means the column is part of the requested columns and either
-       * the filter is null or the filter has returned INCLUDE or INCLUDE_AND_NEXT_COL response.
-       * Now check the number of versions needed. This method call returns SKIP, INCLUDE,
-       * INCLUDE_AND_SEEK_NEXT_ROW, INCLUDE_AND_SEEK_NEXT_COL.
-       *
-       * FilterResponse            ColumnChecker               Desired behavior
-       * INCLUDE                   SKIP                        row has already been included, SKIP.
-       * INCLUDE                   INCLUDE                     INCLUDE
-       * INCLUDE                   INCLUDE_AND_SEEK_NEXT_COL   INCLUDE_AND_SEEK_NEXT_COL
-       * INCLUDE                   INCLUDE_AND_SEEK_NEXT_ROW   INCLUDE_AND_SEEK_NEXT_ROW
-       * INCLUDE_AND_SEEK_NEXT_COL SKIP                        row has already been included, SKIP.
-       * INCLUDE_AND_SEEK_NEXT_COL INCLUDE                     INCLUDE_AND_SEEK_NEXT_COL
-       * INCLUDE_AND_SEEK_NEXT_COL INCLUDE_AND_SEEK_NEXT_COL   INCLUDE_AND_SEEK_NEXT_COL
-       * INCLUDE_AND_SEEK_NEXT_COL INCLUDE_AND_SEEK_NEXT_ROW   INCLUDE_AND_SEEK_NEXT_ROW
-       *
-       * In all the above scenarios, we return the column checker return value except for
-       * FilterResponse (INCLUDE_AND_SEEK_NEXT_COL) and ColumnChecker(INCLUDE)
-       */
-      colChecker = columns.checkVersions(cell, timestamp, typeByte,
-          mvccVersion > maxReadPointToTrackVersions);
-      //Optimize with stickyNextRow
-      boolean seekNextRowFromEssential = filterResponse == ReturnCode.INCLUDE_AND_SEEK_NEXT_ROW &&
-          filter.isFamilyEssential(cell.getFamilyArray());
-      if (colChecker == MatchCode.INCLUDE_AND_SEEK_NEXT_ROW || seekNextRowFromEssential) {
-        stickyNextRow = true;
-      }
-      if (filterResponse == ReturnCode.INCLUDE_AND_SEEK_NEXT_ROW) {
-        if (colChecker != MatchCode.SKIP) {
-          return MatchCode.INCLUDE_AND_SEEK_NEXT_ROW;
-        }
-        return MatchCode.SEEK_NEXT_ROW;
-      }
-      return (filterResponse == ReturnCode.INCLUDE_AND_NEXT_COL &&
-          colChecker == MatchCode.INCLUDE) ? MatchCode.INCLUDE_AND_SEEK_NEXT_COL
-          : colChecker;
-    }
-    stickyNextRow = (colChecker == MatchCode.SEEK_NEXT_ROW) ? true
-        : stickyNextRow;
-    return colChecker;
-  }
-
-  /** Handle partial-drop-deletes. As we match keys in order, when we have a range from which
-   * we can drop deletes, we can set retainDeletesInOutput to false for the duration of this
-   * range only, and maintain consistency. */
-  private void checkPartialDropDeleteRange(Cell curCell) {
-    // If partial-drop-deletes are used, initially, dropDeletesFromRow and dropDeletesToRow
-    // are both set, and the matcher is set to retain deletes. We assume ordered keys. When
-    // dropDeletesFromRow is leq current kv, we start dropping deletes and reset
-    // dropDeletesFromRow; thus the 2nd "if" starts to apply.
-    if ((dropDeletesFromRow != null)
-        && (Arrays.equals(dropDeletesFromRow, HConstants.EMPTY_START_ROW) ||
-            (CellComparator.COMPARATOR.compareRows(curCell, dropDeletesFromRow, 0,
-                dropDeletesFromRow.length) >= 0))) {
-      retainDeletesInOutput = false;
-      dropDeletesFromRow = null;
-    }
-    // If dropDeletesFromRow is null and dropDeletesToRow is set, we are inside the partial-
-    // drop-deletes range. When dropDeletesToRow is leq current kv, we stop dropping deletes,
-    // and reset dropDeletesToRow so that we don't do any more compares.
-    if ((dropDeletesFromRow == null)
-        && (dropDeletesToRow != null)
-        && !Arrays.equals(dropDeletesToRow, HConstants.EMPTY_END_ROW)
-        && (CellComparator.COMPARATOR
-            .compareRows(curCell, dropDeletesToRow, 0, dropDeletesToRow.length) >= 0)) {
-      retainDeletesInOutput = true;
-      dropDeletesToRow = null;
-    }
-  }
-
-  /**
-   * @return Returns false if we know there are no more rows to be scanned (We've reached the
-   * <code>stopRow</code> or we are scanning on row only because this Scan is for a Get, etc.
-   */
-  public boolean moreRowsMayExistAfter(Cell kv) {
-    // If a 'get' Scan -- we are doing a Get (every Get is a single-row Scan in implementation) --
-    // then we are looking at one row only, the one specified in the Get coordinate..so we know
-    // for sure that there are no more rows on this Scan
-    if (this.get) {
-      return false;
-    }
-    // If no stopRow, return that there may be more rows. The tests that follow depend on a
-    // non-empty, non-default stopRow so this little test below short-circuits out doing the
-    // following compares.
-    if (this.stopRow == null || this.stopRow == HConstants.EMPTY_BYTE_ARRAY) {
-      return true;
-    }
-    return this.isReversed?
-      rowComparator.compareRows(kv, stopRow, 0, stopRow.length) > 0:
-      Bytes.equals(stopRow, HConstants.EMPTY_END_ROW) ||
-        rowComparator.compareRows(kv, stopRow, 0, stopRow.length) < 0;
-  }
-
-  /**
-   * Set the row when there is change in row
-   * @param curCell
-   */
-  public void setToNewRow(Cell curCell) {
-    checkPartialDropDeleteRange(curCell);
-    this.curCell = curCell;
-    reset();
-  }
-
-  public void reset() {
-    this.deletes.reset();
-    this.columns.reset();
-
-    stickyNextRow = false;
-  }
-
-  /**
-   *
-   * @return the start key
-   */
-  public Cell getStartKey() {
-    return this.startKey;
-  }
-
-  /**
-   *
-   * @return the Filter
-   */
-  Filter getFilter() {
-    return this.filter;
-  }
-
-  public Cell getNextKeyHint(Cell kv) throws IOException {
-    if (filter == null) {
-      return null;
-    } else {
-      return filter.getNextCellHint(kv);
-    }
-  }
-
-  public Cell getKeyForNextColumn(Cell kv) {
-    ColumnCount nextColumn = columns.getColumnHint();
-    if (nextColumn == null) {
-      return CellUtil.createLastOnRowCol(kv);
-    } else {
-      return CellUtil.createFirstOnRowCol(kv, nextColumn.getBuffer(), nextColumn.getOffset(),
-          nextColumn.getLength());
-    }
-  }
-
-  public Cell getKeyForNextRow(Cell c) {
-    return CellUtil.createLastOnRow(c);
-  }
-
-  /**
-   * @param nextIndexed the key of the next entry in the block index (if any)
-   * @param kv The Cell we're using to calculate the seek key
-   * @return result of the compare between the indexed key and the key portion of the passed cell
-   */
-  public int compareKeyForNextRow(Cell nextIndexed, Cell kv) {
-    return rowComparator.compareKeyBasedOnColHint(nextIndexed, kv, 0, 0, null, 0, 0,
-        HConstants.OLDEST_TIMESTAMP, Type.Minimum.getCode());
-  }
-
-  /**
-   * @param nextIndexed the key of the next entry in the block index (if any)
-   * @param currentCell The Cell we're using to calculate the seek key
-   * @return result of the compare between the indexed key and the key portion of the passed cell
-   */
-  public int compareKeyForNextColumn(Cell nextIndexed, Cell currentCell) {
-    ColumnCount nextColumn = columns.getColumnHint();
-    if (nextColumn == null) {
-      return rowComparator.compareKeyBasedOnColHint(nextIndexed, currentCell, 0, 0, null, 0, 0,
-          HConstants.OLDEST_TIMESTAMP, Type.Minimum.getCode());
-    } else {
-      return rowComparator.compareKeyBasedOnColHint(nextIndexed, currentCell,
-          currentCell.getFamilyOffset(), currentCell.getFamilyLength(), nextColumn.getBuffer(),
-          nextColumn.getOffset(), nextColumn.getLength(), HConstants.LATEST_TIMESTAMP,
-          Type.Maximum.getCode());
-    }
-  }
-
-  boolean isUserScan() {
-    return this.isUserScan;
-  }
-
-  //Used only for testing purposes
-  static MatchCode checkColumn(ColumnTracker columnTracker, byte[] bytes, int offset,
-      int length, long ttl, byte type, boolean ignoreCount) throws IOException {
-    KeyValue kv = KeyValueUtil.createFirstOnRow(HConstants.EMPTY_BYTE_ARRAY, 0, 0,
-        HConstants.EMPTY_BYTE_ARRAY, 0, 0, bytes, offset, length);
-    MatchCode matchCode = columnTracker.checkColumn(kv, type);
-    if (matchCode == MatchCode.INCLUDE) {
-      return columnTracker.checkVersions(kv, ttl, type, ignoreCount);
-    }
-    return matchCode;
-  }
-
-  /**
-   * {@link #match} return codes.  These instruct the scanner moving through
-   * memstores and StoreFiles what to do with the current KeyValue.
-   * <p>
-   * Additionally, this contains "early-out" language to tell the scanner to
-   * move on to the next File (memstore or Storefile), or to return immediately.
-   */
-  public static enum MatchCode {
-    /**
-     * Include KeyValue in the returned result
-     */
-    INCLUDE,
-
-    /**
-     * Do not include KeyValue in the returned result
-     */
-    SKIP,
-
-    /**
-     * Do not include, jump to next StoreFile or memstore (in time order)
-     */
-    NEXT,
-
-    /**
-     * Do not include, return current result
-     */
-    DONE,
-
-    /**
-     * These codes are used by the ScanQueryMatcher
-     */
-
-    /**
-     * Done with the row, seek there.
-     */
-    SEEK_NEXT_ROW,
-    /**
-     * Done with column, seek to next.
-     */
-    SEEK_NEXT_COL,
-
-    /**
-     * Done with scan, thanks to the row filter.
-     */
-    DONE_SCAN,
-
-    /*
-     * Seek to next key which is given as hint.
-     */
-    SEEK_NEXT_USING_HINT,
-
-    /**
-     * Include KeyValue and done with column, seek to next.
-     */
-    INCLUDE_AND_SEEK_NEXT_COL,
-
-    /**
-     * Include KeyValue and done with row, seek to next.
-     */
-    INCLUDE_AND_SEEK_NEXT_ROW,
-  }
-}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index 5dec59a..853a4cf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -42,6 +42,7 @@
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.security.User;
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java
index 1582237..dca7388 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java
@@ -18,6 +18,12 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import java.io.DataInput;
+import java.io.IOException;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -43,12 +49,6 @@
 import org.apache.hadoop.hbase.util.BloomFilterFactory;
 import org.apache.hadoop.hbase.util.Bytes;
 
-import java.io.DataInput;
-import java.io.IOException;
-import java.util.Map;
-import java.util.SortedSet;
-import java.util.concurrent.atomic.AtomicInteger;
-
 /**
  * Reader for a StoreFile.
  */
@@ -114,46 +114,22 @@
   }
 
   /**
-   * Uses {@link #getStoreFileScanner(boolean, boolean, boolean, long, long)} by setting
-   * {@code isCompaction} to false, {@code readPt} to 0 and {@code scannerOrder} to 0.
-   * Do not use this overload if using this scanner for compactions.
-   *
-   * @see #getStoreFileScanner(boolean, boolean, boolean, long, long)
-   */
-  public StoreFileScanner getStoreFileScanner(boolean cacheBlocks, boolean pread) {
-    // 0 is passed as readpoint because this method is only used by test
-    // where StoreFile is directly operated upon
-    return getStoreFileScanner(cacheBlocks, pread, false, 0, 0);
-  }
-
-  /**
-   * Uses {@link #getStoreFileScanner(boolean, boolean, boolean, long, long)} by setting
-   * {@code scannerOrder} to 0.
-   *
-   * @see #getStoreFileScanner(boolean, boolean, boolean, long, long)
-   */
-  public StoreFileScanner getStoreFileScanner(
-      boolean cacheBlocks, boolean pread, boolean isCompaction, long readPt) {
-    return getStoreFileScanner(cacheBlocks, pread, isCompaction, readPt, 0);
-  }
-
-  /**
    * Get a scanner to scan over this StoreFile.
-   *
    * @param cacheBlocks should this scanner cache blocks?
    * @param pread use pread (for highly concurrent small readers)
    * @param isCompaction is scanner being used for compaction?
    * @param scannerOrder Order of this scanner relative to other scanners. See
-   *  {@link KeyValueScanner#getScannerOrder()}.
+   *          {@link KeyValueScanner#getScannerOrder()}.
+   * @param canOptimizeForNonNullColumn {@code true} if we can make sure there is no null column,
+   *          otherwise {@code false}. This is a hint for optimization.
    * @return a scanner
    */
-  public StoreFileScanner getStoreFileScanner(
-      boolean cacheBlocks, boolean pread, boolean isCompaction, long readPt, long scannerOrder) {
+  public StoreFileScanner getStoreFileScanner(boolean cacheBlocks, boolean pread,
+      boolean isCompaction, long readPt, long scannerOrder, boolean canOptimizeForNonNullColumn) {
     // Increment the ref count
     refCount.incrementAndGet();
-    return new StoreFileScanner(
-        this, getScanner(cacheBlocks, pread, isCompaction), !isCompaction, reader.hasMVCCInfo(),
-        readPt, scannerOrder);
+    return new StoreFileScanner(this, getScanner(cacheBlocks, pread, isCompaction), !isCompaction,
+        reader.hasMVCCInfo(), readPt, scannerOrder, canOptimizeForNonNullColumn);
   }
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
index 4955ffe..ac55d8c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
@@ -36,6 +36,7 @@
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
 import org.apache.hadoop.hbase.util.Counter;
 
 /**
@@ -54,49 +55,41 @@
   private boolean delayedReseek;
   private Cell delayedSeekKV;
 
-  private boolean enforceMVCC = false;
-  private boolean hasMVCCInfo = false;
+  private final boolean enforceMVCC;
+  private final boolean hasMVCCInfo;
   // A flag represents whether could stop skipping KeyValues for MVCC
   // if have encountered the next row. Only used for reversed scan
   private boolean stopSkippingKVsIfNextRow = false;
 
   private static Counter seekCount;
 
-  private ScanQueryMatcher matcher;
+  private final boolean canOptimizeForNonNullColumn;
 
-  private long readPt;
+  private final long readPt;
 
   // Order of this scanner relative to other scanners when duplicate key-value is found.
   // Higher values means scanner has newer data.
-  private long scannerOrder;
+  private final long scannerOrder;
 
   /**
    * Implements a {@link KeyValueScanner} on top of the specified {@link HFileScanner}
    * @param useMVCC If true, scanner will filter out updates with MVCC larger than {@code readPt}.
    * @param readPt MVCC value to use to filter out the updates newer than this scanner.
    * @param hasMVCC Set to true if underlying store file reader has MVCC info.
+   * @param scannerOrder Order of the scanner relative to other scanners. See
+   *          {@link KeyValueScanner#getScannerOrder()}.
+   * @param canOptimizeForNonNullColumn {@code true} if we can make sure there is no null column,
+   *          otherwise {@code false}. This is a hint for optimization.
    */
   public StoreFileScanner(StoreFileReader reader, HFileScanner hfs, boolean useMVCC,
-      boolean hasMVCC, long readPt) {
-    this (reader, hfs, useMVCC, hasMVCC, readPt, 0);
-  }
-
-  /**
-   * Implements a {@link KeyValueScanner} on top of the specified {@link HFileScanner}
-   * @param useMVCC If true, scanner will filter out updates with MVCC larger than {@code readPt}.
-   * @param readPt MVCC value to use to filter out the updates newer than this scanner.
-   * @param hasMVCC Set to true if underlying store file reader has MVCC info.
-   * @param scannerOrder Order of the scanner relative to other scanners.
-   *   See {@link KeyValueScanner#getScannerOrder()}.
-   */
-  public StoreFileScanner(StoreFileReader reader, HFileScanner hfs, boolean useMVCC,
-      boolean hasMVCC, long readPt, long scannerOrder) {
+      boolean hasMVCC, long readPt, long scannerOrder, boolean canOptimizeForNonNullColumn) {
     this.readPt = readPt;
     this.reader = reader;
     this.hfs = hfs;
     this.enforceMVCC = useMVCC;
     this.hasMVCCInfo = hasMVCC;
     this.scannerOrder = scannerOrder;
+    this.canOptimizeForNonNullColumn = canOptimizeForNonNullColumn;
   }
 
   boolean isPrimaryReplica() {
@@ -126,24 +119,20 @@
   }
 
   /**
-   * Return an array of scanners corresponding to the given set of store files,
-   * And set the ScanQueryMatcher for each store file scanner for further
-   * optimization
+   * Return an array of scanners corresponding to the given set of store files, And set the
+   * ScanQueryMatcher for each store file scanner for further optimization
    */
-  public static List<StoreFileScanner> getScannersForStoreFiles(
-      Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
-      boolean isCompaction, boolean canUseDrop,
+  public static List<StoreFileScanner> getScannersForStoreFiles(Collection<StoreFile> files,
+      boolean cacheBlocks, boolean usePread, boolean isCompaction, boolean canUseDrop,
       ScanQueryMatcher matcher, long readPt, boolean isPrimaryReplica) throws IOException {
-    List<StoreFileScanner> scanners = new ArrayList<StoreFileScanner>(
-        files.size());
+    List<StoreFileScanner> scanners = new ArrayList<StoreFileScanner>(files.size());
     List<StoreFile> sorted_files = new ArrayList<>(files);
     Collections.sort(sorted_files, StoreFile.Comparators.SEQ_ID);
     for (int i = 0; i < sorted_files.size(); i++) {
       StoreFileReader r = sorted_files.get(i).createReader();
       r.setReplicaStoreFile(isPrimaryReplica);
-      StoreFileScanner scanner = r.getStoreFileScanner(cacheBlocks, usePread,
-          isCompaction, readPt, i);
-      scanner.setScanQueryMatcher(matcher);
+      StoreFileScanner scanner = r.getStoreFileScanner(cacheBlocks, usePread, isCompaction, readPt,
+        i, matcher != null ? !matcher.hasNullColumnInQuery() : false);
       scanners.add(scanner);
     }
     return scanners;
@@ -360,12 +349,12 @@
       // check ROWCOL Bloom filter first.
       if (reader.getBloomFilterType() == BloomType.ROWCOL) {
         haveToSeek = reader.passesGeneralRowColBloomFilter(kv);
-      } else if (this.matcher != null && !matcher.hasNullColumnInQuery() &&
-          ((CellUtil.isDeleteFamily(kv) || CellUtil.isDeleteFamilyVersion(kv)))) {
+      } else if (canOptimizeForNonNullColumn
+          && ((CellUtil.isDeleteFamily(kv) || CellUtil.isDeleteFamilyVersion(kv)))) {
         // if there is no such delete family kv in the store file,
         // then no need to seek.
-        haveToSeek = reader.passesDeleteFamilyBloomFilter(kv.getRowArray(),
-            kv.getRowOffset(), kv.getRowLength());
+        haveToSeek = reader.passesDeleteFamilyBloomFilter(kv.getRowArray(), kv.getRowOffset(),
+          kv.getRowLength());
       }
     }
 
@@ -434,10 +423,6 @@
     }
   }
 
-  public void setScanQueryMatcher(ScanQueryMatcher matcher) {
-    this.matcher = matcher;
-  }
-
   @Override
   public boolean isFileScanner() {
     return true;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index 080bb95..91a77ea 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -19,6 +19,8 @@
 
 package org.apache.hadoop.hbase.regionserver;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
@@ -42,14 +44,16 @@
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.executor.ExecutorService;
 import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
 import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
 import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
 import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler;
+import org.apache.hadoop.hbase.regionserver.querymatcher.CompactionScanQueryMatcher;
+import org.apache.hadoop.hbase.regionserver.querymatcher.LegacyScanQueryMatcher;
+import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
+import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher.MatchCode;
+import org.apache.hadoop.hbase.regionserver.querymatcher.UserScanQueryMatcher;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
-import com.google.common.annotations.VisibleForTesting;
-
 /**
  * Scanner scans both the memstore and the Store. Coalesce KeyValue stream
  * into List&lt;KeyValue&gt; for a single row.
@@ -180,6 +184,7 @@
   protected void addCurrentScanners(List<? extends KeyValueScanner> scanners) {
     this.currentScanners.addAll(scanners);
   }
+
   /**
    * Opens a scanner across memstore, snapshot, and all StoreFiles. Assumes we
    * are not in a compaction.
@@ -196,9 +201,8 @@
     if (columns != null && scan.isRaw()) {
       throw new DoNotRetryIOException("Cannot specify any column for a raw scan");
     }
-    matcher = new ScanQueryMatcher(scan, scanInfo, columns,
-        ScanType.USER_SCAN, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP,
-        oldestUnexpiredTS, now, store.getCoprocessorHost());
+    matcher = UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now,
+      store.getCoprocessorHost());
 
     this.store.addChangedReaderObserver(this);
 
@@ -267,13 +271,19 @@
       List<? extends KeyValueScanner> scanners, ScanType scanType, long smallestReadPoint,
       long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
     this(store, scan, scanInfo, null,
-      ((HStore)store).getHRegion().getReadPoint(IsolationLevel.READ_COMMITTED), false);
-    if (dropDeletesFromRow == null) {
-      matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType, smallestReadPoint,
-          earliestPutTs, oldestUnexpiredTS, now, store.getCoprocessorHost());
+        ((HStore) store).getHRegion().getReadPoint(IsolationLevel.READ_COMMITTED), false);
+    if (scan.hasFilter() || (scan.getStartRow() != null && scan.getStartRow().length > 0)
+        || (scan.getStopRow() != null && scan.getStopRow().length > 0)
+        || !scan.getTimeRange().isAllTime()) {
+      // use legacy query matcher since we do not consider the scan object in our code. Only used to
+      // keep compatibility for coprocessor.
+      matcher = LegacyScanQueryMatcher.create(scan, scanInfo, null, scanType, smallestReadPoint,
+        earliestPutTs, oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow,
+        store.getCoprocessorHost());
     } else {
-      matcher = new ScanQueryMatcher(scan, scanInfo, null, smallestReadPoint, earliestPutTs,
-          oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost());
+      matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, smallestReadPoint,
+        earliestPutTs, oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow,
+        store.getCoprocessorHost());
     }
 
     // Filter the list of scanners using Bloom filters, time range, TTL, etc.
@@ -306,18 +316,27 @@
       0);
   }
 
-  public StoreScanner(final Scan scan, ScanInfo scanInfo,
-      ScanType scanType, final NavigableSet<byte[]> columns,
-      final List<KeyValueScanner> scanners, long earliestPutTs, long readPt)
-  throws IOException {
+  public StoreScanner(final Scan scan, ScanInfo scanInfo, ScanType scanType,
+      final NavigableSet<byte[]> columns, final List<KeyValueScanner> scanners, long earliestPutTs,
+      long readPt) throws IOException {
     this(null, scan, scanInfo, columns, readPt, scan.getCacheBlocks());
-    this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType,
-        Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS, now, null);
-
-    // In unit tests, the store could be null
-    if (this.store != null) {
-      this.store.addChangedReaderObserver(this);
+    if (scanType == ScanType.USER_SCAN) {
+      this.matcher = UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now,
+        null);
+    } else {
+      if (scan.hasFilter() || (scan.getStartRow() != null && scan.getStartRow().length > 0)
+          || (scan.getStopRow() != null && scan.getStopRow().length > 0)
+          || !scan.getTimeRange().isAllTime() || columns != null) {
+        // use legacy query matcher since we do not consider the scan object in our code. Only used
+        // to keep compatibility for coprocessor.
+        matcher = LegacyScanQueryMatcher.create(scan, scanInfo, columns, scanType, Long.MAX_VALUE,
+          earliestPutTs, oldestUnexpiredTS, now, null, null, store.getCoprocessorHost());
+      } else {
+        this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE,
+          earliestPutTs, oldestUnexpiredTS, now, null, null, null);
+      }
     }
+
     // Seek all scanners to the initial key
     seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
     addCurrentScanners(scanners);
@@ -515,7 +534,7 @@
     // If no limits exists in the scope LimitScope.Between_Cells then we are sure we are changing
     // rows. Else it is possible we are still traversing the same row so we must perform the row
     // comparison.
-    if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.curCell == null) {
+    if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.currentRow() == null) {
       this.countPerRow = 0;
       matcher.setToNewRow(cell);
     }
@@ -544,111 +563,111 @@
       ScanQueryMatcher.MatchCode qcode = matcher.match(cell);
       qcode = optimize(qcode, cell);
       switch (qcode) {
-      case INCLUDE:
-      case INCLUDE_AND_SEEK_NEXT_ROW:
-      case INCLUDE_AND_SEEK_NEXT_COL:
+        case INCLUDE:
+        case INCLUDE_AND_SEEK_NEXT_ROW:
+        case INCLUDE_AND_SEEK_NEXT_COL:
 
-        Filter f = matcher.getFilter();
-        if (f != null) {
-          cell = f.transformCell(cell);
-        }
+          Filter f = matcher.getFilter();
+          if (f != null) {
+            cell = f.transformCell(cell);
+          }
 
-        this.countPerRow++;
-        if (storeLimit > -1 && this.countPerRow > (storeLimit + storeOffset)) {
-          // do what SEEK_NEXT_ROW does.
+          this.countPerRow++;
+          if (storeLimit > -1 && this.countPerRow > (storeLimit + storeOffset)) {
+            // do what SEEK_NEXT_ROW does.
+            if (!matcher.moreRowsMayExistAfter(cell)) {
+              close(false);// Do all cleanup except heap.close()
+              return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
+            }
+            matcher.clearCurrentRow();
+            seekToNextRow(cell);
+            break LOOP;
+          }
+
+          // add to results only if we have skipped #storeOffset kvs
+          // also update metric accordingly
+          if (this.countPerRow > storeOffset) {
+            outResult.add(cell);
+
+            // Update local tracking information
+            count++;
+            totalBytesRead += CellUtil.estimatedSerializedSizeOf(cell);
+
+            // Update the progress of the scanner context
+            scannerContext.incrementSizeProgress(CellUtil.estimatedHeapSizeOf(cell));
+            scannerContext.incrementBatchProgress(1);
+
+            if (matcher.isUserScan() && totalBytesRead > maxRowSize) {
+              throw new RowTooBigException(
+                  "Max row size allowed: " + maxRowSize + ", but the row is bigger than that.");
+            }
+          }
+
+          if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
+            if (!matcher.moreRowsMayExistAfter(cell)) {
+              close(false);// Do all cleanup except heap.close()
+              return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
+            }
+            matcher.clearCurrentRow();
+            seekToNextRow(cell);
+          } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
+            seekAsDirection(matcher.getKeyForNextColumn(cell));
+          } else {
+            this.heap.next();
+          }
+
+          if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) {
+            break LOOP;
+          }
+          if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) {
+            break LOOP;
+          }
+          continue;
+
+        case DONE:
+          // Optimization for Gets! If DONE, no more to get on this row, early exit!
+          if (this.scan.isGetScan()) {
+            // Then no more to this row... exit.
+            close(false);// Do all cleanup except heap.close()
+            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
+          }
+          matcher.clearCurrentRow();
+          return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
+
+        case DONE_SCAN:
+          close(false);// Do all cleanup except heap.close()
+          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
+
+        case SEEK_NEXT_ROW:
+          // This is just a relatively simple end of scan fix, to short-cut end
+          // us if there is an endKey in the scan.
           if (!matcher.moreRowsMayExistAfter(cell)) {
             close(false);// Do all cleanup except heap.close()
             return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
           }
-          matcher.curCell = null;
+          matcher.clearCurrentRow();
           seekToNextRow(cell);
-          break LOOP;
-        }
+          break;
 
-        // add to results only if we have skipped #storeOffset kvs
-        // also update metric accordingly
-        if (this.countPerRow > storeOffset) {
-          outResult.add(cell);
-
-          // Update local tracking information
-          count++;
-          totalBytesRead += CellUtil.estimatedSerializedSizeOf(cell);
-
-          // Update the progress of the scanner context
-          scannerContext.incrementSizeProgress(CellUtil.estimatedHeapSizeOf(cell));
-          scannerContext.incrementBatchProgress(1);
-
-          if (matcher.isUserScan() && totalBytesRead > maxRowSize) {
-            throw new RowTooBigException(
-                "Max row size allowed: " + maxRowSize + ", but the row is bigger than that.");
-          }
-        }
-
-        if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
-          if (!matcher.moreRowsMayExistAfter(cell)) {
-            close(false);// Do all cleanup except heap.close()
-            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
-          }
-          matcher.curCell = null;
-          seekToNextRow(cell);
-        } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
+        case SEEK_NEXT_COL:
           seekAsDirection(matcher.getKeyForNextColumn(cell));
-        } else {
+          break;
+
+        case SKIP:
           this.heap.next();
-        }
+          break;
 
-        if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) {
-          break LOOP;
-        }
-        if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) {
-          break LOOP;
-        }
-        continue;
+        case SEEK_NEXT_USING_HINT:
+          Cell nextKV = matcher.getNextKeyHint(cell);
+          if (nextKV != null) {
+            seekAsDirection(nextKV);
+          } else {
+            heap.next();
+          }
+          break;
 
-      case DONE:
-        // Optimization for Gets! If DONE, no more to get on this row, early exit!
-        if (this.scan.isGetScan()) {
-          // Then no more to this row... exit.
-          close(false);// Do all cleanup except heap.close()
-          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
-        }
-        matcher.curCell = null;
-        return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
-
-      case DONE_SCAN:
-        close(false);// Do all cleanup except heap.close()
-        return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
-
-      case SEEK_NEXT_ROW:
-        // This is just a relatively simple end of scan fix, to short-cut end
-        // us if there is an endKey in the scan.
-        if (!matcher.moreRowsMayExistAfter(cell)) {
-          close(false);// Do all cleanup except heap.close()
-          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
-        }
-        matcher.curCell = null;
-        seekToNextRow(cell);
-        break;
-
-      case SEEK_NEXT_COL:
-        seekAsDirection(matcher.getKeyForNextColumn(cell));
-        break;
-
-      case SKIP:
-        this.heap.next();
-        break;
-
-      case SEEK_NEXT_USING_HINT:
-        Cell nextKV = matcher.getNextKeyHint(cell);
-        if (nextKV != null) {
-          seekAsDirection(nextKV);
-        } else {
-          heap.next();
-        }
-        break;
-
-      default:
-        throw new RuntimeException("UNEXPECTED");
+        default:
+          throw new RuntimeException("UNEXPECTED");
       }
     } while ((cell = this.heap.peek()) != null);
 
@@ -822,7 +841,7 @@
     if (cell == null) {
       cell = lastTopKey;
     }
-    if ((matcher.curCell == null) || !CellUtil.matchingRows(cell, matcher.curCell)) {
+    if ((matcher.currentRow() == null) || !CellUtil.matchingRows(cell, matcher.currentRow())) {
       this.countPerRow = 0;
       // The setToNewRow will call reset internally
       matcher.setToNewRow(cell);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnCount.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ColumnCount.java
similarity index 84%
rename from hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnCount.java
rename to hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ColumnCount.java
index 71ea1bd..74b9084 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnCount.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ColumnCount.java
@@ -16,18 +16,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hbase.regionserver;
+package org.apache.hadoop.hbase.regionserver.querymatcher;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 
 /**
- * Simple wrapper for a byte buffer and a counter.  Does not copy.
+ * Simple wrapper for a byte buffer and a counter. Does not copy.
  * <p>
  * NOT thread-safe because it is not used in a multi-threaded context, yet.
  */
 @InterfaceAudience.Private
-public class ColumnCount {
-  private final byte [] bytes;
+class ColumnCount {
+  private final byte[] bytes;
   private final int offset;
   private final int length;
   private int count;
@@ -36,7 +36,7 @@
    * Constructor
    * @param column the qualifier to count the versions for
    */
-  public ColumnCount(byte [] column) {
+  public ColumnCount(byte[] column) {
     this(column, 0);
   }
 
@@ -45,7 +45,7 @@
    * @param column the qualifier to count the versions for
    * @param count initial count
    */
-  public ColumnCount(byte [] column, int count) {
+  public ColumnCount(byte[] column, int count) {
     this(column, 0, column.length, count);
   }
 
@@ -56,7 +56,7 @@
    * @param length of the qualifier
    * @param count initial count
    */
-  public ColumnCount(byte [] column, int offset, int length, int count) {
+  public ColumnCount(byte[] column, int offset, int length, int count) {
     this.bytes = column;
     this.offset = offset;
     this.length = length;
@@ -66,21 +66,21 @@
   /**
    * @return the buffer
    */
-  public byte [] getBuffer(){
+  public byte[] getBuffer() {
     return this.bytes;
   }
 
   /**
    * @return the offset
    */
-  public int getOffset(){
+  public int getOffset() {
     return this.offset;
   }
 
   /**
    * @return the length
    */
-  public int getLength(){
+  public int getLength() {
     return this.length;
   }
 
@@ -107,5 +107,4 @@
   public void setCount(int count) {
     this.count = count;
   }
-
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ColumnTracker.java
similarity index 94%
rename from hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java
rename to hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ColumnTracker.java
index d352561..17c6afe 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ColumnTracker.java
@@ -16,13 +16,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hbase.regionserver;
+package org.apache.hadoop.hbase.regionserver.querymatcher;
 
 import java.io.IOException;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
+import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher.MatchCode;
 
 /**
  * Implementing classes of this interface will be used for the tracking
@@ -43,8 +43,8 @@
  * believes that the current column should be skipped (by timestamp, filter etc.)</li>
  * </ul>
  * <p>
- * These two methods returns a 
- * {@link org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode}
+ * These two methods returns a
+ * {@link org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher.MatchCode}
  * to define what action should be taken.
  * <p>
  * This class is NOT thread-safe as queries are never multi-threaded
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/CompactionScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/CompactionScanQueryMatcher.java
new file mode 100644
index 0000000..d3224dc
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/CompactionScanQueryMatcher.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.querymatcher;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeepDeletedCells;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
+import org.apache.hadoop.hbase.regionserver.ScanInfo;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+
+/**
+ * Query matcher for compaction.
+ */
+@InterfaceAudience.Private
+public abstract class CompactionScanQueryMatcher extends ScanQueryMatcher {
+
+  /** readPoint over which the KVs are unconditionally included */
+  protected final long maxReadPointToTrackVersions;
+
+  /** Keeps track of deletes */
+  protected final DeleteTracker deletes;
+
+  /** whether to return deleted rows */
+  protected final KeepDeletedCells keepDeletedCells;
+
+  protected CompactionScanQueryMatcher(ScanInfo scanInfo, DeleteTracker deletes,
+      long readPointToUse, long oldestUnexpiredTS, long now) {
+    super(HConstants.EMPTY_START_ROW, scanInfo,
+        new ScanWildcardColumnTracker(scanInfo.getMinVersions(), scanInfo.getMaxVersions(),
+            oldestUnexpiredTS),
+        oldestUnexpiredTS, now);
+    this.maxReadPointToTrackVersions = readPointToUse;
+    this.deletes = deletes;
+    this.keepDeletedCells = scanInfo.getKeepDeletedCells();
+  }
+
+  @Override
+  public boolean hasNullColumnInQuery() {
+    return true;
+  }
+
+  @Override
+  public boolean isUserScan() {
+    return false;
+  }
+
+  @Override
+  public boolean moreRowsMayExistAfter(Cell cell) {
+    return true;
+  }
+
+  @Override
+  public Filter getFilter() {
+    // no filter when compaction
+    return null;
+  }
+
+  @Override
+  public Cell getNextKeyHint(Cell cell) throws IOException {
+    // no filter, so no key hint.
+    return null;
+  }
+
+  @Override
+  protected void reset() {
+    deletes.reset();
+  }
+
+  protected final void trackDelete(Cell cell) {
+    // If keepDeletedCells is true, then we only remove cells by versions or TTL during
+    // compaction, so we do not need to track delete here.
+    // If keepDeletedCells is TTL and the delete marker is expired, then we can make sure that the
+    // minVerions is larger than 0(otherwise we will just return at preCheck). So here we still
+    // need to track the delete marker to see if it masks some cells.
+    if (keepDeletedCells == KeepDeletedCells.FALSE
+        || (keepDeletedCells == KeepDeletedCells.TTL && cell.getTimestamp() < oldestUnexpiredTS)) {
+      deletes.add(cell);
+    }
+  }
+
+  public static CompactionScanQueryMatcher create(ScanInfo scanInfo, ScanType scanType,
+      long readPointToUse, long earliestPutTs, long oldestUnexpiredTS, long now,
+      byte[] dropDeletesFromRow, byte[] dropDeletesToRow,
+      RegionCoprocessorHost regionCoprocessorHost) throws IOException {
+    DeleteTracker deleteTracker = instantiateDeleteTracker(regionCoprocessorHost);
+    if (dropDeletesFromRow == null) {
+      if (scanType == ScanType.COMPACT_RETAIN_DELETES) {
+        return new MinorCompactionScanQueryMatcher(scanInfo, deleteTracker, readPointToUse,
+            oldestUnexpiredTS, now);
+      } else {
+        return new MajorCompactionScanQueryMatcher(scanInfo, deleteTracker, readPointToUse,
+            earliestPutTs, oldestUnexpiredTS, now);
+      }
+    } else {
+      return new StripeCompactionScanQueryMatcher(scanInfo, deleteTracker, readPointToUse,
+          earliestPutTs, oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow);
+    }
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DeleteTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/DeleteTracker.java
similarity index 81%
rename from hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DeleteTracker.java
rename to hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/DeleteTracker.java
index 8f466fc..4e1ba4e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DeleteTracker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/DeleteTracker.java
@@ -1,5 +1,4 @@
 /**
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -16,17 +15,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hbase.regionserver;
+package org.apache.hadoop.hbase.regionserver.querymatcher;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.Cell;
 
 /**
- * This interface is used for the tracking and enforcement of Deletes
- * during the course of a Get or Scan operation.
+ * This interface is used for the tracking and enforcement of Deletes during the course of a Get or
+ * Scan operation.
  * <p>
  * This class is utilized through three methods:
- * <ul><li>{@link #add} when encountering a Delete</li>
+ * <ul>
+ * <li>{@link #add} when encountering a Delete</li>
  * <li>{@link #isDeleted} when checking if a Put KeyValue has been deleted</li>
  * <li>{@link #update} when reaching the end of a StoreFile</li>
  * </ul>
@@ -35,8 +35,7 @@
 public interface DeleteTracker {
 
   /**
-   * Add the specified cell to the list of deletes to check against for
-   * this row operation.
+   * Add the specified cell to the list of deletes to check against for this row operation.
    * <p>
    * This is called when a Delete is encountered in a StoreFile.
    * @param cell - the delete cell
@@ -44,8 +43,7 @@
   void add(Cell cell);
 
   /**
-   * Check if the specified cell buffer has been deleted by a previously
-   * seen delete.
+   * Check if the specified cell buffer has been deleted by a previously seen delete.
    * @param cell - current cell to check if deleted by a previously seen delete
    * @return deleteResult The result tells whether the KeyValue is deleted and why
    */
@@ -59,8 +57,8 @@
   /**
    * Called at the end of every StoreFile.
    * <p>
-   * Many optimized implementations of Trackers will require an update at
-   * when the end of each StoreFile is reached.
+   * Many optimized implementations of Trackers will require an update at when the end of each
+   * StoreFile is reached.
    */
   void update();
 
@@ -71,14 +69,13 @@
    */
   void reset();
 
-
   /**
    * Return codes for comparison of two Deletes.
    * <p>
    * The codes tell the merging function what to do.
    * <p>
-   * INCLUDE means add the specified Delete to the merged list.
-   * NEXT means move to the next element in the specified list(s).
+   * INCLUDE means add the specified Delete to the merged list. NEXT means move to the next element
+   * in the specified list(s).
    */
   enum DeleteCompare {
     INCLUDE_OLD_NEXT_OLD,
@@ -90,10 +87,8 @@
   }
 
   /**
-   * Returns codes for delete result.
-   * The codes tell the ScanQueryMatcher whether the kv is deleted and why.
-   * Based on the delete result, the ScanQueryMatcher will decide the next
-   * operation
+   * Returns codes for delete result. The codes tell the ScanQueryMatcher whether the kv is deleted
+   * and why. Based on the delete result, the ScanQueryMatcher will decide the next operation
    */
   enum DeleteResult {
     FAMILY_DELETED, // The KeyValue is deleted by a delete family.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/DropDeletesCompactionScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/DropDeletesCompactionScanQueryMatcher.java
new file mode 100644
index 0000000..89725fe
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/DropDeletesCompactionScanQueryMatcher.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.querymatcher;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeepDeletedCells;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.ScanInfo;
+
+/**
+ * A query matcher for compaction which can drop delete markers.
+ */
+@InterfaceAudience.Private
+public abstract class DropDeletesCompactionScanQueryMatcher extends CompactionScanQueryMatcher {
+
+  /**
+   * By default, when hbase.hstore.time.to.purge.deletes is 0ms, a delete marker is always removed
+   * during a major compaction. If set to non-zero value then major compaction will try to keep a
+   * delete marker around for the given number of milliseconds. We want to keep the delete markers
+   * around a bit longer because old puts might appear out-of-order. For example, during log
+   * replication between two clusters.
+   * <p>
+   * If the delete marker has lived longer than its column-family's TTL then the delete marker will
+   * be removed even if time.to.purge.deletes has not passed. This is because all the Puts that this
+   * delete marker can influence would have also expired. (Removing of delete markers on col family
+   * TTL will not happen if min-versions is set to non-zero)
+   * <p>
+   * But, if time.to.purge.deletes has not expired then a delete marker will not be removed just
+   * because there are no Puts that it is currently influencing. This is because Puts, that this
+   * delete can influence. may appear out of order.
+   */
+  protected final long timeToPurgeDeletes;
+
+  /**
+   * Oldest put in any of the involved store files Used to decide whether it is ok to delete family
+   * delete marker of this store keeps deleted KVs.
+   */
+  protected final long earliestPutTs;
+
+  protected DropDeletesCompactionScanQueryMatcher(ScanInfo scanInfo, DeleteTracker deletes,
+      long readPointToUse, long earliestPutTs, long oldestUnexpiredTS, long now) {
+    super(scanInfo, deletes, readPointToUse, oldestUnexpiredTS, now);
+    this.timeToPurgeDeletes = scanInfo.getTimeToPurgeDeletes();
+    this.earliestPutTs = earliestPutTs;
+  }
+
+  protected final MatchCode tryDropDelete(Cell cell) {
+    long timestamp = cell.getTimestamp();
+    // If it is not the time to drop the delete marker, just return
+    if (timeToPurgeDeletes > 0 && now - timestamp <= timeToPurgeDeletes) {
+      return MatchCode.INCLUDE;
+    }
+    if (keepDeletedCells == KeepDeletedCells.TRUE
+        || (keepDeletedCells == KeepDeletedCells.TTL && timestamp >= oldestUnexpiredTS)) {
+      // If keepDeletedCell is true, or the delete marker is not expired yet, we should include it
+      // in version counting to see if we can drop it. The only exception is that, we can make
+      // sure that no put is older than this delete marker. And under this situation, all later
+      // cells of this column(must be delete markers) can be skipped.
+      if (timestamp < earliestPutTs) {
+        return columns.getNextRowOrNextColumn(cell);
+      } else {
+        return null;
+      }
+    } else {
+      return MatchCode.SKIP;
+    }
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ExplicitColumnTracker.java
similarity index 76%
rename from hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java
rename to hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ExplicitColumnTracker.java
index 86c8b48..da65c78 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ExplicitColumnTracker.java
@@ -1,5 +1,4 @@
-/*
- *
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -16,39 +15,35 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hbase.regionserver;
+package org.apache.hadoop.hbase.regionserver.querymatcher;
 
 import java.io.IOException;
 import java.util.NavigableSet;
 
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher.MatchCode;
 
 /**
- * This class is used for the tracking and enforcement of columns and numbers
- * of versions during the course of a Get or Scan operation, when explicit
- * column qualifiers have been asked for in the query.
- *
- * With a little magic (see {@link ScanQueryMatcher}), we can use this matcher
- * for both scans and gets.  The main difference is 'next' and 'done' collapse
- * for the scan case (since we see all columns in order), and we only reset
- * between rows.
- *
+ * This class is used for the tracking and enforcement of columns and numbers of versions during the
+ * course of a Get or Scan operation, when explicit column qualifiers have been asked for in the
+ * query. With a little magic (see {@link ScanQueryMatcher}), we can use this matcher for both scans
+ * and gets. The main difference is 'next' and 'done' collapse for the scan case (since we see all
+ * columns in order), and we only reset between rows.
  * <p>
  * This class is utilized by {@link ScanQueryMatcher} mainly through two methods:
- * <ul><li>{@link #checkColumn} is called when a Put satisfies all other
- * conditions of the query.</li>
- * <li>{@link #getNextRowOrNextColumn} is called whenever ScanQueryMatcher
- * believes that the current column should be skipped (by timestamp, filter etc.)</li>
+ * <ul>
+ * <li>{@link #checkColumn} is called when a Put satisfies all other conditions of the query.</li>
+ * <li>{@link #getNextRowOrNextColumn} is called whenever ScanQueryMatcher believes that the current
+ * column should be skipped (by timestamp, filter etc.)</li>
  * </ul>
  * <p>
  * These two methods returns a
- * {@link org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode}
- * to define what action should be taken.
+ * {@link org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher.MatchCode} to define
+ * what action should be taken.
  * <p>
  * This class is NOT thread-safe as queries are never multi-threaded
  */
@@ -58,16 +53,16 @@
   private final int maxVersions;
   private final int minVersions;
 
- /**
-  * Contains the list of columns that the ExplicitColumnTracker is tracking.
-  * Each ColumnCount instance also tracks how many versions of the requested
-  * column have been returned.
-  */
+  /**
+   * Contains the list of columns that the ExplicitColumnTracker is tracking. Each ColumnCount
+   * instance also tracks how many versions of the requested column have been returned.
+   */
   private final ColumnCount[] columns;
   private int index;
   private ColumnCount column;
-  /** Keeps track of the latest timestamp included for current column.
-   * Used to eliminate duplicates. */
+  /**
+   * Keeps track of the latest timestamp included for current column. Used to eliminate duplicates.
+   */
   private long latestTSOfCurrentColumn;
   private long oldestStamp;
 
@@ -76,23 +71,22 @@
    * @param columns columns specified user in query
    * @param minVersions minimum number of versions to keep
    * @param maxVersions maximum versions to return per column
-   * @param oldestUnexpiredTS the oldest timestamp we are interested in,
-   *  based on TTL
+   * @param oldestUnexpiredTS the oldest timestamp we are interested in, based on TTL
    */
-  public ExplicitColumnTracker(NavigableSet<byte[]> columns, int minVersions,
-      int maxVersions, long oldestUnexpiredTS) {
+  public ExplicitColumnTracker(NavigableSet<byte[]> columns, int minVersions, int maxVersions,
+      long oldestUnexpiredTS) {
     this.maxVersions = maxVersions;
     this.minVersions = minVersions;
     this.oldestStamp = oldestUnexpiredTS;
     this.columns = new ColumnCount[columns.size()];
-    int i=0;
-    for(byte [] column : columns) {
+    int i = 0;
+    for (byte[] column : columns) {
       this.columns[i++] = new ColumnCount(column);
     }
     reset();
   }
 
-    /**
+  /**
    * Done when there are no more columns to match against.
    */
   public boolean done() {
@@ -113,18 +107,18 @@
     assert !CellUtil.isDelete(type);
     do {
       // No more columns left, we are done with this query
-      if(done()) {
+      if (done()) {
         return ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW; // done_row
       }
 
       // No more columns to match against, done with storefile
-      if(this.column == null) {
+      if (this.column == null) {
         return ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW; // done_row
       }
 
       // Compare specific column to current column
       int ret = CellComparator.compareQualifiers(cell, column.getBuffer(), column.getOffset(),
-          column.getLength());
+        column.getLength());
 
       // Column Matches. Return include code. The caller would call checkVersions
       // to limit the number of versions.
@@ -153,14 +147,16 @@
         // This is the recursive case.
         this.column = this.columns[this.index];
       }
-    } while(true);
+    } while (true);
   }
 
   @Override
-  public ScanQueryMatcher.MatchCode checkVersions(Cell cell,
-      long timestamp, byte type, boolean ignoreCount) throws IOException {
+  public ScanQueryMatcher.MatchCode checkVersions(Cell cell, long timestamp, byte type,
+      boolean ignoreCount) throws IOException {
     assert !CellUtil.isDelete(type);
-    if (ignoreCount) return ScanQueryMatcher.MatchCode.INCLUDE;
+    if (ignoreCount) {
+      return ScanQueryMatcher.MatchCode.INCLUDE;
+    }
     // Check if it is a duplicate timestamp
     if (sameAsPreviousTS(timestamp)) {
       // If duplicate, skip this Key
@@ -189,7 +185,7 @@
   public void reset() {
     this.index = 0;
     this.column = this.columns[this.index];
-    for(ColumnCount col : this.columns) {
+    for (ColumnCount col : this.columns) {
       col.setCount(0);
     }
     resetTS();
@@ -212,16 +208,15 @@
   }
 
   /**
-   * This method is used to inform the column tracker that we are done with
-   * this column. We may get this information from external filters or
-   * timestamp range and we then need to indicate this information to
-   * tracker. It is required only in case of ExplicitColumnTracker.
+   * This method is used to inform the column tracker that we are done with this column. We may get
+   * this information from external filters or timestamp range and we then need to indicate this
+   * information to tracker. It is required only in case of ExplicitColumnTracker.
    * @param cell
    */
   public void doneWithColumn(Cell cell) {
     while (this.column != null) {
       int compare = CellComparator.compareQualifiers(cell, column.getBuffer(), column.getOffset(),
-          column.getLength());
+        column.getLength());
       resetTS();
       if (compare >= 0) {
         ++this.index;
@@ -231,7 +226,9 @@
         } else {
           this.column = this.columns[this.index];
         }
-        if (compare > 0) continue;
+        if (compare > 0) {
+          continue;
+        }
       }
       return;
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/LegacyScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/LegacyScanQueryMatcher.java
new file mode 100644
index 0000000..ea4bd97
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/LegacyScanQueryMatcher.java
@@ -0,0 +1,397 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.querymatcher;
+
+import com.google.common.base.Preconditions;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.NavigableSet;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeepDeletedCells;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.Filter.ReturnCode;
+import org.apache.hadoop.hbase.io.TimeRange;
+import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
+import org.apache.hadoop.hbase.regionserver.ScanInfo;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker.DeleteResult;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+/**
+ * The old query matcher implementation. Used to keep compatibility for coprocessor that could
+ * overwrite the StoreScanner before compaction. Should be removed once we find a better way to do
+ * filtering during compaction.
+ */
+@Deprecated
+@InterfaceAudience.Private
+public class LegacyScanQueryMatcher extends ScanQueryMatcher {
+
+  private final TimeRange tr;
+
+  private final Filter filter;
+
+  /** Keeps track of deletes */
+  private final DeleteTracker deletes;
+
+  /**
+   * The following three booleans define how we deal with deletes. There are three different
+   * aspects:
+   * <ol>
+   * <li>Whether to keep delete markers. This is used in compactions. Minor compactions always keep
+   * delete markers.</li>
+   * <li>Whether to keep deleted rows. This is also used in compactions, if the store is set to keep
+   * deleted rows. This implies keeping the delete markers as well.</li> In this case deleted rows
+   * are subject to the normal max version and TTL/min version rules just like "normal" rows.
+   * <li>Whether a scan can do time travel queries even before deleted marker to reach deleted
+   * rows.</li>
+   * </ol>
+   */
+  /** whether to retain delete markers */
+  private boolean retainDeletesInOutput;
+
+  /** whether to return deleted rows */
+  private final KeepDeletedCells keepDeletedCells;
+
+  // By default, when hbase.hstore.time.to.purge.deletes is 0ms, a delete
+  // marker is always removed during a major compaction. If set to non-zero
+  // value then major compaction will try to keep a delete marker around for
+  // the given number of milliseconds. We want to keep the delete markers
+  // around a bit longer because old puts might appear out-of-order. For
+  // example, during log replication between two clusters.
+  //
+  // If the delete marker has lived longer than its column-family's TTL then
+  // the delete marker will be removed even if time.to.purge.deletes has not
+  // passed. This is because all the Puts that this delete marker can influence
+  // would have also expired. (Removing of delete markers on col family TTL will
+  // not happen if min-versions is set to non-zero)
+  //
+  // But, if time.to.purge.deletes has not expired then a delete
+  // marker will not be removed just because there are no Puts that it is
+  // currently influencing. This is because Puts, that this delete can
+  // influence. may appear out of order.
+  private final long timeToPurgeDeletes;
+
+  /**
+   * This variable shows whether there is an null column in the query. There always exists a null
+   * column in the wildcard column query. There maybe exists a null column in the explicit column
+   * query based on the first column.
+   */
+  private final boolean hasNullColumn;
+
+  /** readPoint over which the KVs are unconditionally included */
+  private final long maxReadPointToTrackVersions;
+
+  /**
+   * Oldest put in any of the involved store files Used to decide whether it is ok to delete family
+   * delete marker of this store keeps deleted KVs.
+   */
+  protected final long earliestPutTs;
+
+  private final byte[] stopRow;
+
+  private byte[] dropDeletesFromRow = null, dropDeletesToRow = null;
+
+  private LegacyScanQueryMatcher(Scan scan, ScanInfo scanInfo, ColumnTracker columns,
+      boolean hasNullColumn, DeleteTracker deletes, ScanType scanType, long readPointToUse,
+      long earliestPutTs, long oldestUnexpiredTS, long now) {
+    super(scan.getStartRow(), scanInfo, columns, oldestUnexpiredTS, now);
+    TimeRange timeRange = scan.getColumnFamilyTimeRange().get(scanInfo.getFamily());
+    if (timeRange == null) {
+      this.tr = scan.getTimeRange();
+    } else {
+      this.tr = timeRange;
+    }
+    this.hasNullColumn = hasNullColumn;
+    this.deletes = deletes;
+    this.filter = scan.getFilter();
+    this.maxReadPointToTrackVersions = readPointToUse;
+    this.timeToPurgeDeletes = scanInfo.getTimeToPurgeDeletes();
+    this.earliestPutTs = earliestPutTs;
+
+    /* how to deal with deletes */
+    this.keepDeletedCells = scanInfo.getKeepDeletedCells();
+    this.retainDeletesInOutput = scanType == ScanType.COMPACT_RETAIN_DELETES;
+    this.stopRow = scan.getStopRow();
+  }
+
+  private LegacyScanQueryMatcher(Scan scan, ScanInfo scanInfo, ColumnTracker columns,
+      boolean hasNullColumn, DeleteTracker deletes, ScanType scanType, long readPointToUse,
+      long earliestPutTs, long oldestUnexpiredTS, long now, byte[] dropDeletesFromRow,
+      byte[] dropDeletesToRow) {
+    this(scan, scanInfo, columns, hasNullColumn, deletes, scanType, readPointToUse, earliestPutTs,
+        oldestUnexpiredTS, now);
+    this.dropDeletesFromRow = Preconditions.checkNotNull(dropDeletesFromRow);
+    this.dropDeletesToRow = Preconditions.checkNotNull(dropDeletesToRow);
+  }
+
+  @Override
+  public MatchCode match(Cell cell) throws IOException {
+    if (filter != null && filter.filterAllRemaining()) {
+      return MatchCode.DONE_SCAN;
+    }
+    MatchCode returnCode = preCheck(cell);
+    if (returnCode != null) {
+      return returnCode;
+    }
+    /*
+     * The delete logic is pretty complicated now.
+     * This is corroborated by the following:
+     * 1. The store might be instructed to keep deleted rows around.
+     * 2. A scan can optionally see past a delete marker now.
+     * 3. If deleted rows are kept, we have to find out when we can
+     *    remove the delete markers.
+     * 4. Family delete markers are always first (regardless of their TS)
+     * 5. Delete markers should not be counted as version
+     * 6. Delete markers affect puts of the *same* TS
+     * 7. Delete marker need to be version counted together with puts
+     *    they affect
+     */
+    long timestamp = cell.getTimestamp();
+    byte typeByte = cell.getTypeByte();
+    long mvccVersion = cell.getSequenceId();
+    if (CellUtil.isDelete(cell)) {
+      if (keepDeletedCells == KeepDeletedCells.FALSE
+          || (keepDeletedCells == KeepDeletedCells.TTL && timestamp < oldestUnexpiredTS)) {
+        // first ignore delete markers if the scanner can do so, and the
+        // range does not include the marker
+        //
+        // during flushes and compactions also ignore delete markers newer
+        // than the readpoint of any open scanner, this prevents deleted
+        // rows that could still be seen by a scanner from being collected
+        boolean includeDeleteMarker = tr.withinOrAfterTimeRange(timestamp);
+        if (includeDeleteMarker && mvccVersion <= maxReadPointToTrackVersions) {
+          this.deletes.add(cell);
+        }
+        // Can't early out now, because DelFam come before any other keys
+      }
+
+      if (timeToPurgeDeletes > 0
+          && (EnvironmentEdgeManager.currentTime() - timestamp) <= timeToPurgeDeletes) {
+        return MatchCode.INCLUDE;
+      } else if (retainDeletesInOutput || mvccVersion > maxReadPointToTrackVersions) {
+        // always include or it is not time yet to check whether it is OK
+        // to purge deltes or not
+        // if this is not a user scan (compaction), we can filter this deletemarker right here
+        // otherwise (i.e. a "raw" scan) we fall through to normal version and timerange checking
+        return MatchCode.INCLUDE;
+      } else if (keepDeletedCells == KeepDeletedCells.TRUE
+          || (keepDeletedCells == KeepDeletedCells.TTL && timestamp >= oldestUnexpiredTS)) {
+        if (timestamp < earliestPutTs) {
+          // keeping delete rows, but there are no puts older than
+          // this delete in the store files.
+          return columns.getNextRowOrNextColumn(cell);
+        }
+        // else: fall through and do version counting on the
+        // delete markers
+      } else {
+        return MatchCode.SKIP;
+      }
+      // note the following next else if...
+      // delete marker are not subject to other delete markers
+    } else if (!this.deletes.isEmpty()) {
+      DeleteResult deleteResult = deletes.isDeleted(cell);
+      switch (deleteResult) {
+        case FAMILY_DELETED:
+        case COLUMN_DELETED:
+          return columns.getNextRowOrNextColumn(cell);
+        case VERSION_DELETED:
+        case FAMILY_VERSION_DELETED:
+          return MatchCode.SKIP;
+        case NOT_DELETED:
+          break;
+        default:
+          throw new RuntimeException("UNEXPECTED");
+        }
+    }
+
+    int timestampComparison = tr.compare(timestamp);
+    if (timestampComparison >= 1) {
+      return MatchCode.SKIP;
+    } else if (timestampComparison <= -1) {
+      return columns.getNextRowOrNextColumn(cell);
+    }
+
+    // STEP 1: Check if the column is part of the requested columns
+    MatchCode colChecker = columns.checkColumn(cell, typeByte);
+    if (colChecker == MatchCode.INCLUDE) {
+      ReturnCode filterResponse = ReturnCode.SKIP;
+      // STEP 2: Yes, the column is part of the requested columns. Check if filter is present
+      if (filter != null) {
+        // STEP 3: Filter the key value and return if it filters out
+        filterResponse = filter.filterKeyValue(cell);
+        switch (filterResponse) {
+        case SKIP:
+          return MatchCode.SKIP;
+        case NEXT_COL:
+          return columns.getNextRowOrNextColumn(cell);
+        case NEXT_ROW:
+          stickyNextRow = true;
+          return MatchCode.SEEK_NEXT_ROW;
+        case SEEK_NEXT_USING_HINT:
+          return MatchCode.SEEK_NEXT_USING_HINT;
+        default:
+          //It means it is either include or include and seek next
+          break;
+        }
+      }
+      /*
+       * STEP 4: Reaching this step means the column is part of the requested columns and either
+       * the filter is null or the filter has returned INCLUDE or INCLUDE_AND_NEXT_COL response.
+       * Now check the number of versions needed. This method call returns SKIP, INCLUDE,
+       * INCLUDE_AND_SEEK_NEXT_ROW, INCLUDE_AND_SEEK_NEXT_COL.
+       *
+       * FilterResponse            ColumnChecker               Desired behavior
+       * INCLUDE                   SKIP                        row has already been included, SKIP.
+       * INCLUDE                   INCLUDE                     INCLUDE
+       * INCLUDE                   INCLUDE_AND_SEEK_NEXT_COL   INCLUDE_AND_SEEK_NEXT_COL
+       * INCLUDE                   INCLUDE_AND_SEEK_NEXT_ROW   INCLUDE_AND_SEEK_NEXT_ROW
+       * INCLUDE_AND_SEEK_NEXT_COL SKIP                        row has already been included, SKIP.
+       * INCLUDE_AND_SEEK_NEXT_COL INCLUDE                     INCLUDE_AND_SEEK_NEXT_COL
+       * INCLUDE_AND_SEEK_NEXT_COL INCLUDE_AND_SEEK_NEXT_COL   INCLUDE_AND_SEEK_NEXT_COL
+       * INCLUDE_AND_SEEK_NEXT_COL INCLUDE_AND_SEEK_NEXT_ROW   INCLUDE_AND_SEEK_NEXT_ROW
+       *
+       * In all the above scenarios, we return the column checker return value except for
+       * FilterResponse (INCLUDE_AND_SEEK_NEXT_COL) and ColumnChecker(INCLUDE)
+       */
+      colChecker = columns.checkVersions(cell, timestamp, typeByte,
+          mvccVersion > maxReadPointToTrackVersions);
+      //Optimize with stickyNextRow
+      boolean seekNextRowFromEssential = filterResponse == ReturnCode.INCLUDE_AND_SEEK_NEXT_ROW &&
+          filter.isFamilyEssential(cell.getFamilyArray());
+      if (colChecker == MatchCode.INCLUDE_AND_SEEK_NEXT_ROW || seekNextRowFromEssential) {
+        stickyNextRow = true;
+      }
+      if (filterResponse == ReturnCode.INCLUDE_AND_SEEK_NEXT_ROW) {
+        if (colChecker != MatchCode.SKIP) {
+          return MatchCode.INCLUDE_AND_SEEK_NEXT_ROW;
+        }
+        return MatchCode.SEEK_NEXT_ROW;
+      }
+      return (filterResponse == ReturnCode.INCLUDE_AND_NEXT_COL &&
+          colChecker == MatchCode.INCLUDE) ? MatchCode.INCLUDE_AND_SEEK_NEXT_COL
+          : colChecker;
+    }
+    stickyNextRow = (colChecker == MatchCode.SEEK_NEXT_ROW) ? true
+        : stickyNextRow;
+    return colChecker;
+  }
+
+  @Override
+  public boolean hasNullColumnInQuery() {
+    return hasNullColumn;
+  }
+
+  /**
+   * Handle partial-drop-deletes. As we match keys in order, when we have a range from which we can
+   * drop deletes, we can set retainDeletesInOutput to false for the duration of this range only,
+   * and maintain consistency.
+   */
+  private void checkPartialDropDeleteRange(Cell curCell) {
+    // If partial-drop-deletes are used, initially, dropDeletesFromRow and dropDeletesToRow
+    // are both set, and the matcher is set to retain deletes. We assume ordered keys. When
+    // dropDeletesFromRow is leq current kv, we start dropping deletes and reset
+    // dropDeletesFromRow; thus the 2nd "if" starts to apply.
+    if ((dropDeletesFromRow != null)
+        && (Arrays.equals(dropDeletesFromRow, HConstants.EMPTY_START_ROW)
+            || (CellComparator.COMPARATOR.compareRows(curCell, dropDeletesFromRow, 0,
+              dropDeletesFromRow.length) >= 0))) {
+      retainDeletesInOutput = false;
+      dropDeletesFromRow = null;
+    }
+    // If dropDeletesFromRow is null and dropDeletesToRow is set, we are inside the partial-
+    // drop-deletes range. When dropDeletesToRow is leq current kv, we stop dropping deletes,
+    // and reset dropDeletesToRow so that we don't do any more compares.
+    if ((dropDeletesFromRow == null) && (dropDeletesToRow != null)
+        && !Arrays.equals(dropDeletesToRow, HConstants.EMPTY_END_ROW) && (CellComparator.COMPARATOR
+            .compareRows(curCell, dropDeletesToRow, 0, dropDeletesToRow.length) >= 0)) {
+      retainDeletesInOutput = true;
+      dropDeletesToRow = null;
+    }
+  }
+
+  @Override
+  protected void reset() {
+    checkPartialDropDeleteRange(currentRow);
+  }
+
+  @Override
+  public boolean isUserScan() {
+    return false;
+  }
+
+  @Override
+  public boolean moreRowsMayExistAfter(Cell cell) {
+    if (this.stopRow == null || this.stopRow.length == 0) {
+      return true;
+    }
+    return rowComparator.compareRows(cell, stopRow, 0, stopRow.length) < 0;
+  }
+
+  @Override
+  public Filter getFilter() {
+    return filter;
+  }
+
+  @Override
+  public Cell getNextKeyHint(Cell cell) throws IOException {
+    if (filter == null) {
+      return null;
+    } else {
+      return filter.getNextCellHint(cell);
+    }
+  }
+
+  public static LegacyScanQueryMatcher create(Scan scan, ScanInfo scanInfo,
+      NavigableSet<byte[]> columns, ScanType scanType, long readPointToUse, long earliestPutTs,
+      long oldestUnexpiredTS, long now, byte[] dropDeletesFromRow, byte[] dropDeletesToRow,
+      RegionCoprocessorHost regionCoprocessorHost) throws IOException {
+    int maxVersions = Math.min(scan.getMaxVersions(), scanInfo.getMaxVersions());
+    boolean hasNullColumn;
+    ColumnTracker columnTracker;
+    if (columns == null || columns.size() == 0) {
+      // there is always a null column in the wildcard column query.
+      hasNullColumn = true;
+      // use a specialized scan for wildcard column tracker.
+      columnTracker = new ScanWildcardColumnTracker(scanInfo.getMinVersions(), maxVersions,
+          oldestUnexpiredTS);
+    } else {
+      // We can share the ExplicitColumnTracker, diff is we reset
+      // between rows, not between storefiles.
+      // whether there is null column in the explicit column query
+      hasNullColumn = columns.first().length == 0;
+      columnTracker = new ExplicitColumnTracker(columns, scanInfo.getMinVersions(), maxVersions,
+          oldestUnexpiredTS);
+    }
+    DeleteTracker deletes = instantiateDeleteTracker(regionCoprocessorHost);
+    if (dropDeletesFromRow == null) {
+      return new LegacyScanQueryMatcher(scan, scanInfo, columnTracker, hasNullColumn, deletes,
+          scanType, readPointToUse, earliestPutTs, oldestUnexpiredTS, now);
+    } else {
+      return new LegacyScanQueryMatcher(scan, scanInfo, columnTracker, hasNullColumn, deletes,
+          scanType, readPointToUse, earliestPutTs, oldestUnexpiredTS, now, dropDeletesFromRow,
+          dropDeletesToRow);
+    }
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/MajorCompactionScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/MajorCompactionScanQueryMatcher.java
new file mode 100644
index 0000000..6a2ed40
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/MajorCompactionScanQueryMatcher.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.querymatcher;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.ScanInfo;
+
+/**
+ * Query matcher for major compaction.
+ */
+@InterfaceAudience.Private
+public class MajorCompactionScanQueryMatcher extends DropDeletesCompactionScanQueryMatcher {
+
+  public MajorCompactionScanQueryMatcher(ScanInfo scanInfo, DeleteTracker deletes,
+      long readPointToUse, long earliestPutTs, long oldestUnexpiredTS, long now) {
+    super(scanInfo, deletes, readPointToUse, earliestPutTs, oldestUnexpiredTS, now);
+  }
+
+  @Override
+  public MatchCode match(Cell cell) throws IOException {
+    MatchCode returnCode = preCheck(cell);
+    if (returnCode != null) {
+      return returnCode;
+    }
+    long timestamp = cell.getTimestamp();
+    long mvccVersion = cell.getSequenceId();
+
+    // The delete logic is pretty complicated now.
+    // This is corroborated by the following:
+    // 1. The store might be instructed to keep deleted rows around.
+    // 2. A scan can optionally see past a delete marker now.
+    // 3. If deleted rows are kept, we have to find out when we can
+    // remove the delete markers.
+    // 4. Family delete markers are always first (regardless of their TS)
+    // 5. Delete markers should not be counted as version
+    // 6. Delete markers affect puts of the *same* TS
+    // 7. Delete marker need to be version counted together with puts
+    // they affect
+    //
+    if (CellUtil.isDelete(cell)) {
+      if (mvccVersion > maxReadPointToTrackVersions) {
+        // We can not drop this delete marker yet, and also we should not use this delete marker to
+        // mask any cell yet.
+        return MatchCode.INCLUDE;
+      }
+      trackDelete(cell);
+      returnCode = tryDropDelete(cell);
+      if (returnCode != null) {
+        return returnCode;
+      }
+    } else {
+      returnCode = checkDeleted(deletes, cell);
+      if (returnCode != null) {
+        return returnCode;
+      }
+    }
+    // Skip checking column since we do not remove column during compaction.
+    return columns.checkVersions(cell, timestamp, cell.getTypeByte(),
+      mvccVersion > maxReadPointToTrackVersions);
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/MinorCompactionScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/MinorCompactionScanQueryMatcher.java
new file mode 100644
index 0000000..3b6acde
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/MinorCompactionScanQueryMatcher.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.querymatcher;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.ScanInfo;
+
+/**
+ * Query matcher for minor compaction.
+ */
+@InterfaceAudience.Private
+public class MinorCompactionScanQueryMatcher extends CompactionScanQueryMatcher {
+
+  public MinorCompactionScanQueryMatcher(ScanInfo scanInfo, DeleteTracker deletes,
+      long readPointToUse, long oldestUnexpiredTS, long now) {
+    super(scanInfo, deletes, readPointToUse, oldestUnexpiredTS, now);
+  }
+
+  @Override
+  public MatchCode match(Cell cell) throws IOException {
+    MatchCode returnCode = preCheck(cell);
+    if (returnCode != null) {
+      return returnCode;
+    }
+    long mvccVersion = cell.getSequenceId();
+    if (CellUtil.isDelete(cell)) {
+      if (mvccVersion > maxReadPointToTrackVersions) {
+        // we should not use this delete marker to mask any cell yet.
+        return MatchCode.INCLUDE;
+      }
+      trackDelete(cell);
+      return MatchCode.INCLUDE;
+    }
+    returnCode = checkDeleted(deletes, cell);
+    if (returnCode != null) {
+      return returnCode;
+    }
+    // Skip checking column since we do not remove column during compaction.
+    return columns.checkVersions(cell, cell.getTimestamp(), cell.getTypeByte(),
+      mvccVersion > maxReadPointToTrackVersions);
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/NormalUserScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/NormalUserScanQueryMatcher.java
new file mode 100644
index 0000000..3942f04
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/NormalUserScanQueryMatcher.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.querymatcher;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.KeepDeletedCells;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
+import org.apache.hadoop.hbase.regionserver.ScanInfo;
+
+/**
+ * Query matcher for normal user scan.
+ */
+@InterfaceAudience.Private
+public class NormalUserScanQueryMatcher extends UserScanQueryMatcher {
+
+  /** Keeps track of deletes */
+  private final DeleteTracker deletes;
+
+  /** True if we are doing a 'Get' Scan. Every Get is actually a one-row Scan. */
+  private final boolean get;
+
+  /** whether time range queries can see rows "behind" a delete */
+  private final boolean seePastDeleteMarkers;
+
+  protected NormalUserScanQueryMatcher(Scan scan, ScanInfo scanInfo, ColumnTracker columns,
+      boolean hasNullColumn, DeleteTracker deletes, long oldestUnexpiredTS, long now) {
+    super(scan, scanInfo, columns, hasNullColumn, oldestUnexpiredTS, now);
+    this.deletes = deletes;
+    this.get = scan.isGetScan();
+    this.seePastDeleteMarkers = scanInfo.getKeepDeletedCells() != KeepDeletedCells.FALSE;
+  }
+
+  @Override
+  public MatchCode match(Cell cell) throws IOException {
+    if (filter != null && filter.filterAllRemaining()) {
+      return MatchCode.DONE_SCAN;
+    }
+    MatchCode returnCode = preCheck(cell);
+    if (returnCode != null) {
+      return returnCode;
+    }
+    long timestamp = cell.getTimestamp();
+    if (CellUtil.isDelete(cell)) {
+      boolean includeDeleteMarker = seePastDeleteMarkers ? tr.withinTimeRange(timestamp)
+          : tr.withinOrAfterTimeRange(timestamp);
+      if (includeDeleteMarker) {
+        this.deletes.add(cell);
+      }
+      return MatchCode.SKIP;
+    }
+    returnCode = checkDeleted(deletes, cell);
+    if (returnCode != null) {
+      return returnCode;
+    }
+    return matchColumn(cell);
+  }
+
+  @Override
+  protected void reset() {
+    deletes.reset();
+  }
+
+  @Override
+  protected boolean isGet() {
+    return get;
+  }
+
+  public static NormalUserScanQueryMatcher create(Scan scan, ScanInfo scanInfo,
+      ColumnTracker columns, boolean hasNullColumn, long oldestUnexpiredTS, long now,
+      RegionCoprocessorHost regionCoprocessorHost) throws IOException {
+    DeleteTracker deletes = instantiateDeleteTracker(regionCoprocessorHost);
+    if (scan.isReversed()) {
+      return new NormalUserScanQueryMatcher(scan, scanInfo, columns, hasNullColumn, deletes,
+          oldestUnexpiredTS, now) {
+
+        @Override
+        protected boolean moreRowsMayExistsAfter(int cmpToStopRow) {
+          return cmpToStopRow > 0;
+        }
+      };
+    } else {
+      return new NormalUserScanQueryMatcher(scan, scanInfo, columns, hasNullColumn, deletes,
+          oldestUnexpiredTS, now);
+    }
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/RawScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/RawScanQueryMatcher.java
new file mode 100644
index 0000000..acdae90
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/RawScanQueryMatcher.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.querymatcher;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.ScanInfo;
+
+/**
+ * Query matcher for raw scan.
+ */
+@InterfaceAudience.Private
+public class RawScanQueryMatcher extends UserScanQueryMatcher {
+
+  protected RawScanQueryMatcher(Scan scan, ScanInfo scanInfo, ColumnTracker columns,
+      boolean hasNullColumn, long oldestUnexpiredTS, long now) {
+    super(scan, scanInfo, columns, hasNullColumn, oldestUnexpiredTS, now);
+  }
+
+  @Override
+  public MatchCode match(Cell cell) throws IOException {
+    if (filter != null && filter.filterAllRemaining()) {
+      return MatchCode.DONE_SCAN;
+    }
+    MatchCode returnCode = preCheck(cell);
+    if (returnCode != null) {
+      return returnCode;
+    }
+    // For a raw scan, we do not filter out any cells by delete marker, and delete marker is also
+    // returned, so we do not need to track delete.
+    return matchColumn(cell);
+  }
+
+  @Override
+  protected void reset() {
+  }
+
+  @Override
+  protected boolean isGet() {
+    return false;
+  }
+
+  public static RawScanQueryMatcher create(Scan scan, ScanInfo scanInfo, ColumnTracker columns,
+      boolean hasNullColumn, long oldestUnexpiredTS, long now) {
+    if (scan.isReversed()) {
+      return new RawScanQueryMatcher(scan, scanInfo, columns, hasNullColumn, oldestUnexpiredTS,
+          now) {
+
+        @Override
+        protected boolean moreRowsMayExistsAfter(int cmpToStopRow) {
+          return cmpToStopRow > 0;
+        }
+      };
+    } else {
+      return new RawScanQueryMatcher(scan, scanInfo, columns, hasNullColumn, oldestUnexpiredTS,
+          now);
+    }
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanDeleteTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanDeleteTracker.java
similarity index 86%
rename from hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanDeleteTracker.java
rename to hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanDeleteTracker.java
index e2db008..450a30e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanDeleteTracker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanDeleteTracker.java
@@ -1,5 +1,4 @@
 /**
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.hbase.regionserver;
+package org.apache.hadoop.hbase.regionserver.querymatcher;
 
 import java.util.SortedSet;
 import java.util.TreeSet;
@@ -30,15 +29,13 @@
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
- * This class is responsible for the tracking and enforcement of Deletes
- * during the course of a Scan operation.
- *
- * It only has to enforce Delete and DeleteColumn, since the
- * DeleteFamily is handled at a higher level.
- *
+ * This class is responsible for the tracking and enforcement of Deletes during the course of a Scan
+ * operation. It only has to enforce Delete and DeleteColumn, since the DeleteFamily is handled at a
+ * higher level.
  * <p>
  * This class is utilized through three methods:
- * <ul><li>{@link #add} when encountering a Delete or DeleteColumn</li>
+ * <ul>
+ * <li>{@link #add} when encountering a Delete or DeleteColumn</li>
  * <li>{@link #isDeleted} when checking if a Put KeyValue has been deleted</li>
  * <li>{@link #update} when reaching the end of a StoreFile or row for scans</li>
  * </ul>
@@ -51,22 +48,14 @@
   protected boolean hasFamilyStamp = false;
   protected long familyStamp = 0L;
   protected SortedSet<Long> familyVersionStamps = new TreeSet<Long>();
-  protected byte [] deleteBuffer = null;
+  protected byte[] deleteBuffer = null;
   protected int deleteOffset = 0;
   protected int deleteLength = 0;
   protected byte deleteType = 0;
   protected long deleteTimestamp = 0L;
 
   /**
-   * Constructor for ScanDeleteTracker
-   */
-  public ScanDeleteTracker() {
-    super();
-  }
-
-  /**
-   * Add the specified KeyValue to the list of deletes to check against for
-   * this row operation.
+   * Add the specified KeyValue to the list of deletes to check against for this row operation.
    * <p>
    * This is called when a Delete is encountered.
    * @param cell - the delete cell
@@ -102,9 +91,7 @@
   }
 
   /**
-   * Check if the specified KeyValue buffer has been deleted by a previously
-   * seen delete.
-   *
+   * Check if the specified KeyValue buffer has been deleted by a previously seen delete.
    * @param cell - current cell to check if deleted by a previously seen delete
    * @return deleteResult
    */
@@ -116,7 +103,7 @@
     }
 
     if (familyVersionStamps.contains(Long.valueOf(timestamp))) {
-        return DeleteResult.FAMILY_VERSION_DELETED;
+      return DeleteResult.FAMILY_VERSION_DELETED;
     }
 
     if (deleteBuffer != null) {
@@ -135,15 +122,14 @@
 
         // different timestamp, let's clear the buffer.
         deleteBuffer = null;
-      } else if(ret < 0){
+      } else if (ret < 0) {
         // Next column case.
         deleteBuffer = null;
       } else {
         throw new IllegalStateException("isDelete failed: deleteBuffer="
-            + Bytes.toStringBinary(deleteBuffer, deleteOffset, deleteLength)
-            + ", qualifier="
+            + Bytes.toStringBinary(deleteBuffer, deleteOffset, deleteLength) + ", qualifier="
             + Bytes.toStringBinary(cell.getQualifierArray(), cell.getQualifierOffset(),
-                cell.getQualifierLength())
+              cell.getQualifierLength())
             + ", timestamp=" + timestamp + ", comparison result: " + ret);
       }
     }
@@ -153,8 +139,7 @@
 
   @Override
   public boolean isEmpty() {
-    return deleteBuffer == null && !hasFamilyStamp &&
-           familyVersionStamps.isEmpty();
+    return deleteBuffer == null && !hasFamilyStamp && familyVersionStamps.isEmpty();
   }
 
   @Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanQueryMatcher.java
new file mode 100644
index 0000000..b5469d3
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanQueryMatcher.java
@@ -0,0 +1,357 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.querymatcher;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
+import org.apache.hadoop.hbase.TagUtil;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
+import org.apache.hadoop.hbase.regionserver.ScanInfo;
+import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker.DeleteResult;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * A query matcher that is specifically designed for the scan case.
+ */
+@InterfaceAudience.Private
+public abstract class ScanQueryMatcher {
+
+  /**
+   * {@link #match} return codes. These instruct the scanner moving through memstores and StoreFiles
+   * what to do with the current KeyValue.
+   * <p>
+   * Additionally, this contains "early-out" language to tell the scanner to move on to the next
+   * File (memstore or Storefile), or to return immediately.
+   */
+  public static enum MatchCode {
+    /**
+     * Include KeyValue in the returned result
+     */
+    INCLUDE,
+
+    /**
+     * Do not include KeyValue in the returned result
+     */
+    SKIP,
+
+    /**
+     * Do not include, jump to next StoreFile or memstore (in time order)
+     */
+    NEXT,
+
+    /**
+     * Do not include, return current result
+     */
+    DONE,
+
+    /**
+     * These codes are used by the ScanQueryMatcher
+     */
+
+    /**
+     * Done with the row, seek there.
+     */
+    SEEK_NEXT_ROW,
+
+    /**
+     * Done with column, seek to next.
+     */
+    SEEK_NEXT_COL,
+
+    /**
+     * Done with scan, thanks to the row filter.
+     */
+    DONE_SCAN,
+
+    /**
+     * Seek to next key which is given as hint.
+     */
+    SEEK_NEXT_USING_HINT,
+
+    /**
+     * Include KeyValue and done with column, seek to next.
+     */
+    INCLUDE_AND_SEEK_NEXT_COL,
+
+    /**
+     * Include KeyValue and done with row, seek to next.
+     */
+    INCLUDE_AND_SEEK_NEXT_ROW,
+  }
+
+  /** Row comparator for the region this query is for */
+  protected final CellComparator rowComparator;
+
+  /** Key to seek to in memstore and StoreFiles */
+  protected final Cell startKey;
+
+  /** Keeps track of columns and versions */
+  protected final ColumnTracker columns;
+
+  /** The oldest timestamp we are interested in, based on TTL */
+  protected final long oldestUnexpiredTS;
+
+  protected final long now;
+
+  /** Row the query is on */
+  protected Cell currentRow;
+
+  protected boolean stickyNextRow;
+
+  protected ScanQueryMatcher(byte[] startRow, ScanInfo scanInfo, ColumnTracker columns,
+      long oldestUnexpiredTS, long now) {
+    this.rowComparator = scanInfo.getComparator();
+    this.startKey = CellUtil.createFirstDeleteFamilyCellOnRow(startRow, scanInfo.getFamily());
+    this.oldestUnexpiredTS = oldestUnexpiredTS;
+    this.now = now;
+    this.columns = columns;
+  }
+
+  /**
+   * @param cell
+   * @param oldestTimestamp
+   * @return true if the cell is expired
+   */
+  private static boolean isCellTTLExpired(final Cell cell, final long oldestTimestamp,
+      final long now) {
+    // Look for a TTL tag first. Use it instead of the family setting if
+    // found. If a cell has multiple TTLs, resolve the conflict by using the
+    // first tag encountered.
+    Iterator<Tag> i = CellUtil.tagsIterator(cell);
+    while (i.hasNext()) {
+      Tag t = i.next();
+      if (TagType.TTL_TAG_TYPE == t.getType()) {
+        // Unlike in schema cell TTLs are stored in milliseconds, no need
+        // to convert
+        long ts = cell.getTimestamp();
+        assert t.getValueLength() == Bytes.SIZEOF_LONG;
+        long ttl = TagUtil.getValueAsLong(t);
+        if (ts + ttl < now) {
+          return true;
+        }
+        // Per cell TTLs cannot extend lifetime beyond family settings, so
+        // fall through to check that
+        break;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Check before the delete logic.
+   * @return null means continue.
+   */
+  protected final MatchCode preCheck(Cell cell) {
+    if (currentRow == null) {
+      // Since the curCell is null it means we are already sure that we have moved over to the next
+      // row
+      return MatchCode.DONE;
+    }
+    // if row key is changed, then we know that we have moved over to the next row
+    if (rowComparator.compareRows(currentRow, cell) != 0) {
+      return MatchCode.DONE;
+    }
+    // optimize case.
+    if (this.stickyNextRow) {
+      return MatchCode.SEEK_NEXT_ROW;
+    }
+
+    if (this.columns.done()) {
+      stickyNextRow = true;
+      return MatchCode.SEEK_NEXT_ROW;
+    }
+
+    long timestamp = cell.getTimestamp();
+    // check if this is a fake cell. The fake cell is an optimization, we should make the scanner
+    // seek to next column or next row. See StoreFileScanner.requestSeek for more details.
+    // check for early out based on timestamp alone
+    if (timestamp == HConstants.OLDEST_TIMESTAMP || columns.isDone(timestamp)) {
+      return columns.getNextRowOrNextColumn(cell);
+    }
+    // check if the cell is expired by cell TTL
+    if (isCellTTLExpired(cell, this.oldestUnexpiredTS, this.now)) {
+      return MatchCode.SKIP;
+    }
+    return null;
+  }
+
+  protected final MatchCode checkDeleted(DeleteTracker deletes, Cell cell) {
+    if (deletes.isEmpty()) {
+      return null;
+    }
+    DeleteResult deleteResult = deletes.isDeleted(cell);
+    switch (deleteResult) {
+      case FAMILY_DELETED:
+      case COLUMN_DELETED:
+        return columns.getNextRowOrNextColumn(cell);
+      case VERSION_DELETED:
+      case FAMILY_VERSION_DELETED:
+        return MatchCode.SKIP;
+      case NOT_DELETED:
+        return null;
+      default:
+        throw new RuntimeException("Unexpected delete result: " + deleteResult);
+    }
+  }
+
+  /**
+   * Determines if the caller should do one of several things:
+   * <ul>
+   * <li>seek/skip to the next row (MatchCode.SEEK_NEXT_ROW)</li>
+   * <li>seek/skip to the next column (MatchCode.SEEK_NEXT_COL)</li>
+   * <li>include the current KeyValue (MatchCode.INCLUDE)</li>
+   * <li>ignore the current KeyValue (MatchCode.SKIP)</li>
+   * <li>got to the next row (MatchCode.DONE)</li>
+   * </ul>
+   * @param cell KeyValue to check
+   * @return The match code instance.
+   * @throws IOException in case there is an internal consistency problem caused by a data
+   *           corruption.
+   */
+  public abstract MatchCode match(Cell cell) throws IOException;
+
+  /**
+   * @return the start key
+   */
+  public Cell getStartKey() {
+    return startKey;
+  }
+
+  /**
+   * @return whether there is an null column in the query
+   */
+  public abstract boolean hasNullColumnInQuery();
+
+  /**
+   * @return a cell represent the current row
+   */
+  public Cell currentRow() {
+    return currentRow;
+  }
+
+  /**
+   * Make {@link #currentRow()} return null.
+   */
+  public void clearCurrentRow() {
+    currentRow = null;
+  }
+
+  protected abstract void reset();
+
+  /**
+   * Set the row when there is change in row
+   * @param currentRow
+   */
+  public void setToNewRow(Cell currentRow) {
+    this.currentRow = currentRow;
+    columns.reset();
+    reset();
+    stickyNextRow = false;
+  }
+
+  public abstract boolean isUserScan();
+
+  /**
+   * @return Returns false if we know there are no more rows to be scanned (We've reached the
+   *         <code>stopRow</code> or we are scanning on row only because this Scan is for a Get,
+   *         etc.
+   */
+  public abstract boolean moreRowsMayExistAfter(Cell cell);
+
+  public Cell getKeyForNextColumn(Cell cell) {
+    ColumnCount nextColumn = columns.getColumnHint();
+    if (nextColumn == null) {
+      return CellUtil.createLastOnRowCol(cell);
+    } else {
+      return CellUtil.createFirstOnRowCol(cell, nextColumn.getBuffer(), nextColumn.getOffset(),
+        nextColumn.getLength());
+    }
+  }
+
+  /**
+   * @param nextIndexed the key of the next entry in the block index (if any)
+   * @param currentCell The Cell we're using to calculate the seek key
+   * @return result of the compare between the indexed key and the key portion of the passed cell
+   */
+  public int compareKeyForNextRow(Cell nextIndexed, Cell currentCell) {
+    return rowComparator.compareKeyBasedOnColHint(nextIndexed, currentCell, 0, 0, null, 0, 0,
+      HConstants.OLDEST_TIMESTAMP, Type.Minimum.getCode());
+  }
+
+  /**
+   * @param nextIndexed the key of the next entry in the block index (if any)
+   * @param currentCell The Cell we're using to calculate the seek key
+   * @return result of the compare between the indexed key and the key portion of the passed cell
+   */
+  public int compareKeyForNextColumn(Cell nextIndexed, Cell currentCell) {
+    ColumnCount nextColumn = columns.getColumnHint();
+    if (nextColumn == null) {
+      return rowComparator.compareKeyBasedOnColHint(nextIndexed, currentCell, 0, 0, null, 0, 0,
+        HConstants.OLDEST_TIMESTAMP, Type.Minimum.getCode());
+    } else {
+      return rowComparator.compareKeyBasedOnColHint(nextIndexed, currentCell,
+        currentCell.getFamilyOffset(), currentCell.getFamilyLength(), nextColumn.getBuffer(),
+        nextColumn.getOffset(), nextColumn.getLength(), HConstants.LATEST_TIMESTAMP,
+        Type.Maximum.getCode());
+    }
+  }
+
+  /**
+   * @return the Filter
+   */
+  public abstract Filter getFilter();
+
+  /**
+   * Delegate to {@link Filter#getNextCellHint(Cell)}. If no filter, return {@code null}.
+   */
+  public abstract Cell getNextKeyHint(Cell cell) throws IOException;
+
+  protected static DeleteTracker instantiateDeleteTracker(RegionCoprocessorHost host)
+      throws IOException {
+    DeleteTracker tracker = new ScanDeleteTracker();
+    if (host != null) {
+      tracker = host.postInstantiateDeleteTracker(tracker);
+    }
+    return tracker;
+  }
+
+  // Used only for testing purposes
+  static MatchCode checkColumn(ColumnTracker columnTracker, byte[] bytes, int offset, int length,
+      long ttl, byte type, boolean ignoreCount) throws IOException {
+    KeyValue kv = KeyValueUtil.createFirstOnRow(HConstants.EMPTY_BYTE_ARRAY, 0, 0,
+      HConstants.EMPTY_BYTE_ARRAY, 0, 0, bytes, offset, length);
+    MatchCode matchCode = columnTracker.checkColumn(kv, type);
+    if (matchCode == MatchCode.INCLUDE) {
+      return columnTracker.checkVersions(kv, ttl, type, ignoreCount);
+    }
+    return matchCode;
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanWildcardColumnTracker.java
similarity index 75%
rename from hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java
rename to hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanWildcardColumnTracker.java
index b5ef319..e3994b6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanWildcardColumnTracker.java
@@ -1,5 +1,4 @@
 /**
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,16 +16,16 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.hbase.regionserver;
+package org.apache.hadoop.hbase.regionserver.querymatcher;
 
 import java.io.IOException;
 
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher.MatchCode;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
@@ -38,8 +37,10 @@
   private int currentCount = 0;
   private int maxVersions;
   private int minVersions;
-  /* Keeps track of the latest timestamp and type included for current column.
-   * Used to eliminate duplicates. */
+  /*
+   * Keeps track of the latest timestamp and type included for current column. Used to eliminate
+   * duplicates.
+   */
   private long latestTSOfCurrentColumn;
   private byte latestTypeOfCurrentColumn;
 
@@ -49,19 +50,16 @@
    * Return maxVersions of every row.
    * @param minVersion Minimum number of versions to keep
    * @param maxVersion Maximum number of versions to return
-   * @param oldestUnexpiredTS oldest timestamp that has not expired according
-   *          to the TTL.
+   * @param oldestUnexpiredTS oldest timestamp that has not expired according to the TTL.
    */
-  public ScanWildcardColumnTracker(int minVersion, int maxVersion,
-      long oldestUnexpiredTS) {
+  public ScanWildcardColumnTracker(int minVersion, int maxVersion, long oldestUnexpiredTS) {
     this.maxVersions = maxVersion;
     this.minVersions = minVersion;
     this.oldestStamp = oldestUnexpiredTS;
   }
 
   /**
-   * {@inheritDoc}
-   * This receives puts *and* deletes.
+   * {@inheritDoc} This receives puts *and* deletes.
    */
   @Override
   public MatchCode checkColumn(Cell cell, byte type) throws IOException {
@@ -69,26 +67,29 @@
   }
 
   /**
-   * {@inheritDoc}
-   * This receives puts *and* deletes. Deletes do not count as a version, but rather
+   * {@inheritDoc} This receives puts *and* deletes. Deletes do not count as a version, but rather
    * take the version of the previous put (so eventually all but the last can be reclaimed).
    */
   @Override
-  public ScanQueryMatcher.MatchCode checkVersions(Cell cell,
-      long timestamp, byte type, boolean ignoreCount) throws IOException {
+  public ScanQueryMatcher.MatchCode checkVersions(Cell cell, long timestamp, byte type,
+      boolean ignoreCount) throws IOException {
 
     if (columnCell == null) {
       // first iteration.
       resetCell(cell);
-      if (ignoreCount) return ScanQueryMatcher.MatchCode.INCLUDE;
+      if (ignoreCount) {
+        return ScanQueryMatcher.MatchCode.INCLUDE;
+      }
       // do not count a delete marker as another version
       return checkVersion(type, timestamp);
     }
     int cmp = CellComparator.compareQualifiers(cell, this.columnCell);
     if (cmp == 0) {
-      if (ignoreCount) return ScanQueryMatcher.MatchCode.INCLUDE;
+      if (ignoreCount) {
+        return ScanQueryMatcher.MatchCode.INCLUDE;
+      }
 
-      //If column matches, check if it is a duplicate timestamp
+      // If column matches, check if it is a duplicate timestamp
       if (sameAsPreviousTSAndType(timestamp, type)) {
         return ScanQueryMatcher.MatchCode.SKIP;
       }
@@ -101,7 +102,9 @@
     if (cmp > 0) {
       // switched columns, lets do something.x
       resetCell(cell);
-      if (ignoreCount) return ScanQueryMatcher.MatchCode.INCLUDE;
+      if (ignoreCount) {
+        return ScanQueryMatcher.MatchCode.INCLUDE;
+      }
       return checkVersion(type, timestamp);
     }
 
@@ -109,10 +112,9 @@
     // WARNING: This means that very likely an edit for some other family
     // was incorrectly stored into the store for this one. Throw an exception,
     // because this might lead to data corruption.
-    throw new IOException(
-        "ScanWildcardColumnTracker.checkColumn ran into a column actually " +
-        "smaller than the previous column: " +
-        Bytes.toStringBinary(CellUtil.cloneQualifier(cell)));
+    throw new IOException("ScanWildcardColumnTracker.checkColumn ran into a column actually "
+        + "smaller than the previous column: "
+        + Bytes.toStringBinary(CellUtil.cloneQualifier(cell)));
   }
 
   private void resetCell(Cell columnCell) {
@@ -121,13 +123,10 @@
   }
 
   /**
-   * Check whether this version should be retained.
-   * There are 4 variables considered:
-   * If this version is past max versions -> skip it
-   * If this kv has expired or was deleted, check min versions
-   * to decide whther to skip it or not.
-   *
-   * Increase the version counter unless this is a delete
+   * Check whether this version should be retained. There are 4 variables considered: If this
+   * version is past max versions -> skip it If this kv has expired or was deleted, check min
+   * versions to decide whther to skip it or not. Increase the version counter unless this is a
+   * delete
    */
   private MatchCode checkVersion(byte type, long timestamp) {
     if (!CellUtil.isDelete(type)) {
@@ -171,10 +170,9 @@
   }
 
   /**
-   * Used by matcher and scan/get to get a hint of the next column
-   * to seek to after checkColumn() returns SKIP.  Returns the next interesting
-   * column we want, or NULL there is none (wildcard scanner).
-   *
+   * Used by matcher and scan/get to get a hint of the next column to seek to after checkColumn()
+   * returns SKIP. Returns the next interesting column we want, or NULL there is none (wildcard
+   * scanner).
    * @return The column count.
    */
   public ColumnCount getColumnHint() {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/StripeCompactionScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/StripeCompactionScanQueryMatcher.java
new file mode 100644
index 0000000..c1e63b4
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/StripeCompactionScanQueryMatcher.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.querymatcher;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.ScanInfo;
+
+/**
+ * Query matcher for stripe compaction if range drop deletes is used.
+ */
+@InterfaceAudience.Private
+public class StripeCompactionScanQueryMatcher extends DropDeletesCompactionScanQueryMatcher {
+
+  private final byte[] dropDeletesFromRow;
+
+  private final byte[] dropDeletesToRow;
+
+  private enum DropDeletesInOutput {
+    BEFORE, IN, AFTER
+  }
+
+  private DropDeletesInOutput dropDeletesInOutput = DropDeletesInOutput.BEFORE;
+
+  public StripeCompactionScanQueryMatcher(ScanInfo scanInfo, DeleteTracker deletes,
+      long readPointToUse, long earliestPutTs, long oldestUnexpiredTS, long now,
+      byte[] dropDeletesFromRow, byte[] dropDeletesToRow) {
+    super(scanInfo, deletes, readPointToUse, earliestPutTs, oldestUnexpiredTS, now);
+    this.dropDeletesFromRow = dropDeletesFromRow;
+    this.dropDeletesToRow = dropDeletesToRow;
+  }
+
+  @Override
+  public MatchCode match(Cell cell) throws IOException {
+    MatchCode returnCode = preCheck(cell);
+    if (returnCode != null) {
+      return returnCode;
+    }
+    long mvccVersion = cell.getSequenceId();
+    if (CellUtil.isDelete(cell)) {
+      if (mvccVersion > maxReadPointToTrackVersions) {
+        return MatchCode.INCLUDE;
+      }
+      trackDelete(cell);
+      if (dropDeletesInOutput == DropDeletesInOutput.IN) {
+        // here we are running like major compaction
+        trackDelete(cell);
+        returnCode = tryDropDelete(cell);
+        if (returnCode != null) {
+          return returnCode;
+        }
+      } else {
+        return MatchCode.INCLUDE;
+      }
+    } else {
+      returnCode = checkDeleted(deletes, cell);
+      if (returnCode != null) {
+        return returnCode;
+      }
+    }
+    // Skip checking column since we do not remove column during compaction.
+    return columns.checkVersions(cell, cell.getTimestamp(), cell.getTypeByte(),
+      mvccVersion > maxReadPointToTrackVersions);
+  }
+
+  private boolean entered() {
+    return dropDeletesFromRow.length == 0 || rowComparator.compareRows(currentRow,
+      dropDeletesFromRow, 0, dropDeletesFromRow.length) >= 0;
+  }
+
+  private boolean left() {
+    return dropDeletesToRow.length > 0
+        && rowComparator.compareRows(currentRow, dropDeletesToRow, 0, dropDeletesToRow.length) >= 0;
+  }
+
+  @Override
+  protected void reset() {
+    super.reset();
+    // Check if we are about to enter or leave the drop deletes range.
+    switch (dropDeletesInOutput) {
+      case BEFORE:
+        if (entered()) {
+          if (left()) {
+            // Already out of range, which means there are no rows within the range.
+            dropDeletesInOutput = DropDeletesInOutput.AFTER;
+          } else {
+            dropDeletesInOutput = DropDeletesInOutput.IN;
+          }
+        }
+        break;
+      case IN:
+        if (left()) {
+          dropDeletesInOutput = DropDeletesInOutput.AFTER;
+        }
+        break;
+      default:
+        break;
+    }
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/UserScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/UserScanQueryMatcher.java
new file mode 100644
index 0000000..ec7fc11
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/UserScanQueryMatcher.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.querymatcher;
+
+import java.io.IOException;
+import java.util.NavigableSet;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.Filter.ReturnCode;
+import org.apache.hadoop.hbase.io.TimeRange;
+import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
+import org.apache.hadoop.hbase.regionserver.ScanInfo;
+
+/**
+ * Query matcher for user scan.
+ * <p>
+ * We do not consider mvcc here because
+ * {@link org.apache.hadoop.hbase.regionserver.StoreFileScanner} and
+ * {@link org.apache.hadoop.hbase.regionserver.SegmentScanner} will only return a cell whose mvcc is
+ * less than or equal to given read point. For
+ * {@link org.apache.hadoop.hbase.client.IsolationLevel#READ_UNCOMMITTED}, we just set the read
+ * point to {@link Long#MAX_VALUE}, i.e. still do not need to consider it.
+ */
+@InterfaceAudience.Private
+public abstract class UserScanQueryMatcher extends ScanQueryMatcher {
+
+  protected final boolean hasNullColumn;
+
+  protected final Filter filter;
+
+  protected final byte[] stopRow;
+
+  protected final TimeRange tr;
+
+  protected UserScanQueryMatcher(Scan scan, ScanInfo scanInfo, ColumnTracker columns,
+      boolean hasNullColumn, long oldestUnexpiredTS, long now) {
+    super(scan.getStartRow(), scanInfo, columns, oldestUnexpiredTS, now);
+    this.hasNullColumn = hasNullColumn;
+    this.filter = scan.getFilter();
+    this.stopRow = scan.getStopRow();
+    TimeRange timeRange = scan.getColumnFamilyTimeRange().get(scanInfo.getFamily());
+    if (timeRange == null) {
+      this.tr = scan.getTimeRange();
+    } else {
+      this.tr = timeRange;
+    }
+  }
+
+  @Override
+  public boolean hasNullColumnInQuery() {
+    return hasNullColumn;
+  }
+
+  @Override
+  public boolean isUserScan() {
+    return true;
+  }
+
+  @Override
+  public Filter getFilter() {
+    return filter;
+  }
+
+  @Override
+  public Cell getNextKeyHint(Cell cell) throws IOException {
+    if (filter == null) {
+      return null;
+    } else {
+      return filter.getNextCellHint(cell);
+    }
+  }
+
+  protected final MatchCode matchColumn(Cell cell) throws IOException {
+    long timestamp = cell.getTimestamp();
+    int tsCmp = tr.compare(timestamp);
+    if (tsCmp > 0) {
+      return MatchCode.SKIP;
+    }
+    if (tsCmp < 0) {
+      return columns.getNextRowOrNextColumn(cell);
+    }
+    byte typeByte = cell.getTypeByte();
+    // STEP 1: Check if the column is part of the requested columns
+    MatchCode colChecker = columns.checkColumn(cell, typeByte);
+    if (colChecker != MatchCode.INCLUDE) {
+      if (colChecker == MatchCode.SEEK_NEXT_ROW) {
+        stickyNextRow = true;
+      }
+      return colChecker;
+    }
+    ReturnCode filterResponse = ReturnCode.SKIP;
+    // STEP 2: Yes, the column is part of the requested columns. Check if filter is present
+    if (filter != null) {
+      // STEP 3: Filter the key value and return if it filters out
+      filterResponse = filter.filterKeyValue(cell);
+      switch (filterResponse) {
+        case SKIP:
+          return MatchCode.SKIP;
+        case NEXT_COL:
+          return columns.getNextRowOrNextColumn(cell);
+        case NEXT_ROW:
+          stickyNextRow = true;
+          return MatchCode.SEEK_NEXT_ROW;
+        case SEEK_NEXT_USING_HINT:
+          return MatchCode.SEEK_NEXT_USING_HINT;
+        default:
+          // It means it is either include or include and seek next
+          break;
+      }
+    }
+    /*
+     * STEP 4: Reaching this step means the column is part of the requested columns and either
+     * the filter is null or the filter has returned INCLUDE or INCLUDE_AND_NEXT_COL response.
+     * Now check the number of versions needed. This method call returns SKIP, INCLUDE,
+     * INCLUDE_AND_SEEK_NEXT_ROW, INCLUDE_AND_SEEK_NEXT_COL.
+     *
+     * FilterResponse            ColumnChecker               Desired behavior
+     * INCLUDE                   SKIP                        row has already been included, SKIP.
+     * INCLUDE                   INCLUDE                     INCLUDE
+     * INCLUDE                   INCLUDE_AND_SEEK_NEXT_COL   INCLUDE_AND_SEEK_NEXT_COL
+     * INCLUDE                   INCLUDE_AND_SEEK_NEXT_ROW   INCLUDE_AND_SEEK_NEXT_ROW
+     * INCLUDE_AND_SEEK_NEXT_COL SKIP                        row has already been included, SKIP.
+     * INCLUDE_AND_SEEK_NEXT_COL INCLUDE                     INCLUDE_AND_SEEK_NEXT_COL
+     * INCLUDE_AND_SEEK_NEXT_COL INCLUDE_AND_SEEK_NEXT_COL   INCLUDE_AND_SEEK_NEXT_COL
+     * INCLUDE_AND_SEEK_NEXT_COL INCLUDE_AND_SEEK_NEXT_ROW   INCLUDE_AND_SEEK_NEXT_ROW
+     *
+     * In all the above scenarios, we return the column checker return value except for
+     * FilterResponse (INCLUDE_AND_SEEK_NEXT_COL) and ColumnChecker(INCLUDE)
+     */
+    colChecker = columns.checkVersions(cell, timestamp, typeByte, false);
+    // Optimize with stickyNextRow
+    boolean seekNextRowFromEssential = filterResponse == ReturnCode.INCLUDE_AND_SEEK_NEXT_ROW
+        && filter.isFamilyEssential(cell.getFamilyArray());
+    if (colChecker == MatchCode.INCLUDE_AND_SEEK_NEXT_ROW || seekNextRowFromEssential) {
+      stickyNextRow = true;
+    }
+    if (filterResponse == ReturnCode.INCLUDE_AND_SEEK_NEXT_ROW) {
+      if (colChecker != MatchCode.SKIP) {
+        return MatchCode.INCLUDE_AND_SEEK_NEXT_ROW;
+      }
+      return MatchCode.SEEK_NEXT_ROW;
+    }
+    return (filterResponse == ReturnCode.INCLUDE_AND_NEXT_COL && colChecker == MatchCode.INCLUDE)
+        ? MatchCode.INCLUDE_AND_SEEK_NEXT_COL : colChecker;
+  }
+
+  protected abstract boolean isGet();
+
+  protected boolean moreRowsMayExistsAfter(int cmpToStopRow) {
+    return cmpToStopRow < 0;
+  }
+
+  @Override
+  public boolean moreRowsMayExistAfter(Cell cell) {
+    // If a 'get' Scan -- we are doing a Get (every Get is a single-row Scan in implementation) --
+    // then we are looking at one row only, the one specified in the Get coordinate..so we know
+    // for sure that there are no more rows on this Scan
+    if (isGet()) {
+      return false;
+    }
+    // If no stopRow, return that there may be more rows. The tests that follow depend on a
+    // non-empty, non-default stopRow so this little test below short-circuits out doing the
+    // following compares.
+    if (this.stopRow == null || this.stopRow.length == 0) {
+      return true;
+    }
+    return moreRowsMayExistsAfter(rowComparator.compareRows(cell, stopRow, 0, stopRow.length));
+  }
+
+  public static UserScanQueryMatcher create(Scan scan, ScanInfo scanInfo,
+      NavigableSet<byte[]> columns, long oldestUnexpiredTS, long now,
+      RegionCoprocessorHost regionCoprocessorHost) throws IOException {
+    int maxVersions = scan.isRaw() ? scan.getMaxVersions()
+        : Math.min(scan.getMaxVersions(), scanInfo.getMaxVersions());
+    boolean hasNullColumn;
+    ColumnTracker columnTracker;
+    if (columns == null || columns.size() == 0) {
+      // there is always a null column in the wildcard column query.
+      hasNullColumn = true;
+      // use a specialized scan for wildcard column tracker.
+      columnTracker = new ScanWildcardColumnTracker(scanInfo.getMinVersions(), maxVersions,
+          oldestUnexpiredTS);
+    } else {
+      // We can share the ExplicitColumnTracker, diff is we reset
+      // between rows, not between storefiles.
+      // whether there is null column in the explicit column query
+      hasNullColumn = columns.first().length == 0;
+      columnTracker = new ExplicitColumnTracker(columns, scanInfo.getMinVersions(), maxVersions,
+          oldestUnexpiredTS);
+    }
+    if (scan.isRaw()) {
+      return RawScanQueryMatcher.create(scan, scanInfo, columnTracker, hasNullColumn,
+        oldestUnexpiredTS, now);
+    } else {
+      return NormalUserScanQueryMatcher.create(scan, scanInfo, columnTracker, hasNullColumn,
+        oldestUnexpiredTS, now, regionCoprocessorHost);
+    }
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java
index f7bcef0..0cf6e37 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java
@@ -90,13 +90,13 @@
 import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.VisibilityLabelsResponse;
 import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.VisibilityLabelsService;
 import org.apache.hadoop.hbase.regionserver.BloomType;
-import org.apache.hadoop.hbase.regionserver.DeleteTracker;
 import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
 import org.apache.hadoop.hbase.regionserver.OperationStatus;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.security.AccessDeniedException;
 import org.apache.hadoop.hbase.security.Superusers;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityScanDeleteTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityScanDeleteTracker.java
index 51971e7..4e27bbf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityScanDeleteTracker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityScanDeleteTracker.java
@@ -25,13 +25,13 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.querymatcher.ScanDeleteTracker;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.Type;
 import org.apache.hadoop.hbase.Tag;
-import org.apache.hadoop.hbase.regionserver.ScanDeleteTracker;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Triple;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DataBlockEncodingTool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DataBlockEncodingTool.java
index aba72dd..bbcdce4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DataBlockEncodingTool.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DataBlockEncodingTool.java
@@ -598,7 +598,7 @@
 
     StoreFileReader reader = hsf.createReader();
     reader.loadFileInfo();
-    KeyValueScanner scanner = reader.getStoreFileScanner(true, true);
+    KeyValueScanner scanner = reader.getStoreFileScanner(true, true, false, 0, 0, false);
 
     // run the utilities
     DataBlockEncodingTool comp = new DataBlockEncodingTool(compressionName);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/EncodedSeekPerformanceTest.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/EncodedSeekPerformanceTest.java
index 45444cc..9638e69 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/EncodedSeekPerformanceTest.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/EncodedSeekPerformanceTest.java
@@ -63,7 +63,7 @@
         path, configuration, cacheConf, BloomType.NONE);
 
     StoreFileReader reader = storeFile.createReader();
-    StoreFileScanner scanner = reader.getStoreFileScanner(true, false);
+    StoreFileScanner scanner = reader.getStoreFileScanner(true, false, false, 0, 0, false);
     Cell current;
 
     scanner.seek(KeyValue.LOWESTKEY);
@@ -95,7 +95,7 @@
     long totalSize = 0;
 
     StoreFileReader reader = storeFile.createReader();
-    StoreFileScanner scanner = reader.getStoreFileScanner(true, false);
+    StoreFileScanner scanner = reader.getStoreFileScanner(true, false, false, 0, 0, false);
 
     long startReadingTime = System.nanoTime();
     Cell current;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompoundBloomFilter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompoundBloomFilter.java
index d0c0089..174843e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompoundBloomFilter.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompoundBloomFilter.java
@@ -203,7 +203,7 @@
     StoreFile sf = new StoreFile(fs, sfPath, conf, cacheConf, bt);
     StoreFileReader r = sf.createReader();
     final boolean pread = true; // does not really matter
-    StoreFileScanner scanner = r.getStoreFileScanner(true, pread);
+    StoreFileScanner scanner = r.getStoreFileScanner(true, pread, false, 0, 0, false);
 
     {
       // Test for false negatives (not allowed).
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java
deleted file mode 100644
index db01ee0..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java
+++ /dev/null
@@ -1,382 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.regionserver;
-
-import static org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode.INCLUDE;
-import static org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode.SKIP;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.NavigableSet;
-
-import org.apache.hadoop.hbase.CellComparator;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeepDeletedCells;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.KeyValue.Type;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.hbase.testclassification.RegionServerTests;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-@Category({RegionServerTests.class, SmallTests.class})
-public class TestQueryMatcher {
-  private static final boolean PRINT = false;
-  private Configuration conf;
-
-  private byte[] row1;
-  private byte[] row2;
-  private byte[] row3;
-  private byte[] fam1;
-  private byte[] fam2;
-  private byte[] col1;
-  private byte[] col2;
-  private byte[] col3;
-  private byte[] col4;
-  private byte[] col5;
-
-  private byte[] data;
-
-  private Get get;
-
-  long ttl = Long.MAX_VALUE;
-  CellComparator rowComparator;
-  private Scan scan;
-
-  @Before
-  public void setUp() throws Exception {
-    this.conf = HBaseConfiguration.create();
-    row1 = Bytes.toBytes("row1");
-    row2 = Bytes.toBytes("row2");
-    row3 = Bytes.toBytes("row3");
-    fam1 = Bytes.toBytes("fam1");
-    fam2 = Bytes.toBytes("fam2");
-    col1 = Bytes.toBytes("col1");
-    col2 = Bytes.toBytes("col2");
-    col3 = Bytes.toBytes("col3");
-    col4 = Bytes.toBytes("col4");
-    col5 = Bytes.toBytes("col5");
-
-    data = Bytes.toBytes("data");
-
-    //Create Get
-    get = new Get(row1);
-    get.addFamily(fam1);
-    get.addColumn(fam2, col2);
-    get.addColumn(fam2, col4);
-    get.addColumn(fam2, col5);
-    this.scan = new Scan(get);
-
-    rowComparator = CellComparator.COMPARATOR;
-
-  }
-
-  private void _testMatch_ExplicitColumns(Scan scan, List<MatchCode> expected) throws IOException {
-    long now = EnvironmentEdgeManager.currentTime();
-    // 2,4,5
-    ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(this.conf, fam2,
-        0, 1, ttl, KeepDeletedCells.FALSE, 0, rowComparator), get.getFamilyMap().get(fam2),
-        now - ttl, now);
-
-    List<KeyValue> memstore = new ArrayList<KeyValue>();
-    memstore.add(new KeyValue(row1, fam2, col1, 1, data));
-    memstore.add(new KeyValue(row1, fam2, col2, 1, data));
-    memstore.add(new KeyValue(row1, fam2, col3, 1, data));
-    memstore.add(new KeyValue(row1, fam2, col4, 1, data));
-    memstore.add(new KeyValue(row1, fam2, col5, 1, data));
-
-    memstore.add(new KeyValue(row2, fam1, col1, data));
-
-    List<ScanQueryMatcher.MatchCode> actual = new ArrayList<ScanQueryMatcher.MatchCode>();
-    KeyValue k = memstore.get(0);
-    qm.setToNewRow(k);
-
-    for (KeyValue kv : memstore){
-      actual.add(qm.match(kv));
-    }
-
-    assertEquals(expected.size(), actual.size());
-    for(int i=0; i< expected.size(); i++){
-      assertEquals(expected.get(i), actual.get(i));
-      if(PRINT){
-        System.out.println("expected "+expected.get(i)+
-            ", actual " +actual.get(i));
-      }
-    }
-  }
-
-  /**
-   * This is a cryptic test. It is checking that we don't include a fake cell, one that has a
-   * timestamp of {@link HConstants#OLDEST_TIMESTAMP}. See HBASE-16074 for background.
-   * @throws IOException
-   */
-  @Test
-  public void testNeverIncludeFakeCell() throws IOException {
-    long now = EnvironmentEdgeManager.currentTime();
-    // Do with fam2 which has a col2 qualifier.
-    ScanQueryMatcher qm = new ScanQueryMatcher(scan,
-        new ScanInfo(this.conf, fam2, 10, 1, ttl, KeepDeletedCells.FALSE, 0, rowComparator),
-        get.getFamilyMap().get(fam2), now - ttl, now);
-    Cell kv = new KeyValue(row1, fam2, col2, 1, data);
-    Cell cell = CellUtil.createLastOnRowCol(kv);
-    qm.setToNewRow(kv);
-    MatchCode code = qm.match(cell);
-    assertFalse(code.compareTo(MatchCode.SEEK_NEXT_COL) != 0);
-    }
-
-  @Test
-  public void testMatch_ExplicitColumns()
-  throws IOException {
-    //Moving up from the Tracker by using Gets and List<KeyValue> instead
-    //of just byte []
-
-    //Expected result
-    List<MatchCode> expected = new ArrayList<ScanQueryMatcher.MatchCode>();
-    expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL);
-    expected.add(ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL);
-    expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL);
-    expected.add(ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL);
-    expected.add(ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW);
-    expected.add(ScanQueryMatcher.MatchCode.DONE);
-
-    _testMatch_ExplicitColumns(scan, expected);
-  }
-
-  @Test
-  public void testMatch_Wildcard()
-  throws IOException {
-    //Moving up from the Tracker by using Gets and List<KeyValue> instead
-    //of just byte []
-
-    //Expected result
-    List<MatchCode> expected = new ArrayList<ScanQueryMatcher.MatchCode>();
-    expected.add(ScanQueryMatcher.MatchCode.INCLUDE);
-    expected.add(ScanQueryMatcher.MatchCode.INCLUDE);
-    expected.add(ScanQueryMatcher.MatchCode.INCLUDE);
-    expected.add(ScanQueryMatcher.MatchCode.INCLUDE);
-    expected.add(ScanQueryMatcher.MatchCode.INCLUDE);
-    expected.add(ScanQueryMatcher.MatchCode.DONE);
-
-    long now = EnvironmentEdgeManager.currentTime();
-    ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(this.conf, fam2,
-        0, 1, ttl, KeepDeletedCells.FALSE, 0, rowComparator), null,
-        now - ttl, now);
-
-    List<KeyValue> memstore = new ArrayList<KeyValue>();
-    memstore.add(new KeyValue(row1, fam2, col1, 1, data));
-    memstore.add(new KeyValue(row1, fam2, col2, 1, data));
-    memstore.add(new KeyValue(row1, fam2, col3, 1, data));
-    memstore.add(new KeyValue(row1, fam2, col4, 1, data));
-    memstore.add(new KeyValue(row1, fam2, col5, 1, data));
-    memstore.add(new KeyValue(row2, fam1, col1, 1, data));
-
-    List<ScanQueryMatcher.MatchCode> actual = new ArrayList<ScanQueryMatcher.MatchCode>();
-
-    KeyValue k = memstore.get(0);
-    qm.setToNewRow(k);
-
-    for(KeyValue kv : memstore) {
-      actual.add(qm.match(kv));
-    }
-
-    assertEquals(expected.size(), actual.size());
-    for(int i=0; i< expected.size(); i++){
-      assertEquals(expected.get(i), actual.get(i));
-      if(PRINT){
-        System.out.println("expected "+expected.get(i)+
-            ", actual " +actual.get(i));
-      }
-    }
-  }
-
-
-  /**
-   * Verify that {@link ScanQueryMatcher} only skips expired KeyValue
-   * instances and does not exit early from the row (skipping
-   * later non-expired KeyValues).  This version mimics a Get with
-   * explicitly specified column qualifiers.
-   *
-   * @throws IOException
-   */
-  @Test
-  public void testMatch_ExpiredExplicit()
-  throws IOException {
-
-    long testTTL = 1000;
-    MatchCode [] expected = new MatchCode[] {
-        ScanQueryMatcher.MatchCode.SEEK_NEXT_COL,
-        ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL,
-        ScanQueryMatcher.MatchCode.SEEK_NEXT_COL,
-        ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL,
-        ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW,
-        ScanQueryMatcher.MatchCode.DONE
-    };
-
-    long now = EnvironmentEdgeManager.currentTime();
-    ScanQueryMatcher qm =
-      new ScanQueryMatcher(scan,
-        new ScanInfo(this.conf, fam2, 0, 1, testTTL, KeepDeletedCells.FALSE, 0,
-          rowComparator), get.getFamilyMap().get(fam2), now - testTTL, now);
-
-    KeyValue [] kvs = new KeyValue[] {
-        new KeyValue(row1, fam2, col1, now-100, data),
-        new KeyValue(row1, fam2, col2, now-50, data),
-        new KeyValue(row1, fam2, col3, now-5000, data),
-        new KeyValue(row1, fam2, col4, now-500, data),
-        new KeyValue(row1, fam2, col5, now-10000, data),
-        new KeyValue(row2, fam1, col1, now-10, data)
-    };
-
-    KeyValue k = kvs[0];
-    qm.setToNewRow(k);
-
-    List<MatchCode> actual = new ArrayList<MatchCode>(kvs.length);
-    for (KeyValue kv : kvs) {
-      actual.add( qm.match(kv) );
-    }
-
-    assertEquals(expected.length, actual.size());
-    for (int i=0; i<expected.length; i++) {
-      if(PRINT){
-        System.out.println("expected "+expected[i]+
-            ", actual " +actual.get(i));
-      }
-      assertEquals(expected[i], actual.get(i));
-    }
-  }
-
-  /**
-   * Verify that {@link ScanQueryMatcher} only skips expired KeyValue
-   * instances and does not exit early from the row (skipping
-   * later non-expired KeyValues).  This version mimics a Get with
-   * wildcard-inferred column qualifiers.
-   *
-   * @throws IOException
-   */
-  @Test
-  public void testMatch_ExpiredWildcard()
-  throws IOException {
-
-    long testTTL = 1000;
-    MatchCode [] expected = new MatchCode[] {
-        ScanQueryMatcher.MatchCode.INCLUDE,
-        ScanQueryMatcher.MatchCode.INCLUDE,
-        ScanQueryMatcher.MatchCode.SEEK_NEXT_COL,
-        ScanQueryMatcher.MatchCode.INCLUDE,
-        ScanQueryMatcher.MatchCode.SEEK_NEXT_COL,
-        ScanQueryMatcher.MatchCode.DONE
-    };
-
-    long now = EnvironmentEdgeManager.currentTime();
-    ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(this.conf, fam2,
-        0, 1, testTTL, KeepDeletedCells.FALSE, 0, rowComparator), null,
-        now - testTTL, now);
-
-    KeyValue [] kvs = new KeyValue[] {
-        new KeyValue(row1, fam2, col1, now-100, data),
-        new KeyValue(row1, fam2, col2, now-50, data),
-        new KeyValue(row1, fam2, col3, now-5000, data),
-        new KeyValue(row1, fam2, col4, now-500, data),
-        new KeyValue(row1, fam2, col5, now-10000, data),
-        new KeyValue(row2, fam1, col1, now-10, data)
-    };
-    KeyValue k = kvs[0];
-    qm.setToNewRow(k);
-
-    List<ScanQueryMatcher.MatchCode> actual =
-        new ArrayList<ScanQueryMatcher.MatchCode>(kvs.length);
-    for (KeyValue kv : kvs) {
-      actual.add( qm.match(kv) );
-    }
-
-    assertEquals(expected.length, actual.size());
-    for (int i=0; i<expected.length; i++) {
-      if(PRINT){
-        System.out.println("expected "+expected[i]+
-            ", actual " +actual.get(i));
-      }
-      assertEquals(expected[i], actual.get(i));
-    }
-  }
-
-  @Test
-  public void testMatch_PartialRangeDropDeletes() throws Exception {
-    // Some ranges.
-    testDropDeletes(
-        row2, row3, new byte[][] { row1, row2, row2, row3 }, INCLUDE, SKIP, SKIP, INCLUDE);
-    testDropDeletes(row2, row3, new byte[][] { row1, row1, row2 }, INCLUDE, INCLUDE, SKIP);
-    testDropDeletes(row2, row3, new byte[][] { row2, row3, row3 }, SKIP, INCLUDE, INCLUDE);
-    testDropDeletes(row1, row3, new byte[][] { row1, row2, row3 }, SKIP, SKIP, INCLUDE);
-    // Open ranges.
-    testDropDeletes(HConstants.EMPTY_START_ROW, row3,
-        new byte[][] { row1, row2, row3 }, SKIP, SKIP, INCLUDE);
-    testDropDeletes(row2, HConstants.EMPTY_END_ROW,
-        new byte[][] { row1, row2, row3 }, INCLUDE, SKIP, SKIP);
-    testDropDeletes(HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
-        new byte[][] { row1, row2, row3, row3 }, SKIP, SKIP, SKIP, SKIP);
-
-    // No KVs in range.
-    testDropDeletes(row2, row3, new byte[][] { row1, row1, row3 }, INCLUDE, INCLUDE, INCLUDE);
-    testDropDeletes(row2, row3, new byte[][] { row3, row3 }, INCLUDE, INCLUDE);
-    testDropDeletes(row2, row3, new byte[][] { row1, row1 }, INCLUDE, INCLUDE);
-  }
-
-  private void testDropDeletes(
-      byte[] from, byte[] to, byte[][] rows, MatchCode... expected) throws IOException {
-    long now = EnvironmentEdgeManager.currentTime();
-    // Set time to purge deletes to negative value to avoid it ever happening.
-    ScanInfo scanInfo =
-      new ScanInfo(this.conf, fam2, 0, 1, ttl, KeepDeletedCells.FALSE, -1L, rowComparator);
-    NavigableSet<byte[]> cols = get.getFamilyMap().get(fam2);
-
-    ScanQueryMatcher qm = new ScanQueryMatcher(scan, scanInfo, cols, Long.MAX_VALUE,
-        HConstants.OLDEST_TIMESTAMP, HConstants.OLDEST_TIMESTAMP, now, from, to, null);
-    List<ScanQueryMatcher.MatchCode> actual =
-        new ArrayList<ScanQueryMatcher.MatchCode>(rows.length);
-    byte[] prevRow = null;
-    for (byte[] row : rows) {
-      if (prevRow == null || !Bytes.equals(prevRow, row)) {
-        qm.setToNewRow(KeyValueUtil.createFirstOnRow(row));
-        prevRow = row;
-      }
-      actual.add(qm.match(new KeyValue(row, fam2, null, now, Type.Delete)));
-    }
-
-    assertEquals(expected.length, actual.size());
-    for (int i = 0; i < expected.length; i++) {
-      if (PRINT) System.out.println("expected " + expected[i] + ", actual " + actual.get(i));
-      assertEquals(expected[i], actual.get(i));
-    }
-  }
-}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanDeleteTracker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanDeleteTracker.java
deleted file mode 100644
index 2854832..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanDeleteTracker.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.regionserver;
-
-import org.apache.hadoop.hbase.HBaseTestCase;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.testclassification.RegionServerTests;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.hbase.regionserver.DeleteTracker.DeleteResult;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-@Category({RegionServerTests.class, SmallTests.class})
-public class TestScanDeleteTracker extends HBaseTestCase {
-
-  private ScanDeleteTracker sdt;
-  private long timestamp = 10L;
-  private byte deleteType = 0;
-
-  @Before
-  public void setUp() throws Exception {
-    super.setUp();
-    sdt = new ScanDeleteTracker();
-  }
-
-  @Test
-  public void testDeletedBy_Delete() {
-    KeyValue kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"),
-        Bytes.toBytes("qualifier"), timestamp, KeyValue.Type.Delete);
-    sdt.add(kv);
-    DeleteResult ret = sdt.isDeleted(kv);
-    assertEquals(DeleteResult.VERSION_DELETED, ret);
-  }
-
-  @Test
-  public void testDeletedBy_DeleteColumn() {
-    KeyValue kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"),
-        Bytes.toBytes("qualifier"), timestamp, KeyValue.Type.DeleteColumn);
-    sdt.add(kv);
-    timestamp -= 5;
-    kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"),
-        Bytes.toBytes("qualifier"), timestamp , KeyValue.Type.DeleteColumn);
-    DeleteResult ret = sdt.isDeleted(kv);
-    assertEquals(DeleteResult.COLUMN_DELETED, ret);
-  }
-
-  @Test
-  public void testDeletedBy_DeleteFamily() {
-    KeyValue kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"),
-        Bytes.toBytes("qualifier"), timestamp, KeyValue.Type.DeleteFamily);
-    sdt.add(kv);
-    timestamp -= 5;
-    kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"),
-        Bytes.toBytes("qualifier"), timestamp , KeyValue.Type.DeleteColumn);
-    DeleteResult ret = sdt.isDeleted(kv);
-    assertEquals(DeleteResult.FAMILY_DELETED, ret);
-  }
-
-  @Test
-  public void testDeletedBy_DeleteFamilyVersion() {
-    byte [] qualifier1 = Bytes.toBytes("qualifier1");
-    byte [] qualifier2 = Bytes.toBytes("qualifier2");
-    byte [] qualifier3 = Bytes.toBytes("qualifier3");
-    byte [] qualifier4 = Bytes.toBytes("qualifier4");
-    deleteType = KeyValue.Type.DeleteFamilyVersion.getCode();
-    KeyValue kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"),
-        null, timestamp, KeyValue.Type.DeleteFamilyVersion);
-    sdt.add(kv);
-    kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"),
-        qualifier1, timestamp, KeyValue.Type.DeleteFamilyVersion);
-    DeleteResult ret = sdt.isDeleted(kv);
-    assertEquals(DeleteResult.FAMILY_VERSION_DELETED, ret);
-    kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"),
-        qualifier2, timestamp, KeyValue.Type.DeleteFamilyVersion);
-    ret = sdt.isDeleted(kv);
-    assertEquals(DeleteResult.FAMILY_VERSION_DELETED, ret);
-    kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"),
-        qualifier3, timestamp, KeyValue.Type.DeleteFamilyVersion);
-    ret = sdt.isDeleted(kv);
-    assertEquals(DeleteResult.FAMILY_VERSION_DELETED, ret);
-    kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"),
-        qualifier4, timestamp, KeyValue.Type.DeleteFamilyVersion);
-    ret = sdt.isDeleted(kv);
-    assertEquals(DeleteResult.FAMILY_VERSION_DELETED, ret);
-    kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"),
-        qualifier1, timestamp + 3, KeyValue.Type.DeleteFamilyVersion);
-    ret = sdt.isDeleted(kv);
-    assertEquals(DeleteResult.NOT_DELETED, ret);
-    kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"),
-        qualifier2, timestamp - 2, KeyValue.Type.DeleteFamilyVersion);
-    ret = sdt.isDeleted(kv);
-    assertEquals(DeleteResult.NOT_DELETED, ret);
-    kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"),
-        qualifier3, timestamp - 5, KeyValue.Type.DeleteFamilyVersion);
-    ret = sdt.isDeleted(kv);
-    assertEquals(DeleteResult.NOT_DELETED, ret);
-    kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"),
-        qualifier4, timestamp + 8, KeyValue.Type.DeleteFamilyVersion);
-    ret = sdt.isDeleted(kv);
-    assertEquals(DeleteResult.NOT_DELETED, ret);
-  }
-
-
-  @Test
-  public void testDelete_DeleteColumn() {
-    byte [] qualifier = Bytes.toBytes("qualifier");
-    deleteType = KeyValue.Type.Delete.getCode();
-    KeyValue kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"),
-        qualifier, timestamp, KeyValue.Type.Delete);
-    sdt.add(kv);
-
-    timestamp -= 5;
-    kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"),
-        qualifier, timestamp, KeyValue.Type.DeleteColumn);
-    deleteType = KeyValue.Type.DeleteColumn.getCode();
-    sdt.add(kv);
-
-    timestamp -= 5;
-    kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"),
-        qualifier, timestamp, KeyValue.Type.DeleteColumn);
-    DeleteResult ret = sdt.isDeleted(kv);
-    assertEquals(DeleteResult.COLUMN_DELETED, ret);
-  }
-
-
-  @Test
-  public void testDeleteColumn_Delete() {
-    byte [] qualifier = Bytes.toBytes("qualifier");
-    deleteType = KeyValue.Type.DeleteColumn.getCode();
-    KeyValue kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"),
-        qualifier, timestamp, KeyValue.Type.DeleteColumn);
-    sdt.add(kv);
-
-    qualifier = Bytes.toBytes("qualifier1");
-    deleteType = KeyValue.Type.Delete.getCode();
-    kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"),
-        qualifier, timestamp, KeyValue.Type.Delete);
-    sdt.add(kv);
-
-    DeleteResult ret = sdt.isDeleted(kv);
-    assertEquals( DeleteResult.VERSION_DELETED, ret);
-  }
-
-  //Testing new way where we save the Delete in case of a Delete for specific
-  //ts, could have just added the last line to the first test, but rather keep
-  //them separated
-  @Test
-  public void testDelete_KeepDelete(){
-    byte [] qualifier = Bytes.toBytes("qualifier");
-    deleteType = KeyValue.Type.Delete.getCode();
-    KeyValue kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"),
-        qualifier, timestamp, KeyValue.Type.Delete);
-    sdt.add(kv);
-    sdt.isDeleted(kv);
-    assertEquals(false ,sdt.isEmpty());
-  }
-
-  @Test
-  public void testDelete_KeepVersionZero(){
-    byte [] qualifier = Bytes.toBytes("qualifier");
-    deleteType = KeyValue.Type.Delete.getCode();
-
-    long deleteTimestamp = 10;
-    long valueTimestamp = 0;
-
-    sdt.reset();
-    KeyValue kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"),
-        qualifier, deleteTimestamp, KeyValue.Type.Delete);
-    sdt.add(kv);
-    kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"),
-        qualifier, valueTimestamp, KeyValue.Type.Delete);
-    DeleteResult ret = sdt.isDeleted(kv);
-    assertEquals(DeleteResult.NOT_DELETED, ret);
-  }
-
-
-}
-
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java
index ab0c173..86604a7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java
@@ -211,7 +211,7 @@
     when(hcd.getName()).thenReturn(cf);
     when(store.getFamily()).thenReturn(hcd);
     StoreFileScanner scanner =
-        new StoreFileScanner(reader, mock(HFileScanner.class), false, false, 0, 0);
+        new StoreFileScanner(reader, mock(HFileScanner.class), false, false, 0, 0, true);
     Scan scan = new Scan();
     scan.setColumnFamilyTimeRange(cf, 0, 1);
     assertFalse(scanner.shouldUseScanner(scan, store, 0));
@@ -483,6 +483,11 @@
     }
   }
 
+  private static StoreFileScanner getStoreFileScanner(StoreFileReader reader, boolean cacheBlocks,
+      boolean pread) {
+    return reader.getStoreFileScanner(cacheBlocks, pread, false, 0, 0, false);
+  }
+
   private static final String localFormatter = "%010d";
 
   private void bloomWriteRead(StoreFileWriter writer, FileSystem fs) throws Exception {
@@ -500,7 +505,7 @@
     StoreFileReader reader = new StoreFileReader(fs, f, cacheConf, conf);
     reader.loadFileInfo();
     reader.loadBloomfilter();
-    StoreFileScanner scanner = reader.getStoreFileScanner(false, false);
+    StoreFileScanner scanner = getStoreFileScanner(reader, false, false);
 
     // check false positives rate
     int falsePos = 0;
@@ -636,7 +641,7 @@
     // Now do reseek with empty KV to position to the beginning of the file
 
     KeyValue k = KeyValueUtil.createFirstOnRow(HConstants.EMPTY_BYTE_ARRAY);
-    StoreFileScanner s = reader.getStoreFileScanner(false, false);
+    StoreFileScanner s = getStoreFileScanner(reader, false, false);
     s.reseek(k);
 
     assertNotNull("Intial reseek should position at the beginning of the file", s.peek());
@@ -694,7 +699,7 @@
       StoreFileReader reader = new StoreFileReader(fs, f, cacheConf, conf);
       reader.loadFileInfo();
       reader.loadBloomfilter();
-      StoreFileScanner scanner = reader.getStoreFileScanner(false, false);
+      StoreFileScanner scanner = getStoreFileScanner(reader, false, false);
       assertEquals(expKeys[x], reader.generalBloomFilter.getKeyCount());
 
       Store store = mock(Store.class);
@@ -846,7 +851,7 @@
     when(hcd.getName()).thenReturn(family);
     when(store.getFamily()).thenReturn(hcd);
     StoreFileReader reader = hsf.createReader();
-    StoreFileScanner scanner = reader.getStoreFileScanner(false, false);
+    StoreFileScanner scanner = getStoreFileScanner(reader, false, false);
     TreeSet<byte[]> columns = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
     columns.add(qualifier);
 
@@ -903,7 +908,7 @@
     // Read this file, we should see 3 misses
     StoreFileReader reader = hsf.createReader();
     reader.loadFileInfo();
-    StoreFileScanner scanner = reader.getStoreFileScanner(true, true);
+    StoreFileScanner scanner = getStoreFileScanner(reader, true, true);
     scanner.seek(KeyValue.LOWESTKEY);
     while (scanner.next() != null);
     assertEquals(startHit, cs.getHitCount());
@@ -923,7 +928,7 @@
 
     // Read this file, we should see 3 hits
     reader = hsf.createReader();
-    scanner = reader.getStoreFileScanner(true, true);
+    scanner = getStoreFileScanner(reader, true, true);
     scanner.seek(KeyValue.LOWESTKEY);
     while (scanner.next() != null);
     assertEquals(startHit + 3, cs.getHitCount());
@@ -938,13 +943,13 @@
       BloomType.NONE);
     StoreFileReader readerOne = hsf.createReader();
     readerOne.loadFileInfo();
-    StoreFileScanner scannerOne = readerOne.getStoreFileScanner(true, true);
+    StoreFileScanner scannerOne = getStoreFileScanner(readerOne, true, true);
     scannerOne.seek(KeyValue.LOWESTKEY);
     hsf = new StoreFile(this.fs, pathCowOn, conf, cacheConf,
       BloomType.NONE);
     StoreFileReader readerTwo = hsf.createReader();
     readerTwo.loadFileInfo();
-    StoreFileScanner scannerTwo = readerTwo.getStoreFileScanner(true, true);
+    StoreFileScanner scannerTwo = getStoreFileScanner(readerTwo, true, true);
     scannerTwo.seek(KeyValue.LOWESTKEY);
     Cell kv1 = null;
     Cell kv2 = null;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileScannerWithTagCompression.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileScannerWithTagCompression.java
index 577940b..d628dc8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileScannerWithTagCompression.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileScannerWithTagCompression.java
@@ -75,7 +75,7 @@
     writer.close();
 
     StoreFileReader reader = new StoreFileReader(fs, f, cacheConf, conf);
-    StoreFileScanner s = reader.getStoreFileScanner(false, false);
+    StoreFileScanner s = reader.getStoreFileScanner(false, false, false, 0, 0, false);
     try {
       // Now do reseek with empty KV to position to the beginning of the file
       KeyValue k = KeyValueUtil.createFirstOnRow(Bytes.toBytes("k2"));
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
index 4c594b0..52efe63 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
@@ -44,6 +44,7 @@
 import org.apache.hadoop.hbase.KeyValueTestUtil;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -74,7 +75,7 @@
    * From here on down, we have a bunch of defines and specific CELL_GRID of Cells. The
    * CELL_GRID then has a Scanner that can fake out 'block' transitions. All this elaborate
    * setup is for tests that ensure we don't overread, and that the
-   * {@link StoreScanner#optimize(org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode,
+   * {@link StoreScanner#optimize(org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher.MatchCode,
    * Cell)} is not overly enthusiastic.
    */
   private static final byte [] ZERO = new byte [] {'0'};
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java
index 670a8d3..89f61d0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java
@@ -64,8 +64,8 @@
     when(r.length()).thenReturn(1L);
     when(r.getBloomFilterType()).thenReturn(BloomType.NONE);
     when(r.getHFileReader()).thenReturn(mock(HFile.Reader.class));
-    when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyLong()))
-        .thenReturn(mock(StoreFileScanner.class));
+    when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyLong(),
+      anyBoolean())).thenReturn(mock(StoreFileScanner.class));
     when(sf.getReader()).thenReturn(r);
     when(sf.createReader()).thenReturn(r);
     when(sf.createReader(anyBoolean())).thenReturn(r);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
index d8770e0..fa6e62a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
@@ -750,9 +750,8 @@
     when(r.length()).thenReturn(size);
     when(r.getBloomFilterType()).thenReturn(BloomType.NONE);
     when(r.getHFileReader()).thenReturn(mock(HFile.Reader.class));
-    when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyLong()))
-        .thenReturn(
-      mock(StoreFileScanner.class));
+    when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyLong(),
+      anyBoolean())).thenReturn(mock(StoreFileScanner.class));
     when(sf.getReader()).thenReturn(r);
     when(sf.createReader(anyBoolean())).thenReturn(r);
     when(sf.createReader()).thenReturn(r);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/AbstractTestScanQueryMatcher.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/AbstractTestScanQueryMatcher.java
new file mode 100644
index 0000000..df33d824
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/AbstractTestScanQueryMatcher.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.querymatcher;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Before;
+
+public class AbstractTestScanQueryMatcher {
+
+  protected Configuration conf;
+
+  protected byte[] row1;
+  protected byte[] row2;
+  protected byte[] row3;
+  protected byte[] fam1;
+  protected byte[] fam2;
+  protected byte[] col1;
+  protected byte[] col2;
+  protected byte[] col3;
+  protected byte[] col4;
+  protected byte[] col5;
+
+  protected byte[] data;
+
+  protected Get get;
+
+  protected long ttl = Long.MAX_VALUE;
+  protected CellComparator rowComparator;
+  protected Scan scan;
+
+  @Before
+  public void setUp() throws Exception {
+    this.conf = HBaseConfiguration.create();
+    row1 = Bytes.toBytes("row1");
+    row2 = Bytes.toBytes("row2");
+    row3 = Bytes.toBytes("row3");
+    fam1 = Bytes.toBytes("fam1");
+    fam2 = Bytes.toBytes("fam2");
+    col1 = Bytes.toBytes("col1");
+    col2 = Bytes.toBytes("col2");
+    col3 = Bytes.toBytes("col3");
+    col4 = Bytes.toBytes("col4");
+    col5 = Bytes.toBytes("col5");
+
+    data = Bytes.toBytes("data");
+
+    // Create Get
+    get = new Get(row1);
+    get.addFamily(fam1);
+    get.addColumn(fam2, col2);
+    get.addColumn(fam2, col4);
+    get.addColumn(fam2, col5);
+    this.scan = new Scan(get);
+
+    rowComparator = CellComparator.COMPARATOR;
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestCompactionScanQueryMatcher.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestCompactionScanQueryMatcher.java
new file mode 100644
index 0000000..055fe1c
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestCompactionScanQueryMatcher.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.querymatcher;
+
+import static org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher.MatchCode.INCLUDE;
+import static org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher.MatchCode.SKIP;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeepDeletedCells;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.regionserver.ScanInfo;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher.MatchCode;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ RegionServerTests.class, SmallTests.class })
+public class TestCompactionScanQueryMatcher extends AbstractTestScanQueryMatcher {
+
+  private static final Log LOG = LogFactory.getLog(TestCompactionScanQueryMatcher.class);
+
+  @Test
+  public void testMatch_PartialRangeDropDeletes() throws Exception {
+    // Some ranges.
+    testDropDeletes(row2, row3, new byte[][] { row1, row2, row2, row3 }, INCLUDE, SKIP, SKIP,
+      INCLUDE);
+    testDropDeletes(row2, row3, new byte[][] { row1, row1, row2 }, INCLUDE, INCLUDE, SKIP);
+    testDropDeletes(row2, row3, new byte[][] { row2, row3, row3 }, SKIP, INCLUDE, INCLUDE);
+    testDropDeletes(row1, row3, new byte[][] { row1, row2, row3 }, SKIP, SKIP, INCLUDE);
+    // Open ranges.
+    testDropDeletes(HConstants.EMPTY_START_ROW, row3, new byte[][] { row1, row2, row3 }, SKIP, SKIP,
+      INCLUDE);
+    testDropDeletes(row2, HConstants.EMPTY_END_ROW, new byte[][] { row1, row2, row3 }, INCLUDE,
+      SKIP, SKIP);
+    testDropDeletes(HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
+      new byte[][] { row1, row2, row3, row3 }, SKIP, SKIP, SKIP, SKIP);
+
+    // No KVs in range.
+    testDropDeletes(row2, row3, new byte[][] { row1, row1, row3 }, INCLUDE, INCLUDE, INCLUDE);
+    testDropDeletes(row2, row3, new byte[][] { row3, row3 }, INCLUDE, INCLUDE);
+    testDropDeletes(row2, row3, new byte[][] { row1, row1 }, INCLUDE, INCLUDE);
+  }
+
+  private void testDropDeletes(byte[] from, byte[] to, byte[][] rows, MatchCode... expected)
+      throws IOException {
+    long now = EnvironmentEdgeManager.currentTime();
+    // Set time to purge deletes to negative value to avoid it ever happening.
+    ScanInfo scanInfo = new ScanInfo(this.conf, fam2, 0, 1, ttl, KeepDeletedCells.FALSE, -1L,
+        rowComparator);
+
+    CompactionScanQueryMatcher qm = CompactionScanQueryMatcher.create(scanInfo,
+      ScanType.COMPACT_RETAIN_DELETES, Long.MAX_VALUE, HConstants.OLDEST_TIMESTAMP,
+      HConstants.OLDEST_TIMESTAMP, now, from, to, null);
+    List<ScanQueryMatcher.MatchCode> actual = new ArrayList<ScanQueryMatcher.MatchCode>(
+        rows.length);
+    byte[] prevRow = null;
+    for (byte[] row : rows) {
+      if (prevRow == null || !Bytes.equals(prevRow, row)) {
+        qm.setToNewRow(KeyValueUtil.createFirstOnRow(row));
+        prevRow = row;
+      }
+      actual.add(qm.match(new KeyValue(row, fam2, null, now, Type.Delete)));
+    }
+
+    assertEquals(expected.length, actual.size());
+    for (int i = 0; i < expected.length; i++) {
+      LOG.debug("expected " + expected[i] + ", actual " + actual.get(i));
+      assertEquals(expected[i], actual.get(i));
+    }
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitColumnTracker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestExplicitColumnTracker.java
similarity index 70%
rename from hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitColumnTracker.java
rename to hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestExplicitColumnTracker.java
index 020781c..a7e0667 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitColumnTracker.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestExplicitColumnTracker.java
@@ -17,26 +17,25 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.hbase.regionserver;
+package org.apache.hadoop.hbase.regionserver.querymatcher;
 
 import static org.junit.Assert.assertEquals;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.TreeSet;
-import java.util.Arrays;
 
-import org.apache.hadoop.hbase.*;
-import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher.MatchCode;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-
-@Category({RegionServerTests.class, SmallTests.class})
+@Category({ RegionServerTests.class, SmallTests.class })
 public class TestExplicitColumnTracker {
 
   private final byte[] col1 = Bytes.toBytes("col1");
@@ -45,46 +44,42 @@
   private final byte[] col4 = Bytes.toBytes("col4");
   private final byte[] col5 = Bytes.toBytes("col5");
 
-  private void runTest(int maxVersions,
-                       TreeSet<byte[]> trackColumns,
-                       List<byte[]> scannerColumns,
-                       List<MatchCode> expected) throws IOException {
-    ColumnTracker exp = new ExplicitColumnTracker(
-      trackColumns, 0, maxVersions, Long.MIN_VALUE);
+  private void runTest(int maxVersions, TreeSet<byte[]> trackColumns, List<byte[]> scannerColumns,
+      List<MatchCode> expected) throws IOException {
+    ColumnTracker exp = new ExplicitColumnTracker(trackColumns, 0, maxVersions, Long.MIN_VALUE);
 
-
-    //Initialize result
+    // Initialize result
     List<ScanQueryMatcher.MatchCode> result = new ArrayList<ScanQueryMatcher.MatchCode>();
 
     long timestamp = 0;
-    //"Match"
-    for(byte [] col : scannerColumns){
+    // "Match"
+    for (byte[] col : scannerColumns) {
       result.add(ScanQueryMatcher.checkColumn(exp, col, 0, col.length, ++timestamp,
         KeyValue.Type.Put.getCode(), false));
     }
 
     assertEquals(expected.size(), result.size());
-    for(int i=0; i< expected.size(); i++){
+    for (int i = 0; i < expected.size(); i++) {
       assertEquals(expected.get(i), result.get(i));
     }
   }
 
   @Test
-  public void testGet_SingleVersion() throws IOException{
-    //Create tracker
+  public void testGetSingleVersion() throws IOException {
+    // Create tracker
     TreeSet<byte[]> columns = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
-    //Looking for every other
+    // Looking for every other
     columns.add(col2);
     columns.add(col4);
     List<MatchCode> expected = new ArrayList<ScanQueryMatcher.MatchCode>();
-    expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL);             // col1
+    expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL); // col1
     expected.add(ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL); // col2
-    expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL);             // col3
+    expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL); // col3
     expected.add(ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW); // col4
-    expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW);             // col5
+    expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW); // col5
     int maxVersions = 1;
 
-    //Create "Scanner"
+    // Create "Scanner"
     List<byte[]> scanner = new ArrayList<byte[]>();
     scanner.add(col1);
     scanner.add(col2);
@@ -96,10 +91,10 @@
   }
 
   @Test
-  public void testGet_MultiVersion() throws IOException{
-    //Create tracker
+  public void testGetMultiVersion() throws IOException {
+    // Create tracker
     TreeSet<byte[]> columns = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
-    //Looking for every other
+    // Looking for every other
     columns.add(col2);
     columns.add(col4);
 
@@ -108,7 +103,7 @@
     expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL);
     expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL);
 
-    expected.add(ScanQueryMatcher.MatchCode.INCLUDE);                   // col2; 1st version
+    expected.add(ScanQueryMatcher.MatchCode.INCLUDE); // col2; 1st version
     expected.add(ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL); // col2; 2nd version
     expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL);
 
@@ -116,7 +111,7 @@
     expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL);
     expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL);
 
-    expected.add(ScanQueryMatcher.MatchCode.INCLUDE);                   // col4; 1st version
+    expected.add(ScanQueryMatcher.MatchCode.INCLUDE); // col4; 1st version
     expected.add(ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW); // col4; 2nd version
     expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW);
 
@@ -125,7 +120,7 @@
     expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW);
     int maxVersions = 2;
 
-    //Create "Scanner"
+    // Create "Scanner"
     List<byte[]> scanner = new ArrayList<byte[]>();
     scanner.add(col1);
     scanner.add(col1);
@@ -143,7 +138,7 @@
     scanner.add(col5);
     scanner.add(col5);
 
-    //Initialize result
+    // Initialize result
     runTest(maxVersions, columns, scanner, expected);
   }
 
@@ -151,24 +146,23 @@
    * hbase-2259
    */
   @Test
-  public void testStackOverflow() throws IOException{
+  public void testStackOverflow() throws IOException {
     int maxVersions = 1;
     TreeSet<byte[]> columns = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
     for (int i = 0; i < 100000; i++) {
-      columns.add(Bytes.toBytes("col"+i));
+      columns.add(Bytes.toBytes("col" + i));
     }
 
-    ColumnTracker explicit = new ExplicitColumnTracker(columns, 0, maxVersions,
-        Long.MIN_VALUE);
-    for (int i = 0; i < 100000; i+=2) {
-      byte [] col = Bytes.toBytes("col"+i);
+    ColumnTracker explicit = new ExplicitColumnTracker(columns, 0, maxVersions, Long.MIN_VALUE);
+    for (int i = 0; i < 100000; i += 2) {
+      byte[] col = Bytes.toBytes("col" + i);
       ScanQueryMatcher.checkColumn(explicit, col, 0, col.length, 1, KeyValue.Type.Put.getCode(),
         false);
     }
     explicit.reset();
 
-    for (int i = 1; i < 100000; i+=2) {
-      byte [] col = Bytes.toBytes("col"+i);
+    for (int i = 1; i < 100000; i += 2) {
+      byte[] col = Bytes.toBytes("col" + i);
       ScanQueryMatcher.checkColumn(explicit, col, 0, col.length, 1, KeyValue.Type.Put.getCode(),
         false);
     }
@@ -180,16 +174,12 @@
   @Test
   public void testInfiniteLoop() throws IOException {
     TreeSet<byte[]> columns = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
-    columns.addAll(Arrays.asList(new byte[][] {
-      col2, col3, col5 }));
-    List<byte[]> scanner = Arrays.<byte[]>asList(
-      new byte[][] { col1, col4 });
-    List<ScanQueryMatcher.MatchCode> expected = Arrays.<ScanQueryMatcher.MatchCode>asList(
-      new ScanQueryMatcher.MatchCode[] {
-        ScanQueryMatcher.MatchCode.SEEK_NEXT_COL,
-        ScanQueryMatcher.MatchCode.SEEK_NEXT_COL });
+    columns.addAll(Arrays.asList(new byte[][] { col2, col3, col5 }));
+    List<byte[]> scanner = Arrays.<byte[]> asList(new byte[][] { col1, col4 });
+    List<ScanQueryMatcher.MatchCode> expected = Arrays.<ScanQueryMatcher.MatchCode> asList(
+      new ScanQueryMatcher.MatchCode[] { ScanQueryMatcher.MatchCode.SEEK_NEXT_COL,
+          ScanQueryMatcher.MatchCode.SEEK_NEXT_COL });
     runTest(1, columns, scanner, expected);
   }
 
 }
-
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestScanDeleteTracker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestScanDeleteTracker.java
new file mode 100644
index 0000000..fce35bd
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestScanDeleteTracker.java
@@ -0,0 +1,185 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.querymatcher;
+
+import static org.junit.Assert.*;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker.DeleteResult;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ RegionServerTests.class, SmallTests.class })
+public class TestScanDeleteTracker {
+
+  private ScanDeleteTracker sdt;
+
+  private long timestamp = 10L;
+
+  @Before
+  public void setUp() throws Exception {
+    sdt = new ScanDeleteTracker();
+  }
+
+  @Test
+  public void testDeletedByDelete() {
+    KeyValue kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"), Bytes.toBytes("qualifier"),
+        timestamp, KeyValue.Type.Delete);
+    sdt.add(kv);
+    DeleteResult ret = sdt.isDeleted(kv);
+    assertEquals(DeleteResult.VERSION_DELETED, ret);
+  }
+
+  @Test
+  public void testDeletedByDeleteColumn() {
+    KeyValue kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"), Bytes.toBytes("qualifier"),
+        timestamp, KeyValue.Type.DeleteColumn);
+    sdt.add(kv);
+    timestamp -= 5;
+    kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"), Bytes.toBytes("qualifier"),
+        timestamp, KeyValue.Type.DeleteColumn);
+    DeleteResult ret = sdt.isDeleted(kv);
+    assertEquals(DeleteResult.COLUMN_DELETED, ret);
+  }
+
+  @Test
+  public void testDeletedByDeleteFamily() {
+    KeyValue kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"), Bytes.toBytes("qualifier"),
+        timestamp, KeyValue.Type.DeleteFamily);
+    sdt.add(kv);
+    timestamp -= 5;
+    kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"), Bytes.toBytes("qualifier"),
+        timestamp, KeyValue.Type.DeleteColumn);
+    DeleteResult ret = sdt.isDeleted(kv);
+    assertEquals(DeleteResult.FAMILY_DELETED, ret);
+  }
+
+  @Test
+  public void testDeletedByDeleteFamilyVersion() {
+    byte[] qualifier1 = Bytes.toBytes("qualifier1");
+    byte[] qualifier2 = Bytes.toBytes("qualifier2");
+    byte[] qualifier3 = Bytes.toBytes("qualifier3");
+    byte[] qualifier4 = Bytes.toBytes("qualifier4");
+    KeyValue kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"), null, timestamp,
+        KeyValue.Type.DeleteFamilyVersion);
+    sdt.add(kv);
+    kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"), qualifier1, timestamp,
+        KeyValue.Type.DeleteFamilyVersion);
+    DeleteResult ret = sdt.isDeleted(kv);
+    assertEquals(DeleteResult.FAMILY_VERSION_DELETED, ret);
+    kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"), qualifier2, timestamp,
+        KeyValue.Type.DeleteFamilyVersion);
+    ret = sdt.isDeleted(kv);
+    assertEquals(DeleteResult.FAMILY_VERSION_DELETED, ret);
+    kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"), qualifier3, timestamp,
+        KeyValue.Type.DeleteFamilyVersion);
+    ret = sdt.isDeleted(kv);
+    assertEquals(DeleteResult.FAMILY_VERSION_DELETED, ret);
+    kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"), qualifier4, timestamp,
+        KeyValue.Type.DeleteFamilyVersion);
+    ret = sdt.isDeleted(kv);
+    assertEquals(DeleteResult.FAMILY_VERSION_DELETED, ret);
+    kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"), qualifier1, timestamp + 3,
+        KeyValue.Type.DeleteFamilyVersion);
+    ret = sdt.isDeleted(kv);
+    assertEquals(DeleteResult.NOT_DELETED, ret);
+    kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"), qualifier2, timestamp - 2,
+        KeyValue.Type.DeleteFamilyVersion);
+    ret = sdt.isDeleted(kv);
+    assertEquals(DeleteResult.NOT_DELETED, ret);
+    kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"), qualifier3, timestamp - 5,
+        KeyValue.Type.DeleteFamilyVersion);
+    ret = sdt.isDeleted(kv);
+    assertEquals(DeleteResult.NOT_DELETED, ret);
+    kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"), qualifier4, timestamp + 8,
+        KeyValue.Type.DeleteFamilyVersion);
+    ret = sdt.isDeleted(kv);
+    assertEquals(DeleteResult.NOT_DELETED, ret);
+  }
+
+  @Test
+  public void testDeleteDeleteColumn() {
+    byte[] qualifier = Bytes.toBytes("qualifier");
+    KeyValue kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"), qualifier, timestamp,
+        KeyValue.Type.Delete);
+    sdt.add(kv);
+
+    timestamp -= 5;
+    kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"), qualifier, timestamp,
+        KeyValue.Type.DeleteColumn);
+    sdt.add(kv);
+
+    timestamp -= 5;
+    kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"), qualifier, timestamp,
+        KeyValue.Type.DeleteColumn);
+    DeleteResult ret = sdt.isDeleted(kv);
+    assertEquals(DeleteResult.COLUMN_DELETED, ret);
+  }
+
+  @Test
+  public void testDeleteColumnDelete() {
+    byte[] qualifier = Bytes.toBytes("qualifier");
+    KeyValue kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"), qualifier, timestamp,
+        KeyValue.Type.DeleteColumn);
+    sdt.add(kv);
+
+    qualifier = Bytes.toBytes("qualifier1");
+    kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"), qualifier, timestamp,
+        KeyValue.Type.Delete);
+    sdt.add(kv);
+
+    DeleteResult ret = sdt.isDeleted(kv);
+    assertEquals(DeleteResult.VERSION_DELETED, ret);
+  }
+
+  // Testing new way where we save the Delete in case of a Delete for specific
+  // ts, could have just added the last line to the first test, but rather keep
+  // them separated
+  @Test
+  public void testDeleteKeepDelete() {
+    byte[] qualifier = Bytes.toBytes("qualifier");
+    KeyValue kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"), qualifier, timestamp,
+        KeyValue.Type.Delete);
+    sdt.add(kv);
+    sdt.isDeleted(kv);
+    assertEquals(false, sdt.isEmpty());
+  }
+
+  @Test
+  public void testDeleteKeepVersionZero() {
+    byte[] qualifier = Bytes.toBytes("qualifier");
+
+    long deleteTimestamp = 10;
+    long valueTimestamp = 0;
+
+    sdt.reset();
+    KeyValue kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"), qualifier, deleteTimestamp,
+        KeyValue.Type.Delete);
+    sdt.add(kv);
+    kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"), qualifier, valueTimestamp,
+        KeyValue.Type.Delete);
+    DeleteResult ret = sdt.isDeleted(kv);
+    assertEquals(DeleteResult.NOT_DELETED, ret);
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWildcardColumnTracker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestScanWildcardColumnTracker.java
similarity index 62%
rename from hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWildcardColumnTracker.java
rename to hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestScanWildcardColumnTracker.java
index c0dcee6..044988b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWildcardColumnTracker.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestScanWildcardColumnTracker.java
@@ -17,36 +17,40 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.hbase.regionserver;
+package org.apache.hadoop.hbase.regionserver.querymatcher;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.hadoop.hbase.*;
-import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher.MatchCode;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-@Category({RegionServerTests.class, SmallTests.class})
-public class TestScanWildcardColumnTracker extends HBaseTestCase {
+@Category({ RegionServerTests.class, SmallTests.class })
+public class TestScanWildcardColumnTracker {
 
   final static int VERSIONS = 2;
 
-  public void testCheckColumn_Ok() throws IOException {
-    ScanWildcardColumnTracker tracker =
-      new ScanWildcardColumnTracker(0, VERSIONS, Long.MIN_VALUE);
+  @Test
+  public void testCheckColumnOk() throws IOException {
+    ScanWildcardColumnTracker tracker = new ScanWildcardColumnTracker(0, VERSIONS, Long.MIN_VALUE);
 
-    //Create list of qualifiers
+    // Create list of qualifiers
     List<byte[]> qualifiers = new ArrayList<byte[]>();
     qualifiers.add(Bytes.toBytes("qualifier1"));
     qualifiers.add(Bytes.toBytes("qualifier2"));
     qualifiers.add(Bytes.toBytes("qualifier3"));
     qualifiers.add(Bytes.toBytes("qualifier4"));
 
-    //Setting up expected result
+    // Setting up expected result
     List<MatchCode> expected = new ArrayList<MatchCode>();
     expected.add(ScanQueryMatcher.MatchCode.INCLUDE);
     expected.add(ScanQueryMatcher.MatchCode.INCLUDE);
@@ -55,31 +59,30 @@
 
     List<ScanQueryMatcher.MatchCode> actual = new ArrayList<MatchCode>();
 
-    for(byte [] qualifier : qualifiers) {
-      ScanQueryMatcher.MatchCode mc =
-          ScanQueryMatcher.checkColumn(tracker, qualifier, 0, qualifier.length, 1,
-            KeyValue.Type.Put.getCode(), false);
+    for (byte[] qualifier : qualifiers) {
+      ScanQueryMatcher.MatchCode mc = ScanQueryMatcher.checkColumn(tracker, qualifier, 0,
+        qualifier.length, 1, KeyValue.Type.Put.getCode(), false);
       actual.add(mc);
     }
 
-    //Compare actual with expected
-    for(int i=0; i<expected.size(); i++) {
+    // Compare actual with expected
+    for (int i = 0; i < expected.size(); i++) {
       assertEquals(expected.get(i), actual.get(i));
     }
   }
 
-  public void testCheckColumn_EnforceVersions() throws IOException {
-    ScanWildcardColumnTracker tracker =
-      new ScanWildcardColumnTracker(0, VERSIONS, Long.MIN_VALUE);
+  @Test
+  public void testCheckColumnEnforceVersions() throws IOException {
+    ScanWildcardColumnTracker tracker = new ScanWildcardColumnTracker(0, VERSIONS, Long.MIN_VALUE);
 
-    //Create list of qualifiers
+    // Create list of qualifiers
     List<byte[]> qualifiers = new ArrayList<byte[]>();
     qualifiers.add(Bytes.toBytes("qualifier1"));
     qualifiers.add(Bytes.toBytes("qualifier1"));
     qualifiers.add(Bytes.toBytes("qualifier1"));
     qualifiers.add(Bytes.toBytes("qualifier2"));
 
-    //Setting up expected result
+    // Setting up expected result
     List<ScanQueryMatcher.MatchCode> expected = new ArrayList<MatchCode>();
     expected.add(ScanQueryMatcher.MatchCode.INCLUDE);
     expected.add(ScanQueryMatcher.MatchCode.INCLUDE);
@@ -89,42 +92,35 @@
     List<MatchCode> actual = new ArrayList<ScanQueryMatcher.MatchCode>();
 
     long timestamp = 0;
-    for(byte [] qualifier : qualifiers) {
-      MatchCode mc =
-          ScanQueryMatcher.checkColumn(tracker, qualifier, 0, qualifier.length, ++timestamp,
-            KeyValue.Type.Put.getCode(), false);
+    for (byte[] qualifier : qualifiers) {
+      MatchCode mc = ScanQueryMatcher.checkColumn(tracker, qualifier, 0, qualifier.length,
+        ++timestamp, KeyValue.Type.Put.getCode(), false);
       actual.add(mc);
     }
 
-    //Compare actual with expected
-    for(int i=0; i<expected.size(); i++) {
+    // Compare actual with expected
+    for (int i = 0; i < expected.size(); i++) {
       assertEquals(expected.get(i), actual.get(i));
     }
   }
 
-  public void DisabledTestCheckColumn_WrongOrder() {
-    ScanWildcardColumnTracker tracker =
-      new ScanWildcardColumnTracker(0, VERSIONS, Long.MIN_VALUE);
+  @Test
+  public void DisabledTestCheckColumnWrongOrder() {
+    ScanWildcardColumnTracker tracker = new ScanWildcardColumnTracker(0, VERSIONS, Long.MIN_VALUE);
 
-    //Create list of qualifiers
+    // Create list of qualifiers
     List<byte[]> qualifiers = new ArrayList<byte[]>();
     qualifiers.add(Bytes.toBytes("qualifier2"));
     qualifiers.add(Bytes.toBytes("qualifier1"));
 
-    boolean ok = false;
-
     try {
-      for(byte [] qualifier : qualifiers) {
+      for (byte[] qualifier : qualifiers) {
         ScanQueryMatcher.checkColumn(tracker, qualifier, 0, qualifier.length, 1,
           KeyValue.Type.Put.getCode(), false);
       }
-    } catch (Exception e) {
-      ok = true;
+      fail();
+    } catch (IOException e) {
+      // expected
     }
-
-    assertEquals(true, ok);
   }
-
-
 }
-
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestUserScanQueryMatcher.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestUserScanQueryMatcher.java
new file mode 100644
index 0000000..04c3611
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestUserScanQueryMatcher.java
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.querymatcher;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeepDeletedCells;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.ScanInfo;
+import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher.MatchCode;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ RegionServerTests.class, SmallTests.class })
+public class TestUserScanQueryMatcher extends AbstractTestScanQueryMatcher {
+
+  private static final Log LOG = LogFactory.getLog(TestUserScanQueryMatcher.class);
+
+  /**
+   * This is a cryptic test. It is checking that we don't include a fake cell, one that has a
+   * timestamp of {@link HConstants#OLDEST_TIMESTAMP}. See HBASE-16074 for background.
+   * @throws IOException
+   */
+  @Test
+  public void testNeverIncludeFakeCell() throws IOException {
+    long now = EnvironmentEdgeManager.currentTime();
+    // Do with fam2 which has a col2 qualifier.
+    UserScanQueryMatcher qm = UserScanQueryMatcher.create(scan,
+      new ScanInfo(this.conf, fam2, 10, 1, ttl, KeepDeletedCells.FALSE, 0, rowComparator),
+      get.getFamilyMap().get(fam2), now - ttl, now, null);
+    Cell kv = new KeyValue(row1, fam2, col2, 1, data);
+    Cell cell = CellUtil.createLastOnRowCol(kv);
+    qm.setToNewRow(kv);
+    MatchCode code = qm.match(cell);
+    assertFalse(code.compareTo(MatchCode.SEEK_NEXT_COL) != 0);
+  }
+
+  @Test
+  public void testMatchExplicitColumns() throws IOException {
+    // Moving up from the Tracker by using Gets and List<KeyValue> instead
+    // of just byte []
+
+    // Expected result
+    List<MatchCode> expected = new ArrayList<ScanQueryMatcher.MatchCode>();
+    expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL);
+    expected.add(ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL);
+    expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL);
+    expected.add(ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL);
+    expected.add(ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW);
+    expected.add(ScanQueryMatcher.MatchCode.DONE);
+
+    long now = EnvironmentEdgeManager.currentTime();
+    // 2,4,5
+    UserScanQueryMatcher qm = UserScanQueryMatcher.create(scan,
+      new ScanInfo(this.conf, fam2, 0, 1, ttl, KeepDeletedCells.FALSE, 0, rowComparator),
+      get.getFamilyMap().get(fam2), now - ttl, now, null);
+
+    List<KeyValue> memstore = new ArrayList<KeyValue>();
+    memstore.add(new KeyValue(row1, fam2, col1, 1, data));
+    memstore.add(new KeyValue(row1, fam2, col2, 1, data));
+    memstore.add(new KeyValue(row1, fam2, col3, 1, data));
+    memstore.add(new KeyValue(row1, fam2, col4, 1, data));
+    memstore.add(new KeyValue(row1, fam2, col5, 1, data));
+
+    memstore.add(new KeyValue(row2, fam1, col1, data));
+
+    List<ScanQueryMatcher.MatchCode> actual = new ArrayList<ScanQueryMatcher.MatchCode>();
+    KeyValue k = memstore.get(0);
+    qm.setToNewRow(k);
+
+    for (KeyValue kv : memstore) {
+      actual.add(qm.match(kv));
+    }
+
+    assertEquals(expected.size(), actual.size());
+    for (int i = 0; i < expected.size(); i++) {
+      LOG.debug("expected " + expected.get(i) + ", actual " + actual.get(i));
+      assertEquals(expected.get(i), actual.get(i));
+    }
+  }
+
+  @Test
+  public void testMatch_Wildcard() throws IOException {
+    // Moving up from the Tracker by using Gets and List<KeyValue> instead
+    // of just byte []
+
+    // Expected result
+    List<MatchCode> expected = new ArrayList<ScanQueryMatcher.MatchCode>();
+    expected.add(ScanQueryMatcher.MatchCode.INCLUDE);
+    expected.add(ScanQueryMatcher.MatchCode.INCLUDE);
+    expected.add(ScanQueryMatcher.MatchCode.INCLUDE);
+    expected.add(ScanQueryMatcher.MatchCode.INCLUDE);
+    expected.add(ScanQueryMatcher.MatchCode.INCLUDE);
+    expected.add(ScanQueryMatcher.MatchCode.DONE);
+
+    long now = EnvironmentEdgeManager.currentTime();
+    UserScanQueryMatcher qm = UserScanQueryMatcher.create(scan,
+      new ScanInfo(this.conf, fam2, 0, 1, ttl, KeepDeletedCells.FALSE, 0, rowComparator), null,
+      now - ttl, now, null);
+
+    List<KeyValue> memstore = new ArrayList<KeyValue>();
+    memstore.add(new KeyValue(row1, fam2, col1, 1, data));
+    memstore.add(new KeyValue(row1, fam2, col2, 1, data));
+    memstore.add(new KeyValue(row1, fam2, col3, 1, data));
+    memstore.add(new KeyValue(row1, fam2, col4, 1, data));
+    memstore.add(new KeyValue(row1, fam2, col5, 1, data));
+    memstore.add(new KeyValue(row2, fam1, col1, 1, data));
+
+    List<ScanQueryMatcher.MatchCode> actual = new ArrayList<ScanQueryMatcher.MatchCode>();
+
+    KeyValue k = memstore.get(0);
+    qm.setToNewRow(k);
+
+    for (KeyValue kv : memstore) {
+      actual.add(qm.match(kv));
+    }
+
+    assertEquals(expected.size(), actual.size());
+    for (int i = 0; i < expected.size(); i++) {
+      LOG.debug("expected " + expected.get(i) + ", actual " + actual.get(i));
+      assertEquals(expected.get(i), actual.get(i));
+    }
+  }
+
+  /**
+   * Verify that {@link ScanQueryMatcher} only skips expired KeyValue instances and does not exit
+   * early from the row (skipping later non-expired KeyValues). This version mimics a Get with
+   * explicitly specified column qualifiers.
+   * @throws IOException
+   */
+  @Test
+  public void testMatch_ExpiredExplicit() throws IOException {
+
+    long testTTL = 1000;
+    MatchCode[] expected = new MatchCode[] { ScanQueryMatcher.MatchCode.SEEK_NEXT_COL,
+        ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL,
+        ScanQueryMatcher.MatchCode.SEEK_NEXT_COL,
+        ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL,
+        ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW, ScanQueryMatcher.MatchCode.DONE };
+
+    long now = EnvironmentEdgeManager.currentTime();
+    UserScanQueryMatcher qm = UserScanQueryMatcher.create(scan,
+      new ScanInfo(this.conf, fam2, 0, 1, testTTL, KeepDeletedCells.FALSE, 0, rowComparator),
+      get.getFamilyMap().get(fam2), now - testTTL, now, null);
+
+    KeyValue[] kvs = new KeyValue[] { new KeyValue(row1, fam2, col1, now - 100, data),
+        new KeyValue(row1, fam2, col2, now - 50, data),
+        new KeyValue(row1, fam2, col3, now - 5000, data),
+        new KeyValue(row1, fam2, col4, now - 500, data),
+        new KeyValue(row1, fam2, col5, now - 10000, data),
+        new KeyValue(row2, fam1, col1, now - 10, data) };
+
+    KeyValue k = kvs[0];
+    qm.setToNewRow(k);
+
+    List<MatchCode> actual = new ArrayList<MatchCode>(kvs.length);
+    for (KeyValue kv : kvs) {
+      actual.add(qm.match(kv));
+    }
+
+    assertEquals(expected.length, actual.size());
+    for (int i = 0; i < expected.length; i++) {
+      LOG.debug("expected " + expected[i] + ", actual " + actual.get(i));
+      assertEquals(expected[i], actual.get(i));
+    }
+  }
+
+  /**
+   * Verify that {@link ScanQueryMatcher} only skips expired KeyValue instances and does not exit
+   * early from the row (skipping later non-expired KeyValues). This version mimics a Get with
+   * wildcard-inferred column qualifiers.
+   * @throws IOException
+   */
+  @Test
+  public void testMatch_ExpiredWildcard() throws IOException {
+
+    long testTTL = 1000;
+    MatchCode[] expected = new MatchCode[] { ScanQueryMatcher.MatchCode.INCLUDE,
+        ScanQueryMatcher.MatchCode.INCLUDE, ScanQueryMatcher.MatchCode.SEEK_NEXT_COL,
+        ScanQueryMatcher.MatchCode.INCLUDE, ScanQueryMatcher.MatchCode.SEEK_NEXT_COL,
+        ScanQueryMatcher.MatchCode.DONE };
+
+    long now = EnvironmentEdgeManager.currentTime();
+    UserScanQueryMatcher qm = UserScanQueryMatcher.create(scan,
+      new ScanInfo(this.conf, fam2, 0, 1, testTTL, KeepDeletedCells.FALSE, 0, rowComparator), null,
+      now - testTTL, now, null);
+
+    KeyValue[] kvs = new KeyValue[] { new KeyValue(row1, fam2, col1, now - 100, data),
+        new KeyValue(row1, fam2, col2, now - 50, data),
+        new KeyValue(row1, fam2, col3, now - 5000, data),
+        new KeyValue(row1, fam2, col4, now - 500, data),
+        new KeyValue(row1, fam2, col5, now - 10000, data),
+        new KeyValue(row2, fam1, col1, now - 10, data) };
+    KeyValue k = kvs[0];
+    qm.setToNewRow(k);
+
+    List<ScanQueryMatcher.MatchCode> actual = new ArrayList<ScanQueryMatcher.MatchCode>(kvs.length);
+    for (KeyValue kv : kvs) {
+      actual.add(qm.match(kv));
+    }
+
+    assertEquals(expected.length, actual.size());
+    for (int i = 0; i < expected.length; i++) {
+      LOG.debug("expected " + expected[i] + ", actual " + actual.get(i));
+      assertEquals(expected[i], actual.get(i));
+    }
+  }
+}