blob: a1d04789381b45516aae2072e6c144785332d9cc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.hadoop;
import java.util.Map;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.MetadataTableUtils;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.StaticTableOperations;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.Tables;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Implementation of Iceberg tables that uses the Hadoop FileSystem
 * to store metadata and manifests.
 *
 * <p>Tables are addressed by location string. A location may carry a URI-style fragment
 * (e.g. {@code hdfs:///warehouse/my_table#snapshots}) to address one of the table's metadata
 * tables instead of the table itself.
 */
public class HadoopTables implements Tables, Configurable {
  private static final Logger LOG = LoggerFactory.getLogger(HadoopTables.class);

  // Marker looked for in a location string to decide which TableOperations to use.
  private static final String METADATA_JSON = "metadata.json";

  // Not final: Configurable#setConf may replace it after construction.
  private Configuration conf;

  /**
   * Creates an instance backed by a freshly constructed {@link Configuration}.
   */
  public HadoopTables() {
    this(new Configuration());
  }

  /**
   * Creates an instance backed by the given configuration.
   *
   * @param conf the Hadoop configuration passed to the table operations created by this instance
   */
  public HadoopTables(Configuration conf) {
    this.conf = conf;
  }

  /**
   * Loads the table at a FileSystem path location.
   *
   * <p>If the location ends with a {@code #<type>} fragment that names a known
   * {@link MetadataTableType}, the corresponding metadata table of the base table is returned.
   *
   * @param location a path URI (e.g. hdfs:///warehouse/my_table/)
   * @return table implementation
   * @throws NoSuchTableException if no table metadata exists at the location
   * @throws NullPointerException if location is null
   */
  @Override
  public Table load(String location) {
    Preconditions.checkNotNull(location, "A table location is required");

    Table result;
    Pair<String, MetadataTableType> parsedMetadataType = parseMetadataType(location);

    if (parsedMetadataType != null) {
      // Load a metadata table
      result = loadMetadataTable(parsedMetadataType.first(), parsedMetadataType.second());
    } else {
      // Load a normal table
      TableOperations ops = newTableOps(location);
      if (ops.current() != null) {
        result = new BaseTable(ops, location);
      } else {
        throw new NoSuchTableException("Table does not exist at location: " + location);
      }
    }

    LOG.info("Table location loaded: {}", result.location());
    return result;
  }

  /**
   * Try to resolve a metadata table, which we encode as URI fragments
   * e.g. hdfs:///warehouse/my_table#snapshots
   *
   * <p>Only the text after the last {@code '#'} is considered. A location without a
   * {@code '#'}, or one that ends with {@code '#'}, is never treated as a metadata table.
   *
   * @param location Path to parse
   * @return A base table name and MetadataTableType if a type is found, null if not
   */
  private Pair<String, MetadataTableType> parseMetadataType(String location) {
    int hashIndex = location.lastIndexOf('#');
    // Logical (short-circuit) AND: the suffix check is only relevant when a '#' exists.
    if (hashIndex != -1 && !location.endsWith("#")) {
      String baseTable = location.substring(0, hashIndex);
      String metaTable = location.substring(hashIndex + 1);
      MetadataTableType type = MetadataTableType.from(metaTable);
      // An unrecognized fragment means the whole location is treated as a normal table path.
      return (type == null) ? null : Pair.of(baseTable, type);
    } else {
      return null;
    }
  }

  /**
   * Loads a metadata table of the base table stored at the given location.
   *
   * @param location location of the base table (without the metadata table fragment)
   * @param type the metadata table to create
   * @return the metadata table instance
   * @throws NoSuchTableException if the base table does not exist at the location
   */
  private Table loadMetadataTable(String location, MetadataTableType type) {
    TableOperations ops = newTableOps(location);
    if (ops.current() == null) {
      throw new NoSuchTableException("Table does not exist at location: " + location);
    }

    return MetadataTableUtils.createMetadataTableInstance(ops, location, type);
  }

  /**
   * Create a table using the FileSystem implementation resolved from
   * the location.
   *
   * @param schema iceberg schema used to create the table
   * @param spec partitioning spec, if null the table will be unpartitioned
   * @param order sort order, if null the table will be unsorted
   * @param properties a string map of table properties, initialized to empty if null
   * @param location a path URI (e.g. hdfs:///warehouse/my_table)
   * @return newly created table implementation
   * @throws AlreadyExistsException if a table already exists at the location
   * @throws NullPointerException if schema or location is null
   */
  @Override
  public Table create(Schema schema, PartitionSpec spec, SortOrder order,
                      Map<String, String> properties, String location) {
    Preconditions.checkNotNull(schema, "A table schema is required");
    Preconditions.checkNotNull(location, "A table location is required");

    TableOperations ops = newTableOps(location);
    if (ops.current() != null) {
      throw new AlreadyExistsException("Table already exists at location: " + location);
    }

    // Substitute defaults for the optional arguments.
    Map<String, String> tableProps = properties == null ? ImmutableMap.of() : properties;
    PartitionSpec partitionSpec = spec == null ? PartitionSpec.unpartitioned() : spec;
    SortOrder sortOrder = order == null ? SortOrder.unsorted() : order;

    TableMetadata metadata = TableMetadata.newTableMetadata(schema, partitionSpec, sortOrder, location, tableProps);
    // A null base indicates there is no previous metadata to replace.
    ops.commit(null, metadata);

    return new BaseTable(ops, location);
  }

  /**
   * Creates the table operations used to access the table at the given location.
   *
   * <p>Locations containing {@code metadata.json} get {@link StaticTableOperations};
   * every other location gets {@link HadoopTableOperations}.
   *
   * @param location a table location or a path containing {@code metadata.json}
   * @return table operations for the location
   */
  @VisibleForTesting
  TableOperations newTableOps(String location) {
    // NOTE(review): this is a substring match, so a table directory whose path merely
    // contains "metadata.json" would also take the static branch - confirm this is intended.
    if (location.contains(METADATA_JSON)) {
      return new StaticTableOperations(location, new HadoopFileIO(conf));
    } else {
      return new HadoopTableOperations(new Path(location), conf);
    }
  }

  /**
   * Replaces the Hadoop configuration used for table operations created after this call.
   *
   * @param conf the new configuration
   */
  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }

  /**
   * Returns the Hadoop configuration currently held by this instance.
   *
   * @return the current configuration
   */
  @Override
  public Configuration getConf() {
    return conf;
  }
}