/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.hadoop;

import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.iceberg.BaseMetastoreCatalog;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.Schema;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* HadoopCatalog provides a way to use table names like db.table to work with path-based tables under a common
* location. It uses a specified directory under a specified filesystem as the warehouse directory, and organizes
* multiple levels of directories that map to the database, the namespace, and the table, respectively. The
* HadoopCatalog takes a location as the warehouse directory. When creating a table such as $db.$tbl, it creates the
* $db/$tbl directory under the warehouse directory and puts the table metadata into that directory.
*
* The HadoopCatalog supports {@link org.apache.iceberg.catalog.Catalog#createTable} and
* {@link org.apache.iceberg.catalog.Catalog#dropTable}; {@link org.apache.iceberg.catalog.Catalog#renameTable}
* is not supported yet.
*
* Note: The HadoopCatalog requires that the underlying file system supports atomic rename.
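* <p>
* A minimal usage sketch ({@code schema} and {@code spec} are assumed to be defined elsewhere, and the
* warehouse path below is a placeholder):
* <pre>{@code
*   Configuration conf = new Configuration();
*   HadoopCatalog catalog = new HadoopCatalog();
*   catalog.setConf(conf);
*   catalog.initialize("hadoop", ImmutableMap.of(
*       CatalogProperties.WAREHOUSE_LOCATION, "hdfs://nn:8020/warehouse/path"));
*   Table table = catalog.createTable(TableIdentifier.of("db", "table"), schema, spec);
* }</pre>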
*/
public class HadoopCatalog extends BaseMetastoreCatalog implements Closeable, SupportsNamespaces, Configurable {
private static final Logger LOG = LoggerFactory.getLogger(HadoopCatalog.class);
private static final String ICEBERG_HADOOP_WAREHOUSE_BASE = "iceberg/warehouse";
private static final String TABLE_METADATA_FILE_EXTENSION = ".metadata.json";
private static final Joiner SLASH = Joiner.on("/");
private static final PathFilter TABLE_FILTER = path -> path.getName().endsWith(TABLE_METADATA_FILE_EXTENSION);
private static final String HADOOP_SUPPRESS_PERMISSION_ERROR = "suppress-permission-error";
private String catalogName;
private Configuration conf;
private String warehouseLocation;
private FileSystem fs;
private FileIO fileIO;
private boolean suppressPermissionError = false;

public HadoopCatalog() {
}
/**
* The constructor of the HadoopCatalog. It uses the passed location as its warehouse directory.
*
* @deprecated please use the no-arg constructor with {@link #setConf(Configuration)} and
* {@link #initialize(String, Map)} to construct the catalog. Will be removed in v0.12.0
* @param name The catalog name
* @param conf The Hadoop configuration
* @param warehouseLocation The location used as warehouse directory
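* <p>
* A sketch of the equivalent, non-deprecated construction:
* <pre>{@code
*   HadoopCatalog catalog = new HadoopCatalog();
*   catalog.setConf(conf);
*   catalog.initialize(name, ImmutableMap.of(
*       CatalogProperties.WAREHOUSE_LOCATION, warehouseLocation));
* }</pre>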
*/
@Deprecated
public HadoopCatalog(String name, Configuration conf, String warehouseLocation) {
this(name, conf, warehouseLocation, Maps.newHashMap());
}
/**
* The all-arg constructor of the HadoopCatalog.
*
* @deprecated please use the no-arg constructor with {@link #setConf(Configuration)} and
* {@link #initialize(String, Map)} to construct the catalog. Will be removed in v0.12.0
* @param name The catalog name
* @param conf The Hadoop configuration
* @param warehouseLocation The location used as warehouse directory
* @param properties catalog properties
*/
@Deprecated
public HadoopCatalog(String name, Configuration conf, String warehouseLocation, Map<String, String> properties) {
Preconditions.checkArgument(warehouseLocation != null && !warehouseLocation.equals(""),
"Cannot instantiate hadoop catalog. No location provided for warehouse");
setConf(conf);
Map<String, String> props = Maps.newHashMap(properties);
props.put(CatalogProperties.WAREHOUSE_LOCATION, warehouseLocation);
initialize(name, props);
}
@Override
public void initialize(String name, Map<String, String> properties) {
String inputWarehouseLocation = properties.get(CatalogProperties.WAREHOUSE_LOCATION);
Preconditions.checkArgument(inputWarehouseLocation != null && !inputWarehouseLocation.equals(""),
"Cannot instantiate hadoop catalog. No location provided for warehouse (Set warehouse config)");
this.catalogName = name;
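// Strip trailing slashes so that table and namespace paths are built consistently.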
this.warehouseLocation = inputWarehouseLocation.replaceAll("/*$", "");
this.fs = Util.getFs(new Path(warehouseLocation), conf);
String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL);
this.fileIO = fileIOImpl == null ? new HadoopFileIO(conf) : CatalogUtil.loadFileIO(fileIOImpl, properties, conf);
this.suppressPermissionError = Boolean.parseBoolean(properties.get(HADOOP_SUPPRESS_PERMISSION_ERROR));
}
/**
* The constructor of the HadoopCatalog. It uses the passed location as its warehouse directory.
*
* @param conf The Hadoop configuration
* @param warehouseLocation The location used as warehouse directory
*/
public HadoopCatalog(Configuration conf, String warehouseLocation) {
this("hadoop", conf, warehouseLocation);
}
/**
* The constructor of the HadoopCatalog. It gets the value of <code>fs.defaultFS</code> property
* from the passed Hadoop configuration as its default file system, and uses the default directory
* <code>iceberg/warehouse</code> as the warehouse directory.
*
* @param conf The Hadoop configuration
*/
public HadoopCatalog(Configuration conf) {
this("hadoop", conf, conf.get("fs.defaultFS") + "/" + ICEBERG_HADOOP_WAREHOUSE_BASE);
}
@Override
public String name() {
return catalogName;
}
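// When the "suppress-permission-error" catalog property is set, permission failures such as
// AuthorizationPermissionMismatch are logged and treated as missing paths instead of being rethrown.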
private boolean shouldSuppressPermissionError(IOException ioException) {
if (suppressPermissionError) {
return ioException.getMessage() != null && ioException.getMessage().contains("AuthorizationPermissionMismatch");
}
return false;
}
private boolean isTableDir(Path path) {
Path metadataPath = new Path(path, "metadata");
// Only a path that contains table metadata files is a table directory; otherwise it could
// still be a namespace directory.
try {
return fs.listStatus(metadataPath, TABLE_FILTER).length >= 1;
} catch (FileNotFoundException e) {
return false;
} catch (IOException e) {
if (shouldSuppressPermissionError(e)) {
LOG.warn("Unable to list metadata directory {}: {}", metadataPath, e);
return false;
} else {
throw new UncheckedIOException(e);
}
}
}
private boolean isDirectory(Path path) {
try {
return fs.getFileStatus(path).isDirectory();
} catch (FileNotFoundException e) {
return false;
} catch (IOException e) {
if (shouldSuppressPermissionError(e)) {
LOG.warn("Unable to list directory {}: {}", path, e);
return false;
} else {
throw new UncheckedIOException(e);
}
}
}
@Override
public List<TableIdentifier> listTables(Namespace namespace) {
Preconditions.checkArgument(namespace.levels().length >= 1,
"Missing database in table identifier: %s", namespace);
Path nsPath = new Path(warehouseLocation, SLASH.join(namespace.levels()));
Set<TableIdentifier> tblIdents = Sets.newHashSet();
try {
if (!isDirectory(nsPath)) {
throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace);
}
for (FileStatus s : fs.listStatus(nsPath)) {
if (!s.isDirectory()) {
// Ignore paths that are not directories.
continue;
}
Path path = s.getPath();
if (isTableDir(path)) {
TableIdentifier tblIdent = TableIdentifier.of(namespace, path.getName());
tblIdents.add(tblIdent);
}
}
} catch (IOException ioe) {
throw new RuntimeIOException(ioe, "Failed to list tables under: %s", namespace);
}
return Lists.newArrayList(tblIdents);
}
@Override
protected boolean isValidIdentifier(TableIdentifier identifier) {
return true;
}
@Override
protected TableOperations newTableOps(TableIdentifier identifier) {
return new HadoopTableOperations(new Path(defaultWarehouseLocation(identifier)), fileIO, conf);
}
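// The table location is <warehouseLocation>/<namespace level 1>/.../<namespace level n>/<table name>.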
@Override
protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
String tableName = tableIdentifier.name();
StringBuilder sb = new StringBuilder();
sb.append(warehouseLocation).append('/');
for (String level : tableIdentifier.namespace().levels()) {
sb.append(level).append('/');
}
sb.append(tableName);
return sb.toString();
}
@Override
public boolean dropTable(TableIdentifier identifier, boolean purge) {
if (!isValidIdentifier(identifier)) {
throw new NoSuchTableException("Invalid identifier: %s", identifier);
}
Path tablePath = new Path(defaultWarehouseLocation(identifier));
TableOperations ops = newTableOps(identifier);
TableMetadata lastMetadata;
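// Keep a reference to the current metadata so that a purge can locate all reachable files.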
if (purge && ops.current() != null) {
lastMetadata = ops.current();
} else {
lastMetadata = null;
}
try {
if (purge && lastMetadata != null) {
// Since the data files and the metadata files may be stored in different locations,
// dropTableData must be called to force-delete all data and metadata files.
CatalogUtil.dropTableData(ops.io(), lastMetadata);
}
fs.delete(tablePath, true /* recursive */);
return true;
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to delete file: %s", tablePath);
}
}
@Override
public void renameTable(TableIdentifier from, TableIdentifier to) {
throw new UnsupportedOperationException("Cannot rename Hadoop tables");
}
@Override
public void createNamespace(Namespace namespace, Map<String, String> meta) {
Preconditions.checkArgument(
!namespace.isEmpty(),
"Cannot create namespace with invalid name: %s", namespace);
if (!meta.isEmpty()) {
throw new UnsupportedOperationException("Cannot create namespace " + namespace + ": metadata is not supported");
}
Path nsPath = new Path(warehouseLocation, SLASH.join(namespace.levels()));
if (isNamespace(nsPath)) {
throw new AlreadyExistsException("Namespace already exists: %s", namespace);
}
try {
fs.mkdirs(nsPath);
} catch (IOException e) {
throw new RuntimeIOException(e, "Create namespace failed: %s", namespace);
}
}
@Override
public List<Namespace> listNamespaces(Namespace namespace) {
Path nsPath = namespace.isEmpty() ? new Path(warehouseLocation)
: new Path(warehouseLocation, SLASH.join(namespace.levels()));
if (!isNamespace(nsPath)) {
throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace);
}
try {
return Stream.of(fs.listStatus(nsPath))
.map(FileStatus::getPath)
.filter(this::isNamespace)
.map(path -> append(namespace, path.getName()))
.collect(Collectors.toList());
} catch (IOException ioe) {
throw new RuntimeIOException(ioe, "Failed to list namespace under: %s", namespace);
}
}
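// Returns a copy of the given namespace with the name appended as a new lowest level.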
private Namespace append(Namespace ns, String name) {
String[] levels = Arrays.copyOfRange(ns.levels(), 0, ns.levels().length + 1);
levels[ns.levels().length] = name;
return Namespace.of(levels);
}
@Override
public boolean dropNamespace(Namespace namespace) {
Path nsPath = new Path(warehouseLocation, SLASH.join(namespace.levels()));
if (!isNamespace(nsPath) || namespace.isEmpty()) {
return false;
}
try {
if (fs.listStatusIterator(nsPath).hasNext()) {
throw new NamespaceNotEmptyException("Namespace %s is not empty.", namespace);
}
return fs.delete(nsPath, false /* recursive */);
} catch (IOException e) {
throw new RuntimeIOException(e, "Namespace delete failed: %s", namespace);
}
}
@Override
public boolean setProperties(Namespace namespace, Map<String, String> properties) {
throw new UnsupportedOperationException(
"Cannot set namespace properties " + namespace + " : setProperties is not supported");
}
@Override
public boolean removeProperties(Namespace namespace, Set<String> properties) {
throw new UnsupportedOperationException(
"Cannot remove properties " + namespace + " : removeProperties is not supported");
}
@Override
public Map<String, String> loadNamespaceMetadata(Namespace namespace) {
Path nsPath = new Path(warehouseLocation, SLASH.join(namespace.levels()));
if (!isNamespace(nsPath) || namespace.isEmpty()) {
throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace);
}
return ImmutableMap.of("location", nsPath.toString());
}
private boolean isNamespace(Path path) {
return isDirectory(path) && !isTableDir(path);
}
@Override
public void close() throws IOException {
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("name", catalogName)
.add("location", warehouseLocation)
.toString();
}
@Override
public TableBuilder buildTable(TableIdentifier identifier, Schema schema) {
return new HadoopCatalogTableBuilder(identifier, schema);
}
@Override
public void setConf(Configuration conf) {
this.conf = conf;
}
@Override
public Configuration getConf() {
return conf;
}
private class HadoopCatalogTableBuilder extends BaseMetastoreCatalogTableBuilder {
private final String defaultLocation;
private HadoopCatalogTableBuilder(TableIdentifier identifier, Schema schema) {
super(identifier, schema);
defaultLocation = defaultWarehouseLocation(identifier);
}
@Override
public TableBuilder withLocation(String location) {
Preconditions.checkArgument(location == null || location.equals(defaultLocation),
"Cannot set a custom location for a path-based table. Expected %s but got %s", defaultLocation, location);
return this;
}
}
}