blob: 7ddde5b5ef217439977343e0140165fbec90409b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HDFSBlocksDistribution.HostAndWeight;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.ClientSideRegionScanner;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit;
import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.io.Writable;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
/**
* Hadoop MR API-agnostic implementation for mapreduce over table snapshots.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class TableSnapshotInputFormatImpl {
// TODO: Snapshot files are owned in fs by the hbase user. There is no
// easy way to delegate access.
public static final Log LOG = LogFactory.getLog(TableSnapshotInputFormatImpl.class);
private static final String SNAPSHOT_NAME_KEY = "hbase.TableSnapshotInputFormat.snapshot.name";
// key for specifying the root dir of the restored snapshot
protected static final String RESTORE_DIR_KEY = "hbase.TableSnapshotInputFormat.restore.dir";
/** See {@link #getBestLocations(Configuration, HDFSBlocksDistribution)} */
private static final String LOCALITY_CUTOFF_MULTIPLIER =
"hbase.tablesnapshotinputformat.locality.cutoff.multiplier";
private static final float DEFAULT_LOCALITY_CUTOFF_MULTIPLIER = 0.8f;
/**
 * Implementation class for InputSplit logic common between mapred and mapreduce.
 * <p>
 * A split carries the table descriptor, the region info, the preferred host locations, the
 * scan (kept as its string encoding) and the directory the snapshot was restored into. It is
 * serialized through the {@link Writable} contract.
 */
public static class InputSplit implements Writable {
  private HTableDescriptor htd;
  private HRegionInfo regionInfo;
  private String[] locations;
  // String form of the scan as produced by TableMapReduceUtil.convertScanToString;
  // the empty string when no scan was supplied.
  private String scan;
  private String restoreDir;

  /**
   * No-arg constructor for the mapreduce framework / Writable. All fields stay unset until
   * {@link #readFields(DataInput)} is called.
   */
  public InputSplit() {}

  /**
   * Creates a split for one region.
   * @param htd descriptor of the table the region belongs to
   * @param regionInfo the region covered by this split
   * @param locations preferred hosts; {@code null} or empty results in an empty array
   * @param scan the scan to run; {@code null} is stored as the empty string
   * @param restoreDir directory the snapshot was restored into
   * @throws IllegalArgumentException if the scan cannot be converted to its string form
   */
  public InputSplit(HTableDescriptor htd, HRegionInfo regionInfo, List<String> locations,
      Scan scan, Path restoreDir) {
    this.htd = htd;
    this.regionInfo = regionInfo;
    if (locations == null || locations.isEmpty()) {
      this.locations = new String[0];
    } else {
      this.locations = locations.toArray(new String[locations.size()]);
    }
    try {
      this.scan = scan != null ? TableMapReduceUtil.convertScanToString(scan) : "";
    } catch (IOException e) {
      // Fail fast: leaving the field null would only defer the failure to an unexplained
      // NullPointerException when the split is serialized in write().
      throw new IllegalArgumentException("Failed to convert Scan to String", e);
    }
    this.restoreDir = restoreDir.toString();
  }

  /** @return the table descriptor (same value as {@link #getTableDescriptor()}) */
  public HTableDescriptor getHtd() {
    return htd;
  }

  /** @return the string-encoded scan; empty when the split was built without a scan */
  public String getScan() {
    return scan;
  }

  /** @return the restore directory as a string */
  public String getRestoreDir() {
    return restoreDir;
  }

  /** @return always 0; the size of the snapshot files is not computed yet */
  public long getLength() {
    //TODO: We can obtain the file sizes of the snapshot here.
    return 0;
  }

  /** @return the preferred host locations (the internal array, not a copy) */
  public String[] getLocations() {
    return locations;
  }

  /** @return the table descriptor (same value as {@link #getHtd()}) */
  public HTableDescriptor getTableDescriptor() {
    return htd;
  }

  /** @return the region covered by this split */
  public HRegionInfo getRegionInfo() {
    return regionInfo;
  }

  // TODO: We should have ProtobufSerialization in Hadoop, and directly use PB objects instead of
  // doing this wrapping with Writables.
  /**
   * Writes the split as: an int length, the protobuf-encoded table/region/locations message of
   * that length, then the scan string and the restore dir, each as a byte array.
   * @param out the sink to serialize to
   * @throws IOException if writing fails
   */
  @Override
  public void write(DataOutput out) throws IOException {
    TableSnapshotRegionSplit.Builder builder = TableSnapshotRegionSplit.newBuilder()
        .setTable(ProtobufUtil.convertToTableSchema(htd))
        .setRegion(HRegionInfo.convert(regionInfo));
    for (String location : locations) {
      builder.addLocations(location);
    }
    TableSnapshotRegionSplit split = builder.build();
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    split.writeTo(baos);
    baos.close();
    byte[] buf = baos.toByteArray();
    // Length prefix lets readFields know how many bytes belong to the protobuf message.
    out.writeInt(buf.length);
    out.write(buf);
    Bytes.writeByteArray(out, Bytes.toBytes(scan));
    Bytes.writeByteArray(out, Bytes.toBytes(restoreDir));
  }

