| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.table.plan.util |
| |
| import org.apache.flink.api.common.operators.Order |
| import org.apache.flink.table.api.TableException |
| import org.apache.flink.table.calcite.FlinkPlannerImpl |
| import org.apache.flink.table.codegen.SortCodeGenerator |
| import org.apache.flink.table.plan.nodes.physical.batch.BatchPhysicalRel |
| import org.apache.flink.table.runtime.sort.BinaryIndexedSortable |
| import org.apache.flink.table.typeutils.BinaryRowSerializer |
| |
| import org.apache.calcite.rel.RelFieldCollation.Direction |
| import org.apache.calcite.rel.`type`._ |
| import org.apache.calcite.rel.metadata.RelMetadataQuery |
| import org.apache.calcite.rel.{RelCollation, RelFieldCollation, RelNode, RelWriter} |
| import org.apache.calcite.rex.{RexLiteral, RexNode} |
| |
| import scala.collection.JavaConverters._ |
| import scala.collection.mutable |
| |
/**
 * Common helper methods for Flink sort operators: extracting sort keys, directions and
 * null orderings from Calcite collations, rendering sort/limit information for plan
 * explanations, and estimating the memory needed by a sort.
 */
object SortUtil {

  /**
   * Returns true if both the offset and the fetch expressions are deterministic, as judged by
   * `FlinkRexUtil.isDeterministicOperator`.
   *
   * NOTE(review): other methods in this object treat a null offset/fetch as "absent"; confirm
   * that `FlinkRexUtil.isDeterministicOperator` accepts null arguments.
   */
  def isDeterministic(offset: RexNode, fetch: RexNode): Boolean = {
    FlinkRexUtil.isDeterministicOperator(offset) && FlinkRexUtil.isDeterministicOperator(fetch)
  }

  /**
   * Returns the direction of the first sort field.
   *
   * @param collationSort The sort collation. It must contain at least one field collation,
   *                      otherwise the underlying `get(0)` throws.
   * @return The direction of the first sort field.
   */
  def getFirstSortDirection(collationSort: RelCollation): Direction = {
    collationSort.getFieldCollations.get(0).direction
  }

  /**
   * Returns the first sort field.
   *
   * @param collationSort The sort collation. It must contain at least one field collation,
   *                      otherwise the underlying `get(0)` throws.
   * @param rowType The row type of the input.
   * @return The field of `rowType` referenced by the first field collation.
   */
  def getFirstSortField(collationSort: RelCollation, rowType: RelDataType): RelDataTypeField = {
    val idx = collationSort.getFieldCollations.get(0).getFieldIndex
    rowType.getFieldList.get(idx)
  }

  /**
   * Returns the default null direction for each sort field whose null direction is not
   * specified.
   *
   * @param ascendings One flag per sort field, true when that field is sorted ascending.
   * @return One flag per sort field, true when nulls sort last under
   *         `FlinkPlannerImpl.defaultNullCollation`.
   */
  def getNullDefaultOrders(ascendings: Array[Boolean]): Array[Boolean] = {
    // `NullCollation.last` is called with the descending flag, hence the negation.
    ascendings.map(asc => FlinkPlannerImpl.defaultNullCollation.last(!asc))
  }

  /**
   * Converts field collations into parallel arrays describing the sort keys.
   *
   * @param fieldCollations The field collations, in sort-priority order.
   * @return A triple `(keys, ascending flags, nulls-last flags)`. It is deduplicated by key
   *         index via [[deduplicateSortKeys]], so only the first collation of each field is kept.
   * @throws IllegalArgumentException if a direction is not supported by [[directionToOrder]].
   * @throws TableException if a null direction is UNSPECIFIED.
   */
  def getKeysAndOrders(
      fieldCollations: Seq[RelFieldCollation]): (Array[Int], Array[Boolean], Array[Boolean]) = {
    val keys = fieldCollations.map(_.getFieldIndex).toArray
    // All directions are resolved before null directions are inspected, so an unsupported
    // direction is reported ahead of an UNSPECIFIED null direction.
    val orders = fieldCollations
      .map(c => directionToOrder(c.getDirection) == Order.ASCENDING)
      .toArray
    val nullsIsLast = fieldCollations.map { c =>
      c.nullDirection match {
        case RelFieldCollation.NullDirection.LAST => true
        case RelFieldCollation.NullDirection.FIRST => false
        case RelFieldCollation.NullDirection.UNSPECIFIED =>
          throw new TableException("Do not support UNSPECIFIED for null order.")
      }
    }.toArray

    deduplicateSortKeys(keys, orders, nullsIsLast)
  }

  /**
   * Removes repeated sort keys, keeping the first occurrence of each key together with its
   * order and null-ordering flags. Relative order of the kept entries is preserved.
   *
   * @param keys The sort key field indices.
   * @param orders The ascending flags; indexed in parallel with `keys`.
   * @param nullsIsLast The nulls-last flags; indexed in parallel with `keys`.
   * @return The deduplicated `(keys, orders, nullsIsLast)` arrays.
   */
  def deduplicateSortKeys(
      keys: Array[Int],
      orders: Array[Boolean],
      nullsIsLast: Array[Boolean]): (Array[Int], Array[Boolean], Array[Boolean]) = {
    val keySet = new mutable.HashSet[Int]
    val keyBuffer = new mutable.ArrayBuffer[Int]
    val orderBuffer = new mutable.ArrayBuffer[Boolean]
    val nullsIsLastBuffer = new mutable.ArrayBuffer[Boolean]
    for (i <- keys.indices) {
      // `add` returns false for a key that was already seen, so later duplicates are skipped.
      if (keySet.add(keys(i))) {
        keyBuffer += keys(i)
        orderBuffer += orders(i)
        nullsIsLastBuffer += nullsIsLast(i)
      }
    }
    (keyBuffer.toArray, orderBuffer.toArray, nullsIsLastBuffer.toArray)
  }

