/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.spark.actions;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.NullOrder;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortDirection;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewriteStrategy;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkDistributionAndOrderingUtil;
import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.iceberg.spark.SparkUtil;
import org.apache.iceberg.spark.SparkWriteOptions;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.NestedField;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.SortOrderUtil;
import org.apache.iceberg.util.ZOrderByteUtils;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.connector.distributions.Distribution;
import org.apache.spark.sql.connector.distributions.Distributions;
import org.apache.spark.sql.connector.expressions.SortOrder;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.types.StructField;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A Spark strategy that Z-orders data: each selected column is encoded into an order-preserving,
 * fixed-width byte representation, the per-column bytes are interleaved into a single binary
 * Z-value, and the files are rewritten sorted by that value.
*
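 * <p>A minimal usage sketch, assuming {@code table} and {@code spark} are in scope and that
 * {@code event_ts} and {@code device_id} are (hypothetical) columns of the table:
 *
 * <pre>{@code
 * RewriteStrategy strategy =
 *     new SparkZOrderStrategy(table, spark, ImmutableList.of("event_ts", "device_id"))
 *         .options(ImmutableMap.of("var-length-contribution", "8"));
 * }</pre>
 *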
* @deprecated since 1.3.0, will be removed in 1.4.0; use {@link SparkZOrderDataRewriter} instead.
*/
@Deprecated
public class SparkZOrderStrategy extends SparkSortStrategy {
private static final Logger LOG = LoggerFactory.getLogger(SparkZOrderStrategy.class);
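
  // The interleaved Z-value travels in a synthetic binary column appended to each row; the
  // rewrite sorts ascending on that column (nulls last) so rows are clustered along the
  // Z-order curve.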
private static final String Z_COLUMN = "ICEZVALUE";
private static final Schema Z_SCHEMA =
new Schema(NestedField.required(0, Z_COLUMN, Types.BinaryType.get()));
private static final org.apache.iceberg.SortOrder Z_SORT_ORDER =
org.apache.iceberg.SortOrder.builderFor(Z_SCHEMA)
.sortBy(Z_COLUMN, SortDirection.ASC, NullOrder.NULLS_LAST)
.build();

  /**
   * Controls the maximum number of bytes in the interleaved Z-value. Defaults to interleaving all
   * bytes of every input column.
   */
private static final String MAX_OUTPUT_SIZE_KEY = "max-output-size";
private static final int DEFAULT_MAX_OUTPUT_SIZE = Integer.MAX_VALUE;

  /**
   * Controls the number of bytes taken from an input column of a variable-length type (String,
   * Binary). Defaults to the same size as primitive types, {@link
   * ZOrderByteUtils#PRIMITIVE_BUFFER_SIZE}.
   */
private static final String VAR_LENGTH_CONTRIBUTION_KEY = "var-length-contribution";
private static final int DEFAULT_VAR_LENGTH_CONTRIBUTION = ZOrderByteUtils.PRIMITIVE_BUFFER_SIZE;

  private final List<String> zOrderColNames;
  private int maxOutputSize;
  private int varLengthContribution;

@Override
public Set<String> validOptions() {
return ImmutableSet.<String>builder()
.addAll(super.validOptions())
.add(VAR_LENGTH_CONTRIBUTION_KEY)
.add(MAX_OUTPUT_SIZE_KEY)
.build();
  }

@Override
public RewriteStrategy options(Map<String, String> options) {
super.options(options);
varLengthContribution =
PropertyUtil.propertyAsInt(
options, VAR_LENGTH_CONTRIBUTION_KEY, DEFAULT_VAR_LENGTH_CONTRIBUTION);
Preconditions.checkArgument(
varLengthContribution > 0,
"Cannot use less than 1 byte for variable length types with zOrder, %s was set to %s",
VAR_LENGTH_CONTRIBUTION_KEY,
varLengthContribution);
maxOutputSize =
PropertyUtil.propertyAsInt(options, MAX_OUTPUT_SIZE_KEY, DEFAULT_MAX_OUTPUT_SIZE);
Preconditions.checkArgument(
maxOutputSize > 0,
"Cannot have the interleaved ZOrder value use less than 1 byte, %s was set to %s",
MAX_OUTPUT_SIZE_KEY,
maxOutputSize);
return this;
  }

public SparkZOrderStrategy(Table table, SparkSession spark, List<String> zOrderColNames) {
super(table, spark);
Preconditions.checkArgument(
zOrderColNames != null && !zOrderColNames.isEmpty(),
"Cannot ZOrder when no columns are specified");
Stream<String> identityPartitionColumns =
table.spec().fields().stream()
.filter(f -> f.transform().isIdentity())
.map(PartitionField::name);
    // Work on a defensive copy: the caller's list may be immutable and must not be mutated.
    List<String> cols = Lists.newArrayList(zOrderColNames);
    List<String> partZOrderCols =
        identityPartitionColumns.filter(cols::contains).collect(Collectors.toList());
    if (!partZOrderCols.isEmpty()) {
      LOG.warn(
          "Cannot ZOrder on an identity partition column as these values are constant within a partition "
              + "and will be removed from the ZOrder expression: {}",
          partZOrderCols);
      cols.removeAll(partZOrderCols);
      Preconditions.checkArgument(
          !cols.isEmpty(),
          "Cannot perform ZOrdering; all provided columns are identity partition columns and cannot be used");
    }
    validateColumnsExistence(table, spark, cols);
    this.zOrderColNames = cols;
}
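
  // Resolves each requested column against the table schema, honoring Spark's case sensitivity
  // setting, and fails fast if any column is missing.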
private void validateColumnsExistence(Table table, SparkSession spark, List<String> colNames) {
boolean caseSensitive = SparkUtil.caseSensitive(spark);
Schema schema = table.schema();
colNames.forEach(
col -> {
NestedField nestedField =
caseSensitive ? schema.findField(col) : schema.caseInsensitiveFindField(col);
if (nestedField == null) {
throw new IllegalArgumentException(
String.format(
"Cannot find column '%s' in table schema: %s", col, schema.asStruct()));
}
});
  }

@Override
public String name() {
return "Z-ORDER";
  }

@Override
protected void validateOptions() {
    // Skip SparkSortStrategy's validation: it expects a user-defined sort order, while this
    // strategy generates its own synthetic sort order over the Z-value column.
  }

@Override
public Set<DataFile> rewriteFiles(List<FileScanTask> filesToRewrite) {
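    // UDFs that encode each column into fixed-width, order-preserving bytes and interleave them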
SparkZOrderUDF zOrderUDF =
new SparkZOrderUDF(zOrderColNames.size(), varLengthContribution, maxOutputSize);
String groupID = UUID.randomUUID().toString();
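    // If any input file was written under an older partition spec, include the current partition
    // columns in the sort so the rewritten files land in the current spec's partitions.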
boolean requiresRepartition = !filesToRewrite.get(0).spec().equals(table().spec());
SortOrder[] ordering;
if (requiresRepartition) {
ordering =
SparkDistributionAndOrderingUtil.convert(
SortOrderUtil.buildSortOrder(table(), sortOrder()));
} else {
ordering = SparkDistributionAndOrderingUtil.convert(sortOrder());
}
Distribution distribution = Distributions.ordered(ordering);
try {
tableCache().add(groupID, table());
manager().stageTasks(table(), groupID, filesToRewrite);
      // Spark session inherited from the parent SparkSortStrategy
SparkSession spark = spark();
      // Size the shuffle for this sort so each shuffle partition yields roughly one output file
long numOutputFiles =
numOutputFiles((long) (inputFileSize(filesToRewrite) * sizeEstimateMultiple()));
spark.conf().set(SQLConf.SHUFFLE_PARTITIONS().key(), Math.max(1, numOutputFiles));
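      // Read back exactly the file scan tasks staged for this rewrite group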
Dataset<Row> scanDF =
spark
.read()
.format("iceberg")
.option(SparkReadOptions.SCAN_TASK_SET_ID, groupID)
.load(groupID);
Column[] originalColumns =
Arrays.stream(scanDF.schema().names()).map(n -> functions.col(n)).toArray(Column[]::new);
List<StructField> zOrderColumns =
zOrderColNames.stream().map(scanDF.schema()::apply).collect(Collectors.toList());
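      // Encode each Z-order column into bytes whose lexicographic order matches the column's
      // natural order, then interleave them into the single binary Z-value column.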
Column zvalueArray =
functions.array(
zOrderColumns.stream()
.map(
colStruct ->
zOrderUDF.sortedLexicographically(
functions.col(colStruct.name()), colStruct.dataType()))
.toArray(Column[]::new));
Dataset<Row> zvalueDF = scanDF.withColumn(Z_COLUMN, zOrderUDF.interleaveBytes(zvalueArray));
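      // Manually plan the sort against the ordered distribution and rebuild a Dataset from it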
SQLConf sqlConf = spark.sessionState().conf();
LogicalPlan sortPlan = sortPlan(distribution, ordering, zvalueDF.logicalPlan(), sqlConf);
Dataset<Row> sortedDf = new Dataset<>(spark, sortPlan, zvalueDF.encoder());
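      // Drop the temporary Z-value column and write back; table-level distribution and ordering
      // is disabled because the rows are already sorted here.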
sortedDf
.select(originalColumns)
.write()
.format("iceberg")
.option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupID)
.option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES, writeMaxFileSize())
.option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING, "false")
.mode("append")
.save(groupID);
return rewriteCoordinator().fetchNewDataFiles(table(), groupID);
} finally {
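      // Release the staged tasks and cached state whether or not the rewrite succeeded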
tableCache().remove(groupID);
manager().removeTasks(table(), groupID);
rewriteCoordinator().clearRewrite(table(), groupID);
}
}
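
  // Always sort by the synthetic Z-value column, regardless of the table's own sort order.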
@Override
protected org.apache.iceberg.SortOrder sortOrder() {
return Z_SORT_ORDER;
}
}