blob: 697ecc43bbefb19192f3a6605a0a1540100a0108 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.BoundReference;
import org.apache.iceberg.expressions.BoundTerm;
import org.apache.iceberg.expressions.BoundTransform;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.Term;
import org.apache.iceberg.expressions.UnboundTerm;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.transforms.PartitionSpecVisitor;
import org.apache.iceberg.transforms.Transform;
import org.apache.iceberg.transforms.Transforms;
import org.apache.iceberg.transforms.UnknownTransform;
import org.apache.iceberg.util.Pair;
/**
 * Default {@link UpdatePartitionSpec} implementation.
 * <p>
 * Partition field additions, removals and renames are accumulated against the table's current default
 * spec ({@code base.spec()}). They are only turned into a new {@link PartitionSpec} by {@link #apply()},
 * and {@link #commit()} installs that spec through the table's {@link TableOperations}.
 * <p>
 * Instances hold mutable, unsynchronized state and are meant to be used as a single-use builder.
 */
class BaseUpdatePartitionSpec implements UpdatePartitionSpec {
  private final TableOperations ops;
  private final TableMetadata base;
  private final int formatVersion;
  // the spec being updated and the schema its source columns are resolved against
  private final PartitionSpec spec;
  private final Schema schema;
  // fields of the spec being updated, indexed by partition field name
  private final Map<String, PartitionField> nameToField;
  // fields of the spec being updated, indexed by (source field ID, transform string)
  private final Map<Pair<Integer, String>, PartitionField> transformToField;

  // pending changes; nothing is applied to a spec until apply() is called
  private final List<PartitionField> adds = Lists.newArrayList();
  // newly added time-based (year/month/day/hour) fields, indexed by source field ID
  private final Map<Integer, PartitionField> addedTimeFields = Maps.newHashMap();
  private final Map<Pair<Integer, String>, PartitionField> transformToAddedField = Maps.newHashMap();
  private final Map<String, PartitionField> nameToAddedField = Maps.newHashMap();
  // partition field IDs of existing fields that should be removed
  private final Set<Integer> deletes = Sets.newHashSet();
  // existing field name -> new field name
  private final Map<String, String> renames = Maps.newHashMap();

  private boolean caseSensitive;
  private int lastAssignedPartitionId;

  /**
   * Creates an update for the current default partition spec of the table behind {@code ops}.
   *
   * @param ops table operations used to read the current metadata and to commit the result
   * @throws IllegalArgumentException if the current spec contains a field with an {@link UnknownTransform}
   */
  BaseUpdatePartitionSpec(TableOperations ops) {
    this.ops = ops;
    this.caseSensitive = true;
    this.base = ops.current();
    this.formatVersion = base.formatVersion();
    this.spec = base.spec();
    this.schema = spec.schema();
    this.nameToField = indexSpecByName(spec);
    this.transformToField = indexSpecByTransform(spec);
    // continue numbering after the highest ID used by ANY spec in the table's history (not only the
    // current one), so new partition fields never reuse an ID an older spec already assigned
    this.lastAssignedPartitionId =
        base.specs().stream().mapToInt(PartitionSpec::lastAssignedFieldId).max().orElse(999);

    spec.fields().stream()
        .filter(field -> field.transform() instanceof UnknownTransform)
        .findAny()
        .ifPresent(field -> {
          throw new IllegalArgumentException("Cannot update partition spec with unknown transform: " + field);
        });
  }

  /**
   * For testing only.
   * <p>
   * New field IDs continue after the highest field ID in {@code spec}, or after 999 if it has no fields.
   * This instance has no table operations, so {@link #commit()} cannot be used; call {@link #apply()}.
   *
   * @param formatVersion table format version to emulate
   * @param spec the spec to update
   */
  @VisibleForTesting
  BaseUpdatePartitionSpec(int formatVersion, PartitionSpec spec) {
    this(formatVersion, spec, spec.fields().stream().mapToInt(PartitionField::fieldId).max().orElse(999));
  }

  /**
   * For testing only.
   * <p>
   * This instance has no table operations, so {@link #commit()} cannot be used; call {@link #apply()}.
   *
   * @param formatVersion table format version to emulate
   * @param spec the spec to update
   * @param lastAssignedPartitionId highest partition field ID already in use; new fields are numbered after it
   */
  @VisibleForTesting
  BaseUpdatePartitionSpec(int formatVersion, PartitionSpec spec, int lastAssignedPartitionId) {
    this.ops = null;
    this.base = null;
    this.formatVersion = formatVersion;
    this.caseSensitive = true;
    this.spec = spec;
    this.schema = spec.schema();
    this.nameToField = indexSpecByName(spec);
    this.transformToField = indexSpecByTransform(spec);
    this.lastAssignedPartitionId = lastAssignedPartitionId;
  }

