blob: abfc6c995b9ec899c0170a8ac9979ab8d004bb37 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hbase;
import static org.apache.hadoop.hbase.HConstants.CATALOG_FAMILY;
import static org.apache.hadoop.hbase.HConstants.REGIONINFO_QUALIFIER;
import static org.apache.hadoop.hbase.HConstants.STATE_QUALIFIER;
import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.LongAdder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* HBase maintenance tool for merging regions of a specific table, until a target number of regions
* for the table is reached, or no more merges can complete due to limit in resulting merged
* region size.
*/
public class RegionsMerger extends Configured implements org.apache.hadoop.util.Tool {

  private static final Logger LOG = LoggerFactory.getLogger(RegionsMerger.class);

  /** Fraction of {@code hbase.hregion.max.filesize} a resulting merged region may approach. */
  public static final String RESULTING_REGION_UPPER_MARK = "hbase.tools.merge.upper.mark";
  /** Milliseconds to sleep between merge rounds. */
  public static final String SLEEP = "hbase.tools.merge.sleep";
  /** Max consecutive rounds without a completed merge before the tool gives up. */
  public static final String MAX_ROUNDS_IDLE = "hbase.tools.max.iterations.blocked";

  private final Configuration conf;
  private final FileSystem fs;
  // Upper bound (in bytes) for the combined size of two regions to be considered mergeable:
  // upper-mark fraction (default 0.9) of the configured max region file size.
  private final double resultSizeThreshold;
  // Milliseconds slept between rounds, so in-flight merges have a chance to complete.
  private final int sleepBetweenCycles;
  private final long maxRoundsStuck;

  /**
   * Creates the merger tool.
   *
   * @param conf cluster configuration; must define {@code hbase.rootdir}.
   * @throws IOException if the FileSystem backing the hbase root dir cannot be obtained.
   */
  public RegionsMerger(Configuration conf) throws IOException {
    this.conf = conf;
    Path basePath = new Path(conf.get(HConstants.HBASE_DIR));
    fs = basePath.getFileSystem(conf);
    resultSizeThreshold = this.conf.getDouble(RESULTING_REGION_UPPER_MARK, 0.9) *
      this.conf.getLong(HConstants.HREGION_MAX_FILESIZE, HConstants.DEFAULT_MAX_FILE_SIZE);
    sleepBetweenCycles = this.conf.getInt(SLEEP, 2000);
    this.maxRoundsStuck = this.conf.getInt(MAX_ROUNDS_IDLE, 10);
  }

  /**
   * Returns the filesystem directory holding the given table's region data:
   * {@code <hbase.rootdir>/data/<namespace>/<table>}.
   */
  private Path getTablePath(TableName table) {
    Path basePath = new Path(conf.get(HConstants.HBASE_DIR));
    basePath = new Path(basePath, "data");
    Path tablePath = new Path(basePath, table.getNamespaceAsString());
    return new Path(tablePath, table.getQualifierAsString());
  }

  /**
   * Recursively sums the length of all files under the given path.
   *
   * @param parentPath directory to measure.
   * @return total size in bytes of all files in the subtree.
   * @throws IOException if the filesystem listing fails.
   */
  private long sumSizeInFS(Path parentPath) throws IOException {
    long size = 0;
    FileStatus[] files = this.fs.listStatus(parentPath);
    for (FileStatus f : files) {
      if (f.isFile()) {
        size += f.getLen();
      } else if (f.isDirectory()) {
        size += sumSizeInFS(f.getPath());
      }
    }
    return size;
  }

  /**
   * Scans hbase:meta for regions of the given table that are currently in OPEN state
   * (merge requests are only valid for OPEN regions).
   *
   * @param connection cluster connection used to read hbase:meta.
   * @param table the table whose open regions should be listed.
   * @return the OPEN regions found for the table.
   */
  private List<RegionInfo> getOpenRegions(Connection connection, TableName table) throws Exception {
    List<RegionInfo> regions = new ArrayList<>();
    String tblName = table.getNameAsString();
    // Row keys in meta are "<table>,<startkey>,<timestamp>...", so match on "<table>,".
    RowFilter rowFilter = new RowFilter(CompareOperator.EQUAL,
      new SubstringComparator(tblName + ","));
    SingleColumnValueFilter colFilter = new SingleColumnValueFilter(CATALOG_FAMILY,
      STATE_QUALIFIER, CompareOperator.EQUAL, Bytes.toBytes("OPEN"));
    // Rows with no state column must not pass as OPEN.
    colFilter.setFilterIfMissing(true);
    Scan scan = new Scan();
    FilterList filter = new FilterList(FilterList.Operator.MUST_PASS_ALL);
    filter.addFilter(rowFilter);
    filter.addFilter(colFilter);
    scan.setFilter(filter);
    // FIX: Table is Closeable and was previously leaked; close it together with the scanner.
    try (Table metaTbl = connection.getTable(META_TABLE_NAME);
        ResultScanner rs = metaTbl.getScanner(scan)) {
      Result r;
      while ((r = rs.next()) != null) {
        RegionInfo region = RegionInfo.parseFrom(r.getValue(CATALOG_FAMILY, REGIONINFO_QUALIFIER));
        regions.add(region);
      }
    }
    return regions;
  }

