/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.iceberg;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonNode;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import org.apache.iceberg.TableMetadata.MetadataLogEntry;
import org.apache.iceberg.TableMetadata.SnapshotLogEntry;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.JsonUtil;
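
/**
 * Reads and writes {@link TableMetadata} as JSON.
 *
 * <p>A minimal usage sketch (the paths and FileIO wiring below are illustrative, not part of
 * this class):
 * <pre>{@code
 *   TableMetadata metadata = TableMetadataParser.read(io, "s3://bucket/table/metadata/v1.metadata.json");
 *   TableMetadataParser.write(metadata, io.newOutputFile("s3://bucket/table/metadata/v2.metadata.json"));
 * }</pre>
 */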
public class TableMetadataParser {

  public enum Codec {
    NONE(""),
    GZIP(".gz");

    private final String extension;

    Codec(String extension) {
      this.extension = extension;
    }

    public static Codec fromName(String codecName) {
      Preconditions.checkArgument(codecName != null, "Codec name is null");
      return Codec.valueOf(codecName.toUpperCase(Locale.ENGLISH));
    }
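
    /**
     * Infers the codec from a metadata file name; both {@code *.gz.metadata.json} and the
     * legacy {@code *.metadata.json.gz} layout resolve to GZIP.
     */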
    public static Codec fromFileName(String fileName) {
      Preconditions.checkArgument(fileName.contains(".metadata.json"),
          "%s is not a valid metadata file", fileName);
      // we have to be backward-compatible with .metadata.json.gz files
      if (fileName.endsWith(".metadata.json.gz")) {
        return Codec.GZIP;
      }

      String fileNameWithoutSuffix = fileName.substring(0, fileName.lastIndexOf(".metadata.json"));
      if (fileNameWithoutSuffix.endsWith(Codec.GZIP.extension)) {
        return Codec.GZIP;
      } else {
        return Codec.NONE;
      }
    }
  }

  private TableMetadataParser() {
  }

  // visible for testing
  static final String FORMAT_VERSION = "format-version";
  static final String TABLE_UUID = "table-uuid";
  static final String LOCATION = "location";
  static final String LAST_SEQUENCE_NUMBER = "last-sequence-number";
  static final String LAST_UPDATED_MILLIS = "last-updated-ms";
  static final String LAST_COLUMN_ID = "last-column-id";
  static final String SCHEMA = "schema";
  static final String PARTITION_SPEC = "partition-spec";
  static final String PARTITION_SPECS = "partition-specs";
  static final String DEFAULT_SPEC_ID = "default-spec-id";
  static final String DEFAULT_SORT_ORDER_ID = "default-sort-order-id";
  static final String SORT_ORDERS = "sort-orders";
  static final String PROPERTIES = "properties";
  static final String CURRENT_SNAPSHOT_ID = "current-snapshot-id";
  static final String SNAPSHOTS = "snapshots";
  static final String SNAPSHOT_ID = "snapshot-id";
  static final String TIMESTAMP_MS = "timestamp-ms";
  static final String SNAPSHOT_LOG = "snapshot-log";
  static final String METADATA_FILE = "metadata-file";
  static final String METADATA_LOG = "metadata-log";

  public static void overwrite(TableMetadata metadata, OutputFile outputFile) {
    internalWrite(metadata, outputFile, true);
  }

  public static void write(TableMetadata metadata, OutputFile outputFile) {
    internalWrite(metadata, outputFile, false);
  }
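
  /**
   * Writes metadata JSON to the output file, gzip-compressing when the file name carries the
   * {@code .gz} codec suffix; fails if the file already exists unless {@code overwrite} is set.
   */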
  public static void internalWrite(
      TableMetadata metadata, OutputFile outputFile, boolean overwrite) {
    boolean isGzip = Codec.fromFileName(outputFile.location()) == Codec.GZIP;
    OutputStream stream = overwrite ? outputFile.createOrOverwrite() : outputFile.create();
    try (OutputStream ou = isGzip ? new GZIPOutputStream(stream) : stream;
         OutputStreamWriter writer = new OutputStreamWriter(ou, StandardCharsets.UTF_8)) {
      JsonGenerator generator = JsonUtil.factory().createGenerator(writer);
      generator.useDefaultPrettyPrinter();
      toJson(metadata, generator);
      generator.flush();
    } catch (IOException e) {
      throw new RuntimeIOException(e, "Failed to write json to file: %s", outputFile);
    }
  }
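
  /**
   * Returns the metadata file extension for a codec, with the codec suffix before
   * {@code .metadata.json}; for example, {@code "gzip"} yields {@code .gz.metadata.json}.
   */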
  public static String getFileExtension(String codecName) {
    return getFileExtension(Codec.fromName(codecName));
  }

  public static String getFileExtension(Codec codec) {
    return codec.extension + ".metadata.json";
  }
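
  /**
   * Returns the legacy extension that placed the codec suffix after {@code .metadata.json},
   * for example {@code .metadata.json.gz}.
   */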
  public static String getOldFileExtension(Codec codec) {
    // we have to be backward-compatible with .metadata.json.gz files
    return ".metadata.json" + codec.extension;
  }
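
  /** Serializes table metadata to its JSON string representation. */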
  public static String toJson(TableMetadata metadata) {
    try (StringWriter writer = new StringWriter()) {
      JsonGenerator generator = JsonUtil.factory().createGenerator(writer);
      toJson(metadata, generator);
      generator.flush();
      return writer.toString();
    } catch (IOException e) {
      throw new RuntimeIOException(e, "Failed to write json for: %s", metadata);
    }
  }

  private static void toJson(TableMetadata metadata, JsonGenerator generator) throws IOException {
    generator.writeStartObject();

    generator.writeNumberField(FORMAT_VERSION, metadata.formatVersion());
    generator.writeStringField(TABLE_UUID, metadata.uuid());
    generator.writeStringField(LOCATION, metadata.location());
    if (metadata.formatVersion() > 1) {
      generator.writeNumberField(LAST_SEQUENCE_NUMBER, metadata.lastSequenceNumber());
    }
    generator.writeNumberField(LAST_UPDATED_MILLIS, metadata.lastUpdatedMillis());
    generator.writeNumberField(LAST_COLUMN_ID, metadata.lastColumnId());

    generator.writeFieldName(SCHEMA);
    SchemaParser.toJson(metadata.schema(), generator);

    // for older readers, continue writing the default spec as "partition-spec"
    if (metadata.formatVersion() == 1) {
      generator.writeFieldName(PARTITION_SPEC);
      PartitionSpecParser.toJsonFields(metadata.spec(), generator);
    }

    // write the default spec ID and spec list
    generator.writeNumberField(DEFAULT_SPEC_ID, metadata.defaultSpecId());
    generator.writeArrayFieldStart(PARTITION_SPECS);
    for (PartitionSpec spec : metadata.specs()) {
      PartitionSpecParser.toJson(spec, generator);
    }
    generator.writeEndArray();

    generator.writeNumberField(DEFAULT_SORT_ORDER_ID, metadata.defaultSortOrderId());
    generator.writeArrayFieldStart(SORT_ORDERS);
    for (SortOrder sortOrder : metadata.sortOrders()) {
      SortOrderParser.toJson(sortOrder, generator);
    }
    generator.writeEndArray();

    generator.writeObjectFieldStart(PROPERTIES);
    for (Map.Entry<String, String> keyValue : metadata.properties().entrySet()) {
      generator.writeStringField(keyValue.getKey(), keyValue.getValue());
    }
    generator.writeEndObject();

    generator.writeNumberField(CURRENT_SNAPSHOT_ID,
        metadata.currentSnapshot() != null ? metadata.currentSnapshot().snapshotId() : -1);

    generator.writeArrayFieldStart(SNAPSHOTS);
    for (Snapshot snapshot : metadata.snapshots()) {
      SnapshotParser.toJson(snapshot, generator);
    }
    generator.writeEndArray();

    generator.writeArrayFieldStart(SNAPSHOT_LOG);
    for (HistoryEntry logEntry : metadata.snapshotLog()) {
      generator.writeStartObject();
      generator.writeNumberField(TIMESTAMP_MS, logEntry.timestampMillis());
      generator.writeNumberField(SNAPSHOT_ID, logEntry.snapshotId());
      generator.writeEndObject();
    }
    generator.writeEndArray();

    generator.writeArrayFieldStart(METADATA_LOG);
    for (MetadataLogEntry logEntry : metadata.previousFiles()) {
      generator.writeStartObject();
      generator.writeNumberField(TIMESTAMP_MS, logEntry.timestampMillis());
      generator.writeStringField(METADATA_FILE, logEntry.file());
      generator.writeEndObject();
    }
    generator.writeEndArray();

    generator.writeEndObject();
  }

  /**
   * @deprecated will be removed in 0.9.0; use read(FileIO, InputFile) instead.
   */
  @Deprecated
  public static TableMetadata read(TableOperations ops, InputFile file) {
    return read(ops.io(), file);
  }

  public static TableMetadata read(FileIO io, String path) {
    return read(io, io.newInputFile(path));
  }
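
  /**
   * Reads table metadata from a JSON file, decompressing with GZIP when the file name
   * indicates a gzip codec.
   */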
  public static TableMetadata read(FileIO io, InputFile file) {
    Codec codec = Codec.fromFileName(file.location());
    try (InputStream is = codec == Codec.GZIP ? new GZIPInputStream(file.newStream()) : file.newStream()) {
      return fromJson(io, file, JsonUtil.mapper().readValue(is, JsonNode.class));
    } catch (IOException e) {
      throw new RuntimeIOException(e, "Failed to read file: %s", file);
    }
  }
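
  /**
   * Builds TableMetadata from a parsed JSON object, applying v1 defaults (initial sequence
   * number, default partition spec, unsorted order) for fields introduced in later format
   * versions.
   */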
  static TableMetadata fromJson(FileIO io, InputFile file, JsonNode node) {
    Preconditions.checkArgument(node.isObject(),
        "Cannot parse metadata from a non-object: %s", node);

    int formatVersion = JsonUtil.getInt(FORMAT_VERSION, node);
    Preconditions.checkArgument(formatVersion <= TableMetadata.SUPPORTED_TABLE_FORMAT_VERSION,
        "Cannot read unsupported version %s", formatVersion);

    String uuid = JsonUtil.getStringOrNull(TABLE_UUID, node);
    String location = JsonUtil.getString(LOCATION, node);
    long lastSequenceNumber;
    if (formatVersion > 1) {
      lastSequenceNumber = JsonUtil.getLong(LAST_SEQUENCE_NUMBER, node);
    } else {
      lastSequenceNumber = TableMetadata.INITIAL_SEQUENCE_NUMBER;
    }
    int lastAssignedColumnId = JsonUtil.getInt(LAST_COLUMN_ID, node);
    Schema schema = SchemaParser.fromJson(node.get(SCHEMA));

    JsonNode specArray = node.get(PARTITION_SPECS);
    List<PartitionSpec> specs;
    int defaultSpecId;
    if (specArray != null) {
      Preconditions.checkArgument(specArray.isArray(),
          "Cannot parse partition specs from non-array: %s", specArray);
      // default spec ID is required when the spec array is present
      defaultSpecId = JsonUtil.getInt(DEFAULT_SPEC_ID, node);

      // parse the spec array
      ImmutableList.Builder<PartitionSpec> builder = ImmutableList.builder();
      for (JsonNode spec : specArray) {
        builder.add(PartitionSpecParser.fromJson(schema, spec));
      }
      specs = builder.build();

    } else {
      Preconditions.checkArgument(formatVersion == 1,
          "%s must exist in format v%s", PARTITION_SPECS, formatVersion);
      // the "partition-spec" field is required for older readers, but is always set to the
      // default when the spec array is present. it is used only when the spec array is missing,
      // which indicates that the table metadata was written by an older writer.
      defaultSpecId = TableMetadata.INITIAL_SPEC_ID;
      specs = ImmutableList.of(PartitionSpecParser.fromJsonFields(
          schema, TableMetadata.INITIAL_SPEC_ID, node.get(PARTITION_SPEC)));
    }

    JsonNode sortOrderArray = node.get(SORT_ORDERS);
    List<SortOrder> sortOrders;
    int defaultSortOrderId;
    if (sortOrderArray != null) {
      defaultSortOrderId = JsonUtil.getInt(DEFAULT_SORT_ORDER_ID, node);
      ImmutableList.Builder<SortOrder> sortOrdersBuilder = ImmutableList.builder();
      for (JsonNode sortOrder : sortOrderArray) {
        sortOrdersBuilder.add(SortOrderParser.fromJson(schema, sortOrder));
      }
      sortOrders = sortOrdersBuilder.build();
    } else {
      Preconditions.checkArgument(formatVersion == 1,
          "%s must exist in format v%s", SORT_ORDERS, formatVersion);
      SortOrder defaultSortOrder = SortOrder.unsorted();
      sortOrders = ImmutableList.of(defaultSortOrder);
      defaultSortOrderId = defaultSortOrder.orderId();
    }

    Map<String, String> properties = JsonUtil.getStringMap(PROPERTIES, node);
    long currentSnapshotId = JsonUtil.getLong(CURRENT_SNAPSHOT_ID, node);
    long lastUpdatedMillis = JsonUtil.getLong(LAST_UPDATED_MILLIS, node);

    JsonNode snapshotArray = node.get(SNAPSHOTS);
    // check for null to avoid an NPE and fail with a clear message when snapshots are missing
    Preconditions.checkArgument(snapshotArray != null && snapshotArray.isArray(),
        "Cannot parse snapshots from non-array: %s", snapshotArray);

    List<Snapshot> snapshots = Lists.newArrayListWithExpectedSize(snapshotArray.size());
    Iterator<JsonNode> iterator = snapshotArray.elements();
    while (iterator.hasNext()) {
      snapshots.add(SnapshotParser.fromJson(io, iterator.next()));
    }

    ImmutableList.Builder<HistoryEntry> entries = ImmutableList.builder();
    if (node.has(SNAPSHOT_LOG)) {
      Iterator<JsonNode> logIterator = node.get(SNAPSHOT_LOG).elements();
      while (logIterator.hasNext()) {
        JsonNode entryNode = logIterator.next();
        entries.add(new SnapshotLogEntry(
            JsonUtil.getLong(TIMESTAMP_MS, entryNode), JsonUtil.getLong(SNAPSHOT_ID, entryNode)));
      }
    }

    ImmutableList.Builder<MetadataLogEntry> metadataEntries = ImmutableList.builder();
    if (node.has(METADATA_LOG)) {
      Iterator<JsonNode> logIterator = node.get(METADATA_LOG).elements();
      while (logIterator.hasNext()) {
        JsonNode entryNode = logIterator.next();
        metadataEntries.add(new MetadataLogEntry(
            JsonUtil.getLong(TIMESTAMP_MS, entryNode), JsonUtil.getString(METADATA_FILE, entryNode)));
      }
    }

    return new TableMetadata(file, formatVersion, uuid, location,
        lastSequenceNumber, lastUpdatedMillis, lastAssignedColumnId, schema, defaultSpecId, specs,
        defaultSortOrderId, sortOrders, properties, currentSnapshotId, snapshots, entries.build(),
        metadataEntries.build());
  }
}