/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg;

import static org.apache.iceberg.TableProperties.COMMIT_NUM_STATUS_CHECKS;
import static org.apache.iceberg.TableProperties.COMMIT_NUM_STATUS_CHECKS_DEFAULT;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MAX_WAIT_MS;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MAX_WAIT_MS_DEFAULT;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MIN_WAIT_MS;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MIN_WAIT_MS_DEFAULT;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS_DEFAULT;

import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.base.Objects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.LocationUtil;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
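
/**
 * A base {@link TableOperations} implementation for catalogs that track the current metadata
 * location with a pointer in a metastore. Subclasses implement {@link #doRefresh()} to read the
 * pointer and {@link #doCommit(TableMetadata, TableMetadata)} to atomically swap it; this class
 * handles writing metadata files, refresh retries, commit-status checks, and metadata cleanup.
 *
 * <p>A minimal subclass sketch, assuming an in-memory pointer in place of a real metastore (the
 * names {@code ExampleTableOperations} and {@code pointer} are illustrative, not part of the API):
 *
 * <pre>{@code
 * class ExampleTableOperations extends BaseMetastoreTableOperations {
 *   // stands in for the metastore's metadata_location entry
 *   private final AtomicReference<String> pointer = new AtomicReference<>();
 *   private final FileIO io;
 *
 *   ExampleTableOperations(FileIO io) {
 *     this.io = io;
 *   }
 *
 *   protected String tableName() {
 *     return "example.db.table";
 *   }
 *
 *   public FileIO io() {
 *     return io;
 *   }
 *
 *   protected void doRefresh() {
 *     refreshFromMetadataLocation(pointer.get());
 *   }
 *
 *   protected void doCommit(TableMetadata base, TableMetadata metadata) {
 *     String newLocation = writeNewMetadataIfRequired(base == null, metadata);
 *     String expected = base == null ? null : currentMetadataLocation();
 *     // the compare-and-set plays the role of the metastore's atomic pointer swap
 *     if (!pointer.compareAndSet(expected, newLocation)) {
 *       throw new CommitFailedException("Metadata location changed concurrently");
 *     }
 *   }
 * }
 * }</pre>
 */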
public abstract class BaseMetastoreTableOperations implements TableOperations {
  private static final Logger LOG = LoggerFactory.getLogger(BaseMetastoreTableOperations.class);

  public static final String TABLE_TYPE_PROP = "table_type";
  public static final String ICEBERG_TABLE_TYPE_VALUE = "iceberg";
  public static final String METADATA_LOCATION_PROP = "metadata_location";
  public static final String PREVIOUS_METADATA_LOCATION_PROP = "previous_metadata_location";

  private static final String METADATA_FOLDER_NAME = "metadata";

  private TableMetadata currentMetadata = null;
  private String currentMetadataLocation = null;
  private boolean shouldRefresh = true;
  private int version = -1;

  protected BaseMetastoreTableOperations() {}

  /**
   * The full name of the table, used for logging purposes only. For example, for
   * HiveTableOperations it is {@code catalogName + "." + database + "." + table}.
   *
   * @return the table's full name
   */
  protected abstract String tableName();

@Override
public TableMetadata current() {
if (shouldRefresh) {
return refresh();
}
return currentMetadata;
}
public String currentMetadataLocation() {
return currentMetadataLocation;
}
public int currentVersion() {
return version;
}
@Override
public TableMetadata refresh() {
boolean currentMetadataWasAvailable = currentMetadata != null;
try {
doRefresh();
} catch (NoSuchTableException e) {
if (currentMetadataWasAvailable) {
LOG.warn("Could not find the table during refresh, setting current metadata to null", e);
shouldRefresh = true;
}
currentMetadata = null;
currentMetadataLocation = null;
version = -1;
throw e;
}
return current();
}
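
  /**
   * Refresh metadata from the underlying metastore. Implementations should look up the current
   * metadata location and pass it to {@link #refreshFromMetadataLocation(String)} or one of its
   * overloads, which loads the metadata file and updates the current state.
   */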
protected void doRefresh() {
throw new UnsupportedOperationException("Not implemented: doRefresh");
}
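
  /**
   * Commit the new table metadata. Rejects the commit with {@link CommitFailedException} when
   * {@code base} is stale, with {@link AlreadyExistsException} when {@code base} is null but the
   * table already exists, and returns early when nothing changed. Otherwise delegates to {@link
   * #doCommit(TableMetadata, TableMetadata)} and then removes old metadata files if metadata
   * cleanup is enabled.
   */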
@Override
public void commit(TableMetadata base, TableMetadata metadata) {
// if the metadata is already out of date, reject it
if (base != current()) {
if (base != null) {
throw new CommitFailedException("Cannot commit: stale table metadata");
} else {
        // when current is non-null, the table exists; a null base means this commit is trying
        // to create a table that already exists
throw new AlreadyExistsException("Table already exists: %s", tableName());
}
}
// if the metadata is not changed, return early
if (base == metadata) {
LOG.info("Nothing to commit.");
return;
}
long start = System.currentTimeMillis();
doCommit(base, metadata);
deleteRemovedMetadataFiles(base, metadata);
requestRefresh();
LOG.info(
"Successfully committed to table {} in {} ms",
tableName(),
System.currentTimeMillis() - start);
}
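
  /**
   * Persist the new metadata location in the metastore. Implementations must make the pointer
   * swap atomic and should throw {@link CommitFailedException} when the base metadata location is
   * no longer current, so the commit can be safely retried.
   */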
protected void doCommit(TableMetadata base, TableMetadata metadata) {
throw new UnsupportedOperationException("Not implemented: doCommit");
}
protected void requestRefresh() {
this.shouldRefresh = true;
}
protected void disableRefresh() {
this.shouldRefresh = false;
}
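
  /**
   * Returns the location of the metadata file to commit. For a new table whose metadata file was
   * already written, the existing location is reused; otherwise a new metadata file is written at
   * the next version.
   */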
protected String writeNewMetadataIfRequired(boolean newTable, TableMetadata metadata) {
return newTable && metadata.metadataFileLocation() != null
? metadata.metadataFileLocation()
: writeNewMetadata(metadata, currentVersion() + 1);
}
protected String writeNewMetadata(TableMetadata metadata, int newVersion) {
String newTableMetadataFilePath = newTableMetadataFilePath(metadata, newVersion);
OutputFile newMetadataLocation = io().newOutputFile(newTableMetadataFilePath);
    // write the new metadata
    // use overwrite to avoid negative caching in S3. this is safe because the metadata location
    // is always unique: it embeds a random UUID
TableMetadataParser.overwrite(metadata, newMetadataLocation);
return newMetadataLocation.location();
}
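
  /** Refresh metadata from {@code newLocation}, retrying the read up to 20 times. */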
protected void refreshFromMetadataLocation(String newLocation) {
refreshFromMetadataLocation(newLocation, null, 20);
}
protected void refreshFromMetadataLocation(String newLocation, int numRetries) {
refreshFromMetadataLocation(newLocation, null, numRetries);
}
protected void refreshFromMetadataLocation(
String newLocation, Predicate<Exception> shouldRetry, int numRetries) {
refreshFromMetadataLocation(
newLocation,
shouldRetry,
numRetries,
metadataLocation -> TableMetadataParser.read(io(), metadataLocation));
}
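
  /**
   * Refresh metadata from {@code newLocation} when it differs from the current metadata location.
   * The read is retried with exponential backoff starting at 100 ms and stops retrying on {@link
   * NotFoundException} unless {@code shouldRetry} overrides that behavior. If metadata was already
   * loaded, the refreshed metadata must keep the same table UUID.
   */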
protected void refreshFromMetadataLocation(
String newLocation,
Predicate<Exception> shouldRetry,
int numRetries,
Function<String, TableMetadata> metadataLoader) {
// use null-safe equality check because new tables have a null metadata location
if (!Objects.equal(currentMetadataLocation, newLocation)) {
LOG.info("Refreshing table metadata from new version: {}", newLocation);
AtomicReference<TableMetadata> newMetadata = new AtomicReference<>();
Tasks.foreach(newLocation)
.retry(numRetries)
.exponentialBackoff(100, 5000, 600000, 4.0 /* 100, 400, 1600, ... */)
.throwFailureWhenFinished()
.stopRetryOn(NotFoundException.class) // overridden if shouldRetry is non-null
.shouldRetryTest(shouldRetry)
.run(metadataLocation -> newMetadata.set(metadataLoader.apply(metadataLocation)));
String newUUID = newMetadata.get().uuid();
if (currentMetadata != null && currentMetadata.uuid() != null && newUUID != null) {
Preconditions.checkState(
newUUID.equals(currentMetadata.uuid()),
"Table UUID does not match: current=%s != refreshed=%s",
currentMetadata.uuid(),
newUUID);
}
this.currentMetadata = newMetadata.get();
this.currentMetadataLocation = newLocation;
this.version = parseVersion(newLocation);
}
this.shouldRefresh = false;
}
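
  /**
   * Resolve the location of a metadata file: under {@link TableProperties#WRITE_METADATA_LOCATION}
   * when that table property is set, otherwise under the table's {@code metadata} folder.
   */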
private String metadataFileLocation(TableMetadata metadata, String filename) {
String metadataLocation = metadata.properties().get(TableProperties.WRITE_METADATA_LOCATION);
if (metadataLocation != null) {
return String.format("%s/%s", LocationUtil.stripTrailingSlash(metadataLocation), filename);
} else {
return String.format("%s/%s/%s", metadata.location(), METADATA_FOLDER_NAME, filename);
}
}
@Override
public String metadataFileLocation(String filename) {
return metadataFileLocation(current(), filename);
}
@Override
public LocationProvider locationProvider() {
return LocationProviders.locationsFor(current().location(), current().properties());
}
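
  /**
   * Returns a read-only {@link TableOperations} view over metadata that has not been committed
   * yet. Its {@code refresh} and {@code commit} methods throw {@link
   * UnsupportedOperationException}.
   */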
@Override
public TableOperations temp(TableMetadata uncommittedMetadata) {
return new TableOperations() {
@Override
public TableMetadata current() {
return uncommittedMetadata;
}
@Override
public TableMetadata refresh() {
throw new UnsupportedOperationException(
"Cannot call refresh on temporary table operations");
}
@Override
public void commit(TableMetadata base, TableMetadata metadata) {
throw new UnsupportedOperationException("Cannot call commit on temporary table operations");
}
@Override
public String metadataFileLocation(String fileName) {
return BaseMetastoreTableOperations.this.metadataFileLocation(
uncommittedMetadata, fileName);
}
@Override
public LocationProvider locationProvider() {
return LocationProviders.locationsFor(
uncommittedMetadata.location(), uncommittedMetadata.properties());
}
@Override
public FileIO io() {
return BaseMetastoreTableOperations.this.io();
}
@Override
public EncryptionManager encryption() {
return BaseMetastoreTableOperations.this.encryption();
}
@Override
public long newSnapshotId() {
return BaseMetastoreTableOperations.this.newSnapshotId();
}
};
}
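
  /**
   * The state of a commit after an exception: {@code SUCCESS} and {@code FAILURE} are definitive,
   * while {@code UNKNOWN} means the outcome could not be determined and the new metadata file must
   * not be deleted. A sketch of how a subclass might use {@link #checkCommitStatus(String,
   * TableMetadata)} from {@code doCommit} ({@code persistLocation} is a hypothetical metastore
   * call, and {@code CommitStateUnknownException} is from {@code org.apache.iceberg.exceptions}):
   *
   * <pre>{@code
   * try {
   *   persistLocation(newLocation);
   * } catch (RuntimeException e) {
   *   CommitStatus status = checkCommitStatus(newLocation, metadata);
   *   if (status == CommitStatus.FAILURE) {
   *     throw new CommitFailedException(e, "Cannot commit %s", tableName());
   *   }
   *   if (status == CommitStatus.UNKNOWN) {
   *     throw new CommitStateUnknownException(e);
   *   }
   * }
   * }</pre>
   */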
protected enum CommitStatus {
FAILURE,
SUCCESS,
UNKNOWN
}

  /**
   * Attempt to load the table and see if any current or past metadata location matches the one we
   * were attempting to set. This is used as a last resort when an exception indicates that the
   * commit may have failed but is not proof that it did. Past locations must also be searched on
   * the chance that a second committer was able to successfully commit on top of our commit.
   *
   * @param newMetadataLocation the path of the new commit file
   * @param config metadata to use for configuration
   * @return a {@link CommitStatus} of SUCCESS, FAILURE, or UNKNOWN
   */
protected CommitStatus checkCommitStatus(String newMetadataLocation, TableMetadata config) {
int maxAttempts =
PropertyUtil.propertyAsInt(
config.properties(), COMMIT_NUM_STATUS_CHECKS, COMMIT_NUM_STATUS_CHECKS_DEFAULT);
long minWaitMs =
PropertyUtil.propertyAsLong(
config.properties(),
COMMIT_STATUS_CHECKS_MIN_WAIT_MS,
COMMIT_STATUS_CHECKS_MIN_WAIT_MS_DEFAULT);
long maxWaitMs =
PropertyUtil.propertyAsLong(
config.properties(),
COMMIT_STATUS_CHECKS_MAX_WAIT_MS,
COMMIT_STATUS_CHECKS_MAX_WAIT_MS_DEFAULT);
long totalRetryMs =
PropertyUtil.propertyAsLong(
config.properties(),
COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS,
COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS_DEFAULT);
AtomicReference<CommitStatus> status = new AtomicReference<>(CommitStatus.UNKNOWN);
Tasks.foreach(newMetadataLocation)
.retry(maxAttempts)
.suppressFailureWhenFinished()
.exponentialBackoff(minWaitMs, maxWaitMs, totalRetryMs, 2.0)
.onFailure(
(location, checkException) ->
LOG.error("Cannot check if commit to {} exists.", tableName(), checkException))
.run(
location -> {
TableMetadata metadata = refresh();
String currentMetadataFileLocation = metadata.metadataFileLocation();
boolean commitSuccess =
currentMetadataFileLocation.equals(newMetadataLocation)
|| metadata.previousFiles().stream()
.anyMatch(log -> log.file().equals(newMetadataLocation));
if (commitSuccess) {
LOG.info(
"Commit status check: Commit to {} of {} succeeded",
tableName(),
newMetadataLocation);
status.set(CommitStatus.SUCCESS);
} else {
LOG.warn(
"Commit status check: Commit to {} of {} unknown, new metadata location is not current "
+ "or in history",
tableName(),
newMetadataLocation);
}
});
if (status.get() == CommitStatus.UNKNOWN) {
LOG.error(
"Cannot determine commit state to {}. Failed during checking {} times. "
+ "Treating commit state as unknown.",
tableName(),
maxAttempts);
}
return status.get();
}
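
  /**
   * Build the file name of the next metadata file: a zero-padded version number, a random UUID,
   * and the file extension for the configured compression codec, e.g. {@code
   * 00001-<uuid>.metadata.json}.
   */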
private String newTableMetadataFilePath(TableMetadata meta, int newVersion) {
String codecName =
meta.property(
TableProperties.METADATA_COMPRESSION, TableProperties.METADATA_COMPRESSION_DEFAULT);
String fileExtension = TableMetadataParser.getFileExtension(codecName);
return metadataFileLocation(
meta, String.format("%05d-%s%s", newVersion, UUID.randomUUID(), fileExtension));
}

  /**
   * Parse the version from a table metadata file name. Metadata files written by this class are
   * named {@code <version>-<uuid><extension>}, so, for example, a file named {@code
   * 00003-<uuid>.metadata.json} parses to version 3.
   *
   * @param metadataLocation table metadata file location
   * @return the version of the table metadata file on success, or -1 if the version is not
   *     parsable (a sign that the metadata is not part of this catalog)
   */
private static int parseVersion(String metadataLocation) {
int versionStart = metadataLocation.lastIndexOf('/') + 1; // if '/' isn't found, this will be 0
int versionEnd = metadataLocation.indexOf('-', versionStart);
if (versionEnd < 0) {
// found filesystem table's metadata
return -1;
}
try {
      return Integer.parseInt(metadataLocation.substring(versionStart, versionEnd));
} catch (NumberFormatException e) {
LOG.warn("Unable to parse version from metadata location: {}", metadataLocation, e);
return -1;
}
}
/**
* Deletes the oldest metadata files if {@link
* TableProperties#METADATA_DELETE_AFTER_COMMIT_ENABLED} is true.
*
* @param base table metadata on which previous versions were based
* @param metadata new table metadata with updated previous versions
*/
private void deleteRemovedMetadataFiles(TableMetadata base, TableMetadata metadata) {
if (base == null) {
return;
}
boolean deleteAfterCommit =
metadata.propertyAsBoolean(
TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED,
TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT);
if (deleteAfterCommit) {
Set<TableMetadata.MetadataLogEntry> removedPreviousMetadataFiles =
Sets.newHashSet(base.previousFiles());
      // TableMetadata#addPreviousFile builds up the metadata log and uses
      // TableProperties.METADATA_PREVIOUS_VERSIONS_MAX to determine how many files should stay
      // in the log, so anything still in metadata.previousFiles() is kept and everything else
      // can be removed
removedPreviousMetadataFiles.removeAll(metadata.previousFiles());
Tasks.foreach(removedPreviousMetadataFiles)
.noRetry()
.suppressFailureWhenFinished()
.onFailure(
(previousMetadataFile, exc) ->
LOG.warn(
"Delete failed for previous metadata file: {}", previousMetadataFile, exc))
.run(previousMetadataFile -> io().deleteFile(previousMetadataFile.file()));
}
}
}