HBASE-16286 When TagRewriteCell are not copied to MSLAB, deep clone it while adding to Memstore.
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
index 9162962..b769f19 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
@@ -39,6 +39,7 @@
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.ByteRange;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
 
 /**
  * Utility methods helpful slinging {@link Cell} instances.
@@ -326,6 +327,194 @@
   }
 
   /**
+   * @return A new cell which is having the extra tags also added to it.
+   */
+  public static Cell createCell(Cell cell, List<Tag> tags) {
+    return createCell(cell, TagUtil.fromList(tags));
+  }
+
+  /**
+   * @return A new cell which is having the extra tags also added to it.
+   */
+  public static Cell createCell(Cell cell, byte[] tags) {
+    if (cell instanceof ShareableMemory) {
+      return new ShareableMemoryTagRewriteCell(cell, tags);
+    }
+    return new TagRewriteCell(cell, tags);
+  }
+
+  /**
+   * This can be used when a Cell has to change with addition/removal of one or more tags. This is an
+   * efficient way to do so in which only the tags bytes part need to recreated and copied. All other
+   * parts, refer to the original Cell.
+   */
+  @InterfaceAudience.Private
+  private static class TagRewriteCell implements Cell, SettableSequenceId, SettableTimestamp,
+      HeapSize {
+    protected Cell cell;
+    protected byte[] tags;
+
+    /**
+     * @param cell The original Cell which it rewrites
+     * @param tags the tags bytes. The array suppose to contain the tags bytes alone.
+     */
+    public TagRewriteCell(Cell cell, byte[] tags) {
+      assert cell instanceof SettableSequenceId;
+      assert cell instanceof SettableTimestamp;
+      assert tags != null;
+      this.cell = cell;
+      this.tags = tags;
+      // tag offset will be treated as 0 and length this.tags.length
+      if (this.cell instanceof TagRewriteCell) {
+        // Cleaning the ref so that the byte[] can be GCed
+        ((TagRewriteCell) this.cell).tags = null;
+      }
+    }
+
+    @Override
+    public byte[] getRowArray() {
+      return cell.getRowArray();
+    }
+
+    @Override
+    public int getRowOffset() {
+      return cell.getRowOffset();
+    }
+
+    @Override
+    public short getRowLength() {
+      return cell.getRowLength();
+    }
+
+    @Override
+    public byte[] getFamilyArray() {
+      return cell.getFamilyArray();
+    }
+
+    @Override
+    public int getFamilyOffset() {
+      return cell.getFamilyOffset();
+    }
+
+    @Override
+    public byte getFamilyLength() {
+      return cell.getFamilyLength();
+    }
+
+    @Override
+    public byte[] getQualifierArray() {
+      return cell.getQualifierArray();
+    }
+
+    @Override
+    public int getQualifierOffset() {
+      return cell.getQualifierOffset();
+    }
+
+    @Override
+    public int getQualifierLength() {
+      return cell.getQualifierLength();
+    }
+
+    @Override
+    public long getTimestamp() {
+      return cell.getTimestamp();
+    }
+
+    @Override
+    public byte getTypeByte() {
+      return cell.getTypeByte();
+    }
+
+    @Override
+    public long getSequenceId() {
+      return cell.getSequenceId();
+    }
+
+    @Override
+    public byte[] getValueArray() {
+      return cell.getValueArray();
+    }
+
+    @Override
+    public int getValueOffset() {
+      return cell.getValueOffset();
+    }
+
+    @Override
+    public int getValueLength() {
+      return cell.getValueLength();
+    }
+
+    @Override
+    public byte[] getTagsArray() {
+      return this.tags;
+    }
+
+    @Override
+    public int getTagsOffset() {
+      return 0;
+    }
+
+    @Override
+    public int getTagsLength() {
+      if (null == this.tags) {
+        // Nulled out tags array optimization in constructor
+        return 0;
+      }
+      return this.tags.length;
+    }
+
+    @Override
+    public long heapSize() {
+      long sum = CellUtil.estimatedHeapSizeOf(cell) - cell.getTagsLength();
+      sum += ClassSize.OBJECT;// this object itself
+      sum += (2 * ClassSize.REFERENCE);// pointers to cell and tags array
+      if (this.tags != null) {
+        sum += ClassSize.align(ClassSize.ARRAY);// "tags"
+        sum += this.tags.length;
+      }
+      return sum;
+    }
+
+    @Override
+    public void setTimestamp(long ts) throws IOException {
+      // The incoming cell is supposed to be SettableTimestamp type.
+      CellUtil.setTimestamp(cell, ts);
+    }
+
+    @Override
+    public void setTimestamp(byte[] ts, int tsOffset) throws IOException {
+      // The incoming cell is supposed to be SettableTimestamp type.
+      CellUtil.setTimestamp(cell, ts, tsOffset);
+    }
+
+    @Override
+    public void setSequenceId(long seqId) throws IOException {
+      // The incoming cell is supposed to be SettableSequenceId type.
+      CellUtil.setSequenceId(cell, seqId);
+    }
+  }
+
+  /**
+   * Version of TagRewriteCell where the original Cell is ShareableMemory type.
+   */
+  private static class ShareableMemoryTagRewriteCell extends TagRewriteCell implements
+      ShareableMemory {
+
+    public ShareableMemoryTagRewriteCell(Cell cell, byte[] tags) {
+      super(cell, tags);
+      assert cell instanceof ShareableMemory;
+    }
+
+    @Override
+    public Cell cloneToCell() {
+      Cell clonedBaseCell = ((ShareableMemory) this.cell).cloneToCell();
+      return new TagRewriteCell(clonedBaseCell, this.tags);
+    }
+  }
+
+  /**
    * @param cellScannerables
    * @return CellScanner interface over <code>cellIterables</code>
    */
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/TagRewriteCell.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/TagRewriteCell.java
deleted file mode 100644
index 3ada51a..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/TagRewriteCell.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.io.HeapSize;
-import org.apache.hadoop.hbase.util.ClassSize;
-
-/**
- * This can be used when a Cell has to change with addition/removal of one or more tags. This is an
- * efficient way to do so in which only the tags bytes part need to recreated and copied. All other
- * parts, refer to the original Cell.
- */
-@InterfaceAudience.Private
-public class TagRewriteCell implements Cell, SettableSequenceId, SettableTimestamp, HeapSize {
-
-  private Cell cell;
-  private byte[] tags;
-
-  /**
-   * @param cell The original Cell which it rewrites
-   * @param tags the tags bytes. The array suppose to contain the tags bytes alone.
-   */
-  public TagRewriteCell(Cell cell, byte[] tags) {
-    assert cell instanceof SettableSequenceId;
-    assert cell instanceof SettableTimestamp;
-    assert tags != null;
-    this.cell = cell;
-    this.tags = tags;
-    // tag offset will be treated as 0 and length this.tags.length
-    if (this.cell instanceof TagRewriteCell) {
-      // Cleaning the ref so that the byte[] can be GCed
-      ((TagRewriteCell) this.cell).tags = null;
-    }
-  }
-
-  @Override
-  public byte[] getRowArray() {
-    return cell.getRowArray();
-  }
-
-  @Override
-  public int getRowOffset() {
-    return cell.getRowOffset();
-  }
-
-  @Override
-  public short getRowLength() {
-    return cell.getRowLength();
-  }
-
-  @Override
-  public byte[] getFamilyArray() {
-    return cell.getFamilyArray();
-  }
-
-  @Override
-  public int getFamilyOffset() {
-    return cell.getFamilyOffset();
-  }
-
-  @Override
-  public byte getFamilyLength() {
-    return cell.getFamilyLength();
-  }
-
-  @Override
-  public byte[] getQualifierArray() {
-    return cell.getQualifierArray();
-  }
-
-  @Override
-  public int getQualifierOffset() {
-    return cell.getQualifierOffset();
-  }
-
-  @Override
-  public int getQualifierLength() {
-    return cell.getQualifierLength();
-  }
-
-  @Override
-  public long getTimestamp() {
-    return cell.getTimestamp();
-  }
-
-  @Override
-  public byte getTypeByte() {
-    return cell.getTypeByte();
-  }
-
-  @Override
-  public long getSequenceId() {
-    return cell.getSequenceId();
-  }
-
-  @Override
-  public byte[] getValueArray() {
-    return cell.getValueArray();
-  }
-
-  @Override
-  public int getValueOffset() {
-    return cell.getValueOffset();
-  }
-
-  @Override
-  public int getValueLength() {
-    return cell.getValueLength();
-  }
-
-  @Override
-  public byte[] getTagsArray() {
-    return this.tags;
-  }
-
-  @Override
-  public int getTagsOffset() {
-    return 0;
-  }
-
-  @Override
-  public int getTagsLength() {
-    if (null == this.tags) {
-      // Nulled out tags array optimization in constructor
-      return 0;
-    }
-    return this.tags.length;
-  }
-
-  @Override
-  public long heapSize() {
-    long sum = CellUtil.estimatedHeapSizeOf(cell) - cell.getTagsLength();
-    sum += ClassSize.OBJECT;// this object itself
-    sum += (2 * ClassSize.REFERENCE);// pointers to cell and tags array
-    if (this.tags != null) {
-      sum += ClassSize.align(ClassSize.ARRAY);// "tags"
-      sum += this.tags.length;
-    }
-    return sum;
-  }
-
-  @Override
-  public void setTimestamp(long ts) throws IOException {
-    // The incoming cell is supposed to be SettableTimestamp type.
-    CellUtil.setTimestamp(cell, ts);
-  }
-
-  @Override
-  public void setTimestamp(byte[] ts, int tsOffset) throws IOException {
-    // The incoming cell is supposed to be SettableTimestamp type.
-    CellUtil.setTimestamp(cell, ts, tsOffset);
-  }
-
-  @Override
-  public void setSequenceId(long seqId) throws IOException {
-    // The incoming cell is supposed to be SettableSequenceId type.
-    CellUtil.setSequenceId(cell, seqId);
-  }
-}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 9f6a03a..89e723e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -104,7 +104,6 @@
 import org.apache.hadoop.hbase.RegionTooBusyException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Tag;
