blob: 81f9a7ceb43fb84550f554e8d37d88cf9232d127 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.client.ConnectionConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Iterates over the data of an HBase table, returning (ImmutableBytesWritable, Result)
 * pairs.
 */
@InterfaceAudience.Public
public class TableRecordReaderImpl {
  public static final String LOG_PER_ROW_COUNT
      = "hbase.mapreduce.log.scanner.rowcount";

  private static final Logger LOG = LoggerFactory.getLogger(TableRecordReaderImpl.class);

  // HBASE_COUNTER_GROUP_NAME is the name of mapreduce counter group for HBase
  @InterfaceAudience.Private
  static final String HBASE_COUNTER_GROUP_NAME = "HBaseCounters";

  private ResultScanner scanner = null;
  private Scan scan = null;
  private Scan currentScan = null;
  private Table htable = null;
  private byte[] lastSuccessfulRow = null;
  private ImmutableBytesWritable key = null;
  private Result value = null;
  private TaskAttemptContext context = null;
  // Scanner restarts and stale results observed since counters were last published.
  // updateCounters() clears them after publishing so each event is reported exactly once.
  private long numRestarts = 0;
  private long numStale = 0;
  private long timestamp;
  private int rowcount;
  private boolean logScannerActivity = false;
  private int logPerRowCount = 100;

  /**
   * Restart from survivable exceptions by creating a new scanner.
   * Counters for the scan being replaced are published before the new scanner is opened.
   *
   * @param firstRow The first row to start at.
   * @throws IOException When restarting fails.
   */
  public void restart(byte[] firstRow) throws IOException {
    // Update counter metrics based on current scan before reinitializing it
    if (currentScan != null) {
      updateCounters();
    }
    currentScan = new Scan(scan);
    currentScan.withStartRow(firstRow);
    currentScan.setScanMetricsEnabled(true);
    if (this.scanner != null) {
      if (logScannerActivity) {
        LOG.info("Closing the previously opened scanner object.");
      }
      this.scanner.close();
      // Drop the reference so a failing getScanner() below cannot leave a closed scanner behind.
      this.scanner = null;
    }
    this.scanner = this.htable.getScanner(currentScan);
    if (logScannerActivity) {
      LOG.info("Current scan={}", currentScan);
      timestamp = System.currentTimeMillis();
      rowcount = 0;
    }
  }

  /**
   * In new mapreduce APIs, TaskAttemptContext has two getCounter methods.
   * Checks whether the getCounter(String, String) method is available.
   *
   * @param context The task context to inspect via reflection.
   * @return The getCounter method or null if not available.
   * @throws IOException If a SecurityException prevents the reflective lookup.
   * @deprecated since 2.4.0 and 2.3.2, will be removed in 4.0.0
   */
  @Deprecated
  protected static Method retrieveGetCounterWithStringsParams(TaskAttemptContext context)
      throws IOException {
    try {
      return context.getClass().getMethod("getCounter", String.class, String.class);
    } catch (SecurityException e) {
      throw new IOException("Failed test for getCounter", e);
    } catch (NoSuchMethodException e) {
      // Method absent on this mapreduce version: report "not available".
      return null;
    }
  }

