PHOENIX-6120 Change IndexMaintainer for SINGLE_CELL_ARRAY_WITH_OFFSETS indexes

Signed-off-by: Gokcen Iskender <giskender@salesforce.com>
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolBaseIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolBaseIT.java
index 980c0fa..bdefcfd 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolBaseIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolBaseIT.java
@@ -42,16 +42,22 @@
 public class IndexScrutinyToolBaseIT extends BaseTest {
     protected String outputDir;
 
+    protected static String indexRegionObserverEnabled = Boolean.FALSE.toString();
+    private static String previousIndexRegionObserverEnabled = indexRegionObserverEnabled;
+
     @BeforeClass public static synchronized void doSetup() throws Exception {
         Map<String, String> serverProps = Maps.newHashMap();
         //disable major compactions
         serverProps.put(HConstants.MAJOR_COMPACTION_PERIOD, "0");
         Map<String, String> clientProps = Maps.newHashMap();
 
-        clientProps.put(QueryServices.INDEX_REGION_OBSERVER_ENABLED_ATTRIB, Boolean.FALSE.toString());
+        clientProps.put(QueryServices.INDEX_REGION_OBSERVER_ENABLED_ATTRIB, indexRegionObserverEnabled);
 
-        setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()),
-                new ReadOnlyProps(clientProps.entrySet().iterator()));
+        if (!previousIndexRegionObserverEnabled.equals(indexRegionObserverEnabled)) {
+           driver = null;
+        }
+        setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
+        previousIndexRegionObserverEnabled = indexRegionObserverEnabled;
     }
 
     protected List<Job> runScrutiny(Class<? extends IndexScrutinyMapper> mapperClass,
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java
index a56ebb3..8237fd1 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java
@@ -14,6 +14,7 @@
 import static org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters.BATCHES_PROCESSED_COUNT;
 import static org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters.INVALID_ROW_COUNT;
 import static org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters.VALID_ROW_COUNT;
+import static org.apache.phoenix.schema.PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -69,6 +70,8 @@
 import org.junit.runners.Parameterized;
 
 import com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Tests for the {@link IndexScrutinyTool}
@@ -76,6 +79,7 @@
 @Category(NeedsOwnMiniClusterTest.class)
 @RunWith(Parameterized.class)
 public class IndexScrutinyToolIT extends IndexScrutinyToolBaseIT {
+    private static final Logger LOGGER = LoggerFactory.getLogger(IndexScrutinyToolIT.class);
     public static final String MISSING_ROWS_QUERY_TEMPLATE =
         "SELECT \"SOURCE_TABLE\" , \"TARGET_TABLE\" , \"SCRUTINY_EXECUTE_TIME\" , " +
         "\"SOURCE_ROW_PK_HASH\" , \"SOURCE_TS\" , \"TARGET_TS\" , \"HAS_TARGET_ROW\" , " +
@@ -125,15 +129,25 @@
     private Properties props;
 
     @Parameterized.Parameters public static Collection<Object[]> data() {
-        return Arrays.asList(new Object[][] { { "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER, EMPLOY_DATE TIMESTAMP, EMPLOYER VARCHAR)",
+        return Arrays.asList(new Object[][] {
+                { "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER, EMPLOY_DATE TIMESTAMP, EMPLOYER VARCHAR) ",
+                        "CREATE INDEX %s ON %s (NAME, EMPLOY_DATE) INCLUDE (ZIP) IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2" },
+                { "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER, EMPLOY_DATE TIMESTAMP, EMPLOYER VARCHAR)",
                 "CREATE LOCAL INDEX %s ON %s (NAME, EMPLOY_DATE) INCLUDE (ZIP)" }, { "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER, EMPLOY_DATE TIMESTAMP, EMPLOYER VARCHAR) SALT_BUCKETS=2",
                 "CREATE INDEX %s ON %s (NAME, EMPLOY_DATE) INCLUDE (ZIP)" }, { "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER, EMPLOY_DATE TIMESTAMP, EMPLOYER VARCHAR) SALT_BUCKETS=2",
                 "CREATE LOCAL INDEX %s ON %s (NAME, EMPLOY_DATE) INCLUDE (ZIP)" } });
     }
 
-    public IndexScrutinyToolIT(String dataTableDdl, String indexTableDdl) {
+    public IndexScrutinyToolIT(String dataTableDdl, String indexTableDdl) throws Exception {
         this.dataTableDdl = dataTableDdl;
         this.indexTableDdl = indexTableDdl;
+        if (isOnlyIndexSingleCell()) {
+            indexRegionObserverEnabled = Boolean.TRUE.toString();
+            doSetup();
+        } else {
+            indexRegionObserverEnabled = Boolean.FALSE.toString();
+            doSetup();
+        }
     }
 
     /**
@@ -160,6 +174,13 @@
         }
     }
 
+    private boolean isOnlyIndexSingleCell() {
+        if (indexTableDdl.contains(SINGLE_CELL_ARRAY_WITH_OFFSETS.toString()) && !dataTableDdl.contains(SINGLE_CELL_ARRAY_WITH_OFFSETS.toString())) {
+            return true;
+        }
+        return false;
+    }
+
     /**
      * Tests a data table that is correctly indexed. Scrutiny should report all rows as valid.
      */
@@ -368,6 +389,9 @@
      * number of incorrect rows when run with the index as the source table
      */
     @Test public void testMoreIndexRows() throws Exception {
+        if (isOnlyIndexSingleCell()) {
+            return;
+        }
         upsertRow(dataTableUpsertStmt, 1, "name-1", 95123);
         conn.commit();
         disableIndex();
@@ -420,6 +444,9 @@
      * Tests that with the output to file option set, the scrutiny tool outputs invalid rows to file
      */
     @Test public void testOutputInvalidRowsToFile() throws Exception {
+        if (isOnlyIndexSingleCell()) {
+            return;
+        }
         insertOneValid_OneBadVal_OneMissingTarget();
 
         String[]
@@ -469,6 +496,9 @@
      * Tests writing of results to the output table
      */
     @Test public void testOutputInvalidRowsToTable() throws Exception {
+        if (isOnlyIndexSingleCell()) {
+            return;
+        }
         insertOneValid_OneBadVal_OneMissingTarget();
         String[]
                 argValues =
@@ -646,9 +676,9 @@
 
     private void generateUniqueTableNames() {
         schemaName = generateUniqueName();
-        dataTableName = generateUniqueName();
+        dataTableName = "TBL" + generateUniqueName();
         dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
-        indexTableName = generateUniqueName();
+        indexTableName = "IDX_" + generateUniqueName();
         indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
     }
 
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java
index 5653029..aa21352 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java
@@ -121,27 +121,37 @@
 
     public static final int MAX_LOOKBACK_AGE = 3600;
     private final String tableDDLOptions;
+    private final String indexDDLOptions;
     private boolean directApi = true;
     private boolean useSnapshot = false;
     private boolean mutable;
+    private boolean singleCell;
     @Rule
     public ExpectedException exceptionRule = ExpectedException.none();
 
-    public IndexToolForNonTxGlobalIndexIT(boolean mutable) {
+    public IndexToolForNonTxGlobalIndexIT(boolean mutable, boolean singleCell) {
         StringBuilder optionBuilder = new StringBuilder();
+        StringBuilder indexOptionBuilder = new StringBuilder();
         this.mutable = mutable;
         if (!mutable) {
             optionBuilder.append(" IMMUTABLE_ROWS=true ");
         }
         optionBuilder.append(" SPLIT ON(1,2)");
         this.tableDDLOptions = optionBuilder.toString();
+        if (singleCell) {
+            indexOptionBuilder.append(" IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS,COLUMN_ENCODED_BYTES=2");
+        }
+        this.singleCell = singleCell;
+        this.indexDDLOptions = indexOptionBuilder.toString();
     }
 
-    @Parameterized.Parameters(name = "mutable={0}")
+    @Parameterized.Parameters(name = "mutable={0}, singleCellIndex={1}")
     public static synchronized Collection<Object[]> data() {
         return Arrays.asList(new Object[][] {
-                {true},
-                {false} });
+                {true, true},
+                {true, false},
+                {false, true},
+                {false, false} });
     }
 
     @BeforeClass
@@ -207,7 +217,7 @@
             IndexToolIT.setEveryNthRowWithNull(NROWS, 3, stmt);
             conn.commit();
             conn.createStatement().execute(String.format(
-                    "CREATE INDEX %s ON %s (VAL1) INCLUDE (VAL2) ASYNC ", indexTableName, dataTableFullName));
+                    "CREATE INDEX %s ON %s (VAL1) INCLUDE (VAL2) ASYNC " + this.indexDDLOptions, indexTableName, dataTableFullName));
             // Run the index MR job and verify that the index table is built correctly
             IndexTool
                     indexTool = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, new String[0]);
@@ -257,7 +267,7 @@
                     .execute("upsert into " + dataTableFullName + " values (1, 'Phoenix', 'A')");
             conn.commit();
             conn.createStatement()
-                    .execute(String.format("CREATE INDEX %s ON %s (NAME) INCLUDE (CODE) ASYNC",
+                    .execute(String.format("CREATE INDEX %s ON %s (NAME) INCLUDE (CODE) ASYNC " + this.indexDDLOptions,
                             indexTableName, dataTableFullName));
             IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0,
                     IndexTool.IndexVerifyType.ONLY);
@@ -335,7 +345,7 @@
 
             String stmtString2 =
                     String.format(
-                            "CREATE INDEX %s ON %s  (LPAD(UPPER(NAME, 'en_US'),8,'x')||'_xyz') ", indexTableName, dataTableFullName);
+                            "CREATE INDEX %s ON %s  (LPAD(UPPER(NAME, 'en_US'),8,'x')||'_xyz') " + this.indexDDLOptions, indexTableName, dataTableFullName);
             conn.createStatement().execute(stmtString2);
             conn.commit();
             String qIndexTableName = SchemaUtil.getQualifiedTableName(schemaName, indexTableName);
@@ -385,7 +395,7 @@
             conn.commit();
             String stmtString2 =
                     String.format(
-                            "CREATE INDEX %s ON %s (NAME) INCLUDE (ZIP) ASYNC ", indexTableName, dataTableFullName);
+                            "CREATE INDEX %s ON %s (NAME) INCLUDE (ZIP) ASYNC " + this.indexDDLOptions, indexTableName, dataTableFullName);
             conn.createStatement().execute(stmtString2);
 
             // Run the index MR job and verify that the index table is built correctly
@@ -461,7 +471,7 @@
             conn.commit();
             String stmtString2 =
                     String.format(
-                            "CREATE INDEX %s ON %s (NAME) INCLUDE (ZIP) ASYNC ", indexTableName, dataTableFullName);
+                            "CREATE INDEX %s ON %s (NAME) INCLUDE (ZIP) ASYNC " + this.indexDDLOptions, indexTableName, dataTableFullName);
             conn.createStatement().execute(stmtString2);
 
             // Run the index MR job and verify that the index table is built correctly
@@ -521,7 +531,7 @@
             conn.createStatement().execute("upsert into " + viewFullName + " values (1, 'Phoenix', 12345)");
             conn.commit();
             conn.createStatement().execute(String.format(
-                    "CREATE INDEX %s ON %s (NAME) INCLUDE (ZIP) ASYNC", indexTableName, viewFullName));
+                    "CREATE INDEX %s ON %s (NAME) INCLUDE (ZIP) ASYNC " + this.indexDDLOptions, indexTableName, viewFullName));
             TestUtil.addCoprocessor(conn, "_IDX_" + dataTableFullName, IndexToolIT.MutationCountingRegionObserver.class);
             // Run the index MR job and verify that the index table rebuild succeeds
             IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, viewName, indexTableName,
@@ -563,7 +573,7 @@
             // lead to any change on index and thus the index verify during index rebuild should fail
             IndexRebuildRegionScanner.setIgnoreIndexRebuildForTesting(true);
             conn.createStatement().execute(String.format(
-                    "CREATE INDEX %s ON %s (NAME) INCLUDE (ZIP) ASYNC", indexTableName, viewFullName));
+                    "CREATE INDEX %s ON %s (NAME) INCLUDE (ZIP) ASYNC " + this.indexDDLOptions, indexTableName, viewFullName));
             // Run the index MR job and verify that the index table rebuild fails
             IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, viewName, indexTableName,
                     null, -1, IndexTool.IndexVerifyType.AFTER);
