blob: e64ba9a42680e3683951a2a0a9c691b2efe9d54f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.planner.sql.logical;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.planner.logical.DrillProjectRel;
import org.apache.drill.exec.planner.logical.DrillScanRel;
import org.apache.drill.exec.planner.logical.RelOptHelper;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.planner.physical.PrelUtil;
import org.apache.drill.exec.planner.sql.DrillSqlOperator;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.StoragePluginOptimizerRule;
import org.apache.drill.exec.store.hive.HiveDrillNativeParquetScan;
import org.apache.drill.exec.store.hive.HiveMetadataProvider;
import org.apache.drill.exec.store.hive.HiveReadEntry;
import org.apache.drill.exec.store.hive.HiveScan;
import org.apache.drill.exec.store.parquet.ParquetReaderConfig;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.drill.exec.store.hive.HiveUtilities.nativeReadersRuleMatches;
/**
* Convert Hive scan to use Drill's native parquet reader instead of Hive's native reader. It also adds a
* project to convert/cast the output of Drill's native parquet reader to match the expected output of Hive's
* native reader.
*/
public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptimizerRule {
private static final org.slf4j.Logger logger =
org.slf4j.LoggerFactory.getLogger(ConvertHiveParquetScanToDrillParquetScan.class);
public static final ConvertHiveParquetScanToDrillParquetScan INSTANCE = new ConvertHiveParquetScanToDrillParquetScan();
private static final DrillSqlOperator INT96_TO_TIMESTAMP =
new DrillSqlOperator("convert_fromTIMESTAMP_IMPALA", 1, true, false);
private static final DrillSqlOperator RTRIM = new DrillSqlOperator("RTRIM", 1, true, false);
private ConvertHiveParquetScanToDrillParquetScan() {
super(RelOptHelper.any(DrillScanRel.class), "ConvertHiveScanToHiveDrillNativeScan:Parquet");
}
/**
* {@see org.apache.drill.exec.store.hive.HiveUtilities#nativeReadersRuleMatches}
*/
@Override
public boolean matches(RelOptRuleCall call) {
return nativeReadersRuleMatches(call, MapredParquetInputFormat.class);
}
@Override
public void onMatch(RelOptRuleCall call) {
try {
final DrillScanRel hiveScanRel = call.rel(0);
final HiveScan hiveScan = (HiveScan) hiveScanRel.getGroupScan();
final PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
final String partitionColumnLabel = settings.getFsPartitionColumnLabel();
final Table hiveTable = hiveScan.getHiveReadEntry().getTable();
final HiveReadEntry hiveReadEntry = hiveScan.getHiveReadEntry();
final HiveMetadataProvider hiveMetadataProvider = new HiveMetadataProvider(hiveScan.getUserName(), hiveReadEntry, hiveScan.getHiveConf());
final List<HiveMetadataProvider.LogicalInputSplit> logicalInputSplits = hiveMetadataProvider.getInputSplits(hiveReadEntry);
if (logicalInputSplits.isEmpty()) {
// table is empty, use original scan
return;
}
final Map<String, String> partitionColMapping = getPartitionColMapping(hiveTable, partitionColumnLabel);
final DrillScanRel nativeScanRel = createNativeScanRel(partitionColMapping, hiveScanRel, logicalInputSplits, settings.getOptions());
if (hiveScanRel.getRowType().getFieldCount() == 0) {
call.transformTo(nativeScanRel);
} else {
final DrillProjectRel projectRel = createProjectRel(hiveScanRel, partitionColMapping, nativeScanRel);
call.transformTo(projectRel);
}
/*
Drill native scan should take precedence over Hive since it's more efficient and faster.
Hive does not always give correct costing (i.e. for external tables Hive does not have number of rows
and we calculate them approximately). On the contrary, Drill calculates number of rows exactly
and thus Hive Scan can be chosen instead of Drill native scan because costings allegedly lower for Hive.
To ensure Drill native scan will be chosen, reduce Hive scan importance to 0.
*/
call.getPlanner().prune(hiveScanRel);
} catch (final Exception e) {
logger.warn("Failed to convert HiveScan to HiveDrillNativeParquetScan", e);
}
}
/**
* Create mapping of Hive partition column to directory column mapping.
*/
private Map<String, String> getPartitionColMapping(final Table hiveTable, final String partitionColumnLabel) {
final Map<String, String> partitionColMapping = new HashMap<>();
int i = 0;
for (FieldSchema col : hiveTable.getPartitionKeys()) {
partitionColMapping.put(col.getName(), partitionColumnLabel+i);
i++;
}
return partitionColMapping;
}
/**
* Helper method which creates a DrillScalRel with native HiveScan.
*/
private DrillScanRel createNativeScanRel(Map<String, String> partitionColMapping,
DrillScanRel hiveScanRel,
List<HiveMetadataProvider.LogicalInputSplit> logicalInputSplits,
OptionManager options) throws IOException {
final RelDataTypeFactory typeFactory = hiveScanRel.getCluster().getTypeFactory();
final RelDataType varCharType = typeFactory.createSqlType(SqlTypeName.VARCHAR);
final List<String> nativeScanColNames = new ArrayList<>();
final List<RelDataType> nativeScanColTypes = new ArrayList<>();
for (RelDataTypeField field : hiveScanRel.getRowType().getFieldList()) {
final String dirColName = partitionColMapping.get(field.getName());
if (dirColName != null) { // partition column
nativeScanColNames.add(dirColName);
nativeScanColTypes.add(varCharType);
} else {
nativeScanColNames.add(field.getName());
nativeScanColTypes.add(field.getType());
}
}
final RelDataType nativeScanRowType = typeFactory.createStructType(nativeScanColTypes, nativeScanColNames);
// Create the list of projected columns set in HiveScan. The order of this list may not be same as the order of
// columns in HiveScan row type. Note: If the HiveScan.getColumn() contains a '*', we just need to add it as it is,
// unlike above where we expanded the '*'. HiveScan and related (subscan) can handle '*'.
final List<SchemaPath> nativeScanCols = new ArrayList<>();
for (SchemaPath colName : hiveScanRel.getColumns()) {
final String partitionCol = partitionColMapping.get(colName.getRootSegmentPath());
if (partitionCol != null) {
nativeScanCols.add(SchemaPath.getSimplePath(partitionCol));
} else {
nativeScanCols.add(colName);
}
}
final HiveScan hiveScan = (HiveScan) hiveScanRel.getGroupScan();
final HiveDrillNativeParquetScan nativeHiveScan =
new HiveDrillNativeParquetScan(
hiveScan.getUserName(),
nativeScanCols,
hiveScan.getStoragePlugin(),
logicalInputSplits,
hiveScan.getConfProperties(),
ParquetReaderConfig.builder().withOptions(options).build());
return new DrillScanRel(
hiveScanRel.getCluster(),
hiveScanRel.getTraitSet(),
hiveScanRel.getTable(),
nativeHiveScan,
nativeScanRowType,
nativeScanCols);
}
/**
* Create a project that converts the native scan output to expected output of Hive scan.
*/
private DrillProjectRel createProjectRel(final DrillScanRel hiveScanRel,
final Map<String, String> partitionColMapping, final DrillScanRel nativeScanRel) {
final List<RexNode> rexNodes = new ArrayList<>();
final RexBuilder rb = hiveScanRel.getCluster().getRexBuilder();
final RelDataType hiveScanRowType = hiveScanRel.getRowType();
for (String colName : hiveScanRowType.getFieldNames()) {
final String dirColName = partitionColMapping.get(colName);
if (dirColName != null) {
rexNodes.add(createPartitionColumnCast(hiveScanRel, nativeScanRel, colName, dirColName, rb));
} else {
rexNodes.add(createColumnFormatConversion(hiveScanRel, nativeScanRel, colName, rb));
}
}
return DrillProjectRel.create(
hiveScanRel.getCluster(), hiveScanRel.getTraitSet(), nativeScanRel, rexNodes,
hiveScanRowType /* project rowtype and HiveScanRel rowtype should be the same */);
}
/**
* Apply any data format conversion expressions.
*/
private RexNode createColumnFormatConversion(DrillScanRel hiveScanRel, DrillScanRel nativeScanRel,
String colName, RexBuilder rb) {
RelDataType outputType = hiveScanRel.getRowType().getField(colName, false, false).getType();
RelDataTypeField inputField = nativeScanRel.getRowType().getField(colName, false, false);
RexInputRef inputRef = rb.makeInputRef(inputField.getType(), inputField.getIndex());
PlannerSettings settings = PrelUtil.getPlannerSettings(hiveScanRel.getCluster().getPlanner());
boolean conversionToTimestampEnabled = settings.getOptions().getBoolean(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP);
if (outputType.getSqlTypeName() == SqlTypeName.TIMESTAMP && !conversionToTimestampEnabled) {
// TIMESTAMP is stored as INT96 by Hive in ParquetFormat.
// Used convert_fromTIMESTAMP_IMPALA UDF to convert INT96 format data to TIMESTAMP
// only for the case when `store.parquet.reader.int96_as_timestamp` is
// disabled to avoid double conversion after reading value from parquet and here.
return rb.makeCall(INT96_TO_TIMESTAMP, inputRef);
}
return inputRef;
}
/**
* Create a cast for partition column. Partition column is output as "VARCHAR" in native parquet reader. Cast it
* appropriate type according the partition type in HiveScan.
*/
private RexNode createPartitionColumnCast(final DrillScanRel hiveScanRel, final DrillScanRel nativeScanRel,
final String outputColName, final String dirColName, final RexBuilder rb) {
final RelDataType outputType = hiveScanRel.getRowType().getField(outputColName, false, false).getType();
final RelDataTypeField inputField = nativeScanRel.getRowType().getField(dirColName, false, false);
final RexInputRef inputRef =
rb.makeInputRef(rb.getTypeFactory().createSqlType(SqlTypeName.VARCHAR), inputField.getIndex());
if (outputType.getSqlTypeName() == SqlTypeName.CHAR) {
return rb.makeCall(RTRIM, inputRef);
}
return rb.makeCast(outputType, inputRef);
}
}