PHOENIX-5704 Covered column updates are not generated for previously deleted data table row
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java
index 0810aa3..23c1956 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java
@@ -31,7 +31,6 @@
 import java.sql.Statement;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.List;
 import java.util.Properties;
 
 import org.apache.hadoop.hbase.TableName;
@@ -665,7 +664,6 @@
       }
   }
 
-
   @Test
   public void testUpsertingDeletedRowShouldGiveProperDataWithIndexes() throws Exception {
       testUpsertingDeletedRowShouldGiveProperDataWithIndexes(false);
@@ -686,11 +684,11 @@
       try (Connection conn = getConnection()) {
             conn.createStatement().execute(
                 "create table " + fullTableName + " (id integer primary key, "
-                        + (multiCf ? columnFamily1 : "") + "f float, "
-                        + (multiCf ? columnFamily2 : "") + "s varchar)" + tableDDLOptions);
+                        + (multiCf ? columnFamily1 + "." : "") + "f float, "
+                        + (multiCf ? columnFamily2 + "." : "") + "s varchar)" + tableDDLOptions);
             conn.createStatement().execute(
                 "create " + (localIndex ? "LOCAL" : "") + " index " + indexName + " on " + fullTableName + " ("
-                        + (multiCf ? columnFamily1 : "") + "f) include ("+(multiCf ? columnFamily2 : "") +"s)");
+                        + (multiCf ? columnFamily1 + "." : "") + "f) include ("+(multiCf ? columnFamily2 + "." : "") +"s)");
             conn.createStatement().execute(
                 "upsert into " + fullTableName + " values (1, 0.5, 'foo')");
           conn.commit();
@@ -707,6 +705,49 @@
       } 
   }
 
+    @Test
+    public void testUpsertingDeletedRowWithNullCoveredColumn() throws Exception {
+        testUpsertingDeletedRowWithNullCoveredColumn(false);
+    }
+
+    @Test
+    public void testUpsertingDeletedRowWithNullCoveredColumnMultiCfs() throws Exception {
+        testUpsertingDeletedRowWithNullCoveredColumn(true);
+    }
+
+    public void testUpsertingDeletedRowWithNullCoveredColumn(boolean multiCf) throws Exception {
+        String tableName = "TBL_" + generateUniqueName();
+        String indexName = "IDX_" + generateUniqueName();
+        String columnFamily1 = "cf1";
+        String columnFamily2 = "cf2";
+        String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
+        String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
+        try (Connection conn = getConnection()) {
+            conn.createStatement()
+                    .execute("create table " + fullTableName + " (id integer primary key, "
+                            + (multiCf ? columnFamily1 + "." : "") + "f varchar, "
+                            + (multiCf ? columnFamily2 + "." : "") + "s varchar)"
+                            + tableDDLOptions);
+            conn.createStatement()
+                    .execute("create " + (localIndex ? "LOCAL" : "") + " index " + indexName
+                            + " on " + fullTableName + " (" + (multiCf ? columnFamily1 + "." : "")
+                            + "f) include (" + (multiCf ? columnFamily2 + "." : "") + "s)");
+            conn.createStatement()
+                    .execute("upsert into " + fullTableName + " values (1, 'foo', 'bar')");
+            conn.commit();
+            conn.createStatement().execute("delete from  " + fullTableName + " where id = 1");
+            conn.commit();
+            conn.createStatement()
+                    .execute("upsert into  " + fullTableName + " values (1, null, 'bar')");
+            conn.commit();
+            ResultSet rs = conn.createStatement().executeQuery("select * from " + fullIndexName);
+            assertTrue(rs.next());
+            assertEquals(1, rs.getInt(2));
+            assertEquals(null, rs.getString(1));
+            assertEquals("bar", rs.getString(3));
+        }
+    }
+
   /**
    * PHOENIX-4988
    * Test updating only a non-indexed column after two successive deletes to an indexed row