HBASE-28456 HBase Restore restores old data if data for the same timestamp is in different hfiles (#5775)

Signed-off-by: Nick Dimiduk <ndimiduk@apache.org>
Signed-off-by: Duo Zhang <zhangduo@apache.org>
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java
index 766a99d..755b0a4 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java
@@ -99,6 +99,10 @@
     conf.set(FileInputFormat.INPUT_DIR, inputDirs);
     Job job = Job.getInstance(conf,
       conf.get(JOB_NAME_CONF_KEY, NAME + "_" + EnvironmentEdgeManager.currentTime()));
+    // MapReduceHFileSplitterJob needs ExtendedCellSerialization so that the bulkload sequenceId
+    // can be propagated when cells are sorted in CellSortReducer
+    job.getConfiguration().setBoolean(HFileOutputFormat2.EXTENDED_CELL_SERIALIZATION_ENABLED_KEY,
+      true);
     job.setJarByClass(MapReduceHFileSplitterJob.class);
     job.setInputFormatClass(HFileInputFormat.class);
     job.setMapOutputKeyClass(ImmutableBytesWritable.class);
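Why the flag matters: HFileInputFormat (changed below) now stamps the bulkload sequenceId onto every cell it emits, but the default cell serialization used across the MapReduce shuffle drops that sequenceId, so CellSortReducer cannot tell which of two cells with the same key and timestamp is newer. A rough standalone illustration of the ordering that ExtendedCellSerialization preserves (the timestamp and sequence ids below are made up):

    KeyValue older = new KeyValue(Bytes.toBytes("row-1"), Bytes.toBytes("0"),
      Bytes.toBytes("data"), 1700000000000L, Bytes.toBytes("data"));
    KeyValue newer = new KeyValue(Bytes.toBytes("row-1"), Bytes.toBytes("0"),
      Bytes.toBytes("data"), 1700000000000L, Bytes.toBytes("changed_data"));
    // Same key and timestamp; only the bulkload sequenceId distinguishes the two versions.
    PrivateCellUtil.setSequenceId(older, 5);
    PrivateCellUtil.setSequenceId(newer, 9);
    // The default comparator sorts the higher sequenceId first, so the newer cell wins at read time.
    assert CellComparator.getInstance().compare(newer, older) < 0;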
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupRestoreWithModifications.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupRestoreWithModifications.java
new file mode 100644
index 0000000..d01df68
--- /dev/null
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupRestoreWithModifications.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import static org.apache.hadoop.hbase.backup.BackupInfo.BackupState.COMPLETE;
+import static org.apache.hadoop.hbase.backup.BackupType.FULL;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
+import org.apache.hadoop.hbase.backup.impl.BackupManager;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testing.TestingHBaseCluster;
+import org.apache.hadoop.hbase.testing.TestingHBaseClusterOption;
+import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category(MediumTests.class)
+@RunWith(Parameterized.class)
+public class TestBackupRestoreWithModifications {
+
+  private static final Logger LOG =
+    LoggerFactory.getLogger(TestBackupRestoreWithModifications.class);
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestBackupRestoreWithModifications.class);
+
+  @Parameterized.Parameters(name = "{index}: useBulkLoad={0}")
+  public static Iterable<Object[]> data() {
+    return HBaseCommonTestingUtil.BOOLEAN_PARAMETERIZED;
+  }
+
+  @Parameterized.Parameter(0)
+  public boolean useBulkLoad;
+
+  private TableName sourceTable;
+  private TableName targetTable;
+
+  private List<TableName> allTables;
+  private static TestingHBaseCluster cluster;
+  private static final Path BACKUP_ROOT_DIR = new Path("backupIT");
+  private static final byte[] COLUMN_FAMILY = Bytes.toBytes("0");
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    enableBackup(conf);
+    cluster = TestingHBaseCluster.create(TestingHBaseClusterOption.builder().conf(conf).build());
+    cluster.start();
+  }
+
+  @AfterClass
+  public static void afterClass() throws Exception {
+    cluster.stop();
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    sourceTable = TableName.valueOf("table-" + useBulkLoad);
+    targetTable = TableName.valueOf("another-table-" + useBulkLoad);
+    allTables = Arrays.asList(sourceTable, targetTable);
+    createTable(sourceTable);
+    createTable(targetTable);
+  }
+
+  @Test
+  public void testModificationsOnTable() throws Exception {
+    Instant timestamp = Instant.now();
+
+    // load some data
+    load(sourceTable, timestamp, "data");
+
+    String backupId = backup(FULL, allTables);
+    BackupInfo backupInfo = verifyBackup(backupId, FULL, COMPLETE);
+    assertTrue(backupInfo.getTables().contains(sourceTable));
+
+    restore(backupId, sourceTable, targetTable);
+    validateDataEquals(sourceTable, "data");
+    validateDataEquals(targetTable, "data");
+
+    // load new data on the same timestamp
+    load(sourceTable, timestamp, "changed_data");
+
+    backupId = backup(FULL, allTables);
+    backupInfo = verifyBackup(backupId, FULL, COMPLETE);
+    assertTrue(backupInfo.getTables().contains(sourceTable));
+
+    restore(backupId, sourceTable, targetTable);
+    validateDataEquals(sourceTable, "changed_data");
+    validateDataEquals(targetTable, "changed_data");
+  }
+
+  private void createTable(TableName tableName) throws IOException {
+    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName)
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(COLUMN_FAMILY));
+    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
+      Admin admin = connection.getAdmin()) {
+      admin.createTable(builder.build());
+    }
+  }
+
+  private void load(TableName tableName, Instant timestamp, String data) throws IOException {
+    if (useBulkLoad) {
+      hFileBulkLoad(tableName, timestamp, data);
+    } else {
+      putLoad(tableName, timestamp, data);
+    }
+  }
+
+  private void putLoad(TableName tableName, Instant timestamp, String data) throws IOException {
+    LOG.info("Writing new data to HBase using normal Puts: {}", data);
+    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf())) {
+      Table table = connection.getTable(tableName);
+      List<Put> puts = new ArrayList<>();
+      for (int i = 0; i < 10; i++) {
+        Put put = new Put(Bytes.toBytes(i), timestamp.toEpochMilli());
+        put.addColumn(COLUMN_FAMILY, Bytes.toBytes("data"), Bytes.toBytes(data));
+        puts.add(put);
+
+        if (i % 100 == 0) {
+          table.put(puts);
+          puts.clear();
+        }
+      }
+      if (!puts.isEmpty()) {
+        table.put(puts);
+      }
+      connection.getAdmin().flush(tableName);
+    }
+  }
+
+  private void hFileBulkLoad(TableName tableName, Instant timestamp, String data)
+    throws IOException {
+    FileSystem fs = FileSystem.get(cluster.getConf());
+    LOG.info("Writing new data to HBase using BulkLoad: {}", data);
+    // HFiles require this strict directory structure in order to be bulk loaded
+    Path hFileRootPath = new Path("/tmp/hfiles_" + UUID.randomUUID());
+    fs.mkdirs(hFileRootPath);
+    Path hFileFamilyPath = new Path(hFileRootPath, Bytes.toString(COLUMN_FAMILY));
+    fs.mkdirs(hFileFamilyPath);
+    try (HFile.Writer writer = HFile.getWriterFactoryNoCache(cluster.getConf())
+      .withPath(fs, new Path(hFileFamilyPath, "hfile_" + UUID.randomUUID()))
+      .withFileContext(new HFileContextBuilder().withTableName(tableName.toBytes())
+        .withColumnFamily(COLUMN_FAMILY).build())
+      .create()) {
+      for (int i = 0; i < 10; i++) {
+        writer.append(new KeyValue(Bytes.toBytes(i), COLUMN_FAMILY, Bytes.toBytes("data"),
+          timestamp.toEpochMilli(), Bytes.toBytes(data)));
+      }
+    }
+    Map<BulkLoadHFiles.LoadQueueItem, ByteBuffer> result =
+      BulkLoadHFiles.create(cluster.getConf()).bulkLoad(tableName, hFileRootPath);
+    assertFalse(result.isEmpty());
+  }
+
+  private String backup(BackupType backupType, List<TableName> tables) throws IOException {
+    LOG.info("Creating the backup ...");
+
+    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
+      BackupAdmin backupAdmin = new BackupAdminImpl(connection)) {
+      BackupRequest backupRequest =
+        new BackupRequest.Builder().withTargetRootDir(BACKUP_ROOT_DIR.toString())
+          .withTableList(new ArrayList<>(tables)).withBackupType(backupType).build();
+      return backupAdmin.backupTables(backupRequest);
+    }
+
+  }
+
+  private void restore(String backupId, TableName sourceTableName, TableName targetTableName)
+    throws IOException {
+    LOG.info("Restoring data ...");
+    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
+      BackupAdmin backupAdmin = new BackupAdminImpl(connection)) {
+      RestoreRequest restoreRequest = new RestoreRequest.Builder().withBackupId(backupId)
+        .withBackupRootDir(BACKUP_ROOT_DIR.toString()).withOvewrite(true)
+        .withFromTables(new TableName[] { sourceTableName })
+        .withToTables(new TableName[] { targetTableName }).build();
+      backupAdmin.restore(restoreRequest);
+    }
+  }
+
+  private void validateDataEquals(TableName tableName, String expectedData) throws IOException {
+    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
+      Table table = connection.getTable(tableName)) {
+      Scan scan = new Scan();
+      scan.readAllVersions();
+      scan.setRaw(true);
+      scan.setBatch(100);
+
+      for (Result sourceResult : table.getScanner(scan)) {
+        List<Cell> sourceCells = sourceResult.listCells();
+        for (Cell cell : sourceCells) {
+          assertEquals(expectedData, Bytes.toStringBinary(cell.getValueArray(),
+            cell.getValueOffset(), cell.getValueLength()));
+        }
+      }
+    }
+  }
+
+  private BackupInfo verifyBackup(String backupId, BackupType expectedType,
+    BackupInfo.BackupState expectedState) throws IOException {
+    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
+      BackupAdmin backupAdmin = new BackupAdminImpl(connection)) {
+      BackupInfo backupInfo = backupAdmin.getBackupInfo(backupId);
+
+      // Verify managed backup in HBase
+      assertEquals(backupId, backupInfo.getBackupId());
+      assertEquals(expectedState, backupInfo.getState());
+      assertEquals(expectedType, backupInfo.getType());
+      return backupInfo;
+    }
+  }
+
+  private static void enableBackup(Configuration conf) {
+    // Enable backup
+    conf.setBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY, true);
+    BackupManager.decorateMasterConfiguration(conf);
+    BackupManager.decorateRegionServerConfiguration(conf);
+  }
+
+}
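The test is parameterized over both write paths (normal Puts and bulk-loaded HFiles), so the backup/restore round trip is exercised in both modes. Assuming a standard Maven checkout, it can be run on its own with something like:

    mvn test -pl hbase-backup -Dtest=TestBackupRestoreWithModifications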
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat.java
index 3ccbaab..1fdcf4b 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat.java
@@ -21,15 +21,18 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.OptionalLong;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -78,6 +81,7 @@
     private Cell value = null;
     private long count;
     private boolean seeked = false;
+    private OptionalLong bulkloadSeqId;
 
     @Override
     public void initialize(InputSplit split, TaskAttemptContext context)
@@ -88,6 +92,7 @@
       FileSystem fs = path.getFileSystem(conf);
       LOG.info("Initialize HFileRecordReader for {}", path);
       this.in = HFile.createReader(fs, path, conf);
+      this.bulkloadSeqId = StoreFileInfo.getBulkloadSeqId(path);
 
       // The file info must be loaded before the scanner can be used.
       // This seems like a bug in HBase, but it's easily worked around.
@@ -109,6 +114,9 @@
         return false;
       }
       value = scanner.getCell();
+      if (value != null && bulkloadSeqId.isPresent()) {
+        PrivateCellUtil.setSequenceId(value, bulkloadSeqId.getAsLong());
+      }
       count++;
       return true;
     }
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
index 5c6ef57..fcbcd2d 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
@@ -162,10 +162,10 @@
 
   /**
    * ExtendedCell and ExtendedCellSerialization are InterfaceAudience.Private. We expose this config
-   * package-private for internal usage for jobs like WALPlayer which need to use features of
-   * ExtendedCell.
+   * for internal usage in jobs like WALPlayer which need to use features of ExtendedCell.
    */
-  static final String EXTENDED_CELL_SERIALIZATION_ENABLED_KEY =
+  @InterfaceAudience.Private
+  public static final String EXTENDED_CELL_SERIALIZATION_ENABLED_KEY =
     "hbase.mapreduce.hfileoutputformat.extendedcell.enabled";
   static final boolean EXTENDED_CELL_SERIALIZATION_ENABLED_DEFULT = false;
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
index 48afdc5..6fccccf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
@@ -490,7 +490,7 @@
 
     String name = buildPath.getName();
     if (generateNewName) {
-      name = generateUniqueName((seqNum < 0) ? null : "_SeqId_" + seqNum + "_");
+      name = generateUniqueName((seqNum < 0) ? null : StoreFileInfo.formatBulkloadSeqId(seqNum));
     }
     Path dstPath = new Path(storeDir, name);
     if (!fs.exists(buildPath)) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
index ae514f0..5df02bf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
@@ -329,13 +329,8 @@
 
   @Override
   public boolean isBulkLoadResult() {
-    boolean bulkLoadedHFile = false;
-    String fileName = this.getPath().getName();
-    int startPos = fileName.indexOf("SeqId_");
-    if (startPos != -1) {
-      bulkLoadedHFile = true;
-    }
-    return bulkLoadedHFile || (metadataMap != null && metadataMap.containsKey(BULKLOAD_TIME_KEY));
+    return StoreFileInfo.hasBulkloadSeqId(this.getPath())
+      || (metadataMap != null && metadataMap.containsKey(BULKLOAD_TIME_KEY));
   }
 
   public boolean isCompactedAway() {
@@ -413,19 +408,16 @@
     }
 
     if (isBulkLoadResult()) {
-      // generate the sequenceId from the fileName
-      // fileName is of the form <randomName>_SeqId_<id-when-loaded>_
-      String fileName = this.getPath().getName();
-      // Use lastIndexOf() to get the last, most recent bulk load seqId.
-      int startPos = fileName.lastIndexOf("SeqId_");
-      if (startPos != -1) {
-        this.sequenceid =
-          Long.parseLong(fileName.substring(startPos + 6, fileName.indexOf('_', startPos + 6)));
+      // For bulkloads, we have to parse the sequenceid from the path name
+      OptionalLong sequenceId = StoreFileInfo.getBulkloadSeqId(this.getPath());
+      if (sequenceId.isPresent()) {
+        this.sequenceid = sequenceId.getAsLong();
         // Handle reference files as done above.
         if (fileInfo.isTopReference()) {
           this.sequenceid += 1;
         }
       }
+
       // SKIP_RESET_SEQ_ID only works in bulk loaded file.
       // In mob compaction, the hfile where the cells contain the path of a new mob file is bulk
       // loaded to hbase, these cells have the same seqIds with the old ones. We do not want
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
index 1ebe93d..052dd51 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
@@ -19,6 +19,7 @@
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.OptionalLong;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -424,6 +425,54 @@
   }
 
   /**
+   * Cells in a bulkloaded file don't have a sequenceId since they don't go through memstore. When a
+   * bulkload file is committed, the current memstore ts is stamped onto the file name as the
+   * sequenceId of the file. At read time, the sequenceId is copied onto all of the cells returned
+   * so that they can be properly sorted relative to other cells in other files. Further, when
+   * opening multiple files for a scan, the sequence id is used to ensure that the bulkload file's
+   * scanner is properly ordered among the other scanners. Non-bulkloaded files get their
+   * sequenceId from the MAX_MEMSTORE_TS_KEY since those go through the memstore and have true
+   * sequenceIds.
+   */
+  private static final String SEQ_ID_MARKER = "_SeqId_";
+  private static final int SEQ_ID_MARKER_LENGTH = SEQ_ID_MARKER.length();
+
+  /**
+   * @see #SEQ_ID_MARKER
+   * @return True if the file name looks like a bulkloaded file, based on the presence of the SeqId
+   *         marker added to those files.
+   */
+  public static boolean hasBulkloadSeqId(final Path path) {
+    String fileName = path.getName();
+    return fileName.contains(SEQ_ID_MARKER);
+  }
+
+  /**
+   * @see #SEQ_ID_MARKER
+   * @return If the path is a properly named bulkloaded file, returns the sequence id stamped at the
+   *         end of the file name.
+   */
+  public static OptionalLong getBulkloadSeqId(final Path path) {
+    String fileName = path.getName();
+    int startPos = fileName.indexOf(SEQ_ID_MARKER);
+    if (startPos != -1) {
+      String strVal = fileName.substring(startPos + SEQ_ID_MARKER_LENGTH,
+        fileName.indexOf('_', startPos + SEQ_ID_MARKER_LENGTH));
+      return OptionalLong.of(Long.parseLong(strVal));
+    }
+    return OptionalLong.empty();
+  }
+
+  /**
+   * @see #SEQ_ID_MARKER
+   * @return A string value for appending to the end of a bulkloaded file name, containing the
+   *         properly formatted SeqId marker.
+   */
+  public static String formatBulkloadSeqId(long seqId) {
+    return SEQ_ID_MARKER + seqId + "_";
+  }
+
+  /**
    * @param path Path to check.
    * @return True if the path has format of a HFile.
    */