blob: df72bac0b995423c160ef6eb53f1fb7fb3465c23 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.planner;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.ExprSubstitutionMap;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.thrift.TDataPartition;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TPartitionType;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import java.util.List;
/**
* Specification of the partition of a single stream of data.
* Examples of those streams of data are: the scan of a table; the output
* of a plan fragment; etc. (ie, this is not restricted to direct exchanges
* between two fragments, which in the backend is facilitated by the classes
* DataStreamSender/DataStreamMgr/DataStreamRecvr).
* TODO: better name? just Partitioning?
*/
public class DataPartition {
private static final Logger LOG = LogManager.getLogger(DataPartition.class);
public static final DataPartition UNPARTITIONED = new DataPartition(TPartitionType.UNPARTITIONED);
public static final DataPartition RANDOM = new DataPartition(TPartitionType.RANDOM);
private final TPartitionType type;
// for hash partition: exprs used to compute hash value
private ImmutableList<Expr> partitionExprs = ImmutableList.of();
private List<DataSplitSink.EtlRangePartitionInfo> partitions;
public DataPartition(TPartitionType type, List<Expr> exprs) {
Preconditions.checkNotNull(exprs);
Preconditions.checkState(!exprs.isEmpty());
Preconditions.checkState(
type == TPartitionType.HASH_PARTITIONED || type == TPartitionType.RANGE_PARTITIONED);
this.type = type;
this.partitionExprs = ImmutableList.copyOf(exprs);
}
public DataPartition(
List<Expr> partitionExprs,
List<DataSplitSink.EtlRangePartitionInfo> partitions) {
this.type = TPartitionType.RANGE_PARTITIONED;
this.partitionExprs = ImmutableList.copyOf(partitionExprs);
this.partitions = partitions;
}
public void substitute(ExprSubstitutionMap smap, Analyzer analyzer) throws AnalysisException {
List<Expr> list = Expr.trySubstituteList(partitionExprs, smap, analyzer, false);
partitionExprs = ImmutableList.copyOf(list);
}
public DataPartition(TPartitionType type) {
Preconditions.checkState(
type == TPartitionType.UNPARTITIONED || type == TPartitionType.RANDOM);
this.type = type;
this.partitionExprs = ImmutableList.of();
}
public static DataPartition hashPartitioned(List<Expr> exprs) {
return new DataPartition(TPartitionType.HASH_PARTITIONED, exprs);
}
public boolean isPartitioned() {
return type != TPartitionType.UNPARTITIONED;
}
public TPartitionType getType() {
return type;
}
public List<Expr> getPartitionExprs() {
return partitionExprs;
}
public TDataPartition toThrift() {
TDataPartition result = new TDataPartition(type);
if (partitionExprs != null) {
result.setPartition_exprs(Expr.treesToThrift(partitionExprs));
}
if (partitions != null) {
result.setPartition_infos(DataSplitSink.EtlRangePartitionInfo.listToThrift(partitions));
}
return result;
}
/**
* Returns true if 'this' is a partition that is compatible with the
* requirements of 's'.
* TODO: specify more clearly and implement
*/
public boolean isCompatible(DataPartition s) {
// TODO: implement
return true;
}
public String getExplainString(TExplainLevel explainLevel) {
StringBuilder str = new StringBuilder();
str.append(type.toString());
if (!partitionExprs.isEmpty()) {
List<String> strings = Lists.newArrayList();
for (Expr expr : partitionExprs) {
strings.add(expr.toSql());
}
str.append(": " + Joiner.on(", ").join(strings));
}
str.append("\n");
return str.toString();
}
}