  /**
   * Restores the fields from the layout produced by {@link #write(DataOutput)}.
   * @param in the source to deserialize from
   * @throws IOException if reading fails or the length prefix is negative
   */
  @Override
  public void readFields(DataInput in) throws IOException {
    int len = in.readInt();
    if (len < 0) {
      // A negative prefix can only come from a corrupt stream; report it as an I/O problem
      // rather than letting the array allocation throw NegativeArraySizeException.
      throw new IOException("Invalid serialized split length: " + len);
    }
    byte[] buf = new byte[len];
    in.readFully(buf);
    TableSnapshotRegionSplit split = TableSnapshotRegionSplit.PARSER.parseFrom(buf);
    this.htd = ProtobufUtil.convertToHTableDesc(split.getTable());
    this.regionInfo = HRegionInfo.convert(split.getRegion());
    List<String> locationsList = split.getLocationsList();
    this.locations = locationsList.toArray(new String[locationsList.size()]);
    this.scan = Bytes.toString(Bytes.readByteArray(in));
    this.restoreDir = Bytes.toString(Bytes.readByteArray(in));
  }
}
/**
 * RecordReader logic shared by the mapred and mapreduce flavours of the input format.
 * <p>
 * Rows are read from the restored snapshot region through a {@link ClientSideRegionScanner}.
 */
public static class RecordReader {
  private InputSplit split;
  private Scan scan;
  private Result result = null;
  private ImmutableBytesWritable row = null;
  private ClientSideRegionScanner scanner;

  /** @return the underlying scanner, or {@code null} before {@code initialize} has run */
  public ClientSideRegionScanner getScanner() {
    return scanner;
  }

  /**
   * Decodes the scan carried by the split and opens a scanner over the split's region.
   * @param split the split to read
   * @param conf configuration used to resolve the file system and open the scanner
   * @throws IOException if the scan cannot be decoded or the scanner cannot be opened
   */
  public void initialize(InputSplit split, Configuration conf) throws IOException {
    this.scan = TableMapReduceUtil.convertStringToScan(split.getScan());
    this.split = split;

    final HTableDescriptor tableDescriptor = split.htd;
    final HRegionInfo regionInfo = this.split.getRegionInfo();
    final FileSystem fileSystem = FSUtils.getCurrentFileSystem(conf);

    // region is immutable, this should be fine,
    // otherwise we have to set the thread read point
    this.scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
    // disable caching of data blocks
    this.scan.setCacheBlocks(false);

    final Path restoredRoot = new Path(split.restoreDir);
    this.scanner = new ClientSideRegionScanner(conf, fileSystem, restoredRoot, tableDescriptor,
        regionInfo, this.scan, null);
  }

  /**
   * Advances to the next row.
   * @return {@code true} if a row was read, {@code false} once the scanner is exhausted
   * @throws IOException if the scanner fails
   */
  public boolean nextKeyValue() throws IOException {
    result = scanner.next();
    final boolean hasRow = result != null;
    if (hasRow) {
      // The key holder is created lazily and then reused for every row.
      if (row == null) {
        row = new ImmutableBytesWritable();
      }
      row.set(result.getRow());
    }
    return hasRow;
  }

  /** @return the key holder of the current row, or {@code null} if no row was read yet */
  public ImmutableBytesWritable getCurrentKey() {
    return row;
  }

  /** @return the result of the last {@code nextKeyValue} call; {@code null} when exhausted */
  public Result getCurrentValue() {
    return result;
  }

  /** @return always 0 */
  public long getPos() {
    return 0;
  }

  /** @return always 0 */
  public float getProgress() {
    return 0; // TODO: use total bytes to estimate
  }

  /** Closes the scanner if one was opened. */
  public void close() {
    if (scanner == null) {
      return;
    }
    scanner.close();
  }
}
/**
 * Computes the splits for the snapshot described by the given configuration.
 * <p>
 * Reads the snapshot name and the restore directory from {@code conf}, opens the snapshot
 * manifest under the HBase root dir, and delegates to
 * {@link #getSplits(Scan, SnapshotManifest, List, Path, Configuration)}.
 * @param conf configuration holding the snapshot name, the restore dir and the scan
 * @return the splits for the regions selected by the scan
 * @throws IOException if the manifest cannot be read or the scan cannot be decoded
 * @throws IllegalArgumentException if the snapshot name, the restore dir or the scan is
 *   missing from the configuration
 */
