blob: a468632f415949d5ed3055d7ce56e33f80bf2270 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import org.apache.avro.generic.IndexedRecord;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.types.Types;
import static org.apache.iceberg.types.Types.NestedField.required;
class V2Metadata {
private V2Metadata() {
  // static utility holder for v2 schemas and write adapters; never instantiated
}
// Manifest-list write schema for format v2. The field order here defines the Avro field
// ordinals returned by IndexedManifestFile.get(int), so the two must stay in sync.
// Content type, sequence numbers, snapshot id, and all file/row counts are written as
// required fields (asRequired) in v2.
static final Schema MANIFEST_LIST_SCHEMA = new Schema(
    ManifestFile.PATH,
    ManifestFile.LENGTH,
    ManifestFile.SPEC_ID,
    ManifestFile.MANIFEST_CONTENT.asRequired(),
    ManifestFile.SEQUENCE_NUMBER.asRequired(),
    ManifestFile.MIN_SEQUENCE_NUMBER.asRequired(),
    ManifestFile.SNAPSHOT_ID.asRequired(),
    ManifestFile.ADDED_FILES_COUNT.asRequired(),
    ManifestFile.EXISTING_FILES_COUNT.asRequired(),
    ManifestFile.DELETED_FILES_COUNT.asRequired(),
    ManifestFile.ADDED_ROWS_COUNT.asRequired(),
    ManifestFile.EXISTING_ROWS_COUNT.asRequired(),
    ManifestFile.DELETED_ROWS_COUNT.asRequired(),
    ManifestFile.PARTITION_SUMMARIES
);
/**
 * Adapts any {@link ManifestFile} implementation to Avro's {@link IndexedRecord} so it can be
 * written to a manifest list using the v2 write schema.
 *
 * <p>When the wrapped manifest's sequence numbers are still unassigned
 * ({@code ManifestWriter.UNASSIGNED_SEQ}), the sequence number of the current commit is written
 * instead, after validating that the manifest was produced by that commit.
 */
static class IndexedManifestFile implements ManifestFile, IndexedRecord {
  private static final org.apache.avro.Schema AVRO_SCHEMA =
      AvroSchemaUtil.convert(MANIFEST_LIST_SCHEMA, "manifest_file");

  private final long commitSnapshotId;
  private final long sequenceNumber;
  private ManifestFile manifest = null;

  IndexedManifestFile(long commitSnapshotId, long sequenceNumber) {
    this.commitSnapshotId = commitSnapshotId;
    this.sequenceNumber = sequenceNumber;
  }

  /** Sets the manifest exposed through this wrapper; returns this for call chaining. */
  public ManifestFile wrap(ManifestFile file) {
    this.manifest = file;
    return this;
  }

  @Override
  public org.apache.avro.Schema getSchema() {
    return AVRO_SCHEMA;
  }

  @Override
  public void put(int i, Object v) {
    // write-only adapter: Avro never reads back into this record
    throw new UnsupportedOperationException("Cannot read using IndexedManifestFile");
  }

  @Override
  public Object get(int pos) {
    // ordinals follow the field order of MANIFEST_LIST_SCHEMA
    switch (pos) {
      case 0:
        return manifest.path();
      case 1:
        return manifest.length();
      case 2:
        return manifest.partitionSpecId();
      case 3:
        return manifest.content().id();
      case 4:
        return writeSequenceNumber();
      case 5:
        return writeMinSequenceNumber();
      case 6:
        return manifest.snapshotId();
      case 7:
        return manifest.addedFilesCount();
      case 8:
        return manifest.existingFilesCount();
      case 9:
        return manifest.deletedFilesCount();
      case 10:
        return manifest.addedRowsCount();
      case 11:
        return manifest.existingRowsCount();
      case 12:
        return manifest.deletedRowsCount();
      case 13:
        return manifest.partitions();
      default:
        throw new UnsupportedOperationException("Unknown field ordinal: " + pos);
    }
  }