@@ -595,7 +605,7 @@
             conn.createStatement().execute("upsert into " + dataTableFullName + " values (1, 'Phoenix', 'A')");
             conn.commit();
             conn.createStatement().execute(String.format(
-                    "CREATE INDEX %s ON %s (NAME) INCLUDE (CODE) ASYNC", indexTableName, dataTableFullName));
+                    "CREATE INDEX %s ON %s (NAME) INCLUDE (CODE) ASYNC " + this.indexDDLOptions, indexTableName, dataTableFullName));
             // Run the index MR job to only verify that each data table row has a corresponding index row
             // IndexTool will go through each data table row and record the mismatches in the output table
             // called PHOENIX_INDEX_TOOL
@@ -637,7 +647,7 @@
             conn.createStatement().execute("CREATE TABLE " + dataTableFullName
                     + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, CODE VARCHAR) "+tableDDLOptions);
             conn.createStatement().execute(String.format(
-                    "CREATE INDEX %s ON %s (NAME) INCLUDE (CODE)", indexTableName, dataTableFullName));
+                    "CREATE INDEX %s ON %s (NAME) INCLUDE (CODE) " + this.indexDDLOptions, indexTableName, dataTableFullName));
 
             conn.createStatement().execute("upsert into " + dataTableFullName + " values (1, 'Phoenix', 'A')");
             conn.createStatement().execute("upsert into " + dataTableFullName + " values (2, 'Phoenix1', 'B')");
@@ -711,10 +721,10 @@
             conn.createStatement().execute("CREATE VIEW "+viewFullName+" AS SELECT * FROM "+dataTableFullName);
 
             conn.createStatement().execute(String.format(
-                    "CREATE INDEX "+viewIndexName+" ON "+viewFullName+" (val3) INCLUDE(val5)"));
+                    "CREATE INDEX "+viewIndexName+" ON "+viewFullName+" (val3) INCLUDE(val5) " + this.indexDDLOptions));
 
             conn.createStatement().execute(String.format(
-                    "CREATE INDEX "+indexTableName+" ON "+dataTableFullName+" (val3) INCLUDE(val5)"));
+                    "CREATE INDEX "+indexTableName+" ON "+dataTableFullName+" (val3) INCLUDE(val5) " + this.indexDDLOptions));
 
             customEdge.setValue(EnvironmentEdgeManager.currentTimeMillis());
             EnvironmentEdgeManager.injectEdge(customEdge);
@@ -840,7 +850,7 @@
                     "CREATE VIEW " + viewFullName + " AS SELECT * FROM " + dataTableFullName + " WHERE val6 = 'def'");
             conn.createStatement().execute(String.format(
                     "CREATE INDEX " + viewIndexName + " ON " + viewFullName
-                            + " (val3) INCLUDE(val5)"));
+                            + " (val3) INCLUDE(val5) " + this.indexDDLOptions));
             customeEdge.setValue(EnvironmentEdgeManager.currentTimeMillis());
             EnvironmentEdgeManager.injectEdge(customeEdge);
 
@@ -959,7 +969,7 @@
             //create ASYNC
             String stmtString2 =
                 String.format(
-                    "CREATE INDEX %s ON %s (LPAD(UPPER(NAME, 'en_US'),8,'x')||'_xyz') ASYNC ",
+                    "CREATE INDEX %s ON %s (LPAD(UPPER(NAME, 'en_US'),8,'x')||'_xyz') ASYNC " + this.indexDDLOptions,
                     indexTableName, dataTableFullName);
             conn.createStatement().execute(stmtString2);
             conn.commit();
