/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.hive.mapreduce;
import java.io.IOException;
import java.sql.Connection;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
import org.apache.phoenix.hive.constants.PhoenixStorageHandlerConstants;
import org.apache.phoenix.hive.ppd.PhoenixPredicateDecomposer;
import org.apache.phoenix.hive.ql.index.IndexSearchCondition;
import org.apache.phoenix.hive.query.PhoenixQueryBuilder;
import org.apache.phoenix.hive.util.PhoenixConnectionUtil;
import org.apache.phoenix.hive.util.PhoenixStorageHandlerUtil;
import org.apache.phoenix.iterate.MapReduceParallelScanGrouper;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.compat.CompatUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Custom InputFormat that feeds Phoenix query results into Hive.
*/
@SuppressWarnings({"deprecation", "rawtypes"})
public class PhoenixInputFormat<T extends DBWritable> implements InputFormat<WritableComparable, T> {
private static final Logger LOG = LoggerFactory.getLogger(PhoenixInputFormat.class);
public PhoenixInputFormat() {
if (LOG.isDebugEnabled()) {
LOG.debug("PhoenixInputFormat created");
}
}
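/**
 * Builds a Phoenix SELECT statement from the Hive table name, the projected read columns and
 * any pushed-down filter expression, compiles it into a {@link QueryPlan}, and turns the
 * plan's parallel scans into Hive {@link InputSplit}s.
 */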
@Override
public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException {
String tableName = jobConf.get(PhoenixStorageHandlerConstants.PHOENIX_TABLE_NAME);
String query;
String executionEngine = jobConf.get(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE.varname,
HiveConf.ConfVars.HIVE_EXECUTION_ENGINE.getDefaultValue());
if (LOG.isDebugEnabled()) {
LOG.debug("Target table name at split phase : " + tableName + "with whereCondition :" +
jobConf.get(TableScanDesc.FILTER_TEXT_CONF_STR) +
" and " + HiveConf.ConfVars.HIVE_EXECUTION_ENGINE.varname + " : " +
executionEngine);
}
List<IndexSearchCondition> conditionList = null;
String filterExprSerialized = jobConf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
if (filterExprSerialized != null) {
ExprNodeGenericFuncDesc filterExpr =
SerializationUtilities.deserializeExpression(filterExprSerialized);
PhoenixPredicateDecomposer predicateDecomposer =
PhoenixPredicateDecomposer
.create(Arrays.asList(jobConf.get(serdeConstants.LIST_COLUMNS).split(",")));
predicateDecomposer.decomposePredicate(filterExpr);
if (predicateDecomposer.isCalledPPD()) {
conditionList = predicateDecomposer.getSearchConditionList();
}
}
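// Build the SELECT statement from the projected read columns and the decomposed predicates,
// then compile it so the key ranges and scans are available for split generation.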
query = PhoenixQueryBuilder.getInstance().buildQuery(jobConf, tableName,
PhoenixStorageHandlerUtil.getReadColumnNames(jobConf), conditionList);
final QueryPlan queryPlan = getQueryPlan(jobConf, query);
final List<KeyRange> allSplits = queryPlan.getSplits();
final List<InputSplit> splits = generateSplits(jobConf, queryPlan, allSplits, query);
return splits.toArray(new InputSplit[splits.size()]);
}
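/**
 * Converts the query plan's scans into {@link PhoenixInputSplit}s. Each split carries the
 * query, the table path, the HBase region location and the region size so that Hive can
 * schedule tasks with locality.
 */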
private List<InputSplit> generateSplits(final JobConf jobConf, final QueryPlan qplan,
final List<KeyRange> splits, String query) throws IOException {
if (qplan == null) {
throw new NullPointerException("Query plan must not be null");
}
if (splits == null) {
throw new NullPointerException("Split key ranges must not be null");
}
final List<InputSplit> psplits = new ArrayList<>(splits.size());
Path[] tablePaths = FileInputFormat.getInputPaths(ShimLoader.getHadoopShims()
.newJobContext(new Job(jobConf)));
boolean splitByStats = jobConf.getBoolean(PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
false);
setScanCacheSize(jobConf);
// Open an HBase connection so each split can be annotated with its region location and size
try (org.apache.hadoop.hbase.client.Connection connection = ConnectionFactory.createConnection(PhoenixConnectionUtil.getConfiguration(jobConf))) {
RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(qplan
.getTableRef().getTable().getPhysicalName().toString()));
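// Each inner list of scans is grouped so that it falls within a single table region; the
// first scan's start row is used to resolve that region's location and size.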
for (List<Scan> scans : qplan.getScans()) {
PhoenixInputSplit inputSplit;
HRegionLocation location = regionLocator.getRegionLocation(scans.get(0).getStartRow()
, false);
long regionSize = CompatUtil.getSize(regionLocator, connection.getAdmin(), location);
String regionLocation = PhoenixStorageHandlerUtil.getRegionLocation(location, LOG);
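// When split-by-stats is enabled every scan becomes its own split; otherwise all scans for
// the region are grouped into one split.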
if (splitByStats) {
for (Scan aScan : scans) {
if (LOG.isDebugEnabled()) {
LOG.debug("Split for scan : " + aScan + "with scanAttribute : " + aScan
.getAttributesMap() + " [scanCache, cacheBlock, scanBatch] : [" +
aScan.getCaching() + ", " + aScan.getCacheBlocks() + ", " + aScan
.getBatch() + "] and regionLocation : " + regionLocation);
}
inputSplit = new PhoenixInputSplit(new ArrayList<>(Arrays.asList(aScan)), tablePaths[0],
regionLocation, regionSize);
inputSplit.setQuery(query);
psplits.add(inputSplit);
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Scan count[" + scans.size() + "] : " + Bytes.toStringBinary(scans
.get(0).getStartRow()) + " ~ " + Bytes.toStringBinary(scans.get(scans
.size() - 1).getStopRow()));
LOG.debug("First scan : " + scans.get(0) + "with scanAttribute : " + scans
.get(0).getAttributesMap() + " [scanCache, cacheBlock, scanBatch] : " +
"[" + scans.get(0).getCaching() + ", " + scans.get(0).getCacheBlocks()
+ ", " + scans.get(0).getBatch() + "] and regionLocation : " +
regionLocation);
for (int i = 0, limit = scans.size(); i < limit; i++) {
LOG.debug("EXPECTED_UPPER_REGION_KEY[" + i + "] : " + Bytes
.toStringBinary(scans.get(i).getAttribute
(BaseScannerRegionObserver.EXPECTED_UPPER_REGION_KEY)));
}
}
inputSplit = new PhoenixInputSplit(scans, tablePaths[0], regionLocation,
regionSize);
inputSplit.setQuery(query);
psplits.add(inputSplit);
}
}
}
return psplits;
}
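/**
 * Propagates the storage handler's scan cache setting, when present, to the HBase client
 * scanner caching property used by the generated scans.
 */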
private void setScanCacheSize(JobConf jobConf) {
int scanCacheSize = jobConf.getInt(PhoenixStorageHandlerConstants.HBASE_SCAN_CACHE, -1);
if (scanCacheSize > 0) {
jobConf.setInt(HConstants.HBASE_CLIENT_SCANNER_CACHING, scanCacheSize);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Generating splits with scanCacheSize : " + scanCacheSize);
}
}
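/**
 * Recompiles the query carried by the split into a {@link QueryPlan} and wraps it in a
 * {@link PhoenixRecordReader} that produces {@link DBWritable} rows for Hive.
 */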
@Override
public RecordReader<WritableComparable, T> getRecordReader(InputSplit split, JobConf job,
Reporter reporter) throws IOException {
final QueryPlan queryPlan = getQueryPlan(job, ((PhoenixInputSplit) split).getQuery());
@SuppressWarnings("unchecked")
final Class<T> inputClass = (Class<T>) job.getClass(PhoenixConfigurationUtil.INPUT_CLASS,
PhoenixResultWritable.class);
PhoenixRecordReader<T> recordReader = new PhoenixRecordReader<T>(inputClass, job,
queryPlan);
recordReader.initialize(split);
return recordReader;
}
/**
* Returns the query plan associated with the select query.
*/
private QueryPlan getQueryPlan(final Configuration configuration, String selectStatement)
throws IOException {
try {
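// When a CURRENT_SCN value is configured, pass it through as a connection property so the
// read is pinned to that timestamp.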
final String currentScnValue = configuration.get(PhoenixConfigurationUtil
.CURRENT_SCN_VALUE);
final Properties overridingProps = new Properties();
if (currentScnValue != null) {
overridingProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, currentScnValue);
}
final Connection connection = PhoenixConnectionUtil.getInputConnection(configuration,
overridingProps);
if (selectStatement == null) {
throw new NullPointerException();
}
final Statement statement = connection.createStatement();
final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
if (LOG.isDebugEnabled()) {
LOG.debug("Compiled query : " + selectStatement);
}
// Optimize the query plan so that we potentially use secondary indexes
final QueryPlan queryPlan = pstmt.optimizeQuery(selectStatement);
// Initialize the query plan so it sets up the parallel scans
queryPlan.iterator(MapReduceParallelScanGrouper.getInstance());
return queryPlan;
} catch (Exception exception) {
LOG.error(String.format("Failed to get the query plan with error [%s]", exception.getMessage()), exception);
throw new RuntimeException(exception);
}
}
}