public static List<InputSplit> getSplits(Configuration conf) throws IOException {
  String snapshotName = getSnapshotName(conf);

  // Check the restore dir before touching the file system, so a misconfigured job gets a
  // message naming the missing key instead of a generic Path error after the manifest
  // has already been read.
  String restoreDirName = conf.get(RESTORE_DIR_KEY);
  if (restoreDirName == null) {
    throw new IllegalArgumentException("Restore directory must be provided (" + RESTORE_DIR_KEY
        + " is not set)");
  }

  Path rootDir = FSUtils.getRootDir(conf);
  FileSystem fs = rootDir.getFileSystem(conf);

  SnapshotManifest manifest = getSnapshotManifest(conf, snapshotName, rootDir, fs);
  List<HRegionInfo> regionInfos = getRegionInfosFromManifest(manifest);

  // TODO: mapred does not support scan as input API. Work around for now.
  Scan scan = extractScanFromConf(conf);

  // the temp dir where the snapshot is restored
  Path restoreDir = new Path(restoreDirName);

  return getSplits(scan, manifest, regionInfos, restoreDir, conf);
}
/**
 * Converts every region manifest of the snapshot into an {@link HRegionInfo}.
 * @param manifest the snapshot manifest to read
 * @return one region info per region manifest, in manifest order
 * @throws IllegalArgumentException if the manifest has no region manifest list
 */
public static List<HRegionInfo> getRegionInfosFromManifest(SnapshotManifest manifest) {
  final List<SnapshotRegionManifest> manifests = manifest.getRegionManifests();
  if (manifests != null) {
    final List<HRegionInfo> converted = Lists.newArrayListWithCapacity(manifests.size());
    for (SnapshotRegionManifest entry : manifests) {
      HRegionInfo info = HRegionInfo.convert(entry.getRegionInfo());
      converted.add(info);
    }
    return converted;
  }
  throw new IllegalArgumentException("Snapshot seems empty");
}
/**
 * Opens the manifest of a completed snapshot.
 * @param conf configuration passed to the manifest
 * @param snapshotName name of the snapshot
 * @param rootDir root directory under which the completed snapshot dir is resolved
 * @param fs file system used to read the snapshot info and the manifest
 * @return the opened manifest
 * @throws IOException if the snapshot info or the manifest cannot be read
 */
public static SnapshotManifest getSnapshotManifest(Configuration conf, String snapshotName,
    Path rootDir, FileSystem fs) throws IOException {
  final Path completedDir =
      SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
  return SnapshotManifest.open(conf, fs, completedDir,
      SnapshotDescriptionUtils.readSnapshotInfo(fs, completedDir));
}
/**
 * Builds the scan for the job from the configuration.
 * <p>
 * A serialized scan under {@code TableInputFormat.SCAN} takes precedence. Otherwise the
 * space-separated mapred {@code COLUMN_LIST} is used, and each entry is added to a new scan
 * as a family.
 * @param conf the job configuration
 * @return the scan to run
 * @throws IOException if the serialized scan cannot be decoded
 * @throws IllegalArgumentException if neither key is set
 */
public static Scan extractScanFromConf(Configuration conf) throws IOException {
  if (conf.get(TableInputFormat.SCAN) != null) {
    return TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN));
  }
  if (conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST) == null) {
    throw new IllegalArgumentException("Unable to create scan");
  }
  final String[] families =
      conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST).split(" ");
  final Scan familyScan = new Scan();
  for (int i = 0; i < families.length; i++) {
    familyScan.addFamily(Bytes.toBytes(families[i]));
  }
  return familyScan;
}
/**
 * Creates one split per region whose key range overlaps the scan's row range.
 * <p>
 * Each split gets at most three host locations, taken from the front of the list returned
 * by {@link #getBestLocations(Configuration, HDFSBlocksDistribution)}.
 * @param scan the scan whose start/stop rows select the regions
 * @param manifest the snapshot manifest providing the table descriptor
 * @param regionManifests the candidate regions
 * @param restoreDir directory the snapshot was restored into
 * @param conf configuration used to compute the block distribution
 * @return the splits, in the iteration order of {@code regionManifests}
 * @throws IOException if the block distribution cannot be computed
 */
