blob: f15168e266b1038d851501bd26c7f916ee82c3f5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CatalogFamilyFormat;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.UnknownRegionException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.DoNotRetryRegionException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.exceptions.MergeRegionException;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterRpcServices;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.PairOfSameType;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.util.StringUtils;
import org.apache.zookeeper.KeeperException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
@Category({RegionServerTests.class, LargeTests.class})
public class TestRegionMergeTransactionOnCluster {
// Class-level test rule built for this test class.
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestRegionMergeTransactionOnCluster.class);
private static final Logger LOG =
LoggerFactory.getLogger(TestRegionMergeTransactionOnCluster.class);
// Supplies the running test's method name; the tests use it as the table name.
@Rule public TestName name = new TestName();
// Used for both the region server count and the data node count of the mini cluster.
private static final int NB_SERVERS = 3;
// Column family and qualifier written by loadData().
private static final byte[] FAMILYNAME = Bytes.toBytes("fam");
private static final byte[] QUALIFIER = Bytes.toBytes("q");
// Common prefix of every generated row key (see makeN()).
private static byte[] ROW = Bytes.toBytes("testRow");
// Region count used by the two-argument createTableAndLoadData().
private static final int INITIAL_REGION_NUM = 10;
// Number of rows loaded into each test table.
private static final int ROWSIZE = 200;
// Pre-computed row keys: ROW followed by a zero-padded, four-digit index.
private static byte[][] ROWS = makeN(ROW, ROWSIZE);
// Upper bound, in milliseconds, for the polling loops in this class.
private static int waitTime = 60 * 1000;
static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
// Assigned in beforeAllTests() once the mini cluster is up.
private static HMaster MASTER;
private static Admin ADMIN;
/**
 * Starts the shared mini cluster with {@link MyMaster} as the master implementation,
 * switches the balancer off and caches the master and an {@link Admin} for the tests.
 */
@BeforeClass
public static void beforeAllTests() throws Exception {
  // Start a cluster
  TEST_UTIL.startMiniCluster(
    StartMiniClusterOption.builder()
      .masterClass(MyMaster.class)
      .numRegionServers(NB_SERVERS)
      .numDataNodes(NB_SERVERS)
      .build());
  MASTER = TEST_UTIL.getHBaseCluster().getMaster();
  MASTER.balanceSwitch(false);
  ADMIN = TEST_UTIL.getConnection().getAdmin();
}
/**
 * Releases the shared {@link Admin} and then shuts the mini cluster down.
 * The admin is closed first, while the cluster it talks to is still alive, and the
 * shutdown runs in a {@code finally} so the cluster is torn down even if the close fails.
 */
@AfterClass
public static void afterAllTests() throws Exception {
  try {
    if (ADMIN != null) {
      ADMIN.close();
    }
  } finally {
    TEST_UTIL.shutdownMiniCluster();
  }
}
/**
 * Merges two pairs of adjacent regions, verifies that no rows were lost, and checks that
 * a region that has been merged away can neither be assigned nor unassigned again.
 */
@Test
public void testWholesomeMerge() throws Exception {
  LOG.info("Starting " + name.getMethodName());
  final TableName tableName = TableName.valueOf(name.getMethodName());
  // try-with-resources: the table is closed even when a merge step or assertion fails,
  // and it is closed before the table is deleted in the finally clause.
  try (Table table = createTableAndLoadData(MASTER, tableName)) {
    // Merge 1st and 2nd region
    mergeRegionsAndVerifyRegionNum(MASTER, tableName, 0, 1, INITIAL_REGION_NUM - 1);
    // Merge 2nd and 3rd region
    PairOfSameType<RegionInfo> mergedRegions =
      mergeRegionsAndVerifyRegionNum(MASTER, tableName, 1, 2, INITIAL_REGION_NUM - 2);
    verifyRowCount(table, ROWSIZE);
    // Randomly choose one of the two merged regions
    RegionInfo hri =
      RandomUtils.nextBoolean() ? mergedRegions.getFirst() : mergedRegions.getSecond();
    MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
    AssignmentManager am = cluster.getMaster().getAssignmentManager();
    RegionStates regionStates = am.getRegionStates();
    // We should not be able to assign it again
    am.assign(hri);
    assertFalse("Merged region can't be assigned", regionStates.isRegionInTransition(hri));
    // We should not be able to unassign it either
    am.unassign(hri);
    assertFalse("Merged region can't be unassigned", regionStates.isRegionInTransition(hri));
  } finally {
    TEST_UTIL.deleteTable(tableName);
  }
}
/**
 * Not really restarting the master. A restart is simulated by clearing the state of the
 * new (merged) region, because that state is not persisted and would be lost when the
 * master restarts. The clearing is done by {@link MyMasterRpcServices} while its
 * {@code enabled} flag is set.
 */