-import org.apache.hadoop.hbase.TagRewriteCell;
 import org.apache.hadoop.hbase.TagUtil;
 import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.backup.HFileArchiver;
@@ -3699,7 +3698,7 @@
         List<Tag> newTags = TagUtil.carryForwardTags(null, cell);
         newTags = TagUtil.carryForwardTTLTag(newTags, m.getTTL());
         // Rewrite the cell with the updated set of tags
-        cells.set(i, new TagRewriteCell(cell, TagUtil.fromList(newTags)));
+        cells.set(i, CellUtil.createCell(cell, newTags));
       }
     }
   }
@@ -7437,7 +7436,7 @@
       newCell = delta;
       tags = TagUtil.carryForwardTTLTag(tags, mutation.getTTL());
       if (tags != null) {
-        newCell = new TagRewriteCell(delta, TagUtil.fromList(tags));
+        newCell = CellUtil.createCell(delta, tags);
       }
     }
     return newCell;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
index cb78837..0e69060 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
@@ -57,7 +57,6 @@
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Tag;
-import org.apache.hadoop.hbase.TagRewriteCell;
 import org.apache.hadoop.hbase.TagUtil;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Append;
@@ -887,7 +886,7 @@
         while (tagIterator.hasNext()) {
           tags.add(tagIterator.next());
         }