  /**
   * Decides whether two regions can be merged now: neither may already be part of an
   * in-flight merge, they must be adjacent, and the resulting region must stay below
   * the configured size threshold.
   *
   * @param path the table's data directory (used to compute region sizes on the FS).
   * @param region1 first candidate region.
   * @param region2 second candidate region.
   * @param alreadyMerging pairs of regions with merges currently in flight.
   * @return true if a merge of the two regions should be requested.
   */
  private boolean canMerge(Path path, RegionInfo region1, RegionInfo region2,
      Collection<Pair<RegionInfo, RegionInfo>> alreadyMerging) throws IOException {
    if (alreadyMerging.stream().anyMatch(regionPair ->
        region1.equals(regionPair.getFirst()) ||
        region2.equals(regionPair.getFirst()) ||
        region1.equals(regionPair.getSecond()) ||
        region2.equals(regionPair.getSecond()))) {
      return false;
    }
    if (RegionInfo.areAdjacent(region1, region2)) {
      long size1 = sumSizeInFS(new Path(path, region1.getEncodedName()));
      long size2 = sumSizeInFS(new Path(path, region2.getEncodedName()));
      boolean mergeable = (resultSizeThreshold > (size1 + size2));
      if (!mergeable) {
        LOG.warn("Not merging regions {} and {} because resulting region size would get close to " +
          "the {} limit. {} total size: {}; {} total size:{}", region1.getEncodedName(),
          region2.getEncodedName(), resultSizeThreshold, region1.getEncodedName(), size1,
          region2.getEncodedName(), size2);
      }
      return mergeable;
    } else {
      LOG.warn(
        "WARNING: Can't merge regions {} and {} because those are not adjacent.",
        region1.getEncodedName(),
        region2.getEncodedName());
      return false;
    }
  }

  /**
   * Repeatedly merges adjacent regions of the given table until the region count reaches
   * {@code targetRegions}, or {@value #MAX_ROUNDS_IDLE} rounds pass with no merge completing.
   *
   * @param tblName name of the table whose regions should be merged.
   * @param targetRegions desired final number of regions.
   * @throws Exception on connection, scan, or merge-request failures, or if interrupted
   *   while sleeping between rounds.
   */
  public void mergeRegions(String tblName, int targetRegions) throws Exception {
    TableName table = TableName.valueOf(tblName);
    Path tableDir = getTablePath(table);
    // FIX: Admin is Closeable and was previously leaked; manage it with try-with-resources.
    try (Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin()) {
      LongAdder counter = new LongAdder();
      // Iteration number at which the last merge was observed to complete.
      LongAdder lastTimeProgessed = new LongAdder();
      //need to get all regions for the table, regardless of region state
      List<RegionInfo> regions = admin.getRegions(table);
      // FIX: use a parameterized Future type instead of the raw type.
      Map<Future<?>, Pair<RegionInfo, RegionInfo>> regionsMerging = new ConcurrentHashMap<>();
      long roundsNoProgress = 0;
      while (regions.size() > targetRegions) {
        LOG.info("Iteration: {}", counter);
        RegionInfo previous = null;
        int regionSize = regions.size();
        LOG.info("Attempting to merge {} regions to reach the target {} ...", regionSize, targetRegions);
        //to request merge, regions must be OPEN, though
        regions = getOpenRegions(conn, table);
        for (RegionInfo current : regions) {
          if (!current.isSplit()) {
            if (previous != null && canMerge(tableDir, previous, current, regionsMerging.values())) {
              Future<?> f = admin.mergeRegionsAsync(current.getEncodedNameAsBytes(),
                previous.getEncodedNameAsBytes(), true);
              Pair<RegionInfo, RegionInfo> regionPair = new Pair<>(previous, current);
              regionsMerging.put(f, regionPair);
              // Reset so the next region starts a fresh candidate pair.
              previous = null;
              if ((regionSize - regionsMerging.size()) <= targetRegions) {
                break;
              }
            } else {
              previous = current;
            }
          } else {
            LOG.debug("Skipping split region: {}", current.getEncodedName());
          }
        }
        counter.increment();
        LOG.info("Sleeping for {} seconds before next iteration...", (sleepBetweenCycles / 1000));
        Thread.sleep(sleepBetweenCycles);
        // ConcurrentHashMap tolerates removal during forEach; completed merges are dropped here.
        regionsMerging.forEach((f, currentPair) -> {
          if (f.isDone()) {
            LOG.info("Merged regions {} and {} together.",
              currentPair.getFirst().getEncodedName(),
              currentPair.getSecond().getEncodedName());
            regionsMerging.remove(f);
            lastTimeProgessed.reset();
            lastTimeProgessed.add(counter.longValue());
          } else {
            LOG.warn("Merge of regions {} and {} isn't completed yet.",
              currentPair.getFirst(),
              currentPair.getSecond());
          }
        });
        roundsNoProgress = counter.longValue() - lastTimeProgessed.longValue();
        if (roundsNoProgress == this.maxRoundsStuck) {
          LOG.warn("Reached {} iterations without progressing with new merges. Aborting...",
            roundsNoProgress);
          break;
        }
        //again, get all regions, regardless of the state,
        // in order to avoid breaking the loop prematurely
        regions = admin.getRegions(table);
      }
    }
  }

  /**
   * Tool entry point.
   *
   * @param args expects exactly two arguments: table name and target number of regions.
   * @return 0 on success, 1 on bad arguments, 2 on merge failure.
   */
  @Override
  public int run(String[] args) {
    if (args.length != 2) {
      LOG.error("Wrong number of arguments. "
        + "Arguments are: <TABLE_NAME> <TARGET_NUMBER_OF_REGIONS>");
      return 1;
    }
    try {
      this.mergeRegions(args[0], Integer.parseInt(args[1]));
    } catch (Exception e) {
      LOG.error("Merging regions failed:", e);
      return 2;
    }
    return 0;
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    int errCode = ToolRunner.run(new RegionsMerger(conf), args);
    if (errCode != 0) {
      System.exit(errCode);
    }
  }
}