  /** Reserves and returns the next unused partition field ID. */
  private int assignFieldId() {
    this.lastAssignedPartitionId += 1;
    return lastAssignedPartitionId;
  }

  /**
   * Sets whether column names in terms passed to later calls are resolved case-sensitively (default: true).
   */
  @Override
  public UpdatePartitionSpec caseSensitive(boolean isCaseSensitive) {
    this.caseSensitive = isCaseSensitive;
    return this;
  }

  /** Adds an identity partition field on the named source column, using a generated partition name. */
  @Override
  public BaseUpdatePartitionSpec addField(String sourceName) {
    return addField(Expressions.ref(sourceName));
  }

  /** Adds a partition field for {@code term}, using a generated partition name. */
  @Override
  public BaseUpdatePartitionSpec addField(Term term) {
    return addField(null, term);
  }

  /**
   * Adds a partition field for {@code term}.
   *
   * @param name partition field name, or {@code null} to generate one in {@link #apply()}
   * @param term an unbound reference (identity) or unbound transform on a schema column
   * @return this for method chaining
   * @throws IllegalArgumentException if the name or the (source, transform) pair is already used by an existing
   *     or newly added field, or if a second time-based transform is added for the same source column
   */
  @Override
  public BaseUpdatePartitionSpec addField(String name, Term term) {
    PartitionField alreadyAdded = nameToAddedField.get(name);
    Preconditions.checkArgument(alreadyAdded == null, "Cannot add duplicate partition field: %s", alreadyAdded);

    Pair<Integer, Transform<?, ?>> sourceTransform = resolve(term);
    Pair<Integer, String> validationKey = Pair.of(sourceTransform.first(), sourceTransform.second().toString());

    PartitionField existing = transformToField.get(validationKey);
    Preconditions.checkArgument(existing == null,
        "Cannot add duplicate partition field %s=%s, conflicts with %s", name, term, existing);

    PartitionField added = transformToAddedField.get(validationKey);
    Preconditions.checkArgument(added == null,
        "Cannot add duplicate partition field %s=%s, already added: %s", name, term, added);

    PartitionField newField = new PartitionField(
        sourceTransform.first(), assignFieldId(), name, sourceTransform.second());
    checkForRedundantAddedPartitions(newField);

    transformToAddedField.put(validationKey, newField);
    if (name != null) {
      nameToAddedField.put(name, newField);
    }
    adds.add(newField);

    return this;
  }

  /**
   * Removes the existing partition field with the given name.
   *
   * @throws IllegalArgumentException if the field was added or renamed in this update, or does not exist
   */
  @Override
  public BaseUpdatePartitionSpec removeField(String name) {
    PartitionField alreadyAdded = nameToAddedField.get(name);
    Preconditions.checkArgument(alreadyAdded == null, "Cannot delete newly added field: %s", alreadyAdded);

    Preconditions.checkArgument(renames.get(name) == null,
        "Cannot rename and delete partition field: %s", name);

    PartitionField field = nameToField.get(name);
    Preconditions.checkArgument(field != null,
        "Cannot find partition field to remove: %s", name);

    deletes.add(field.fieldId());

    return this;
  }

  /**
   * Removes the existing partition field whose source column and transform match {@code term}.
   *
   * @throws IllegalArgumentException if the matching field was added or renamed in this update, or no
   *     existing field matches
   */
  @Override
  public BaseUpdatePartitionSpec removeField(Term term) {
    Pair<Integer, Transform<?, ?>> sourceTransform = resolve(term);
    Pair<Integer, String> key = Pair.of(sourceTransform.first(), sourceTransform.second().toString());

    PartitionField added = transformToAddedField.get(key);
    Preconditions.checkArgument(added == null, "Cannot delete newly added field: %s", added);

    PartitionField field = transformToField.get(key);
    Preconditions.checkArgument(field != null,
        "Cannot find partition field to remove: %s", term);
    Preconditions.checkArgument(renames.get(field.name()) == null,
        "Cannot rename and delete partition field: %s", field.name());

    deletes.add(field.fieldId());

    return this;
  }