  /**
   * Maps a Calcite sort direction to a Flink [[Order]].
   *
   * @throws IllegalArgumentException for directions other than (strictly) ascending or
   *                                  (strictly) descending.
   */
  def directionToOrder(direction: Direction): Order = {
    direction match {
      case Direction.ASCENDING | Direction.STRICTLY_ASCENDING => Order.ASCENDING
      case Direction.DESCENDING | Direction.STRICTLY_DESCENDING => Order.DESCENDING
      case _ => throw new IllegalArgumentException("Unsupported direction.")
    }
  }

  /** Returns the string form of the offset expression (`"null"` when it is null). */
  def offsetToString(offset: RexNode): String = s"$offset"

  /**
   * Renders the sort fields as a comma-separated list of `"<field name> <order short name>"`.
   *
   * @param collationSort The sort collation.
   * @param rowRelDataType The row type used to resolve field indices to names.
   */
  def sortFieldsToString(
      collationSort: RelCollation,
      rowRelDataType: RelDataType): String = {
    val fieldNames = rowRelDataType.getFieldNames
    val fieldOrders = collationSort.getFieldCollations.asScala
      .map(c => (c.getFieldIndex, directionToOrder(c.getDirection)))

    fieldOrders
      .map { case (index, order) => s"${fieldNames.get(index)} ${order.getShortName}" }
      .mkString(", ")
  }

  /**
   * Renders the end index of the limit, i.e. offset + fetch as computed by
   * [[getFetchLimitEnd]]. Returns `"unlimited"` when there is no fetch.
   */
  def fetchToString(fetch: RexNode, offset: RexNode): String = {
    val limitEnd = getFetchLimitEnd(fetch, offset)
    if (limitEnd == Long.MaxValue) "unlimited" else s"$limitEnd"
  }

  /**
   * Returns the exclusive end index of the limit: fetch plus the start offset, or
   * `Long.MaxValue` when `fetch` is null.
   *
   * NOTE(review): assumes `fetch` (and `offset`) are integer expressions accepted by
   * `RexLiteral.intValue`.
   */
  def getFetchLimitEnd(fetch: RexNode, offset: RexNode): Long = {
    if (fetch != null) {
      RexLiteral.intValue(fetch) + getFetchLimitStart(offset)
    } else {
      Long.MaxValue
    }
  }

  /** Returns the start index of the limit: the offset value, or 0 when `offset` is null. */
  def getFetchLimitStart(offset: RexNode): Long = {
    if (offset != null) {
      RexLiteral.intValue(offset)
    } else {
      0L
    }
  }

  /**
   * Renders a sort as `Sort(by: (<fields>), offset: <offset>, fetch: <limit end>)`. The offset
   * and fetch parts are omitted when the corresponding expression is null.
   */
  def sortToString(
      rowRelDataType: RelDataType,
      sortCollation: RelCollation,
      sortOffset: RexNode,
      sortFetch: RexNode): String = {
    val offsetPart =
      if (sortOffset != null) s", offset: ${offsetToString(sortOffset)}" else ""
    val fetchPart =
      if (sortFetch != null) s", fetch: ${fetchToString(sortFetch, sortOffset)}" else ""
    s"Sort(by: (${sortFieldsToString(sortCollation, rowRelDataType)})$offsetPart$fetchPart)"
  }

  /**
   * Adds the sort's `orderBy` item, plus `offset` / `fetch` items when those expressions are
   * non-null, to the given plan writer.
   */
  def sortExplainTerms(
      pw: RelWriter,
      rowRelDataType: RelDataType,
      sortCollation: RelCollation,
      sortOffset: RexNode,
      sortFetch: RexNode): RelWriter = {
    pw.item("orderBy", sortFieldsToString(sortCollation, rowRelDataType))
      .itemIf("offset", offsetToString(sortOffset), sortOffset != null)
      .itemIf("fetch", fetchToString(sortFetch, sortOffset), sortFetch != null)
  }

  /**
   * Estimates the bytes needed to sort `input`. The estimate is the sum of two areas:
   *  - a record area: row count * (average binary row size + length prefix size);
   *  - an index area: row count * (maximum normalized key length + offset length).
   *
   * NOTE(review): `mq.getRowCount` returns a boxed Double; if it can be null for some inputs,
   * the arithmetic below will throw a NullPointerException. Confirm against callers.
   */
  def calcNeedMemoryForSort(mq: RelMetadataQuery, input: RelNode): Double = {
    //TODO It's hard to make sure that the normalized key's length is accurate in optimized stage.
    val normalizedKeyBytes = SortCodeGenerator.MAX_NORMALIZED_KEY_LEN
    val rowCount = mq.getRowCount(input)
    val averageRowSize = BatchPhysicalRel.binaryRowAverageSize(input)
    val recordAreaInBytes = rowCount * (averageRowSize + BinaryRowSerializer.LENGTH_SIZE_IN_BYTES)
    val indexAreaInBytes = rowCount * (normalizedKeyBytes + BinaryIndexedSortable.OFFSET_LEN)
    recordAreaInBytes + indexAreaInBytes
  }
}