@@ -1057,7 +1067,7 @@
 
             String stmtString2 =
                 String.format(
-                    "CREATE INDEX %s ON %s (LPAD(UPPER(NAME, 'en_US'),8,'x')||'_xyz') ASYNC ",
+                    "CREATE INDEX %s ON %s (LPAD(UPPER(NAME, 'en_US'),8,'x')||'_xyz') ASYNC " + this.indexDDLOptions,
                     indexTableName, dataTableFullName);
             conn.createStatement().execute(stmtString2);
             conn.commit();
@@ -1110,7 +1120,7 @@
 
             String stmtString2 =
                     String.format(
-                            "CREATE INDEX %s ON %s (NAME) INCLUDE (ZIP) ASYNC ", indexTableName, dataTableFullName);
+                            "CREATE INDEX %s ON %s (NAME) INCLUDE (ZIP) ASYNC " + this.indexDDLOptions, indexTableName, dataTableFullName);
             conn.createStatement().execute(stmtString2);
 
             // Run the index MR job and verify that the index table is built correctly
@@ -1143,7 +1153,7 @@
             conn.createStatement().execute("DELETE FROM " + fullDataTableName + " WHERE k = 'b'");
             conn.createStatement().execute("DELETE FROM " + fullDataTableName + " WHERE k = 'c'");
             conn.commit();