-        newCells.add(new TagRewriteCell(cell, TagUtil.fromList(tags)));
+        newCells.add(CellUtil.createCell(cell, tags));
       }
       // This is supposed to be safe, won't CME
       e.setValue(newCells);
@@ -2065,7 +2064,7 @@
       return newCell;
     }
 
-    Cell rewriteCell = new TagRewriteCell(newCell, TagUtil.fromList(tags));
+    Cell rewriteCell = CellUtil.createCell(newCell, tags);
     return rewriteCell;
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java
index 3fa3fa3..f7bcef0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java
@@ -47,7 +47,6 @@
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Tag;
-import org.apache.hadoop.hbase.TagRewriteCell;
 import org.apache.hadoop.hbase.TagType;
 import org.apache.hadoop.hbase.TagUtil;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -388,7 +387,7 @@
                 removeReplicationVisibilityTag(tags);
               }
               tags.addAll(visibilityTags);
-              Cell updatedCell = new TagRewriteCell(cell, TagUtil.fromList(tags));
+              Cell updatedCell = CellUtil.createCell(cell, tags);
               updatedCells.add(updatedCell);
             }
             m.getFamilyCellMap().clear();
@@ -753,7 +752,7 @@
       }
     }
 
-    Cell rewriteCell = new TagRewriteCell(newCell, TagUtil.fromList(tags));
+    Cell rewriteCell = CellUtil.createCell(newCell, tags);
     return rewriteCell;
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java
index 2ac515a..c1c3852 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java
@@ -26,10 +26,9 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.ArrayBackedTag;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.Tag;
-import org.apache.hadoop.hbase.TagRewriteCell;
 import org.apache.hadoop.hbase.TagType;