@Test
public void testMergeAndRestartingMaster() throws Exception {
  final TableName tableName = TableName.valueOf(name.getMethodName());
  // try-with-resources: the table is closed even when the merge or its verification fails.
  try (Table table = createTableAndLoadData(MASTER, tableName)) {
    try {
      MyMasterRpcServices.enabled.set(true);
      // Merge 1st and 2nd region
      mergeRegionsAndVerifyRegionNum(MASTER, tableName, 0, 1, INITIAL_REGION_NUM - 1);
    } finally {
      // Always switch the hook off again so that other tests are not affected.
      MyMasterRpcServices.enabled.set(false);
    }
  } finally {
    TEST_UTIL.deleteTable(tableName);
  }
}
/**
 * Merges two regions with the catalog janitor switched off, checks that the merged region
 * carries merge references in hbase:meta and that the parent region directories still
 * exist, then compacts the merged region, runs the compacted-file discharger and the
 * catalog janitor, and finally waits for the merge references to disappear from meta.
 * The catalog janitor is switched back on and the table deleted in the finally clause.
 */
@Test
public void testCleanMergeReference() throws Exception {
LOG.info("Starting " + name.getMethodName());
// Keep the janitor from cleaning merge references before the test has inspected them.
ADMIN.catalogJanitorSwitch(false);
final TableName tableName = TableName.valueOf(name.getMethodName());
try {
// Create table and load data.
Table table = createTableAndLoadData(MASTER, tableName);
// Merge 1st and 2nd region
mergeRegionsAndVerifyRegionNum(MASTER, tableName, 0, 1, INITIAL_REGION_NUM - 1);
verifyRowCount(table, ROWSIZE);
table.close();
List<Pair<RegionInfo, ServerName>> tableRegions = MetaTableAccessor
.getTableRegionsAndLocations(MASTER.getConnection(), tableName);
// The first region listed in meta is taken to be the merged region.
RegionInfo mergedRegionInfo = tableRegions.get(0).getFirst();
TableDescriptor tableDescriptor = MASTER.getTableDescriptors().get(
tableName);
Result mergedRegionResult = MetaTableAccessor.getRegionResult(
MASTER.getConnection(), mergedRegionInfo.getRegionName());
// contains merge reference in META
assertTrue(CatalogFamilyFormat.hasMergeRegions(mergedRegionResult.rawCells()));
// merging regions' directory are in the file system all the same
List<RegionInfo> p = CatalogFamilyFormat.getMergeRegions(mergedRegionResult.rawCells());
RegionInfo regionA = p.get(0);
RegionInfo regionB = p.get(1);
FileSystem fs = MASTER.getMasterFileSystem().getFileSystem();
Path rootDir = MASTER.getMasterFileSystem().getRootDir();
Path tabledir = CommonFSUtils.getTableDir(rootDir, mergedRegionInfo.getTable());
Path regionAdir = new Path(tabledir, regionA.getEncodedName());
Path regionBdir = new Path(tabledir, regionB.getEncodedName());
assertTrue(fs.exists(regionAdir));
assertTrue(fs.exists(regionBdir));
ColumnFamilyDescriptor[] columnFamilies = tableDescriptor.getColumnFamilies();
HRegionFileSystem hrfs = new HRegionFileSystem(
TEST_UTIL.getConfiguration(), fs, tabledir, mergedRegionInfo);
// Baseline: number of store files of the merged region before the compaction request.
int count = 0;
for(ColumnFamilyDescriptor colFamily : columnFamilies) {
count += hrfs.getStoreFiles(colFamily.getName()).size();
}
ADMIN.compactRegion(mergedRegionInfo.getRegionName());
// Poll the merged region's store files until the total exceeds the baseline or
// waitTime elapses.
// NOTE(review): newcount is declared outside the loop and never reset, so it accumulates
// across polling iterations. The "newcount > count" check can therefore become true
// without any new store file appearing. This looks unintended; confirm before relying on
// this wait as proof that the compaction produced a new file.
long timeout = System.currentTimeMillis() + waitTime;
int newcount = 0;
while (System.currentTimeMillis() < timeout) {
for(ColumnFamilyDescriptor colFamily : columnFamilies) {
newcount += hrfs.getStoreFiles(colFamily.getName()).size();
}
if(newcount > count) {
break;
}
Thread.sleep(50);
}
assertTrue(newcount > count);
// Run the compacted-file discharger once per region server, pausing a second after each.
List<RegionServerThread> regionServerThreads = TEST_UTIL.getHBaseCluster()
.getRegionServerThreads();
for (RegionServerThread rs : regionServerThreads) {
CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null,
rs.getRegionServer(), false);
cleaner.chore();
Thread.sleep(1000);
}
// Wait until at most one store file is left. This loop reuses the deadline computed
// before the first polling loop and falls through without an assertion once that
// deadline has passed.
while (System.currentTimeMillis() < timeout) {
int newcount1 = 0;
for(ColumnFamilyDescriptor colFamily : columnFamilies) {
newcount1 += hrfs.getStoreFiles(colFamily.getName()).size();
}
if(newcount1 <= 1) {
break;
}
Thread.sleep(50);
}
// run CatalogJanitor to clean merge references in hbase:meta and archive the
// files of merging regions
// NOTE(review): this loop has no deadline of its own; it repeats until the janitor
// reports a non-zero result.
int cleaned = 0;
while (cleaned == 0) {
cleaned = ADMIN.runCatalogJanitor();
LOG.debug("catalog janitor returned " + cleaned);
Thread.sleep(50);
// Cleanup is async so wait till all procedures are done running.
ProcedureTestingUtility.waitNoProcedureRunning(
TEST_UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor());
}
// We used to check for existence of region in fs but sometimes the region dir was
// cleaned up by the time we got here making the test sometimes flakey.
assertTrue(cleaned > 0);
// Wait around a bit to give stuff a chance to complete.
// NOTE(review): unbounded wait; it only ends once meta no longer has merge columns.
while (true) {
mergedRegionResult = MetaTableAccessor
.getRegionResult(TEST_UTIL.getConnection(), mergedRegionInfo.getRegionName());
if (CatalogFamilyFormat.hasMergeRegions(mergedRegionResult.rawCells())) {
LOG.info("Waiting on cleanup of merge columns {}",
Arrays.asList(mergedRegionResult.rawCells()).stream().
map(c -> c.toString()).collect(Collectors.joining(",")));
Threads.sleep(50);
} else {
break;
}
}
assertFalse(CatalogFamilyFormat.hasMergeRegions(mergedRegionResult.rawCells()));
} finally {
// Restore the janitor for the other tests and drop the table.
ADMIN.catalogJanitorSwitch(true);
TEST_UTIL.deleteTable(tableName);
}
}
/**
 * This test covers three failure cases:
 * 1, merging a region that is not online;
 * 2, merging a region with itself; 3, merging unknown regions.
 * They are in one test case so that we don't have to create
 * many tables, and these tests are simple.
 */
