PHOENIX-5580 Wrong values seen when updating a view for a table that has an index

Signed-off-by: Abhishek Singh Chouhan <achouhan@apache.org>
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java
index 23c1956..0071392 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java
@@ -195,6 +195,39 @@
     }
     
     @Test
+    public void testUpsertIntoViewOnTableWithIndex() throws Exception {
+        String baseTable = generateUniqueName();
+        String view = generateUniqueName();
+
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String baseTableDDL = "CREATE TABLE IF NOT EXISTS " + baseTable + 
+                    " (ID VARCHAR PRIMARY KEY, V1 VARCHAR)";
+            conn.createStatement().execute(baseTableDDL);
+
+            // Create an Index on the base table
+            String tableIndex = generateUniqueName() + "_IDX";
+            conn.createStatement().execute("CREATE INDEX " + tableIndex + 
+                " ON " + baseTable + " (V1)");
+
+            // Create a view on the base table
+            String viewDDL = "CREATE VIEW IF NOT EXISTS " + view 
+                    + " (V2 INTEGER) AS SELECT * FROM " + baseTable
+                    + " WHERE ID='a'";
+            conn.createStatement().execute(viewDDL);
+
+            String upsert = "UPSERT INTO " + view + " (ID, V1, V2) "
+                    + "VALUES ('a' ,'ab', 7)";
+            conn.createStatement().executeUpdate(upsert);
+            conn.commit();
+            
+            ResultSet rs = conn.createStatement().executeQuery("SELECT ID, V1 from " + baseTable);
+            assertTrue(rs.next());
+            assertEquals("a", rs.getString(1));
+            assertEquals("ab", rs.getString(2));         
+        }
+    }
+    
+    @Test
     public void testCoveredColumns() throws Exception {
         String tableName = "TBL_" + generateUniqueName();
         String indexName = "IDX_" + generateUniqueName();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/ServerCachingProtos.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/ServerCachingProtos.java
index 3fd01a2..ab1ffee 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/ServerCachingProtos.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/ServerCachingProtos.java
@@ -2177,6 +2177,21 @@
      * <code>optional int32 indexDataColumnCount = 23 [default = -1];</code>
      */
     int getIndexDataColumnCount();
+
+    // optional string parentTableType = 24;
+    /**
+     * <code>optional string parentTableType = 24;</code>
+     */
+    boolean hasParentTableType();
+    /**
+     * <code>optional string parentTableType = 24;</code>
+     */
+    java.lang.String getParentTableType();
+    /**
+     * <code>optional string parentTableType = 24;</code>
+     */
+    com.google.protobuf.ByteString
+        getParentTableTypeBytes();
   }
   /**
    * Protobuf type {@code IndexMaintainer}
@@ -2380,6 +2395,11 @@
               indexDataColumnCount_ = input.readInt32();
               break;
             }
+            case 194: {
+              bitField0_ |= 0x00040000;
+              parentTableType_ = input.readBytes();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -2896,6 +2916,49 @@
       return indexDataColumnCount_;
     }
 
+    // optional string parentTableType = 24;
+    public static final int PARENTTABLETYPE_FIELD_NUMBER = 24;
+    private java.lang.Object parentTableType_;
+    /**
+     * <code>optional string parentTableType = 24;</code>
+     */
+    public boolean hasParentTableType() {
+      return ((bitField0_ & 0x00040000) == 0x00040000);
+    }
+    /**
+     * <code>optional string parentTableType = 24;</code>
+     */
+    public java.lang.String getParentTableType() {
+      java.lang.Object ref = parentTableType_;
+      if (ref instanceof java.lang.String) {
+        return (java.lang.String) ref;
+      } else {
+        com.google.protobuf.ByteString bs = 
+            (com.google.protobuf.ByteString) ref;
+        java.lang.String s = bs.toStringUtf8();
+        if (bs.isValidUtf8()) {
+          parentTableType_ = s;
+        }
+        return s;
+      }
+    }
+    /**
+     * <code>optional string parentTableType = 24;</code>
+     */
+    public com.google.protobuf.ByteString
+        getParentTableTypeBytes() {
+      java.lang.Object ref = parentTableType_;
+      if (ref instanceof java.lang.String) {
+        com.google.protobuf.ByteString b = 
+            com.google.protobuf.ByteString.copyFromUtf8(
+                (java.lang.String) ref);
+        parentTableType_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+
     private void initFields() {
       saltBuckets_ = 0;
       isMultiTenant_ = false;
@@ -2920,6 +2983,7 @@
       immutableStorageScheme_ = 0;
       viewIndexIdType_ = 0;
       indexDataColumnCount_ = -1;
+      parentTableType_ = "";
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -3086,6 +3150,9 @@
       if (((bitField0_ & 0x00020000) == 0x00020000)) {
         output.writeInt32(23, indexDataColumnCount_);
       }
+      if (((bitField0_ & 0x00040000) == 0x00040000)) {
+        output.writeBytes(24, getParentTableTypeBytes());
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -3192,6 +3259,10 @@
         size += com.google.protobuf.CodedOutputStream
           .computeInt32Size(23, indexDataColumnCount_);
       }
+      if (((bitField0_ & 0x00040000) == 0x00040000)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(24, getParentTableTypeBytes());
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -3315,6 +3386,11 @@
         result = result && (getIndexDataColumnCount()
             == other.getIndexDataColumnCount());
       }
+      result = result && (hasParentTableType() == other.hasParentTableType());
+      if (hasParentTableType()) {
+        result = result && getParentTableType()
+            .equals(other.getParentTableType());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -3420,6 +3496,10 @@
         hash = (37 * hash) + INDEXDATACOLUMNCOUNT_FIELD_NUMBER;
         hash = (53 * hash) + getIndexDataColumnCount();
       }
+      if (hasParentTableType()) {
+        hash = (37 * hash) + PARENTTABLETYPE_FIELD_NUMBER;
+        hash = (53 * hash) + getParentTableType().hashCode();
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -3600,6 +3680,8 @@
         bitField0_ = (bitField0_ & ~0x00200000);
         indexDataColumnCount_ = -1;
         bitField0_ = (bitField0_ & ~0x00400000);
+        parentTableType_ = "";
+        bitField0_ = (bitField0_ & ~0x00800000);
         return this;
       }
 
@@ -3745,6 +3827,10 @@
           to_bitField0_ |= 0x00020000;
         }
         result.indexDataColumnCount_ = indexDataColumnCount_;
+        if (((from_bitField0_ & 0x00800000) == 0x00800000)) {
+          to_bitField0_ |= 0x00040000;
+        }
+        result.parentTableType_ = parentTableType_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -3929,6 +4015,11 @@
         if (other.hasIndexDataColumnCount()) {
           setIndexDataColumnCount(other.getIndexDataColumnCount());
         }
+        if (other.hasParentTableType()) {
+          bitField0_ |= 0x00800000;
+          parentTableType_ = other.parentTableType_;
+          onChanged();
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -5759,6 +5850,80 @@
         return this;
       }
 
+      // optional string parentTableType = 24;
+      private java.lang.Object parentTableType_ = "";
+      /**
+       * <code>optional string parentTableType = 24;</code>
+       */
+      public boolean hasParentTableType() {
+        return ((bitField0_ & 0x00800000) == 0x00800000);
+      }
+      /**
+       * <code>optional string parentTableType = 24;</code>
+       */
+      public java.lang.String getParentTableType() {
+        java.lang.Object ref = parentTableType_;
+        if (!(ref instanceof java.lang.String)) {
+          java.lang.String s = ((com.google.protobuf.ByteString) ref)
+              .toStringUtf8();
+          parentTableType_ = s;
+          return s;
+        } else {
+          return (java.lang.String) ref;
+        }
+      }
+      /**
+       * <code>optional string parentTableType = 24;</code>
+       */
+      public com.google.protobuf.ByteString
+          getParentTableTypeBytes() {
+        java.lang.Object ref = parentTableType_;
+        if (ref instanceof String) {
+          com.google.protobuf.ByteString b = 
+              com.google.protobuf.ByteString.copyFromUtf8(
+                  (java.lang.String) ref);
+          parentTableType_ = b;
+          return b;
+        } else {
+          return (com.google.protobuf.ByteString) ref;
+        }
+      }
+      /**
+       * <code>optional string parentTableType = 24;</code>
+       */
+      public Builder setParentTableType(
+          java.lang.String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00800000;
+        parentTableType_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional string parentTableType = 24;</code>
+       */
+      public Builder clearParentTableType() {
+        bitField0_ = (bitField0_ & ~0x00800000);
+        parentTableType_ = getDefaultInstance().getParentTableType();
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional string parentTableType = 24;</code>
+       */
+      public Builder setParentTableTypeBytes(
+          com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00800000;
+        parentTableType_ = value;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:IndexMaintainer)
     }
 
@@ -8885,7 +9050,7 @@
       "ength\030\003 \002(\005\"4\n\017ColumnReference\022\016\n\006family" +
       "\030\001 \002(\014\022\021\n\tqualifier\030\002 \002(\014\"4\n\nColumnInfo\022" +
       "\022\n\nfamilyName\030\001 \001(\t\022\022\n\ncolumnName\030\002 \002(\t\"" +
-      "\201\006\n\017IndexMaintainer\022\023\n\013saltBuckets\030\001 \002(\005" +
+      "\232\006\n\017IndexMaintainer\022\023\n\013saltBuckets\030\001 \002(\005" +
       "\022\025\n\risMultiTenant\030\002 \002(\010\022\023\n\013viewIndexId\030\003" +
       " \001(\014\022(\n\016indexedColumns\030\004 \003(\0132\020.ColumnRef" +
       "erence\022 \n\030indexedColumnTypeOrdinal\030\005 \003(\005",
@@ -8904,23 +9069,23 @@
       "\003(\0132\013.ColumnInfo\022\026\n\016encodingScheme\030\024 \002(\005" +
       "\022\036\n\026immutableStorageScheme\030\025 \002(\005\022\027\n\017view" +
       "IndexIdType\030\026 \001(\005\022 \n\024indexDataColumnCoun" +
-      "t\030\027 \001(\005:\002-1\"\370\001\n\025AddServerCacheRequest\022\020\n" +
-      "\010tenantId\030\001 \001(\014\022\017\n\007cacheId\030\002 \002(\014\022)\n\010cach" +
-      "ePtr\030\003 \002(\0132\027.ImmutableBytesWritable\022)\n\014c" +
-      "acheFactory\030\004 \002(\0132\023.ServerCacheFactory\022\017" +
-      "\n\007txState\030\005 \001(\014\022\"\n\032hasProtoBufIndexMaint",
-      "ainer\030\006 \001(\010\022\025\n\rclientVersion\030\007 \001(\005\022\032\n\022us" +
-      "ePersistentCache\030\010 \001(\010\"(\n\026AddServerCache" +
-      "Response\022\016\n\006return\030\001 \002(\010\"=\n\030RemoveServer" +
-      "CacheRequest\022\020\n\010tenantId\030\001 \001(\014\022\017\n\007cacheI" +
-      "d\030\002 \002(\014\"+\n\031RemoveServerCacheResponse\022\016\n\006" +
-      "return\030\001 \002(\0102\245\001\n\024ServerCachingService\022A\n" +
-      "\016addServerCache\022\026.AddServerCacheRequest\032" +
-      "\027.AddServerCacheResponse\022J\n\021removeServer" +
-      "Cache\022\031.RemoveServerCacheRequest\032\032.Remov" +
-      "eServerCacheResponseBG\n(org.apache.phoen",
-      "ix.coprocessor.generatedB\023ServerCachingP" +
-      "rotosH\001\210\001\001\240\001\001"
+      "t\030\027 \001(\005:\002-1\022\027\n\017parentTableType\030\030 \001(\t\"\370\001\n" +
+      "\025AddServerCacheRequest\022\020\n\010tenantId\030\001 \001(\014" +
+      "\022\017\n\007cacheId\030\002 \002(\014\022)\n\010cachePtr\030\003 \002(\0132\027.Im" +
+      "mutableBytesWritable\022)\n\014cacheFactory\030\004 \002" +
+      "(\0132\023.ServerCacheFactory\022\017\n\007txState\030\005 \001(\014",
+      "\022\"\n\032hasProtoBufIndexMaintainer\030\006 \001(\010\022\025\n\r" +
+      "clientVersion\030\007 \001(\005\022\032\n\022usePersistentCach" +
+      "e\030\010 \001(\010\"(\n\026AddServerCacheResponse\022\016\n\006ret" +
+      "urn\030\001 \002(\010\"=\n\030RemoveServerCacheRequest\022\020\n" +
+      "\010tenantId\030\001 \001(\014\022\017\n\007cacheId\030\002 \002(\014\"+\n\031Remo" +
+      "veServerCacheResponse\022\016\n\006return\030\001 \002(\0102\245\001" +
+      "\n\024ServerCachingService\022A\n\016addServerCache" +
+      "\022\026.AddServerCacheRequest\032\027.AddServerCach" +
+      "eResponse\022J\n\021removeServerCache\022\031.RemoveS" +
+      "erverCacheRequest\032\032.RemoveServerCacheRes",
+      "ponseBG\n(org.apache.phoenix.coprocessor." +
+      "generatedB\023ServerCachingProtosH\001\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -8950,7 +9115,7 @@
           internal_static_IndexMaintainer_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_IndexMaintainer_descriptor,
-              new java.lang.String[] { "SaltBuckets", "IsMultiTenant", "ViewIndexId", "IndexedColumns", "IndexedColumnTypeOrdinal", "DataTableColRefForCoveredColumns", "IndexTableColRefForCoveredColumns", "IsLocalIndex", "IndexTableName", "RowKeyOrderOptimizable", "DataTableEmptyKeyValueColFamily", "EmptyKeyValueColFamily", "IndexedExpressions", "RowKeyMetadata", "NumDataTableColFamilies", "IndexWalDisabled", "IndexRowKeyByteSize", "Immutable", "IndexedColumnInfo", "EncodingScheme", "ImmutableStorageScheme", "ViewIndexIdType", "IndexDataColumnCount", });
+              new java.lang.String[] { "SaltBuckets", "IsMultiTenant", "ViewIndexId", "IndexedColumns", "IndexedColumnTypeOrdinal", "DataTableColRefForCoveredColumns", "IndexTableColRefForCoveredColumns", "IsLocalIndex", "IndexTableName", "RowKeyOrderOptimizable", "DataTableEmptyKeyValueColFamily", "EmptyKeyValueColFamily", "IndexedExpressions", "RowKeyMetadata", "NumDataTableColFamilies", "IndexWalDisabled", "IndexRowKeyByteSize", "Immutable", "IndexedColumnInfo", "EncodingScheme", "ImmutableStorageScheme", "ViewIndexIdType", "IndexDataColumnCount", "ParentTableType", });
           internal_static_AddServerCacheRequest_descriptor =
             getDescriptor().getMessageTypes().get(4);
           internal_static_AddServerCacheRequest_fieldAccessorTable = new
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index 7004c81..a39c859 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -332,6 +332,7 @@
     private byte[] viewIndexId;
     private PDataType viewIndexIdType;
     private boolean isMultiTenant;
+    private PTableType parentTableType;
     // indexed expressions that are not present in the row key of the data table, the expression can also refer to a regular column
     private List<Expression> indexedExpressions;
     // columns required to evaluate all expressions in indexedExpressions (this does not include columns in the data row key)
@@ -393,7 +394,7 @@
         this.viewIndexIdType = index.getviewIndexIdType();
         this.isLocalIndex = index.getIndexType() == IndexType.LOCAL;
         this.encodingScheme = index.getEncodingScheme();
-        
+      
         // null check for b/w compatibility
         this.encodingScheme = index.getEncodingScheme() == null ? QualifierEncodingScheme.NON_ENCODED_QUALIFIERS : index.getEncodingScheme();
         this.immutableStorageScheme = index.getImmutableStorageScheme() == null ? ImmutableStorageScheme.ONE_CELL_PER_COLUMN : index.getImmutableStorageScheme();
@@ -433,18 +434,20 @@
         // either a VIEW or PROJECTED
         List<PColumn>dataPKColumns = dataTable.getPKColumns();
         this.indexDataColumnCount = dataPKColumns.size();
+        PTable parentTable = dataTable;
         // We need to get the PK column for the table on which the index is created
         if (!dataTable.getName().equals(index.getParentName())) {
             try {
                 String tenantId = (index.getTenantId() != null) ? 
                         index.getTenantId().getString() : null;
-                PTable indexTable = PhoenixRuntime.getTable(connection, 
+                parentTable = PhoenixRuntime.getTable(connection, 
                         tenantId, index.getParentName().getString());
-                this.indexDataColumnCount = indexTable.getPKColumns().size();
+                this.indexDataColumnCount = parentTable.getPKColumns().size();
             } catch (SQLException e) {
                 throw new RuntimeException(e);
             }
         }
+        this.parentTableType = parentTable.getType();
         
         int indexPkColumnCount = this.indexDataColumnCount + 
                 indexedExpressionCount  - (this.isDataTableSalted ? 1 : 0) - (this.isMultiTenant ? 1 : 0);
@@ -644,7 +647,7 @@
                 // Ignore view constants from the data table, as these
                 // don't need to appear in the index (as they're the
                 // same for all rows in this index)
-                if (!viewConstantColumnBitSet.get(i)) {
+                if (!viewConstantColumnBitSet.get(i) || isIndexOnBaseTable()) {
                     int pos = rowKeyMetaData.getIndexPkPosition(i-dataPosOffset);
                     if (Boolean.TRUE.equals(hasValue)) {
                         dataRowKeyLocator[0][pos] = ptr.getOffset();
@@ -959,6 +962,9 @@
     }
     
     private int getNumViewConstants() {
+        if (isIndexOnBaseTable()) {
+            return 0;
+        }
         BitSet bitSet = this.rowKeyMetaData.getViewConstantColumnBitSet();
         int num = 0;
         for(int i = 0; i<dataRowKeySchema.getFieldCount();i++){
@@ -1432,6 +1438,9 @@
         maintainer.encodingScheme = PTable.QualifierEncodingScheme.fromSerializedValue((byte)proto.getEncodingScheme());
         maintainer.immutableStorageScheme = PTable.ImmutableStorageScheme.fromSerializedValue((byte)proto.getImmutableStorageScheme());
         maintainer.isLocalIndex = proto.getIsLocalIndex();
+        if (proto.hasParentTableType()) {
+            maintainer.parentTableType = PTableType.fromValue(proto.getParentTableType());
+        }
         
         List<ServerCachingProtos.ColumnReference> dataTableColRefsForCoveredColumnsList = proto.getDataTableColRefForCoveredColumnsList();
         List<ServerCachingProtos.ColumnReference> indexTableColRefsForCoveredColumnsList = proto.getIndexTableColRefForCoveredColumnsList();
@@ -1535,6 +1544,9 @@
             }
         }
         builder.setIsLocalIndex(maintainer.isLocalIndex);
+        if (maintainer.parentTableType != null) {
+            builder.setParentTableType(maintainer.parentTableType.toString());
+        }       
         builder.setIndexDataColumnCount(maintainer.indexDataColumnCount);
         builder.setIndexTableName(ByteStringer.wrap(maintainer.indexTableName));
         builder.setRowKeyOrderOptimizable(maintainer.rowKeyOrderOptimizable);
@@ -1652,7 +1664,7 @@
         int numViewConstantColumns = 0;
         BitSet viewConstantColumnBitSet = rowKeyMetaData.getViewConstantColumnBitSet();
         for (int i = dataPkOffset; i < indexDataColumnCount; i++) {
-            if (!viewConstantColumnBitSet.get(i)) {
+            if (!viewConstantColumnBitSet.get(i) || isIndexOnBaseTable()) {
                 int indexPkPosition = rowKeyMetaData.getIndexPkPosition(i-dataPkOffset);
                 this.dataPkPosition[indexPkPosition] = i;
             } else {
@@ -1887,6 +1899,13 @@
         return immutableRows;
     }
     
+    public boolean isIndexOnBaseTable() {
+        if (parentTableType == null) {
+            return false;
+        }
+        return parentTableType == PTableType.TABLE;
+    }
+    
     public Set<ColumnReference> getIndexedColumns() {
         return indexedColumns;
     }
diff --git a/phoenix-protocol/src/main/ServerCachingService.proto b/phoenix-protocol/src/main/ServerCachingService.proto
index fbe151a..0e37de3 100644
--- a/phoenix-protocol/src/main/ServerCachingService.proto
+++ b/phoenix-protocol/src/main/ServerCachingService.proto
@@ -64,6 +64,7 @@
   required int32 immutableStorageScheme = 21;
   optional int32 viewIndexIdType = 22 ;
   optional int32 indexDataColumnCount = 23 [default = -1];
+  optional string parentTableType = 24;
 }
 
 message AddServerCacheRequest {