HBASE-28456 HBase Restore restores old data if data for the same timestamp is in different hfiles (#5775)
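
The core of the change: bulkloaded HFiles carry their sequence id only in the file name (the _SeqId_<id>_ marker appended when the file is committed), so HFileInputFormat now stamps that id onto every cell it reads, and the HFile splitter job enables extended cell serialization so the id survives the shuffle into CellSortReducer and newer cells win on timestamp ties. As an illustration only (not part of the patch; the class name and file names below are made up), a minimal standalone sketch of the file-name parsing that the new StoreFileInfo helpers perform:

    import java.util.OptionalLong;

    public class BulkloadSeqIdSketch {
      private static final String SEQ_ID_MARKER = "_SeqId_";

      // Pull the bulkload sequence id out of a store file name such as
      // "f1a2b3c4_SeqId_42_"; empty when the marker is absent.
      static OptionalLong parseBulkloadSeqId(String fileName) {
        int start = fileName.indexOf(SEQ_ID_MARKER);
        if (start == -1) {
          return OptionalLong.empty();
        }
        int valueStart = start + SEQ_ID_MARKER.length();
        int valueEnd = fileName.indexOf('_', valueStart);
        return OptionalLong.of(Long.parseLong(fileName.substring(valueStart, valueEnd)));
      }

      public static void main(String[] args) {
        System.out.println(parseBulkloadSeqId("f1a2b3c4_SeqId_42_")); // OptionalLong[42]
        System.out.println(parseBulkloadSeqId("f1a2b3c4"));           // OptionalLong.empty
      }
    }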
Signed-off-by: Nick Dimiduk <ndimiduk@apache.org>
Signed-off-by: Duo Zhang <zhangduo@apache.org>
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java
index 766a99d..755b0a4 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java
@@ -99,6 +99,10 @@
conf.set(FileInputFormat.INPUT_DIR, inputDirs);
Job job = Job.getInstance(conf,
conf.get(JOB_NAME_CONF_KEY, NAME + "_" + EnvironmentEdgeManager.currentTime()));
+ // MapReduceHFileSplitter needs ExtendedCellSerialization so that sequenceId can be propagated
+ // when sorting cells in CellSortReducer
+ job.getConfiguration().setBoolean(HFileOutputFormat2.EXTENDED_CELL_SERIALIZATION_ENABLED_KEY,
+ true);
job.setJarByClass(MapReduceHFileSplitterJob.class);
job.setInputFormatClass(HFileInputFormat.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupRestoreWithModifications.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupRestoreWithModifications.java
new file mode 100644
index 0000000..d01df68
--- /dev/null
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupRestoreWithModifications.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import static org.apache.hadoop.hbase.backup.BackupInfo.BackupState.COMPLETE;
+import static org.apache.hadoop.hbase.backup.BackupType.FULL;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
+import org.apache.hadoop.hbase.backup.impl.BackupManager;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testing.TestingHBaseCluster;
+import org.apache.hadoop.hbase.testing.TestingHBaseClusterOption;
+import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category(MediumTests.class)
+@RunWith(Parameterized.class)
+public class TestBackupRestoreWithModifications {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestBackupRestoreWithModifications.class);
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestBackupRestoreWithModifications.class);
+
+ @Parameterized.Parameters(name = "{index}: useBulkLoad={0}")
+ public static Iterable<Object[]> data() {
+ return HBaseCommonTestingUtil.BOOLEAN_PARAMETERIZED;
+ }
+
+ @Parameterized.Parameter(0)
+ public boolean useBulkLoad;
+
+ private TableName sourceTable;
+ private TableName targetTable;
+
+ private List<TableName> allTables;
+ private static TestingHBaseCluster cluster;
+ private static final Path BACKUP_ROOT_DIR = new Path("backupIT");
+ private static final byte[] COLUMN_FAMILY = Bytes.toBytes("0");
+
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+ Configuration conf = HBaseConfiguration.create();
+ enableBackup(conf);
+ cluster = TestingHBaseCluster.create(TestingHBaseClusterOption.builder().conf(conf).build());
+ cluster.start();
+ }
+
+ @AfterClass
+ public static void afterClass() throws Exception {
+ cluster.stop();
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ sourceTable = TableName.valueOf("table-" + useBulkLoad);
+ targetTable = TableName.valueOf("another-table-" + useBulkLoad);
+ allTables = Arrays.asList(sourceTable, targetTable);
+ createTable(sourceTable);
+ createTable(targetTable);
+ }
+
+ @Test
+ public void testModificationsOnTable() throws Exception {
+ Instant timestamp = Instant.now();
+
+ // load some data
+ load(sourceTable, timestamp, "data");
+
+ String backupId = backup(FULL, allTables);
+ BackupInfo backupInfo = verifyBackup(backupId, FULL, COMPLETE);
+ assertTrue(backupInfo.getTables().contains(sourceTable));
+
+ restore(backupId, sourceTable, targetTable);
+ validateDataEquals(sourceTable, "data");
+ validateDataEquals(targetTable, "data");
+
+ // load new data on the same timestamp
+ load(sourceTable, timestamp, "changed_data");
+
+ backupId = backup(FULL, allTables);
+ backupInfo = verifyBackup(backupId, FULL, COMPLETE);
+ assertTrue(backupInfo.getTables().contains(sourceTable));
+
+ restore(backupId, sourceTable, targetTable);
+ validateDataEquals(sourceTable, "changed_data");
+ validateDataEquals(targetTable, "changed_data");
+ }
+
+ private void createTable(TableName tableName) throws IOException {
+ TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName)
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.of(COLUMN_FAMILY));
+ try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
+ Admin admin = connection.getAdmin()) {
+ admin.createTable(builder.build());
+ }
+ }
+
+ private void load(TableName tableName, Instant timestamp, String data) throws IOException {
+ if (useBulkLoad) {
+ hFileBulkLoad(tableName, timestamp, data);
+ } else {
+ putLoad(tableName, timestamp, data);
+ }
+ }
+
+ private void putLoad(TableName tableName, Instant timestamp, String data) throws IOException {
+ LOG.info("Writing new data to HBase using normal Puts: {}", data);
+ try (Connection connection = ConnectionFactory.createConnection(cluster.getConf())) {
+ Table table = connection.getTable(tableName);
+ List<Put> puts = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ Put put = new Put(Bytes.toBytes(i), timestamp.toEpochMilli());
+ put.addColumn(COLUMN_FAMILY, Bytes.toBytes("data"), Bytes.toBytes(data));
+ puts.add(put);
+
+ if (i % 100 == 0) {
+ table.put(puts);
+ puts.clear();
+ }
+ }
+ if (!puts.isEmpty()) {
+ table.put(puts);
+ }
+ connection.getAdmin().flush(tableName);
+ }
+ }
+
+ private void hFileBulkLoad(TableName tableName, Instant timestamp, String data)
+ throws IOException {
+ FileSystem fs = FileSystem.get(cluster.getConf());
+ LOG.info("Writing new data to HBase using BulkLoad: {}", data);
+ // HFiles require this strict directory structure in order to be bulk loaded
+ Path hFileRootPath = new Path("/tmp/hfiles_" + UUID.randomUUID());
+ fs.mkdirs(hFileRootPath);
+ Path hFileFamilyPath = new Path(hFileRootPath, Bytes.toString(COLUMN_FAMILY));
+ fs.mkdirs(hFileFamilyPath);
+ try (HFile.Writer writer = HFile.getWriterFactoryNoCache(cluster.getConf())
+ .withPath(fs, new Path(hFileFamilyPath, "hfile_" + UUID.randomUUID()))
+ .withFileContext(new HFileContextBuilder().withTableName(tableName.toBytes())
+ .withColumnFamily(COLUMN_FAMILY).build())
+ .create()) {
+ for (int i = 0; i < 10; i++) {
+ writer.append(new KeyValue(Bytes.toBytes(i), COLUMN_FAMILY, Bytes.toBytes("data"),
+ timestamp.toEpochMilli(), Bytes.toBytes(data)));
+ }
+ }
+ Map<BulkLoadHFiles.LoadQueueItem, ByteBuffer> result =
+ BulkLoadHFiles.create(cluster.getConf()).bulkLoad(tableName, hFileRootPath);
+ assertFalse(result.isEmpty());
+ }
+
+ private String backup(BackupType backupType, List<TableName> tables) throws IOException {
+ LOG.info("Creating the backup ...");
+
+ try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
+ BackupAdmin backupAdmin = new BackupAdminImpl(connection)) {
+ BackupRequest backupRequest =
+ new BackupRequest.Builder().withTargetRootDir(BACKUP_ROOT_DIR.toString())
+ .withTableList(new ArrayList<>(tables)).withBackupType(backupType).build();
+ return backupAdmin.backupTables(backupRequest);
+ }
+
+ }
+
+ private void restore(String backupId, TableName sourceTableName, TableName targetTableName)
+ throws IOException {
+ LOG.info("Restoring data ...");
+ try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
+ BackupAdmin backupAdmin = new BackupAdminImpl(connection)) {
+ RestoreRequest restoreRequest = new RestoreRequest.Builder().withBackupId(backupId)
+ .withBackupRootDir(BACKUP_ROOT_DIR.toString()).withOvewrite(true)
+ .withFromTables(new TableName[] { sourceTableName })
+ .withToTables(new TableName[] { targetTableName }).build();
+ backupAdmin.restore(restoreRequest);
+ }
+ }
+
+ private void validateDataEquals(TableName tableName, String expectedData) throws IOException {
+ try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
+ Table table = connection.getTable(tableName)) {
+ Scan scan = new Scan();
+ scan.readAllVersions();
+ scan.setRaw(true);
+ scan.setBatch(100);
+
+ for (Result sourceResult : table.getScanner(scan)) {
+ List<Cell> sourceCells = sourceResult.listCells();
+ for (Cell cell : sourceCells) {
+ assertEquals(expectedData, Bytes.toStringBinary(cell.getValueArray(),
+ cell.getValueOffset(), cell.getValueLength()));
+ }
+ }
+ }
+ }
+
+ private BackupInfo verifyBackup(String backupId, BackupType expectedType,
+ BackupInfo.BackupState expectedState) throws IOException {
+ try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
+ BackupAdmin backupAdmin = new BackupAdminImpl(connection)) {
+ BackupInfo backupInfo = backupAdmin.getBackupInfo(backupId);
+
+ // Verify managed backup in HBase
+ assertEquals(backupId, backupInfo.getBackupId());
+ assertEquals(expectedState, backupInfo.getState());
+ assertEquals(expectedType, backupInfo.getType());
+ return backupInfo;
+ }
+ }
+
+ private static void enableBackup(Configuration conf) {
+ // Enable backup
+ conf.setBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY, true);
+ BackupManager.decorateMasterConfiguration(conf);
+ BackupManager.decorateRegionServerConfiguration(conf);
+ }
+
+}
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat.java
index 3ccbaab..1fdcf4b 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat.java
@@ -21,15 +21,18 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.OptionalLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
@@ -78,6 +81,7 @@
private Cell value = null;
private long count;
private boolean seeked = false;
+ private OptionalLong bulkloadSeqId;
@Override
public void initialize(InputSplit split, TaskAttemptContext context)
@@ -88,6 +92,7 @@
FileSystem fs = path.getFileSystem(conf);
LOG.info("Initialize HFileRecordReader for {}", path);
this.in = HFile.createReader(fs, path, conf);
+ this.bulkloadSeqId = StoreFileInfo.getBulkloadSeqId(path);
// The file info must be loaded before the scanner can be used.
// This seems like a bug in HBase, but it's easily worked around.
@@ -109,6 +114,9 @@
return false;
}
value = scanner.getCell();
+ if (value != null && bulkloadSeqId.isPresent()) {
+ PrivateCellUtil.setSequenceId(value, bulkloadSeqId.getAsLong());
+ }
count++;
return true;
}
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
index 5c6ef57..fcbcd2d 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
@@ -162,10 +162,10 @@
/**
* ExtendedCell and ExtendedCellSerialization are InterfaceAudience.Private. We expose this config
- * package-private for internal usage for jobs like WALPlayer which need to use features of
- * ExtendedCell.
+ * for internal usage in jobs like WALPlayer which need to use features of ExtendedCell.
*/
- static final String EXTENDED_CELL_SERIALIZATION_ENABLED_KEY =
+ @InterfaceAudience.Private
+ public static final String EXTENDED_CELL_SERIALIZATION_ENABLED_KEY =
"hbase.mapreduce.hfileoutputformat.extendedcell.enabled";
static final boolean EXTENDED_CELL_SERIALIZATION_ENABLED_DEFULT = false;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
index 48afdc5..6fccccf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
@@ -490,7 +490,7 @@
String name = buildPath.getName();
if (generateNewName) {
- name = generateUniqueName((seqNum < 0) ? null : "_SeqId_" + seqNum + "_");
+ name = generateUniqueName((seqNum < 0) ? null : StoreFileInfo.formatBulkloadSeqId(seqNum));
}
Path dstPath = new Path(storeDir, name);
if (!fs.exists(buildPath)) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
index ae514f0..5df02bf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
@@ -329,13 +329,8 @@
@Override
public boolean isBulkLoadResult() {
- boolean bulkLoadedHFile = false;
- String fileName = this.getPath().getName();
- int startPos = fileName.indexOf("SeqId_");
- if (startPos != -1) {
- bulkLoadedHFile = true;
- }
- return bulkLoadedHFile || (metadataMap != null && metadataMap.containsKey(BULKLOAD_TIME_KEY));
+ return StoreFileInfo.hasBulkloadSeqId(this.getPath())
+ || (metadataMap != null && metadataMap.containsKey(BULKLOAD_TIME_KEY));
}
public boolean isCompactedAway() {
@@ -413,19 +408,16 @@
}
if (isBulkLoadResult()) {
- // generate the sequenceId from the fileName
- // fileName is of the form <randomName>_SeqId_<id-when-loaded>_
- String fileName = this.getPath().getName();
- // Use lastIndexOf() to get the last, most recent bulk load seqId.
- int startPos = fileName.lastIndexOf("SeqId_");
- if (startPos != -1) {
- this.sequenceid =
- Long.parseLong(fileName.substring(startPos + 6, fileName.indexOf('_', startPos + 6)));
+ // For bulkloads, we have to parse the sequenceid from the path name
+ OptionalLong sequenceId = StoreFileInfo.getBulkloadSeqId(this.getPath());
+ if (sequenceId.isPresent()) {
+ this.sequenceid = sequenceId.getAsLong();
// Handle reference files as done above.
if (fileInfo.isTopReference()) {
this.sequenceid += 1;
}
}
+
// SKIP_RESET_SEQ_ID only works in bulk loaded file.
// In mob compaction, the hfile where the cells contain the path of a new mob file is bulk
// loaded to hbase, these cells have the same seqIds with the old ones. We do not want
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
index 1ebe93d..052dd51 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
@@ -19,6 +19,7 @@
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.util.OptionalLong;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -424,6 +425,54 @@
}
/**
+ * Cells in a bulkloaded file don't have a sequenceId since they don't go through memstore. When a
+ * bulkload file is committed, the current memstore ts is stamped onto the file name as the
+ * sequenceId of the file. At read time, the sequenceId is copied onto all of the cells returned
+ * so that they can be properly sorted relative to other cells in other files. Further, when
+ * opening multiple files for scan, the sequence id is used to ensure that the bulkload file's
+ * scanner is properly sorted amongst the other scanners. Non-bulkloaded files get their
+ * sequenceId from the MAX_MEMSTORE_TS_KEY since those go through the memstore and have true
+ * sequenceIds.
+ */
+ private static final String SEQ_ID_MARKER = "_SeqId_";
+ private static final int SEQ_ID_MARKER_LENGTH = SEQ_ID_MARKER.length();
+
+ /**
+ * @see #SEQ_ID_MARKER
+ * @return True if the file name looks like a bulkloaded file, based on the presence of the SeqId
+ * marker added to those files.
+ */
+ public static boolean hasBulkloadSeqId(final Path path) {
+ String fileName = path.getName();
+ return fileName.contains(SEQ_ID_MARKER);
+ }
+
+ /**
+ * @see #SEQ_ID_MARKER
+ * @return If the path is a properly named bulkloaded file, returns the sequence id stamped at the
+ * end of the file name.
+ */
+ public static OptionalLong getBulkloadSeqId(final Path path) {
+ String fileName = path.getName();
+ int startPos = fileName.indexOf(SEQ_ID_MARKER);
+ if (startPos != -1) {
+ String strVal = fileName.substring(startPos + SEQ_ID_MARKER_LENGTH,
+ fileName.indexOf('_', startPos + SEQ_ID_MARKER_LENGTH));
+ return OptionalLong.of(Long.parseLong(strVal));
+ }
+ return OptionalLong.empty();
+ }
+
+ /**
+ * @see #SEQ_ID_MARKER
+ * @return A string value for appending to the end of a bulkloaded file name, containing the
+ * properly formatted SeqId marker.
+ */
+ public static String formatBulkloadSeqId(long seqId) {
+ return SEQ_ID_MARKER + seqId + "_";
+ }
+
+ /**
* @param path Path to check.
* @return True if the path has format of a HFile.
*/