HBASE-28412 Select correct target table for incremental backup restore (#5776)

Contributed-by: Ruben Van Wanzeele <rubenvw@ngdata.com>
Signed-off-by: Bryan Beaudreault <bbeaudreault@apache.org>
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java
index 55f6bff..5d654c0 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java
@@ -74,9 +74,7 @@
         BackupUtils.getFileNameCompatibleString(newTableNames[i]), getConf());
       Configuration conf = getConf();
       conf.set(bulkOutputConfKey, bulkOutputPath.toString());
-      String[] playerArgs = { dirs,
-        fullBackupRestore ? newTableNames[i].getNameAsString() : tableNames[i].getNameAsString() };
-
+      String[] playerArgs = { dirs, newTableNames[i].getNameAsString() };
       int result;
       try {
 
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/BackupTestUtil.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/BackupTestUtil.java
new file mode 100644
index 0000000..3665eeb
--- /dev/null
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/BackupTestUtil.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
+import org.apache.hadoop.hbase.backup.impl.BackupManager;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+public class BackupTestUtil {
+  private BackupTestUtil() {
+  }
+
+  static BackupInfo verifyBackup(Configuration conf, String backupId, BackupType expectedType,
+    BackupInfo.BackupState expectedState) throws IOException {
+    try (Connection connection = ConnectionFactory.createConnection(conf);
+      BackupAdmin backupAdmin = new BackupAdminImpl(connection)) {
+      BackupInfo backupInfo = backupAdmin.getBackupInfo(backupId);
+
+      // Verify managed backup in HBase
+      assertEquals(backupId, backupInfo.getBackupId());
+      assertEquals(expectedState, backupInfo.getState());
+      assertEquals(expectedType, backupInfo.getType());
+      return backupInfo;
+    }
+  }
+
+  static void enableBackup(Configuration conf) {
+    // Enable backup
+    conf.setBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY, true);
+    BackupManager.decorateMasterConfiguration(conf);
+    BackupManager.decorateRegionServerConfiguration(conf);
+  }
+}
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupRestoreOnEmptyEnvironment.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupRestoreOnEmptyEnvironment.java
new file mode 100644
index 0000000..300ca36
--- /dev/null
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupRestoreOnEmptyEnvironment.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import static org.apache.hadoop.hbase.backup.BackupInfo.BackupState.COMPLETE;
+import static org.apache.hadoop.hbase.backup.BackupTestUtil.enableBackup;
+import static org.apache.hadoop.hbase.backup.BackupTestUtil.verifyBackup;
+import static org.apache.hadoop.hbase.backup.BackupType.FULL;
+import static org.apache.hadoop.hbase.backup.BackupType.INCREMENTAL;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testing.TestingHBaseCluster;
+import org.apache.hadoop.hbase.testing.TestingHBaseClusterOption;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category(MediumTests.class)
+@RunWith(Parameterized.class)
+public class TestBackupRestoreOnEmptyEnvironment {
+
+  private static final Logger LOG =
+    LoggerFactory.getLogger(TestBackupRestoreOnEmptyEnvironment.class);
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestBackupRestoreOnEmptyEnvironment.class);
+
+  @Parameterized.Parameters(name = "{index}: restoreToOtherTable={0}")
+  public static Iterable<Object[]> data() {
+    return HBaseCommonTestingUtil.BOOLEAN_PARAMETERIZED;
+  }
+
+  @Parameterized.Parameter(0)
+  public boolean restoreToOtherTable;
+  private TableName sourceTable;
+  private TableName targetTable;
+
+  private static TestingHBaseCluster cluster;
+  private static Path BACKUP_ROOT_DIR;
+  private static final byte[] COLUMN_FAMILY = Bytes.toBytes("0");
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    enableBackup(conf);
+    cluster = TestingHBaseCluster.create(TestingHBaseClusterOption.builder().conf(conf).build());
+    cluster.start();
+    BACKUP_ROOT_DIR = new Path(new Path(conf.get("fs.defaultFS")), new Path("/backupIT"));
+  }
+
+  @AfterClass
+  public static void afterClass() throws Exception {
+    cluster.stop();
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    sourceTable = TableName.valueOf("table");
+    targetTable = TableName.valueOf("another-table");
+    createTable(sourceTable);
+    createTable(targetTable);
+  }
+
+  @After
+  public void removeTables() throws Exception {
+    deleteTables();
+  }
+
+  @Test
+  public void testRestoreToCorrectTable() throws Exception {
+    Instant timestamp = Instant.now().minusSeconds(10);
+
+    // load some data
+    putLoad(sourceTable, timestamp, "data");
+
+    String backupId = backup(FULL, Collections.singletonList(sourceTable));
+    BackupInfo backupInfo = verifyBackup(cluster.getConf(), backupId, FULL, COMPLETE);
+    assertTrue(backupInfo.getTables().contains(sourceTable));
+
+    LOG.info("Deleting the tables before restore ...");
+    deleteTables();
+
+    if (restoreToOtherTable) {
+      restore(backupId, sourceTable, targetTable);
+      validateDataEquals(targetTable, "data");
+    } else {
+      restore(backupId, sourceTable, sourceTable);
+      validateDataEquals(sourceTable, "data");
+    }
+
+  }
+
+  @Test
+  public void testRestoreCorrectTableForIncremental() throws Exception {
+    Instant timestamp = Instant.now().minusSeconds(10);
+
+    // load some data
+    putLoad(sourceTable, timestamp, "data");
+
+    String backupId = backup(FULL, Collections.singletonList(sourceTable));
+    verifyBackup(cluster.getConf(), backupId, FULL, COMPLETE);
+
+    // some incremental data
+    putLoad(sourceTable, timestamp.plusMillis(1), "new_data");
+
+    String backupId2 = backup(INCREMENTAL, Collections.singletonList(sourceTable));
+    verifyBackup(cluster.getConf(), backupId2, INCREMENTAL, COMPLETE);
+
+    LOG.info("Deleting the tables before restore ...");
+    deleteTables();
+
+    if (restoreToOtherTable) {
+      restore(backupId2, sourceTable, targetTable);
+      validateDataEquals(targetTable, "new_data");
+    } else {
+      restore(backupId2, sourceTable, sourceTable);
+      validateDataEquals(sourceTable, "new_data");
+    }
+
+  }
+
+  private void createTable(TableName tableName) throws IOException {
+    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName)
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(COLUMN_FAMILY));
+    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
+      Admin admin = connection.getAdmin()) {
+      admin.createTable(builder.build());
+    }
+  }
+
+  private void deleteTables() throws IOException {
+    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
+      Admin admin = connection.getAdmin()) {
+      for (TableName table : Arrays.asList(sourceTable, targetTable)) {
+        if (admin.tableExists(table)) {
+          admin.disableTable(table);
+          admin.deleteTable(table);
+        }
+      }
+    }
+  }
+
+  private void putLoad(TableName tableName, Instant timestamp, String data) throws IOException {
+    LOG.info("Writing new data to HBase using normal Puts: {}", data);
+    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf())) {
+      Table table = connection.getTable(sourceTable);
+      List<Put> puts = new ArrayList<>();
+      for (int i = 0; i < 10; i++) {
+        Put put = new Put(Bytes.toBytes(i), timestamp.toEpochMilli());
+        put.addColumn(COLUMN_FAMILY, Bytes.toBytes("data"), Bytes.toBytes(data));
+        puts.add(put);
+
+        if (i % 100 == 0) {
+          table.put(puts);
+          puts.clear();
+        }
+      }
+      if (!puts.isEmpty()) {
+        table.put(puts);
+      }
+      connection.getAdmin().flush(tableName);
+    }
+  }
+
+  private String backup(BackupType backupType, List<TableName> tables) throws IOException {
+    LOG.info("Creating the backup ...");
+
+    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
+      BackupAdmin backupAdmin = new BackupAdminImpl(connection)) {
+      BackupRequest backupRequest =
+        new BackupRequest.Builder().withTargetRootDir(BACKUP_ROOT_DIR.toString())
+          .withTableList(new ArrayList<>(tables)).withBackupType(backupType).build();
+      return backupAdmin.backupTables(backupRequest);
+    }
+
+  }
+
+  private void restore(String backupId, TableName sourceTableName, TableName targetTableName)
+    throws IOException {
+    LOG.info("Restoring data ...");
+    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
+      BackupAdmin backupAdmin = new BackupAdminImpl(connection)) {
+      RestoreRequest restoreRequest = new RestoreRequest.Builder().withBackupId(backupId)
+        .withBackupRootDir(BACKUP_ROOT_DIR.toString()).withOvewrite(true)
+        .withFromTables(new TableName[] { sourceTableName })
+        .withToTables(new TableName[] { targetTableName }).build();
+      backupAdmin.restore(restoreRequest);
+    }
+  }
+
+  private void validateDataEquals(TableName tableName, String expectedData) throws IOException {
+    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
+      Table table = connection.getTable(tableName)) {
+      Scan scan = new Scan();
+      scan.setRaw(true);
+      scan.setBatch(100);
+
+      for (Result sourceResult : table.getScanner(scan)) {
+        List<Cell> sourceCells = sourceResult.listCells();
+        for (Cell cell : sourceCells) {
+          assertEquals(expectedData, Bytes.toStringBinary(cell.getValueArray(),
+            cell.getValueOffset(), cell.getValueLength()));
+        }
+      }
+    }
+  }
+}
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupRestoreWithModifications.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupRestoreWithModifications.java
index d01df68..62ba500 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupRestoreWithModifications.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupRestoreWithModifications.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hbase.backup;
 
 import static org.apache.hadoop.hbase.backup.BackupInfo.BackupState.COMPLETE;