  /**
   * Renames an existing partition field. A second rename of the same field replaces the first.
   *
   * @throws IllegalArgumentException if the field was added or removed in this update, or does not exist
   */
  @Override
  public BaseUpdatePartitionSpec renameField(String name, String newName) {
    PartitionField added = nameToAddedField.get(name);
    Preconditions.checkArgument(added == null,
        "Cannot rename newly added partition field: %s", name);

    PartitionField field = nameToField.get(name);
    Preconditions.checkArgument(field != null,
        "Cannot find partition field to rename: %s", name);
    Preconditions.checkArgument(!deletes.contains(field.fieldId()),
        "Cannot delete and rename partition field: %s", name);

    renames.put(name, newName);

    return this;
  }

  /**
   * Builds the updated spec without committing it.
   * <p>
   * The result contains the existing fields in their original order, with renames applied and removals
   * handled per format version, followed by the newly added fields in the order they were added. Added
   * fields without an explicit name get one from {@link PartitionNameGenerator}.
   *
   * @return the new partition spec
   */
  @Override
  public PartitionSpec apply() {
    PartitionSpec.Builder builder = PartitionSpec.builderFor(schema);

    for (PartitionField field : spec.fields()) {
      if (!deletes.contains(field.fieldId())) {
        String newName = renames.get(field.name());
        if (newName != null) {
          builder.add(field.sourceId(), field.fieldId(), newName, field.transform());
        } else {
          builder.add(field.sourceId(), field.fieldId(), field.name(), field.transform());
        }
      } else if (formatVersion < 2) {
        // field IDs were not required for v1 and were assigned sequentially in each partition spec starting at 1,000.
        // to maintain consistent field ids across partition specs in v1 tables, any partition field that is removed
        // must be replaced with a null transform. null values are always allowed in partition data.
        // NOTE(review): the replacement keeps the old field name, so that name presumably cannot be reused by a
        // newly added field in the same spec -- confirm against PartitionSpec.Builder's name checks.
        builder.add(field.sourceId(), field.fieldId(), field.name(), Transforms.alwaysNull());
      }
    }

    for (PartitionField newField : adds) {
      String partitionName;
      if (newField.name() != null) {
        partitionName = newField.name();
      } else {
        partitionName = PartitionSpecVisitor.visit(schema, newField, PartitionNameGenerator.INSTANCE);
      }

      builder.add(newField.sourceId(), newField.fieldId(), partitionName, newField.transform());
    }

    return builder.build();
  }

  /**
   * Applies the pending changes and commits the resulting spec through the table operations.
   * <p>
   * Must not be called on instances created by the testing constructors, which have no table operations.
   */
  @Override
  public void commit() {
    TableMetadata update = base.updatePartitionSpec(apply());
    ops.commit(base, update);
  }

  /**
   * Binds an unbound term against the schema and returns its source field ID and transform.
   *
   * @throws IllegalArgumentException if {@code term} is not an {@link UnboundTerm}
   */
  private Pair<Integer, Transform<?, ?>> resolve(Term term) {
    Preconditions.checkArgument(term instanceof UnboundTerm, "Term must be unbound");

    BoundTerm<?> boundTerm = ((UnboundTerm<?>) term).bind(schema.asStruct(), caseSensitive);
    int sourceId = boundTerm.ref().fieldId();
    Transform<?, ?> transform = toTransform(boundTerm);

    return Pair.of(sourceId, transform);
  }

  /** Maps a bound reference to the identity transform and a bound transform to its own transform. */
  private Transform<?, ?> toTransform(BoundTerm<?> term) {
    if (term instanceof BoundReference) {
      return Transforms.identity(term.type());
    } else if (term instanceof BoundTransform) {
      return ((BoundTransform<?, ?>) term).transform();
    } else {
      throw new ValidationException("Invalid term: %s, expected either a bound reference or transform", term);
    }
  }

  /**
   * Rejects a second time-based (year/month/day/hour) field on the same source column among the fields added
   * in this update, and records {@code field} if it is time-based.
   * <p>
   * Only newly added fields are compared; fields already in the spec are not checked here.
   */
  private void checkForRedundantAddedPartitions(PartitionField field) {
    if (isTimeTransform(field)) {
      PartitionField timeField = addedTimeFields.get(field.sourceId());
      Preconditions.checkArgument(timeField == null,
          "Cannot add redundant partition field: %s conflicts with %s", timeField, field);
      addedTimeFields.put(field.sourceId(), field);
    }
  }

