blob: b03d68ed217e842967940fbe11c496f38a3f6fb0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;
import java.io.IOException;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
/**
* Writer for manifest files.
*
* @param <F> Java class of files written to the manifest, either {@link DataFile} or {@link DeleteFile}.
*/
public abstract class ManifestWriter<F extends ContentFile<F>> implements FileAppender<F> {
// stand-in for the current sequence number that will be assigned when the commit is successful
// this is replaced when writing a manifest list by the ManifestFile wrapper
static final long UNASSIGNED_SEQ = -1L;
private final OutputFile file;
private final int specId;
private final FileAppender<ManifestEntry<F>> writer;
private final Long snapshotId;
private final GenericManifestEntry<F> reused;
private final PartitionSummary stats;
private boolean closed = false;
private int addedFiles = 0;
private long addedRows = 0L;
private int existingFiles = 0;
private long existingRows = 0L;
private int deletedFiles = 0;
private long deletedRows = 0L;
private Long minSequenceNumber = null;
private ManifestWriter(PartitionSpec spec, OutputFile file, Long snapshotId) {
this.file = file;
this.specId = spec.specId();
this.writer = newAppender(spec, file);
this.snapshotId = snapshotId;
this.reused = new GenericManifestEntry<>(spec.partitionType());
this.stats = new PartitionSummary(spec);
}
protected abstract ManifestEntry<F> prepare(ManifestEntry<F> entry);
protected abstract FileAppender<ManifestEntry<F>> newAppender(PartitionSpec spec, OutputFile outputFile);
protected ManifestContent content() {
return ManifestContent.DATA;
}
void addEntry(ManifestEntry<F> entry) {
switch (entry.status()) {
case ADDED:
addedFiles += 1;
addedRows += entry.file().recordCount();
break;
case EXISTING:
existingFiles += 1;
existingRows += entry.file().recordCount();
break;
case DELETED:
deletedFiles += 1;
deletedRows += entry.file().recordCount();
break;
}
stats.update(entry.file().partition());
if (entry.sequenceNumber() != null && (minSequenceNumber == null || entry.sequenceNumber() < minSequenceNumber)) {
this.minSequenceNumber = entry.sequenceNumber();
}
writer.add(prepare(entry));
}
/**
* Add an added entry for a file.
* <p>
* The entry's snapshot ID will be this manifest's snapshot ID.
*
* @param addedFile a data file
*/
@Override
public void add(F addedFile) {
addEntry(reused.wrapAppend(snapshotId, addedFile));
}
void add(ManifestEntry<F> entry) {
addEntry(reused.wrapAppend(snapshotId, entry.file()));
}
/**
* Add an existing entry for a file.
*
* @param existingFile a file
* @param fileSnapshotId snapshot ID when the data file was added to the table
* @param sequenceNumber sequence number for the data file
*/
public void existing(F existingFile, long fileSnapshotId, long sequenceNumber) {
addEntry(reused.wrapExisting(fileSnapshotId, sequenceNumber, existingFile));
}
void existing(ManifestEntry<F> entry) {
addEntry(reused.wrapExisting(entry.snapshotId(), entry.sequenceNumber(), entry.file()));
}
/**
* Add a delete entry for a file.
* <p>
* The entry's snapshot ID will be this manifest's snapshot ID.
*
* @param deletedFile a file
*/
public void delete(F deletedFile) {
addEntry(reused.wrapDelete(snapshotId, deletedFile));
}
void delete(ManifestEntry<F> entry) {
// Use the current Snapshot ID for the delete. It is safe to delete the data file from disk
// when this Snapshot has been removed or when there are no Snapshots older than this one.
addEntry(reused.wrapDelete(snapshotId, entry.file()));
}
@Override
public Metrics metrics() {
return writer.metrics();
}
@Override
public long length() {
return writer.length();
}
public ManifestFile toManifestFile() {
Preconditions.checkState(closed, "Cannot build ManifestFile, writer is not closed");
// if the minSequenceNumber is null, then no manifests with a sequence number have been written, so the min
// sequence number is the one that will be assigned when this is committed. pass UNASSIGNED_SEQ to inherit it.
long minSeqNumber = minSequenceNumber != null ? minSequenceNumber : UNASSIGNED_SEQ;
return new GenericManifestFile(file.location(), writer.length(), specId, content(),
UNASSIGNED_SEQ, minSeqNumber, snapshotId,
addedFiles, addedRows, existingFiles, existingRows, deletedFiles, deletedRows, stats.summaries());
}
@Override
public void close() throws IOException {
this.closed = true;
writer.close();
}
static class V2Writer extends ManifestWriter<DataFile> {
private final V2Metadata.IndexedManifestEntry<DataFile> entryWrapper;
V2Writer(PartitionSpec spec, OutputFile file, Long snapshotId) {
super(spec, file, snapshotId);
this.entryWrapper = new V2Metadata.IndexedManifestEntry<>(snapshotId, spec.partitionType());
}
@Override
protected ManifestEntry<DataFile> prepare(ManifestEntry<DataFile> entry) {
return entryWrapper.wrap(entry);
}
@Override
protected FileAppender<ManifestEntry<DataFile>> newAppender(PartitionSpec spec, OutputFile file) {
Schema manifestSchema = V2Metadata.entrySchema(spec.partitionType());
try {
return Avro.write(file)
.schema(manifestSchema)
.named("manifest_entry")
.meta("schema", SchemaParser.toJson(spec.schema()))
.meta("partition-spec", PartitionSpecParser.toJsonFields(spec))
.meta("partition-spec-id", String.valueOf(spec.specId()))
.meta("format-version", "2")
.meta("content", "data")
.overwrite()
.build();
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to create manifest writer for path: %s", file);
}
}
}
static class V2DeleteWriter extends ManifestWriter<DeleteFile> {
private final V2Metadata.IndexedManifestEntry<DeleteFile> entryWrapper;
V2DeleteWriter(PartitionSpec spec, OutputFile file, Long snapshotId) {
super(spec, file, snapshotId);
this.entryWrapper = new V2Metadata.IndexedManifestEntry<>(snapshotId, spec.partitionType());
}
@Override
protected ManifestEntry<DeleteFile> prepare(ManifestEntry<DeleteFile> entry) {
return entryWrapper.wrap(entry);
}
@Override
protected FileAppender<ManifestEntry<DeleteFile>> newAppender(PartitionSpec spec, OutputFile file) {
Schema manifestSchema = V2Metadata.entrySchema(spec.partitionType());
try {
return Avro.write(file)
.schema(manifestSchema)
.named("manifest_entry")
.meta("schema", SchemaParser.toJson(spec.schema()))
.meta("partition-spec", PartitionSpecParser.toJsonFields(spec))
.meta("partition-spec-id", String.valueOf(spec.specId()))
.meta("format-version", "2")
.meta("content", "deletes")
.overwrite()
.build();
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to create manifest writer for path: %s", file);
}
}
@Override
protected ManifestContent content() {
return ManifestContent.DELETES;
}
}
static class V1Writer extends ManifestWriter<DataFile> {
private final V1Metadata.IndexedManifestEntry entryWrapper;
V1Writer(PartitionSpec spec, OutputFile file, Long snapshotId) {
super(spec, file, snapshotId);
this.entryWrapper = new V1Metadata.IndexedManifestEntry(spec.partitionType());
}
@Override
protected ManifestEntry<DataFile> prepare(ManifestEntry<DataFile> entry) {
return entryWrapper.wrap(entry);
}
@Override
protected FileAppender<ManifestEntry<DataFile>> newAppender(PartitionSpec spec, OutputFile file) {
Schema manifestSchema = V1Metadata.entrySchema(spec.partitionType());
try {
return Avro.write(file)
.schema(manifestSchema)
.named("manifest_entry")
.meta("schema", SchemaParser.toJson(spec.schema()))
.meta("partition-spec", PartitionSpecParser.toJsonFields(spec))
.meta("partition-spec-id", String.valueOf(spec.specId()))
.meta("format-version", "1")
.overwrite()
.build();
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to create manifest writer for path: %s", file);
}
}
}
}