@Test
public void testMerge() throws Exception {
  LOG.info("Starting " + name.getMethodName());
  final TableName tableName = TableName.valueOf(name.getMethodName());
  final Admin admin = TEST_UTIL.getAdmin();
  // try-with-resources: the table is closed even when one of the checks below fails.
  try (Table table = createTableAndLoadData(MASTER, tableName)) {
    AssignmentManager am = MASTER.getAssignmentManager();
    List<RegionInfo> regions = am.getRegionStates().getRegionsOfTable(tableName);
    // Fake offline one region
    RegionInfo a = regions.get(0);
    RegionInfo b = regions.get(1);
    am.unassign(b);
    am.offlineRegion(b);
    try {
      // Merge with an offline region. Region b is offline here.
      FutureUtils.get(
        admin.mergeRegionsAsync(a.getEncodedNameAsBytes(), b.getEncodedNameAsBytes(), false));
      fail("Offline regions should not be able to merge");
    } catch (DoNotRetryRegionException ie) {
      LOG.info("Merge of an offline region was rejected", ie);
      assertTrue(ie instanceof MergeRegionException);
    }
    try {
      // Merge the same region: b and b.
      FutureUtils
        .get(admin.mergeRegionsAsync(b.getEncodedNameAsBytes(), b.getEncodedNameAsBytes(), true));
      fail("A region should not be able to merge with itself, even forcefully");
    } catch (IOException ie) {
      assertTrue("Exception should mention merging a region to itself",
        StringUtils.stringifyException(ie).contains("region to itself") &&
          ie instanceof MergeRegionException);
    }
    try {
      // Merge unknown regions
      FutureUtils.get(admin.mergeRegionsAsync(Bytes.toBytes("-f1"), Bytes.toBytes("-f2"), true));
      fail("Unknown region could not be merged");
    } catch (IOException ie) {
      assertTrue("UnknownRegionException should be thrown", ie instanceof UnknownRegionException);
    }
  } finally {
    TEST_UTIL.deleteTable(tableName);
  }
}
/**
 * Merges two regions of a table that has region replicas and checks that the parents and
 * their replicas are gone from hbase:meta while the merged region and its replica are
 * present.
 */
