| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.hbase; |
| |
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.backup.BackupAdmin;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
import org.apache.hadoop.hbase.backup.BackupRequest;
import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.RestoreRequest;
import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
import org.apache.hadoop.hbase.backup.impl.BackupManager;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.chaos.actions.RestartRandomRsExceptMetaAction;
import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey;
import org.apache.hadoop.hbase.chaos.policies.PeriodicRandomActionPolicy;
import org.apache.hadoop.hbase.chaos.policies.Policy;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;

import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
| |
| |
| /** |
| * An integration test to detect regressions in HBASE-7912. Create |
| * a table with many regions, load data, perform series backup/load operations, |
| * then restore and verify data |
| * @see <a href="https://issues.apache.org/jira/browse/HBASE-7912">HBASE-7912</a> |
| * @see <a href="https://issues.apache.org/jira/browse/HBASE-14123">HBASE-14123</a> |
| */ |
| @Category(IntegrationTests.class) |
| public class IntegrationTestBackupRestore extends IntegrationTestBase { |
| private static final String CLASS_NAME = IntegrationTestBackupRestore.class.getSimpleName(); |
| protected static final Logger LOG = LoggerFactory.getLogger(IntegrationTestBackupRestore.class); |
| protected static final String NUMBER_OF_TABLES_KEY = "num_tables"; |
| protected static final String COLUMN_NAME = "f"; |
| protected static final String REGION_COUNT_KEY = "regions_per_rs"; |
| protected static final String REGIONSERVER_COUNT_KEY = "region_servers"; |
| protected static final String ROWS_PER_ITERATION_KEY = "rows_in_iteration"; |
| protected static final String NUM_ITERATIONS_KEY = "num_iterations"; |
| protected static final int DEFAULT_REGION_COUNT = 10; |
| protected static final int DEFAULT_REGIONSERVER_COUNT = 5; |
| protected static final int DEFAULT_NUMBER_OF_TABLES = 1; |
| protected static final int DEFAULT_NUM_ITERATIONS = 10; |
| protected static final int DEFAULT_ROWS_IN_ITERATION = 500000; |
| protected static final String SLEEP_TIME_KEY = "sleeptime"; |
| // short default interval because tests don't run very long. |
| protected static final long SLEEP_TIME_DEFAULT = 50000L; |
| |
| protected static int rowsInIteration; |
| protected static int regionsCountPerServer; |
| protected static int regionServerCount; |
| |
| protected static int numIterations; |
| protected static int numTables; |
| protected static TableName[] tableNames; |
| protected long sleepTime; |
| protected static Object lock = new Object(); |
| |
| private static String BACKUP_ROOT_DIR = "backupIT"; |
| |
| @Override |
| @Before |
| public void setUp() throws Exception { |
| util = new IntegrationTestingUtility(); |
| Configuration conf = util.getConfiguration(); |
| regionsCountPerServer = conf.getInt(REGION_COUNT_KEY, DEFAULT_REGION_COUNT); |
| regionServerCount = |
| conf.getInt(REGIONSERVER_COUNT_KEY, DEFAULT_REGIONSERVER_COUNT); |
| rowsInIteration = conf.getInt(ROWS_PER_ITERATION_KEY, DEFAULT_ROWS_IN_ITERATION); |
| numIterations = conf.getInt(NUM_ITERATIONS_KEY, DEFAULT_NUM_ITERATIONS); |
| numTables = conf.getInt(NUMBER_OF_TABLES_KEY, DEFAULT_NUMBER_OF_TABLES); |
| sleepTime = conf.getLong(SLEEP_TIME_KEY, SLEEP_TIME_DEFAULT); |
| enableBackup(conf); |
| LOG.info("Initializing cluster with {} region servers.", regionServerCount); |
| util.initializeCluster(regionServerCount); |
| LOG.info("Cluster initialized and ready"); |
| } |
| |
| @After |
| public void tearDown() throws IOException { |
| LOG.info("Cleaning up after test."); |
| if(util.isDistributedCluster()) { |
| deleteTablesIfAny(); |
| LOG.info("Cleaning up after test. Deleted tables"); |
| cleanUpBackupDir(); |
| } |
| LOG.info("Restoring cluster."); |
| util.restoreCluster(); |
| LOG.info("Cluster restored."); |
| } |
| |
| @Override |
| public void setUpMonkey() throws Exception { |
| Policy p = new PeriodicRandomActionPolicy(sleepTime, |
| new RestartRandomRsExceptMetaAction(sleepTime)); |
| this.monkey = new PolicyBasedChaosMonkey(util, p); |
| startMonkey(); |
| } |
| |
| private void deleteTablesIfAny() throws IOException { |
| for (TableName table : tableNames) { |
| util.deleteTableIfAny(table); |
| } |
| } |
| |
| private void createTables() throws Exception { |
| tableNames = new TableName[numTables]; |
| for (int i = 0; i < numTables; i++) { |
| tableNames[i] = TableName.valueOf(CLASS_NAME + ".table." + i); |
| } |
| for (TableName table : tableNames) { |
| createTable(table); |
| } |
| } |
| |
| private void enableBackup(Configuration conf) { |
| // Enable backup |
| conf.setBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY, true); |
| BackupManager.decorateMasterConfiguration(conf); |
| BackupManager.decorateRegionServerConfiguration(conf); |
| } |
| |
| private void cleanUpBackupDir() throws IOException { |
| FileSystem fs = FileSystem.get(util.getConfiguration()); |
| fs.delete(new Path(BACKUP_ROOT_DIR), true); |
| } |
| |
| @Test |
| public void testBackupRestore() throws Exception { |
| BACKUP_ROOT_DIR = util.getDataTestDirOnTestFS() + Path.SEPARATOR + BACKUP_ROOT_DIR; |
| createTables(); |
| runTestMulti(); |
| } |
| |
| private void runTestMulti() throws IOException { |
| LOG.info("IT backup & restore started"); |
| Thread[] workers = new Thread[numTables]; |
| for (int i = 0; i < numTables; i++) { |
| final TableName table = tableNames[i]; |
| Runnable r = new Runnable() { |
| @Override |
| public void run() { |
| try { |
| runTestSingle(table); |
| } catch (IOException e) { |
| LOG.error("Failed", e); |
| Assert.fail(e.getMessage()); |
| } |
| } |
| }; |
| workers[i] = new Thread(r); |
| workers[i].start(); |
| } |
| // Wait all workers to finish |
| for (Thread t : workers) { |
| Uninterruptibles.joinUninterruptibly(t); |
| } |
| LOG.info("IT backup & restore finished"); |
| } |
| |
| private void createTable(TableName tableName) throws Exception { |
| long startTime, endTime; |
| |
| TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); |
| |
| TableDescriptor desc = builder.build(); |
| ColumnFamilyDescriptorBuilder cbuilder = |
| ColumnFamilyDescriptorBuilder.newBuilder(COLUMN_NAME.getBytes(Charset.defaultCharset())); |
| ColumnFamilyDescriptor[] columns = new ColumnFamilyDescriptor[] { cbuilder.build() }; |
| LOG.info("Creating table {} with {} splits.", tableName, |
| regionsCountPerServer * regionServerCount); |
| startTime = System.currentTimeMillis(); |
| HBaseTestingUtility.createPreSplitLoadTestTable(util.getConfiguration(), desc, columns, |
| regionsCountPerServer); |
| util.waitTableAvailable(tableName); |
| endTime = System.currentTimeMillis(); |
| LOG.info("Pre-split table created successfully in {}ms.", (endTime - startTime)); |
| } |
| |
| private void loadData(TableName table, int numRows) throws IOException { |
| Connection conn = util.getConnection(); |
| // #0- insert some data to a table |
| Table t1 = conn.getTable(table); |
| util.loadRandomRows(t1, new byte[]{'f'}, 100, numRows); |
| // flush table |
| conn.getAdmin().flush(TableName.valueOf(table.getName())); |
| } |
| |
| private String backup(BackupRequest request, BackupAdmin client) |
| throws IOException { |
| String backupId = client.backupTables(request); |
| return backupId; |
| } |
| |
| private void restore(RestoreRequest request, BackupAdmin client) |
| throws IOException { |
| client.restore(request); |
| } |
| |
| private void merge(String[] backupIds, BackupAdmin client) |
| throws IOException { |
| client.mergeBackups(backupIds); |
| } |
| |
| private void runTestSingle(TableName table) throws IOException { |
| |
| List<String> backupIds = new ArrayList<String>(); |
| List<Integer> tableSizes = new ArrayList<Integer>(); |
| |
| try (Connection conn = util.getConnection(); |
| Admin admin = conn.getAdmin(); |
| BackupAdmin client = new BackupAdminImpl(conn);) { |
| |
| // #0- insert some data to table 'table' |
| loadData(table, rowsInIteration); |
| tableSizes.add(rowsInIteration); |
| |
| // #1 - create full backup for table first |
| LOG.info("create full backup image for {}", table); |
| List<TableName> tables = Lists.newArrayList(table); |
| BackupRequest.Builder builder = new BackupRequest.Builder(); |
| BackupRequest request = builder.withBackupType(BackupType.FULL).withTableList(tables) |
| .withTargetRootDir(BACKUP_ROOT_DIR).build(); |
| |
| String backupIdFull = backup(request, client); |
| assertTrue(checkSucceeded(backupIdFull)); |
| |
| backupIds.add(backupIdFull); |
| // Now continue with incremental backups |
| int count = 1; |
| while (count++ < numIterations) { |
| |
| // Load data |
| loadData(table, rowsInIteration); |
| tableSizes.add(rowsInIteration * count); |
| // Do incremental backup |
| builder = new BackupRequest.Builder(); |
| request = builder.withBackupType(BackupType.INCREMENTAL).withTableList(tables) |
| .withTargetRootDir(BACKUP_ROOT_DIR).build(); |
| String backupId = backup(request, client); |
| assertTrue(checkSucceeded(backupId)); |
| backupIds.add(backupId); |
| |
| // Restore incremental backup for table, with overwrite for previous backup |
| String previousBackupId = backupIds.get(backupIds.size() - 2); |
| restoreVerifyTable(conn, client, table, previousBackupId, rowsInIteration * (count - 1)); |
| // Restore incremental backup for table, with overwrite for last backup |
| restoreVerifyTable(conn, client, table, backupId, rowsInIteration * count); |
| } |
| // Now merge all incremental and restore |
| String[] incBackupIds = allIncremental(backupIds); |
| merge(incBackupIds, client); |
| // Restore last one |
| String backupId = incBackupIds[incBackupIds.length - 1]; |
| // restore incremental backup for table, with overwrite |
| TableName[] tablesRestoreIncMultiple = new TableName[] { table }; |
| restore(createRestoreRequest(BACKUP_ROOT_DIR, backupId, false, tablesRestoreIncMultiple, null, |
| true), client); |
| Table hTable = conn.getTable(table); |
| Assert.assertEquals(util.countRows(hTable), rowsInIteration * numIterations); |
| hTable.close(); |
| LOG.info("{} loop {} finished.", Thread.currentThread().getName(), (count-1)); |
| } |
| } |
| |
| private void restoreVerifyTable(Connection conn, BackupAdmin client, TableName table, |
| String backupId, long expectedRows) throws IOException { |
| |
| TableName[] tablesRestoreIncMultiple = new TableName[] { table }; |
| restore(createRestoreRequest(BACKUP_ROOT_DIR, backupId, false, |
| tablesRestoreIncMultiple, null, true), client); |
| Table hTable = conn.getTable(table); |
| Assert.assertEquals(expectedRows, util.countRows(hTable)); |
| hTable.close(); |
| } |
| |
| private String[] allIncremental(List<String> backupIds) { |
| int size = backupIds.size(); |
| backupIds = backupIds.subList(1, size); |
| String[] arr = new String[size - 1]; |
| backupIds.toArray(arr); |
| return arr; |
| } |
| |
| /** |
| * |
| * @param backupId pass backup ID to check status of |
| * @return status of backup |
| */ |
| protected boolean checkSucceeded(String backupId) throws IOException { |
| BackupInfo status = getBackupInfo(backupId); |
| if (status == null) { |
| return false; |
| } |
| return status.getState() == BackupState.COMPLETE; |
| } |
| |
| private BackupInfo getBackupInfo(String backupId) throws IOException { |
| try (BackupSystemTable table = new BackupSystemTable(util.getConnection())) { |
| return table.readBackupInfo(backupId); |
| } |
| } |
| |
| /** |
| * Get restore request. |
| * |
| * @param backupRootDir directory where backup is located |
| * @param backupId backup ID |
| * @param check check the backup |
| * @param fromTables table names to restore from |
| * @param toTables new table names to restore to |
| * @param isOverwrite overwrite the table(s) |
| * @return an instance of RestoreRequest |
| */ |
| public RestoreRequest createRestoreRequest(String backupRootDir, String backupId, boolean check, |
| TableName[] fromTables, TableName[] toTables, boolean isOverwrite) { |
| RestoreRequest.Builder builder = new RestoreRequest.Builder(); |
| return builder.withBackupRootDir(backupRootDir) |
| .withBackupId(backupId) |
| .withCheck(check) |
| .withFromTables(fromTables) |
| .withToTables(toTables) |
| .withOvewrite(isOverwrite).build(); |
| } |
| |
| @Override |
| public void setUpCluster() throws Exception { |
| util = getTestingUtil(getConf()); |
| enableBackup(getConf()); |
| LOG.debug("Initializing/checking cluster has {} servers",regionServerCount); |
| util.initializeCluster(regionServerCount); |
| LOG.debug("Done initializing/checking cluster"); |
| } |
| |
| /** |
| * |
| * @return status of CLI execution |
| */ |
| @Override |
| public int runTestFromCommandLine() throws Exception { |
| // Check if backup is enabled |
| if (!BackupManager.isBackupEnabled(getConf())) { |
| System.err.println(BackupRestoreConstants.ENABLE_BACKUP); |
| return -1; |
| } |
| System.out.println(BackupRestoreConstants.VERIFY_BACKUP); |
| testBackupRestore(); |
| return 0; |
| } |
| |
| @Override |
| public TableName getTablename() { |
| // That is only valid when Monkey is CALM (no monkey) |
| return null; |
| } |
| |
| @Override |
| protected Set<String> getColumnFamilies() { |
| // That is only valid when Monkey is CALM (no monkey) |
| return null; |
| } |
| |
| @Override |
| protected void addOptions() { |
| addOptWithArg(REGIONSERVER_COUNT_KEY, |
| "Total number of region servers. Default: '" + DEFAULT_REGIONSERVER_COUNT + "'"); |
| addOptWithArg(REGION_COUNT_KEY, "Total number of regions. Default: " + DEFAULT_REGION_COUNT); |
| addOptWithArg(ROWS_PER_ITERATION_KEY, |
| "Total number of data rows to be loaded during one iteration." + " Default: " |
| + DEFAULT_ROWS_IN_ITERATION); |
| addOptWithArg(NUM_ITERATIONS_KEY, |
| "Total number iterations." + " Default: " + DEFAULT_NUM_ITERATIONS); |
| addOptWithArg(NUMBER_OF_TABLES_KEY, |
| "Total number of tables in the test." + " Default: " + DEFAULT_NUMBER_OF_TABLES); |
| addOptWithArg(SLEEP_TIME_KEY, "Sleep time of chaos monkey in ms " + |
| "to restart random region server. Default: " + SLEEP_TIME_DEFAULT); |
| } |
| |
| @Override |
| protected void processOptions(CommandLine cmd) { |
| super.processOptions(cmd); |
| regionsCountPerServer = |
| Integer.parseInt(cmd.getOptionValue(REGION_COUNT_KEY, |
| Integer.toString(DEFAULT_REGION_COUNT))); |
| regionServerCount = |
| Integer.parseInt(cmd.getOptionValue(REGIONSERVER_COUNT_KEY, |
| Integer.toString(DEFAULT_REGIONSERVER_COUNT))); |
| rowsInIteration = |
| Integer.parseInt(cmd.getOptionValue(ROWS_PER_ITERATION_KEY, |
| Integer.toString(DEFAULT_ROWS_IN_ITERATION))); |
| numIterations = Integer.parseInt(cmd.getOptionValue(NUM_ITERATIONS_KEY, |
| Integer.toString(DEFAULT_NUM_ITERATIONS))); |
| numTables = Integer.parseInt(cmd.getOptionValue(NUMBER_OF_TABLES_KEY, |
| Integer.toString(DEFAULT_NUMBER_OF_TABLES))); |
| sleepTime = Long.parseLong(cmd.getOptionValue(SLEEP_TIME_KEY, |
| Long.toString(SLEEP_TIME_DEFAULT))); |
| |
| |
| LOG.info(MoreObjects.toStringHelper("Parsed Options") |
| .add(REGION_COUNT_KEY, regionsCountPerServer) |
| .add(REGIONSERVER_COUNT_KEY, regionServerCount) |
| .add(ROWS_PER_ITERATION_KEY, rowsInIteration) |
| .add(NUM_ITERATIONS_KEY, numIterations) |
| .add(NUMBER_OF_TABLES_KEY, numTables) |
| .add(SLEEP_TIME_KEY, sleepTime) |
| .toString()); |
| } |
| |
| /** |
| * |
| * @param args argument list |
| */ |
| public static void main(String[] args) throws Exception { |
| Configuration conf = HBaseConfiguration.create(); |
| IntegrationTestingUtility.setUseDistributedCluster(conf); |
| int status = ToolRunner.run(conf, new IntegrationTestBackupRestore(), args); |
| System.exit(status); |
| } |
| } |