PHOENIX-3904 Queries with LIKE and LIMIT OFFSET fail for mutable tables with column mapping enabled
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DefaultColumnValueIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DefaultColumnValueIT.java
index 7c04d01..62d79bc 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DefaultColumnValueIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DefaultColumnValueIT.java
@@ -37,7 +37,6 @@
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.DateUtil;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 
 
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/LikeExpressionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/LikeExpressionIT.java
index 3dba4d5..03afdfc 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/LikeExpressionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/LikeExpressionIT.java
@@ -295,4 +295,64 @@
         assertTrue(rs.wasNull());
         assertFalse(rs.next());
     }
+
+    @Test
+    public void testLikeExpressionWithLimitOffset() throws Exception {
+        String tableName = generateUniqueName();
+        String ddl =
+                "create table " + tableName
+                        + " (id integer not null primary key, cf.col1 varchar, cf.col2 varchar, cf2.col3 varchar, cf2.col4 varchar)";
+        String upsert = "UPSERT INTO " + tableName + " VALUES (?, ?, ?, ?, ?)";
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement().execute(ddl);
+            PreparedStatement stmt = conn.prepareStatement(upsert);
+            for (int i = 1; i <= 10; i++) {
+                stmt.setInt(1, i);
+                stmt.setString(2, i + "col1");
+                stmt.setString(3, i + "col2");
+                stmt.setString(4, i + "col3");
+                stmt.setString(5, i + "col4");
+                stmt.executeUpdate();
+            }
+            conn.commit();
+
+            String query =
+                    "select cf.* from " + tableName
+                            + " where cf.col1 like '%col1%' limit 10 offset 2";
+            ResultSet rs = conn.createStatement().executeQuery(query);
+            int expectedCount = 8;
+            int i = 0;
+            while (rs.next()) {
+                i++;
+                assertTrue(rs.getString("COL1").contains("col1"));
+                assertTrue(rs.getString("COL2").contains("col2"));
+            }
+            assertEquals(expectedCount, i);
+
+            query =
+                    "select cf.*, cf2.* from " + tableName
+                            + " where cf.col1 like '%col1%' limit 10 offset 2";
+            rs = conn.createStatement().executeQuery(query);
+            i = 0;
+            while (rs.next()) {
+                i++;
+                assertTrue(rs.getString("COL1").contains("col1"));
+                assertTrue(rs.getString("COL2").contains("col2"));
+                assertTrue(rs.getString("COL3").contains("col3"));
+                assertTrue(rs.getString("COL4").contains("col4"));
+            }
+            assertEquals(expectedCount, i);
+            query = "select * from " + tableName + " where cf.col1 like '%col1%' limit 10 offset 2";
+            rs = conn.createStatement().executeQuery(query);
+            i = 0;
+            while (rs.next()) {
+                i++;
+                assertTrue(rs.getString("COL1").contains("col1"));
+                assertTrue(rs.getString("COL2").contains("col2"));
+                assertTrue(rs.getString("COL3").contains("col3"));
+                assertTrue(rs.getString("COL4").contains("col4"));
+            }
+            assertEquals(expectedCount, i);
+        }
+    }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
index ae07a9a..9754313 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
@@ -279,7 +279,7 @@
         return sequences;
     }
 
-    public void addWhereCoditionColumn(byte[] cf, byte[] q) {
+    public void addWhereConditionColumn(byte[] cf, byte[] q) {
         whereConditionColumns.add(new Pair<byte[], byte[]>(cf, q));
     }
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
index ed6c6cc..5ccd538 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
@@ -181,7 +181,7 @@
                 byte[] cq = tableRef.getTable().getImmutableStorageScheme() == ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS 
                 		? QueryConstants.SINGLE_KEYVALUE_COLUMN_QUALIFIER_BYTES : ref.getColumn().getColumnQualifierBytes();
                 // track the where condition columns. Later we need to ensure the Scan in HRS scans these column CFs
-                context.addWhereCoditionColumn(ref.getColumn().getFamilyName().getBytes(), cq);
+                context.addWhereConditionColumn(ref.getColumn().getFamilyName().getBytes(), cq);
             }
 			return newColumnExpression;
         }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index d563bc2..0471433 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -17,7 +17,6 @@
  */
 package org.apache.phoenix.iterate;
 
