blob: 2c33dbeb643de7416bbb659474e4af41cb2ba441 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.hadoop;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Set;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.LocationProviders;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * TableOperations implementation for file systems that support atomic rename.
 * <p>
 * This maintains metadata in a "metadata" folder under the table location. Each committed table
 * version is a numbered metadata file (for example, v1.metadata.json); a commit is performed by
 * atomically renaming a temporary metadata file to the next version's file name. A best-effort
 * "version-hint.text" file records the latest committed version so readers can avoid listing.
 */
public class HadoopTableOperations implements TableOperations {
  private static final Logger LOG = LoggerFactory.getLogger(HadoopTableOperations.class);

  // matches metadata version file names, e.g. "v1.metadata.json"; group 1 is the version number
  private static final Pattern VERSION_PATTERN = Pattern.compile("v([^\\.]*)\\..*");

  private final Configuration conf;
  private final Path location;
  // lazily initialized; access is confined to the synchronized io() method
  private HadoopFileIO defaultFileIo = null;

  private volatile TableMetadata currentMetadata = null;
  private volatile Integer version = null;
  private volatile boolean shouldRefresh = true;

  protected HadoopTableOperations(Path location, Configuration conf) {
    this.conf = conf;
    this.location = location;
  }

  @Override
  public TableMetadata current() {
    if (shouldRefresh) {
      return refresh();
    }
    return currentMetadata;
  }

  /**
   * Returns the version and metadata as an atomic pair so commit() can validate against a
   * consistent snapshot of both fields.
   */
  private synchronized Pair<Integer, TableMetadata> versionAndMetadata() {
    return Pair.of(version, currentMetadata);
  }

  /**
   * Replaces the cached version and metadata if the given version is newer than the cached one.
   *
   * @param newVersion version number of the metadata file that was found
   * @param metadataFile location of the metadata file to read
   */
  private synchronized void updateVersionAndMetadata(int newVersion, String metadataFile) {
    // update if the current version is out of date
    if (version == null || version != newVersion) {
      this.version = newVersion;
      // checkUUID guards against the table being dropped and recreated at the same location
      this.currentMetadata = checkUUID(currentMetadata, TableMetadataParser.read(io(), metadataFile));
    }
  }

  @Override
  public TableMetadata refresh() {
    int ver = version != null ? version : findVersion();
    try {
      Path metadataFile = getMetadataFile(ver);
      if (version == null && metadataFile == null && ver == 0) {
        // no v0 metadata means the table doesn't exist yet
        return null;
      } else if (metadataFile == null) {
        throw new ValidationException("Metadata file for version %d is missing", ver);
      }

      // roll forward to the latest committed version by probing for successively newer files
      Path nextMetadataFile = getMetadataFile(ver + 1);
      while (nextMetadataFile != null) {
        ver += 1;
        metadataFile = nextMetadataFile;
        nextMetadataFile = getMetadataFile(ver + 1);
      }

      updateVersionAndMetadata(ver, metadataFile.toString());

      this.shouldRefresh = false;
      return currentMetadata;
    } catch (IOException e) {
      throw new RuntimeIOException(e, "Failed to refresh the table");
    }
  }

  @Override
  public void commit(TableMetadata base, TableMetadata metadata) {
    Pair<Integer, TableMetadata> current = versionAndMetadata();
    // identity comparison is intentional: base must be the exact instance produced by refresh()
    if (base != current.second()) {
      throw new CommitFailedException("Cannot commit changes based on stale table metadata");
    }

    if (base == metadata) {
      LOG.info("Nothing to commit.");
      return;
    }

    Preconditions.checkArgument(base == null || base.location().equals(metadata.location()),
        "Hadoop path-based tables cannot be relocated");
    Preconditions.checkArgument(
        !metadata.properties().containsKey(TableProperties.WRITE_METADATA_LOCATION),
        "Hadoop path-based tables cannot relocate metadata");

    String codecName = metadata.property(
        TableProperties.METADATA_COMPRESSION, TableProperties.METADATA_COMPRESSION_DEFAULT);
    TableMetadataParser.Codec codec = TableMetadataParser.Codec.fromName(codecName);
    String fileExtension = TableMetadataParser.getFileExtension(codec);

    // write the new metadata to a uniquely named temporary file first
    Path tempMetadataFile = metadataPath(UUID.randomUUID().toString() + fileExtension);
    TableMetadataParser.write(metadata, io().newOutputFile(tempMetadataFile.toString()));

    int nextVersion = (current.first() != null ? current.first() : 0) + 1;
    Path finalMetadataFile = metadataFilePath(nextVersion, codec);
    FileSystem fs = getFileSystem(tempMetadataFile, conf);

    try {
      // fail fast if a concurrent committer has already produced this version
      if (fs.exists(finalMetadataFile)) {
        throw new CommitFailedException(
            "Version %d already exists: %s", nextVersion, finalMetadataFile);
      }
    } catch (IOException e) {
      throw new RuntimeIOException(e,
          "Failed to check if next version exists: " + finalMetadataFile);
    }

    // this rename operation is the atomic commit operation
    renameToFinal(fs, tempMetadataFile, finalMetadataFile);

    // update the best-effort version pointer
    writeVersionHint(nextVersion);

    deleteRemovedMetadataFiles(base, metadata);

    this.shouldRefresh = true;
  }