-            conn.createStatement().execute(String.format("CREATE INDEX %s ON %s (v) ASYNC",
+            conn.createStatement().execute(String.format("CREATE INDEX %s ON %s (v) ASYNC " + this.indexDDLOptions,
                 indexTableName, fullDataTableName));
             // Run the index MR job and verify that the index table is built correctly
             Configuration conf = new Configuration(getUtility().getConfiguration());
@@ -1196,7 +1206,7 @@
             conn.createStatement().execute("UPSERT INTO " + fullDataTableName + " VALUES('b','bbb')");
             conn.createStatement().execute("DELETE FROM " + fullDataTableName + " WHERE k = 'c'");
             conn.commit();
-            conn.createStatement().execute(String.format("CREATE INDEX %s ON %s (v) ASYNC",
+            conn.createStatement().execute(String.format("CREATE INDEX %s ON %s (v) ASYNC " + this.indexDDLOptions,
                 indexTableName, fullDataTableName));
             // Run the index MR job and verify that the index table is built correctly
             Configuration conf = new Configuration(getUtility().getConfiguration());
@@ -1291,7 +1301,7 @@
 
             String createViewIndex =
                     "CREATE INDEX IF NOT EXISTS " + indexTableName + " ON " + view1FullName
-                            + " (VIEW_COLB) ASYNC";
+                            + " (VIEW_COLB) ASYNC " + this.indexDDLOptions;
             conn.createStatement().execute(createViewIndex);
             conn.commit();
             // Rebuild using index tool
@@ -1369,7 +1379,7 @@
 
             String createViewIndex =
                     "CREATE INDEX IF NOT EXISTS " + indexTableName + " ON " + view1FullName
-                            + " (VIEW_COLB) ASYNC";
+                            + " (VIEW_COLB) ASYNC " + this.indexDDLOptions;
             conn.createStatement().execute(createViewIndex);
             conn.commit();
             // Rebuild using index tool
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
index c56e01f..a4879d1 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
@@ -119,6 +119,7 @@
     private final boolean transactional;
     private final boolean directApi;
     private final String tableDDLOptions;
+    private final String indexDDLOptions;
     private final boolean useSnapshot;
     private final boolean useTenantId;
 
@@ -142,6 +143,11 @@
         }
         optionBuilder.append(" SPLIT ON(1,2)");
         this.tableDDLOptions = optionBuilder.toString();
+        StringBuilder indexOptionBuilder = new StringBuilder();
+        if (!localIndex && transactionProvider == null) {
+            indexOptionBuilder.append(" IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS,COLUMN_ENCODED_BYTES=2");
+        }
+        this.indexDDLOptions = indexOptionBuilder.toString();
     }
 
     @BeforeClass
@@ -256,7 +262,7 @@
 
             String stmtString2 =
                     String.format(
-                        "CREATE %s INDEX %s ON %s  (LPAD(UPPER(NAME, 'en_US'),8,'x')||'_xyz') ASYNC ",
+                        "CREATE %s INDEX %s ON %s  (LPAD(UPPER(NAME, 'en_US'),8,'x')||'_xyz') ASYNC " + this.indexDDLOptions,
                         (localIndex ? "LOCAL" : ""), indexTableName, dataTableFullName);
             conn.createStatement().execute(stmtString2);
 
@@ -436,7 +442,7 @@
         String createViewStr = "CREATE VIEW %s AS SELECT * FROM %s";
 
         String upsertQueryStr = "UPSERT INTO %s (TENANT_ID, ID, NAME) VALUES('%s' , %d, '%s')";
-        String createIndexStr = "CREATE INDEX %s ON %s (NAME) ";
+        String createIndexStr = "CREATE INDEX %s ON %s (NAME) " + this.indexDDLOptions;
 
         try {
             String tableStmtGlobal = String.format(createTblStr, dataTableName);
@@ -526,7 +532,7 @@
 
             String indexDDL =
                     String.format(
-                        "CREATE %s INDEX %s on %s (\"info\".CAR_NUM,\"info\".CAP_DATE) ASYNC",
+                        "CREATE %s INDEX %s on %s (\"info\".CAR_NUM,\"info\".CAP_DATE) ASYNC " + this.indexDDLOptions,
                         (localIndex ? "LOCAL" : ""), indexTableName, dataTableFullName);
             conn.createStatement().execute(indexDDL);
 
@@ -603,7 +609,7 @@
 
             String indexDDL =
                     String.format(
-                        "CREATE INDEX %s on %s (\"info\".CAR_NUM,\"test\".CAR_NUM,\"info\".CAP_DATE) ASYNC",
+                        "CREATE INDEX %s on %s (\"info\".CAR_NUM,\"test\".CAR_NUM,\"info\".CAP_DATE) ASYNC " + this.indexDDLOptions,
                         indexTableName, dataTableFullName);
             conn.createStatement().execute(indexDDL);
 
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SingleCellIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SingleCellIndexIT.java
new file mode 100644
index 0000000..1d6763a
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SingleCellIndexIT.java
@@ -0,0 +1,366 @@
+package org.apache.phoenix.end2end.index;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableKey;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Properties;
+
+import static org.apache.phoenix.schema.PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN;
+import static org.apache.phoenix.schema.PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS;
+import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS;
+import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Parameterized.class)
+public class SingleCellIndexIT extends ParallelStatsDisabledIT {
+    private static final Logger LOGGER = LoggerFactory.getLogger(SingleCellIndexIT.class);
+    private Properties testProps = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+    private boolean mutable;
+    private final String tableDDLOptions;
+
+    @Parameterized.Parameters(name = "mutable={0}")
+    public static synchronized Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] {
+                {true},
+                {false} });
+    }
+
+    @Before
+    public void setupBefore() {
+        testProps = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        testProps.put(QueryServices.DEFAULT_COLUMN_ENCODED_BYTES_ATRRIB, "0");
+    }
+
+    public SingleCellIndexIT(boolean mutable) {
+        StringBuilder optionBuilder = new StringBuilder();
+        this.mutable = mutable;
+        optionBuilder.append(" IMMUTABLE_STORAGE_SCHEME=ONE_CELL_PER_COLUMN ");
+        if (!mutable) {
+            optionBuilder.append(", IMMUTABLE_ROWS=true ");
+        }
+        this.tableDDLOptions = optionBuilder.toString();
+    }
+
+    @Test
+    public void testCreateOneCellTableAndSingleCellIndex() throws Exception {
+
+        try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) {
+            conn.setAutoCommit(true);
+            String tableName = "TBL_" + generateUniqueName();
+            String idxName = "IND_" + generateUniqueName();
+
+            createTableAndIndex(conn, tableName, idxName, this.tableDDLOptions, 3);
+            assertMetadata(conn, ONE_CELL_PER_COLUMN, NON_ENCODED_QUALIFIERS, tableName);
+            assertMetadata(conn, SINGLE_CELL_ARRAY_WITH_OFFSETS, TWO_BYTE_QUALIFIERS, idxName);
+
+            dumpTable(tableName);
+
+            String selectFromData = "SELECT /*+ NO_INDEX */ PK1, INT_PK, V1, V2, V4 FROM " + tableName + " where V2 >= 3 and V4 LIKE 'V4%'";
+            ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + selectFromData);
+            String actualExplainPlan = QueryUtil.getExplainPlan(rs);
+            assertTrue(actualExplainPlan.contains(tableName));
+
+            rs = conn.createStatement().executeQuery(selectFromData);
+            assertTrue(rs.next());
+            assertEquals("PK2", rs.getString(1));
+            assertEquals(2, rs.getInt(2));
+            assertEquals("V12", rs.getString(3));
+            assertEquals(3, rs.getInt(4));
+            assertEquals("V42", rs.getString(5));
+            assertTrue(rs.next());
+
+            String selectFromIndex = "SELECT PK1, INT_PK, V1, V2, V4 FROM " + tableName + " where V2 >= 3 and V4 LIKE 'V4%'";
+            rs = conn.createStatement().executeQuery("EXPLAIN " + selectFromIndex);
+            actualExplainPlan = QueryUtil.getExplainPlan(rs);
+            assertTrue(actualExplainPlan.contains(idxName));
+
+            rs = conn.createStatement().executeQuery(selectFromIndex);
+            assertTrue(rs.next());
+            assertEquals("PK2", rs.getString(1));
+            assertEquals(2, rs.getInt(2));
+            assertEquals("V12", rs.getString(3));
+            assertEquals(3, rs.getInt(4));
+            assertEquals("V42", rs.getString(5));
+            assertTrue(rs.next());
+            assertEquals("PK3", rs.getString(1));
+            assertEquals(3, rs.getInt(2));
+            assertEquals("V13", rs.getString(3));
+            assertEquals(4, rs.getInt(4));
+            assertEquals("V43", rs.getString(5));
+            assertFalse(rs.next());
+        }
+    }
+
+    @Test
+    public void testAddColumns() throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) {
+            conn.setAutoCommit(true);
+
+            String tableName = "TBL_" + generateUniqueName();
+            String idxName = "IND_" + generateUniqueName();
+
+            createTableAndIndex(conn, tableName, idxName, null, 3);
+            assertMetadata(conn, ONE_CELL_PER_COLUMN, NON_ENCODED_QUALIFIERS, tableName);
+            assertMetadata(conn, SINGLE_CELL_ARRAY_WITH_OFFSETS, TWO_BYTE_QUALIFIERS, idxName);
+
+            String alterTable = "ALTER TABLE " + tableName + " ADD V_NEW VARCHAR CASCADE INDEX ALL";
+            conn.createStatement().execute(alterTable);
+
+            String upsert = "UPSERT INTO " + tableName + " (PK1,INT_PK,V1, V2, V3, V4, V5, V_NEW) VALUES ('PK99',99,'V199',100,101,'V499','V699','V_NEW99')";
+            conn.createStatement().executeUpdate(upsert);
+            conn.commit();
+
+            String selectFromIndex = "SELECT PK1, INT_PK, V1, V2, V4, V_NEW FROM " + tableName + " where V1='V199' AND V2=100";
+            ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + selectFromIndex);
+            String actualExplainPlan = QueryUtil.getExplainPlan(rs);
+            assertTrue(actualExplainPlan.contains(idxName));
+
+            rs = conn.createStatement().executeQuery(selectFromIndex);
+            assertTrue(rs.next());
+            assertEquals("PK99", rs.getString(1));
+            assertEquals(99, rs.getInt(2));
+            assertEquals("V199", rs.getString(3));
+            assertEquals(100, rs.getInt(4));
+            assertEquals("V499", rs.getString(5));
+            assertEquals("V_NEW99", rs.getString(6));
+            assertFalse(rs.next());
+        }
+    }
+
+    @Test
+    public void testDropColumns() throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) {
+            conn.setAutoCommit(true);
+
+            String tableName = "TBL_" + generateUniqueName();
+            String idxName = "IND_" + generateUniqueName();
+
+            createTableAndIndex(conn, tableName, idxName, null, 1);
+            assertMetadata(conn, ONE_CELL_PER_COLUMN, NON_ENCODED_QUALIFIERS, tableName);
+            assertMetadata(conn, SINGLE_CELL_ARRAY_WITH_OFFSETS, TWO_BYTE_QUALIFIERS, idxName);
+
+            String alterTable = "ALTER TABLE " + tableName + " DROP COLUMN V2";
+            conn.createStatement().execute(alterTable);
+
+            String star = "SELECT * FROM " + idxName ;
+            ResultSet rs = conn.createStatement().executeQuery(star);
+            assertTrue(rs.next());
+            assertEquals("PK1", rs.getString(1));
+            assertEquals(1, rs.getInt(2));
+            assertEquals("V11", rs.getString(3));
+            assertEquals("V41", rs.getString(4));
+            assertFalse(rs.next());
+
+            String selectFromIndex = "SELECT PK1, INT_PK, V1, V4 FROM " + tableName + " where V1='V11'";
+            rs = conn.createStatement().executeQuery("EXPLAIN " + selectFromIndex);
+            String actualExplainPlan = QueryUtil.getExplainPlan(rs);
+            assertTrue(actualExplainPlan.contains(idxName));
+
+            rs = conn.createStatement().executeQuery(selectFromIndex);
+            assertTrue(rs.next());
+            assertEquals("PK1", rs.getString(1));
+            assertEquals(1, rs.getInt(2));
+            assertEquals("V11", rs.getString(3));
+            assertEquals("V41", rs.getString(4));
+            assertFalse(rs.next());
+        }
+    }
+
+    @Test
+    public void testTenantViewIndexes() throws Exception {
+
+        try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) {
+            conn.setAutoCommit(true);
+
+            String tableName = "TBL_" + generateUniqueName();
+            String view1IndexName = "IND_" + generateUniqueName();
+            String divergedViewIndex = "DIND_" + generateUniqueName();
+            String view3IndexName = "IND_" + generateUniqueName();
+            String view1 = "V_" + generateUniqueName();
+            String divergedView =  "V_" + generateUniqueName();
+            String view3 = "V_" + generateUniqueName();
+
+            String baseTableDDL = "CREATE  TABLE " + tableName + " (TENANT_ID VARCHAR NOT NULL, PK1 VARCHAR NOT NULL, V1 VARCHAR, V2 VARCHAR, V3 VARCHAR CONSTRAINT NAME_PK PRIMARY KEY(TENANT_ID, PK1)) MULTI_TENANT = true"
+                    + (this.mutable ? "": ", IMMUTABLE_ROWS=true");
+            conn.createStatement().execute(baseTableDDL);
+            assertMetadata(conn, ONE_CELL_PER_COLUMN, NON_ENCODED_QUALIFIERS, tableName);
+
+            try (Connection tenant1Conn = getTenantConnection("tenant1")) {
+                String
+                        view1DDL =
+                        "CREATE VIEW " + view1 + " ( VIEW_COL1 VARCHAR, VIEW_COL2 VARCHAR) AS SELECT * FROM "
+                                + tableName;
+                tenant1Conn.createStatement().execute(view1DDL);
+                String indexDDL = "CREATE INDEX " + view1IndexName + " ON " + view1 + " (V1) include (VIEW_COL2, V3) IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS,COLUMN_ENCODED_BYTES=2";
+                tenant1Conn.createStatement().execute(indexDDL);
+                tenant1Conn.commit();
+            }
+
+            try (Connection tenant2Conn = getTenantConnection("tenant2")) {
+                String
+                        divergedViewDDL =
+                        "CREATE VIEW " + divergedView + " ( VIEW_COL1 VARCHAR, VIEW_COL2 VARCHAR) AS SELECT * FROM "
+                                + tableName ;
+                tenant2Conn.createStatement().execute(divergedViewDDL);
+                // Drop column V2 from the view to have it diverge from the base table
+                tenant2Conn.createStatement().execute("ALTER VIEW " + divergedView + " DROP COLUMN V2");
+
+                // create an index on the diverged view
+                String indexDDL = "CREATE INDEX " + divergedViewIndex + " ON " + divergedView + " (V1) include (VIEW_COL1, V3) IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS";
+                tenant2Conn.createStatement().execute(indexDDL);
+                tenant2Conn.commit();
+            }
+
+            try (Connection tenant3Conn = getTenantConnection("tenant3")) {
+                String
+                        view3DDL =
+                        "CREATE VIEW " + view3 + " ( VIEW_COL31 VARCHAR, VIEW_COL32 VARCHAR) AS SELECT * FROM "
+                                + tableName;
+                tenant3Conn.createStatement().execute(view3DDL);
+                String indexDDL = "CREATE INDEX " + view3IndexName + " ON " + view3 + " (V1) include (VIEW_COL32, V3)";
+                tenant3Conn.createStatement().execute(indexDDL);
+                tenant3Conn.commit();
+            }
+
+            String upsert = "UPSERT INTO " + view1 + " (PK1, V1, V2, V3, VIEW_COL1, VIEW_COL2) VALUES ('PK1',  'V1', 'V2', 'V3','VIEW_COL1_1','VIEW_COL2_1')";
+            try (Connection viewConn = getTenantConnection("tenant1")) {
+                viewConn.createStatement().executeUpdate(upsert);
+                viewConn.commit();
+                Statement stmt = viewConn.createStatement();
+                String sql = "SELECT V3, VIEW_COL2 FROM " + view1 + " WHERE V1 = 'V1'";
+                QueryPlan plan = stmt.unwrap(PhoenixStatement.class).optimizeQuery(sql);
+                assertTrue(plan.getTableRef().getTable().getName().getString().equals(SchemaUtil.normalizeIdentifier(view1IndexName)));
+                ResultSet rs = viewConn.createStatement().executeQuery(sql);
+                assertTrue(rs.next());
+                assertEquals("V3", rs.getString(1));
+                assertEquals("VIEW_COL2_1", rs.getString(2));
+            }
+
+            // Upsert records in diverged view. Verify that the PK column was added to the index on it.
+            upsert = "UPSERT INTO " + divergedView + " (PK1, V1, V3, VIEW_COL1, VIEW_COL2) VALUES ('PK1', 'V1', 'V3','VIEW_COL21_1','VIEW_COL22_1')";
+            try (Connection viewConn = getTenantConnection("tenant2")) {
+                viewConn.createStatement().executeUpdate(upsert);
+                viewConn.commit();
+                Statement stmt = viewConn.createStatement();
+                String sql = "SELECT V3, VIEW_COL1 FROM " + divergedView + " WHERE V1 = 'V1'";
+                QueryPlan plan = stmt.unwrap(PhoenixStatement.class).optimizeQuery(sql);
+                assertTrue(plan.getTableRef().getTable().getName().getString().equals(SchemaUtil.normalizeIdentifier(divergedViewIndex)));
+                ResultSet rs = viewConn.createStatement().executeQuery(sql);
+                assertTrue(rs.next());
+                assertEquals("V3", rs.getString(1));
+                assertEquals("VIEW_COL21_1", rs.getString(2));
+            }
+
+            upsert = "UPSERT INTO " + view3 + " (PK1, V1, V2, V3, VIEW_COL31, VIEW_COL32) VALUES ('PK1',  'V1', 'V2', 'V3','VIEW_COL31_1','VIEW_COL32_1')";
+            try (Connection viewConn = getTenantConnection("tenant3")) {
+                viewConn.createStatement().executeUpdate(upsert);
+                viewConn.commit();
+                Statement stmt = viewConn.createStatement();
+                String sql = "SELECT V3, VIEW_COL32 FROM " + view3 + " WHERE V1 = 'V1'";
+                QueryPlan plan = stmt.unwrap(PhoenixStatement.class).optimizeQuery(sql);
+                assertTrue(plan.getTableRef().getTable().getName().getString().equals(SchemaUtil.normalizeIdentifier(view3IndexName)));
+                ResultSet rs = viewConn.createStatement().executeQuery(sql);
+                assertTrue(rs.next());
+                assertEquals("V3", rs.getString(1));
+                assertEquals("VIEW_COL32_1", rs.getString(2));
+            }
+            dumpTable("_IDX_" + tableName);
+        }
+    }
+
+    private Connection getTenantConnection(String tenantId) throws Exception {
+        Properties tenantProps = PropertiesUtil.deepCopy(testProps);
+        tenantProps.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+        return DriverManager.getConnection(getUrl(), tenantProps);
+    }
+
+    private void assertMetadata(Connection conn, PTable.ImmutableStorageScheme expectedStorageScheme, PTable.QualifierEncodingScheme
+            expectedColumnEncoding, String tableName)
+            throws Exception {
+        PhoenixConnection phxConn = conn.unwrap(PhoenixConnection.class);
+        PTable table = phxConn.getTable(new PTableKey(phxConn.getTenantId(), tableName));
+        assertEquals(expectedStorageScheme, table.getImmutableStorageScheme());
+        assertEquals(expectedColumnEncoding, table.getEncodingScheme());
+    }
+
+    private void createTableAndIndex(Connection conn, String tableName, String indexName, String tableDDL, int numOfRows)
+            throws SQLException {
+        String createTableSql = "CREATE TABLE " + tableName + " (PK1 VARCHAR NOT NULL, INT_PK INTEGER NOT NULL, V1 VARCHAR, V2 INTEGER, V3 INTEGER, V4 VARCHAR, V5 VARCHAR CONSTRAINT NAME_PK PRIMARY KEY(PK1, INT_PK)) "
+                + (tableDDL == null ? "" : tableDDL);
+        LOGGER.debug(createTableSql);
+        conn.createStatement().execute(createTableSql);
+
+        String createIndexSql = "CREATE INDEX " + indexName + " ON " + tableName + " (PK1, INT_PK) include (V1,V2,V4) IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2";
+        conn.createStatement().execute(createIndexSql);
+
+        String upsert = "UPSERT INTO " + tableName + " (PK1, INT_PK, V1,  V2, V3, V4, V5) VALUES (?,?,?,?,?,?,?)";
+        PreparedStatement upsertStmt = conn.prepareStatement(upsert);
+
+        for (int i=1; i <= numOfRows; i++) {
+            upsertStmt.setString(1, "PK"+i);
+            upsertStmt.setInt(2, i);
+            upsertStmt.setString(3, "V1"+i);
+            upsertStmt.setInt(4, i+1);
+            upsertStmt.setInt(5, i+2);
+            upsertStmt.setString(6, "V4"+i);
+            upsertStmt.setString(7, "V5"+i);
+            upsertStmt.executeUpdate();
+        }
+    }
+
+    public static void dumpTable(String tableName) throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            Table
+                    hTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(tableName.getBytes());
+            Scan scan = new Scan();
+            scan.setRaw(true);
+            LOGGER.debug("***** Table Name : " + tableName);
+            ResultScanner scanner = hTable.getScanner(scan);
+            for (Result result = scanner.next(); result != null; result = scanner.next()) {
+                for (Cell cell : result.rawCells()) {
+                    String cellString = cell.toString();
+                    for (Map.Entry<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> entryF : result.getMap()
+                            .entrySet()) {
+                        byte[] family = entryF.getKey();
+                    }
+                    LOGGER.debug(cellString + " ****** value : " + Bytes.toStringBinary(CellUtil.cloneValue(cell)));
+                }
+            }
+        }
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/ServerCachingProtos.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/ServerCachingProtos.java
index ab1ffee..a9cc2f0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/ServerCachingProtos.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/ServerCachingProtos.java
@@ -2192,6 +2192,26 @@
      */
     com.google.protobuf.ByteString
         getParentTableTypeBytes();
