blob: d9c9b871a596dd178605f360cef92a63139d356c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.yetus.audience.InterfaceAudience;
/**
* TableSnapshotInputFormat allows a MapReduce job to run over a table snapshot. The job
* bypasses HBase servers, and directly accesses the underlying files (hfile, recovered edits,
* wals, etc) directly to provide maximum performance. The snapshot is not required to be
* restored to the live cluster or cloned. This also allows to run the mapreduce job from an
* online or offline hbase cluster. The snapshot files can be exported by using the
* {@link org.apache.hadoop.hbase.snapshot.ExportSnapshot} tool, to a pure-hdfs cluster,
* and this InputFormat can be used to run the mapreduce job directly over the snapshot files.
* The snapshot should not be deleted while there are jobs reading from snapshot files.
* <p>
* Usage is similar to TableInputFormat, and
* {@link TableMapReduceUtil#initTableSnapshotMapperJob(String, Scan, Class, Class, Class, Job, boolean, Path)}
* can be used to configure the job.
* <pre>{@code
* Job job = new Job(conf);
* Scan scan = new Scan();
* TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
* scan, MyTableMapper.class, MyMapKeyOutput.class,
* MyMapOutputValueWritable.class, job, true);
* }
* </pre>
* <p>
* Internally, this input format restores the snapshot into the given tmp directory. By default,
* and similar to {@link TableInputFormat} an InputSplit is created per region, but optionally you
* can run N mapper tasks per every region, in which case the region key range will be split to
* N sub-ranges and an InputSplit will be created per sub-range. The region is opened for reading
* from each RecordReader. An internal RegionScanner is used to execute the
* {@link org.apache.hadoop.hbase.CellScanner} obtained from the user.
* <p>
* HBase owns all the data and snapshot files on the filesystem. Only the 'hbase' user can read from
* snapshot files and data files.
* To read from snapshot files directly from the file system, the user who is running the MR job
* must have sufficient permissions to access snapshot and reference files.
* This means that to run mapreduce over snapshot files, the MR job has to be run as the HBase
* user or the user must have group or other privileges in the filesystem (See HBASE-8369).
* Note that, given other users access to read from snapshot/data files will completely circumvent
* the access control enforced by HBase.
* @see org.apache.hadoop.hbase.client.TableSnapshotScanner
*/
@InterfaceAudience.Public
public class TableSnapshotInputFormat extends InputFormat<ImmutableBytesWritable, Result> {
public static class TableSnapshotRegionSplit extends InputSplit implements Writable {
private TableSnapshotInputFormatImpl.InputSplit delegate;
// constructor for mapreduce framework / Writable
public TableSnapshotRegionSplit() {
this.delegate = new TableSnapshotInputFormatImpl.InputSplit();
}
public TableSnapshotRegionSplit(TableSnapshotInputFormatImpl.InputSplit delegate) {
this.delegate = delegate;
}
public TableSnapshotRegionSplit(TableDescriptor htd, RegionInfo regionInfo,
List<String> locations, Scan scan, Path restoreDir) {
this.delegate =
new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations, scan, restoreDir);
}
@Override
public long getLength() throws IOException, InterruptedException {
return delegate.getLength();
}
@Override
public String[] getLocations() throws IOException, InterruptedException {
return delegate.getLocations();
}
@Override
public void write(DataOutput out) throws IOException {
delegate.write(out);
}
@Override
public void readFields(DataInput in) throws IOException {
delegate.readFields(in);
}
public RegionInfo getRegion() {
return delegate.getRegionInfo();
}
TableSnapshotInputFormatImpl.InputSplit getDelegate() {
return this.delegate;
}
}
@InterfaceAudience.Private
static class TableSnapshotRegionRecordReader extends
RecordReader<ImmutableBytesWritable, Result> {
private TableSnapshotInputFormatImpl.RecordReader delegate =
new TableSnapshotInputFormatImpl.RecordReader();
private TaskAttemptContext context;
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException,
InterruptedException {
this.context = context;
delegate.initialize(
((TableSnapshotRegionSplit) split).delegate,
context.getConfiguration());
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
boolean result = delegate.nextKeyValue();
if (result) {
ScanMetrics scanMetrics = delegate.getScanner().getScanMetrics();
if (scanMetrics != null && context != null) {
TableRecordReaderImpl.updateCounters(scanMetrics, 0, context, 0);
}
}
return result;
}
@Override
public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException {
return delegate.getCurrentKey();
}
@Override
public Result getCurrentValue() throws IOException, InterruptedException {
return delegate.getCurrentValue();
}
@Override
public float getProgress() throws IOException, InterruptedException {
return delegate.getProgress();
}
@Override
public void close() throws IOException {
delegate.close();
}
}
@Override
public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
InputSplit split, TaskAttemptContext context) throws IOException {
return new TableSnapshotRegionRecordReader();
}
@Override
public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
List<InputSplit> results = new ArrayList<>();
for (TableSnapshotInputFormatImpl.InputSplit split :
TableSnapshotInputFormatImpl.getSplits(job.getConfiguration())) {
results.add(new TableSnapshotRegionSplit(split));
}
return results;
}
/**
* Configures the job to use TableSnapshotInputFormat to read from a snapshot.
* @param job the job to configure
* @param snapshotName the name of the snapshot to read from
* @param restoreDir a temporary directory to restore the snapshot into. Current user should
* have write permissions to this directory, and this should not be a subdirectory of rootdir.
* After the job is finished, restoreDir can be deleted.
* @throws IOException if an error occurs
*/
public static void setInput(Job job, String snapshotName, Path restoreDir)
throws IOException {
TableSnapshotInputFormatImpl.setInput(job.getConfiguration(), snapshotName, restoreDir);
}
/**
* Configures the job to use TableSnapshotInputFormat to read from a snapshot.
* @param job the job to configure
* @param snapshotName the name of the snapshot to read from
* @param restoreDir a temporary directory to restore the snapshot into. Current user should
* have write permissions to this directory, and this should not be a subdirectory of rootdir.
* After the job is finished, restoreDir can be deleted.
* @param splitAlgo split algorithm to generate splits from region
* @param numSplitsPerRegion how many input splits to generate per one region
* @throws IOException if an error occurs
*/
public static void setInput(Job job, String snapshotName, Path restoreDir,
RegionSplitter.SplitAlgorithm splitAlgo, int numSplitsPerRegion) throws IOException {
TableSnapshotInputFormatImpl.setInput(job.getConfiguration(), snapshotName, restoreDir,
splitAlgo, numSplitsPerRegion);
}
}