+import static org.apache.hadoop.hbase.backup.BackupTestUtil.enableBackup;
+import static org.apache.hadoop.hbase.backup.BackupTestUtil.verifyBackup;
 import static org.apache.hadoop.hbase.backup.BackupType.FULL;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -41,7 +43,6 @@
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
-import org.apache.hadoop.hbase.backup.impl.BackupManager;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Connection;
@@ -126,7 +127,7 @@
     load(sourceTable, timestamp, "data");
 
     String backupId = backup(FULL, allTables);
-    BackupInfo backupInfo = verifyBackup(backupId, FULL, COMPLETE);
+    BackupInfo backupInfo = verifyBackup(cluster.getConf(), backupId, FULL, COMPLETE);
     assertTrue(backupInfo.getTables().contains(sourceTable));
 
     restore(backupId, sourceTable, targetTable);
@@ -137,7 +138,7 @@
     load(sourceTable, timestamp, "changed_data");
 
     backupId = backup(FULL, allTables);
-    backupInfo = verifyBackup(backupId, FULL, COMPLETE);
+    backupInfo = verifyBackup(cluster.getConf(), backupId, FULL, COMPLETE);
     assertTrue(backupInfo.getTables().contains(sourceTable));
 
     restore(backupId, sourceTable, targetTable);
@@ -252,25 +253,4 @@
     }
   }
 
-  private BackupInfo verifyBackup(String backupId, BackupType expectedType,
-    BackupInfo.BackupState expectedState) throws IOException {
-    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
-      BackupAdmin backupAdmin = new BackupAdminImpl(connection)) {
-      BackupInfo backupInfo = backupAdmin.getBackupInfo(backupId);
-
-      // Verify managed backup in HBase
-      assertEquals(backupId, backupInfo.getBackupId());
-      assertEquals(expectedState, backupInfo.getState());
-      assertEquals(expectedType, backupInfo.getType());
-      return backupInfo;
-    }
-  }
-
-  private static void enableBackup(Configuration conf) {
-    // Enable backup
-    conf.setBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY, true);
-    BackupManager.decorateMasterConfiguration(conf);
-    BackupManager.decorateRegionServerConfiguration(conf);
-  }
-
 }