// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.planner;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import org.apache.impala.analysis.DescriptorTable;
import org.apache.impala.analysis.Expr;
import org.apache.impala.catalog.FeFsTable;
import org.apache.impala.catalog.FeTable;
import org.apache.impala.catalog.HdfsFileFormat;
import org.apache.impala.common.Pair;
import org.apache.impala.thrift.TDataSink;
import org.apache.impala.thrift.TDataSinkType;
import org.apache.impala.thrift.TExplainLevel;
import org.apache.impala.thrift.THdfsTableSink;
import org.apache.impala.thrift.TQueryOptions;
import org.apache.impala.thrift.TSortingOrder;
import org.apache.impala.thrift.TTableSink;
import org.apache.impala.thrift.TTableSinkType;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
/**
* Sink for inserting into filesystem-backed tables.
*
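* The sink computes a per-instance memory estimate for buffering output
* partitions (see computeResourceProfile()) and is serialized into a
* THdfsTableSink for the backend.
*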
* TODO(vercegovac): rename to FsTableSink
*/
public class HdfsTableSink extends TableSink {
// Default number of partitions used for computeResourceProfile() in the absence of
// column stats.
protected final long DEFAULT_NUM_PARTITIONS = 10;
// Exprs for computing the output partition(s).
protected final List<Expr> partitionKeyExprs_;
// Whether to overwrite the existing partition(s).
protected final boolean overwrite_;
// Indicates whether the input is ordered by the partition keys, meaning partitions can
// be opened, written, and closed one by one.
protected final boolean inputIsClustered_;
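// File formats this sink supports writing to. Inserting into any other format
// is rejected by the precondition check in getPerPartitionMemReq().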
private static final Set<HdfsFileFormat> SUPPORTED_FILE_FORMATS = ImmutableSet.of(
HdfsFileFormat.PARQUET, HdfsFileFormat.TEXT, HdfsFileFormat.LZO_TEXT,
HdfsFileFormat.RC_FILE, HdfsFileFormat.SEQUENCE_FILE, HdfsFileFormat.AVRO);
// Stores the indices into the list of non-clustering columns of the target table that
// are stored in the 'sort.columns' table property. This is sent to the backend to
// populate the RowGroup::sorting_columns list in parquet files.
// If sortingOrder_ is not lexicographical, the backend will skip this process.
private List<Integer> sortColumns_ = new ArrayList<>();
private TSortingOrder sortingOrder_;
// Stores the allocated write id if the target table is transactional, otherwise -1.
private long writeId_;
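/**
* Constructs a sink for inserting into the given filesystem-backed table.
* 'partitionKeyExprs' compute the output partition(s); 'overwrite' indicates
* whether existing partitions are replaced; 'inputIsClustered' indicates that
* the input is ordered by the partition keys; 'sortProperties' carries the sort
* column indices and the sorting order; 'writeId' is the allocated write id if
* the target table is transactional, otherwise -1.
*/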
public HdfsTableSink(FeTable targetTable, List<Expr> partitionKeyExprs,
List<Expr> outputExprs,
boolean overwrite, boolean inputIsClustered,
Pair<List<Integer>, TSortingOrder> sortProperties, long writeId) {
super(targetTable, Op.INSERT, outputExprs);
Preconditions.checkState(targetTable instanceof FeFsTable);
partitionKeyExprs_ = partitionKeyExprs;
overwrite_ = overwrite;
inputIsClustered_ = inputIsClustered;
sortColumns_ = sortProperties.first;
sortingOrder_ = sortProperties.second;
writeId_ = writeId;
}
@Override
public void computeResourceProfile(TQueryOptions queryOptions) {
PlanNode inputNode = fragment_.getPlanRoot();
int numInstances = fragment_.getNumInstances(queryOptions.getMt_dop());
// Compute the number of partitions buffered in memory at the same time, taking into
// account the number of nodes and the data partition of the fragment executing this
// sink.
long numBufferedPartitionsPerInstance;
if (inputIsClustered_) {
// If the insert is clustered, it produces a single partition at a time.
numBufferedPartitionsPerInstance = 1;
} else {
numBufferedPartitionsPerInstance =
fragment_.getPerInstanceNdv(queryOptions.getMt_dop(), partitionKeyExprs_);
if (numBufferedPartitionsPerInstance == -1) {
numBufferedPartitionsPerInstance = DEFAULT_NUM_PARTITIONS;
}
}
FeFsTable table = (FeFsTable) targetTable_;
// TODO: Estimate the memory requirements more accurately by partition type.
Set<HdfsFileFormat> formats = table.getFileFormats();
long perPartitionMemReq = getPerPartitionMemReq(formats);
long perInstanceMemEstimate;
// The estimate is based purely on the per-partition mem req if the input cardinality_
// or the avg row size is unknown.
if (inputNode.getCardinality() == -1 || inputNode.getAvgRowSize() == -1) {
perInstanceMemEstimate = numBufferedPartitionsPerInstance * perPartitionMemReq;
} else {
// The per-partition estimate may be higher than the memory required to buffer
// the entire input data.
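// Illustrative example (hypothetical numbers): with 1,000,000 input rows of
// 100 bytes average size split across 10 instances, perInstanceInputBytes is
// roughly 10MB; with 10 buffered Parquet partitions per instance the
// per-partition requirement would be 10GB, so the smaller ~10MB value is used.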
long perInstanceInputCardinality =
Math.max(1L, inputNode.getCardinality() / numInstances);
long perInstanceInputBytes =
(long) Math.ceil(perInstanceInputCardinality * inputNode.getAvgRowSize());
long perInstanceMemReq =
PlanNode.checkedMultiply(numBufferedPartitionsPerInstance, perPartitionMemReq);
perInstanceMemEstimate = Math.min(perInstanceInputBytes, perInstanceMemReq);
}
resourceProfile_ = ResourceProfile.noReservation(perInstanceMemEstimate);
}
/**
* Returns the per-partition memory requirement for inserting into the given
* set of file formats.
*/
private long getPerPartitionMemReq(Set<HdfsFileFormat> formats) {
Set<HdfsFileFormat> unsupportedFormats =
Sets.difference(formats, SUPPORTED_FILE_FORMATS);
if (!unsupportedFormats.isEmpty()) {
Preconditions.checkState(false,
"Unsupported TableSink format(s): " + Joiner.on(',').join(unsupportedFormats));
}
if (formats.contains(HdfsFileFormat.PARQUET)) {
// Writing to a Parquet partition requires up to 1GB of buffer. From a resource
// management perspective, even if there are non-Parquet partitions, we want to
// be conservative and make a high memory estimate.
return 1024L * 1024L * 1024L;
}
// For all other supported formats (TEXT, LZO_TEXT, RC_FILE, SEQUENCE_FILE & AVRO)
// 100KB is a very approximate estimate of the amount of data buffered.
return 100L * 1024L;
}
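// Appends the explain output for this sink, e.g. (illustrative values):
// WRITE TO HDFS [db.tbl, OVERWRITE=false, PARTITION-KEYS=(year,month)]
// followed by the partition count and, at EXTENDED level, the output exprs.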
@Override
public void appendSinkExplainString(String prefix, String detailPrefix,
TQueryOptions queryOptions, TExplainLevel explainLevel, StringBuilder output) {
String overwriteStr = ", OVERWRITE=" + (overwrite_ ? "true" : "false");
String partitionKeyStr = "";
if (!partitionKeyExprs_.isEmpty()) {
StringBuilder tmpBuilder = new StringBuilder(", PARTITION-KEYS=(");
for (Expr expr: partitionKeyExprs_) {
tmpBuilder.append(expr.toSql() + ",");
}
tmpBuilder.deleteCharAt(tmpBuilder.length() - 1);
tmpBuilder.append(")");
partitionKeyStr = tmpBuilder.toString();
}
output.append(String.format("%sWRITE TO HDFS [%s%s%s]\n", prefix,
targetTable_.getFullName(), overwriteStr, partitionKeyStr));
// Report the total number of partitions, independent of the number of nodes
// and the data partition of the fragment executing this sink.
if (explainLevel.ordinal() > TExplainLevel.MINIMAL.ordinal()) {
long totalNumPartitions = Expr.getNumDistinctValues(partitionKeyExprs_);
if (totalNumPartitions == -1) {
output.append(detailPrefix + "partitions=unavailable");
} else {
output.append(detailPrefix + "partitions="
+ (totalNumPartitions == 0 ? 1 : totalNumPartitions));
}
output.append("\n");
}
if (explainLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) {
output.append(detailPrefix + "output exprs: ")
.append(Expr.getExplainString(outputExprs_, explainLevel) + "\n");
}
}
@Override
protected String getLabel() {
return "HDFS WRITER";
}
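// Serializes this sink into a THdfsTableSink wrapped in a TTableSink, carrying
// the partition key exprs, header-line skip count, sort columns and order, and
// the write id for transactional targets.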
@Override
protected void toThriftImpl(TDataSink tsink) {
THdfsTableSink hdfsTableSink = new THdfsTableSink(
Expr.treesToThrift(partitionKeyExprs_), overwrite_, inputIsClustered_,
sortingOrder_);
FeFsTable table = (FeFsTable) targetTable_;
StringBuilder error = new StringBuilder();
int skipHeaderLineCount = table.parseSkipHeaderLineCount(error);
// Any errors should have already been caught during analysis.
Preconditions.checkState(error.length() == 0);
if (skipHeaderLineCount > 0) {
hdfsTableSink.setSkip_header_line_count(skipHeaderLineCount);
}
hdfsTableSink.setSort_columns(sortColumns_);
hdfsTableSink.setSorting_order(sortingOrder_);
if (writeId_ != -1) hdfsTableSink.setWrite_id(writeId_);
TTableSink tTableSink = new TTableSink(DescriptorTable.TABLE_SINK_ID,
TTableSinkType.HDFS, sinkOp_.toThrift());
tTableSink.hdfs_table_sink = hdfsTableSink;
tsink.table_sink = tTableSink;
tsink.output_exprs = Expr.treesToThrift(outputExprs_);
}
@Override
protected TDataSinkType getSinkType() {
return TDataSinkType.TABLE_SINK;
}
@Override
public void collectExprs(List<Expr> exprs) {
exprs.addAll(partitionKeyExprs_);
// Only add output exprs for the non-clustering columns to avoid adding the
// partition exprs redundantly (they are covered by partitionKeyExprs_ above).
exprs.addAll(outputExprs_.subList(0, targetTable_.getNonClusteringColumns().size()));
}
}