+
+    // optional int32 dataEncodingScheme = 25;
+    /**
+     * <code>optional int32 dataEncodingScheme = 25;</code>
+     */
+    boolean hasDataEncodingScheme();
+    /**
+     * <code>optional int32 dataEncodingScheme = 25;</code>
+     */
+    int getDataEncodingScheme();
+
+    // optional int32 dataImmutableStorageScheme = 26;
+    /**
+     * <code>optional int32 dataImmutableStorageScheme = 26;</code>
+     */
+    boolean hasDataImmutableStorageScheme();
+    /**
+     * <code>optional int32 dataImmutableStorageScheme = 26;</code>
+     */
+    int getDataImmutableStorageScheme();
   }
   /**
    * Protobuf type {@code IndexMaintainer}
@@ -2400,6 +2420,16 @@
               parentTableType_ = input.readBytes();
               break;
             }
+            case 200: {
+              bitField0_ |= 0x00080000;
+              dataEncodingScheme_ = input.readInt32();
+              break;
+            }
+            case 208: {
+              bitField0_ |= 0x00100000;
+              dataImmutableStorageScheme_ = input.readInt32();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -2959,6 +2989,38 @@
       }
     }
 
+    // optional int32 dataEncodingScheme = 25;
+    public static final int DATAENCODINGSCHEME_FIELD_NUMBER = 25;
+    private int dataEncodingScheme_;
+    /**
+     * <code>optional int32 dataEncodingScheme = 25;</code>
+     */
+    public boolean hasDataEncodingScheme() {
+      return ((bitField0_ & 0x00080000) == 0x00080000);
+    }
+    /**
+     * <code>optional int32 dataEncodingScheme = 25;</code>
+     */
+    public int getDataEncodingScheme() {
+      return dataEncodingScheme_;
+    }
+
+    // optional int32 dataImmutableStorageScheme = 26;
+    public static final int DATAIMMUTABLESTORAGESCHEME_FIELD_NUMBER = 26;
+    private int dataImmutableStorageScheme_;
+    /**
+     * <code>optional int32 dataImmutableStorageScheme = 26;</code>
+     */
+    public boolean hasDataImmutableStorageScheme() {
+      return ((bitField0_ & 0x00100000) == 0x00100000);
+    }
+    /**
+     * <code>optional int32 dataImmutableStorageScheme = 26;</code>
+     */
+    public int getDataImmutableStorageScheme() {
+      return dataImmutableStorageScheme_;
+    }
+
     private void initFields() {
       saltBuckets_ = 0;
       isMultiTenant_ = false;
@@ -2984,6 +3046,8 @@
       viewIndexIdType_ = 0;
       indexDataColumnCount_ = -1;
       parentTableType_ = "";
+      dataEncodingScheme_ = 0;
+      dataImmutableStorageScheme_ = 0;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -3153,6 +3217,12 @@
       if (((bitField0_ & 0x00040000) == 0x00040000)) {
         output.writeBytes(24, getParentTableTypeBytes());
       }
+      if (((bitField0_ & 0x00080000) == 0x00080000)) {
+        output.writeInt32(25, dataEncodingScheme_);
+      }
+      if (((bitField0_ & 0x00100000) == 0x00100000)) {
+        output.writeInt32(26, dataImmutableStorageScheme_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -3263,6 +3333,14 @@
         size += com.google.protobuf.CodedOutputStream
           .computeBytesSize(24, getParentTableTypeBytes());
       }
+      if (((bitField0_ & 0x00080000) == 0x00080000)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt32Size(25, dataEncodingScheme_);
+      }
+      if (((bitField0_ & 0x00100000) == 0x00100000)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt32Size(26, dataImmutableStorageScheme_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -3391,6 +3469,16 @@
         result = result && getParentTableType()
             .equals(other.getParentTableType());
       }
+      result = result && (hasDataEncodingScheme() == other.hasDataEncodingScheme());
+      if (hasDataEncodingScheme()) {
+        result = result && (getDataEncodingScheme()
+            == other.getDataEncodingScheme());
+      }
+      result = result && (hasDataImmutableStorageScheme() == other.hasDataImmutableStorageScheme());
+      if (hasDataImmutableStorageScheme()) {
+        result = result && (getDataImmutableStorageScheme()
+            == other.getDataImmutableStorageScheme());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -3500,6 +3588,14 @@
         hash = (37 * hash) + PARENTTABLETYPE_FIELD_NUMBER;
         hash = (53 * hash) + getParentTableType().hashCode();
       }
+      if (hasDataEncodingScheme()) {
+        hash = (37 * hash) + DATAENCODINGSCHEME_FIELD_NUMBER;
+        hash = (53 * hash) + getDataEncodingScheme();
+      }
+      if (hasDataImmutableStorageScheme()) {
+        hash = (37 * hash) + DATAIMMUTABLESTORAGESCHEME_FIELD_NUMBER;
+        hash = (53 * hash) + getDataImmutableStorageScheme();
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -3682,6 +3778,10 @@
         bitField0_ = (bitField0_ & ~0x00400000);
         parentTableType_ = "";
         bitField0_ = (bitField0_ & ~0x00800000);
+        dataEncodingScheme_ = 0;
+        bitField0_ = (bitField0_ & ~0x01000000);
+        dataImmutableStorageScheme_ = 0;
+        bitField0_ = (bitField0_ & ~0x02000000);
         return this;
       }
 
@@ -3831,6 +3931,14 @@
           to_bitField0_ |= 0x00040000;
         }
         result.parentTableType_ = parentTableType_;
+        if (((from_bitField0_ & 0x01000000) == 0x01000000)) {
+          to_bitField0_ |= 0x00080000;
+        }
+        result.dataEncodingScheme_ = dataEncodingScheme_;
+        if (((from_bitField0_ & 0x02000000) == 0x02000000)) {
+          to_bitField0_ |= 0x00100000;
+        }
+        result.dataImmutableStorageScheme_ = dataImmutableStorageScheme_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -4020,6 +4128,12 @@
           parentTableType_ = other.parentTableType_;
           onChanged();
         }
+        if (other.hasDataEncodingScheme()) {
+          setDataEncodingScheme(other.getDataEncodingScheme());
+        }
+        if (other.hasDataImmutableStorageScheme()) {
+          setDataImmutableStorageScheme(other.getDataImmutableStorageScheme());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -5924,6 +6038,72 @@
         return this;
       }
 
+      // optional int32 dataEncodingScheme = 25;
+      private int dataEncodingScheme_ ;
+      /**
+       * <code>optional int32 dataEncodingScheme = 25;</code>
+       */
+      public boolean hasDataEncodingScheme() {
+        return ((bitField0_ & 0x01000000) == 0x01000000);
+      }
+      /**
+       * <code>optional int32 dataEncodingScheme = 25;</code>
+       */
+      public int getDataEncodingScheme() {
+        return dataEncodingScheme_;
+      }
+      /**
+       * <code>optional int32 dataEncodingScheme = 25;</code>
+       */
+      public Builder setDataEncodingScheme(int value) {
+        bitField0_ |= 0x01000000;
+        dataEncodingScheme_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int32 dataEncodingScheme = 25;</code>
+       */
+      public Builder clearDataEncodingScheme() {
+        bitField0_ = (bitField0_ & ~0x01000000);
+        dataEncodingScheme_ = 0;
+        onChanged();
+        return this;
+      }
+
+      // optional int32 dataImmutableStorageScheme = 26;
+      private int dataImmutableStorageScheme_ ;
+      /**
+       * <code>optional int32 dataImmutableStorageScheme = 26;</code>
+       */
+      public boolean hasDataImmutableStorageScheme() {
+        return ((bitField0_ & 0x02000000) == 0x02000000);
+      }
+      /**
+       * <code>optional int32 dataImmutableStorageScheme = 26;</code>
+       */
+      public int getDataImmutableStorageScheme() {
+        return dataImmutableStorageScheme_;
+      }
+      /**
+       * <code>optional int32 dataImmutableStorageScheme = 26;</code>
+       */
+      public Builder setDataImmutableStorageScheme(int value) {
+        bitField0_ |= 0x02000000;
+        dataImmutableStorageScheme_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int32 dataImmutableStorageScheme = 26;</code>
+       */
+      public Builder clearDataImmutableStorageScheme() {
+        bitField0_ = (bitField0_ & ~0x02000000);
+        dataImmutableStorageScheme_ = 0;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:IndexMaintainer)
     }
 
@@ -9050,7 +9230,7 @@
       "ength\030\003 \002(\005\"4\n\017ColumnReference\022\016\n\006family" +
       "\030\001 \002(\014\022\021\n\tqualifier\030\002 \002(\014\"4\n\nColumnInfo\022" +
       "\022\n\nfamilyName\030\001 \001(\t\022\022\n\ncolumnName\030\002 \002(\t\"" +
-      "\232\006\n\017IndexMaintainer\022\023\n\013saltBuckets\030\001 \002(\005" +
+      "\332\006\n\017IndexMaintainer\022\023\n\013saltBuckets\030\001 \002(\005" +
       "\022\025\n\risMultiTenant\030\002 \002(\010\022\023\n\013viewIndexId\030\003" +
       " \001(\014\022(\n\016indexedColumns\030\004 \003(\0132\020.ColumnRef" +
       "erence\022 \n\030indexedColumnTypeOrdinal\030\005 \003(\005",
@@ -9069,23 +9249,25 @@
       "\003(\0132\013.ColumnInfo\022\026\n\016encodingScheme\030\024 \002(\005" +
       "\022\036\n\026immutableStorageScheme\030\025 \002(\005\022\027\n\017view" +
       "IndexIdType\030\026 \001(\005\022 \n\024indexDataColumnCoun" +
-      "t\030\027 \001(\005:\002-1\022\027\n\017parentTableType\030\030 \001(\t\"\370\001\n" +
-      "\025AddServerCacheRequest\022\020\n\010tenantId\030\001 \001(\014" +
-      "\022\017\n\007cacheId\030\002 \002(\014\022)\n\010cachePtr\030\003 \002(\0132\027.Im" +
-      "mutableBytesWritable\022)\n\014cacheFactory\030\004 \002" +
-      "(\0132\023.ServerCacheFactory\022\017\n\007txState\030\005 \001(\014",
-      "\022\"\n\032hasProtoBufIndexMaintainer\030\006 \001(\010\022\025\n\r" +
-      "clientVersion\030\007 \001(\005\022\032\n\022usePersistentCach" +
-      "e\030\010 \001(\010\"(\n\026AddServerCacheResponse\022\016\n\006ret" +
-      "urn\030\001 \002(\010\"=\n\030RemoveServerCacheRequest\022\020\n" +
-      "\010tenantId\030\001 \001(\014\022\017\n\007cacheId\030\002 \002(\014\"+\n\031Remo" +
-      "veServerCacheResponse\022\016\n\006return\030\001 \002(\0102\245\001" +
-      "\n\024ServerCachingService\022A\n\016addServerCache" +
-      "\022\026.AddServerCacheRequest\032\027.AddServerCach" +
-      "eResponse\022J\n\021removeServerCache\022\031.RemoveS" +
-      "erverCacheRequest\032\032.RemoveServerCacheRes",
-      "ponseBG\n(org.apache.phoenix.coprocessor." +
-      "generatedB\023ServerCachingProtosH\001\210\001\001\240\001\001"
+      "t\030\027 \001(\005:\002-1\022\027\n\017parentTableType\030\030 \001(\t\022\032\n\022" +
+      "dataEncodingScheme\030\031 \001(\005\022\"\n\032dataImmutabl" +
+      "eStorageScheme\030\032 \001(\005\"\370\001\n\025AddServerCacheR" +
+      "equest\022\020\n\010tenantId\030\001 \001(\014\022\017\n\007cacheId\030\002 \002(" +
+      "\014\022)\n\010cachePtr\030\003 \002(\0132\027.ImmutableBytesWrit",
+      "able\022)\n\014cacheFactory\030\004 \002(\0132\023.ServerCache" +
+      "Factory\022\017\n\007txState\030\005 \001(\014\022\"\n\032hasProtoBufI" +
+      "ndexMaintainer\030\006 \001(\010\022\025\n\rclientVersion\030\007 " +
+      "\001(\005\022\032\n\022usePersistentCache\030\010 \001(\010\"(\n\026AddSe" +
+      "rverCacheResponse\022\016\n\006return\030\001 \002(\010\"=\n\030Rem" +
+      "oveServerCacheRequest\022\020\n\010tenantId\030\001 \001(\014\022" +
+      "\017\n\007cacheId\030\002 \002(\014\"+\n\031RemoveServerCacheRes" +
+      "ponse\022\016\n\006return\030\001 \002(\0102\245\001\n\024ServerCachingS" +
+      "ervice\022A\n\016addServerCache\022\026.AddServerCach" +
+      "eRequest\032\027.AddServerCacheResponse\022J\n\021rem",
+      "oveServerCache\022\031.RemoveServerCacheReques" +
+      "t\032\032.RemoveServerCacheResponseBG\n(org.apa" +
+      "che.phoenix.coprocessor.generatedB\023Serve" +
+      "rCachingProtosH\001\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -9115,7 +9297,7 @@
           internal_static_IndexMaintainer_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_IndexMaintainer_descriptor,
-              new java.lang.String[] { "SaltBuckets", "IsMultiTenant", "ViewIndexId", "IndexedColumns", "IndexedColumnTypeOrdinal", "DataTableColRefForCoveredColumns", "IndexTableColRefForCoveredColumns", "IsLocalIndex", "IndexTableName", "RowKeyOrderOptimizable", "DataTableEmptyKeyValueColFamily", "EmptyKeyValueColFamily", "IndexedExpressions", "RowKeyMetadata", "NumDataTableColFamilies", "IndexWalDisabled", "IndexRowKeyByteSize", "Immutable", "IndexedColumnInfo", "EncodingScheme", "ImmutableStorageScheme", "ViewIndexIdType", "IndexDataColumnCount", "ParentTableType", });
+              new java.lang.String[] { "SaltBuckets", "IsMultiTenant", "ViewIndexId", "IndexedColumns", "IndexedColumnTypeOrdinal", "DataTableColRefForCoveredColumns", "IndexTableColRefForCoveredColumns", "IsLocalIndex", "IndexTableName", "RowKeyOrderOptimizable", "DataTableEmptyKeyValueColFamily", "EmptyKeyValueColFamily", "IndexedExpressions", "RowKeyMetadata", "NumDataTableColFamilies", "IndexWalDisabled", "IndexRowKeyByteSize", "Immutable", "IndexedColumnInfo", "EncodingScheme", "ImmutableStorageScheme", "ViewIndexIdType", "IndexDataColumnCount", "ParentTableType", "DataEncodingScheme", "DataImmutableStorageScheme", });
           internal_static_AddServerCacheRequest_descriptor =
             getDescriptor().getMessageTypes().get(4);
           internal_static_AddServerCacheRequest_fieldAccessorTable = new
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index b882b9c..5ee4ef6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -366,6 +366,8 @@
     /**** START: New member variables added in 4.10 *****/ 
     private QualifierEncodingScheme encodingScheme;
     private ImmutableStorageScheme immutableStorageScheme;