  // Returns the sequence number to write. An unassigned number means the manifest was created
  // by the current operation; validate that by checking the snapshot id against this commit,
  // then substitute the commit's sequence number.
  private long writeSequenceNumber() {
    if (manifest.sequenceNumber() != ManifestWriter.UNASSIGNED_SEQ) {
      return manifest.sequenceNumber();
    }

    Preconditions.checkState(commitSnapshotId == manifest.snapshotId(),
        "Found unassigned sequence number for a manifest from snapshot: %s", manifest.snapshotId());
    return sequenceNumber;
  }

  // Returns the min sequence number to write. If it was never determined, no file in the
  // wrapped manifest had an assigned sequence number, so the commit's sequence number is used;
  // the same snapshot-id sanity check as for the sequence number applies.
  private long writeMinSequenceNumber() {
    if (manifest.minSequenceNumber() != ManifestWriter.UNASSIGNED_SEQ) {
      return manifest.minSequenceNumber();
    }

    Preconditions.checkState(commitSnapshotId == manifest.snapshotId(),
        "Found unassigned sequence number for a manifest from snapshot: %s", manifest.snapshotId());
    return sequenceNumber;
  }

  @Override
  public String path() {
    return manifest.path();
  }

  @Override
  public long length() {
    return manifest.length();
  }

  @Override
  public int partitionSpecId() {
    return manifest.partitionSpecId();
  }

  @Override
  public ManifestContent content() {
    return manifest.content();
  }

  @Override
  public long sequenceNumber() {
    return manifest.sequenceNumber();
  }

  @Override
  public long minSequenceNumber() {
    return manifest.minSequenceNumber();
  }

  @Override
  public Long snapshotId() {
    return manifest.snapshotId();
  }

  @Override
  public boolean hasAddedFiles() {
    return manifest.hasAddedFiles();
  }

  @Override
  public Integer addedFilesCount() {
    return manifest.addedFilesCount();
  }

  @Override
  public Long addedRowsCount() {
    return manifest.addedRowsCount();
  }

  @Override
  public boolean hasExistingFiles() {
    return manifest.hasExistingFiles();
  }

  @Override
  public Integer existingFilesCount() {
    return manifest.existingFilesCount();
  }

  @Override
  public Long existingRowsCount() {
    return manifest.existingRowsCount();
  }

  @Override
  public boolean hasDeletedFiles() {
    return manifest.hasDeletedFiles();
  }

  @Override
  public Integer deletedFilesCount() {
    return manifest.deletedFilesCount();
  }

  @Override
  public Long deletedRowsCount() {
    return manifest.deletedRowsCount();
  }

  @Override
  public List<PartitionFieldSummary> partitions() {
    return manifest.partitions();
  }

  @Override
  public ManifestFile copy() {
    return manifest.copy();
  }
}
/**
 * Returns the v2 manifest-entry schema for a table with the given partition struct type.
 */
static Schema entrySchema(Types.StructType partitionType) {
  Types.StructType dataFileType = fileType(partitionType);
  return wrapFileSchema(dataFileType);
}
/**
 * Wraps a data_file struct type in the v2 manifest-entry schema; also used to build
 * projection schemas.
 */
static Schema wrapFileSchema(Types.StructType fileSchema) {
  Types.NestedField dataFileField = required(ManifestEntry.DATA_FILE_ID, "data_file", fileSchema);
  return new Schema(
      ManifestEntry.STATUS, ManifestEntry.SNAPSHOT_ID, ManifestEntry.SEQUENCE_NUMBER,
      dataFileField);
}
/**
 * Builds the v2 data_file struct type for the given partition struct type.
 *
 * <p>The field order here defines the Avro field ordinals returned by
 * {@link IndexedDataFile#get(int)}, so the two must stay in sync.
 */
static Types.StructType fileType(Types.StructType partitionType) {
  return Types.StructType.of(
      DataFile.CONTENT.asRequired(), // written as required in v2 (data vs. delete content id)
      DataFile.FILE_PATH,
      DataFile.FILE_FORMAT,
      required(DataFile.PARTITION_ID, DataFile.PARTITION_NAME, partitionType, DataFile.PARTITION_DOC),
      DataFile.RECORD_COUNT,
      DataFile.FILE_SIZE,
      DataFile.COLUMN_SIZES,
      DataFile.VALUE_COUNTS,
      DataFile.NULL_VALUE_COUNTS,
      DataFile.NAN_VALUE_COUNTS,
      DataFile.LOWER_BOUNDS,
      DataFile.UPPER_BOUNDS,
      DataFile.KEY_METADATA,
      DataFile.SPLIT_OFFSETS,
      DataFile.EQUALITY_IDS
  );
}
/**
 * Adapts a {@link ManifestEntry} to Avro's {@link IndexedRecord} so it can be written to a
 * v2 manifest file.
 *
 * <p>A null sequence number is written as null so that readers inherit the commit's sequence
 * number; before writing null, the entry is validated to belong to the current commit.
 */
static class IndexedManifestEntry<F extends ContentFile<F>> implements ManifestEntry<F>, IndexedRecord {
  private final org.apache.avro.Schema avroSchema;
  private final Long commitSnapshotId;
  private final IndexedDataFile<?> fileWrapper;
  private ManifestEntry<F> entry = null;

