/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.hive.mapreduce;
import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
import org.apache.phoenix.hive.PhoenixRowKey;
import org.apache.phoenix.hive.util.PhoenixStorageHandlerUtil;
import org.apache.phoenix.iterate.ConcatResultIterator;
import org.apache.phoenix.iterate.LookAheadResultIterator;
import org.apache.phoenix.iterate.MapReduceParallelScanGrouper;
import org.apache.phoenix.iterate.PeekingResultIterator;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.iterate.RoundRobinResultIterator;
import org.apache.phoenix.iterate.SequenceResultIterator;
import org.apache.phoenix.iterate.TableResultIterator;
import org.apache.phoenix.jdbc.PhoenixResultSet;
import org.apache.phoenix.monitoring.ReadMetricQueue;
import org.apache.phoenix.monitoring.ScanMetricsHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.phoenix.thirdparty.com.google.common.base.Throwables;
/**
 * RecordReader implementation that iterates over the records returned by a Phoenix
 * {@link QueryPlan}. For each input split it opens one {@link TableResultIterator} per
 * scan, combines them into a single {@link ResultIterator}, and surfaces the rows
 * through a {@link PhoenixResultSet} as {@link DBWritable} values.
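 * <p>
 * A minimal usage sketch, assuming a {@code PhoenixResultWritable} value type; the
 * {@code conf}, {@code plan} and {@code split} names below are illustrative
 * placeholders, and {@code split} must be a {@link PhoenixInputSplit}:
 * <pre>{@code
 * PhoenixRecordReader<PhoenixResultWritable> reader =
 *         new PhoenixRecordReader<>(PhoenixResultWritable.class, conf, plan);
 * reader.initialize(split);
 * WritableComparable key = reader.createKey();
 * PhoenixResultWritable value = reader.createValue();
 * while (reader.next(key, value)) {
 *     // consume the row materialized into value
 * }
 * reader.close();
 * }</pre>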
*/
@SuppressWarnings("rawtypes")
public class PhoenixRecordReader<T extends DBWritable> implements
RecordReader<WritableComparable, T> {
private static final Logger LOG = LoggerFactory.getLogger(PhoenixRecordReader.class);
private final Configuration configuration;
private final QueryPlan queryPlan;
private WritableComparable key;
private T value = null;
private Class<T> inputClass;
private ResultIterator resultIterator = null;
private PhoenixResultSet resultSet;
private long readCount;
private boolean isTransactional;
    public PhoenixRecordReader(Class<T> inputClass, final Configuration configuration,
            final QueryPlan queryPlan) throws IOException {
this.inputClass = inputClass;
this.configuration = configuration;
this.queryPlan = queryPlan;
isTransactional = PhoenixStorageHandlerUtil.isTransactionalTable(configuration);
}
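
    /**
     * Builds the Phoenix iterator stack for the given split: one scanner per scan in
     * the split, combined into a single {@link ResultIterator} and exposed through a
     * {@link PhoenixResultSet}.
     */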
public void initialize(InputSplit split) throws IOException {
final PhoenixInputSplit pSplit = (PhoenixInputSplit) split;
final List<Scan> scans = pSplit.getScans();
if (LOG.isInfoEnabled()) {
LOG.info("Target table : " + queryPlan.getTableRef().getTable().getPhysicalName());
}
if (LOG.isDebugEnabled()) {
LOG.debug("Scan count[" + scans.size() + "] : " + Bytes.toStringBinary(scans.get(0)
.getStartRow()) + " ~ " + Bytes.toStringBinary(scans.get(scans.size() - 1)
.getStopRow()));
LOG.debug("First scan : " + scans.get(0) + " scanAttribute : " + scans.get(0)
.getAttributesMap());
for (int i = 0, limit = scans.size(); i < limit; i++) {
LOG.debug("EXPECTED_UPPER_REGION_KEY[" + i + "] : " +
Bytes.toStringBinary(scans.get(i).getAttribute(BaseScannerRegionObserver
.EXPECTED_UPPER_REGION_KEY)));
}
}
try {
            List<PeekingResultIterator> iterators = new ArrayList<>(scans.size());
StatementContext ctx = queryPlan.getContext();
ReadMetricQueue readMetrics = ctx.getReadMetricsQueue();
String tableName = queryPlan.getTableRef().getTable().getPhysicalName().getString();
long renewScannerLeaseThreshold = queryPlan.getContext().getConnection()
.getQueryServices().getRenewLeaseThresholdMilliSeconds();
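            // Open one lease-renewing scanner per scan; each is wrapped as a
            // PeekingResultIterator because the combining iterators below need to
            // look ahead without consuming rows.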
for (Scan scan : scans) {
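                // The splits were computed up front, so regions may have split or
                // merged since; tell the server-side observer not to reject scans
                // whose bounds no longer match current region boundaries.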
                scan.setAttribute(BaseScannerRegionObserver.SKIP_REGION_BOUNDARY_CHECK,
                        Bytes.toBytes(true));
                ScanMetricsHolder scanMetricsHolder = ScanMetricsHolder.getInstance(
                        readMetrics, tableName, scan, ctx.getConnection().getLogLevel());
                final TableResultIterator tableResultIterator = new TableResultIterator(
                        queryPlan.getContext().getConnection().getMutationState(), scan,
                        scanMetricsHolder, renewScannerLeaseThreshold, queryPlan,
                        MapReduceParallelScanGrouper.getInstance());
                PeekingResultIterator peekingResultIterator =
                        LookAheadResultIterator.wrap(tableResultIterator);
iterators.add(peekingResultIterator);
}
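            // Combine the per-scan iterators: round-robin interleaves the scans when
            // the plan does not require row ordering, otherwise concatenate them in
            // split order.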
ResultIterator iterator = queryPlan.useRoundRobinIterator()
? RoundRobinResultIterator.newIterator(iterators, queryPlan)
: ConcatResultIterator.newIterator(iterators);
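            // Queries that reference sequences need an extra wrapper that fills in
            // sequence values as rows are iterated.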
if (queryPlan.getContext().getSequenceManager().getSequenceCount() > 0) {
                iterator = new SequenceResultIterator(iterator,
                        queryPlan.getContext().getSequenceManager());
}
this.resultIterator = iterator;
// Clone the row projector as it's not thread safe and would be used
// simultaneously by multiple threads otherwise.
            this.resultSet = new PhoenixResultSet(this.resultIterator,
                    queryPlan.getProjector().cloneIfNecessary(), queryPlan.getContext());
} catch (SQLException e) {
LOG.error(String.format(" Error [%s] initializing PhoenixRecordReader. ", e
.getMessage()));
Throwables.propagate(e);
}
}
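
    /**
     * Advances to the next row, materializing it into {@code value}; for transactional
     * tables the primary key is also copied into the supplied {@code key}.
     */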
@Override
public boolean next(WritableComparable key, T value) throws IOException {
try {
if (!resultSet.next()) {
return false;
}
value.readFields(resultSet);
if (isTransactional) {
((PhoenixResultWritable) value).readPrimaryKey((PhoenixRowKey) key);
}
++readCount;
if (LOG.isTraceEnabled()) {
LOG.trace("Result[" + readCount + "] : " + ((PhoenixResultWritable) value)
.getResultMap());
}
return true;
} catch (SQLException e) {
LOG.error(String.format(" Error [%s] occurred while iterating over the resultset. ",
e.getMessage()));
throw new RuntimeException(e);
}
}
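
    // Transactional tables carry a real primary key; other tables need no key at all.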
@Override
public WritableComparable createKey() {
if (isTransactional) {
key = new PhoenixRowKey();
} else {
key = NullWritable.get();
}
return key;
}
@Override
public T createValue() {
value = ReflectionUtils.newInstance(inputClass, this.configuration);
return value;
}
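
    // The position within the split is not tracked, so always report 0.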
@Override
public long getPos() throws IOException {
return 0;
}
@Override
public void close() throws IOException {
if (LOG.isInfoEnabled()) {
LOG.info("Read Count : " + readCount);
}
if (resultIterator != null) {
try {
resultIterator.close();
} catch (SQLException e) {
LOG.error(" Error closing resultset.");
throw new RuntimeException(e);
}
}
}
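
    // Progress is not tracked for the underlying Phoenix scan, so always report 0.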
@Override
public float getProgress() throws IOException {
return 0;
}
}