| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.iceberg; |
| |
| import java.io.IOException; |
| import org.apache.iceberg.avro.Avro; |
| import org.apache.iceberg.exceptions.RuntimeIOException; |
| import org.apache.iceberg.io.FileAppender; |
| import org.apache.iceberg.io.OutputFile; |
| import org.apache.iceberg.relocated.com.google.common.base.Preconditions; |
| |
| /** |
| * Writer for manifest files. |
| * |
| * @param <F> Java class of files written to the manifest, either {@link DataFile} or {@link DeleteFile}. |
| */ |
| public abstract class ManifestWriter<F extends ContentFile<F>> implements FileAppender<F> { |
| // stand-in for the current sequence number that will be assigned when the commit is successful |
| // this is replaced when writing a manifest list by the ManifestFile wrapper |
| static final long UNASSIGNED_SEQ = -1L; |
| |
| private final OutputFile file; |
| private final int specId; |
| private final FileAppender<ManifestEntry<F>> writer; |
| private final Long snapshotId; |
| private final GenericManifestEntry<F> reused; |
| private final PartitionSummary stats; |
| |
| private boolean closed = false; |
| private int addedFiles = 0; |
| private long addedRows = 0L; |
| private int existingFiles = 0; |
| private long existingRows = 0L; |
| private int deletedFiles = 0; |
| private long deletedRows = 0L; |
| private Long minSequenceNumber = null; |
| |
| private ManifestWriter(PartitionSpec spec, OutputFile file, Long snapshotId) { |
| this.file = file; |
| this.specId = spec.specId(); |
| this.writer = newAppender(spec, file); |
| this.snapshotId = snapshotId; |
| this.reused = new GenericManifestEntry<>(spec.partitionType()); |
| this.stats = new PartitionSummary(spec); |
| } |
| |
| protected abstract ManifestEntry<F> prepare(ManifestEntry<F> entry); |
| |
| protected abstract FileAppender<ManifestEntry<F>> newAppender(PartitionSpec spec, OutputFile outputFile); |
| |
| protected ManifestContent content() { |
| return ManifestContent.DATA; |
| } |
| |
| void addEntry(ManifestEntry<F> entry) { |
| switch (entry.status()) { |
| case ADDED: |
| addedFiles += 1; |
| addedRows += entry.file().recordCount(); |
| break; |
| case EXISTING: |
| existingFiles += 1; |
| existingRows += entry.file().recordCount(); |
| break; |
| case DELETED: |
| deletedFiles += 1; |
| deletedRows += entry.file().recordCount(); |
| break; |
| } |
| stats.update(entry.file().partition()); |
| if (entry.sequenceNumber() != null && (minSequenceNumber == null || entry.sequenceNumber() < minSequenceNumber)) { |
| this.minSequenceNumber = entry.sequenceNumber(); |
| } |
| writer.add(prepare(entry)); |
| } |
| |
| /** |
| * Add an added entry for a file. |
| * <p> |
| * The entry's snapshot ID will be this manifest's snapshot ID. |
| * |
| * @param addedFile a data file |
| */ |
| @Override |
| public void add(F addedFile) { |
| addEntry(reused.wrapAppend(snapshotId, addedFile)); |
| } |
| |
| void add(ManifestEntry<F> entry) { |
| addEntry(reused.wrapAppend(snapshotId, entry.file())); |
| } |
| |
| /** |
| * Add an existing entry for a file. |
| * |
| * @param existingFile a file |
| * @param fileSnapshotId snapshot ID when the data file was added to the table |
| * @param sequenceNumber sequence number for the data file |
| */ |
| public void existing(F existingFile, long fileSnapshotId, long sequenceNumber) { |
| addEntry(reused.wrapExisting(fileSnapshotId, sequenceNumber, existingFile)); |
| } |
| |
| void existing(ManifestEntry<F> entry) { |
| addEntry(reused.wrapExisting(entry.snapshotId(), entry.sequenceNumber(), entry.file())); |
| } |
| |
| /** |
| * Add a delete entry for a file. |
| * <p> |
| * The entry's snapshot ID will be this manifest's snapshot ID. |
| * |
| * @param deletedFile a file |
| */ |
| public void delete(F deletedFile) { |
| addEntry(reused.wrapDelete(snapshotId, deletedFile)); |
| } |
| |
| void delete(ManifestEntry<F> entry) { |
| // Use the current Snapshot ID for the delete. It is safe to delete the data file from disk |
| // when this Snapshot has been removed or when there are no Snapshots older than this one. |
| addEntry(reused.wrapDelete(snapshotId, entry.file())); |
| } |
| |
| @Override |
| public Metrics metrics() { |
| return writer.metrics(); |
| } |
| |
| @Override |
| public long length() { |
| return writer.length(); |
| } |
| |
| public ManifestFile toManifestFile() { |
| Preconditions.checkState(closed, "Cannot build ManifestFile, writer is not closed"); |
| // if the minSequenceNumber is null, then no manifests with a sequence number have been written, so the min |
| // sequence number is the one that will be assigned when this is committed. pass UNASSIGNED_SEQ to inherit it. |
| long minSeqNumber = minSequenceNumber != null ? minSequenceNumber : UNASSIGNED_SEQ; |
| return new GenericManifestFile(file.location(), writer.length(), specId, content(), |
| UNASSIGNED_SEQ, minSeqNumber, snapshotId, |
| addedFiles, addedRows, existingFiles, existingRows, deletedFiles, deletedRows, stats.summaries()); |
| } |
| |
| @Override |
| public void close() throws IOException { |
| this.closed = true; |
| writer.close(); |
| } |
| |
| static class V2Writer extends ManifestWriter<DataFile> { |
| private final V2Metadata.IndexedManifestEntry<DataFile> entryWrapper; |
| |
| V2Writer(PartitionSpec spec, OutputFile file, Long snapshotId) { |
| super(spec, file, snapshotId); |
| this.entryWrapper = new V2Metadata.IndexedManifestEntry<>(snapshotId, spec.partitionType()); |
| } |
| |
| @Override |
| protected ManifestEntry<DataFile> prepare(ManifestEntry<DataFile> entry) { |
| return entryWrapper.wrap(entry); |
| } |
| |
| @Override |
| protected FileAppender<ManifestEntry<DataFile>> newAppender(PartitionSpec spec, OutputFile file) { |
| Schema manifestSchema = V2Metadata.entrySchema(spec.partitionType()); |
| try { |
| return Avro.write(file) |
| .schema(manifestSchema) |
| .named("manifest_entry") |
| .meta("schema", SchemaParser.toJson(spec.schema())) |
| .meta("partition-spec", PartitionSpecParser.toJsonFields(spec)) |
| .meta("partition-spec-id", String.valueOf(spec.specId())) |
| .meta("format-version", "2") |
| .meta("content", "data") |
| .overwrite() |
| .build(); |
| } catch (IOException e) { |
| throw new RuntimeIOException(e, "Failed to create manifest writer for path: %s", file); |
| } |
| } |
| } |
| |
| static class V2DeleteWriter extends ManifestWriter<DeleteFile> { |
| private final V2Metadata.IndexedManifestEntry<DeleteFile> entryWrapper; |
| |
| V2DeleteWriter(PartitionSpec spec, OutputFile file, Long snapshotId) { |
| super(spec, file, snapshotId); |
| this.entryWrapper = new V2Metadata.IndexedManifestEntry<>(snapshotId, spec.partitionType()); |
| } |
| |
| @Override |
| protected ManifestEntry<DeleteFile> prepare(ManifestEntry<DeleteFile> entry) { |
| return entryWrapper.wrap(entry); |
| } |
| |
| @Override |
| protected FileAppender<ManifestEntry<DeleteFile>> newAppender(PartitionSpec spec, OutputFile file) { |
| Schema manifestSchema = V2Metadata.entrySchema(spec.partitionType()); |
| try { |
| return Avro.write(file) |
| .schema(manifestSchema) |
| .named("manifest_entry") |
| .meta("schema", SchemaParser.toJson(spec.schema())) |
| .meta("partition-spec", PartitionSpecParser.toJsonFields(spec)) |
| .meta("partition-spec-id", String.valueOf(spec.specId())) |
| .meta("format-version", "2") |
| .meta("content", "deletes") |
| .overwrite() |
| .build(); |
| } catch (IOException e) { |
| throw new RuntimeIOException(e, "Failed to create manifest writer for path: %s", file); |
| } |
| } |
| |
| @Override |
| protected ManifestContent content() { |
| return ManifestContent.DELETES; |
| } |
| } |
| |
| static class V1Writer extends ManifestWriter<DataFile> { |
| private final V1Metadata.IndexedManifestEntry entryWrapper; |
| |
| V1Writer(PartitionSpec spec, OutputFile file, Long snapshotId) { |
| super(spec, file, snapshotId); |
| this.entryWrapper = new V1Metadata.IndexedManifestEntry(spec.partitionType()); |
| } |
| |
| @Override |
| protected ManifestEntry<DataFile> prepare(ManifestEntry<DataFile> entry) { |
| return entryWrapper.wrap(entry); |
| } |
| |
| @Override |
| protected FileAppender<ManifestEntry<DataFile>> newAppender(PartitionSpec spec, OutputFile file) { |
| Schema manifestSchema = V1Metadata.entrySchema(spec.partitionType()); |
| try { |
| return Avro.write(file) |
| .schema(manifestSchema) |
| .named("manifest_entry") |
| .meta("schema", SchemaParser.toJson(spec.schema())) |
| .meta("partition-spec", PartitionSpecParser.toJsonFields(spec)) |
| .meta("partition-spec-id", String.valueOf(spec.specId())) |
| .meta("format-version", "1") |
| .overwrite() |
| .build(); |
| } catch (IOException e) { |
| throw new RuntimeIOException(e, "Failed to create manifest writer for path: %s", file); |
| } |
| } |
| } |
| } |