  IndexedManifestEntry(Long commitSnapshotId, Types.StructType partitionType) {
    this.avroSchema = AvroSchemaUtil.convert(entrySchema(partitionType), "manifest_entry");
    this.commitSnapshotId = commitSnapshotId;
    this.fileWrapper = new IndexedDataFile<>(partitionType);
  }

  /** Sets the entry exposed through this wrapper; returns this for call chaining. */
  public IndexedManifestEntry<F> wrap(ManifestEntry<F> toWrap) {
    this.entry = toWrap;
    return this;
  }

  @Override
  public org.apache.avro.Schema getSchema() {
    return avroSchema;
  }

  @Override
  public void put(int i, Object v) {
    // write-only adapter: Avro never reads back into this record
    throw new UnsupportedOperationException("Cannot read using IndexedManifestEntry");
  }

  @Override
  public Object get(int i) {
    switch (i) {
      case 0:
        return entry.status().id();
      case 1:
        return entry.snapshotId();
      case 2:
        return writeSequenceNumber();
      case 3:
        return fileWrapper.wrap(entry.file());
      default:
        throw new UnsupportedOperationException("Unknown field ordinal: " + i);
    }
  }

  // Returns the sequence number to write. A null sequence number will be inherited from the
  // current commit at read time; validate that by requiring the entry's snapshot id to be
  // either null (also inherited) or equal to the commit's snapshot id.
  private Long writeSequenceNumber() {
    if (entry.sequenceNumber() == null) {
      Preconditions.checkState(
          entry.snapshotId() == null || entry.snapshotId().equals(commitSnapshotId),
          "Found unassigned sequence number for an entry from snapshot: %s", entry.snapshotId());
      return null;
    }

    return entry.sequenceNumber();
  }

  @Override
  public Status status() {
    return entry.status();
  }

  @Override
  public Long snapshotId() {
    return entry.snapshotId();
  }

  @Override
  public void setSnapshotId(long snapshotId) {
    entry.setSnapshotId(snapshotId);
  }

  @Override
  public Long sequenceNumber() {
    return entry.sequenceNumber();
  }

  @Override
  public void setSequenceNumber(long sequenceNumber) {
    entry.setSequenceNumber(sequenceNumber);
  }

  @Override
  public F file() {
    return entry.file();
  }

  @Override
  public ManifestEntry<F> copy() {
    return entry.copy();
  }

  @Override
  public ManifestEntry<F> copyWithoutStats() {
    return entry.copyWithoutStats();
  }
}
/**
 * Adapts a {@link DataFile} or {@link DeleteFile} to Avro's {@link IndexedRecord} so it can be
 * written with the v2 data_file schema.
 */
static class IndexedDataFile<F> implements ContentFile<F>, IndexedRecord {
  private final org.apache.avro.Schema avroSchema;
  private final IndexedStructLike partitionWrapper;
  private ContentFile<F> dataFile = null;