@Test
public void testMergeWithReplicas() throws Exception {
  final TableName tableName = TableName.valueOf(name.getMethodName());
  final int numRegions = 5;
  final int replication = 2;
  // try-with-resources: the table is closed even when the merge or an assertion fails.
  try (Table table = createTableAndLoadData(MASTER, tableName, numRegions, replication)) {
    List<Pair<RegionInfo, ServerName>> initialRegionToServers =
      MetaTableAccessor.getTableRegionsAndLocations(TEST_UTIL.getConnection(), tableName);
    // Merge the regions at positions 0 and 2 of the meta listing. Both are asserted below
    // to be distinct from their own replica-1 entries. One merge removes two regions and
    // their replicas and adds one region and its replica, hence the expected count.
    PairOfSameType<RegionInfo> mergedRegions = mergeRegionsAndVerifyRegionNum(MASTER,
      tableName, 0, 2, numRegions * replication - replication);
    List<Pair<RegionInfo, ServerName>> currentRegionToServers =
      MetaTableAccessor.getTableRegionsAndLocations(TEST_UTIL.getConnection(), tableName);
    List<RegionInfo> initialRegions =
      initialRegionToServers.stream().map(Pair::getFirst).collect(Collectors.toList());
    List<RegionInfo> currentRegions =
      currentRegionToServers.stream().map(Pair::getFirst).collect(Collectors.toList());

    RegionInfo firstParent = mergedRegions.getFirst();
    RegionInfo secondParent = mergedRegions.getSecond();
    RegionInfo firstParentReplica = RegionReplicaUtil.getRegionInfoForReplica(firstParent, 1);
    RegionInfo secondParentReplica = RegionReplicaUtil.getRegionInfoForReplica(secondParent, 1);
    RegionInfo newRegion = currentRegions.get(0);
    RegionInfo newRegionReplica = RegionReplicaUtil.getRegionInfoForReplica(newRegion, 1);

    // Both parents and their replicas existed before the merge.
    assertTrue(initialRegions.contains(firstParent));
    assertTrue(initialRegions.contains(firstParentReplica));
    assertTrue(initialRegions.contains(secondParent));
    assertTrue(initialRegions.contains(secondParentReplica));
    // The merged region and its replica are new...
    assertFalse(initialRegions.contains(newRegion));
    assertFalse(initialRegions.contains(newRegionReplica));
    // ...and the replica of the merged region is listed in meta now.
    assertTrue(currentRegions.contains(newRegionReplica));
    // The replicas of the parents are gone.
    assertFalse(currentRegions.contains(firstParentReplica));
    assertFalse(currentRegions.contains(secondParentReplica));
  } finally {
    TEST_UTIL.deleteTable(tableName);
  }
}
/**
 * Requests a merge of the regions at the two given positions of the table's meta listing
 * and waits until the table has the expected number of regions.
 * @param master master whose in-memory region states are consulted while waiting
 * @param tablename table whose regions are merged
 * @param regionAnum position of the first region to merge
 * @param regionBnum position of the second region to merge
 * @param expectedRegionNum number of regions expected once the merge is done
 * @return the two regions that were asked to be merged
 */
private PairOfSameType<RegionInfo> mergeRegionsAndVerifyRegionNum(
    HMaster master, TableName tablename,
    int regionAnum, int regionBnum, int expectedRegionNum) throws Exception {
  final PairOfSameType<RegionInfo> parents =
    requestMergeRegion(master, tablename, regionAnum, regionBnum);
  waitAndVerifyRegionNum(master, tablename, expectedRegionNum);
  return parents;
}
/**
 * Looks up the regions at the two given positions of the table's meta listing and submits
 * a non-forcible merge request for them. The request is asynchronous and its future is not
 * awaited here.
 * @param master not used by this method
 * @param tablename table whose regions are merged
 * @param regionAnum position of the first region to merge
 * @param regionBnum position of the second region to merge
 * @return the two regions that were asked to be merged
 */