-import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_ACTUAL_START_ROW;
 import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_START_ROW_SUFFIX;
 import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_STOP_ROW_SUFFIX;
@@ -211,7 +210,7 @@
                             // selected column values are returned back to client.
                             context.getWhereConditionColumns().clear();
                             for (PColumnFamily family : table.getColumnFamilies()) {
-                                context.addWhereCoditionColumn(family.getName().getBytes(), null);
+                                context.addWhereConditionColumn(family.getName().getBytes(), null);
                             }
                         } else {
                             byte[] ecf = SchemaUtil.getEmptyColumnFamily(table);
@@ -256,13 +255,8 @@
             scan.setAttribute(BaseScannerRegionObserver.USE_NEW_VALUE_COLUMN_QUALIFIER, Bytes.toBytes(true));
             // When analyzing the table, there is no look up for key values being done.
             // So there is no point setting the range.
-            if (EncodedColumnsUtil.setQualifierRanges(table) && !ScanUtil.isAnalyzeTable(scan)) {
-                Pair<Integer, Integer> range = getEncodedQualifierRange(scan, context);
-                if (range != null) {
-                    scan.setAttribute(BaseScannerRegionObserver.MIN_QUALIFIER, Bytes.toBytes(range.getFirst()));
-                    scan.setAttribute(BaseScannerRegionObserver.MAX_QUALIFIER, Bytes.toBytes(range.getSecond()));
-                    ScanUtil.setQualifierRangesOnFilter(scan, range);
-                }
+            if (!ScanUtil.isAnalyzeTable(scan)) {
+                setQualifierRanges(keyOnlyFilter, table, scan, context);
             }
             if (optimizeProjection) {
                 optimizeProjection(context, scan, table, statement);
@@ -270,61 +264,70 @@
         }
     }
     
-    private static Pair<Integer, Integer> getEncodedQualifierRange(Scan scan, StatementContext context)
-            throws SQLException {
-        PTable table = context.getCurrentTable().getTable();
-        QualifierEncodingScheme encodingScheme = table.getEncodingScheme();
-        checkArgument(encodingScheme != QualifierEncodingScheme.NON_ENCODED_QUALIFIERS,
-            "Method should only be used for tables using encoded column names");
-        Pair<Integer, Integer> minMaxQualifiers = new Pair<>();
-        for (Pair<byte[], byte[]> whereCol : context.getWhereConditionColumns()) {
-            byte[] cq = whereCol.getSecond();
-            if (cq != null) {
-                int qualifier = table.getEncodingScheme().decode(cq);
-                determineQualifierRange(qualifier, minMaxQualifiers);
+    private static void setQualifierRanges(boolean keyOnlyFilter, PTable table, Scan scan,
+            StatementContext context) throws SQLException {
+        if (EncodedColumnsUtil.useEncodedQualifierListOptimization(table)) {
+            Pair<Integer, Integer> minMaxQualifiers = new Pair<>();
+            for (Pair<byte[], byte[]> whereCol : context.getWhereConditionColumns()) {
+                byte[] cq = whereCol.getSecond();
+                if (cq != null) {
+                    int qualifier = table.getEncodingScheme().decode(cq);
+                    adjustQualifierRange(qualifier, minMaxQualifiers);
+                }
             }
-        }
-        Map<byte[], NavigableSet<byte[]>> familyMap = scan.getFamilyMap();
-
-        Map<String, Pair<Integer, Integer>> qualifierRanges = EncodedColumnsUtil.getFamilyQualifierRanges(table);
-        for (Entry<byte[], NavigableSet<byte[]>> entry : familyMap.entrySet()) {
-            if (entry.getValue() != null) {
-                for (byte[] cq : entry.getValue()) {
-                    if (cq != null) {
-                        int qualifier = table.getEncodingScheme().decode(cq);
-                        determineQualifierRange(qualifier, minMaxQualifiers);
+            Map<byte[], NavigableSet<byte[]>> familyMap = scan.getFamilyMap();
+            for (Entry<byte[], NavigableSet<byte[]>> entry : familyMap.entrySet()) {
+                if (entry.getValue() != null) {
+                    for (byte[] cq : entry.getValue()) {
+                        if (cq != null) {
+                            int qualifier = table.getEncodingScheme().decode(cq);
+                            adjustQualifierRange(qualifier, minMaxQualifiers);
+                        }
+                    }
+                } else {
+                    byte[] cf = entry.getKey();
+                    String family = Bytes.toString(cf);
+                    if (table.getType() == INDEX && table.getIndexType() == LOCAL
+                            && !IndexUtil.isLocalIndexFamily(family)) {
+                        // TODO(samarth): confirm with James why this family name mapping is needed for local indexes.
+                        family = IndexUtil.getLocalIndexColumnFamily(family);
+                    }
+                    byte[] familyBytes = Bytes.toBytes(family);
+                    NavigableSet<byte[]> qualifierSet = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+                    if (Bytes.equals(familyBytes, SchemaUtil.getEmptyColumnFamily(table))) {
+                        // If the column family is also the empty column family, project the
+                        // empty key value column
+                        Pair<byte[], byte[]> emptyKeyValueInfo =
+                                EncodedColumnsUtil.getEmptyKeyValueInfo(table);
+                        qualifierSet.add(emptyKeyValueInfo.getFirst());
+                    }
+                    // In case of a keyOnlyFilter, we only need to project the 
+                    // empty key value column
+                    if (!keyOnlyFilter) {
+                        Pair<Integer, Integer> qualifierRangeForFamily =
+                                EncodedColumnsUtil.setQualifiersForColumnsInFamily(table, family,
+                                    qualifierSet);
+                        familyMap.put(familyBytes, qualifierSet);
+                        if (qualifierRangeForFamily != null) {
+                            adjustQualifierRange(qualifierRangeForFamily.getFirst(),
+                                minMaxQualifiers);
+                            adjustQualifierRange(qualifierRangeForFamily.getSecond(),
+                                minMaxQualifiers);
+                        }
                     }
                 }
-            } else {
-                /*
-                 * All the columns of the column family are being projected. So we will need to
-                 * consider all the columns in the column family to determine the min-max range.
-                 */
-                String family = Bytes.toString(entry.getKey());
-                if (table.getType() == INDEX && table.getIndexType() == LOCAL && !IndexUtil.isLocalIndexFamily(family)) {
-                    //TODO: samarth confirm with James why do we need this hack here :(
-                    family = IndexUtil.getLocalIndexColumnFamily(family);
-                }
-                Pair<Integer, Integer> range = qualifierRanges.get(family);
-                if (range != null) {
-                    determineQualifierRange(range.getFirst(), minMaxQualifiers);
-                    determineQualifierRange(range.getSecond(), minMaxQualifiers);
-                }
+            }
+            if (minMaxQualifiers.getFirst() != null) {
+                scan.setAttribute(BaseScannerRegionObserver.MIN_QUALIFIER,
+                    Bytes.toBytes(minMaxQualifiers.getFirst()));
+                scan.setAttribute(BaseScannerRegionObserver.MAX_QUALIFIER,
+                    Bytes.toBytes(minMaxQualifiers.getSecond()));
+                ScanUtil.setQualifierRangesOnFilter(scan, minMaxQualifiers);
             }
         }
-        if (minMaxQualifiers.getFirst() == null) {
-            return null;
-        }
-        return minMaxQualifiers;
     }
 
-    /**
-     * 
-     * @param cq
-     * @param minMaxQualifiers
-     * @return true if the empty column was projected
-     */
-    private static void determineQualifierRange(Integer qualifier, Pair<Integer, Integer> minMaxQualifiers) {
+    private static void adjustQualifierRange(Integer qualifier, Pair<Integer, Integer> minMaxQualifiers) {
         if (minMaxQualifiers.getFirst() == null) {
             minMaxQualifiers.setFirst(qualifier);
             minMaxQualifiers.setSecond(qualifier);
@@ -365,19 +368,27 @@
                 }
             }
         }
-        boolean preventSeekToColumn;
+        boolean preventSeekToColumn = false;
         if (statement.getHint().hasHint(Hint.SEEK_TO_COLUMN)) {
             // Allow seeking to column during filtering
             preventSeekToColumn = false;
-        } else if (statement.getHint().hasHint(Hint.NO_SEEK_TO_COLUMN)) {
-            // Prevent seeking to column during filtering
-            preventSeekToColumn = true;
-        } else {
-            int hbaseServerVersion = context.getConnection().getQueryServices().getLowestClusterHBaseVersion();
-            // When only a single column family is referenced, there are no hints, and HBase server version
-            // is less than when the fix for HBASE-13109 went in (0.98.12), then we prevent seeking to a
-            // column.
-            preventSeekToColumn = referencedCfCount == 1 && hbaseServerVersion < MIN_SEEK_TO_COLUMN_VERSION;
+        } else if (!EncodedColumnsUtil.useEncodedQualifierListOptimization(table)) {
+            /*
+             * preventSeekToColumn cannot be true, even if hinted, when encoded qualifier list
+             * optimization is being used. When using the optimization, it is necessary that we
+             * explicitly set the column qualifiers of the column family in the scan and not just
+             * project the entire column family.
+             */
+            if (statement.getHint().hasHint(Hint.NO_SEEK_TO_COLUMN)) {
+                // Prevent seeking to column during filtering
+                preventSeekToColumn = true;
+            } else {
+                int hbaseServerVersion = context.getConnection().getQueryServices().getLowestClusterHBaseVersion();
+                // When only a single column family is referenced, there are no hints, and HBase server version
+                // is less than when the fix for HBASE-13109 went in (0.98.12), then we prevent seeking to a
+                // column.
+                preventSeekToColumn = referencedCfCount == 1 && hbaseServerVersion < MIN_SEEK_TO_COLUMN_VERSION;
+            }
         }
         for (Entry<byte[], NavigableSet<byte[]>> entry : familyMap.entrySet()) {
             ImmutableBytesPtr cf = new ImmutableBytesPtr(entry.getKey());
@@ -440,7 +451,7 @@
             // the ExplicitColumnTracker not to be used, though.
             if (!statement.isAggregate() && filteredColumnNotInProjection) {
                 ScanUtil.andFilterAtEnd(scan, 
-                        trackedColumnsBitset != null ? new EncodedQualifiersColumnProjectionFilter(SchemaUtil.getEmptyColumnFamily(table), trackedColumnsBitset, conditionOnlyCfs, table.getEncodingScheme()) : new ColumnProjectionFilter(SchemaUtil.getEmptyColumnFamily(table),
+                    trackedColumnsBitset != null ? new EncodedQualifiersColumnProjectionFilter(SchemaUtil.getEmptyColumnFamily(table), trackedColumnsBitset, conditionOnlyCfs, table.getEncodingScheme()) : new ColumnProjectionFilter(SchemaUtil.getEmptyColumnFamily(table),
                         columnsTracker, conditionOnlyCfs, EncodedColumnsUtil.usesEncodedColumnNames(table.getEncodingScheme())));
             }
         }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamilyImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamilyImpl.java
index 453e33b..07588d2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamilyImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamilyImpl.java
@@ -17,13 +17,10 @@
  */
 package org.apache.phoenix.schema;
 
-import static org.apache.phoenix.util.EncodedColumnsUtil.usesEncodedColumnNames;
-
 import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.SizedUtil;
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/EncodedColumnQualiferCellsList.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/EncodedColumnQualiferCellsList.java
index 5a5b355..10329fb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/EncodedColumnQualiferCellsList.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/EncodedColumnQualiferCellsList.java
@@ -31,26 +31,27 @@
 import javax.annotation.concurrent.NotThreadSafe;
 
 import org.apache.hadoop.hbase.Cell;
-import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
 import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
 
 /**
- * List implementation that provides indexed based look up when the cell column qualifiers are positive numbers. 
- * These qualifiers are generated by using one of the column qualifier encoding schemes specified in {@link ImmutableStorageScheme}. 
- * The api methods in this list assume that the caller wants to see
- * and add only non null elements in the list. 
+ * List implementation that provides index-based lookup when the cell column qualifiers are
+ * positive numbers. These qualifiers are generated by using one of the column qualifier encoding
+ * schemes specified in {@link QualifierEncodingScheme}. The API methods in this list assume that
+ * the caller wants to see and add only non-null elements in the list.
  * <p>
- * Please note that this implementation doesn't implement all the optional methods of the 
- * {@link List} interface. Such unsupported methods could violate the basic invariance of the list that every cell with
- * an encoded column qualifier has a fixed position in the list.
+ * Please note that this implementation doesn't implement all the optional methods of the
+ * {@link List} interface. Such unsupported methods could violate the basic invariant of the list
+ * that every cell with an encoded column qualifier has a fixed position in the list.
  * </p>
  * <p>
- * An important performance characteristic of this list is that doing look up on the basis of index via {@link #get(int)}
- * is an O(n) operation. This makes iterating through the list using {@link #get(int)} an O(n^2) operation.
- * Instead, for iterating through the list, one should use the iterators created through {@link #iterator()} or 
- * {@link #listIterator()}. Do note that getting an element using {@link #getCellForColumnQualifier(int)} is an O(1) operation
- * and should generally be the way for accessing elements in the list.
- * </p> 
+ * An important performance characteristic of this list is that looking up an element by index
+ * via {@link #get(int)} is an O(n) operation. This makes iterating through the list using
+ * {@link #get(int)} an O(n^2) operation. Instead, for iterating through the list, one should use
+ * the iterators created through {@link #iterator()} or {@link #listIterator()}. Do note that
+ * getting an element using {@link #getCellForColumnQualifier(byte[])} or
+ * {@link #getCellForColumnQualifier(byte[], int, int)} is an O(1) operation and should generally be
+ * the preferred way of accessing elements in the list.
+ * </p>
  */
 @NotThreadSafe
 public class EncodedColumnQualiferCellsList implements List<Cell> {
@@ -61,14 +62,15 @@
     private final Cell[] array;
     private int numNonNullElements;
     private int firstNonNullElementIdx = -1;
-    private static final int RESERVED_RANGE_SIZE = ENCODED_CQ_COUNTER_INITIAL_VALUE - ENCODED_EMPTY_COLUMN_NAME;
+    private static final int RESERVED_RANGE_SIZE =
+            ENCODED_CQ_COUNTER_INITIAL_VALUE - ENCODED_EMPTY_COLUMN_NAME;
     // Used by iterators to figure out if the list was structurally modified.
     private int modCount = 0;
     private final QualifierEncodingScheme encodingScheme;
 
-    public EncodedColumnQualiferCellsList(int minQ, int maxQ, QualifierEncodingScheme encodingScheme) {
-        checkArgument(minQ <= maxQ, "Invalid arguments. Min: " + minQ
-                + ". Max: " + maxQ);
+    public EncodedColumnQualiferCellsList(int minQ, int maxQ,
+            QualifierEncodingScheme encodingScheme) {
+        checkArgument(minQ <= maxQ, "Invalid arguments. Min: " + minQ + ". Max: " + maxQ);
         this.minQualifier = minQ;
         this.maxQualifier = maxQ;
         int size = 0;
@@ -80,7 +82,9 @@
             size = RESERVED_RANGE_SIZE + (maxQ - minQ + 1);
         }
         this.array = new Cell[size];
-        this.nonReservedRangeOffset = minQ > ENCODED_CQ_COUNTER_INITIAL_VALUE ? minQ  - ENCODED_CQ_COUNTER_INITIAL_VALUE : 0;
+        this.nonReservedRangeOffset =
+                minQ > ENCODED_CQ_COUNTER_INITIAL_VALUE ? minQ - ENCODED_CQ_COUNTER_INITIAL_VALUE
+                        : 0;
         this.encodingScheme = encodingScheme;
     }
 
@@ -133,8 +137,10 @@
         if (e == null) {
             throw new NullPointerException();
         }
-        int columnQualifier = encodingScheme.decode(e.getQualifierArray(), e.getQualifierOffset(), e.getQualifierLength());
-                
+        int columnQualifier =
+                encodingScheme.decode(e.getQualifierArray(), e.getQualifierOffset(),
+                    e.getQualifierLength());
+
         checkQualifierRange(columnQualifier);
         int idx = getArrayIndex(columnQualifier);
         if (array[idx] == null) {
@@ -233,7 +239,8 @@
             ListIterator<Cell> listItr = this.listIterator();
             while (listItr.hasNext()) {
                 Cell cellInThis = listItr.next();
-                int qualifier = encodingScheme.decode(cellInThis.getQualifierArray(),
+                int qualifier =
+                        encodingScheme.decode(cellInThis.getQualifierArray(),
                             cellInThis.getQualifierOffset(), cellInThis.getQualifierLength());
                 try {
                     Cell cellInParam = list.getCellForColumnQualifier(qualifier);
@@ -278,8 +285,8 @@
                 }
             }
         }
-        throw new IllegalStateException("There was no element present in the list at index "
-                + index + " even though number of elements in the list are " + size());
+        throw new IllegalStateException("There was no element present in the list at index " + index
+                + " even though number of elements in the list are " + size());
     }
 
     @Override
@@ -356,16 +363,26 @@
         return new Itr();
     }
 
+    /**
+     * @param qualifierBytes bytes of the column qualifier which serves as the index
+     * @return {@link Cell} at the index
+     */
     public Cell getCellForColumnQualifier(byte[] qualifierBytes) {
         int columnQualifier = encodingScheme.decode(qualifierBytes);
         return getCellForColumnQualifier(columnQualifier);
     }
-    
+
+    /**
+     * @param qualifierBytes bytes of the column qualifier which serves as the index
+     * @param offset offset in the byte array
+     * @param length length starting from offset
+     * @return {@link Cell} at the index
+     */
     public Cell getCellForColumnQualifier(byte[] qualifierBytes, int offset, int length) {
         int columnQualifier = encodingScheme.decode(qualifierBytes, offset, length);
         return getCellForColumnQualifier(columnQualifier);
     }
-    
+
     private Cell getCellForColumnQualifier(int columnQualifier) {
         checkQualifierRange(columnQualifier);
         int idx = getArrayIndex(columnQualifier);
@@ -382,7 +399,7 @@
 
     private void checkQualifierRange(int qualifier) {
         if (qualifier < ENCODED_CQ_COUNTER_INITIAL_VALUE) {
-            return; // space in the array for reserved range is always allocated. 
+            return; // space in the array for reserved range is always allocated.
         }
         if (qualifier < minQualifier || qualifier > maxQualifier) {
             throw new IndexOutOfBoundsException("Qualifier " + qualifier
@@ -413,7 +430,7 @@
         protected int nextIndex = 0;
         protected int lastRet = -1;
         protected int expectedModCount = modCount;
-        
+
         private Itr() {
             moveForward(true);
         }
@@ -461,7 +478,7 @@
                 nextIndex = -1;
             }
         }
-        
+
         protected void checkForCoModification() {
             if (modCount != expectedModCount) {
                 throw new ConcurrentModificationException();
@@ -472,7 +489,7 @@
 
     private class ListItr extends Itr implements ListIterator<Cell> {
         private int previousIndex = -1;
-        
+
         private ListItr() {
             moveForward(true);
         }
@@ -522,7 +539,9 @@
             if (lastRet == -1) {
                 throw new IllegalStateException();
             }
-            int columnQualifier = encodingScheme.decode(e.getQualifierArray(), e.getQualifierOffset(), e.getQualifierLength());                    
+            int columnQualifier =
+                    encodingScheme.decode(e.getQualifierArray(), e.getQualifierOffset(),
+                        e.getQualifierLength());
             int idx = getArrayIndex(columnQualifier);
             if (idx != lastRet) {
                 throw new IllegalArgumentException("Cell " + e + " with column qualifier "
@@ -538,13 +557,13 @@
         public void add(Cell e) {
             throwGenericUnsupportedOperationException();
         }
-        
+
         @Override
         protected void moveForward(boolean init) {
             if (!init) {
                 previousIndex = nextIndex;
             }
-            int i = init ? 0 : nextIndex + 1; 
+            int i = init ? 0 : nextIndex + 1;
             moveNextPointer(i);
         }
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/EncodedColumnsUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/EncodedColumnsUtil.java
index fb6baf0..591fc0c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/EncodedColumnsUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/EncodedColumnsUtil.java
@@ -17,13 +17,12 @@
  */
 package org.apache.phoenix.util;
 
-import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS;
 
 import java.util.Arrays;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Map.Entry;
+import java.util.Collection;
+import java.util.NavigableSet;
 
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -34,15 +33,13 @@
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.LiteralExpression;
 import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
 import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
 import org.apache.phoenix.schema.tuple.Tuple;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-
 public class EncodedColumnsUtil {
 
     public static boolean usesEncodedColumnNames(PTable table) {
@@ -128,7 +125,7 @@
         return new Pair<>(minQ, maxQ);
     }
 
-    public static boolean setQualifierRanges(PTable table) {
+    public static boolean useEncodedQualifierListOptimization(PTable table) {
         return table.getImmutableStorageScheme() != null
                 && table.getImmutableStorageScheme() == ImmutableStorageScheme.ONE_CELL_PER_COLUMN
                 && usesEncodedColumnNames(table) && !table.isTransactional()
@@ -139,24 +136,22 @@
         return minMaxQualifiers != null;
     }
 
-    public static Map<String, Pair<Integer, Integer>> getFamilyQualifierRanges(PTable table) {
-        checkNotNull(table);
+    public static Pair<Integer, Integer> setQualifiersForColumnsInFamily(PTable table, String cf, NavigableSet<byte[]> qualifierSet)
+            throws ColumnFamilyNotFoundException {
         QualifierEncodingScheme encodingScheme = table.getEncodingScheme();
-        Preconditions.checkArgument(encodingScheme != NON_ENCODED_QUALIFIERS);
-        if (table.getEncodedCQCounter() != null) {
-            Map<String, Integer> values = table.getEncodedCQCounter().values();
-            Map<String, Pair<Integer, Integer>> toReturn = Maps.newHashMapWithExpectedSize(values.size());
-            for (Entry<String, Integer> e : values.entrySet()) {
-                Integer lowerBound = QueryConstants.ENCODED_CQ_COUNTER_INITIAL_VALUE;
-                Integer upperBound = e.getValue() - 1;
-                if (lowerBound > upperBound) {
-                    lowerBound = upperBound;
-                }
-                toReturn.put(e.getKey(), new Pair<>(lowerBound, upperBound));
+        checkArgument(encodingScheme != QualifierEncodingScheme.NON_ENCODED_QUALIFIERS);
+        Collection<PColumn> columns = table.getColumnFamily(cf).getColumns();
+        if (columns.size() > 0) {
+            int[] qualifiers = new int[columns.size()];
+            int i = 0;
+            for (PColumn col : columns) {
+                qualifierSet.add(col.getColumnQualifierBytes());
+                qualifiers[i++] = encodingScheme.decode(col.getColumnQualifierBytes());
             }
-            return toReturn;
+            Arrays.sort(qualifiers);
+            return new Pair<>(qualifiers[0], qualifiers[qualifiers.length - 1]);
         }
-        return Collections.emptyMap();
+        return null;
     }
     
     public static byte[] getColumnQualifierBytes(String columnName, Integer numberBasedQualifier, PTable table, boolean isPk) {
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryOptimizerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryOptimizerTest.java
index 6c8ac48..e1dacb7 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryOptimizerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryOptimizerTest.java
@@ -24,6 +24,7 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.sql.Array;
@@ -781,7 +782,7 @@
         conn.createStatement().execute("CREATE TABLE " + tableName + " (k INTEGER NOT NULL PRIMARY KEY, v1 INTEGER, v2 VARCHAR) COLUMN_ENCODED_BYTES=4");
         PhoenixStatement stmt = conn.createStatement().unwrap(PhoenixStatement.class);
         ResultSet rs = stmt.executeQuery("SELECT K from " + tableName);
-        assertQualifierRanges(rs, ENCODED_CQ_COUNTER_INITIAL_VALUE, ENCODED_CQ_COUNTER_INITIAL_VALUE + 1);
+        assertQualifierRangesNotPresent(rs);
         rs = stmt.executeQuery("SELECT V2 from " + tableName);
         assertQualifierRanges(rs, ENCODED_EMPTY_COLUMN_NAME, ENCODED_CQ_COUNTER_INITIAL_VALUE + 1);
         rs = stmt.executeQuery("SELECT V1 from " + tableName);
@@ -796,9 +797,9 @@
         assertEquals(maxQualifier, Bytes.toInt(scan.getAttribute(MAX_QUALIFIER)));
     }
     
-//    private static void assertQualifierRangesNotPresent(ResultSet rs) throws SQLException {
-//        Scan scan = rs.unwrap(PhoenixResultSet.class).getStatement().getQueryPlan().getContext().getScan();
-//        assertNull(scan.getAttribute(MIN_QUALIFIER));
-//        assertNull(scan.getAttribute(MAX_QUALIFIER));
-//    }
+    private static void assertQualifierRangesNotPresent(ResultSet rs) throws SQLException {
+        Scan scan = rs.unwrap(PhoenixResultSet.class).getStatement().getQueryPlan().getContext().getScan();
+        assertNull(scan.getAttribute(MIN_QUALIFIER));
+        assertNull(scan.getAttribute(MAX_QUALIFIER));
+    }
 }
\ No newline at end of file