  @Override
  public synchronized FileIO io() {
    // synchronized: the previous unsynchronized lazy init could race and create multiple
    // HadoopFileIO instances when io() was called concurrently
    if (defaultFileIo == null) {
      defaultFileIo = new HadoopFileIO(conf);
    }
    return defaultFileIo;
  }

  @Override
  public LocationProvider locationProvider() {
    return LocationProviders.locationsFor(current().location(), current().properties());
  }

  @Override
  public String metadataFileLocation(String fileName) {
    return metadataPath(fileName).toString();
  }

  @Override
  public TableOperations temp(TableMetadata uncommittedMetadata) {
    // read-only view over uncommitted metadata; refresh and commit are intentionally unsupported
    return new TableOperations() {
      @Override
      public TableMetadata current() {
        return uncommittedMetadata;
      }

      @Override
      public TableMetadata refresh() {
        throw new UnsupportedOperationException("Cannot call refresh on temporary table operations");
      }

      @Override
      public void commit(TableMetadata base, TableMetadata metadata) {
        throw new UnsupportedOperationException("Cannot call commit on temporary table operations");
      }

      @Override
      public String metadataFileLocation(String fileName) {
        return HadoopTableOperations.this.metadataFileLocation(fileName);
      }

      @Override
      public LocationProvider locationProvider() {
        return LocationProviders.locationsFor(uncommittedMetadata.location(), uncommittedMetadata.properties());
      }

      @Override
      public FileIO io() {
        return HadoopTableOperations.this.io();
      }

      @Override
      public EncryptionManager encryption() {
        return HadoopTableOperations.this.encryption();
      }

      @Override
      public long newSnapshotId() {
        return HadoopTableOperations.this.newSnapshotId();
      }
    };
  }

  /**
   * Returns the metadata file for the given version, trying each known codec's file extension,
   * or null if no metadata file exists for that version.
   *
   * @param metadataVersion the version number to look up
   * @return the path of the existing metadata file, or null if none was found
   * @throws IOException if the file system check fails
   */
  @VisibleForTesting
  Path getMetadataFile(int metadataVersion) throws IOException {
    for (TableMetadataParser.Codec codec : TableMetadataParser.Codec.values()) {
      Path metadataFile = metadataFilePath(metadataVersion, codec);
      FileSystem fs = getFileSystem(metadataFile, conf);
      if (fs.exists(metadataFile)) {
        return metadataFile;
      }

      if (codec.equals(TableMetadataParser.Codec.GZIP)) {
        // we have to be backward-compatible with .metadata.json.gz files
        metadataFile = oldMetadataFilePath(metadataVersion, codec);
        fs = getFileSystem(metadataFile, conf);
        if (fs.exists(metadataFile)) {
          return metadataFile;
        }
      }
    }

    return null;
  }

  private Path metadataFilePath(int metadataVersion, TableMetadataParser.Codec codec) {
    return metadataPath("v" + metadataVersion + TableMetadataParser.getFileExtension(codec));
  }

  private Path oldMetadataFilePath(int metadataVersion, TableMetadataParser.Codec codec) {
    return metadataPath("v" + metadataVersion + TableMetadataParser.getOldFileExtension(codec));
  }

  private Path metadataPath(String filename) {
    return new Path(metadataRoot(), filename);
  }

  private Path metadataRoot() {
    return new Path(location, "metadata");
  }

  /**
   * Parses the version number out of a metadata file name, returning -1 when the name does not
   * match the version pattern or the version is not a valid integer.
   */
  private int version(String fileName) {
    Matcher matcher = VERSION_PATTERN.matcher(fileName);
    if (!matcher.matches()) {
      return -1;
    }
    String versionNumber = matcher.group(1);
    try {
      return Integer.parseInt(versionNumber);
    } catch (NumberFormatException ne) {
      return -1;
    }
  }

  @VisibleForTesting
  Path versionHintFile() {
    return metadataPath("version-hint.text");
  }

  /**
   * Overwrites the version hint file with the given version. This pointer is best-effort only:
   * failures are logged and ignored because readers can recover the version by listing.
   */
  private void writeVersionHint(int versionToWrite) {
    Path versionHintFile = versionHintFile();
    FileSystem fs = getFileSystem(versionHintFile, conf);

    try (FSDataOutputStream out = fs.create(versionHintFile, true /* overwrite */)) {
      out.write(String.valueOf(versionToWrite).getBytes(StandardCharsets.UTF_8));

    } catch (IOException e) {
      LOG.warn("Failed to update version hint", e);
    }
  }