public static List<InputSplit> getSplits(Scan scan, SnapshotManifest manifest,
    List<HRegionInfo> regionManifests, Path restoreDir, Configuration conf) throws IOException {
  // load table descriptor
  final HTableDescriptor tableDescriptor = manifest.getTableDescriptor();
  final Path restoredTableDir = FSUtils.getTableDir(restoreDir, tableDescriptor.getTableName());

  final List<InputSplit> result = new ArrayList<InputSplit>();
  for (HRegionInfo region : regionManifests) {
    final boolean inScanRange = CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(),
        region.getStartKey(), region.getEndKey());
    if (!inScanRange) {
      continue;
    }
    // compute HDFS locations from snapshot files (which will get the locations for
    // referred hfiles)
    final HDFSBlocksDistribution distribution =
        HRegion.computeHDFSBlocksDistribution(conf, tableDescriptor, region, restoredTableDir);
    final List<String> ranked = getBestLocations(conf, distribution);
    final List<String> topHosts = ranked.subList(0, Math.min(3, ranked.size()));
    result.add(new InputSplit(tableDescriptor, region, topHosts, scan, restoreDir));
  }
  return result;
}
/**
 * Computes the locations to be passed from the InputSplit. MR/Yarn schedulers do not take
 * weights into account, and thus treat every location passed from the input split as equal.
 * We do not want to blindly pass all the locations, since we are creating one split per
 * region, and the region's blocks are distributed throughout the cluster unless favored node
 * assignment is used. In the expected stable case, only one location will contain most of the
 * blocks as local.
 * On the other hand, with favored node assignment, 3 nodes will contain highly local blocks.
 * The heuristic used here passes the top host plus every following host that has at least
 * 80% (hbase.tablesnapshotinputformat.locality.cutoff.multiplier) as much block locality as
 * the top host; the walk stops at the first host below that threshold.
 * @param conf configuration providing the cutoff multiplier
 * @param blockDistribution block distribution of the region
 * @return the selected hosts, best first; empty if the distribution has no hosts
 */
public static List<String> getBestLocations(
    Configuration conf, HDFSBlocksDistribution blockDistribution) {
  final List<String> chosen = new ArrayList<String>(3);
  final HostAndWeight[] ranked = blockDistribution.getTopHostsWithWeights();

  if (ranked.length > 0) {
    final HostAndWeight best = ranked[0];
    chosen.add(best.getHost());

    // Heuristic: filter all hosts which have at least cutoffMultiplier % of block locality
    final double cutoffMultiplier =
        conf.getFloat(LOCALITY_CUTOFF_MULTIPLIER, DEFAULT_LOCALITY_CUTOFF_MULTIPLIER);
    final double threshold = best.getWeight() * cutoffMultiplier;

    int idx = 1;
    while (idx < ranked.length && ranked[idx].getWeight() >= threshold) {
      chosen.add(ranked[idx].getHost());
      idx++;
    }
  }
  return chosen;
}
/**
 * Reads the snapshot name from the configuration.
 * @param conf the job configuration
 * @return the configured snapshot name
 * @throws IllegalArgumentException if no snapshot name is set
 */
private static String getSnapshotName(Configuration conf) {
  final String configured = conf.get(SNAPSHOT_NAME_KEY);
  if (configured != null) {
    return configured;
  }
  throw new IllegalArgumentException("Snapshot name must be provided");
}
/**
 * Configures the job to use TableSnapshotInputFormat to read from a snapshot.
 * <p>
 * The snapshot is copied for scanning into a freshly named (random UUID) subdirectory of
 * {@code restoreDir}, and that subdirectory is what gets recorded in the configuration.
 * @param conf the job configuration to update
 * @param snapshotName the name of the snapshot to read from
 * @param restoreDir a temporary directory to restore the snapshot into. Current user should
 * have write permissions to this directory, and this should not be a subdirectory of rootdir.
 * After the job is finished, restoreDir can be deleted.
 * @throws IOException if an error occurs
 */
public static void setInput(Configuration conf, String snapshotName, Path restoreDir)
    throws IOException {
  conf.set(SNAPSHOT_NAME_KEY, snapshotName);

  final Path hbaseRoot = FSUtils.getRootDir(conf);
  final FileSystem rootFs = hbaseRoot.getFileSystem(conf);
  final Path uniqueRestoreDir = new Path(restoreDir, UUID.randomUUID().toString());

  // TODO: restore from record readers to parallelize.
  RestoreSnapshotHelper.copySnapshotForScanner(conf, rootFs, hbaseRoot, uniqueRestoreDir,
      snapshotName);
  conf.set(RESTORE_DIR_KEY, uniqueRestoreDir.toString());
}
}