private PairOfSameType<RegionInfo> requestMergeRegion(
    HMaster master, TableName tablename,
    int regionAnum, int regionBnum) throws Exception {
  final List<Pair<RegionInfo, ServerName>> regionsAndLocations =
    MetaTableAccessor.getTableRegionsAndLocations(TEST_UTIL.getConnection(), tablename);
  final PairOfSameType<RegionInfo> parents = new PairOfSameType<>(
    regionsAndLocations.get(regionAnum).getFirst(),
    regionsAndLocations.get(regionBnum).getFirst());
  ADMIN.mergeRegionsAsync(
    parents.getFirst().getEncodedNameAsBytes(),
    parents.getSecond().getEncodedNameAsBytes(),
    false);
  return parents;
}
/**
 * Polls hbase:meta and the master's in-memory region states until both report the
 * expected number of regions for the table, or until {@code waitTime} has elapsed. It then
 * reads meta once more and asserts the region count found there.
 * @param master master whose in-memory region states are consulted
 * @param tablename table to inspect
 * @param expectedRegionNum number of regions the table is expected to have
 */
private void waitAndVerifyRegionNum(HMaster master, TableName tablename,
    int expectedRegionNum) throws Exception {
  final long deadline = System.currentTimeMillis() + waitTime;
  boolean converged = false;
  while (!converged && System.currentTimeMillis() < deadline) {
    List<Pair<RegionInfo, ServerName>> metaView =
      MetaTableAccessor.getTableRegionsAndLocations(TEST_UTIL.getConnection(), tablename);
    List<RegionInfo> masterView =
      master.getAssignmentManager().getRegionStates().getRegionsOfTable(tablename);
    LOG.info(Objects.toString(masterView));
    LOG.info(Objects.toString(metaView));
    converged =
      metaView.size() == expectedRegionNum && masterView.size() == expectedRegionNum;
    if (!converged) {
      Thread.sleep(250);
    }
  }
  // Take a fresh look at meta for the final verdict.
  final List<Pair<RegionInfo, ServerName>> finalMetaView =
    MetaTableAccessor.getTableRegionsAndLocations(TEST_UTIL.getConnection(), tablename);
  LOG.info("Regions after merge:" + Joiner.on(',').join(finalMetaView));
  assertEquals(expectedRegionNum, finalMetaView.size());
}
/**
 * Creates a table with {@code INITIAL_REGION_NUM} regions and a replication of 1,
 * and loads the test rows into it.
 * @param master passed through to the four-argument overload
 * @param tablename name of the table to create
 * @return the open table; the caller is responsible for closing it
 */
private Table createTableAndLoadData(HMaster master, TableName tablename)
    throws Exception {
  final int singleReplica = 1;
  return createTableAndLoadData(master, tablename, INITIAL_REGION_NUM, singleReplica);
}
/**
 * Creates a pre-split table, optionally sets its region replication, loads
 * {@code ROWSIZE} rows, and verifies the row count and the number of regions in hbase:meta.
 * @param master not used by this method
 * @param tablename name of the table to create
 * @param numRegions number of regions to split the table into; must be below {@code ROWSIZE}
 * @param replication region replication; only applied when greater than 1
 * @return the open table; the caller is responsible for closing it
 */
