blob: 37e0c1fffab07fb41818845ced20a6f7f3090932 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.util;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortField;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.transforms.SortOrderVisitor;
public class SortOrderUtil {
private SortOrderUtil() {}
public static SortOrder buildSortOrder(Table table) {
return buildSortOrder(table.schema(), table.spec(), table.sortOrder());
}
// builds a sort order using both the table partition spec and the user supplied sort order
public static SortOrder buildSortOrder(Table table, SortOrder sortOrder) {
return buildSortOrder(table.schema(), table.spec(), sortOrder);
}
/**
* Build a final sort order that satisfies the clustering required by the partition spec.
*
* <p>The incoming sort order may or may not satisfy the clustering needed by the partition spec.
* This modifies the sort order so that it clusters by partition and still produces the same order
* within each partition.
*
* @param schema a schema
* @param spec a partition spec
* @param sortOrder a sort order
* @return the sort order with additional sort fields to satisfy the clustering required by the
* spec
*/
public static SortOrder buildSortOrder(Schema schema, PartitionSpec spec, SortOrder sortOrder) {
if (sortOrder.isUnsorted() && spec.isUnpartitioned()) {
return SortOrder.unsorted();
}
// make a map of the partition fields that need to be included in the clustering produced by the
// sort order
Map<Pair<String, Integer>, PartitionField> requiredClusteringFields =
requiredClusteringFields(spec);
// remove any partition fields that are clustered by the sort order by iterating over a prefix
// in the sort order.
// this will stop when a non-partition field is found, or when the sort field only satisfies the
// partition field.
for (SortField sortField : sortOrder.fields()) {
Pair<String, Integer> sourceAndTransform =
Pair.of(sortField.transform().toString(), sortField.sourceId());
if (requiredClusteringFields.containsKey(sourceAndTransform)) {
requiredClusteringFields.remove(sourceAndTransform);
continue; // keep processing the prefix
}
// if the field satisfies the order of any partition fields, also remove them before stopping
// use a set to avoid concurrent modification
for (PartitionField field : spec.fields()) {
if (sortField.sourceId() == field.sourceId()
&& sortField.transform().satisfiesOrderOf(field.transform())) {
requiredClusteringFields.remove(Pair.of(field.transform().toString(), field.sourceId()));
}
}
break;
}
// build a sort prefix of partition fields that are not already in the sort order's prefix
SortOrder.Builder builder = SortOrder.builderFor(schema);
for (PartitionField field : requiredClusteringFields.values()) {
String sourceName = schema.findColumnName(field.sourceId());
builder.asc(Expressions.transform(sourceName, field.transform()));
}
// add the configured sort to the partition spec prefix sort
SortOrderVisitor.visit(sortOrder, new CopySortOrderFields(builder));
return builder.build();
}
private static Map<Pair<String, Integer>, PartitionField> requiredClusteringFields(
PartitionSpec spec) {
Map<Pair<String, Integer>, PartitionField> requiredClusteringFields = Maps.newLinkedHashMap();
for (PartitionField partField : spec.fields()) {
if (!partField.transform().toString().equals("void")) {
requiredClusteringFields.put(
Pair.of(partField.transform().toString(), partField.sourceId()), partField);
}
}
// remove any partition fields that are satisfied by another partition field, like days(ts) and
// hours(ts)
for (PartitionField partField : spec.fields()) {
for (PartitionField field : spec.fields()) {
if (!partField.equals(field)
&& partField.sourceId() == field.sourceId()
&& partField.transform().satisfiesOrderOf(field.transform())) {
requiredClusteringFields.remove(Pair.of(field.transform().toString(), field.sourceId()));
}
}
}
return requiredClusteringFields;
}
public static Set<String> orderPreservingSortedColumns(SortOrder sortOrder) {
if (sortOrder == null) {
return Collections.emptySet();
} else {
return sortOrder.fields().stream()
.filter(f -> f.transform().preservesOrder())
.map(SortField::sourceId)
.map(sid -> sortOrder.schema().findColumnName(sid))
.filter(Objects::nonNull)
.collect(Collectors.toSet());
}
}
}