  IndexedDataFile(Types.StructType partitionType) {
    this.avroSchema = AvroSchemaUtil.convert(fileType(partitionType), "data_file");
    this.partitionWrapper = new IndexedStructLike(avroSchema.getField("partition").schema());
  }

  /** Sets the file exposed through this wrapper; returns this for call chaining. */
  @SuppressWarnings("unchecked")
  IndexedDataFile<F> wrap(ContentFile<?> file) {
    this.dataFile = (ContentFile<F>) file;
    return this;
  }

  @Override
  public org.apache.avro.Schema getSchema() {
    return avroSchema;
  }

  @Override
  public Object get(int pos) {
    // ordinals follow the field order of fileType(partitionType)
    switch (pos) {
      case 0:
        return dataFile.content().id();
      case 1:
        return dataFile.path().toString();
      case 2:
        return dataFile.format() != null ? dataFile.format().toString() : null;
      case 3:
        return partitionWrapper.wrap(dataFile.partition());
      case 4:
        return dataFile.recordCount();
      case 5:
        return dataFile.fileSizeInBytes();
      case 6:
        return dataFile.columnSizes();
      case 7:
        return dataFile.valueCounts();
      case 8:
        return dataFile.nullValueCounts();
      case 9:
        return dataFile.nanValueCounts();
      case 10:
        return dataFile.lowerBounds();
      case 11:
        return dataFile.upperBounds();
      case 12:
        return dataFile.keyMetadata();
      case 13:
        return dataFile.splitOffsets();
      case 14:
        return dataFile.equalityFieldIds();
      default:
        throw new IllegalArgumentException("Unknown field ordinal: " + pos);
    }
  }

  @Override
  public void put(int i, Object v) {
    // write-only adapter: Avro never reads back into this record
    throw new UnsupportedOperationException("Cannot read into IndexedDataFile");
  }

  @Override
  public Long pos() {
    // this wrapper does not track a row position; always null
    return null;
  }

  @Override
  public int specId() {
    return dataFile.specId();
  }

  @Override
  public FileContent content() {
    return dataFile.content();
  }

  @Override
  public CharSequence path() {
    return dataFile.path();
  }

  @Override
  public FileFormat format() {
    return dataFile.format();
  }

  @Override
  public StructLike partition() {
    return dataFile.partition();
  }

  @Override
  public long recordCount() {
    return dataFile.recordCount();
  }

  @Override
  public long fileSizeInBytes() {
    return dataFile.fileSizeInBytes();
  }

  @Override
  public Map<Integer, Long> columnSizes() {
    return dataFile.columnSizes();
  }

  @Override
  public Map<Integer, Long> valueCounts() {
    return dataFile.valueCounts();
  }

  @Override
  public Map<Integer, Long> nullValueCounts() {
    return dataFile.nullValueCounts();
  }

  @Override
  public Map<Integer, Long> nanValueCounts() {
    return dataFile.nanValueCounts();
  }

  @Override
  public Map<Integer, ByteBuffer> lowerBounds() {
    return dataFile.lowerBounds();
  }

  @Override
  public Map<Integer, ByteBuffer> upperBounds() {
    return dataFile.upperBounds();
  }

  @Override
  public ByteBuffer keyMetadata() {
    return dataFile.keyMetadata();
  }

  @Override
  public List<Long> splitOffsets() {
    return dataFile.splitOffsets();
  }

  @Override
  public List<Integer> equalityFieldIds() {
    return dataFile.equalityFieldIds();
  }

  @Override
  public F copy() {
    throw new UnsupportedOperationException("Cannot copy IndexedDataFile wrapper");
  }

  @Override
  public F copyWithoutStats() {
    throw new UnsupportedOperationException("Cannot copy IndexedDataFile wrapper");
  }
}
}