blob: 157d4371955b10dc3ae21058190cbfb3acb9e8ea [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.base.Objects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class BaseMetastoreTableOperations implements TableOperations {
private static final Logger LOG = LoggerFactory.getLogger(BaseMetastoreTableOperations.class);
public static final String TABLE_TYPE_PROP = "table_type";
public static final String ICEBERG_TABLE_TYPE_VALUE = "iceberg";
public static final String METADATA_LOCATION_PROP = "metadata_location";
public static final String PREVIOUS_METADATA_LOCATION_PROP = "previous_metadata_location";
private static final String METADATA_FOLDER_NAME = "metadata";
private TableMetadata currentMetadata = null;
private String currentMetadataLocation = null;
private boolean shouldRefresh = true;
private int version = -1;
protected BaseMetastoreTableOperations() {
}
/**
* The full name of the table used for logging purposes only. For example for HiveTableOperations it is
* catalogName + "." + database + "." + table.
* @return The full name
*/
protected String tableName() {
return null;
}
@Override
public TableMetadata current() {
if (shouldRefresh) {
return refresh();
}
return currentMetadata;
}
public String currentMetadataLocation() {
return currentMetadataLocation;
}
public int currentVersion() {
return version;
}
@Override
public TableMetadata refresh() {
boolean currentMetadataWasAvailable = currentMetadata != null;
try {
doRefresh();
} catch (NoSuchTableException e) {
if (currentMetadataWasAvailable) {
LOG.warn("Could not find the table during refresh, setting current metadata to null", e);
shouldRefresh = true;
}
currentMetadata = null;
currentMetadataLocation = null;
version = -1;
throw e;
}
return current();
}
protected void doRefresh() {
throw new UnsupportedOperationException("Not implemented: doRefresh");
}
@Override
public void commit(TableMetadata base, TableMetadata metadata) {
// if the metadata is already out of date, reject it
if (base != current()) {
throw new CommitFailedException("Cannot commit: stale table metadata");
}
// if the metadata is not changed, return early
if (base == metadata) {
LOG.info("Nothing to commit.");
return;
}
long start = System.currentTimeMillis();
doCommit(base, metadata);
deleteRemovedMetadataFiles(base, metadata);
requestRefresh();
LOG.info("Successfully committed to table {} in {} ms",
tableName(),
System.currentTimeMillis() - start);
}
protected void doCommit(TableMetadata base, TableMetadata metadata) {
throw new UnsupportedOperationException("Not implemented: doCommit");
}
protected void requestRefresh() {
this.shouldRefresh = true;
}
protected String writeNewMetadata(TableMetadata metadata, int newVersion) {
String newTableMetadataFilePath = newTableMetadataFilePath(metadata, newVersion);
OutputFile newMetadataLocation = io().newOutputFile(newTableMetadataFilePath);
// write the new metadata
// use overwrite to avoid negative caching in S3. this is safe because the metadata location is
// always unique because it includes a UUID.
TableMetadataParser.overwrite(metadata, newMetadataLocation);
return newMetadataLocation.location();
}
protected void refreshFromMetadataLocation(String newLocation) {
refreshFromMetadataLocation(newLocation, null, 20);
}
protected void refreshFromMetadataLocation(String newLocation, int numRetries) {
refreshFromMetadataLocation(newLocation, null, numRetries);
}
protected void refreshFromMetadataLocation(String newLocation, Predicate<Exception> shouldRetry,
int numRetries) {
// use null-safe equality check because new tables have a null metadata location
if (!Objects.equal(currentMetadataLocation, newLocation)) {
LOG.info("Refreshing table metadata from new version: {}", newLocation);
AtomicReference<TableMetadata> newMetadata = new AtomicReference<>();
Tasks.foreach(newLocation)
.retry(numRetries).exponentialBackoff(100, 5000, 600000, 4.0 /* 100, 400, 1600, ... */)
.throwFailureWhenFinished()
.shouldRetryTest(shouldRetry)
.run(metadataLocation -> newMetadata.set(
TableMetadataParser.read(io(), metadataLocation)));
String newUUID = newMetadata.get().uuid();
if (currentMetadata != null && currentMetadata.uuid() != null && newUUID != null) {
Preconditions.checkState(newUUID.equals(currentMetadata.uuid()),
"Table UUID does not match: current=%s != refreshed=%s", currentMetadata.uuid(), newUUID);
}
this.currentMetadata = newMetadata.get();
this.currentMetadataLocation = newLocation;
this.version = parseVersion(newLocation);
}
this.shouldRefresh = false;
}
private String metadataFileLocation(TableMetadata metadata, String filename) {
String metadataLocation = metadata.properties()
.get(TableProperties.WRITE_METADATA_LOCATION);
if (metadataLocation != null) {
return String.format("%s/%s", metadataLocation, filename);
} else {
return String.format("%s/%s/%s", metadata.location(), METADATA_FOLDER_NAME, filename);
}
}
@Override
public String metadataFileLocation(String filename) {
return metadataFileLocation(current(), filename);
}
@Override
public LocationProvider locationProvider() {
return LocationProviders.locationsFor(current().location(), current().properties());
}
@Override
public TableOperations temp(TableMetadata uncommittedMetadata) {
return new TableOperations() {
@Override
public TableMetadata current() {
return uncommittedMetadata;
}
@Override
public TableMetadata refresh() {
throw new UnsupportedOperationException("Cannot call refresh on temporary table operations");
}
@Override
public void commit(TableMetadata base, TableMetadata metadata) {
throw new UnsupportedOperationException("Cannot call commit on temporary table operations");
}
@Override
public String metadataFileLocation(String fileName) {
return BaseMetastoreTableOperations.this.metadataFileLocation(uncommittedMetadata, fileName);
}
@Override
public LocationProvider locationProvider() {
return LocationProviders.locationsFor(uncommittedMetadata.location(), uncommittedMetadata.properties());
}
@Override
public FileIO io() {
return BaseMetastoreTableOperations.this.io();
}
@Override
public EncryptionManager encryption() {
return BaseMetastoreTableOperations.this.encryption();
}
@Override
public long newSnapshotId() {
return BaseMetastoreTableOperations.this.newSnapshotId();
}
};
}
private String newTableMetadataFilePath(TableMetadata meta, int newVersion) {
String codecName = meta.property(
TableProperties.METADATA_COMPRESSION, TableProperties.METADATA_COMPRESSION_DEFAULT);
String fileExtension = TableMetadataParser.getFileExtension(codecName);
return metadataFileLocation(meta, String.format("%05d-%s%s", newVersion, UUID.randomUUID(), fileExtension));
}
private static int parseVersion(String metadataLocation) {
int versionStart = metadataLocation.lastIndexOf('/') + 1; // if '/' isn't found, this will be 0
int versionEnd = metadataLocation.indexOf('-', versionStart);
try {
return Integer.valueOf(metadataLocation.substring(versionStart, versionEnd));
} catch (NumberFormatException e) {
LOG.warn("Unable to parse version from metadata location: {}", metadataLocation, e);
return -1;
}
}
/**
* Deletes the oldest metadata files if {@link TableProperties#METADATA_DELETE_AFTER_COMMIT_ENABLED} is true.
*
* @param base table metadata on which previous versions were based
* @param metadata new table metadata with updated previous versions
*/
private void deleteRemovedMetadataFiles(TableMetadata base, TableMetadata metadata) {
if (base == null) {
return;
}
boolean deleteAfterCommit = metadata.propertyAsBoolean(
TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED,
TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT);
Set<TableMetadata.MetadataLogEntry> removedPreviousMetadataFiles = Sets.newHashSet(base.previousFiles());
removedPreviousMetadataFiles.removeAll(metadata.previousFiles());
if (deleteAfterCommit) {
Tasks.foreach(removedPreviousMetadataFiles)
.noRetry().suppressFailureWhenFinished()
.onFailure((previousMetadataFile, exc) ->
LOG.warn("Delete failed for previous metadata file: {}", previousMetadataFile, exc))
.run(previousMetadataFile -> io().deleteFile(previousMetadataFile.file()));
}
}
}