/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.hive;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
import org.apache.drill.shaded.guava.com.google.common.collect.Multimap;
import org.apache.drill.shaded.guava.com.google.common.collect.Ordering;
import org.apache.drill.shaded.guava.com.google.common.collect.TreeMultimap;
import org.apache.drill.shaded.guava.com.google.common.io.ByteArrayDataOutput;
import org.apache.drill.shaded.guava.com.google.common.io.ByteStreams;
import org.apache.commons.codec.binary.Base64;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.exec.util.ImpersonationUtil;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* Class which provides methods to get metadata of a given Hive table selection. It tries to use the stats stored in
* MetaStore whenever available and delays the costly operation of loading InputSplits until needed. When
* loaded, InputSplits are cached to speed up subsequent access.
*/
public class HiveMetadataProvider {
private static final Logger logger = LoggerFactory.getLogger(HiveMetadataProvider.class);
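/** Assumed average size of a single record in bytes; used to derive a row count estimate from the total data size. */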
public static final int RECORD_SIZE = 1024;
private final HiveReadEntry hiveReadEntry;
private final UserGroupInformation ugi;
private final boolean isPartitionedTable;
private final Map<Partition, List<LogicalInputSplit>> partitionInputSplitMap;
private final HiveConf hiveConf;
private List<LogicalInputSplit> tableInputSplits;
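// Reused stopwatch for timing split generation in splitInputWithUGI().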
private final Stopwatch watch = Stopwatch.createUnstarted();
public HiveMetadataProvider(final String userName, final HiveReadEntry hiveReadEntry, final HiveConf hiveConf) {
this.hiveReadEntry = hiveReadEntry;
this.ugi = ImpersonationUtil.createProxyUgi(userName);
this.isPartitionedTable = hiveReadEntry.getTable().getPartitionKeysSize() > 0;
this.partitionInputSplitMap = new HashMap<>();
this.hiveConf = hiveConf;
}
/**
* Returns stats for the table/partitions in the given {@link HiveReadEntry}.
* If valid stats are available in MetaStore, they are returned.
* Otherwise the stats are estimated using the size of the input data.
*
* @param hiveReadEntry Subset of the {@link HiveReadEntry} used when creating this cache object.
* @return hive statistics holder
* @throws IOException if unable to retrieve table statistics
*/
public HiveStats getStats(final HiveReadEntry hiveReadEntry) throws IOException {
Stopwatch timeGetStats = Stopwatch.createStarted();
HiveTableWithColumnCache table = hiveReadEntry.getTable();
try {
if (!isPartitionedTable) {
Properties properties = MetaStoreUtils.getTableMetadata(table);
HiveStats stats = HiveStats.getStatsFromProps(properties);
if (stats.valid()) {
return stats;
}
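// Metastore stats are missing or invalid: estimate from the known total size if present,
// otherwise fall back to summing the lengths of the input splits.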
return stats.getSizeInBytes() > 0 ? estimateStatsFromBytes(stats.getSizeInBytes()) :
estimateStatsFromInputSplits(getTableInputSplits());
} else {
HiveStats aggStats = new HiveStats(0, 0);
for (HivePartition partition : hiveReadEntry.getPartitions()) {
Properties properties = HiveUtilities.getPartitionMetadata(partition, table);
HiveStats stats = HiveStats.getStatsFromProps(properties);
if (!stats.valid()) {
stats = stats.getSizeInBytes() > 0 ? estimateStatsFromBytes(stats.getSizeInBytes()) :
estimateStatsFromInputSplits(getPartitionInputSplits(partition));
}
aggStats.add(stats);
}
return aggStats;
}
} catch (Exception e) {
throw new IOException("Failed to get number of rows and total size from HiveTable", e);
} finally {
logger.debug("Took {} µs to get stats from {}.{}", timeGetStats.elapsed(TimeUnit.NANOSECONDS) / 1000,
table.getDbName(), table.getTableName());
}
}
/** Helper method which returns InputSplits for a non-partitioned table */
private List<LogicalInputSplit> getTableInputSplits() {
Preconditions.checkState(!isPartitionedTable, "Works only for non-partitioned tables");
if (tableInputSplits != null) {
return tableInputSplits;
}
final Properties properties = HiveUtilities.getTableMetadata(hiveReadEntry.getTable());
tableInputSplits = splitInputWithUGI(properties, hiveReadEntry.getTable().getSd(), null);
return tableInputSplits;
}
/** Helper method which returns the InputSplits for a given partition. InputSplits are cached to speed up subsequent
* metadata cache requests for the same partition(s).
*/
private List<LogicalInputSplit> getPartitionInputSplits(final HivePartition partition) {
if (partitionInputSplitMap.containsKey(partition)) {
return partitionInputSplitMap.get(partition);
}
final Properties properties = HiveUtilities.getPartitionMetadata(partition, hiveReadEntry.getTable());
final List<LogicalInputSplit> splits = splitInputWithUGI(properties, partition.getSd(), partition);
partitionInputSplitMap.put(partition, splits);
return splits;
}
/**
* Returns {@link LogicalInputSplit}s for the given {@link HiveReadEntry}. Splits are first looked up in the cache;
* if not found, they are computed using {@link InputFormat#getSplits(JobConf, int)}.
*
* @param hiveReadEntry Subset of the {@link HiveReadEntry} used when creating this object.
* @return list of logically grouped input splits
*/
public List<LogicalInputSplit> getInputSplits(HiveReadEntry hiveReadEntry) {
Stopwatch timeGetSplits = Stopwatch.createStarted();
try {
if (!isPartitionedTable) {
return getTableInputSplits();
}
return hiveReadEntry.getPartitions().stream()
.flatMap(p -> getPartitionInputSplits(p).stream())
.collect(Collectors.toList());
} catch (final Exception e) {
logger.error("Failed to get InputSplits", e);
throw new DrillRuntimeException("Failed to get InputSplits", e);
} finally {
logger.debug("Took {} µs to get InputSplits from {}.{}", timeGetSplits.elapsed(TimeUnit.NANOSECONDS) / 1000,
hiveReadEntry.getTable().getDbName(), hiveReadEntry.getTable().getTableName());
}
}
/**
* Get the list of directories which contain the input files. This list is useful for explain plan purposes.
*
* @param hiveReadEntry {@link HiveReadEntry} containing the input table and/or partitions.
*/
protected List<String> getInputDirectories(HiveReadEntry hiveReadEntry) {
if (isPartitionedTable) {
return hiveReadEntry.getPartitions().stream()
.map(p -> p.getSd().getLocation())
.collect(Collectors.toList());
}
return Collections.singletonList(hiveReadEntry.getTable().getSd().getLocation());
}
/**
* Estimate the stats from the given list of logically grouped input splits.
*
* @param inputSplits list of logically grouped input splits
* @return hive stats with numRows and sizeInBytes
*/
private HiveStats estimateStatsFromInputSplits(List<LogicalInputSplit> inputSplits) throws IOException {
logger.trace("Collecting stats based on input splits size. " +
"It means that we might have fetched all input splits before applying any possible optimizations (ex: partition pruning). " +
"Consider using ANALYZE command on Hive table to collect statistics before running queries.");
long sizeInBytes = 0;
for (LogicalInputSplit split : inputSplits) {
sizeInBytes += split.getLength();
}
return estimateStatsFromBytes(sizeInBytes);
}
/**
* Estimates Hive stats based on the given size in bytes.
*
* @param sizeInBytes size in bytes
* @return hive stats with numRows and sizeInBytes
*/
private HiveStats estimateStatsFromBytes(long sizeInBytes) {
long numRows = sizeInBytes / RECORD_SIZE;
// if the result of division is zero and data size > 0, estimate to one row
numRows = numRows == 0 && sizeInBytes > 0 ? 1 : numRows;
return new HiveStats(numRows, sizeInBytes);
}
/**
* Gets the list of input splits based on the table location.
* These input splits are grouped logically by file name
* if skip header / footer logic needs to be applied later on.
*
* @param properties table or partition properties
* @param sd storage descriptor
* @param partition hive partition
* @return list of logically grouped input splits
*/
private List<LogicalInputSplit> splitInputWithUGI(final Properties properties, final StorageDescriptor sd, final Partition partition) {
watch.start();
try {
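// Compute the splits as the proxy user so that file system access is performed
// with the permissions of the query user (impersonation).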
return ugi.doAs((PrivilegedExceptionAction<List<LogicalInputSplit>>) () -> {
final List<LogicalInputSplit> splits = new ArrayList<>();
final JobConf job = new JobConf(hiveConf);
HiveUtilities.addConfToJob(job, properties);
HiveUtilities.verifyAndAddTransactionalProperties(job, sd);
job.setInputFormat(HiveUtilities.getInputFormatClass(job, sd, hiveReadEntry.getTable()));
final Path path = new Path(sd.getLocation());
final FileSystem fs = path.getFileSystem(job);
if (fs.exists(path)) {
FileInputFormat.addInputPath(job, path);
final InputFormat<?, ?> format = job.getInputFormat();
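// The second argument is only a hint for the desired number of splits;
// the input format decides the actual split count.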
InputSplit[] inputSplits = format.getSplits(job, 1);
// if current table with text input format and has header / footer,
// we need to make sure that splits of the same file are grouped together
if (TextInputFormat.class.getCanonicalName().equals(sd.getInputFormat()) &&
HiveUtilities.hasHeaderOrFooter(hiveReadEntry.getTable())) {
Multimap<Path, FileSplit> inputSplitMultimap = transformFileSplits(inputSplits);
for (Collection<FileSplit> logicalInputSplit : inputSplitMultimap.asMap().values()) {
splits.add(new LogicalInputSplit(logicalInputSplit, partition));
}
} else {
for (final InputSplit split : inputSplits) {
splits.add(new LogicalInputSplit(split, partition));
}
}
}
return splits;
});
} catch (final InterruptedException | IOException e) {
final String errMsg = String.format("Failed to create input splits: %s", e.getMessage());
logger.error(errMsg, e);
throw new DrillRuntimeException(errMsg, e);
} finally {
logger.trace("Took {} µs to get splits from {}", watch.elapsed(TimeUnit.NANOSECONDS) / 1000, sd.getLocation());
watch.stop();
}
}
/**
* <p>
* Groups input splits by file path. Each input split group is ordered by starting byte
* to keep the file parts in the correct order.
* </p>
* <p>
* Example:
* <pre>
* hdfs:///tmp/table/file_1.txt -> hdfs:///tmp/table/file_1.txt:0+10000
* hdfs:///tmp/table/file_1.txt:10001+20000
* hdfs:///tmp/table/file_2.txt -> hdfs:///tmp/table/file_2.txt:0+10000
* </pre>
* </p>
* @param inputSplits input splits
* @return multimap where key is file path and value is group of ordered file splits
*/
private Multimap<Path, FileSplit> transformFileSplits(InputSplit[] inputSplits) {
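// Keys (file paths) are ordered naturally; splits of the same file are ordered by their start offset.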
Multimap<Path, FileSplit> inputSplitGroups = TreeMultimap.create(
Ordering.natural(), Comparator.comparingLong(FileSplit::getStart));
for (InputSplit inputSplit : inputSplits) {
FileSplit fileSplit = (FileSplit) inputSplit;
inputSplitGroups.put(fileSplit.getPath(), fileSplit);
}
return inputSplitGroups;
}
/**
* Contains a group of input splits along with the partition. For non-partitioned tables, the partition field is null.
* Input splits can be logically grouped together, for example, when the table has a header / footer.
* In this case all splits from the same file should be processed together.
*/
public static class LogicalInputSplit {
private final Collection<InputSplit> inputSplits = new ArrayList<>();
private final Partition partition;
public LogicalInputSplit(InputSplit inputSplit, Partition partition) {
inputSplits.add(inputSplit);
this.partition = partition;
}
public LogicalInputSplit(Collection<? extends InputSplit> inputSplits, Partition partition) {
this.inputSplits.addAll(inputSplits);
this.partition = partition;
}
public Collection<InputSplit> getInputSplits() {
return inputSplits;
}
public Partition getPartition() {
return partition;
}
/**
* @return total length of all stored input splits
*/
public long getLength() throws IOException {
long length = 0L;
for (InputSplit inputSplit: inputSplits) {
length += inputSplit.getLength();
}
return length;
}
/**
* @return collection of unique locations where input splits are stored
*/
public Collection<String> getLocations() throws IOException {
Set<String> locations = new HashSet<>();
for (InputSplit inputSplit: inputSplits) {
Collections.addAll(locations, inputSplit.getLocations());
}
return locations;
}
/**
* Serializes each input split to string using Base64 encoding.
*
* @return list of serialized input splits
*/
public List<String> serialize() throws IOException {
List<String> serializedInputSplits = new ArrayList<>();
for (InputSplit inputSplit : inputSplits) {
final ByteArrayDataOutput byteArrayOutputStream = ByteStreams.newDataOutput();
inputSplit.write(byteArrayOutputStream);
final String encoded = Base64.encodeBase64String(byteArrayOutputStream.toByteArray());
logger.debug("Encoded split string for split {} : {}", inputSplit, encoded);
serializedInputSplits.add(encoded);
}
return serializedInputSplits;
}
/**
* @return input split class name if at least one input split is present, null otherwise.
*/
public String getType() {
if (inputSplits.isEmpty()) {
return null;
}
return inputSplits.iterator().next().getClass().getName();
}
}
/**
* Contains stats. Currently only numRows and sizeInBytes are used.
*/
public static class HiveStats {
private static final Logger logger = LoggerFactory.getLogger(HiveStats.class);
private long numRows;
private long sizeInBytes;
public HiveStats(long numRows, long sizeInBytes) {
this.numRows = numRows;
this.sizeInBytes = sizeInBytes;
}
/**
* Gets the stats from table properties. If not found, -1 is returned for each stats field.
* CAUTION: stats may not be up-to-date with the underlying data. It is always good to run the ANALYZE command on
* the Hive table to keep stats up-to-date.
*
* @param properties the source of table stats
* @return {@link HiveStats} instance with rows number and size in bytes from specified properties
*/
public static HiveStats getStatsFromProps(Properties properties) {
long numRows = -1;
long sizeInBytes = -1;
try {
String sizeInBytesProp = properties.getProperty(StatsSetupConst.TOTAL_SIZE);
if (sizeInBytesProp != null) {
sizeInBytes = Long.parseLong(sizeInBytesProp);
}
String numRowsProp = properties.getProperty(StatsSetupConst.ROW_COUNT);
if (numRowsProp != null) {
numRows = Long.parseLong(numRowsProp);
}
} catch (NumberFormatException e) {
logger.error("Failed to parse Hive stats from metastore.", e);
// continue with the defaults.
}
HiveStats hiveStats = new HiveStats(numRows, sizeInBytes);
logger.trace("Obtained Hive stats from properties: {}.", hiveStats);
return hiveStats;
}
public long getNumRows() {
return numRows;
}
public long getSizeInBytes() {
return sizeInBytes;
}
/**
* Both numRows and sizeInBytes are expected to be greater than 0 for the stats to be considered valid.
*/
public boolean valid() {
return numRows > 0 && sizeInBytes > 0;
}
public void add(HiveStats s) {
numRows += s.numRows;
sizeInBytes += s.sizeInBytes;
}
@Override
public String toString() {
return "numRows: " + numRows + ", sizeInBytes: " + sizeInBytes;
}
}
}