+    private QualifierEncodingScheme dataEncodingScheme;
+    private ImmutableStorageScheme dataImmutableStorageScheme;
     /*
      * Information for columns of data tables that are being indexed. The first part of the pair is column family name
      * and second part is the column name. The reason we need to track this state is because for certain storage schemes
@@ -398,7 +400,9 @@
         // null check for b/w compatibility
         this.encodingScheme = index.getEncodingScheme() == null ? QualifierEncodingScheme.NON_ENCODED_QUALIFIERS : index.getEncodingScheme();
         this.immutableStorageScheme = index.getImmutableStorageScheme() == null ? ImmutableStorageScheme.ONE_CELL_PER_COLUMN : index.getImmutableStorageScheme();
-        
+        this.dataEncodingScheme = dataTable.getEncodingScheme() == null ? QualifierEncodingScheme.NON_ENCODED_QUALIFIERS : dataTable.getEncodingScheme();
+        this.dataImmutableStorageScheme = dataTable.getImmutableStorageScheme() == null ? ImmutableStorageScheme.ONE_CELL_PER_COLUMN : dataTable.getImmutableStorageScheme();
+
         byte[] indexTableName = index.getPhysicalName().getBytes();
         // Use this for the nDataSaltBuckets as we need this for local indexes
         // TODO: persist nDataSaltBuckets separately, but maintain b/w compat.
@@ -598,7 +602,15 @@
         this.estimatedIndexRowKeyBytes = estimateIndexRowKeyByteSize(indexColByteSize);
         initCachedState();
     }
-    
+
+    public void setDataImmutableStorageScheme(ImmutableStorageScheme sc) {
+        this.dataImmutableStorageScheme = sc;
+    }
+
+    public void setDataEncodingScheme(QualifierEncodingScheme sc) {
+        this.dataEncodingScheme = sc;
+    }
+
     public byte[] buildRowKey(ValueGetter valueGetter, ImmutableBytesWritable rowKeyPtr, byte[] regionStartKey, byte[] regionEndKey, long ts)  {
         ImmutableBytesWritable ptr = new ImmutableBytesWritable();
         boolean prependRegionStartKey = isLocalIndex && regionStartKey != null;
@@ -1032,35 +1044,40 @@
                 for (Pair<ColumnReference, ColumnReference> colRefPair : colRefPairs) {
                     ColumnReference indexColRef = colRefPair.getFirst();
                     ColumnReference dataColRef = colRefPair.getSecond();
-                    Expression expression = new SingleCellColumnExpression(new PDatum() {
-                        @Override
-                        public boolean isNullable() {
-                            return false;
+                    byte[] value = null;
+                    if (this.dataImmutableStorageScheme == ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) {
+                        Expression expression = new SingleCellColumnExpression(new PDatum() {
+                            @Override public boolean isNullable() {
+                                return false;
+                            }
+
+                            @Override public SortOrder getSortOrder() {
+                                return null;
+                            }
+
+                            @Override public Integer getScale() {
+                                return null;
+                            }
+
+                            @Override public Integer getMaxLength() {
+                                return null;
+                            }
+
+                            @Override public PDataType getDataType() {
+                                return null;
+                            }
+                        }, dataColRef.getFamily(), dataColRef.getQualifier(), encodingScheme,
+                                immutableStorageScheme);
+                        ImmutableBytesPtr ptr = new ImmutableBytesPtr();
+                        expression.evaluate(new ValueGetterTuple(valueGetter, ts), ptr);
+                        value = ptr.copyBytesIfNecessary();
+                    } else {
+                        // Data table is ONE_CELL_PER_COLUMN. Get the col value.
+                        ImmutableBytesWritable dataValue = valueGetter.getLatestValue(dataColRef, ts);
+                        if (dataValue != null && dataValue != ValueGetter.HIDDEN_BY_DELETE) {
+                            value = dataValue.copyBytes();
                         }
-                        
-                        @Override
-                        public SortOrder getSortOrder() {
-                            return null;
-                        }
-                        
-                        @Override
-                        public Integer getScale() {
-                            return null;
-                        }
-                        
-                        @Override
-                        public Integer getMaxLength() {
-                            return null;
-                        }
-                        
-                        @Override
-                        public PDataType getDataType() {
-                            return null;
-                        }
-                    }, dataColRef.getFamily(), dataColRef.getQualifier(), encodingScheme, immutableStorageScheme);
-                    ImmutableBytesPtr ptr = new ImmutableBytesPtr();
-                    expression.evaluate(new ValueGetterTuple(valueGetter, ts), ptr);
-                    byte[] value = ptr.copyBytesIfNecessary();
+                    }
                     if (value != null) {
                         int indexArrayPos = encodingScheme.decode(indexColRef.getQualifier())-QueryConstants.ENCODED_CQ_COUNTER_INITIAL_VALUE+1;
                         colValues[indexArrayPos] = new LiteralExpression(value);
@@ -1378,6 +1395,8 @@
         // Needed for backward compatibility. Clients older than 4.10 will have non-encoded tables.
         this.immutableStorageScheme = ImmutableStorageScheme.ONE_CELL_PER_COLUMN;
         this.encodingScheme = QualifierEncodingScheme.NON_ENCODED_QUALIFIERS;
+        this.dataImmutableStorageScheme = ImmutableStorageScheme.ONE_CELL_PER_COLUMN;
+        this.dataEncodingScheme = QualifierEncodingScheme.NON_ENCODED_QUALIFIERS;
         initCachedState();
     }
     
@@ -1437,6 +1456,8 @@
         // proto doesn't support single byte so need an explicit cast here
         maintainer.encodingScheme = PTable.QualifierEncodingScheme.fromSerializedValue((byte)proto.getEncodingScheme());
         maintainer.immutableStorageScheme = PTable.ImmutableStorageScheme.fromSerializedValue((byte)proto.getImmutableStorageScheme());
+        maintainer.dataEncodingScheme = PTable.QualifierEncodingScheme.fromSerializedValue((byte)proto.getDataEncodingScheme());
+        maintainer.dataImmutableStorageScheme = PTable.ImmutableStorageScheme.fromSerializedValue((byte)proto.getDataImmutableStorageScheme());
         maintainer.isLocalIndex = proto.getIsLocalIndex();
         if (proto.hasParentTableType()) {
             maintainer.parentTableType = PTableType.fromValue(proto.getParentTableType());
@@ -1583,6 +1604,8 @@
         }
         builder.setEncodingScheme(maintainer.encodingScheme.getSerializedMetadataValue());
         builder.setImmutableStorageScheme(maintainer.immutableStorageScheme.getSerializedMetadataValue());
+        builder.setDataEncodingScheme(maintainer.dataEncodingScheme.getSerializedMetadataValue());
+        builder.setDataImmutableStorageScheme(maintainer.dataImmutableStorageScheme.getSerializedMetadataValue());
         return builder.build();
     }
 
@@ -1941,5 +1964,11 @@
     public ImmutableStorageScheme getIndexStorageScheme() {
         return immutableStorageScheme;
     }
-    
+    public ImmutableStorageScheme getDataImmutableStorageScheme() {
+        return dataImmutableStorageScheme;
+    }
+
+    public QualifierEncodingScheme getDataEncodingScheme() {
+        return dataEncodingScheme;
+    }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ViewUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ViewUtil.java
index 0915da7..8d41132 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ViewUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ViewUtil.java
@@ -751,12 +751,6 @@
                 .setTransactionProvider(parentTable.getTransactionProvider())
                 .setAutoPartitionSeqName(parentTable.getAutoPartitionSeqName())
                 .setAppendOnlySchema(parentTable.isAppendOnlySchema())
-                .setImmutableStorageScheme(parentTable.getImmutableStorageScheme() == null ?
-                        PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN :
-                        parentTable.getImmutableStorageScheme())
-                .setQualifierEncodingScheme(parentTable.getEncodingScheme() == null ?
-                        PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS :
-                        parentTable.getEncodingScheme())
                 .setBaseColumnCount(baseTableColumnCount)
                 .setTimeStamp(maxTableTimestamp)
                 .setExcludedColumns(ImmutableList.copyOf(excludedColumns))
diff --git a/phoenix-protocol/src/main/ServerCachingService.proto b/phoenix-protocol/src/main/ServerCachingService.proto
index 0e37de3..aad6f19 100644
--- a/phoenix-protocol/src/main/ServerCachingService.proto
+++ b/phoenix-protocol/src/main/ServerCachingService.proto
@@ -65,6 +65,8 @@
   optional int32 viewIndexIdType = 22 ;
   optional int32 indexDataColumnCount = 23 [default = -1];
   optional string parentTableType = 24;
+  optional int32 dataEncodingScheme = 25;
+  optional int32 dataImmutableStorageScheme = 26;
 }
 
 message AddServerCacheRequest {