/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.iceberg;

import java.io.IOException;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.common.DynConstructors;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.MapMaker;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
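
/**
 * Utility methods for working with {@link Catalog} and {@link FileIO} implementations: loading and
 * initializing custom implementations by class name, building the built-in Hive and Hadoop catalogs,
 * and deleting the files of a dropped table.
 */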
public class CatalogUtil {
  private static final Logger LOG = LoggerFactory.getLogger(CatalogUtil.class);

  public static final String ICEBERG_CATALOG_TYPE = "type";
  public static final String ICEBERG_CATALOG_TYPE_HADOOP = "hadoop";
  public static final String ICEBERG_CATALOG_TYPE_HIVE = "hive";
  public static final String ICEBERG_CATALOG_HIVE = "org.apache.iceberg.hive.HiveCatalog";
  public static final String ICEBERG_CATALOG_HADOOP = "org.apache.iceberg.hadoop.HadoopCatalog";

  private CatalogUtil() {
  }

  /**
   * Drops all data and metadata files referenced by TableMetadata.
   * <p>
   * This should be called by dropTable implementations to clean up table files once the table has been dropped in the
   * metastore.
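   *
   * <p>Illustrative usage (assumes {@code table} is a hypothetical {@link Table} that implements
   * {@link HasTableOperations}):
   * <pre>{@code
   *   TableOperations ops = ((HasTableOperations) table).operations();
   *   // best-effort delete of data files, manifests, manifest lists, and the current metadata file
   *   CatalogUtil.dropTableData(ops.io(), ops.current());
   * }</pre>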
   *
   * @param io a FileIO to use for deletes
   * @param metadata the last valid TableMetadata instance for a dropped table.
   */
  public static void dropTableData(FileIO io, TableMetadata metadata) {
    // Reads and deletes are done using Tasks.foreach(...).suppressFailureWhenFinished to complete
    // as much of the delete work as possible and avoid orphaned data or manifest files.

    Set<String> manifestListsToDelete = Sets.newHashSet();
    Set<ManifestFile> manifestsToDelete = Sets.newHashSet();
    for (Snapshot snapshot : metadata.snapshots()) {
      // add all manifests to the delete set because both data and delete files should be removed
      Iterables.addAll(manifestsToDelete, snapshot.allManifests());
      // add the manifest list to the delete set, if present
      if (snapshot.manifestListLocation() != null) {
        manifestListsToDelete.add(snapshot.manifestListLocation());
      }
    }

    LOG.info("Manifests to delete: {}", Joiner.on(", ").join(manifestsToDelete));

    // run all of the deletes
    deleteFiles(io, manifestsToDelete);

    Tasks.foreach(Iterables.transform(manifestsToDelete, ManifestFile::path))
        .noRetry().suppressFailureWhenFinished()
        .onFailure((manifest, exc) -> LOG.warn("Delete failed for manifest: {}", manifest, exc))
        .run(io::deleteFile);

    Tasks.foreach(manifestListsToDelete)
        .noRetry().suppressFailureWhenFinished()
        .onFailure((list, exc) -> LOG.warn("Delete failed for manifest list: {}", list, exc))
        .run(io::deleteFile);

    Tasks.foreach(metadata.metadataFileLocation())
        .noRetry().suppressFailureWhenFinished()
        .onFailure((list, exc) -> LOG.warn("Delete failed for metadata file: {}", list, exc))
        .run(io::deleteFile);
  }
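
  /**
   * Deletes the files referenced by the given manifests, using a weak-keyed map of interned paths for
   * best-effort de-duplication so each file is deleted only once.
   */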
  @SuppressWarnings("DangerousStringInternUsage")
  private static void deleteFiles(FileIO io, Set<ManifestFile> allManifests) {
    // keep track of deleted files in a map that can be cleaned up when memory runs low
    Map<String, Boolean> deletedFiles = new MapMaker()
        .concurrencyLevel(ThreadPools.WORKER_THREAD_POOL_SIZE)
        .weakKeys()
        .makeMap();

    Tasks.foreach(allManifests)
        .noRetry().suppressFailureWhenFinished()
        .executeWith(ThreadPools.getWorkerPool())
        .onFailure((item, exc) -> LOG.warn("Failed to get deleted files: this may cause orphaned data files", exc))
        .run(manifest -> {
          try (ManifestReader<?> reader = ManifestFiles.open(manifest, io)) {
            for (ManifestEntry<?> entry : reader.entries()) {
              // intern the file path because the weak key map uses identity (==) instead of equals
              String path = entry.file().path().toString().intern();
              Boolean alreadyDeleted = deletedFiles.putIfAbsent(path, true);
              if (alreadyDeleted == null || !alreadyDeleted) {
                try {
                  io.deleteFile(path);
                } catch (RuntimeException e) {
                  // this may happen if the map of deleted files gets cleaned up by gc
                  LOG.warn("Delete failed for data file: {}", path, e);
                }
              }
            }
          } catch (IOException e) {
            throw new RuntimeIOException(e, "Failed to read manifest file: %s", manifest.path());
          }
        });
  }

  /**
   * Load a custom catalog implementation.
   * <p>
   * The catalog must have a no-arg constructor.
   * If the class implements {@link Configurable},
   * a Hadoop config will be passed using {@link Configurable#setConf(Configuration)}.
   * {@link Catalog#initialize(String catalogName, Map options)} is called to complete the initialization.
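   *
   * <p>Illustrative usage (the class name and property values here are examples, not defaults):
   * <pre>{@code
   *   Catalog catalog = CatalogUtil.loadCatalog(
   *       "org.apache.iceberg.hadoop.HadoopCatalog",   // any Catalog impl with a no-arg constructor
   *       "my_catalog",
   *       ImmutableMap.of("warehouse", "hdfs://nn:8020/warehouse/path"),   // example properties
   *       new Configuration());
   * }</pre>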
   *
   * @param impl catalog implementation full class name
   * @param catalogName catalog name
   * @param properties catalog properties
   * @param hadoopConf hadoop configuration if needed
   * @return initialized catalog object
   * @throws IllegalArgumentException if the no-arg constructor is not found or initialization fails
   */
  public static Catalog loadCatalog(
      String impl,
      String catalogName,
      Map<String, String> properties,
      Configuration hadoopConf) {
    Preconditions.checkNotNull(impl, "Cannot initialize custom Catalog, impl class name is null");
    DynConstructors.Ctor<Catalog> ctor;
    try {
      ctor = DynConstructors.builder(Catalog.class).impl(impl).buildChecked();
    } catch (NoSuchMethodException e) {
      throw new IllegalArgumentException(String.format(
          "Cannot initialize Catalog, missing no-arg constructor: %s", impl), e);
    }

    Catalog catalog;
    try {
      catalog = ctor.newInstance();
    } catch (ClassCastException e) {
      throw new IllegalArgumentException(
          String.format("Cannot initialize Catalog, %s does not implement Catalog.", impl), e);
    }

    if (catalog instanceof Configurable) {
      ((Configurable) catalog).setConf(hadoopConf);
    }

    catalog.initialize(catalogName, properties);
    return catalog;
  }
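
  /**
   * Build an Iceberg {@link Catalog} from catalog properties.
   * <p>
   * If {@link CatalogProperties#CATALOG_IMPL} is set, that implementation is loaded; otherwise the
   * {@code type} property selects the built-in Hive (default) or Hadoop catalog.
   *
   * <p>Illustrative usage (the property values are examples only):
   * <pre>{@code
   *   Map<String, String> props = ImmutableMap.of(
   *       "type", "hadoop",                                // or "hive", or set CatalogProperties.CATALOG_IMPL
   *       "warehouse", "hdfs://nn:8020/warehouse/path");   // example warehouse location
   *   Catalog catalog = CatalogUtil.buildIcebergCatalog("my_catalog", props, new Configuration());
   * }</pre>
   *
   * @param name catalog name
   * @param options catalog properties
   * @param conf Hadoop configuration
   * @return initialized catalog
   */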
  public static Catalog buildIcebergCatalog(String name, Map<String, String> options, Configuration conf) {
    String catalogImpl = options.get(CatalogProperties.CATALOG_IMPL);
    if (catalogImpl == null) {
      String catalogType = options.getOrDefault(ICEBERG_CATALOG_TYPE, ICEBERG_CATALOG_TYPE_HIVE);
      switch (catalogType.toLowerCase(Locale.ENGLISH)) {
        case ICEBERG_CATALOG_TYPE_HIVE:
          catalogImpl = ICEBERG_CATALOG_HIVE;
          break;
        case ICEBERG_CATALOG_TYPE_HADOOP:
          catalogImpl = ICEBERG_CATALOG_HADOOP;
          break;
        default:
          throw new UnsupportedOperationException("Unknown catalog type: " + catalogType);
      }
    }

    return CatalogUtil.loadCatalog(catalogImpl, name, options, conf);
  }

  /**
   * Load a custom {@link FileIO} implementation.
   * <p>
   * The implementation must have a no-arg constructor.
   * If the class implements {@link Configurable},
   * a Hadoop config will be passed using {@link Configurable#setConf(Configuration)}.
   * {@link FileIO#initialize(Map properties)} is called to complete the initialization.
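   *
   * <p>Illustrative usage ({@code com.example.CustomFileIO} is a hypothetical implementation class):
   * <pre>{@code
   *   FileIO io = CatalogUtil.loadFileIO(
   *       "com.example.CustomFileIO",               // any FileIO impl with a no-arg constructor
   *       ImmutableMap.of("custom.key", "value"),   // example properties passed to initialize()
   *       new Configuration());
   * }</pre>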
   *
   * @param impl full class name of a custom FileIO implementation
   * @param properties properties used to initialize the FileIO implementation
   * @param hadoopConf hadoop configuration
   * @return an initialized FileIO instance
   * @throws IllegalArgumentException if the class cannot be found, a no-arg constructor is missing, or
   *         the loaded class cannot be cast to FileIO
   */
  public static FileIO loadFileIO(
      String impl,
      Map<String, String> properties,
      Configuration hadoopConf) {
    LOG.info("Loading custom FileIO implementation: {}", impl);
    DynConstructors.Ctor<FileIO> ctor;
    try {
      ctor = DynConstructors.builder(FileIO.class).impl(impl).buildChecked();
    } catch (NoSuchMethodException e) {
      throw new IllegalArgumentException(String.format(
          "Cannot initialize FileIO, missing no-arg constructor: %s", impl), e);
    }

    FileIO fileIO;
    try {
      fileIO = ctor.newInstance();
    } catch (ClassCastException e) {
      throw new IllegalArgumentException(
          String.format("Cannot initialize FileIO, %s does not implement FileIO.", impl), e);
    }

    if (fileIO instanceof Configurable) {
      ((Configurable) fileIO).setConf(hadoopConf);
    }

    fileIO.initialize(properties);
    return fileIO;
  }
}