-import org.apache.hadoop.hbase.TagUtil;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
@@ -101,7 +100,7 @@
                 continue;
               }
               // Recreate the cell with the new tags and the existing tags
-              Cell newCell = new TagRewriteCell(cell, TagUtil.fromList(nonVisTags));
+              Cell newCell = CellUtil.createCell(cell, nonVisTags);
               newEdit.add(newCell);
             } else {
               newEdit.add(cell);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestTagRewriteCell.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestTagRewriteCell.java
index 56cecf4..54ae775 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestTagRewriteCell.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestTagRewriteCell.java
@@ -18,6 +18,7 @@
 
 import static org.junit.Assert.assertTrue;
 
+import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.Test;
@@ -30,19 +31,19 @@
   public void testHeapSize() {
     Cell originalCell = CellUtil.createCell(Bytes.toBytes("row"), Bytes.toBytes("value"));
     final int fakeTagArrayLength = 10;
-    TagRewriteCell trCell = new TagRewriteCell(originalCell, new byte[fakeTagArrayLength]);
+    Cell trCell = CellUtil.createCell(originalCell, new byte[fakeTagArrayLength]);
 
     // Get the heapSize before the internal tags array in trCell are nuked
-    long trCellHeapSize = trCell.heapSize();
+    long trCellHeapSize = ((HeapSize)trCell).heapSize();
 
     // Make another TagRewriteCell with the original TagRewriteCell
     // This happens on systems with more than one RegionObserver/Coproc loaded (such as
     // VisibilityController and AccessController)
-    TagRewriteCell trCell2 = new TagRewriteCell(trCell, new byte[fakeTagArrayLength]);
+    Cell trCell2 = CellUtil.createCell(trCell, new byte[fakeTagArrayLength]);
 
     assertTrue("TagRewriteCell containing a TagRewriteCell's heapsize should be larger than a " +
-        "single TagRewriteCell's heapsize", trCellHeapSize < trCell2.heapSize());
-    assertTrue("TagRewriteCell should have had nulled out tags array", trCell.heapSize() <
+        "single TagRewriteCell's heapsize", trCellHeapSize < ((HeapSize)trCell2).heapSize());
+    assertTrue("TagRewriteCell should have had nulled out tags array", ((HeapSize)trCell).heapSize() <
         trCellHeapSize);
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java
index 346347e..65a20ef 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java
@@ -44,7 +44,6 @@
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.ArrayBackedTag;
-import org.apache.hadoop.hbase.TagRewriteCell;
 import org.apache.hadoop.hbase.TagType;
 import org.apache.hadoop.hbase.TagUtil;
 import org.apache.hadoop.hbase.client.Admin;
@@ -413,8 +412,7 @@
             List<Tag> tagList = new ArrayList<Tag>();
             tagList.add(tag);
             tagList.addAll(kv.getTags());
-            byte[] fromList = TagUtil.fromList(tagList);
-            TagRewriteCell newcell = new TagRewriteCell(kv, fromList);
+            Cell newcell = CellUtil.createCell(kv, tagList);
             ((List<Cell>) updatedCells).add(newcell);
           }
         }