private Table createTableAndLoadData(HMaster master, TableName tablename,
    int numRegions, int replication) throws Exception {
  assertTrue("ROWSIZE must > numregions:" + numRegions, ROWSIZE > numRegions);
  // Pick evenly spaced row keys as split points.
  byte[][] splitRows = new byte[numRegions - 1][];
  for (int i = 0; i < splitRows.length; i++) {
    splitRows[i] = ROWS[(i + 1) * ROWSIZE / numRegions];
  }
  Table table = TEST_UTIL.createTable(tablename, FAMILYNAME, splitRows);
  LOG.info("Created " + table.getName());
  if (replication > 1) {
    HBaseTestingUtility.setReplicas(ADMIN, tablename, replication);
    LOG.info("Set replication of " + replication + " on " + table.getName());
  }
  loadData(table);
  LOG.info("Loaded " + table.getName());
  verifyRowCount(table, ROWSIZE);
  LOG.info("Verified " + table.getName());
  TEST_UTIL.waitUntilAllRegionsAssigned(tablename);
  LOG.info("All regions assigned for table - " + table.getName());
  List<Pair<RegionInfo, ServerName>> tableRegions =
    MetaTableAccessor.getTableRegionsAndLocations(TEST_UTIL.getConnection(), tablename);
  LOG.info("{} regions after load: {}", tableRegions.size(),
    Joiner.on(',').join(tableRegions));
  // Meta lists every replica, so the expected entry count is regions times replication.
  assertEquals("Wrong number of regions in table " + tablename,
    numRegions * replication, tableRegions.size());
  return table;
}
/**
 * Builds {@code n} row keys, each consisting of {@code base} followed by the key's index
 * formatted as a zero-padded number of at least four digits.
 * @param base common prefix of all keys
 * @param n number of keys to generate
 * @return the generated keys, in index order
 */
private static byte[][] makeN(byte[] base, int n) {
  final byte[][] keys = new byte[n][];
  Arrays.setAll(keys, idx -> Bytes.add(base, Bytes.toBytes(String.format("%04d", idx))));
  return keys;
}
/**
 * Writes one cell per entry of {@code ROWS} into the table, one put at a time. The cell
 * value is the row's index.
 * @param table table to write to
 * @throws IOException if a put fails
 */
private void loadData(Table table) throws IOException {
  int rowIndex = 0;
  while (rowIndex < ROWSIZE) {
    final Put rowPut = new Put(ROWS[rowIndex]);
    rowPut.addColumn(FAMILYNAME, QUALIFIER, Bytes.toBytes(rowIndex));
    table.put(rowPut);
    rowIndex++;
  }
}
/**
 * Scans the whole table and asserts the number of rows returned.
 * @param table table to scan
 * @param expectedRegionNum the expected number of rows (despite its name, this is a row
 *          count, not a region count)
 * @throws IOException if the scan fails
 */
private void verifyRowCount(Table table, int expectedRegionNum)
    throws IOException {
  int rowCount = 0;
  // try-with-resources: the scanner is released even if the scan throws.
  try (ResultScanner scanner = table.getScanner(new Scan())) {
    while (scanner.next() != null) {
      rowCount++;
    }
  }
  assertEquals(expectedRegionNum, rowCount);
}
/**
 * Master used by this test; it plugs in {@link MyMasterRpcServices} as its RPC services.
 * Make it public so that JVMClusterUtil can access it.
 */
public static class MyMaster extends HMaster {
  public MyMaster(Configuration conf) throws IOException, KeeperException, InterruptedException {
    super(conf);
  }

  /** Returns the test's own RPC services instead of the default ones. */
  @Override
  protected RSRpcServices createRpcServices() throws IOException {
    final MyMasterRpcServices testRpcServices = new MyMasterRpcServices(this);
    return testRpcServices;
  }
}
/**
 * RPC services that, while {@link #enabled} is set, delete the state of the merging-new
 * region after a READY_TO_MERGE transition has been reported without an error.
 */
static class MyMasterRpcServices extends MasterRpcServices {
  /** Switch for the hook; off by default. */
  static AtomicBoolean enabled = new AtomicBoolean(false);

  private HMaster myMaster;

  public MyMasterRpcServices(HMaster master) throws IOException {
    super(master);
    this.myMaster = master;
  }

  @Override
  public ReportRegionStateTransitionResponse reportRegionStateTransition(RpcController c,
      ReportRegionStateTransitionRequest req) throws ServiceException {
    final ReportRegionStateTransitionResponse resp = super.reportRegionStateTransition(c, req);
    // Only interfere when switched on, for a READY_TO_MERGE report that was accepted.
    if (!enabled.get()) {
      return resp;
    }
    if (req.getTransition(0).getTransitionCode() != TransitionCode.READY_TO_MERGE) {
      return resp;
    }
    if (resp.hasErrorMessage()) {
      return resp;
    }
    final RegionStates states = myMaster.getAssignmentManager().getRegionStates();
    for (RegionState inTransition : states.getRegionsStateInTransition()) {
      if (!inTransition.isMergingNew()) {
        continue;
      }
      // Find the merging_new region and remove it
      states.deleteRegion(inTransition.getRegion());
    }
    return resp;
  }
}
}