blob: 307568c976d40bc6956a5a7e61dcc7b0afbc800f [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.ipc.RemoteException;
/**
* A non-instantiable class that has a static method capable of compacting
* a table by merging adjacent regions.
*/
@InterfaceAudience.Private
class HMerge {
// TODO: Where is this class used? How does it relate to Merge in same package?
private static final Log LOG = LogFactory.getLog(HMerge.class);
static final Random rand = new Random();
/*
* Not instantiable
*/
private HMerge() {
super();
}
/**
* Scans the table and merges two adjacent regions if they are small. This
* only happens when a lot of rows are deleted.
*
* When merging the hbase:meta region, the HBase instance must be offline.
* When merging a normal table, the HBase instance must be online, but the
* table must be disabled.
*
* @param conf - configuration object for HBase
* @param fs - FileSystem where regions reside
* @param tableName - Table to be compacted
* @throws IOException
*/
public static void merge(Configuration conf, FileSystem fs,
final TableName tableName)
throws IOException {
merge(conf, fs, tableName, true);
}
/**
* Scans the table and merges two adjacent regions if they are small. This
* only happens when a lot of rows are deleted.
*
* When merging the hbase:meta region, the HBase instance must be offline.
* When merging a normal table, the HBase instance must be online, but the
* table must be disabled.
*
* @param conf - configuration object for HBase
* @param fs - FileSystem where regions reside
* @param tableName - Table to be compacted
* @param testMasterRunning True if we are to verify master is down before
* running merge
* @throws IOException
*/
public static void merge(Configuration conf, FileSystem fs,
final TableName tableName, final boolean testMasterRunning)
throws IOException {
boolean masterIsRunning = false;
ClusterConnection hConnection = null;
if (testMasterRunning) {
try {
hConnection = (ClusterConnection) ConnectionFactory.createConnection(conf);
masterIsRunning = hConnection.isMasterRunning();
} finally {
if (hConnection != null) {
hConnection.close();
}
}
}
if (tableName.equals(TableName.META_TABLE_NAME)) {
if (masterIsRunning) {
throw new IllegalStateException(
"Can not compact hbase:meta table if instance is on-line");
}
// TODO reenable new OfflineMerger(conf, fs).process();
} else {
if(!masterIsRunning) {
throw new IllegalStateException(
"HBase instance must be running to merge a normal table");
}
try (Connection conn = ConnectionFactory.createConnection(conf);
Admin admin = conn.getAdmin()) {
if (!admin.isTableDisabled(tableName)) {
throw new TableNotDisabledException(tableName);
}
}
new OnlineMerger(conf, fs, tableName).process();
}
}
private static abstract class Merger {
protected final Configuration conf;
protected final FileSystem fs;
protected final Path rootDir;
protected final HTableDescriptor htd;
protected final WALFactory walFactory;
private final long maxFilesize;
protected Merger(Configuration conf, FileSystem fs, final TableName tableName)
throws IOException {
this.conf = conf;
this.fs = fs;
this.maxFilesize = conf.getLong(HConstants.HREGION_MAX_FILESIZE,
HConstants.DEFAULT_MAX_FILE_SIZE);
this.rootDir = FSUtils.getRootDir(conf);
Path tabledir = FSUtils.getTableDir(this.rootDir, tableName);
this.htd = FSTableDescriptors.getTableDescriptorFromFs(this.fs, tabledir);
String logname = "merge_" + System.currentTimeMillis() + HConstants.HREGION_LOGDIR_NAME;
final Configuration walConf = new Configuration(conf);
FSUtils.setRootDir(walConf, tabledir);
this.walFactory = new WALFactory(walConf, null, logname);
}
void process() throws IOException {
try {
for (HRegionInfo[] regionsToMerge = next();
regionsToMerge != null;
regionsToMerge = next()) {
if (!merge(regionsToMerge)) {
return;
}
}
} finally {
try {
walFactory.close();
} catch(IOException e) {
LOG.error(e);
}
}
}
protected boolean merge(final HRegionInfo[] info) throws IOException {
if (info.length < 2) {
LOG.info("only one region - nothing to merge");
return false;
}
HRegion currentRegion = null;
long currentSize = 0;
HRegion nextRegion = null;
long nextSize = 0;
for (int i = 0; i < info.length - 1; i++) {
if (currentRegion == null) {
currentRegion = HRegion.openHRegion(conf, fs, this.rootDir, info[i], this.htd,
walFactory.getWAL(info[i].getEncodedNameAsBytes(),
info[i].getTable().getNamespace()));
currentSize = currentRegion.getLargestHStoreSize();
}
nextRegion = HRegion.openHRegion(conf, fs, this.rootDir, info[i + 1], this.htd,
walFactory.getWAL(info[i + 1].getEncodedNameAsBytes(),
info[i + 1].getTable().getNamespace()));
nextSize = nextRegion.getLargestHStoreSize();
if ((currentSize + nextSize) <= (maxFilesize / 2)) {
// We merge two adjacent regions if their total size is less than
// one half of the desired maximum size
LOG.info("Merging regions " + currentRegion.getRegionInfo().getRegionNameAsString() +
" and " + nextRegion.getRegionInfo().getRegionNameAsString());
HRegion mergedRegion =
HRegion.mergeAdjacent(currentRegion, nextRegion);
updateMeta(currentRegion.getRegionInfo().getRegionName(),
nextRegion.getRegionInfo().getRegionName(), mergedRegion);
break;
}
LOG.info("not merging regions " +
Bytes.toStringBinary(currentRegion.getRegionInfo().getRegionName()) +
" and " + Bytes.toStringBinary(nextRegion.getRegionInfo().getRegionName()));
currentRegion.close();
currentRegion = nextRegion;
currentSize = nextSize;
}
if(currentRegion != null) {
currentRegion.close();
}
return true;
}
protected abstract HRegionInfo[] next() throws IOException;
protected abstract void updateMeta(final byte [] oldRegion1,
final byte [] oldRegion2, HRegion newRegion)
throws IOException;
}
/** Instantiated to compact a normal user table */
private static class OnlineMerger extends Merger {
private final TableName tableName;
private final Table table;
private final ResultScanner metaScanner;
private HRegionInfo latestRegion;
OnlineMerger(Configuration conf, FileSystem fs,
final TableName tableName)
throws IOException {
super(conf, fs, tableName);
this.tableName = tableName;
Connection connection = ConnectionFactory.createConnection(conf);
this.table = connection.getTable(TableName.META_TABLE_NAME);
this.metaScanner = table.getScanner(HConstants.CATALOG_FAMILY,
HConstants.REGIONINFO_QUALIFIER);
this.latestRegion = null;
}
private HRegionInfo nextRegion() throws IOException {
try {
Result results = getMetaRow();
if (results == null) {
return null;
}
HRegionInfo region = MetaTableAccessor.getHRegionInfo(results);
if (region == null) {
throw new NoSuchElementException("meta region entry missing " +
Bytes.toString(HConstants.CATALOG_FAMILY) + ":" +
Bytes.toString(HConstants.REGIONINFO_QUALIFIER));
}
if (!region.getTable().equals(this.tableName)) {
return null;
}
return region;
} catch (IOException e) {
e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
LOG.error("meta scanner error", e);
metaScanner.close();
throw e;
}
}
/*
* Check current row has a HRegionInfo. Skip to next row if HRI is empty.
* @return A Map of the row content else null if we are off the end.
* @throws IOException
*/
private Result getMetaRow() throws IOException {
Result currentRow = metaScanner.next();
boolean foundResult = false;
while (currentRow != null) {
LOG.info("Row: <" + Bytes.toStringBinary(currentRow.getRow()) + ">");
byte[] regionInfoValue = currentRow.getValue(HConstants.CATALOG_FAMILY,
HConstants.REGIONINFO_QUALIFIER);
if (regionInfoValue == null || regionInfoValue.length == 0) {
currentRow = metaScanner.next();
continue;
}
HRegionInfo region = MetaTableAccessor.getHRegionInfo(currentRow);
if (!region.getTable().equals(this.tableName)) {
currentRow = metaScanner.next();
continue;
}
foundResult = true;
break;
}
return foundResult ? currentRow : null;
}
@Override
protected HRegionInfo[] next() throws IOException {
List<HRegionInfo> regions = new ArrayList<HRegionInfo>();
if(latestRegion == null) {
latestRegion = nextRegion();
}
if(latestRegion != null) {
regions.add(latestRegion);
}
latestRegion = nextRegion();
if(latestRegion != null) {
regions.add(latestRegion);
}
return regions.toArray(new HRegionInfo[regions.size()]);
}
@Override
protected void updateMeta(final byte [] oldRegion1,
final byte [] oldRegion2,
HRegion newRegion)
throws IOException {
byte[][] regionsToDelete = {oldRegion1, oldRegion2};
for (int r = 0; r < regionsToDelete.length; r++) {
if(Bytes.equals(regionsToDelete[r], latestRegion.getRegionName())) {
latestRegion = null;
}
Delete delete = new Delete(regionsToDelete[r]);
table.delete(delete);
if(LOG.isDebugEnabled()) {
LOG.debug("updated columns in row: " + Bytes.toStringBinary(regionsToDelete[r]));
}
}
newRegion.getRegionInfo().setOffline(true);
MetaTableAccessor.addRegionToMeta(table, newRegion.getRegionInfo());
if(LOG.isDebugEnabled()) {
LOG.debug("updated columns in row: "
+ Bytes.toStringBinary(newRegion.getRegionInfo().getRegionName()));
}
}
}
}