blob: a9344e5437cabefd113786de8f018ac3b568616d [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.analysis;
import static org.apache.impala.analysis.ToSqlOptions.DEFAULT;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.impala.common.AnalysisException;
import org.apache.impala.thrift.TKuduPartitionByHashParam;
import org.apache.impala.thrift.TKuduPartitionByRangeParam;
import org.apache.impala.thrift.TKuduPartitionParam;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
/**
* Represents the partitioning of a Kudu table as defined in the PARTITION BY
* clause of a CREATE TABLE statement. The partitioning can be hash-based or
* range-based or both. See RangePartition for details on the supported range partitions.
*
* Examples:
* - Hash-based:
* PARTITION BY HASH(id) PARTITIONS 10
* - Single column range-based:
* PARTITION BY RANGE(age)
* (
* PARTITION VALUES < 10,
* PARTITION 10 <= VALUES < 20,
* PARTITION 20 <= VALUES < 30,
* PARTITION VALUE = 100
* )
* - Combination of hash and range based:
* PARTITION BY HASH (id) PARTITIONS 3,
* RANGE (age)
* (
* PARTITION 10 <= VALUES < 20,
* PARTITION VALUE = 100
* )
* - Multi-column range based:
* PARTITION BY RANGE (year, quarter)
* (
* PARTITION VALUE = (2001, 1),
* PARTITION VALUE = (2001, 2),
* PARTITION VALUE = (2002, 1)
* )
*
*/
public class KuduPartitionParam extends StmtNode {
/**
* Creates a hash-based KuduPartitionParam.
*/
public static KuduPartitionParam createHashParam(List<String> cols, int numPartitions) {
return new KuduPartitionParam(Type.HASH, cols, numPartitions, null);
}
/**
* Creates a range-based KuduPartitionParam.
*/
public static KuduPartitionParam createRangeParam(List<String> cols,
List<RangePartition> rangePartitions) {
return new KuduPartitionParam(Type.RANGE, cols, NO_HASH_PARTITIONS, rangePartitions);
}
private static final int NO_HASH_PARTITIONS = -1;
/**
* The partitioning type.
*/
public enum Type {
HASH, RANGE
}
// Columns of this partitioning. If no columns are specified, all
// the primary key columns of the associated table are used.
private final List<String> colNames_ = new ArrayList<>();
// Map of primary key column names to the associated column definitions. Must be set
// before the call to analyze().
private Map<String, ColumnDef> pkColumnDefByName_;
// partitioning scheme type
private final Type type_;
// Only relevant for hash-based partitioning, -1 otherwise
private final int numHashPartitions_;
// List of range partitions specified in a range-based partitioning.
private List<RangePartition> rangePartitions_;
private KuduPartitionParam(Type t, List<String> colNames, int numHashPartitions,
List<RangePartition> partitions) {
type_ = t;
for (String name: colNames) colNames_.add(name.toLowerCase());
rangePartitions_ = partitions;
numHashPartitions_ = numHashPartitions;
}
@Override
public void analyze(Analyzer analyzer) throws AnalysisException {
Preconditions.checkState(!colNames_.isEmpty());
Preconditions.checkNotNull(pkColumnDefByName_);
Preconditions.checkState(!pkColumnDefByName_.isEmpty());
// Validate that the columns specified in this partitioning are primary key columns.
for (String colName: colNames_) {
if (!pkColumnDefByName_.containsKey(colName)) {
throw new AnalysisException(String.format("Column '%s' in '%s' is not a key " +
"column. Only key columns can be used in PARTITION BY.", colName, toSql()));
}
}
if (type_ == Type.RANGE) analyzeRangeParam(analyzer);
}
/**
* Analyzes a range-based partitioning. This function does not check for overlapping
* range partitions; these checks are performed by Kudu and an error is reported back
* to the user.
*/
public void analyzeRangeParam(Analyzer analyzer) throws AnalysisException {
List<ColumnDef> pkColDefs = Lists.newArrayListWithCapacity(colNames_.size());
for (String colName: colNames_) pkColDefs.add(pkColumnDefByName_.get(colName));
for (RangePartition rangePartition: rangePartitions_) {
rangePartition.analyze(analyzer, pkColDefs);
}
}
@Override
public final String toSql() {
return toSql(DEFAULT);
}
@Override
public String toSql(ToSqlOptions options) {
StringBuilder builder = new StringBuilder(type_.toString());
if (!colNames_.isEmpty()) {
builder.append(" (");
Joiner.on(", ").appendTo(builder, colNames_).append(")");
}
if (type_ == Type.HASH) {
Preconditions.checkState(numHashPartitions_ != NO_HASH_PARTITIONS);
builder.append(" PARTITIONS ").append(numHashPartitions_);
} else {
builder.append(" (");
if (rangePartitions_ != null) {
List<String> partsSql = new ArrayList<>();
for (RangePartition rangePartition: rangePartitions_) {
partsSql.add(rangePartition.toSql(options));
}
builder.append(Joiner.on(", ").join(partsSql));
} else {
builder.append("...");
}
builder.append(")");
}
return builder.toString();
}
@Override
public String toString() {
return toSql(DEFAULT);
}
public TKuduPartitionParam toThrift() {
TKuduPartitionParam result = new TKuduPartitionParam();
// TODO: Add a validate() function to ensure the validity of distribute params.
if (type_ == Type.HASH) {
TKuduPartitionByHashParam hash = new TKuduPartitionByHashParam();
Preconditions.checkState(numHashPartitions_ != NO_HASH_PARTITIONS);
hash.setNum_partitions(numHashPartitions_);
hash.setColumns(colNames_);
result.setBy_hash_param(hash);
} else {
Preconditions.checkState(type_ == Type.RANGE);
TKuduPartitionByRangeParam rangeParam = new TKuduPartitionByRangeParam();
rangeParam.setColumns(colNames_);
if (rangePartitions_ == null) {
result.setBy_range_param(rangeParam);
return result;
}
for (RangePartition rangePartition: rangePartitions_) {
rangeParam.addToRange_partitions(rangePartition.toThrift());
}
result.setBy_range_param(rangeParam);
}
return result;
}
void setPkColumnDefMap(Map<String, ColumnDef> pkColumnDefByName) {
pkColumnDefByName_ = pkColumnDefByName;
}
boolean hasColumnNames() { return !colNames_.isEmpty(); }
public List<String> getColumnNames() { return ImmutableList.copyOf(colNames_); }
void setColumnNames(Collection<String> colNames) {
Preconditions.checkState(colNames_.isEmpty());
colNames_.addAll(colNames);
}
public Type getType() { return type_; }
}