  /**
   * Sets the HBase table and reads the scanner-activity logging settings from its
   * configuration.
   *
   * @param htable The table to scan.
   */
  public void setHTable(Table htable) {
    Configuration conf = htable.getConfiguration();
    logScannerActivity = conf.getBoolean(ConnectionConfiguration.LOG_SCANNER_ACTIVITY, false);
    logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100);
    this.htable = htable;
  }

  /**
   * Sets the scan defining the actual details like columns etc.
   *
   * @param scan The scan to set.
   */
  public void setScan(Scan scan) {
    this.scan = scan;
  }

  /**
   * Build the scanner. Not done in constructor to allow for extension.
   *
   * @param inputsplit The split being read (not used here).
   * @param context The task context; kept only when non-null and used for counters.
   * @throws IOException When opening the scanner fails.
   * @throws InterruptedException When the job is aborted.
   */
  public void initialize(InputSplit inputsplit,
      TaskAttemptContext context) throws IOException,
      InterruptedException {
    if (context != null) {
      this.context = context;
    }
    restart(scan.getStartRow());
  }

  /**
   * Closes the split: closes the current scanner (if any) and the table (if set).
   * An IOException from closing the table is logged rather than thrown.
   */
  public void close() {
    if (this.scanner != null) {
      this.scanner.close();
      this.scanner = null;
    }
    if (this.htable != null) {
      try {
        this.htable.close();
      } catch (IOException ioe) {
        LOG.warn("Error closing table", ioe);
      }
    }
  }

  /**
   * Returns the current key.
   *
   * @return The current key.
   * @throws IOException Declared for API compatibility.
   * @throws InterruptedException When the job is aborted.
   */
  public ImmutableBytesWritable getCurrentKey() throws IOException,
      InterruptedException {
    return key;
  }

  /**
   * Returns the current value.
   *
   * @return The current value.
   * @throws IOException When the value is faulty.
   * @throws InterruptedException When the job is aborted.
   */
  public Result getCurrentValue() throws IOException, InterruptedException {
    return value;
  }

  /**
   * Positions the record reader to the next record.
   * A non-DoNotRetryIOException from the scanner triggers one restart from the last
   * successfully returned row (or from the scan start row if none was returned yet).
   * Counters are published when the scan ends or fails.
   *
   * @return <code>true</code> if there was another record.
   * @throws IOException When reading the record failed.
   * @throws InterruptedException When the job was aborted.
   */
  public boolean nextKeyValue() throws IOException, InterruptedException {
    if (key == null) {
      key = new ImmutableBytesWritable();
    }
    if (value == null) {
      value = new Result();
    }
    try {
      try {
        value = this.scanner.next();
        countIfStale(value);
        if (logScannerActivity) {
          rowcount++;
          if (rowcount >= logPerRowCount) {
            long now = System.currentTimeMillis();
            LOG.info("Mapper took {}ms to process {} rows", (now - timestamp), rowcount);
            timestamp = now;
            rowcount = 0;
          }
        }
      } catch (IOException e) {
        // Do not retry if the exception tells us not to do so. Counters are published by the
        // outer catch block; publishing here too would report them twice.
        if (e instanceof DoNotRetryIOException) {
          throw e;
        }
        // try to handle all other IOExceptions by restarting
        // the scanner, if the second call fails, it will be rethrown
        LOG.info("recovered from " + StringUtils.stringifyException(e));
        if (lastSuccessfulRow == null) {
          LOG.warn("We are restarting the first next() invocation," +
              " if your mapper has restarted a few other times like this" +
              " then you should consider killing this job and investigate" +
              " why it's taking so long.");
          restart(scan.getStartRow());
        } else {
          restart(lastSuccessfulRow);
          scanner.next(); // skip presumed already mapped row
        }
        value = scanner.next();
        countIfStale(value);
        numRestarts++;
      }
      if (value != null && value.size() > 0) {
        key.set(value.getRow());
        lastSuccessfulRow = key.get();
        return true;
      }
      // Handle cursor results: expose the cursor's row as the current key.
      if (value != null && value.isCursor()) {
        key.set(value.getCursor().getRow());
        lastSuccessfulRow = key.get();
        return true;
      }
      updateCounters();
      return false;
    } catch (IOException ioe) {
      updateCounters();
      if (logScannerActivity) {
        long now = System.currentTimeMillis();
        LOG.info("Mapper took {}ms to process {} rows", (now - timestamp), rowcount);
        LOG.info(ioe.toString(), ioe);
        String lastRow = lastSuccessfulRow == null ?
            "null" : Bytes.toStringBinary(lastSuccessfulRow);
        LOG.info("lastSuccessfulRow={}", lastRow);
      }
      throw ioe;
    }
  }

  /**
   * Increments the stale-result count when {@code result} is non-null and stale.
   *
   * @param result The result just returned by the scanner; may be null.
   */
  private void countIfStale(Result result) {
    if (result != null && result.isStale()) {
      numStale++;
    }
  }

  /**
   * If hbase runs on new version of mapreduce, RecordReader has access to
   * counters thus can update counters based on scanMetrics.
   * If hbase runs on old version of mapreduce, it won't be able to get
   * access to counters and TableRecordReader can't update counter values.
   *
   * <p>Does nothing if no scanner is open or the scanner has no metrics.
   */
  private void updateCounters() {
    if (scanner == null) {
      return;
    }
    ScanMetrics scanMetrics = scanner.getScanMetrics();
    if (scanMetrics == null) {
      return;
    }
    updateCounters(scanMetrics, numRestarts, context, numStale);
    // numRestarts/numStale are deltas since the last publish; clear them so a later publish
    // (after another restart, an error, or end of scan) does not add them a second time.
    numRestarts = 0;
    numStale = 0;
  }

  /**
   * Publishes scan metrics and restart/stale counts to the task counters.
   *
   * @param scanMetrics The metrics to publish.
   * @param numScannerRestarts Number of scanner restarts to add.
   * @param getCounter Ignored.
   * @param context The task context; nothing is published when null.
   * @param numStale Number of stale results to add.
   * @deprecated since 2.4.0 and 2.3.2, will be removed in 4.0.0
   * Use {@link #updateCounters(ScanMetrics, long, TaskAttemptContext, long)} instead.
   */
  @Deprecated
  protected static void updateCounters(ScanMetrics scanMetrics, long numScannerRestarts,
      Method getCounter, TaskAttemptContext context, long numStale) {
    updateCounters(scanMetrics, numScannerRestarts, context, numStale);
  }

  /**
   * Adds every entry of {@code scanMetrics.getMetricsMap()} to the matching counter in the
   * {@value #HBASE_COUNTER_GROUP_NAME} group. Also adds {@code numScannerRestarts} to
   * NUM_SCANNER_RESTARTS and {@code numStale} to NUM_SCAN_RESULTS_STALE when they are non-zero.
   *
   * @param scanMetrics The metrics to publish.
   * @param numScannerRestarts Number of scanner restarts to add.
   * @param context The task context; nothing is published when null.
   * @param numStale Number of stale results to add.
   */
  protected static void updateCounters(ScanMetrics scanMetrics, long numScannerRestarts,
      TaskAttemptContext context, long numStale) {
    // we can get access to counters only if hbase uses new mapreduce APIs
    if (context == null) {
      return;
    }
    for (Map.Entry<String, Long> entry : scanMetrics.getMetricsMap().entrySet()) {
      incrementCounter(context, entry.getKey(), entry.getValue());
    }
    if (numScannerRestarts != 0L) {
      incrementCounter(context, "NUM_SCANNER_RESTARTS", numScannerRestarts);
    }
    if (numStale != 0L) {
      incrementCounter(context, "NUM_SCAN_RESULTS_STALE", numStale);
    }
  }

  /**
   * Adds {@code amount} to the named counter in the HBase counter group.
   * Skips the update if the context returns no counter.
   */
  private static void incrementCounter(TaskAttemptContext context, String name, long amount) {
    Counter counter = context.getCounter(HBASE_COUNTER_GROUP_NAME, name);
    if (counter != null) {
      counter.increment(amount);
    }
  }

  /**
   * The current progress of the record reader through its data.
   *
   * @return A number between 0.0 and 1.0, the fraction of the data read. Currently always 0.
   */
  public float getProgress() {
    // Depends on the total number of tuples
    return 0;
  }
}