  /**
   * Determines the current table version, preferring the version hint file and falling back to
   * listing the metadata directory when the hint is missing or unreadable.
   *
   * @return the latest recoverable version, or 0 if the table has no metadata yet
   */
  @VisibleForTesting
  int findVersion() {
    Path versionHintFile = versionHintFile();
    FileSystem fs = Util.getFs(versionHintFile, conf);

    try (InputStreamReader fsr = new InputStreamReader(fs.open(versionHintFile), StandardCharsets.UTF_8);
        BufferedReader in = new BufferedReader(fsr)) {
      // readLine() may return null for an empty hint file; the resulting NPE is handled below
      return Integer.parseInt(in.readLine().replace("\n", ""));

    } catch (Exception e) {
      try {
        if (fs.exists(metadataRoot())) {
          LOG.warn("Error reading version hint file {}", versionHintFile, e);
        } else {
          LOG.debug("Metadata for table not found in directory {}", metadataRoot(), e);
          return 0;
        }

        // List the metadata directory to find the version files, and try to recover the max available version
        FileStatus[] files = fs.listStatus(metadataRoot(), name -> VERSION_PATTERN.matcher(name.getName()).matches());
        int maxVersion = 0;

        for (FileStatus file : files) {
          int currentVersion = version(file.getPath().getName());
          if (currentVersion > maxVersion && getMetadataFile(currentVersion) != null) {
            maxVersion = currentVersion;
          }
        }

        return maxVersion;
      } catch (IOException io) {
        // keep the original hint-read failure for context instead of silently dropping either cause
        io.addSuppressed(e);
        LOG.warn("Error trying to recover version hint data for {}", versionHintFile, io);
        return 0;
      }
    }
  }

  /**
   * Renames the source file to destination, using the provided file system. If the rename failed,
   * an attempt will be made to delete the source file.
   *
   * @param fs the filesystem used for the rename
   * @param src the source file
   * @param dst the destination file
   */
  private void renameToFinal(FileSystem fs, Path src, Path dst) {
    try {
      if (!fs.rename(src, dst)) {
        CommitFailedException cfe = new CommitFailedException(
            "Failed to commit changes using rename: %s", dst);
        RuntimeException re = tryDelete(src);
        if (re != null) {
          cfe.addSuppressed(re);
        }
        throw cfe;
      }
    } catch (IOException e) {
      CommitFailedException cfe = new CommitFailedException(e,
          "Failed to commit changes using rename: %s", dst);
      RuntimeException re = tryDelete(src);
      if (re != null) {
        cfe.addSuppressed(re);
      }
      throw cfe;
    }
  }

  /**
   * Deletes the file from the file system. Any RuntimeException will be caught and returned.
   *
   * @param path the file to be deleted.
   * @return RuntimeException caught, if any. null otherwise.
   */
  private RuntimeException tryDelete(Path path) {
    try {
      io().deleteFile(path.toString());
      return null;
    } catch (RuntimeException re) {
      return re;
    }
  }

  protected FileSystem getFileSystem(Path path, Configuration hadoopConf) {
    return Util.getFs(path, hadoopConf);
  }

  /**
   * Deletes the oldest metadata files if {@link TableProperties#METADATA_DELETE_AFTER_COMMIT_ENABLED} is true.
   *
   * @param base table metadata on which previous versions were based
   * @param metadata new table metadata with updated previous versions
   */
  private void deleteRemovedMetadataFiles(TableMetadata base, TableMetadata metadata) {
    if (base == null) {
      return;
    }

    boolean deleteAfterCommit = metadata.propertyAsBoolean(
        TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED,
        TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT);

    if (deleteAfterCommit) {
      // files tracked by the base metadata but dropped from the new metadata are unreferenced
      Set<TableMetadata.MetadataLogEntry> removedPreviousMetadataFiles = Sets.newHashSet(base.previousFiles());
      removedPreviousMetadataFiles.removeAll(metadata.previousFiles());
      Tasks.foreach(removedPreviousMetadataFiles)
          .noRetry().suppressFailureWhenFinished()
          .onFailure((previousMetadataFile, exc) ->
              LOG.warn("Delete failed for previous metadata file: {}", previousMetadataFile, exc))
          .run(previousMetadataFile -> io().deleteFile(previousMetadataFile.file()));
    }
  }

  /**
   * Validates that refreshed metadata belongs to the same table as the cached metadata by
   * comparing table UUIDs, when both are present.
   *
   * @return the new metadata, if the UUID check passes
   */
  private static TableMetadata checkUUID(TableMetadata currentMetadata, TableMetadata newMetadata) {
    String newUUID = newMetadata.uuid();
    if (currentMetadata != null && currentMetadata.uuid() != null && newUUID != null) {
      Preconditions.checkState(newUUID.equals(currentMetadata.uuid()),
          "Table UUID does not match: current=%s != refreshed=%s", currentMetadata.uuid(), newUUID);
    }
    return newMetadata;
  }
}