  /** Indexes the fields of {@code spec} by partition field name. */
  private static Map<String, PartitionField> indexSpecByName(PartitionSpec spec) {
    ImmutableMap.Builder<String, PartitionField> builder = ImmutableMap.builder();
    List<PartitionField> fields = spec.fields();
    for (PartitionField field : fields) {
      builder.put(field.name(), field);
    }

    return builder.build();
  }

  /**
   * Indexes the fields of {@code spec} by (source field ID, transform string).
   * <p>
   * The key is NOT guaranteed to be unique. In v1 tables, {@link #apply()} replaces every removed field with an
   * {@code alwaysNull} transform on the same source column. After two fields on one column have been removed,
   * the spec therefore holds two identical {@code (sourceId, "void")} keys. ImmutableMap.Builder would reject
   * those duplicates and make the spec impossible to update, so a HashMap is used instead; for a duplicate key
   * the field appearing last in the spec wins.
   */
  private static Map<Pair<Integer, String>, PartitionField> indexSpecByTransform(PartitionSpec spec) {
    Map<Pair<Integer, String>, PartitionField> indexed = Maps.newHashMap();
    List<PartitionField> fields = spec.fields();
    for (PartitionField field : fields) {
      indexed.put(Pair.of(field.sourceId(), field.transform().toString()), field);
    }

    return ImmutableMap.copyOf(indexed);
  }

  /** Returns true if {@code field} uses a year, month, day or hour transform. */
  private boolean isTimeTransform(PartitionField field) {
    return PartitionSpecVisitor.visit(schema, field, IsTimeTransform.INSTANCE);
  }

  /** Visitor that answers whether a partition field uses a time-based (year/month/day/hour) transform. */
  private static class IsTimeTransform implements PartitionSpecVisitor<Boolean> {
    private static final IsTimeTransform INSTANCE = new IsTimeTransform();

    private IsTimeTransform() {
    }

    @Override
    public Boolean identity(int fieldId, String sourceName, int sourceId) {
      return false;
    }

    @Override
    public Boolean bucket(int fieldId, String sourceName, int sourceId, int numBuckets) {
      return false;
    }

    @Override
    public Boolean truncate(int fieldId, String sourceName, int sourceId, int width) {
      return false;
    }

    @Override
    public Boolean year(int fieldId, String sourceName, int sourceId) {
      return true;
    }

    @Override
    public Boolean month(int fieldId, String sourceName, int sourceId) {
      return true;
    }

    @Override
    public Boolean day(int fieldId, String sourceName, int sourceId) {
      return true;
    }

    @Override
    public Boolean hour(int fieldId, String sourceName, int sourceId) {
      return true;
    }

    @Override
    public Boolean alwaysNull(int fieldId, String sourceName, int sourceId) {
      return false;
    }

    @Override
    public Boolean unknown(int fieldId, String sourceName, int sourceId, String transform) {
      return false;
    }
  }

  /**
   * Visitor that derives a default partition field name from the source column name and transform,
   * e.g. {@code id_bucket_16}, {@code name_trunc_4}, {@code ts_day}; identity fields reuse the source name.
   * <p>
   * {@code unknown} is not overridden here and falls back to the interface's behavior.
   */
  private static class PartitionNameGenerator implements PartitionSpecVisitor<String> {
    private static final PartitionNameGenerator INSTANCE = new PartitionNameGenerator();

    private PartitionNameGenerator() {
    }

    @Override
    public String identity(int fieldId, String sourceName, int sourceId) {
      return sourceName;
    }

    @Override
    public String bucket(int fieldId, String sourceName, int sourceId, int numBuckets) {
      return sourceName + "_bucket_" + numBuckets;
    }

    @Override
    public String truncate(int fieldId, String sourceName, int sourceId, int width) {
      return sourceName + "_trunc_" + width;
    }

    @Override
    public String year(int fieldId, String sourceName, int sourceId) {
      return sourceName + "_year";
    }

    @Override
    public String month(int fieldId, String sourceName, int sourceId) {
      return sourceName + "_month";
    }

    @Override
    public String day(int fieldId, String sourceName, int sourceId) {
      return sourceName + "_day";
    }

    @Override
    public String hour(int fieldId, String sourceName, int sourceId) {
      return sourceName + "_hour";
    }

    @Override
    public String alwaysNull(int fieldId, String sourceName, int sourceId) {
      return sourceName + "_null";
    }
  }
}